GH GambleHub

Потокова обробка

Що таке потокова обробка

Потокова обробка - це безперервна реакція на нескінченні послідовності подій (лог транзакцій, кліки, платежі, телеметрія), з мінімальною затримкою і гарантією коректності станів. На відміну від батча, де «беремо все накопичене за період», потік обробляє дані в міру надходження, підтримує стан і враховує час події.

Ключові поняття

Подія (event) - незмінний факт з'event _ time'і унікальним'event _ id'.
Час події (event time) vs час обробки (processing time) - перше приходить з джерела, друге - коли оператор реально побачив подію.

Вікна (windows) - групування подій за часом:
  • Tumbling (неперекриваються), Hopping/Sliding (з перекриттям), Session (розриви по неактивності).
  • Водяні знаки (watermarks) - оцінка того, що «події до моменту T вже прийшли», що дозволяє закривати вікна і обмежувати очікування спізнілих даних.
  • Запізнюючі дані (lateness) - події з'event _ time'менше поточного watermark; часто застосовуються правила дообробки.
  • Стан (state) - локальні таблиці/сховища операторів (keyed state) для агрегатів, join'ів, дедуплікації.
  • Backpressure - тиск при перевищенні пропускної здатності downstream; управляється протоколом і буферами.

Архітектурні основи

1. Джерело (source): брокер подій (Kafka/NATS/Pulsar), CDC з БД, черги, файли/лог-колектори.
2. Потоковий рушій: обчислює вікна, агрегати, джойни, патерни (CEP), управляє станом і checkpoint'ами.
3. Приймач (sink): OLTP/OLAP БД, пошуковий рушій, кеш, топіки, сховища для вітрин/звітів.
4. Реєстр схем: контроль еволюції payload і сумісності.
5. Спостережуваність: метрики, трейсинг, логи, дашборди лага і водяних знаків.

Семантика часу і порядок

Завжди віддайте перевагу event time: це єдиний інваріант при затримках і перебоях.
Події можуть приходити поза порядком; порядок гарантується тільки в межах ключа партії.

Watermarks дозволяють:
  • закривати вікна і емітити результати;
  • обмежувати «скільки чекаємо» запізнілі події ('allowed _ lateness').
  • Для спізнілих подій використовуйте retractions/upserts: перерахунок агрегатів і коригувальні події.

Стан і надійність

Keyed state: дані агрегатів (суми, лічильники, структури для дедупа) шардингом розподілені по ключах.
Checkpoint/Savepoint: періодичні знімки стану для відновлення; savepoint - керований знімок для міграцій версії коду.

Exactly-once за ефектом досягається:
  • транзакційним «прочитав-обробив-записав» (commit sink + позиція читання);
  • ідемпотентними sinks (upsert/merge) + таблиці дедупа;
  • версіонування агрегатів (optimistic concurrency).

Вікна, агрегації, join'и

Вікна:
  • Tumbling: прості періодичні звіти (хвилинні, годинні).
  • Hopping/Sliding: «ковзаючі» метрики (за 5 хв з кроком 1 хв).
  • Session: природно для призначених для користувача сесій і антифроду.
  • Aggregations: sum/count/avg/approx-distinct (HyperLogLog), percentiles (TDigest/CKMS).
  • Stream-Stream join: вимагає буферизації обох сторін по ключу і часу, поважайте'allowed _ skew'.
  • Stream-Table join (KTable): приєднання довідника або поточного стану (наприклад, «активні ліміти користувача»).

Робота з запізнілими і дублюються даними

Дедуплікація: по `event_id` или `(producer_id, sequence)`; зберігайте «бачені» ключі з TTL ≥ вікна повтору.
Late events: допускайте доопрацювання вікна протягом'X'після закриття (retractions/upserts).
Помилкові дублікати: коригуйте агрегати ідемпотентно і фіксуйте «ALREADY_APPLIED» в логах.

Масштабування та продуктивність

