Неожиданный параллелизм при обработке сообщений в Kafka

Есть распространённое заблуждение, что в рамках одной консюмер-группы сообщения одной и той же партиции топика Apache Kafka обрабатываются последовательно, одно за другим. И конкурентный доступ к сообщениям партиции со стороны консюмера невозможен. Спешу вас разуверить, такое возможно, и вот вам моя история.

Рассмотрим последовательность событий:

  1. Партиция MyTopic#0 назначена консюмеру MyConsumer#0.
  2. MyConsumer#0 считал из партиции N сообщений.
  3. MyConsumer#0 начал обработку сообщений.
  4. Случилась ребалансировка.
  5. Иных консюмеров нет.
  6. Партиция MyTopic#0 вновь назначена MyConsumer#0.
  7. MyConsumer#0 вновь считал из партиции те же сообщения.
  8. MyConsumer#0 вновь начал обработку тех же сообщений.

Что тут может пойти не так?

Если консюмер осуществляет обработку событий в отдельном пуле потоков (worker threads pool), то копии считанных сообщений будут находиться в памяти клиентского приложения. Следовательно, в момент, когда консюмер после ребалансировки еще раз считает и начнёт обработку тех же самых сообщений, но в рамках другого потока из пула, возникает коллизия. Как минимум, могут существовать два потока, которые одновременно обрабатывают одни и те же сообщения.

Может ли такое произойти в вашем проекте?

В целом, может, и вот по какой причине.

С одной стороны, Kafka предлагает достаточно простой и универсальный API для обслуживания сообщений топика. Продюсер пишет сообщения, консюмер их читает, обрабатывает и сдвигает указатель на следующую пачку сообщений. Партиция, действительно назначается одному-единственному консюмеру в группе с гарантией, что только этот консюмер будет её обслуживать.

С другой стороны, Kafka API определяет событийную модель и предполагает, что все клиенты следуют некоторому предопределённому регламенту обработки этих событий. Из основных событий можно выделить назначение и отзыв партиций. То, как именно будут обрабатываться данные события, во многом определяется реализацией используемого Kafka Client или прикладным кодом. И тут начинается самое интересное.

В целом, основная ответственность ложится на прикладной код: будете ли вы обрабатывать момент ребалансировки, чтобы отменить уже запущенную обработку. И тут нужно ответить на два вопроса:

  • Насколько критично, если сообщение обработается дважды (idempotence)?
  • Насколько критично, если одни и те же сообщения будут обрабатываться параллельно (concurrency)?

К первому чаще всего многие готовы, а вот ко второму – нет.

К сожалению, в моём текущем проекте не был готов и я, ибо для работы с Kafka пришлось использовать проприетарную библиотеку, в которой не было никакой возможности обрабатывать момент ребалансировки. Конечно, сейчас я к этому готов, но осадочек остался, поскольку в какой-то степени понадеялся на фреймворк.

Надеюсь, что эта небольшая, но поучительная история, поможет вам обойти стороной подобную проблему.



Понравилась статья?

Посмею напомнить, что у меня есть Telegram-канал Архитектоника в ИТ, где я публикую материал на похожие темы примерно раз в неделю. Подписчики меня мотивируют, но ещё больше мотивируют живые дискуссии, ведь именно в них рождается истина. Поэтому подписывайтесь на канал и будем оставаться на связи! ;-)

Статьи из той же категории: