GH GambleHub

Потоковая обработка

Что такое потоковая обработка

Потоковая обработка — это непрерывная реакция на бесконечные последовательности событий (лог транзакций, клики, платежи, телеметрия), с минимальной задержкой и гарантией корректности состояний. В отличие от батча, где «берем все накопленное за период», поток обрабатывает данные по мере поступления, поддерживает состояние и учитывает время события.

Ключевые понятия

Событие (event) — неизменяемый факт с `event_time` и уникальным `event_id`.
Время события (event time) vs время обработки (processing time) — первое приходит из источника, второе — когда оператор реально увидел событие.

Окна (windows) — группировка событий по времени:
  • Tumbling (неперекрывающиеся), Hopping/Sliding (с перекрытием), Session (разрывы по неактивности).
  • Водяные знаки (watermarks) — оценка того, что «события до момента T уже пришли», позволяющая закрывать окна и ограничивать ожидание опоздавших данных.
  • Запаздывающие данные (lateness) — события с `event_time` меньше текущего watermark; часто применяются правила дообработки.
  • Состояние (state) — локальные таблицы/хранилища операторов (keyed state) для агрегатов, join’ов, дедупликации.
  • Backpressure — давление при превышении пропускной способности downstream; управляется протоколом и буферами.

Архитектурные основы

1. Источник (source): брокер событий (Kafka/NATS/Pulsar), CDC из БД, очереди, файлы/лог-коллекторы.
2. Потоковый движок: вычисляет окна, агрегаты, джойны, паттерны (CEP), управляет состоянием и checkpoint’ами.
3. Приемник (sink): OLTP/OLAP БД, поисковый движок, кэш, топики, хранилища для витрин/отчетов.
4. Реестр схем: контроль эволюции payload и совместимости.
5. Наблюдаемость: метрики, трейсинг, логи, дашборды лага и водяных знаков.

Семантика времени и порядок

Всегда предпочитайте event time: это единственный инвариант при задержках и перебоях.
События могут приходить вне порядка; порядок гарантируется только в пределах ключа партиции.

Watermarks позволяют:
  • закрывать окна и эмитить результаты;
  • ограничивать «сколько ждем» запаздывающие события (`allowed_lateness`).
  • Для опоздавших событий используйте retractions/upserts: пересчет агрегатов и корректирующие события.

Состояние и надежность

Keyed state: данные агрегатов (суммы, счетчики, структуры для дедупа) шардингом распределены по ключам.
Checkpoint/Savepoint: периодические снимки состояния для восстановления; savepoint — управляемый снимок для миграций версии кода.

Exactly-once по эффекту достигается:
  • транзакционным «прочитал-обработал-записал» (commit sink + позиция чтения);
  • идемпотентными sinks (upsert/merge) + таблицы дедупа;
  • версионированием агрегатов (optimistic concurrency).

Окна, агрегации, join’ы

Окна:
  • Tumbling: простые периодические отчеты (минутные, часовые).
  • Hopping/Sliding: «скользящие» метрики (за 5 мин с шагом 1 мин).
  • Session: естественно для пользовательских сессий и антифрода.
  • Aggregations: sum/count/avg/approx-distinct (HyperLogLog), percentiles (TDigest/CKMS).
  • Stream-Stream join: требует буферизации обеих сторон по ключу и времени, уважайте `allowed_skew`.
  • Stream-Table join (KTable): присоединение справочника или текущего состояния (например, «активные лимиты пользователя»).

Работа с запаздывающими и дублирующимися данными

Дедупликация: по `event_id` или `(producer_id, sequence)`; храните «виденные» ключи с TTL ≥ окна повтора.
Late events: допускайте дообработку окна в течение `X` после закрытия (retractions/upserts).
Ложные дубликаты: корректируйте агрегаты идемпотентно и фиксируйте «ALREADY_APPLIED» в логах.

Масштабирование и производительность

Шардирование по ключу: обеспечивает параллелизм; следите за «горячими» ключами.
Backpressure: ограничивайте параллельность, используйте батчи и компрессию при публикации.
Водяные знаки: не ставьте слишком агрессивно — жесткие watermarks сокращают ожидание, но повышают долю late-обновлений.
Состояние: выбирайте формат (RocksDB/state store/в памяти) с учетом размера и паттернов доступа; чистите TTL.
Автомасштабирование: по лагу, CPU, размеру state, времени GC.

Надежность и перезапуски

Идемпотентный sink или транзакционный коммит с фиксацией офсета — основа корректности.
Повторная обработка после перезапуска допускается; эффект должен оставаться «ровно один раз».
DLQ/parking lot: отправляйте проблемные записи в отдельный поток с причинами; обеспечьте переобработку.

Наблюдаемость (что мерить)

Lag по источникам (по времени и по сообщению).
Watermark/current event time и доля late-событий.
Throughput/latency операторов, p95/p99 end-to-end.
State size/rocksdb I/O, частота checkpoint’ов/длительность.
DLQ rate, процент дедупликаций/ретраев.
CPU/GC/heap, время пауз.

Безопасность и комплаенс

