kafka streams что это

Почему стриминг на KSQL и Kafka Streams — это непросто

Меня зовут Саша, я лид-разработчик в GlowByte Consulting. Мы с командой сделали неплохой стриминговый движок для одного крупного банка. Сейчас в продакшене крутится онлайн обработка банковских авторизаций, визитов клиентов в офис и еще ряд более мелких процессов, при этом все работает на KSQL и Kafka Streams. Хочу поделиться тем, на какие грабли мы наступили в процессе.

Если интересны подробности, прошу под кат.

Для тех, кто в танке

Если вы не знакомы с Kafka, советую почитать вводную часть с официального сайта Apache: kafka.apache.org/intro, либо с сайта Confluent:
docs.confluent.io/current/kafka/introduction.html, чтобы было понятно, о чем вообще речь.

Если же вы работали с Kafka как с шиной для обмена сообщениями, то у вас мог возникнуть вопрос: как сделать на этом стриминговый движок? Отвечаю, все благодаря Kafka Streams API и KSQL.

Kafka Streams — это Java API, которое позволяет перекладывать данные из топика в топик, по пути совершая различные преобразования этих данных.

KSQL — это надстройка над Kafka Streams, которая позволяет вместо написания Java кода использовать SQL-подобный язык, который автоматически генерирует Kafka Streams приложения.

Что мы сделали

Архитектура решения GlowByte

Наш PROD-кластер Kafka состоит из 3 нод. Конфигурация каждой примерно такая:

Пример сценария (не обязательно настоящего)

Клиент совершает покупку по карте в каком-либо магазине. Определяется, что магазин находится в крупном торговом центре, где также есть магазины-партнеры банка, выпустившего карту. Так что клиенту почти сразу после совершения покупки приходит сообщение с предложением посетить также и эти магазины.

Почти всю логику мы реализовали на KSQL-скриптах. Пишутся они, в отличие от обычного SQL не так просто. Но все же проще, чем Java-код. К тому же, их можно исправлять без компиляции (удобно менять какие-нибудь простые фильтры), а также они могут быть прочитаны человеком, абсолютно не знакомым с Java и с классическим программированием вообще.

Kafka Streams API мы использовали только в тех случаях, где KSQL оказался бессилен. Например, при сложных one-to-many джойнах.

Как надо и как не надо

Теперь перейду к основной сути поста. А конкретно, к граблям, на которые мы напоролись.

НЕ НАДО: Надеяться, что все заработает «из коробки»
НАДО: Изучить все технические детали

Казалось бы, очевидный пункт. Но когда реализуешь большое и технически сложное решение, все равно на что-то да наткнешься. Основные наши проблемы были:

Несколько раз при попытке планово перезапустить Kafka (для внесения пары незначительных параметров), Kafka просто не поднималась. Приходилось оперативно разбираться с проблемой и чаще всего увеличивать те или иные timeout’ы в конфигах.

Диски и железо

НЕ НАДО: Использовать одни и те же диски для всего, просто потому что места хватает
НАДО: Хорошо подобрать конфигурацию железа (особенно диски)

Основная проблема, с которой мы столкнулись — использование обычных HDD под State Store.
Kafka Streams и, соответственно, KSQL под собой всегда имеют State Store в виде RocksDB для джойнов и вычисления агрегатов. На State Store приходится очень большое количество операций чтения-записи. Собственно, мы не ожидали, насколько много. Один обычный HDD вполне тянет (по IO) пару средних KSQL-процессов (два-три джойна) или один очень большой (пять джойнов + агрегация). Но процессов у нас очень много (десятки). А дисков всего 12 на ноду, причем почти все заняты топиками Kafka и нагружать их еще сверху — плохая идея.

В итоге мы вынесли почти весь State Store в RAM. Работает хорошо, но занимает несколько сотен ГБ памяти.

Смена парадигмы

НЕ НАДО: Просто переносить SQL-прототипы в стриминг, ожидая, что все заработает
НАДО: Перейти от парадигмы ETL в парадигму стриминга