Шардування за ключем: забезпечує паралелізм; слідкуйте за «гарячими» ключами.
Backpressure: обмежуйте паралельність, використовуйте батчі і компресію при публікації.
Водяні знаки: не ставте занадто агресивно - жорсткі watermarks скорочують очікування, але підвищують частку late-оновлень.
Стан: вибирайте формат (RocksDB/state store/в пам'яті) з урахуванням розміру і патернів доступу; чистіть TTL.
Автомасштабування: за лагом, CPU, розміром state, часом GC.

Надійність і перезапуски

Ідемпотентний sink або транзакційний коміт з фіксацією офсету - основа коректності.
Повторна обробка після перезапуску допускається; ефект повинен залишатися «рівно один раз».
DLQ/parking lot: відправляйте проблемні записи в окремий потік з причинами; забезпечте переобробку.

Спостережуваність (що міряти)

Lag за джерелами (за часом і за повідомленням).
Watermark/current event time і частка late-подій.
Throughput/latency операторів, p95/p99 end-to-end.
State size/rocksdb I/O, частота checkpoint'ів/тривалість.
DLQ rate, відсоток дедуплікацій/ретраїв.
CPU/GC/heap, час пауз.

Безпека та комплаєнс

Класифікація даних: PII/PCI позначайте в схемах, зберігайте мінімум, шифруйте state і снапшоти.
Access control: окремі ACL на топіки/таблиці state і на sinks.
Ретенції: узгоджуються з юридичними вимогами (GDPR/право на забуття).
Аудит: логуйте'event _ id','trace _ id', результат: `APPLIED/ALREADY_APPLIED/RETRIED`.

Патерни впровадження

1. CDC → нормалізація → події домену: не транслюйте сирі зміни БД, мапте до зрозумілих бізнес-фактів.
2. Outbox у продюсерів: факт транзакції + подія - в одній БД-транзакції.
3. Core vs Enriched: мінімальний payload в критичному потоці, збагачення - асинхронно.
4. Replay-дружність: проекції/вітрини повинні пересобиратися з логу.
5. Idempotency by design: operation/event key, upsert-схеми, версії агрегатів.

Тестування

Unit/Property-based: інваріанти агрегатів і перетворень.
Stream tests: фіксований потік подій з out-of-order і дублікатами → перевірка вікон і дедупа.
Golden windows: еталонні вікна/агрегати та допустимі пізні коригування.
Fault-injection: падіння між «записав ефект» і «коммітнув офсет».
Replay tests: перезбирання вітрини з початку логу = поточний стан.

Вартість та оптимізація

Вікна та watermark впливають на затримку/ресурси: чим довше вікно і більше'allowed _ lateness', тим більше state.
Кодеки і компресія: балансуйте CPU/мережу.
Batching на виході: менше мережевих викликів і транзакцій.
Фільтрація рано («pushdown»): відкидайте зайве якомога ближче до джерела.

Антипатерни

Зав'язка на processing time там, де потрібен event time → невірна аналітика.
Відсутність ідемпотентності в sink → подвійні ефекти при рестартах.
Глобальні «мега-ключі»: один гарячий розділ ламає паралелізм.
Сирі CDC як публічні події: витік схем БД, крихкість при еволюції.
Немає DLQ: «отруйні» повідомлення блокують весь конвеєр.
Фіксована жорстка затримка замість watermark: або вічне очікування, або втрати даних.

Приклади доменів

Платежі/фінанси

Потік'payment.', вікна для антифроду (session + CEP), дедуп по'operation _ id'.
Exactly-once ефект при рознесенні в бухгалтерський ledger (upsert + версія).

Маркетинг/реклама

Sliding вікна CTR/конверсій, Join кліків і показів з допуском'± Δ t', аггрегації для біддингу.

iGaming/онлайн-сервіси

Реал-тайм баланс/ліміти, місії/ачування (session вікна), антифрод-патерни і оповіщення.

Міні-шаблони (псевдокод)

Вікно з водяними знаками та late-оновленнями

pseudo stream
.withEventTime(tsExtractor)
.withWatermark(maxAllowedLag=2m)
.window(TUMBLING, size=1m, allowedLateness=30s)
.keyBy(user_id)
.aggregate(sum, retractions=enable)
.sink (upsert_table )//idempotent upsert by (user_id, window_start)

Транзакційний sink з фіксацією офсету

pseudo begin tx upsert target_table using event, key=(k)
update consumer_offsets set pos=:pos where consumer=:c commit

Чек-лист продакшену

  • Визначені event time і стратегія watermark; вибрані вікна і'allowed _ lateness'.
  • Ідемпотентний sink або транзакційний коміт з офсетом.
  • Реєстр схем і режими сумісності включені; адитивна еволюція.
  • Метрики: лаг, watermark, p95/p99, DLQ, розмір state, тривалість checkpoint.
  • Тести: out-of-order, дублікати, перезапуски, replay.
  • Політики PII/ретенції для state і снапшотів.
  • План масштабування та стратегії backpressure.
  • Документація за договорами вікон і коригувань (late updates).

FAQ

Event time обов'язковий?
Якщо важлива коректність метрик і узгодженість, - так. Processing time підходить для техпідрахунків/моніторингу, але спотворює аналітику.

Чи потрібен exactly-once?
Точково: для критичних ефектів. Частіше досить at-least-once + ідемпотентний sink.

Як вибрати вікна?
Відштовхуйтесь від бізнес-SLA: «за останні 5 хвилин» → hopping, «сесії користувача» → session, «хвилинні звіти» → tumbling.

Що робити з пізніми даними?
Дозволяти обмежене'allowed _ lateness'і емітити коригування (upsert/retract). Клієнтська вітрина повинна вміти оновлюватися.

Підсумок

Потокова обробка - це не тільки низька затримка, але і дисципліна часу, стану і контрактів. Правильний вибір event time, вікон і водяних знаків, плюс ідемпотентні ефекти, спостережуваність і тести роблять конвеєр надійним, відтворюваним і економічним - і дають бізнесу рішення «тут і зараз», а не «через ніч».

Contact

Зв’яжіться з нами

Звертайтеся з будь-яких питань або за підтримкою.Ми завжди готові допомогти!

Розпочати інтеграцію

Email — обов’язковий. Telegram або WhatsApp — за бажанням.

Ваше ім’я необов’язково
Email необов’язково
Тема необов’язково
Повідомлення необов’язково
Telegram необов’язково
@
Якщо ви вкажете Telegram — ми відповімо й там, додатково до Email.
WhatsApp необов’язково
Формат: +код країни та номер (наприклад, +380XXXXXXXXX).

Натискаючи кнопку, ви погоджуєтесь на обробку даних.