Классификация данных: PII/PCI помечайте в схемах, храните минимум, шифруйте state и снапшоты.
Access control: отдельные ACL на топики/таблицы state и на sinks.
Ретенции: согласуются с юридическими требованиями (GDPR/право на забвение).
Аудит: логируйте `event_id`, `trace_id`, исход: `APPLIED/ALREADY_APPLIED/RETRIED`.

Паттерны внедрения

1. CDC → нормализация → события домена: не транслируйте сырые изменения БД, мапьте к понятным бизнес-фактам.
2. Outbox у продюсеров: факт транзакции + событие — в одной БД-транзакции.
3. Core vs Enriched: минимальный payload в критическом потоке, обогащения — асинхронно.
4. Replay-дружественность: проекции/витрины должны пересобираться из лога.
5. Idempotency by design: operation/event key, upsert-схемы, версии агрегатов.

Тестирование

Unit/Property-based: инварианты агрегатов и преобразований.
Stream tests: фиксированный поток событий с out-of-order и дубликатами → проверка окон и дедупа.
Golden windows: эталонные окна/агрегаты и допустимые поздние корректировки.
Fault-injection: падение между «записал эффект» и «коммитнул офсет».
Replay tests: пересборка витрины из начала лога = текущее состояние.

Стоимость и оптимизация

Окна и watermark влияют на задержку/ресурсы: чем длиннее окно и больше `allowed_lateness`, тем больше state.
Кодеки и компрессия: балансируйте CPU/сеть.
Batching на выходе: меньше сетевых вызовов и транзакций.
Фильтрация ранo («pushdown»): отбрасывайте лишнее как можно ближе к источнику.

Антипаттерны

Завязка на processing time там, где нужен event time → неверная аналитика.
Отсутствие идемпотентности в sink → двойные эффекты при рестартах.
Глобальные «мега-ключи»: один горячий раздел ломает параллелизм.
Сырые CDC как публичные события: утечка схем БД, хрупкость при эволюции.
Нет DLQ: «ядовитые» сообщения блокируют весь конвейер.
Фиксированная жесткая задержка вместо watermark: либо вечное ожидание, либо потери данных.

Примеры доменов

Платежи/финансы

Поток `payment.`, окна для антифрода (session + CEP), дедуп по `operation_id`.
Exactly-once эффект при разнесении в бухгалтерский ledger (upsert + версия).

Маркетинг/реклама

Sliding окна CTR/конверсий, Join кликов и показов с допуском `±Δt`, аггрегации для биддинга.

iGaming/онлайн-сервисы

Реал-тайм баланс/лимиты, миссии/ачивки (session окна), антифрод-паттерны и оповещения.

Мини-шаблоны (псевдокод)

Окно с водяными знаками и late-обновлениями

pseudo stream
.withEventTime(tsExtractor)
.withWatermark(maxAllowedLag=2m)
.window(TUMBLING, size=1m, allowedLateness=30s)
.keyBy(user_id)
.aggregate(sum, retractions=enable)
.sink (upsert_table )//idempotent upsert by (user_id, window_start)

Транзакционный sink с фиксацией офсета

pseudo begin tx upsert target_table using event, key=(k)
update consumer_offsets set pos=:pos where consumer=:c commit

Чек-лист продакшена

  • Определены event time и стратегия watermark; выбраны окна и `allowed_lateness`.
  • Идемпотентный sink или транзакционный коммит с офсетом.
  • Реестр схем и режимы совместимости включены; аддитивная эволюция.
  • Метрики: лаг, watermark, p95/p99, DLQ, размер state, длительность checkpoint.
  • Тесты: out-of-order, дубликаты, перезапуски, replay.
  • Политики PII/ретенции для state и снапшотов.
  • План масштабирования и стратегии backpressure.
  • Документация по договорам окон и корректировкам (late updates).

FAQ

Event time обязателен?
Если важна корректность метрик и согласованность, — да. Processing time подходит для техподсчетов/мониторинга, но искажает аналитику.

Нужен ли exactly-once?
Точечно: для критичных эффектов. Чаще достаточно at-least-once + идемпотентный sink.

Как выбирать окна?
Отталкивайтесь от бизнес-SLA: «за последние 5 минут» → hopping, «сессии пользователя» → session, «минутные отчеты» → tumbling.

Что делать с поздними данными?
Разрешать ограниченное `allowed_lateness` и эмитить корректировки (upsert/retract). Клиентская витрина должна уметь обновляться.

Итог

Потоковая обработка — это не только низкая задержка, но и дисциплина времени, состояния и контрактов. Правильный выбор event time, окон и водяных знаков, плюс идемпотентные эффекты, наблюдаемость и тесты делают конвейер надежным, воспроизводимым и экономичным — и дают бизнесу решения «здесь и сейчас», а не «через ночь».

Contact

Свяжитесь с нами

Обращайтесь по любым вопросам или за поддержкой.Мы всегда готовы помочь!

Начать интеграцию

Email — обязателен. Telegram или WhatsApp — по желанию.

Ваше имя необязательно
Email необязательно
Тема необязательно
Сообщение необязательно
Telegram необязательно
@
Если укажете Telegram — мы ответим и там, в дополнение к Email.
WhatsApp необязательно
Формат: +код страны и номер (например, +380XXXXXXXXX).

Нажимая кнопку, вы соглашаетесь на обработку данных.