Наша компания очень активно делает разработку регламентных ETL-процессов для различных заказчиков. ETL-процессы обычно представляют собой батчевые расчеты, проводимые раз в сутки, раз в неделю или реже. В 90%+ случаев под всеми обертками ETL-процесс — это последовательное выполнение SQL-запросов в той или иной БД.

Так как писали мы на KSQL, то в начале вполне логичным решением казалось просто адаптировать все SQL-прототипы, сделанные аналитиками, под синтаксис KSQL. Оказалось, так это не работает. KSQL — это лишь SQL-подобный язык, но работает он совершенно по-другому.

Ниже картинка, показывающая разницу в обновлении данных в БД и в Kafka. В Kafka записи не обновляются. Старая остается и добавляется новая.

Еще пример, на этот раз с перемешиванием записей. В процессе преобразования какие либо два сообщения могут сначала быть в одной партиции, потом сменить ключ, а потом снова оказаться в одной партиции. В итоге их порядок может измениться. А так как в некоторых случаях порядок важен, получаем проблему.

Ниже будет пример адаптации одного скрипта, который в парадигме ETL считался довольно простым:

Батчи вместо стриминга

НЕ НАДО: Делать на стриминге то, для чего хватит батчей
НАДО: Правильно выбирать, какие задачи можно сделать, а какие нет

Вначале планировалось, что все необходимые процессы будут реализованы на KSQL и Kafka Streams. В процессе же оказалось, что некоторые вещи, оперативность которых не столь критична, а сложность реализации очень высока, лучше вынести в батч. Такие батчи можно запускать раз в час, несколько раз в день или реже, в зависимости от критичности. Но это будет явно лучше, чем рассчитывать в онлайне все подряд.
Для себя мы решили:

Можно делать в стриминге:

Проблема пересчета

НЕ НАДО: Ставить небольшой retention на все входящие события
НАДО: В задачах онлайн-расчетов быть готовыми к тому, что все придется пересчитать

Читайте также:  что такое грехи вопиющие к небесной каре

В жизни никогда нельзя быть уверенным, что ты все сразу сделал правильно. Может возникнуть ситуация, что в расчеты, которые делались онлайн половину месяца, закралась ошибка (или требования поменялись задним числом). Конечно, во всех stateless операциях это не важно. Что было, то прошло. Но вот если в онлайне считаются, например, агрегаты (например, оборот клиента по карте), то это проблема. Если ничего не сделать, то они продолжат считаться неправильно. Приходится все пересчитывать. Поэтому, в отдельных случаях мы храним данные в топиках Kafka вплоть до трех месяцев. При этом пересчет в стриминге — штука не простая.

Источник

Kafka Streams — непростая жизнь в production

Привет, Хабр! Вокруг меня сформировался позитивный информационный фон на тему обработки событий через Kafka Streams. Этот инструмент привлекает множеством видео-докладов и статей на Хабре, подробной документацией, понятным API и красивой архитектурой. Некоторые мои знакомые и коллеги разрабатывают с его помощью свои системы. Но что происходит в реальной жизни, когда эти системы уходят в production?

В этой статье я опущу введение в Kafka Streams, предполагая, что читатель уже знаком с ней, и расскажу о нашем опыте жизни с этой библиотекой на примере достаточно нагруженной системы.

Коротко о проекте

Внутренней командой вместе с партнерами мы работаем над биржей Ad Exchange, которая помогает перепродавать рекламный трафик. Специфику подобных инструментов мы уже когда-то описывали в статье на Хабре. По мере роста числа партнеров среди SSP и DSP, нагрузка на сервера биржи растет. А для повышения ценности самой биржи мы должны собирать с этого трафика развернутую аналитику. Тут-то мы и попытались применить Kafka Streams.

Статья не претендует на объективность и описывает только лишь наш опыт и проблемы, с которыми мы столкнулись, работая с этой технологией. Надеюсь, кому-то она поможет избежать ошибок при использовании Kafka Streams. А может быть даст повод посмотреть по сторонам.

Агрегаты

