Потокова обробка
Що таке потокова обробка
Потокова обробка - це безперервна реакція на нескінченні послідовності подій (лог транзакцій, кліки, платежі, телеметрія), з мінімальною затримкою і гарантією коректності станів. На відміну від батча, де «беремо все накопичене за період», потік обробляє дані в міру надходження, підтримує стан і враховує час події.
Ключові поняття
Подія (event) - незмінний факт з'event _ time'і унікальним'event _ id'.
Час події (event time) vs час обробки (processing time) - перше приходить з джерела, друге - коли оператор реально побачив подію.
- Tumbling (неперекриваються), Hopping/Sliding (з перекриттям), Session (розриви по неактивності).
- Водяні знаки (watermarks) - оцінка того, що «події до моменту T вже прийшли», що дозволяє закривати вікна і обмежувати очікування спізнілих даних.
- Запізнюючі дані (lateness) - події з'event _ time'менше поточного watermark; часто застосовуються правила дообробки.
- Стан (state) - локальні таблиці/сховища операторів (keyed state) для агрегатів, join'ів, дедуплікації.
- Backpressure - тиск при перевищенні пропускної здатності downstream; управляється протоколом і буферами.
Архітектурні основи
1. Джерело (source): брокер подій (Kafka/NATS/Pulsar), CDC з БД, черги, файли/лог-колектори.
2. Потоковий рушій: обчислює вікна, агрегати, джойни, патерни (CEP), управляє станом і checkpoint'ами.
3. Приймач (sink): OLTP/OLAP БД, пошуковий рушій, кеш, топіки, сховища для вітрин/звітів.
4. Реєстр схем: контроль еволюції payload і сумісності.
5. Спостережуваність: метрики, трейсинг, логи, дашборди лага і водяних знаків.
Семантика часу і порядок
Завжди віддайте перевагу event time: це єдиний інваріант при затримках і перебоях.
Події можуть приходити поза порядком; порядок гарантується тільки в межах ключа партії.
- закривати вікна і емітити результати;
- обмежувати «скільки чекаємо» запізнілі події ('allowed _ lateness').
- Для спізнілих подій використовуйте retractions/upserts: перерахунок агрегатів і коригувальні події.
Стан і надійність
Keyed state: дані агрегатів (суми, лічильники, структури для дедупа) шардингом розподілені по ключах.
Checkpoint/Savepoint: періодичні знімки стану для відновлення; savepoint - керований знімок для міграцій версії коду.
- транзакційним «прочитав-обробив-записав» (commit sink + позиція читання);
- ідемпотентними sinks (upsert/merge) + таблиці дедупа;
- версіонування агрегатів (optimistic concurrency).
Вікна, агрегації, join'и
Вікна:- Tumbling: прості періодичні звіти (хвилинні, годинні).
- Hopping/Sliding: «ковзаючі» метрики (за 5 хв з кроком 1 хв).
- Session: природно для призначених для користувача сесій і антифроду.
- Aggregations: sum/count/avg/approx-distinct (HyperLogLog), percentiles (TDigest/CKMS).
- Stream-Stream join: вимагає буферизації обох сторін по ключу і часу, поважайте'allowed _ skew'.
- Stream-Table join (KTable): приєднання довідника або поточного стану (наприклад, «активні ліміти користувача»).
Робота з запізнілими і дублюються даними
Дедуплікація: по `event_id` или `(producer_id, sequence)`; зберігайте «бачені» ключі з TTL ≥ вікна повтору.
Late events: допускайте доопрацювання вікна протягом'X'після закриття (retractions/upserts).
Помилкові дублікати: коригуйте агрегати ідемпотентно і фіксуйте «ALREADY_APPLIED» в логах.
Масштабування та продуктивність
Шардування за ключем: забезпечує паралелізм; слідкуйте за «гарячими» ключами.
Backpressure: обмежуйте паралельність, використовуйте батчі і компресію при публікації.
Водяні знаки: не ставте занадто агресивно - жорсткі watermarks скорочують очікування, але підвищують частку late-оновлень.
Стан: вибирайте формат (RocksDB/state store/в пам'яті) з урахуванням розміру і патернів доступу; чистіть TTL.
Автомасштабування: за лагом, CPU, розміром state, часом GC.
Надійність і перезапуски
Ідемпотентний sink або транзакційний коміт з фіксацією офсету - основа коректності.
Повторна обробка після перезапуску допускається; ефект повинен залишатися «рівно один раз».
DLQ/parking lot: відправляйте проблемні записи в окремий потік з причинами; забезпечте переобробку.
Спостережуваність (що міряти)
Lag за джерелами (за часом і за повідомленням).
Watermark/current event time і частка late-подій.
Throughput/latency операторів, p95/p99 end-to-end.
State size/rocksdb I/O, частота checkpoint'ів/тривалість.
DLQ rate, відсоток дедуплікацій/ретраїв.
CPU/GC/heap, час пауз.
Безпека та комплаєнс
Класифікація даних: PII/PCI позначайте в схемах, зберігайте мінімум, шифруйте state і снапшоти.
Access control: окремі ACL на топіки/таблиці state і на sinks.
Ретенції: узгоджуються з юридичними вимогами (GDPR/право на забуття).
Аудит: логуйте'event _ id','trace _ id', результат: `APPLIED/ALREADY_APPLIED/RETRIED`.
Патерни впровадження
1. CDC → нормалізація → події домену: не транслюйте сирі зміни БД, мапте до зрозумілих бізнес-фактів.
2. Outbox у продюсерів: факт транзакції + подія - в одній БД-транзакції.
3. Core vs Enriched: мінімальний payload в критичному потоці, збагачення - асинхронно.
4. Replay-дружність: проекції/вітрини повинні пересобиратися з логу.
5. Idempotency by design: operation/event key, upsert-схеми, версії агрегатів.
Тестування
Unit/Property-based: інваріанти агрегатів і перетворень.
Stream tests: фіксований потік подій з out-of-order і дублікатами → перевірка вікон і дедупа.
Golden windows: еталонні вікна/агрегати та допустимі пізні коригування.
Fault-injection: падіння між «записав ефект» і «коммітнув офсет».
Replay tests: перезбирання вітрини з початку логу = поточний стан.
Вартість та оптимізація
Вікна та watermark впливають на затримку/ресурси: чим довше вікно і більше'allowed _ lateness', тим більше state.
Кодеки і компресія: балансуйте CPU/мережу.
Batching на виході: менше мережевих викликів і транзакцій.
Фільтрація рано («pushdown»): відкидайте зайве якомога ближче до джерела.
Антипатерни
Зав'язка на processing time там, де потрібен event time → невірна аналітика.
Відсутність ідемпотентності в sink → подвійні ефекти при рестартах.
Глобальні «мега-ключі»: один гарячий розділ ламає паралелізм.
Сирі CDC як публічні події: витік схем БД, крихкість при еволюції.
Немає DLQ: «отруйні» повідомлення блокують весь конвеєр.
Фіксована жорстка затримка замість watermark: або вічне очікування, або втрати даних.
Приклади доменів
Платежі/фінанси
Потік'payment.', вікна для антифроду (session + CEP), дедуп по'operation _ id'.
Exactly-once ефект при рознесенні в бухгалтерський ledger (upsert + версія).
Маркетинг/реклама
Sliding вікна CTR/конверсій, Join кліків і показів з допуском'± Δ t', аггрегації для біддингу.
iGaming/онлайн-сервіси
Реал-тайм баланс/ліміти, місії/ачування (session вікна), антифрод-патерни і оповіщення.
Міні-шаблони (псевдокод)
Вікно з водяними знаками та late-оновленнями
pseudo stream
.withEventTime(tsExtractor)
.withWatermark(maxAllowedLag=2m)
.window(TUMBLING, size=1m, allowedLateness=30s)
.keyBy(user_id)
.aggregate(sum, retractions=enable)
.sink (upsert_table )//idempotent upsert by (user_id, window_start)
Транзакційний sink з фіксацією офсету
pseudo begin tx upsert target_table using event, key=(k)
update consumer_offsets set pos=:pos where consumer=:c commit
Чек-лист продакшену
- Визначені event time і стратегія watermark; вибрані вікна і'allowed _ lateness'.
- Ідемпотентний sink або транзакційний коміт з офсетом.
- Реєстр схем і режими сумісності включені; адитивна еволюція.
- Метрики: лаг, watermark, p95/p99, DLQ, розмір state, тривалість checkpoint.
- Тести: out-of-order, дублікати, перезапуски, replay.
- Політики PII/ретенції для state і снапшотів.
- План масштабування та стратегії backpressure.
- Документація за договорами вікон і коригувань (late updates).
FAQ
Event time обов'язковий?
Якщо важлива коректність метрик і узгодженість, - так. Processing time підходить для техпідрахунків/моніторингу, але спотворює аналітику.
Чи потрібен exactly-once?
Точково: для критичних ефектів. Частіше досить at-least-once + ідемпотентний sink.
Як вибрати вікна?
Відштовхуйтесь від бізнес-SLA: «за останні 5 хвилин» → hopping, «сесії користувача» → session, «хвилинні звіти» → tumbling.
Що робити з пізніми даними?
Дозволяти обмежене'allowed _ lateness'і емітити коригування (upsert/retract). Клієнтська вітрина повинна вміти оновлюватися.
Підсумок
Потокова обробка - це не тільки низька затримка, але і дисципліна часу, стану і контрактів. Правильний вибір event time, вікон і водяних знаків, плюс ідемпотентні ефекти, спостережуваність і тести роблять конвеєр надійним, відтворюваним і економічним - і дають бізнесу рішення «тут і зараз», а не «через ніч».