kafka-python verify that a topic exists

Tags: kafka python

I was struggling to verify that a topic existed on the Kafka broker before starting to poll for messages. Due to a bug in kafka-python, polling a KafkaConsumer for messages on a topic that doesn’t exist causes an infinite loop with no delay, which in turn makes CPU usage skyrocket until the process is restarted.

I eventually found a way to check that a topic exists before moving on to polling:

import sys

from kafka import KafkaClient

# This snippet lives inside a class, hence `self.bootstrap_servers`.
client = KafkaClient(self.bootstrap_servers)
# Topics currently known to the broker
broker_topics = client.topic_partitions
# Topics this instance intends to consume from
instance_topics = ['topic-2', 'topic-3', 'topic-7']

# Make sure all topics that are to be used actually exist. This prevents
# the consumer going into an infinite loop and 100% CPU usage when it
# attempts to poll from a non-existing topic.
# TODO: This will most probably be fixed in later versions of kafka-python
for topic in instance_topics:
    if topic and topic not in broker_topics:
        print("Topic '%s' does not exist. Exiting!" % topic)
        sys.exit(1)

I’m actually exiting the process here. I run this script in a Docker container and want it to exit and restart when it can’t settle into its environment.
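The same check can also be written as a small function that reports every missing topic at once, instead of stopping at the first one. This is only a sketch: the names `missing_topics` and `fetch_broker_topics` are made up for illustration, and I'm assuming the topic set is fetched with `KafkaConsumer.topics()` rather than through the client used above. Like the loop above, empty topic names are skipped.

```python
def missing_topics(required, available):
    """Return the required topic names that are not in `available`.

    Empty or None entries in `required` are ignored, and the order of
    `required` is preserved in the result.
    """
    available = set(available)
    return [t for t in required if t and t not in available]


def fetch_broker_topics(bootstrap_servers):
    """Hypothetical helper: ask the broker which topics it knows about.

    Assumes kafka-python is installed. The import is done lazily so the
    pure check above can be used (and tested) without a broker.
    """
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        return consumer.topics()
    finally:
        consumer.close()


# Example with a hard-coded topic set standing in for the broker's answer
missing = missing_topics(['topic-2', 'topic-3', 'topic-7'],
                         {'topic-2', 'topic-7'})
```

In the script itself, a non-empty result would be printed and followed by `sys.exit(1)`, so the container restarts just like before.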


If you have any comments or feedback, please send me an e-mail. (stig at stigok dotcom).


Creative Commons License This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.