Вроде бы стандартный кейс для Kafka Streams: используем функции groupBy и aggregate и получаем нужный результат. Всё работает именно так, причем с отличной скоростью за счёт внутреннего кэша: несколько последовательных изменений по одному и тому же ключу сначала выполняются в кэше и только в определённые моменты отправляются в changelog-топик. Далее Kafka в фоне удаляет устаревшие дублирующиеся ключи через механизм log compaction. Что здесь может пойти не так?

Репартиционирование

Если ваш ключ группировки отличается от ключа, под которым изначально пришло событие, то Kafka Streams создаёт специальный repartition-топик, отправляет в него событие уже под новым ключом, а потом считывает его оттуда и только после этого проводит агрегацию и отправку в changelog-топик. В нашем примере вполне может быть, что событие «Показ рекламы» пришло с ключом в виде UUID. Почему нет? Если вам надо сделать группировку, например по трём другим ключам, то это будет три разных repartition-топика. На каждый топик будет одно дополнительное чтение и одна дополнительная запись в Kafka. Чувствуете, к чему я веду?

Предположим, на входе у вас 100 тысяч показов рекламы в секунду. В нашем примере вы создадите дополнительную нагрузку на брокер сообщений в размере +600 тысяч сообщений в секунду (300 на запись и 300 на чтение). И ведь не только на брокер. Для таких объёмов надо добавлять дополнительные сервера с сервисами Kafka Streams. Можете посчитать, во сколько тысяч долларов обойдется такое решение с учётом цен на железо.

Запросы к агрегатам

Данные пишут для того, чтобы с ними потом можно было работать. Например делать к ним запросы. И здесь наши возможности оказались серьезно ограничены. Исчерпывающий список того, как мы можем запрашивать данные у Kafka Streams, есть здесь или здесь для оконных агрегатов. Если используется стандартная реализация state-store на основе RocksDB (а скорее всего это так), фактически данные можно получать только по ключам.

Стабильность Kafka Streams

У нас бывают проблемы с хостинг провайдером. Иногда выходит из строя оборудование, иногда человеческий фактор, иногда и то, и другое. Если по какой-то причине теряется связь с Kafka, то Kafka Streams переводит все свои потоки в состояние DEAD и ждёт, когда мы проснёмся и сделаем ей рестарт. При этом рядом стоят соседние сервисы, которые работают с той же Kafka через Spring и @KafkaListener. Они восстанавливаются сами, как ни в чём не бывало.

Нам пришлось дописать в каждый сервис Kafka Streams дополнительный модуль, который работает как watchdog: поднимает Kafka Streams, если видит, что она умерла.

Кстати, если вы работаете с Kafka Streams через Spring, то не забудьте переопределить стандартный StreamsBuilderFactoryBean, указав в нём свой CleanupConfig. Иначе будете неприятно удивлены тем, что при каждом рестарте будет удаляться вся локальная база RocksDB. Напомню, что это приведёт к тому, что при каждом рестарте все сервера начнут активно считывать данные из changelog-топика. Поверьте, вам это не нужно.

KStream-KStream Join

Здесь можно было бы обойтись одной фразой: никогда это не используйте. Джоин двух потоков создаёт десятки топиков в Kafka и огромную нагрузку на всю систему. Просто не делайте этого. Ну или хотя бы проверьте все под нагрузкой, прежде чем ставить в production.

Вообще Kafka Streams любит создавать топики под различные свои нужды. Если вы не изучили под лупой документацию и не протестировали, как это работает, то это может стать для вас и ваших DevOps неприятным сюрпризом. Чем больше топиков, тем сложнее их администрировать.

Масштабируемость

Если вы используете джоины, то все ваши топики должны быть ко-партиционированы (co-partitioning), что, в числе прочего, означает, что у них должно быть одинаковое количество партиций. Так в чём же проблема?

Читайте также:  Что значит слесарь мср

Представим, что вы сделали несколько топологий: с агрегатами, джоинами, всё как положено. Поставили в production, оно работает, вы радуетесь. Дальше бизнес пошел в гору, нагрузка начала расти, вы добавили серверов и хотите увеличить количество партиций, чтобы загрузить эти новые сервера. Но Kafka Streams наплодил десятки топиков. И у всех надо поднимать количество партиций, иначе следующий рестарт ваших сервисов может стать последним. Если Kafka Streams увидит, что топики не ко-партиционированы, она выдаст ошибку и просто не запустится.

На вопрос, что с этим делать, у меня сегодня ответа нет. Вероятно, можно потушить все рабочие инстансы Kafka Streams, потом поднять число партиций на всех причастных топиках, затем поднять Kafka Streams обратно и молиться. А может быть последовать совету отсюда: Matthias J. Sax пишет, что это нужно делать, создавая новый топик с новым количеством партиций и подключать к нему Kafka Streams с новым application.id. Там же есть совет, что если вы знаете заранее, что нагрузка будет большая, то лучше сделать партиций с запасом.

Заключение

Источник

Обеспечение высокой доступности приложений с Kafka Streams

Kafka Streams — это Java-библиотека для анализа и обработки данных, хранящихся в Apache Kafka. Как и в любой другой платформе потоковой обработки, она способна выполнять обработку данных с сохранением и/или без сохранения состояния в режиме реального времени. В этом посте я попытаюсь описать, почему достижение высокой доступности (99,99%) проблематично в Kafka Streams и что мы можем сделать для того, чтобы ее достичь.

Что нам нужно знать

Прежде чем описывать проблему и возможные решения, давайте рассмотрим основные концепции Kafka Streams. Если вы работали с API-интерфейсами Kafka для консьюмеров/продьюсеров, то большинство из этих парадигм вам уже знакомы. В следующих разделах я попытаюсь в нескольких словах описать хранение данных в партициях, перебалансировку групп консьюмеров и как основные концепции Kafka клиентов вписываются в библиотеку Kafka Streams.

Kafka: Партицирование данных

Каждый экземпляр консьюмера в группе консьюмеров отвечает за обработку данных из уникального набора партиций входного топика.

Экземпляры консьюмеров по сути являются средством масштабирования обработки в вашей группе консьюмеров.

Kafka: Ребалансировка группы консьюмеров

Kafka Streams: Потоки

В начале этого поста мы ознакомились с тем, что библиотека Kafka Streams построена на основе API-интерфейсов продьюсеров и консьюмеров и обработка данных организована точно так же, как стандартное решение на Kafka. В конфигурации Kafka Streams поле application.id эквивалентно group.id в API-интерфейса консьюмера. Kafka Streams предварительно создает определенное количество потоков и каждый из них выполняет обработку данных из одной или нескольких партиций входных топиков. Говоря в терминологии API консьмеров, потоки по существу совпадают с экземплярами консьюмеров из одной группы. Потоки являются основным способом масштабирования обработки данных в Kafka Streams, это можно сделать вертикально, увеличив число потоков для каждого приложения Kafka Streams на одной машине, или горизонтально, добавив дополнительную машину с тем же application.id.

В Kafka Streams есть еще много элементов, таких как задачи, топология обработки, threading model и т.д., которые мы не рассмотрим в этом посте. Более подробную информацию можно найти здесь.

Kafka Streams: Хранение состояния

В потоковой обработке существует операции с сохранением и без сохранения состояния. Состояние — это то, что позволяет приложению запоминать необходимую информацию, выходящую за рамки обрабатываемой в данный момент записи.

Операции с состоянием, такие как count, любой тип aggregation, joins и т.д., намного сложнее. Это связано с тем, что имея только одну запись нельзя определить последнее состояние (скажем, count) для данного ключа, поэтому необходимо хранить состояние вашего потока в вашем приложении. Как мы уже обсуждали ранее, каждый поток обрабатывает набор уникальных партиций, следовательно поток обрабатывает только подмножество всего набора данных. Это означает, что каждый поток приложения Kafka Streams с одним и тем же application.id поддерживает свое собственное изолированное состояние. Мы не будем вдаваться в подробности о том, как формируется состояние в Kafka Streams, но важно понимать, что состояния восстанавливается с помощью топика журнала изменений(change-log топик) и сохраняется не только на локальном диске, но и в Kafka Broker. Сохранение журнала изменений состояния в Kafka Broker в качестве отдельного топика сделано не только для отказоустойчивости, но и для того, чтобы вы могли легко развернуть новые экземпляры Kafka Streams с тем же application.id. Поскольку состояние хранится в виде change-log топика на стороне брокера, новый экземпляр может загрузить свое собственное состояние из этого топика.

