Deduplicarea evenimentelor
1) De ce eliminarea duplicatelor
Duplicatele apar datorită retraielor, timeout-urilor de rețea, failover-ului și reluării datelor istorice. Dacă nu sunt controlate:- invarianții sunt încălcați (debite duble, e-mail repetat/SMS, ordine „creată de două ori”);
- Creșterea costurilor (re-scrie/reprocese)
- analiza distorsionată.
Scopul deduplicării este de a oferi un efect unic observat cu repetări acceptabile ale transportului, adesea împreună cu idempotența.
2) În cazul în care pentru a plasa deduplicare (niveluri)
1. Gateway-ul Edge/API - taie duplicatele explicite prin semnătura 'Idempotency-Keu '/body +.
2. Broker/stream - deduplicarea logică prin cheie/secvență, coagularea la o ratare (mai rar - din cauza costurilor).
3. Receptor de evenimente (consumator) - locație principală: Inbox/masă cheie/memorie cache.
4. Chiuveta (DB/cache) - chei unice/UPSERT/versiuni/compresie.
5. ETL/analiză - termen limită în funcție de fereastra de timp și cheie în paturi coloană.
Regula: cât mai curând posibil, dar luând în considerare costul pozitivelor false și nevoia de reluare.
3) Cheile de eliminare a duplicatelor
3. 1 Natural (preferat)
'payment _ id',' order _ id', 'saga _ id # step', 'agregate _ id # seq'.
Garantați stabilitatea și semnificația.
3. 2 Compozit
„( , tip, , versiune)” „( , )”.
3. 3 Amprentă digitală
Hash-ul unui subset determinist de câmpuri (normalizați ordinea/registrele), opțional „HMAC (secret, sarcină utilă)”.
3. 4 secvențe/versiuni
Monoton 'seq' per agregat (blocare/versionare optimistă).
Anti-model: „UUID aleatoriu” fără o conexiune cu o entitate de afaceri este imposibil.
4) Ferestre de timp și ordine
Fereastra de eliminare a duplicatelor - perioada în care evenimentul poate veni din nou (de obicei 24-72 de ore; pentru finanțe - mai mult).
Out-of-order: să fie întârziere. În cadrele de streaming - timp eveniment + filigrane.
Sliding/Fix-fereastră deadup: "ai văzut cheia în ultimele N minute? ».
Secvență conștientă: dacă „seq” ≤ ultimul procesat - dublu/repetat.
5) Structuri și implementări de date
5. 1 Contabilitate exactă
Redis SET/STRING + TTL: „SETNX cheie 1 EX 86400” → „pentru prima dată - suntem de prelucrare, în caz contrar - SKIP”.
Memoria cache LRU/LFU (in-proc): rapidă, dar volatilă → mai bună decât prima barieră.
SQL indexuri unice + UPSERT: „inserați sau actualizați” (efect idempotent).
5. 2 Structuri aproximative (probabilistice)
Filtru Bloom/Cuckoo: memorie ieftină, pozitive false sunt posibile. Este potrivit pentru o picătură evidentă „zgomotoasă” (de exemplu, telemetrie), nu pentru finanțe/comenzi.
Count-Min Sketch: Estimarea frecvențelor pentru a proteja împotriva „fierbinte” ia.
5. 3 stări de streaming
Kafka Streams/Flink: keyed state store cu TTL, dedup prin cheie în fereastră; punct de control/restaurare.
Filigran + lateness permis: Gestionează fereastra evenimentelor târzii.
6) Modele tranzactionale
6. 1 Inbox (tabelul de intrare)
Salvați 'message _ id'/cheie și rezultatul la efecte secundare:pseudo
BEGIN;
ins = INSERT INTO inbox(id, received_at) ON CONFLICT DO NOTHING;
IF ins_not_inserted THEN RETURN cached_result;
result = handle(event);
UPSERT sink with result; -- idempotent sync
UPDATE inbox SET status='done', result_hash=... WHERE id=...;
COMMIT;
Reluarea va vedea înregistrarea și nu va repeta efectul.
6. 2 Outbox
Înregistrare de afaceri și eveniment într-o singură tranzacție → editorul trimite la broker. Nu elimină dublura de la consumator, dar exclude „găurile”.
6. 3 Indici unici/UPSERT
sql
INSERT INTO payments(id, status, amount)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO NOTHING; -- "create once"
sau upgrade-ul versiunii controlate:
sql
UPDATE orders
SET status = $new, version = version + 1
WHERE id=$id AND version = $expected; -- optimistic blocking
6. 4 Versionarea agregatelor
Evenimentul este aplicabil dacă "eveniment. versiune = agregat. versiunea + 1 '. În caz contrar - dublu/repetare/conflict.
7) Deadup și brokeri/fluxuri
7. 1 Kafka
Producătorul Idempotent reduce dublurile de intrare.
Tranzacțiile vă permit să comiteți atomic compensări + înregistrări de ieșire.
Compactare: stochează ultima valoare per cheie - post-factum dedup/coalescing (nu pentru plăți).
Consumator: magazin de stat/Redis/DB pentru cheile de fereastră.
7. 2 NATS/JetStream
Ack/redelivery → cel puțin o dată. Dedup în consumator (Inbox/Redis).
Secvența JetStream/munca consumatorilor facilitează identificarea repetărilor.
7. 3 Cozi (Iepure/SQS)
Timpul de vizibilitate + livrările repetate → aveți nevoie de o cheie + deadstore.
SQS FIFO cu „MessageGroupId'/” DeduplicationId' ajută, dar ferestrele TTL sunt limitate de furnizor - păstrați cheile mai mult timp dacă afacerea necesită.
8) Depozitare și analizoare
8. 1 ClickHouse/BigQuery
Dedup by window: 'COMANDĂ DUPĂ CHEIE, ts' și 'argMax '/' anyLast' cu condiție.
ClickHouse:sql
SELECT key,
anyLast(value) AS v
FROM t
WHERE ts >= now() - INTERVAL 1 DAY
GROUP BY key;
Sau un strat materializat de evenimente „unice” (îmbinare prin cheie/versiune).
8. 2 Busteni/telemetrie
Să spunem aproximativ-dump (Bloom) pe ingera → salva rețea/disc.
9) Reprocesare, reluare și rambursare
Cheile dedup trebuie să supraviețuiască reluării (TTL ≥ fereastra de reluare).
Pentru rambursare, utilizați spațiul cheie cu versiunea ('key # source = batch2025') sau separat „scurgeri”, astfel încât să nu interfereze cu fereastra online.
Stoca artefacte rezultat (hash/versiune) - acest lucru accelerează „rapid-sari” pe reluări.
10) Măsurători și observabilitate
'dedup _ hit _ total '/' dedup _ hit _ rate' - proporția duplicatelor prinse.
'dedup _ fp _ rate' pentru filtrele probabilistice.
'window _ size _ seconds' actual (prin telemetrie sosiri târzii).
'inbox _ conflict _ total', 'upsert _ conflict _ total'.
'replayed _ events _ total', 'skipped _ by _ inbox _ total'.
Profile după chiriaș/cheie/tip: unde sunt cele mai multe și de ce.
Логи: 'message _ id',' idempotency _ key ',' seq ',' window _ id', 'action = process' skip'.
11) Securitate și confidențialitate
Nu puneți PII în cheie; utilizați hashes/pseudonime.
Pentru a semna amprenta - HMAC (secret, canonical_payload) pentru a evita coliziunile/falsificarea.
Coordonați timpul de stocare al cheilor cu respectarea (păstrarea GDPR).
12) Performanță și cost
In-proc LRU ≪ Redis ≪ SQL prin latență/cost per operație.
Redis: ieftin și rapid, dar ia în considerare volumul de chei și TTL; shardy de „chiriaș/hash”.
SQL: scump cu p99, dar oferă garanții puternice și audiență.
Filtre probabilistice: foarte ieftine, dar FPs sunt posibile - utilizați în cazul în care „SKIP suplimentar” nu este critică.
13) Anti-modele
"Avem Kafka exact o dată - nici o cheie necesară. "Necesar - într-un strat de vânătaie/afacere.
Prea scurt TTL pentru chei → reluări/întârziere va livra o dublă.
Dedup unic global → hotspot și SPOF; nu este ascuțită de chiriaș/cheie.
Dedup numai în memorie - pierderea procesului = val de ia.
Bloom pentru bani/ordine - fals pozitiv va priva operațiunea legitimă.
Canonizarea sarcinii utile inconsistente - diferite hash-uri pentru mesaje care sunt identice în sens.
Ignorarea out-of-order - evenimentele târzii sunt marcate cu duplicate în mod eronat.
14) Lista de verificare a implementării
- Definiți o cheie naturală (sau compus/amprentă digitală).
- Setați fereastra dedup și politica 'lateness'.
- Selectați nivelul (nivelurile): margine, consumator, chiuvetă; asigura pentru shardening.
- Implementare Inbox/UPSERT; pentru fluxuri - stat cu taste + TTL.
- Dacă aveți nevoie de o barieră aproximativă - Bloom/Cuckoo (numai pentru domenii non-critice).
- Configurați compatibilitatea reluării (TTL ≥ fereastra de reluare/rambursare).
- Metrics' dedup _ hit _ rate ", conflicte și lag-uri fereastră; tablouri de bord per chiriaș.
- Ziua jocului: timeout/retrays, reluare, out-of-order, picătură de cache.
- Canonizarea sarcinii utile a documentelor și versionarea cheilor.
- Efectuați teste de sarcină pe chei fierbinți și ferestre lungi.
15) Configurații de probă/Cod
15. 1 Redis SETNX + TTL (barieră)
lua
-- KEYS[1] = "dedup:{tenant}:{key}"
-- ARGV[1] = ttl_seconds local ok = redis. call("SET", KEYS[1], "1", "NX", "EX", ARGV[1])
if ok then return "PROCESS"
else return "SKIP"
end
15. 2 Inbox PostgreSQL
sql
CREATE TABLE inbox (
id text PRIMARY KEY,
received_at timestamptz default now(),
status text default 'received',
result_hash text
);
-- In the handler: INSERT... ON CONFLICT DO NOTHING -> check, then UPSERT in blue.
15. 3 fluxuri Kafka
java var deduped = input
.selectKey((k,v) -> v.idempotencyKey())
.groupByKey()
.windowedBy(TimeWindows. ofSizeWithNoGrace(Duration. ofHours(24)))
.reduce((oldV,newV) -> oldV) // first wins
.toStream()
.map((wKey,val) -> KeyValue. pair(wKey. key(), val));
15. 4 Flink (stare cheie + TTL, pseudo)
java
ValueState<Boolean> seen;
env. enableCheckpointing(10000);
onEvent(e):
if (!seen.value()) { process(e); seen. update(true); }
15. 5 gateway-ul NGINX/API (Idempotency-Key pe margine)
nginx map $http_idempotency_key $idkey { default ""; }
Proxy the key to the backend; backend solves deadup (Inbox/Redis).
16) ÎNTREBĂRI FRECVENTE
Î: Ce să alegeți: deadup sau idempotence pură?
R: De obicei, ambele: deadup este un „filtru” rapid (economii), idempotența este o garanție a efectului corect.
Î: Ce TTL pentru a pune?
R: ≥ timp maxim posibil de re-livrare + inventar. De obicei 24-72 ore; pentru sarcini financiare și amânate - zile/săptămâni.
Î: Cum gestionați evenimentele târzii?
R: Configurați "întârzierea permisă" și alarm' late _ event "; mai târziu - printr-o ramură separată (recompute/skip).
Î: Întregul flux de telemetrie poate fi deduplicat?
R: Da, filtre aproximative (Bloom) pe margine, dar ia în considerare FP și nu se aplică efectelor critice de afaceri.
Î: Deadup obtinerea în calea de rambursare?
R: Separați spațiile cheie ('key # batch2025') sau dezactivați bariera pe durata rambursării; Cheile TTL ar trebui să acopere doar ferestrele online.
17) Totaluri
Deduplicarea este compoziția: cheia potrivită, structura ferestrei și a stării + modelele tranzacționale (Inbox/Outbox/UPSERT) și manipularea atentă a ordinii și a evenimentelor târzii. Puneți bariere unde este mai ieftin, asigurați idempotența în vânătăi, măsurați 'dedup _ hit _ rate' și testați reluări/eșecuri - în acest fel obțineți „exact-o dată” fără cozi inutile de latență și cost.