Потоковая обработка
Что такое потоковая обработка
Потоковая обработка — это непрерывная реакция на бесконечные последовательности событий (лог транзакций, клики, платежи, телеметрия), с минимальной задержкой и гарантией корректности состояний. В отличие от батча, где «берем все накопленное за период», поток обрабатывает данные по мере поступления, поддерживает состояние и учитывает время события.
Ключевые понятия
Событие (event) — неизменяемый факт с `event_time` и уникальным `event_id`.
Время события (event time) vs время обработки (processing time) — первое приходит из источника, второе — когда оператор реально увидел событие.
- Tumbling (неперекрывающиеся), Hopping/Sliding (с перекрытием), Session (разрывы по неактивности).
- Водяные знаки (watermarks) — оценка того, что «события до момента T уже пришли», позволяющая закрывать окна и ограничивать ожидание опоздавших данных.
- Запаздывающие данные (lateness) — события с `event_time` меньше текущего watermark; часто применяются правила дообработки.
- Состояние (state) — локальные таблицы/хранилища операторов (keyed state) для агрегатов, join’ов, дедупликации.
- Backpressure — давление при превышении пропускной способности downstream; управляется протоколом и буферами.
Архитектурные основы
1. Источник (source): брокер событий (Kafka/NATS/Pulsar), CDC из БД, очереди, файлы/лог-коллекторы.
2. Потоковый движок: вычисляет окна, агрегаты, джойны, паттерны (CEP), управляет состоянием и checkpoint’ами.
3. Приемник (sink): OLTP/OLAP БД, поисковый движок, кэш, топики, хранилища для витрин/отчетов.
4. Реестр схем: контроль эволюции payload и совместимости.
5. Наблюдаемость: метрики, трейсинг, логи, дашборды лага и водяных знаков.
Семантика времени и порядок
Всегда предпочитайте event time: это единственный инвариант при задержках и перебоях.
События могут приходить вне порядка; порядок гарантируется только в пределах ключа партиции.
- закрывать окна и эмитить результаты;
- ограничивать «сколько ждем» запаздывающие события (`allowed_lateness`).
- Для опоздавших событий используйте retractions/upserts: пересчет агрегатов и корректирующие события.
Состояние и надежность
Keyed state: данные агрегатов (суммы, счетчики, структуры для дедупа) шардингом распределены по ключам.
Checkpoint/Savepoint: периодические снимки состояния для восстановления; savepoint — управляемый снимок для миграций версии кода.
- транзакционным «прочитал-обработал-записал» (commit sink + позиция чтения);
- идемпотентными sinks (upsert/merge) + таблицы дедупа;
- версионированием агрегатов (optimistic concurrency).
Окна, агрегации, join’ы
Окна:- Tumbling: простые периодические отчеты (минутные, часовые).
- Hopping/Sliding: «скользящие» метрики (за 5 мин с шагом 1 мин).
- Session: естественно для пользовательских сессий и антифрода.
- Aggregations: sum/count/avg/approx-distinct (HyperLogLog), percentiles (TDigest/CKMS).
- Stream-Stream join: требует буферизации обеих сторон по ключу и времени, уважайте `allowed_skew`.
- Stream-Table join (KTable): присоединение справочника или текущего состояния (например, «активные лимиты пользователя»).
Работа с запаздывающими и дублирующимися данными
Дедупликация: по `event_id` или `(producer_id, sequence)`; храните «виденные» ключи с TTL ≥ окна повтора.
Late events: допускайте дообработку окна в течение `X` после закрытия (retractions/upserts).
Ложные дубликаты: корректируйте агрегаты идемпотентно и фиксируйте «ALREADY_APPLIED» в логах.
Масштабирование и производительность
Шардирование по ключу: обеспечивает параллелизм; следите за «горячими» ключами.
Backpressure: ограничивайте параллельность, используйте батчи и компрессию при публикации.
Водяные знаки: не ставьте слишком агрессивно — жесткие watermarks сокращают ожидание, но повышают долю late-обновлений.
Состояние: выбирайте формат (RocksDB/state store/в памяти) с учетом размера и паттернов доступа; чистите TTL.
Автомасштабирование: по лагу, CPU, размеру state, времени GC.
Надежность и перезапуски
Идемпотентный sink или транзакционный коммит с фиксацией офсета — основа корректности.
Повторная обработка после перезапуска допускается; эффект должен оставаться «ровно один раз».
DLQ/parking lot: отправляйте проблемные записи в отдельный поток с причинами; обеспечьте переобработку.
Наблюдаемость (что мерить)
Lag по источникам (по времени и по сообщению).
Watermark/current event time и доля late-событий.
Throughput/latency операторов, p95/p99 end-to-end.
State size/rocksdb I/O, частота checkpoint’ов/длительность.
DLQ rate, процент дедупликаций/ретраев.
CPU/GC/heap, время пауз.
Безопасность и комплаенс
Классификация данных: PII/PCI помечайте в схемах, храните минимум, шифруйте state и снапшоты.
Access control: отдельные ACL на топики/таблицы state и на sinks.
Ретенции: согласуются с юридическими требованиями (GDPR/право на забвение).
Аудит: логируйте `event_id`, `trace_id`, исход: `APPLIED/ALREADY_APPLIED/RETRIED`.
Паттерны внедрения
1. CDC → нормализация → события домена: не транслируйте сырые изменения БД, мапьте к понятным бизнес-фактам.
2. Outbox у продюсеров: факт транзакции + событие — в одной БД-транзакции.
3. Core vs Enriched: минимальный payload в критическом потоке, обогащения — асинхронно.
4. Replay-дружественность: проекции/витрины должны пересобираться из лога.
5. Idempotency by design: operation/event key, upsert-схемы, версии агрегатов.
Тестирование
Unit/Property-based: инварианты агрегатов и преобразований.
Stream tests: фиксированный поток событий с out-of-order и дубликатами → проверка окон и дедупа.
Golden windows: эталонные окна/агрегаты и допустимые поздние корректировки.
Fault-injection: падение между «записал эффект» и «коммитнул офсет».
Replay tests: пересборка витрины из начала лога = текущее состояние.
Стоимость и оптимизация
Окна и watermark влияют на задержку/ресурсы: чем длиннее окно и больше `allowed_lateness`, тем больше state.
Кодеки и компрессия: балансируйте CPU/сеть.
Batching на выходе: меньше сетевых вызовов и транзакций.
Фильтрация ранo («pushdown»): отбрасывайте лишнее как можно ближе к источнику.
Антипаттерны
Завязка на processing time там, где нужен event time → неверная аналитика.
Отсутствие идемпотентности в sink → двойные эффекты при рестартах.
Глобальные «мега-ключи»: один горячий раздел ломает параллелизм.
Сырые CDC как публичные события: утечка схем БД, хрупкость при эволюции.
Нет DLQ: «ядовитые» сообщения блокируют весь конвейер.
Фиксированная жесткая задержка вместо watermark: либо вечное ожидание, либо потери данных.
Примеры доменов
Платежи/финансы
Поток `payment.`, окна для антифрода (session + CEP), дедуп по `operation_id`.
Exactly-once эффект при разнесении в бухгалтерский ledger (upsert + версия).
Маркетинг/реклама
Sliding окна CTR/конверсий, Join кликов и показов с допуском `±Δt`, аггрегации для биддинга.
iGaming/онлайн-сервисы
Реал-тайм баланс/лимиты, миссии/ачивки (session окна), антифрод-паттерны и оповещения.
Мини-шаблоны (псевдокод)
Окно с водяными знаками и late-обновлениями
pseudo stream
.withEventTime(tsExtractor)
.withWatermark(maxAllowedLag=2m)
.window(TUMBLING, size=1m, allowedLateness=30s)
.keyBy(user_id)
.aggregate(sum, retractions=enable)
.sink (upsert_table )//idempotent upsert by (user_id, window_start)
Транзакционный sink с фиксацией офсета
pseudo begin tx upsert target_table using event, key=(k)
update consumer_offsets set pos=:pos where consumer=:c commit
Чек-лист продакшена
- Определены event time и стратегия watermark; выбраны окна и `allowed_lateness`.
- Идемпотентный sink или транзакционный коммит с офсетом.
- Реестр схем и режимы совместимости включены; аддитивная эволюция.
- Метрики: лаг, watermark, p95/p99, DLQ, размер state, длительность checkpoint.
- Тесты: out-of-order, дубликаты, перезапуски, replay.
- Политики PII/ретенции для state и снапшотов.
- План масштабирования и стратегии backpressure.
- Документация по договорам окон и корректировкам (late updates).
FAQ
Event time обязателен?
Если важна корректность метрик и согласованность, — да. Processing time подходит для техподсчетов/мониторинга, но искажает аналитику.
Нужен ли exactly-once?
Точечно: для критичных эффектов. Чаще достаточно at-least-once + идемпотентный sink.
Как выбирать окна?
Отталкивайтесь от бизнес-SLA: «за последние 5 минут» → hopping, «сессии пользователя» → session, «минутные отчеты» → tumbling.
Что делать с поздними данными?
Разрешать ограниченное `allowed_lateness` и эмитить корректировки (upsert/retract). Клиентская витрина должна уметь обновляться.
Итог
Потоковая обработка — это не только низкая задержка, но и дисциплина времени, состояния и контрактов. Правильный выбор event time, окон и водяных знаков, плюс идемпотентные эффекты, наблюдаемость и тесты делают конвейер надежным, воспроизводимым и экономичным — и дают бизнесу решения «здесь и сейчас», а не «через ночь».