Более подробную информацию о хранении состояния можно найти здесь.

Почему обеспечение высокой доступности проблематично с Kafka Streams?

Мы рассмотрели основные концепции и принципы обработки данных с Kafka Streams. Теперь давайте попробуем объединить все части вместе и проанализировать, почему достижение высокой доступности может быть проблематичным. Из предыдущих разделов мы должны помнить:

TransferWise SPaaS (Stream Processing as a Service)

Прежде чем осветить суть этого поста, позвольте мне сначала рассказать, что мы создали в TransferWise и почему высокая доступность очень важна для нас.

В TransferWise у нас работает несколько узлов для стриминговой обработки, и каждый узел содержит несколько экземпляров Kafka Streams для каждой продуктовой команды. Экземпляры Kafka Streams, которые предназначены для конкретной команды разработчиков, имеют специальный application.id и обычно имеют более 5 потоков. В целом команды обычно имеют 10-20 потоков (эквивалентно числу инстансов консьюмеров) по всему кластеру. Приложения, которые развернуты на узлах, прослушивают входные топики и выполняют несколько типов операций с состоянием и/или без состояния над входными данными и предоставляют обновления данных в реальном времени для последующих нисходящих микросервисов.

Читайте также:  какой код на робуксы в 2021 году

Продуктовые команды нуждаются в обновлении агрегированных данных в режиме реального времени. Это необходимо для того, чтобы предоставить нашим клиентам возможность мгновенного перевода денег. Наш обычный SLA:

В любой день 99,99% агрегированных данных должны быть доступны менее чем за 10 секунд.

Чтобы дать вам представление, во время стресс-тестирования приложение Kafka Streams смогло обрабатывать и агрегировать 20 085 входных сообщений в секунду. Таким образом 10 секунд SLA при нормальной нагрузке звучали вполне достижимыми. К сожалению, наш SLA не был достигнут во время выполнения cкользящего (rolling) обновления узлов, на которых разворачиваются приложения, и ниже я опишу почему это происходило.

Скользящее обновление узлов

В TransferWise мы сильно верим в непрерывную доставку нашего программного обеспечения и обычно выпускаем новые версии наших сервисов пару раз в день. Давайте рассмотрим пример простого непрерывного обновления сервиса и посмотрим, что происходит в процессе выпуска. Опять же, мы должны помнить, что:

Резервные реплики(Standby replicas)

Чтобы сократить время ребалансировки для Kafka Streams приложений, существует концепция резервных реплик, которые определяются в конфиге как num.standby.replicas. Резервные реплики являются копиями локального хранилища состояния. Этот механизм дает возможность реплицировать хранилище состояний из одного экземпляра Kafka Streams в другой. Когда поток Kafka Streams по какой-либо причине умирает, продолжительность процесса восстановления состояния может быть минимизирована. К сожалению, по причинам, которые я объясню ниже, даже резервные реплики не помогут при скользящем обновлении сервиса.

Предположим, у нас есть два экземпляра Kafka Streams на двух разных машинах: node-a и node-b. Для каждого из экземпляров Kafka Streams на этих 2 узлах указано num.standby.replicas = 1. При такой конфигурации каждый экземпляр Kafka Streams поддерживает свою копию хранилища на другом узле. Во время скользящем обновлении мы имеем следующую ситуацию:

Достижение цели: Высокая доступность с Kafka Streams

