Неожиданный параллелизм при обработке сообщений в Kafka
Есть распространённое заблуждение, что в рамках одной консюмер-группы сообщения одной и той же партиции топика Apache Kafka обрабатываются последовательно, одно за другим. И конкурентный доступ к сообщениям партиции со стороны консюмера невозможен. Спешу вас разуверить, такое возможно, и вот вам моя история.

Рассмотрим последовательность событий:
- Партиция
MyTopic#0назначена консюмеруMyConsumer#0. -
MyConsumer#0считал из партицииNсообщений. -
MyConsumer#0начал обработку сообщений. - Случилась ребалансировка.
- Иных консюмеров нет.
- Партиция
MyTopic#0вновь назначенаMyConsumer#0. -
MyConsumer#0вновь считал из партиции те же сообщения. -
MyConsumer#0вновь начал обработку тех же сообщений.
Что тут может пойти не так?
Если консюмер осуществляет обработку событий в отдельном пуле потоков (worker threads pool), то копии считанных сообщений будут находиться в памяти клиентского приложения. Следовательно, в момент, когда консюмер после ребалансировки еще раз считает и начнёт обработку тех же самых сообщений, но в рамках другого потока из пула, возникает коллизия. Как минимум, могут существовать два потока, которые одновременно обрабатывают одни и те же сообщения.
Может ли такое произойти в вашем проекте?
В целом, может, и вот по какой причине.
С одной стороны, Kafka предлагает достаточно простой и универсальный API для обслуживания сообщений топика. Продюсер пишет сообщения, консюмер их читает, обрабатывает и сдвигает указатель на следующую пачку сообщений. Партиция, действительно назначается одному-единственному консюмеру в группе с гарантией, что только этот консюмер будет её обслуживать.
С другой стороны, Kafka API определяет событийную модель и предполагает, что все клиенты следуют некоторому предопределённому регламенту обработки этих событий. Из основных событий можно выделить назначение и отзыв партиций. То, как именно будут обрабатываться данные события, во многом определяется реализацией используемого Kafka Client или прикладным кодом. И тут начинается самое интересное.
В целом, основная ответственность ложится на прикладной код: будете ли вы обрабатывать момент ребалансировки, чтобы отменить уже запущенную обработку. И тут нужно ответить на два вопроса:
- Насколько критично, если сообщение обработается дважды (idempotence)?
- Насколько критично, если одни и те же сообщения будут обрабатываться параллельно (concurrency)?
К первому чаще всего многие готовы, а вот ко второму – нет.
К сожалению, в моём текущем проекте не был готов и я, ибо для работы с Kafka пришлось использовать проприетарную библиотеку, в которой не было никакой возможности обрабатывать момент ребалансировки. Конечно, сейчас я к этому готов, но осадочек остался, поскольку в какой-то степени понадеялся на фреймворк.
Надеюсь, что эта небольшая, но поучительная история, поможет вам обойти стороной подобную проблему.
Понравилась статья?
Посмею напомнить, что у меня есть Telegram-канал Архитектоника в ИТ, где я публикую материал на похожие темы примерно раз в неделю. Подписчики меня мотивируют, но ещё больше мотивируют живые дискуссии, ведь именно в них рождается истина. Поэтому подписывайтесь на канал и будем оставаться на связи! ;-)
Статьи из той же категории: