Handling variable-length Kafka tasks using Python
Reza A. Alipour / December 16, 2022
Let's say you have a service that takes customers' requests, distributes them as events amongst several workers, waits for the workers to perform a long-running process for each request, and gathers the results to send back to customers. If you intend to use Apache Kafka as the medium to distribute events and you are not careful, chances are you are going to run into a certain predicament: your requests will be processed more than once by different workers!
TL;DR: Consuming Kafka events is particularly challenging when processing each event takes a long and variable amount of time. This post discusses the challenge and possible solutions to it. A simple implementation of the chosen solution, which is accessible via this link, is also described.
Introduction
Kafka is a prominent event-streaming infrastructure. In Kafka's world, events (or requests in our simple scenario) are generated by a multitude of entities called producers. Kafka classifies these events into a predefined set of semantically close groups called topics. Producers decide which topic each of their events belongs to. Consumers are entities that read these events and act on them. Each topic can have multiple producers and consumers.
Consumers can cooperate and form a group to share a topic and read from it collectively. Producers and consumers are meant to be fully decoupled, as they can write (push) or read (pull) events whenever they want. Kafka is responsible for managing and retaining those events. Since topics can be read from and written to through more than one server, called brokers, Kafka further divides the events of a topic into partitions to avoid inconsistencies between the topic's consumers connected to different brokers. Within a consumer group, each topic-partition is assigned to exactly one consumer, while a consumer can hold more than one topic-partition. Upon event receipt, the consumer must send a delivery acknowledgment (commit) to the broker so Kafka doesn't send the event to the group again.
Note that this design doesn't allow a group to have more active consumers than the topic has partitions; any extra consumers sit idle. Partitions are where Kafka respects the order of events: within a partition, events are served to the consumer in a FIFO[1] manner. Ordering is not preserved at the topic level in Kafka.
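To make the partition-to-consumer relationship concrete, here is a minimal sketch in plain Python. It is a toy round-robin assignment, not Kafka's actual assignment strategy, and the function name assign_round_robin is made up for illustration. It only shows that every partition gets exactly one owner and that consumers beyond the number of partitions end up with nothing to read.

```python
from collections import defaultdict


def assign_round_robin(partitions, consumers):
    """Toy assignment: give each partition exactly one owner in the group."""
    assignment = defaultdict(list)
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    # consumers that received no partition are listed with an empty share
    return {consumer: assignment.get(consumer, []) for consumer in consumers}


partitions = ["t-0", "t-1", "t-2"]
# two consumers share three partitions
print(assign_round_robin(partitions, ["alpha", "beta"]))
# four consumers, three partitions: the last consumer stays idle
print(assign_round_robin(partitions, ["alpha", "beta", "gamma", "delta"]))
```

With two consumers, alpha holds two partitions and beta one; with four consumers, delta holds none.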
The problem
While you are the one in charge of defining topics, the number of partitions, and the number of consumers (up to the number of partitions), Kafka decides which topic-partition is assigned to which consumer in the consumer group. It constantly checks for live consumers and assigns topic-partitions to them. If a consumer disconnects from a broker, Kafka reassigns its topic-partitions to another consumer, a procedure called rebalancing.
The point is, Kafka needs to know which consumers are alive and which have disconnected from the broker to be able to perform rebalancing. Therefore, Kafka requires every consumer in the consumer group to check in at least once every max.poll.interval.ms milliseconds to affirm its presence. Strictly speaking, this check-in is the consumer's call to poll for new events (the client also sends lower-level heartbeats in the background, governed by session.timeout.ms), but since polling is what keeps a busy consumer in the group, this post simply calls it a heartbeat.
So far so good. If your consumers receive an event, perform a quick process on it, commit, and are ready to take the next event in a time much shorter than max.poll.interval.ms, congratulations! You are all set. The problem arises when your single-threaded application takes more than max.poll.interval.ms to process an event and consequently misses the heartbeat deadline.
Let's see why you don't want this to happen. Suppose event A is in topic t and topic-partition t-p, and two single-threaded consumers, alpha and beta, have formed a group to read from topic t. Also, suppose Kafka assigns t-p to alpha and it consumes event A immediately. If alpha takes more than max.poll.interval.ms to send the next heartbeat, Kafka assumes alpha has disconnected from the group and reassigns t-p to beta at the next rebalancing. Since there was no commit to mark A as done, Kafka will resend A to beta. If the processing time for A is long enough to repeat the pattern for beta as well, your service will be stuck in a livelock. It's important to mention that if alpha tries to commit during or after the rebalancing, the commit will fail.
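The ping-pong between alpha and beta can be illustrated with a small simulation. This is a sketch in plain Python that does not talk to Kafka; the function simulate and its parameters are made up for illustration, and rebalancing is reduced to handing the partition to the next consumer in the list.

```python
def simulate(processing_ms, max_poll_interval_ms, consumers, max_rounds=4):
    """Return (deliveries, committed) for a single event.

    deliveries lists the consumers that received the event, in order;
    committed tells whether any of them managed to commit it.
    """
    deliveries = []
    for round_number in range(max_rounds):
        owner = consumers[round_number % len(consumers)]
        deliveries.append(owner)
        if processing_ms <= max_poll_interval_ms:
            # the owner checks in on time, so its commit is accepted
            return deliveries, True
        # deadline missed: the partition is reassigned and the owner's
        # late commit is rejected, so the event is delivered again
    return deliveries, False


# a quick task is processed once
print(simulate(1_000, 5_000, ["alpha", "beta"]))
# a slow task bounces between the consumers and is never committed
print(simulate(8_000, 5_000, ["alpha", "beta"]))
```

The second call shows the livelock in miniature: the event is delivered over and over, and no delivery ever ends in a commit.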
How to mitigate the situation
The trivial solution would be to set max.poll.interval.ms high enough to avoid such a situation altogether. There are two downsides to this quick patch. First, if the time to process your tasks varies significantly, as is indeed the case for many machine learning tasks where computation time depends on the size of the dataset or the algorithm, your chances of finding a threshold that is guaranteed to exceed all your tasks are slim. Second, as discussed here, if you want to introduce new consumers to the group, or you need a rebalance due to failed consumers, you will have to wait a significant amount of time.
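For reference, this is roughly what the quick patch looks like as a consumer configuration. It is only a sketch: the broker address, the group id, and the 30-minute value are assumptions chosen for illustration, and the dictionary is what you would hand to the consumer constructor of your Kafka client.

```python
# hypothetical values: adjust the endpoint, group id and limit to your setup
THIRTY_MINUTES_MS = 30 * 60 * 1000

consumer_config = {
    "bootstrap.servers": "localhost:9092",  # assumed broker endpoint
    "group.id": "workers",                  # assumed consumer group id
    "enable.auto.commit": False,            # commit manually after processing
    # maximum time allowed between two polls before the consumer is
    # considered failed and its partitions are reassigned
    "max.poll.interval.ms": THIRTY_MINUTES_MS,
}

print(consumer_config["max.poll.interval.ms"])
```

Whatever value you pick here is also how long a consumer that is truly stuck can hold on to its partitions before Kafka notices, which is the second downside described above.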
Another somewhat obvious workaround would be to commit right after receiving the event and before processing it, but that reduces the processing acknowledgment to a mere delivery acknowledgment. If the process fails for any reason, the event is lost. Some might argue that another pipeline from the consumer back to the producer could notify the producer of the lost event. That will not be of much help, since a failed event produces no response at all. That means you have to specify a duration threshold to distinguish in-process events from failed ones and resend failed events to the broker, which is not easy when your tasks are variable-length.
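The difference between committing before and after processing can be shown with a toy model. The sketch below is plain Python with made-up names (consume, failing); it only tracks which events end up acknowledged without ever having been processed.

```python
def consume(events, commit_before_processing, failing):
    """Return (processed, lost) after one pass over the events.

    failing is the set of events whose processing crashes. An event is
    lost when it was committed even though its processing never finished.
    """
    processed, lost = [], []
    for event in events:
        committed = commit_before_processing
        succeeded = event not in failing
        if succeeded:
            processed.append(event)
            committed = True  # committing after processing is always safe
        if committed and not succeeded:
            lost.append(event)  # acknowledged but never processed
    return processed, lost


events = ["A", "B", "C"]
# commit first: the failing event B is silently lost
print(consume(events, commit_before_processing=True, failing={"B"}))
# commit last: B stays uncommitted, so Kafka can deliver it again
print(consume(events, commit_before_processing=False, failing={"B"}))
```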
The third solution expands on the previous one and assumes the application has a dedicated main thread for dealing with the Kafka broker and a bunch of worker threads to process the incoming events. The main thread receives events and caches them in a local queue, from which it can feed the workers. The main thread distributes events while sending constant heartbeats to the Kafka broker. It then gathers the results from workers, commits accomplished tasks, and reassigns failed tasks.
This is particularly tricky to implement for Kafka as Kafka doesn't accept commits of individual events. What you have to commit is just an offset that points to the last place where events have been processed. As a result, the main thread must ensure every event up to a certain point is processed before sending a commit of that point. Therefore, if the main thread crashes before all consecutive events up to a point are processed, those events will all be processed again after rebalance.
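The bookkeeping the main thread would need can be sketched as follows. This is a minimal illustration in plain Python, assuming that a committed offset denotes the next event to be read and that workers report the offsets they have finished; the name next_commit_offset is made up.

```python
def next_commit_offset(committed, done_offsets):
    """Advance the commit point over the contiguous run of finished events.

    committed is the current commit point (the next offset to be read);
    done_offsets is the set of offsets whose processing has finished.
    """
    position = committed
    while position in done_offsets:
        position += 1
    return position


# offsets 5 and 6 are done, 7 is still running, 8 finished early
print(next_commit_offset(5, {5, 6, 8}))
# once 7 completes, the commit point can jump past 8
print(next_commit_offset(5, {5, 6, 7, 8}))
```

In the first call the commit point stops at 7 even though 8 has finished, so a crash at that moment would cause 8 to be processed again after the rebalance.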
A workaround for the above problem would be to turn every worker into a separate consumer and assign at least one topic-partition to each worker, instead of having the workers obtain events from the main thread. Every worker would have two threads: one responsible for accepting events and sending constant heartbeats, and another for processing events as they arrive. Events are acknowledged as soon as their processing completes, the consumers don't have to occupy themselves with queue management, and separation of concerns follows. This is our solution of choice in this post.
Next, we are going to talk about how to implement this solution in Python.
Python Implementation
This implementation is not meant to be bug-free or production-ready; it is just a glimpse into what the proposed solution might look like. Additionally, we assume the producer application is already in place and produces events. The consumer application receives events as they are produced, processes them, commits them to Kafka, and sends a new event containing the results to a separate topic to inform the producer of the result of the process. We also use Protocol Buffers to serialize our communication.
What every worker is about to do is CPU-bound. Therefore, running workers as threads is not ideal: in Python, threads suit I/O-bound tasks rather than CPU-bound ones. We need a module that spawns multiple worker instances as processes and executes them. The following module would do the job:
from pathos.multiprocessing import Pool
from tools.log.logger import Logger
from typing import Callable, Optional
import os
import signal

# make child processes ignore SIGINT. The parent must
# take care of signal handling, e.g., keyboard interrupts
def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

class WorkerPool(Logger):
    '''
    worker pool gets a worker routine function and spawns a certain
    number of processes with that routine.
    worker_routine: the worker function to be spawned and executed
    number_of_workers: number of workers to be created and executed
                       (defaults to the number of cpus)
    after instantiation call the execute method and pass along the arguments
    you want to send to the workers.
    '''
    def __init__(self, worker_routine: Callable, number_of_workers: Optional[int] = None) -> None:
        # instantiate a process-safe logger
        Logger.__init__(self, "worker_pool")
        # os.cpu_count() can return None; fall back to a single cpu
        cpus = os.cpu_count() or 1
        if number_of_workers is None:
            number_of_workers = cpus
        # validate the inputs before proceeding
        if not isinstance(number_of_workers, int) or number_of_workers < 1:
            raise TypeError("number of workers is not correct")
        if not callable(worker_routine):
            raise TypeError("no worker specified")
        # limit the number of workers to the number of cpus
        self.__number_of_workers = min(number_of_workers, cpus)
        self.__worker_routine = worker_routine
        if self.__number_of_workers < number_of_workers:
            self.logger.warning(f"number of cpus limit dictated {self.__number_of_workers} workers instead of {number_of_workers} you provided")

    def execute(self, *args, **kwargs):
        self.logger.info(f"starting worker pool with {os.cpu_count()} cpus and {self.__number_of_workers} workers")

        # augment worker's arguments with its id, without mutating
        # the kwargs shared by all workers
        def worker_routine_wrapper(worker_number):
            worker_kwargs = dict(kwargs, worker_number=worker_number)
            self.__worker_routine(*args, **worker_kwargs)

        # use the initializer to ignore SIGINT in child processes and
        # spawn exactly one process per worker
        with Pool(processes=self.__number_of_workers, initializer=init_worker) as mp_pool:
            try:
                mp_pool.map(
                    worker_routine_wrapper,
                    range(self.__number_of_workers)
                )
            except KeyboardInterrupt:
                self.logger.info("worker pool shutting down")
                return
Note that we have used pathos, which builds on a fork of multiprocessing, to create the process pool. You can follow this discussion to understand the reasoning behind it. Had we used the native multiprocessing, we would have had to define the worker routine at the top level of the module instead of getting it as an argument, because the native module cannot send nested functions to child processes.
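The limitation is easy to reproduce without any process pool. The native multiprocessing module serializes the functions it ships to child processes with pickle, which stores functions by their importable name and therefore cannot handle a function created inside another function. The sketch below uses only the standard library; make_routine and can_pickle are names invented for this illustration.

```python
import math
import pickle


def make_routine(prefix):
    """Build a worker routine as a closure, like a handler factory would."""
    def routine(worker_number):
        return f"{prefix}-{worker_number}"
    return routine


def can_pickle(obj):
    """Tell whether the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# a function importable by name is fine
print(can_pickle(math.sqrt))          # → True
# a nested function returned by a factory is not
print(can_pickle(make_routine("w")))  # → False
```

pathos relies on dill for serialization, which can handle such closures; that is what lets WorkerPool accept the routine as an argument.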
Next, we have to specify our worker routine. Therefore, we define a function that takes the Kafka configuration object, populates a template function object with the configuration, and returns the resultant function, which can be used as the routine for our WorkerPool. The config for our setup includes Kafka's broker endpoint, the topic where events will be sent, the consumer group's id, and a response topic to which consumers will write process results. We use pydantic to get this information from environment variables:
# note: this is the pydantic v1 API; in pydantic v2, BaseSettings
# lives in the separate pydantic-settings package
from pydantic import BaseSettings

class KafkaConfig(BaseSettings):
    kafka_bootstrap_server: str
    kafka_request_topic: str
    kafka_ack_topic: str
    kafka_consumer_group_id: str

    class Config:
        env_prefix = ''
        case_sensitive = False
        allow_mutation = False
In addition, the Protocol Buffers definition for the communication is as follows:
syntax = "proto3";

package communication;

message Request {
    int32 sequence = 1;
    string payload = 2;
}

message Response {
    int32 sequence = 1;
    bool isprocessed = 2;
}
We use the confluent_kafka library, a prominent Python client, to connect to Kafka's broker. The worker routine creates a consumer, plus a producer to send acknowledgments. It loops to poll events from Kafka. When one is received, the event is handed to a service thread for processing. The routine then keeps sending heartbeats to the Kafka broker while waiting for the service result. After receiving the result from the service thread, it resumes getting new messages. The handler function that returns the worker routine function looks like this:
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

from confluent_kafka import Consumer, Producer

from tools.kafka import config
from tools.log.logger import Logger
# module generated by protoc from the definition above; the import
# path is an assumption and depends on your project layout
import communication_pb2

def KafkaHandler(config: config.KafkaConfig) -> Callable:
    # the worker pool provides a worker_number to each worker
    # for logging purposes.
    def routine(worker_number, *args, **kwargs):
        # instantiate a logger
        logger = Logger(f"kafka_handler[{worker_number}]").getLogger()
        # each worker is supposed to act as a unique consumer
        # in the consumer group with one or more topic-partitions
        # assigned to it
        c = Consumer({
            'bootstrap.servers': config.kafka_bootstrap_server,
            'group.id': config.kafka_consumer_group_id,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
        })
        # each worker will send acknowledgement events to producers
        # to inform them of the process result.
        p = Producer({
            'bootstrap.servers': config.kafka_bootstrap_server,
        })
        # a single service thread per worker, reused for every event
        executor = ThreadPoolExecutor(max_workers=1)

        # callback to be called when the service thread has done its work
        def callback(future):
            error = future.exception()
            if error is not None:
                logger.error(f"service failed: {error}")
            else:
                logger.info(f"{future.result()}")
            # make sure reception resumes even if the service failed early
            c.resume(c.assignment())

        try:
            logger.info("starting kafka handler")
            # subscribe to the request topic to receive events
            c.subscribe([config.kafka_request_topic])
            # iterate indefinitely to poll new events and send heartbeats.
            # while the worker is busy with an event, polling returns no
            # new events and merely keeps the consumer alive in the group.
            while True:
                msg = c.poll(2.0)
                if msg is not None:
                    if msg.error():
                        logger.error(f"consumer error: {msg.error()}")
                        continue
                    # pause reception before handing the event over, so the
                    # next poll cannot fetch another event in the meantime
                    c.pause(c.assignment())
                    # service is a Callable containing the business logic
                    # to process events. service will receive the event itself,
                    # the consumer, the producer, config, the logger, and every
                    # argument that is passed to the worker routine at
                    # execution time.
                    future = executor.submit(
                        service, msg, c, p, config, logger,
                        *args,
                        **kwargs
                    )
                    future.add_done_callback(callback)
        # graceful shutdown
        finally:
            executor.shutdown(wait=True)
            p.flush()
            c.close()
    return routine
We define service in the same module as KafkaHandler. The service receives the event alongside the consumer and producer. First, it pauses event reception at the worker, then it proceeds to parse the incoming protobuf message and process it. Upon successful completion of the process, it tries to send the result to the ack topic and waits for the Kafka broker to confirm that the ack has been registered at the broker. If the ack is sent without error, the service thread commits to the request topic; otherwise, it just terminates without committing. The service function is as follows:
def service(msg,
            consumer: Consumer,
            producer: Producer,
            config: config.KafkaConfig,
            logger: Logger,
            *args,
            **kwargs):
    # when the worker is done processing the event, it has to send
    # an acknowledgement event to another topic to inform producers
    # of the process result. The worker will commit the event only when
    # it is assured that this ack has been set in Kafka, otherwise the event
    # is not considered complete and the worker won't commit.
    # this is a callback function for the producer that sends the ack
    def acked(error, message):
        if error is None:
            # deserialize ack event
            parsedData = communication_pb2.Response()
            parsedData.ParseFromString(message.value())
            # commit the offset of the request this ack belongs to
            consumer.commit(message=msg, asynchronous=False)
            logger.info(f"ack sent: {parsedData.sequence}")
        else:
            logger.error(f"error sending ack: {error}, will not commit")

    # main thread can pass arbitrary arguments to workers at
    # execution time to be used in workers. Note that these
    # arguments are not worker specific and each worker receives
    # them
    ### process defined arguments
    dummy = kwargs.get("dummy")
    try:
        # pause event consumption until the process is complete
        consumer.pause(consumer.assignment())
        # deserialize incoming event
        parsedData = communication_pb2.Request()
        parsedData.ParseFromString(msg.value())
        ### process data here
        ### send the ack result to kafka
        ack = communication_pb2.Response()
        ack.sequence = parsedData.sequence
        ack.isprocessed = True
        # each worker is also a producer that sends ack events
        # to a response topic. Producers consume these events
        # to learn the outcome of their requests.
        producer.produce(
            config.kafka_ack_topic,
            key=msg.key(),
            value=ack.SerializeToString(),
            callback=acked
        )
        # wait a certain amount of time for the producer
        # to send the ack. if the operation fails somehow
        # the worker won't commit and the event will be processed again
        producer.poll(1)
        result = f"event has been processed: {parsedData.sequence}"
    except Exception as ex:
        result = f"error processing the event: {ex}"
    # resume reception of events
    finally:
        consumer.resume(consumer.assignment())
    return result
Now we are ready to fire up our consumer in the main module:
from controller.kafka.kafka_handler import KafkaHandler
from tools.pool.worker_pool import WorkerPool
from tools.kafka.config import KafkaConfig

def runner():
    config = KafkaConfig()
    handler = KafkaHandler(config)
    pool = WorkerPool(handler)
    # every argument for the service function needs to be passed here
    pool.execute(dummy="dummy")

if __name__ == "__main__":
    runner()
Thanks for reading.
Footnotes
[1] First-In First-Out