Несмотря на то, что клиентские библиотеки Kafka не предоставляют встроенную функциональность для упомянутой выше проблемы, существуют некоторые приемы, которые можно использовать для достижения высокой доступности кластера во время скользящего обновления. Идея, лежащая в основе резервных реплик, остается в силе, и наличие резервных машин, когда наступит подходящее время, является хорошим решением, которое мы используем для обеспечения высокой доступности в случае отказа инстансов.

Проблема с нашей первоначальной настройкой заключалась в том, что у нас была одна группа консьюмеров для всех команд на всех узлах. Теперь вместо одной группы консьюмеров у нас есть две, и вторая действует как «горячий» кластер. В проде узлы имеют специальную переменную CLUSTER_ID, которая добавляется в application.id инстансов Kafka Streams. Вот пример конфигурации Spring Boot application.yml:

В один момент времени только один из кластеров находится в активном режиме, соответственно резервный кластер не отправляет сообщения в реальном времени на нисходящие микросервисы. Во время выпуска релиза, активным становится резервный класетр, что позволяет выполнить скользящее обновление на первом кластере. Поскольку это совершенно другая группа консьюмеров, наши клиенты даже не замечают каких-либо нарушений в обработке, а последующие сервисы продолжают получать сообщения из недавно активного кластера. Одним из очевидных недостатков использования резервной группы консьюмеров являются дополнительные накладные расходы и потребление ресурсов, но, тем не менее, такая архитектура обеспечивает дополнительные гарантии, контроль и отказоустойчивость нашей системы потоковой обработки.

Помимо добавления дополнительного кластера, есть еще приемы, которые позволяют смягчить проблему с частой ребалансировкой.

Увеличение group.initial.rebalance.delay.ms

Количество времени в миллисекундах, на которое GroupCoordinator будет задерживать начальную ребалансировку консьюмер группы.

Например, если мы зададим в этой настройке 60000 миллисекунд, то при скользящем обновлении у нас может быть минутное окно для выпуска релиза. Если инстанс Kafka Streams успешно «перезапустится» в этом временном окне, ребалансировака не вызовится. Обратите внимание, что данные, за которые отвечал перезапускаемый инстанс Kafka Streams, будут по-прежнему недоступны до тех пор, пока узел не вернется в оперативный режим. Допустим, если перезагрузка инстанса занимает около восьми секунд, у вас будет восемь секунд простоя для данных, за которые отвечает этот инстанс.

Следует заметить, основной минус этой концепции заключается в том, что в случае сбоя узла вы получите дополнительную задержку в одну минуту при восстановлении с учетом текущей конфигурации.

Уменьшение размера сегмента в change-log топиках

Большая задержка при перебалансировке Kafka Stream связана с восстановлением хранилищ состояний из change-log топиков. Change-log топики являются сжатыми топиками, что позволяет хранить в топике последнюю запись по конкретному ключу. Я кратко опишу эту концепцию ниже.

Топики в Kafka Broker организованы в виде сегментов. Когда сегмент достигает настроенного порогового размера, создается новый сегмент, а предыдущий уплотняется. По умолчанию этот порог установлен на 1 ГБ. Как вы, возможно, знаете, основная структура данных, лежащая в основе топиков Kafka и их партиций, представляет собой структуру лога с упреждающей записью, то есть, когда сообщения передаются в топик, они всегда добавляются в последний «активный» сегмент, и уплотнение не происходит.
Поэтому большинство сохраняемых состояний хранилища в changelog всегда находятся в файле «активного сегмента» и никогда не уплотняются, что приводит к миллионам неуплотненных сообщений changelog. Для Kafka Streams это означает, что во время перебалансировки, когда инстанс Kafka Streams восстанавливает свое состояние из changelog топика, ему необходимо прочитать много избыточных записей из changelog топика. Учитывая, что хранилища состояний заботятся только о последнем состоянии, а не об истории, это время обработки тратится впустую. Уменьшение размера сегмента вызовет более агрессивное сжатие данных, поэтому новые инстансы приложений Kafka Streams могут восстанавливать состояние гораздо быстрее.

Источник

Сказочный портал