重复数据消除事件
1)为什么需要重复数据消除
由于retrais,网络定时器,fails后的恢复以及历史数据的重播,重复出现。如果不控制它们:- 违反不变量(双重注销,重复电子邮件/SMS,"双重创建"订单);
- 成本上升(重复记录/处理);
- 分析失真。
重复数据消除的目的是在允许的传输重复中提供单次观察到的效果,并且通常具有等效性。
2)在何处放置重复数据消除(层)
1.Edge/API网关-通过"Idempotency-Keu"/身体+签名切断显式双打。
2.Broker/Stream-逻辑按键/音序重复数据消除,在错过时进行同步(由于成本而减少)。
3.事件接收器(Consumer)-主要位置:收件箱/密钥表/缓存。
4.Sink (DB/kesh)-唯一键/UPSERT/版本/压缩。
5.ETL/分析是按时间窗口和柱柱中的键进行去除。
规则:尽早但要考虑到误报的成本和倒带的必要性。
3)重复数据消除密钥
3.1自然(最好)
`payment_id`, `order_id`, `saga_id#step`, `aggregate_id#seq`.
保证稳定性和意义。
3.2复合体
`(tenant_id, type, external_id, version)` или `(user_id, event_ts_truncated, payload_hash)`.
3.3个指纹(指纹)
字段确定性子集的散列(归一化顺序/寄存器),可选的"HMAC(secret,payload)"。
3.4序列/版本
单调的"seq" per aggregate(乐观的锁定/转化)。
反模式:"随机的UUID"与业务实体没有关联-不可行。
4)时间窗口和顺序
重复数据消除窗口-事件可以重复出现的时期(通常为24-72h;对于金融-更长时间)。
订单:允许迟到(lateness)。在流媒体框架中-活动时间+水上市场。
Sliding/Fix-window dedup: "在最后N分钟看到钥匙了吗?».
Sequence-aware:如果"seq" ≤最后处理的是双打/重播。
5)数据结构和实现
5.1精确核算(exact)
Redis SET/STRING+TTL: "SETNX key 1 EX 86400" →"是第一次-处理,否则是SKIP。"
LRU/LFU缓存(in-proc):快速但频繁→仅作为第一个障碍更好。
SQL唯一索引+UPSERT:"插入或更新"(等效效果)。
5.2近似结构(probabilistic)
Bloom/Cuckoo过滤器:便宜的内存,假阳性(假阳性)是可能的。适用于明显的"嘈杂"下垂(例如遥测),不适用于财务/订单。
Count-Min素描:估计频率以防止热倍。
5.3流状态
Kafka Streams/Flink: keyed state store c TTL,在窗口中按键;checkpoint/restore.
Watermark+allowed lateness:管理一个迟到的事件窗口。
6)事务模式
6.1 Inbox(入站表)
我们保留"message_id"/密钥和结果直到副作用:pseudo
BEGIN;
ins = INSERT INTO inbox(id, received_at) ON CONFLICT DO NOTHING;
IF ins_not_inserted THEN RETURN cached_result;
result = handle(event);
UPSERT sink with result; -- idempotent sync
UPDATE inbox SET status='done', result_hash=... WHERE id=...;
COMMIT;
重播将看到录音,不会重复效果。
6.2 Outbox
一笔交易中的业务记录和事件→发布者将其交给经纪人。并不能消除消费者的挤压,但排除了"漏洞"。
6.3 个唯一索引/UPSERT
sql
INSERT INTO payments(id, status, amount)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO NOTHING; -- "create once"
或受控版本更新:
sql
UPDATE orders
SET status = $new, version = version + 1
WHERE id=$id AND version = $expected; -- optimistic blocking
6.4组转化
事件适用于'事件。version = aggregate.version + 1`.否则-双打/重播/冲突。
7)Dedup和经纪人/流媒体
7.1 Kafka
Idempotent Producer降低了入口处的配音。
Transactions允许原子商量失真+输出记录。
Compaction:存储最后一个按键值-事后去除/合并(不用于付款)。
消费者面:state store/Redis/DB用于窗口密钥。
7.2 NATS / JetStream
Ack/稀有 →。消费者中的Dedup(Inbox/Redis)。
消费者的JetStream序列/沙丘简化了重复的识别。
7.3个队列(Rabbit/SQS)
Visibility timeout+重新交付→需要key+dedup stor。
带有"MessageGroupId"/"DeduplicationId"的SQS FIFO提供了帮助,但窗口的TTL仅限于提供商-如果业务需要,则存储密钥的时间更长。
8)存储和分析a
8.1 ClickHouse/BigQuery
Dedup在窗口:"ORDER BY key, ts"和"argMax"/"anyLast"与条件。
ClickHouse:
sql
SELECT key,
anyLast(value) AS v
FROM t
WHERE ts >= now() - INTERVAL 1 DAY
GROUP BY key;
或"唯一"事件的实例化层(按键/版本排列)。
8.2 Logi/遥测
假设ingest上的approximate dedup (Bloom) →可以节省网络/磁盘。
9)重复处理、倒带和后背
Dedup Keys必须在继电器中幸存下来(TTL ≥继电器窗口)。
对于后门,使用带有版本("key#source=batch 2025")或单独的"李子"的密钥空间,以免干扰在线窗口。
存储结果工件(hash/版本)-这会加速重播中的"快速跳过"。
10)度量与可观察性
'dedup_hit_total'/'dedup_hit_rate'是捕获的倍数的比例。
概率滤波器的"dedup_fp_rate"。
"window_size_seconds"是实际的(通过长篇小说)。
`inbox_conflict_total`, `upsert_conflict_total`.
`replayed_events_total`, `skipped_by_inbox_total`.
根据tenant/key/type的配置文件:哪里的倍数最多,为什么。
Логи: `message_id`, `idempotency_key`, `seq`, `window_id`, `action=process|skip`.
11)安全和隐私
不要将PII放入钥匙中;使用哈希/别名。
对于签名,打印为HMAC(秘密,canonical_payload),以避免冲突/伪造。
密钥保留时间与合规性(GDPR重构)保持一致。
12)生产力和成本
每个操作的潜伏期/成本的in-proc LRU ≪ Redis ≪ SQL。
Redis:便宜而快速,但要考虑密钥和TTL的数量;在"tenant/hash"上摇摆。
SQL:p99价格昂贵,但提供强大的保修和受众。
Probilistic过滤器:非常便宜,但FP是可能的-应用"额外的SKIP"并不关键。
13)反模式
"我们有Kafka exactly-once-不需要钥匙。"需要-在合成器/业务层中。
太短的TTL用于扳手→继电/延迟将提供双打。
全球单进站→热点和SPOF;未通过tenant/键进行挤压。
仅内存中的dedup-过程丢失=双波。
金钱/订单的Bloom-false positive将剥夺合法交易。
不一致的payload规范化是消息含义上相同的不同哈希。
忽略顺序退出-后期事件错误地标记为双打。
14)实施支票
- 定义自然键(或复合/指纹)。
- 设置dedup窗口和"lateness"策略。
- 选择级别:edge, consumer, sink;预先设置硬化。
- 实现Inbox/UPSERT;对于线程-keyed state+TTL。
- 如果需要approximate屏障-Bloom/Cuckoo(仅适用于非关键域)。
- 配置继电器兼容性(TTL ≥继电器/后门窗口)。
- "dedup_hit_rate"度量,冲突以及窗口的滞后;dashbords per-tenant。
- Game Day: taymouts/retrais, replays, out-of-order,缓存下降。
- 记录付费封装和密钥转换。
- 对热键和长窗进行负载测试。
15)配置/代码示例
15.1 Redis SETNX+TTL(屏障)
lua
-- KEYS[1] = "dedup:{tenant}:{key}"
-- ARGV[1] = ttl_seconds local ok = redis. call("SET", KEYS[1], "1", "NX", "EX", ARGV[1])
if ok then return "PROCESS"
else return "SKIP"
end
15.2 PostgreSQL Inbox
sql
CREATE TABLE inbox (
id text PRIMARY KEY,
received_at timestamptz default now(),
status text default 'received',
result_hash text
);
-- In the handler: INSERT... ON CONFLICT DO NOTHING -> check, then UPSERT in blue.
15.3 Kafka Streams(窗口中的dedup)
java var deduped = input
.selectKey((k,v) -> v.idempotencyKey())
.groupByKey()
.windowedBy(TimeWindows. ofSizeWithNoGrace(Duration. ofHours(24)))
.reduce((oldV,newV) -> oldV) // first wins
.toStream()
.map((wKey,val) -> KeyValue. pair(wKey. key(), val));
15.4 Flink (keyed state+TTL, pseudo)
java
ValueState<Boolean> seen;
env. enableCheckpointing(10000);
onEvent(e):
if (!seen.value()) { process(e); seen. update(true); }
15.5个NGINX/API网关(Edge上的Idempotency-Key)
nginx map $http_idempotency_key $idkey { default ""; }
Proxy the key to the backend; backend solves deadup (Inbox/Redis).
16) FAQ
问:选择什么:去世还是纯粹的平均水平?
答:通常两者兼而有之:去势是快速"过滤器"(节省),幂等是正确效果的保证。
问:什么是TTL上市?
答:≥可能重新交付的最大时间+库存。通常为24-72小时;财务和延期任务-天/周。
Q: 如何处理后期事件?
A:设置"allowed lateness"和"late_event"信号;后期-通过一个单独的分支(recompute/skip)。
Q: 是否可以对整个遥测流进行重复数据消除?
A:是的,在边缘使用approximate过滤器(Bloom),但要考虑FP,不要应用于关键的业务效果。
问:Dedup干扰后场吗?
答:共享钥匙空间("key#batch 2025")或关闭后门时间的屏障;TTL密钥只能覆盖在线窗口。
17)结果
重复数据消除是一种构成:正确的密钥、窗口和状态结构+事务模式(Inbox/Outbox/UPSERT)以及有意识地处理顺序和延迟事件。将屏障放置在最便宜的地方,确保蓝色中的幂等,测量"dedup_hit_rate"并测试反射/反射-这样你就可以获得"有效的exactly-once"而无需额外的潜伏和成本尾巴。