重復數據消除事件
1)為什麼需要重復數據消除
由於retrais,網絡定時器,fails後的恢復以及歷史數據的重播,重復出現。如果不控制它們:- 違反不變量(雙重註銷,重復電子郵件/SMS,「雙重創建」訂單);
- 成本上升(重復記錄/處理);
- 分析失真。
重復數據消除的目的是在允許的傳輸重復中提供單次觀察到的效果,並且通常具有等效性。
2)在何處放置重復數據消除(層)
1.Edge/API網關-通過「Idempotency-Keu」/身體+簽名切斷顯式雙打。
2.Broker/Stream-邏輯按鍵/音序重復數據消除,在錯過時進行同步(由於成本而減少)。
3.事件接收器(Consumer)-主要位置:收件箱/密鑰表/緩存。
4.Sink (DB/kesh)-唯一鍵/UPSERT/版本/壓縮。
5.ETL/分析是按時間窗口和柱柱中的鍵進行去除。
規則:盡早但要考慮到誤報的成本和倒帶的必要性。
3)重復數據消除密鑰
3.1自然(最好)
`payment_id`, `order_id`, `saga_id#step`, `aggregate_id#seq`.
保證穩定性和意義。
3.2復合體
`(tenant_id, type, external_id, version)` или `(user_id, event_ts_truncated, payload_hash)`.
3.3個指紋(指紋)
字段確定性子集的散列(歸一化順序/寄存器),可選的「HMAC(secret,payload)」。
3.4序列/版本
單調的「seq」 per aggregate(樂觀的鎖定/轉化)。
反模式:「隨機的UUID」與業務實體沒有關聯-不可行。
4)時間窗口和順序
重復數據消除窗口-事件可以重復出現的時期(通常為24-72h;對於金融-更長時間)。
訂單:允許遲到(lateness)。在流媒體框架中-活動時間+水上市場。
Sliding/Fix-window dedup: "在最後N分鐘看到鑰匙了嗎?».
Sequence-aware:如果「seq」 ≤最後處理的是雙打/重播。
5)數據結構和實現
5.1精確核算(exact)
Redis SET/STRING+TTL: 「SETNX key 1 EX 86400」 →「是第一次-處理,否則是SKIP。」
LRU/LFU緩存(in-proc):快速但頻繁→僅作為第一個障礙更好。
SQL唯一索引+UPSERT:「插入或更新」(等效效果)。
5.2近似結構(probabilistic)
Bloom/Cuckoo過濾器:便宜的內存,假陽性(假陽性)是可能的。適用於明顯的「嘈雜」下垂(例如遙測),不適用於財務/訂單。
Count-Min素描:估計頻率以防止熱倍。
5.3流狀態
Kafka Streams/Flink: keyed state store c TTL,在窗口中按鍵;checkpoint/restore.
Watermark+allowed lateness:管理一個遲到的事件窗口。
6)事務模式
6.1 Inbox(入站表)
我們保留「message_id」/密鑰和結果直到副作用:pseudo
BEGIN;
ins = INSERT INTO inbox(id, received_at) ON CONFLICT DO NOTHING;
IF ins_not_inserted THEN RETURN cached_result;
result = handle(event);
UPSERT sink with result; -- idempotent sync
UPDATE inbox SET status='done', result_hash=... WHERE id=...;
COMMIT;
重播將看到錄音,不會重復效果。
6.2 Outbox
一筆交易中的業務記錄和事件→發布者將其交給經紀人。並不能消除消費者的擠壓,但排除了「漏洞」。
6.3 個唯一索引/UPSERT
sql
INSERT INTO payments(id, status, amount)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO NOTHING; -- "create once"
或受控版本更新:
sql
UPDATE orders
SET status = $new, version = version + 1
WHERE id=$id AND version = $expected; -- optimistic blocking
6.4組轉化
事件適用於'事件。version = aggregate.version + 1`.否則-雙打/重播/沖突。
7)Dedup和經紀人/流媒體
7.1 Kafka
Idempotent Producer降低了入口處的配音。
Transactions允許原子商量失真+輸出記錄。
Compaction:存儲最後一個按鍵值-事後去除/合並(不用於付款)。
消費者面:state store/Redis/DB用於窗口密鑰。
7.2 NATS / JetStream
Ack/稀有 →。消費者中的Dedup(Inbox/Redis)。
消費者的JetStream序列/沙丘簡化了重復的識別。
7.3個隊列(Rabbit/SQS)
Visibility timeout+重新交付→需要key+dedup stor。
帶有「MessageGroupId」/「DeduplicationId」的SQS FIFO提供了幫助,但窗口的TTL僅限於提供商-如果業務需要,則存儲密鑰的時間更長。
8)存儲和分析a
8.1 ClickHouse/BigQuery
Dedup在窗口:「ORDER BY key, ts」和「argMax」/「anyLast」與條件。
ClickHouse:
sql
SELECT key,
anyLast(value) AS v
FROM t
WHERE ts >= now() - INTERVAL 1 DAY
GROUP BY key;
或「唯一」事件的實例化層(按鍵/版本排列)。
8.2 Logi/遙測
假設ingest上的approximate dedup (Bloom) →可以節省網絡/磁盤。
9)重復處理、倒帶和後背
Dedup Keys必須在繼電器中幸存下來(TTL ≥繼電器窗口)。
對於後門,使用帶有版本(「key#source=batch 2025」)或單獨的「李子」的密鑰空間,以免幹擾在線窗口。
存儲結果工件(hash/版本)-這會加速重播中的「快速跳過」。
10)度量與可觀察性
'dedup_hit_total'/'dedup_hit_rate'是捕獲的倍數的比例。
概率濾波器的「dedup_fp_rate」。
「window_size_seconds」是實際的(通過長篇小說)。
`inbox_conflict_total`, `upsert_conflict_total`.
`replayed_events_total`, `skipped_by_inbox_total`.
根據tenant/key/type的配置文件:哪裏的倍數最多,為什麼。
Логи: `message_id`, `idempotency_key`, `seq`, `window_id`, `action=process|skip`.
11)安全和隱私
不要將PII放入鑰匙中;使用哈希/別名。
對於簽名,打印為HMAC(秘密,canonical_payload),以避免沖突/偽造。
密鑰保留時間與合規性(GDPR重構)保持一致。
12)生產力和成本
每個操作的潛伏期/成本的in-proc LRU ≪ Redis ≪ SQL。
Redis:便宜而快速,但要考慮密鑰和TTL的數量;在「tenant/hash」上搖擺。
SQL:p99價格昂貴,但提供強大的保修和受眾。
Probilistic過濾器:非常便宜,但FP是可能的-應用「額外的SKIP」並不關鍵。
13)反模式
「我們有Kafka exactly-once-不需要鑰匙。」需要-在合成器/業務層中。
太短的TTL用於扳手→繼電/延遲將提供雙打。
全球單進站→熱點和SPOF;未通過tenant/鍵進行擠壓。
僅內存中的dedup-過程丟失=雙波。
金錢/訂單的Bloom-false positive將剝奪合法交易。
不一致的payload規範化是消息含義上相同的不同哈希。
忽略順序退出-後期事件錯誤地標記為雙打。
14)實施支票
- 定義自然鍵(或復合/指紋)。
- 設置dedup窗口和「lateness」策略。
- 選擇級別:edge, consumer, sink;預先設置硬化。
- 實現Inbox/UPSERT;對於線程-keyed state+TTL。
- 如果需要approximate屏障-Bloom/Cuckoo(僅適用於非關鍵域)。
- 配置繼電器兼容性(TTL ≥繼電器/後門窗口)。
- 「dedup_hit_rate」度量,沖突以及窗口的滯後;dashbords per-tenant。
- Game Day: taymouts/retrais, replays, out-of-order,緩存下降。
- 記錄付費封裝和密鑰轉換。
- 對熱鍵和長窗進行負載測試。
15)配置/代碼示例
15.1 Redis SETNX+TTL(屏障)
lua
-- KEYS[1] = "dedup:{tenant}:{key}"
-- ARGV[1] = ttl_seconds local ok = redis. call("SET", KEYS[1], "1", "NX", "EX", ARGV[1])
if ok then return "PROCESS"
else return "SKIP"
end
15.2 PostgreSQL Inbox
sql
CREATE TABLE inbox (
id text PRIMARY KEY,
received_at timestamptz default now(),
status text default 'received',
result_hash text
);
-- In the handler: INSERT... ON CONFLICT DO NOTHING -> check, then UPSERT in blue.
15.3 Kafka Streams(窗口中的dedup)
java var deduped = input
.selectKey((k,v) -> v.idempotencyKey())
.groupByKey()
.windowedBy(TimeWindows. ofSizeWithNoGrace(Duration. ofHours(24)))
.reduce((oldV,newV) -> oldV) // first wins
.toStream()
.map((wKey,val) -> KeyValue. pair(wKey. key(), val));
15.4 Flink (keyed state+TTL, pseudo)
java
ValueState<Boolean> seen;
env. enableCheckpointing(10000);
onEvent(e):
if (!seen.value()) { process(e); seen. update(true); }
15.5個NGINX/API網關(Edge上的Idempotency-Key)
nginx map $http_idempotency_key $idkey { default ""; }
Proxy the key to the backend; backend solves deadup (Inbox/Redis).
16) FAQ
問:選擇什麼:去世還是純粹的平均水平?
答:通常兩者兼而有之:去勢是快速「過濾器」(節省),冪等是正確效果的保證。
問:什麼是TTL上市?
答:≥可能重新交付的最大時間+庫存。通常為24-72小時;財務和延期任務-天/周。
Q: 如何處理後期事件?
A:設置「allowed lateness」和「late_event」信號;後期-通過一個單獨的分支(recompute/skip)。
Q: 是否可以對整個遙測流進行重復數據消除?
A:是的,在邊緣使用approximate過濾器(Bloom),但要考慮FP,不要應用於關鍵的業務效果。
問:Dedup幹擾後場嗎?
答:共享鑰匙空間(「key#batch 2025」)或關閉後門時間的屏障;TTL密鑰只能覆蓋在線窗口。
17)結果
重復數據消除是一種構成:正確的密鑰、窗口和狀態結構+事務模式(Inbox/Outbox/UPSERT)以及有意識地處理順序和延遲事件。將屏障放置在最便宜的地方,確保藍色中的冪等,測量「dedup_hit_rate」並測試反射/反射-這樣你就可以獲得「有效的exactly-once」而無需額外的潛伏和成本尾巴。