The Kafka consumer inside Flink cannot find partitions for the topic it is configured to read. This means Flink’s view of that topic (its name, its partitions, or the brokers that host it) does not match what Kafka actually reports.
Common Causes and Fixes
1. Kafka Topic Deleted or Renamed:
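- Diagnosis: Check Kafka’s topic list for the exact name the Flink source is configured with. Topic names are case-sensitive.
kafka-topics.sh --bootstrap-server localhost:9092 --list
- Fix: If the topic is missing, recreate it. If it exists under a different name, point the Flink source at that name instead. Kafka has no rename operation, so a "renamed" topic is really a new topic. When recreating, match the original partition count and use a replication factor suited to your cluster (1 is only appropriate for local testing). Recreating a deleted topic does not bring back its data.
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic your_flink_topic --partitions 3 --replication-factor 1
- Why it works: Flink looks up partitions by topic name. If no topic with that name exists in Kafka, there are no partitions to assign and the consumer cannot proceed. Recreating the topic or correcting the name makes Flink’s expectation match reality.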
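If the topic list is long, a small check can confirm whether the exact name is present and surface near misses, such as a hyphen where the job expects an underscore. The sketch below is a hypothetical Java helper (TopicLookup is not part of Flink or Kafka). It assumes you pass in the output of kafka-topics.sh --list, one topic per line. It relies only on the fact that Kafka topic names consist of ASCII letters, digits, '.', '_', and '-'.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: checks the output of `kafka-topics.sh --list` for the topic
// a Flink job is configured with and suggests near-miss names (typos, "renamed" topics).
final class TopicLookup {

    private TopicLookup() {}

    /** True if the expected topic appears verbatim (Kafka topic names are case-sensitive). */
    static boolean exists(String listOutput, String expectedTopic) {
        return parse(listOutput).contains(expectedTopic);
    }

    /** Topics that equal the expected name once case and '.', '_', '-' are ignored. */
    static List<String> nearMisses(String listOutput, String expectedTopic) {
        String key = normalize(expectedTopic);
        List<String> result = new ArrayList<>();
        for (String topic : parse(listOutput)) {
            if (!topic.equals(expectedTopic) && normalize(topic).equals(key)) {
                result.add(topic);
            }
        }
        return result;
    }

    // Splits the CLI output into trimmed, non-empty topic names.
    private static List<String> parse(String listOutput) {
        List<String> topics = new ArrayList<>();
        for (String line : listOutput.split("\\R")) {
            String topic = line.trim();
            if (!topic.isEmpty()) {
                topics.add(topic);
            }
        }
        return topics;
    }

    // Lower-cases the name and drops the separator characters Kafka allows in topic names.
    private static String normalize(String topic) {
        return topic.toLowerCase(Locale.ROOT).replaceAll("[._-]", "");
    }
}
```

For example, if the cluster contains your-flink-topic while the job asks for your_flink_topic, exists returns false and nearMisses returns the hyphenated name.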
2. Incorrect bootstrap.servers Configuration in Flink:
- Diagnosis: Verify the bootstrap.servers setting passed to the Flink Kafka source, either through KafkaSourceBuilder.setBootstrapServers(...) or the Properties handed to the source. It belongs to the source’s Kafka client configuration, not to flink-conf.yaml.
// Properties passed to the Kafka source (e.g., via KafkaSourceBuilder.setProperties)
properties.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
- Fix: Ensure the listed broker addresses are correct and reachable from the Flink JobManager and every TaskManager.
properties.put("bootstrap.servers", "kafka-broker-1.example.com:9092,kafka-broker-2.example.com:9092");
- Why it works: Flink’s Kafka client connects to the brokers to fetch topic metadata, including the partition list. If it cannot reach any broker, it cannot obtain that metadata.
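Before redeploying, it can help to sanity-check the bootstrap.servers string itself. The following is a minimal sketch with a hypothetical class name. It only checks syntax (comma-separated host:port pairs with a valid port) and flags loopback hosts, which on a distributed Flink deployment make each JobManager or TaskManager point at itself. It does not contact Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check for a bootstrap.servers value. Validates syntax only
// (host:port pairs, port 1-65535) and flags loopback hosts; it never contacts Kafka.
final class BootstrapServersCheck {

    private BootstrapServersCheck() {}

    static List<String> problems(String bootstrapServers) {
        List<String> problems = new ArrayList<>();
        if (bootstrapServers == null || bootstrapServers.isBlank()) {
            problems.add("bootstrap.servers is empty");
            return problems;
        }
        for (String raw : bootstrapServers.split(",")) {
            String entry = raw.trim();
            int colon = entry.lastIndexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                problems.add("missing host or port: '" + entry + "'");
                continue;
            }
            String host = entry.substring(0, colon);
            int port;
            try {
                port = Integer.parseInt(entry.substring(colon + 1));
            } catch (NumberFormatException e) {
                problems.add("non-numeric port: '" + entry + "'");
                continue;
            }
            if (port < 1 || port > 65535) {
                problems.add("port out of range: '" + entry + "'");
            }
            // Loopback resolves to the local machine on every Flink node,
            // which is rarely where the Kafka brokers run.
            if (host.equals("localhost") || host.startsWith("127.")) {
                problems.add("loopback host: '" + entry + "'");
            }
        }
        return problems;
    }
}
```

An empty result only means the value is well-formed; reachability still has to be checked from the Flink nodes.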
3. Flink Job Manager/Task Manager Network Issues to Kafka:
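- Diagnosis: From the Flink JobManager and TaskManager pods/VMs, try to connect to the Kafka bootstrap servers on the configured port. With the KafkaSource API, the JobManager matters too, because the split enumerator that discovers partitions runs there.
# On a Flink JobManager or TaskManager node
telnet kafka-broker-1.example.com 9092
- Fix: Adjust security groups, firewall rules, or Kubernetes network policies to allow egress from the Flink nodes to the Kafka brokers on port 9092 (or your configured Kafka listener port). Also confirm that the addresses in the brokers’ advertised.listeners resolve and are reachable from Flink, because clients connect to those addresses after the initial bootstrap.
- Why it works: Even with correct configuration, network restrictions can prevent Flink from opening the connections it needs to fetch metadata from Kafka.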
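Where telnet is not installed in the Flink images, the same TCP check can be done from Java with java.net.Socket. This is a sketch that assumes you can run a small program (or a jshell session) on the JobManager and TaskManager hosts; the class name is hypothetical. A successful connect only shows that the port is open. It does not validate TLS, SASL, or the brokers’ advertised addresses.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

// Java stand-in for the telnet check: opens and immediately closes a TCP connection
// to each host:port in a bootstrap.servers-style list. Hypothetical helper.
final class BrokerReachability {

    private BrokerReachability() {}

    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException | IllegalArgumentException e) {
            // Refused connections, timeouts, unresolvable hosts, and invalid ports.
            return false;
        }
    }

    /** Maps each "host:port" entry to whether a TCP connection succeeded. */
    static Map<String, Boolean> checkAll(String bootstrapServers, int timeoutMillis) {
        Map<String, Boolean> results = new LinkedHashMap<>();
        for (String raw : bootstrapServers.split(",")) {
            String entry = raw.trim();
            int colon = entry.lastIndexOf(':');
            boolean ok = false;
            if (colon > 0) {
                try {
                    int port = Integer.parseInt(entry.substring(colon + 1));
                    ok = canConnect(entry.substring(0, colon), port, timeoutMillis);
                } catch (NumberFormatException e) {
                    // Malformed port: report the entry as unreachable.
                }
            }
            results.put(entry, ok);
        }
        return results;
    }
}
```

For example, checkAll("kafka-broker-1.example.com:9092,kafka-broker-2.example.com:9092", 3000) returns one true/false entry per broker, which makes it easy to spot a single blocked host.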
4. Kafka Topic Partition Count Mismatch (Less common for "Not Found", but can cause related issues):
- Diagnosis: Compare the partitions Flink expects (the Flink UI or logs may show them, if it can connect at all) with the partition count Kafka actually reports.
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic your_flink_topic
- Fix: If partitions were added in Kafka after the job started and partition discovery is disabled, the job will not read them until it is restarted. Enabling periodic partition discovery lets a running job pick up new partitions on its own. If Kafka reports fewer partitions than Flink expects, the topic was most likely deleted and recreated, because Kafka can increase a topic’s partition count but never decrease it. In that case, restore the original partition count or start the job without the stale state (see cause 7).
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
    .setTopics("your_flink_topic") // for a fixed set of partitions, use setPartitions(...) instead
    .setGroupId("my-group")
    .setBootstrapServers("localhost:9092")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    // Enable dynamic partition discovery: check for new partitions every 5 seconds
    .setProperty("partition.discovery.interval.ms", "5000")
    .build();
- Why it works: If Flink expects a specific set of partitions (for example, from restored state or its initial assignment) and Kafka now has fewer, the partitions that no longer exist cannot be found. Enabling dynamic discovery lets Flink pick up partitions that are added while the job is running.
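The comparison in the diagnosis step amounts to a small set difference. This hypothetical Java helper takes the partition IDs the job expects (however you obtained them, for example from logs) and the partition count shown by --describe. It relies on two Kafka facts: partition IDs run from 0 to count - 1, and a topic’s partition count can be increased but never decreased.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper comparing the partitions a Flink job expects with the
// partition count Kafka currently reports for the topic.
final class PartitionDiff {

    final SortedSet<Integer> missingInKafka = new TreeSet<>(); // expected, but no longer in Kafka
    final SortedSet<Integer> notYetAssigned = new TreeSet<>(); // in Kafka, unknown to the job

    private PartitionDiff() {}

    static PartitionDiff compare(Set<Integer> expectedPartitions, int currentCount) {
        PartitionDiff diff = new PartitionDiff();
        for (int p : expectedPartitions) {
            if (p >= currentCount) {
                diff.missingInKafka.add(p);
            }
        }
        for (int p = 0; p < currentCount; p++) {
            if (!expectedPartitions.contains(p)) {
                diff.notYetAssigned.add(p);
            }
        }
        return diff;
    }

    /**
     * Kafka cannot shrink a topic, so fewer partitions than expected usually means
     * the topic was deleted and recreated (a heuristic, not a proof).
     */
    boolean kafkaHasFewerPartitions() {
        return !missingInKafka.isEmpty();
    }
}
```

If kafkaHasFewerPartitions() returns true, enabling discovery alone is unlikely to help, because the job’s state still refers to the missing partitions. A non-empty notYetAssigned set with discovery disabled points to the restart case described above.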
5. Kafka ACLs Blocking Metadata Access:
- Diagnosis: Check the Kafka broker (authorizer) logs for authorization failures involving the Flink client’s principal (e.g., a Kerberos principal or SASL username), and list the ACLs that apply to the topic.
kafka-acls.sh --bootstrap-server localhost:9092 --list --topic your_flink_topic
- Fix: Grant the Flink client principal READ on the topic (READ implies DESCRIBE), plus READ on the consumer group if the job commits offsets to Kafka.
# Example ACL commands (adjust principal, topic, and group)
kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:flink_user --operation READ --topic your_flink_topic
kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:flink_user --operation READ --group my-group
- Why it works: Kafka’s security layer can prevent clients from even querying topic metadata if they lack the necessary permissions, which makes it look as if the partitions do not exist.
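6. Kafka Metadata Quorum Issues (ZooKeeper or KRaft):
- Diagnosis: Flink never talks to ZooKeeper or the KRaft controllers directly; it only sees the metadata the brokers serve. Ensure your Kafka cluster’s ZooKeeper ensemble (or KRaft controller quorum) is healthy and reachable by the Kafka brokers, and check the broker logs for connectivity issues with ZooKeeper/KRaft. On KRaft clusters, the quorum status can be queried directly.
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status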
- Fix: Troubleshoot and resolve ZooKeeper/KRaft quorum issues or network problems. Ensure Kafka brokers can communicate with the metadata quorum.
- Why it works: Kafka brokers use ZooKeeper or KRaft to maintain cluster state, including topic and partition information. If this metadata store is unavailable or unhealthy, brokers cannot reliably serve partition discovery requests.
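Quorum health comes down to majority arithmetic. A ZooKeeper ensemble or KRaft controller quorum needs a strict majority of its voters up before it can elect a leader and commit metadata changes such as new topics, added partitions, or partition leader elections. The hypothetical helper below spells out that arithmetic.

```java
// Majority arithmetic shared by ZooKeeper ensembles and KRaft controller quorums.
// Hypothetical helper for illustration.
final class QuorumMath {

    private QuorumMath() {}

    /** Smallest number of voters that forms a strict majority. */
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    /** How many voters can be down while a majority is still available. */
    static int toleratedFailures(int voters) {
        return voters - majority(voters);
    }

    /** True if enough healthy voters remain to form a majority. */
    static boolean hasQuorum(int voters, int healthyVoters) {
        return healthyVoters >= majority(voters);
    }
}
```

With 3 voters the quorum survives 1 failure; with 4 it still survives only 1, and with 5 it survives 2. This is why odd-sized quorums (3 or 5) are the usual choice.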
7. Flink Checkpoint/Savepoint Corruption or Inconsistency:
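- Diagnosis: If the error occurs after restoring from a savepoint, check whether the savepoint references partitions that no longer exist or offsets Kafka no longer retains (for example, because the topic was recreated or retention deleted old segments).
- Fix: The usual remedy is to discard the problematic savepoint and start a new job with OffsetsInitializer.earliest() or OffsetsInitializer.latest(). The starting-offsets setting only applies to partitions without restored state, so it has no effect while the job still restores the stale offsets. To keep the state of other operators, you can instead give the Kafka source a new uid and restore with --allowNonRestoredState, which drops only the source’s state. Manually editing the savepoint is possible but advanced and risky.
- Why it works: A savepoint captures the Kafka source’s offsets and partition assignments. If the topic has changed incompatibly since the savepoint was taken, restoring from it makes Flink look for partitions that are no longer there.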
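Before discarding a savepoint, it can be useful to see which restored positions are actually invalid. The sketch below is a hypothetical Java helper (Java 16+ for the record). It assumes you already have the restored positions as a map from partition to next offset (for example, from job logs), plus each current partition’s earliest and latest offsets, for instance from AdminClient.listOffsets. It flags partitions that no longer exist and offsets outside the range Kafka still retains.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: classifies restored consumer positions (partition -> next offset)
// against the topic's current partitions and their retained offset ranges.
final class RestoredPositionCheck {

    enum Status { OK, PARTITION_MISSING, OFFSET_OUT_OF_RANGE }

    /** Earliest retained offset and log end offset of one partition. */
    record Range(long earliest, long latest) {}

    private RestoredPositionCheck() {}

    static Map<Integer, Status> check(Map<Integer, Long> restored, Map<Integer, Range> current) {
        Map<Integer, Status> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : restored.entrySet()) {
            Range range = current.get(entry.getKey());
            long offset = entry.getValue();
            if (range == null) {
                result.put(entry.getKey(), Status.PARTITION_MISSING);
            } else if (offset < range.earliest() || offset > range.latest()) {
                // An offset equal to the log end offset is valid: the reader waits for new records.
                result.put(entry.getKey(), Status.OFFSET_OUT_OF_RANGE);
            } else {
                result.put(entry.getKey(), Status.OK);
            }
        }
        return result;
    }
}
```

Any PARTITION_MISSING entry matches the failure this section describes; OFFSET_OUT_OF_RANGE entries point to offsets that Kafka has already deleted.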
Once Flink can find the partitions, the next errors you are likely to encounter are a KafkaException ("Failed to construct kafka consumer") or a LeaderNotAvailableException if Flink can see the topic but not its partition leaders.