Ishlov berish
Oqimli qayta ishlash nima
Oqimli qayta ishlash - bu holatlarning eng kam kechikishi va to’g’riligini kafolatlagan holda, hodisalarning cheksiz ketma-ketligiga (tranzaksiyalar, bosishlar, to’lovlar, telemetriya) uzluksiz munosabat. Batchdan farqli o’laroq, «davr mobaynida to’plangan barcha narsalarni olamiz», oqim ma’lumotlarni tushishiga qarab qayta ishlaydi, holatini saqlaydi va voqeaning vaqtini hisobga oladi.
Asosiy tushunchalar
Hodisa (event) -’event _ time’va noyob’event _ id’bilan oʻzgarmas fakt.
Hodisa vaqti (event time) vs qayta ishlash vaqti (processing time) - birinchisi manbadan keladi, ikkinchisi - operator hodisani haqiqatda koʻrganda.
- Tumbling (yopilmaydigan), Hopping/Sliding (yopiladigan), Session (harakatsizligi bo’yicha uzilishlar).
- Suv belgilari (watermarks) - oynalarni yopish va kechiktirilgan ma’lumotlarni kutishni cheklash imkonini beruvchi «T vaqtigacha bo’lgan voqealar allaqachon kelganligini» baholash.
- Kechiktirilgan ma’lumotlar (lateness) -’event _ time’bilan bog’liq voqealar joriy watermarkdan kam; ko’pincha qayta ishlash qoidalari qo’llaniladi.
- Holat (state) - agregatlar, join’lar, deduplikatsiyalar uchun operatorlarning lokal jadvallari/omborlari (keyed state).
- Backpressure - downstream o’tkazish qobiliyati oshganda bosim; bayonnoma va buferlar bilan boshqariladi.
Arxitektura asoslari
1. Manba (source): voqealar brokeri (Kafka/NATS/Pulsar), DBdan CDC, navbatlar, fayllar/log-kollektorlar.
2. Oqim dvigateli: oynalar, agregatlar, joylar, patternlarni (CEP) hisoblaydi, holatni va checkpoint’larni boshqaradi.
3. Qabul qiluvchi (sink): OLTP/OLAP DB, qidiruv dvigateli, kesh, topiklar, vitrinalar/hisobotlar uchun saqlash joylari.
4. Sxemalar reyestri: payload evolyutsiyasi va muvofiqligini nazorat qilish.
5. Kuzatish darajasi: metriklar, treysing, loglar, laglar va suv belgilarining dashbordlari.
Vaqt semantikasi va tartibi
Har doim tadbir vaqtini afzal ko’ring: bu kechikishlar va uzilishlarda yagona o’zgaruvchidir.
Voqealar tartibdan tashqarida kelishi mumkin; tartib faqat partiya kaliti doirasida kafolatlanadi.
- derazalarni yopish va natijalarni chiqarish;
- kechikayotgan voqealarni qancha kutayotganimizni cheklash (’allowed _ lateness’).
- Kechiktirilgan hodisalar uchun retractions/upserts: agregatlar va tuzatish hodisalarini qayta hisoblash.
Holati va ishonchliligi
Keyed state: ushbu agregatlar (summalar, hisoblagichlar, dedup uchun tuzilmalar) kalitlar boʻyicha taqsimlangan.
Checkpoint/Savepoint: tiklash uchun davriy suratlar; savepoint - kod versiyasi migratsiyasi uchun boshqariladigan rasm.
- tranzaksion «o’qidi-ishladi-yozdi» (commit sink + o’qish pozitsiyasi);
- idempotent sinks (upsert/merge) + dedup jadvallari;
- agregatlarni versiyalash (optimistic concurrency).
Derazalar, agregatsiyalar, join’s
Oynalar:- Tumbling: oddiy davriy hisobotlar (daqiqali, soatlik).
- Hopping/Sliding: «sirpanuvchi» metriklar (5 daqiqada 1 daqiqa qadam bilan).
- Session: foydalanuvchi sessiyalari va antifrod uchun tabiiy.
- Aggregations: sum/count/avg/approx-distinct (HyperLogLog), percentiles (TDigest/CKMS).
- Stream-Stream join:’allowed _ skew’ni hurmat qiling.
- Stream-Table join (KTable): maʼlumotnomani yoki joriy holatni ulash (masalan, «foydalanuvchining aktiv chegaralari»).
Kechiktirilgan va takrorlanadigan ma’lumotlar bilan ishlash
Deduplikatsiya:’event _ id’yoki’(producer_id, sequence)’; TTL bilan «koʻrilgan» kalitlarni takrorlash oynasi ≥ saqlang.
Late events: yopilgandan keyin’X’oynani qayta ishlashga ruxsat bering (retractions/upserts).
Soxta dublikatlar: agregatlarni idempotent qilib tuzating va «ALREADY_APPLIED» ni loglarga yozing.
Kattalashtirish va ishlash
Kalit bo’yicha shardalash: parallellikni ta’minlaydi; «issiq» kalitlarni kuzatib boring.
Backpressure: parallellikni cheklang, chop etishda batchi va kompresssiyadan foydalaning.
Suv belgilari: haddan tashqari tajovuzkor boʻlmang - qattiq watermarks kutishni qisqartiradi, lekin late yangilanishlarning ulushini oshiradi.
Holat: ulanish oʻlchami va patternlarini hisobga olgan holda (RocksDB/state store/xotira) formatini tanlang; TTLni tozalang.
Avtomashtirma: lag, CPU, state oʻlchami, GC vaqti boʻyicha.
Ishonchlilik va qayta ishga tushirish
Idempotent sink yoki ofset fiksatsiyasi bilan tranzaksion kommit - to’g "rilik asosi.
Qayta ishga tushirilgandan keyin qayta ishlashga yo’l qo’yiladi; effekt «aynan bir marta» qolishi kerak.
DLQ/parking lot: muammoli yozuvlarni sabablari bilan alohida oqimga yuboring; qayta ishlashni ta’minlang.
Kuzatish darajasi (nima o’lchash kerak)
Manbalar boʻyicha Lag (vaqt va xabar boʻyicha).
Watermark/current event time va late hodisalar ulushi.
Throughput/latency operatorlari, p95/p99 end-to-end.
State size/rocksdb I/O, checkpoint’lar chastotasi/davomiyligi.
DLQ rate, deduplikatsiyalar/retraylar foizi.
CPU/GC/heap, pauza vaqti.
Xavfsizlik va komplayens
Maʼlumotlar tasnifi: PII/PCI ni sxemalarda belgilang, minimal saqlang, state va snapshotlarni shifrlang.
Access control: state va sinks uchun alohida ACL.
Retensiyalar: yuridik talablarga muvofiq (GDPR/unutish huquqi).
Audit:’event _ id’,’trace _ id’, natijasi:’APPLIED/ALREADY _ APPLIED/RETRIED’.
Joriy etish patternlari
1. CDC → normallashtirish → domen voqealari: xom ma’lumotlar o’zgarishlarini translyatsiya qilmang, tushunarli biznes faktlariga map qiling.
2. Outbox ishlab chiqaruvchilarda: tranzaksiya fakti + hodisa - bitta DB-tranzaksiyada.
3. Core vs Enriched: eng kam payload kritik oqimda, boyitish - asinxron.
4. Replay-do’stlik: proyeksiyalar/vitrinalar logdan qayta yig’ilishi kerak.
5. Idempotency by design: operation/event key, upsert-sxemalar, agregatlar versiyasi.
Test oʻtkazish
Unit/Property-based: agregatlar va oʻzgarishlarning invariantlari.
Stream tests: out-of-order va dublikatlar bilan belgilangan voqealar oqimi → oyna va dedupni tekshirish.
Golden windows: etalon oynalar/agregatlar va ruxsat etilgan kechki tuzatishlar.
Fault-injection: «effekt yozdi» va «kommital ofset» orasidagi yiqilish.
Replay tests: logning boshidan oynani qayta yigʻish = joriy holat.
Qiymati va optimallashtirish
Oynalar va watermark kechikish/resurslarga taʼsir qiladi: oynaning uzunligi va’allowed _ lateness’kattaroq boʻlsa, state kattaroq boʻladi.
Kodeklar va siqish: CPU/tarmoqni muvozanatlash.
Chiqishda batching: kamroq tarmoq qoʻngʻiroqlari va tranzaktsiyalari.
Erta filtrlash («pushdown»): ortiqcha narsani manbaga iloji boricha yaqinroq tashlang.
Antipatternlar
Processing time bilan bog’lanish kerak bo’lgan joyda.
Restartlarda sink → ikki marta effektda idempotentlik yoʻqligi.
Global mega-kalitlar: bitta issiq bo’lim parallelizmni buzadi.
Xom CDClar ommaviy hodisalar sifatida: DD sxemalarining tarqalishi, evolyutsiya paytida mo’rt bo’lish.
DLQ yo’q: «zaharli» xabarlar butun konveyerni to’sib qo’yadi.
Watermark oʻrniga qatʼiy kechikish: yoki abadiy kutish yoki maʼlumotlarni yoʻqotish.
Domen namunalari
To’lovlar/moliya
’payment.’ oqimi, antifrod oynalari (session + CEP),’operation _ id’dedupi.
Buxgalteriya ledger (upsert + versiya) ga joylashtirilganda Exactly-once effekti.
Marketing/reklama
Sliding oyna CTR/konversiya, Join kliklash va’± Δ t’ruxsati bilan ko’rsatish, bidding uchun agregatsiyalar.
iGaming/onlayn xizmatlar
Real-taym balans/limitlar, missiyalar/achivkalar (session derazalar), antifrod-patternlar va ogohlantirishlar.
Mini-shablonlar (psevdokod)
Suv belgilari va late-yangilanishlar oynasi
pseudo stream
.withEventTime(tsExtractor)
.withWatermark(maxAllowedLag=2m)
.window(TUMBLING, size=1m, allowedLateness=30s)
.keyBy(user_id)
.aggregate(sum, retractions=enable)
.sink (upsert_table )//idempotent upsert by (user_id, window_start)
Ofset oʻrnatilgan tranzaksion sink
pseudo begin tx upsert target_table using event, key=(k)
update consumer_offsets set pos=:pos where consumer=:c commit
Ishlab chiqarish chek-varaqasi
- event time va watermark strategiyasi aniqlandi; ’allowed _ lateness’ oynasi tanlangan.
- Idempotent sink yoki ofset bilan tranzaksion kommit.
- Sxemalar reyestri va muvofiqlik rejimlari kiritilgan; qo’shimcha evolyutsiya.
- Metriklar: lag, watermark, p95/p99, DLQ, state oʻlchami, checkpoint davomiyligi.
- Testlar: out-of-order, dublikatlar, qayta ishga tushirishlar, replay.
- Davlat va snapshotlar uchun PII/retensiya siyosati.
- Kattalashtirish rejasi va backpressure strategiyasi.
- Deraza shartnomalari va tuzatishlar bo’yicha hujjatlar (late updates).
FAQ
Event time talab qilinadimi?
Agar metrikaning toʻgʻriligi va muvofiqligi muhim boʻlsa, ha. Processing time texnik hisob-kitoblar/monitoring uchun mos keladi, ammo tahlilni buzadi.
exactly-once kerakmi?
Aniq: tanqidiy effektlar uchun. Ko’pincha at-least-once + idempotent sink yetarli.
Oynalarni qanday tanlash mumkin?
SLAdan boshlang: «oxirgi 5 daqiqada» → hopping, «foydalanuvchi sessiyalari» → session, «daqiqali hisobotlar» → tumbling.
Kechroq maʼlumotlar bilan nima qilish kerak?
Cheklangan’allowed _ lateness’ga ruxsat berish va tuzatishlar chiqarish (upsert/retract). Mijoz vitrinasi yangilanishi kerak.
Jami
Oqim bilan ishlov berish nafaqat past kechikish, balki vaqt, holat va kontraktlar intizomidir. To’g’ri vaqt, derazalar va suv belgilarini tanlash, shuningdek, idempotent ta’sirlar, kuzatuv va testlar konveyerni ishonchli, qayta tiklanadigan va tejamkor qiladi - va biznesga «kechasi» emas, balki «hozir va hozir» yechimlarini beradi.