Flink’s restart strategies are designed to automatically recover your jobs when they fail, but the default behavior might not be what you expect when you’re trying to get your pipelines back online quickly.
Let’s see a Flink job in action, specifically how it handles a transient error. Imagine a Kafka consumer that suddenly loses its connection to a broker for a few seconds.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;

public class KafkaRecoveryJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds so a restart can resume from saved offsets.
        // Without checkpointing, Flink does not restart the job by default.
        env.enableCheckpointing(10_000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // Your Kafka broker
        properties.setProperty("group.id", "flink-recovery-group");
        properties.setProperty("auto.offset.reset", "earliest");

        // FlinkKafkaConsumer is deprecated in newer connector releases in favor of KafkaSource.
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "input-topic", // Your input topic
                new SimpleStringSchema(),
                properties);

        env.addSource(kafkaConsumer)
                .print(); // Simple print to stdout

        env.execute("Kafka Recovery Example");
    }
}
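If localhost:9092 becomes unavailable and the source fails as a result, Flink restarts the job according to its configured restart strategy. What happens without explicit configuration depends on checkpointing. If checkpointing is disabled, Flink uses the no-restart strategy and the job simply fails. If checkpointing is enabled, many Flink 1.x releases fall back to fixed-delay with Integer.MAX_VALUE attempts and a 1-second delay, while newer releases may default to exponential-delay, so check the documentation for your version. In either case, a broker that comes back within a few seconds usually lets the job resume from its last checkpoint without manual intervention.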
Restart strategies answer a narrow question: after a failure, should Flink restart the job, and how long should it wait before doing so? Recovering state consistently is the job of checkpointing; the restart strategy decides whether and when that recovery is attempted.
When checkpointing is enabled, Flink snapshots its state periodically. When a job fails, it rolls back to the last successful checkpoint and restarts the execution graph from that point. The restart strategy governs whether this rollback is triggered and how long Flink waits before the restart attempt.
You control Flink’s restart behavior primarily through configuration, either in flink-conf.yaml or programmatically within your job. The key parameters are:
restart-strategy: The type of strategy (none, fixed-delay, exponential-delay, or failure-rate).
restart-strategy.fixed-delay.attempts: The maximum number of restart attempts before the job is declared failed.
restart-strategy.fixed-delay.delay: The time to wait between restart attempts. A bare number is read as milliseconds; duration strings such as 10 s also work.
restart-strategy.failure-rate.max-failures-per-interval: The maximum number of failures tolerated within the interval before the job fails.
restart-strategy.failure-rate.failure-rate-interval: The time window over which failures are counted.
restart-strategy.failure-rate.delay: The time to wait between restart attempts.
Let’s say you want a more aggressive recovery for a short-lived transient issue, but you don’t want Flink to retry indefinitely if the problem persists. You can configure a fixed-delay strategy with a limited number of attempts:
In flink-conf.yaml:
# Fixed delay restart strategy
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
restart-strategy.fixed-delay.delay: 10000 # 10 seconds
This configuration tells Flink to try restarting the job up to 5 times, waiting 10 seconds between each attempt. If the job fails a 6th time, Flink will stop the job permanently.
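To make the attempt counting concrete, here is a minimal plain-Java sketch of the fixed-delay rule described above. It does not use Flink at all; the class FixedDelaySketch and its helpers canRestart and totalDelay are hypothetical names invented for illustration.

```java
import java.time.Duration;

public class FixedDelaySketch {

    /** Hypothetical helper: may the job restart after its n-th failure (1-based)? */
    static boolean canRestart(int failureNumber, int maxAttempts) {
        return failureNumber <= maxAttempts;
    }

    /** Total time spent waiting if every allowed restart attempt is used. */
    static Duration totalDelay(int maxAttempts, Duration delay) {
        return delay.multipliedBy(maxAttempts);
    }

    public static void main(String[] args) {
        int maxAttempts = 5;                      // restart-strategy.fixed-delay.attempts
        Duration delay = Duration.ofSeconds(10);  // restart-strategy.fixed-delay.delay

        for (int failure = 1; failure <= 6; failure++) {
            String outcome = canRestart(failure, maxAttempts)
                    ? "restart after " + delay.getSeconds() + " s"
                    : "job goes to FAILED";
            System.out.println("failure #" + failure + ": " + outcome);
        }
        System.out.println("total wait: " + totalDelay(maxAttempts, delay).getSeconds() + " s");
    }
}
```

With these values the strategy spends at most 50 seconds waiting between attempts, so an outage that lasts longer than roughly that (plus the time each attempt takes to fail) will exhaust the budget.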
Alternatively, for a strategy that backs off when failures are frequent, you can use exponential-delay:
In flink-conf.yaml:
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10000 # 10 seconds
restart-strategy.exponential-delay.max-backoff: 60000 # 1 minute
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 600000 # 10 minutes
Here, Flink starts with a 10-second delay. After each further failure the delay is multiplied by 2, up to a maximum of 60 seconds; by default Flink also adds a small random jitter to each delay. Once the job has run for 10 minutes without failing, the delay resets to the initial value.
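The resulting schedule is easy to compute by hand, and the following plain-Java sketch does exactly that. It is an illustration under stated assumptions, not Flink's implementation: the class ExponentialDelaySketch and the method delays are hypothetical, the multiplier of 2 matches the doubling described above, and random jitter is ignored.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class ExponentialDelaySketch {

    /** Returns the wait before each of the first `restarts` restart attempts. */
    static List<Duration> delays(Duration initial, Duration max, double multiplier, int restarts) {
        List<Duration> result = new ArrayList<>();
        long currentMs = initial.toMillis();
        for (int i = 0; i < restarts; i++) {
            result.add(Duration.ofMillis(currentMs));
            // Grow the delay for the next attempt, but never beyond the cap.
            currentMs = Math.min((long) (currentMs * multiplier), max.toMillis());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Duration> schedule =
                delays(Duration.ofSeconds(10), Duration.ofSeconds(60), 2.0, 5);
        for (int i = 0; i < schedule.size(); i++) {
            System.out.println("restart #" + (i + 1) + " waits " + schedule.get(i).getSeconds() + " s");
        }
    }
}
```

For a 10-second initial delay and a 60-second cap, the first five restarts wait 10, 20, 40, 60 and 60 seconds.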
The failure-rate strategy is useful when you want to tolerate a certain number of failures within a time window but stop the job if it’s consistently unstable:
In flink-conf.yaml:
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 60000 # 1 minute
restart-strategy.failure-rate.delay: 15000 # 15 seconds
With this configuration, Flink restarts the job 15 seconds after each failure as long as the failure rate stays within the limit of 3 failures per 1-minute window. Once failures occur more often than that, Flink stops restarting and the job fails. Failures older than the interval no longer count, so an occasional failure every few minutes is tolerated indefinitely.
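The sliding-window bookkeeping behind a failure-rate limit can be sketched in plain Java as follows. This is an illustration, not Flink's code: the class FailureRateSketch and its method recordFailureAndCheck are hypothetical, and the exact boundary (here, the failure that pushes the count above the limit is the fatal one) is an assumption that may differ from your Flink version.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateSketch(int maxFailuresPerInterval, long intervalMs) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
    }

    /** Records a failure at nowMs and returns true if a restart is still allowed. */
    boolean recordFailureAndCheck(long nowMs) {
        failureTimes.addLast(nowMs);
        // Forget failures that have fallen out of the sliding window.
        while (nowMs - failureTimes.peekFirst() > intervalMs) {
            failureTimes.removeFirst();
        }
        // Assumption: the job may restart while the count is within the limit.
        return failureTimes.size() <= maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        FailureRateSketch burst = new FailureRateSketch(3, 60_000L);
        long[] burstTimes = {0L, 10_000L, 20_000L, 30_000L}; // four failures in 30 s
        for (long t : burstTimes) {
            System.out.println("burst failure at " + t + " ms, restart allowed: "
                    + burst.recordFailureAndCheck(t));
        }

        FailureRateSketch spread = new FailureRateSketch(3, 60_000L);
        long[] spreadTimes = {0L, 30_000L, 70_000L, 100_000L}; // same count, spread out
        for (long t : spreadTimes) {
            System.out.println("spread failure at " + t + " ms, restart allowed: "
                    + spread.recordFailureAndCheck(t));
        }
    }
}
```

The first run is refused a restart on its fourth failure because all four fall inside one minute. The second run keeps restarting because older failures drop out of the window before the limit is reached.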
A common pitfall is overlooking how Kafka retention and the auto.offset.reset setting interact with restart strategies. After a restart, the consumer resumes from the offsets stored in the last checkpoint. If your Kafka topic has a short retention period and your job stays down for longer than that, those offsets may no longer exist; the consumer then typically falls back to auto.offset.reset, and the records that expired in the meantime are lost no matter which restart strategy you use. Ensure your Kafka retention policies comfortably exceed your Flink job’s potential downtime under its restart strategy.
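A rough back-of-the-envelope check for this can be written in a few lines of plain Java. The sketch below is only a lower bound under simplifying assumptions: it considers a fixed-delay strategy, adds one checkpoint interval for the data that must be replayed, and ignores task redeployment time, consumer lag, and any time the job spends in FAILED before someone resubmits it. The class RetentionCheckSketch and its methods are hypothetical.

```java
import java.time.Duration;

public class RetentionCheckSketch {

    /** Oldest data the job may need to re-read: one checkpoint interval plus all restart delays. */
    static Duration worstCaseReplayWindow(Duration checkpointInterval, int attempts, Duration restartDelay) {
        return checkpointInterval.plus(restartDelay.multipliedBy(attempts));
    }

    /** True if the topic keeps data strictly longer than the replay window. */
    static boolean retentionCovers(Duration retention, Duration replayWindow) {
        return retention.compareTo(replayWindow) > 0;
    }

    public static void main(String[] args) {
        Duration window = worstCaseReplayWindow(
                Duration.ofMinutes(1),   // assumed checkpoint interval
                5,                       // restart-strategy.fixed-delay.attempts
                Duration.ofSeconds(10)); // restart-strategy.fixed-delay.delay

        System.out.println("replay window: " + window.getSeconds() + " s");
        System.out.println("5 min retention is enough: " + retentionCovers(Duration.ofMinutes(5), window));
        System.out.println("1 min retention is enough: " + retentionCovers(Duration.ofMinutes(1), window));
    }
}
```

In practice you would want retention to exceed this window by a wide margin, since real outages also include the time it takes a person to notice and resubmit a failed job.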
If you’ve configured a restart strategy with a finite number of attempts (e.g., 5), then once all attempts are exhausted the job fails for good: its status changes to "FAILED" in the Flink UI and logs, and Flink makes no further restart attempts.