Дедупликация событий
1) Зачем нужна дедупликация
Дубликаты появляются из-за ретраев, сетевых таймаутов, восстановлений после фейлов и реплея исторических данных. Если их не контролировать:- нарушаются инварианты (двойные списания, повторные email/SMS, «дважды созданный» заказ);
- растут затраты (повторные записи/обработки);
- искажается аналитика.
Цель дедупликации — обеспечить однократный наблюдаемый эффект при допускаемых повторах транспорта, часто вместе с идемпотентностью.
2) Где размещать дедупликацию (уровни)
1. Edge/API-шлюз — отсекаем явные дубли по `Idempotency-Key`/тело+подпись.
2. Брокер/стрим — логическая дедупликация по ключу/секвенсу, coalescing при промахе (реже — из-за стоимости).
3. Приемник событий (consumer) — основное место: Inbox/таблица ключей/кэш.
4. Синк (БД/кеш) — уникальные ключи/UPSERT/версии/компакция.
5. ETL/анализ — дедуп по окну времени и ключу в колоночных сторах.
Правило: как можно раньше, но с учетом стоимости ложных срабатываний и необходимости реплея.
3) Ключи дедупликации
3.1 Естественные (предпочтительно)
`payment_id`, `order_id`, `saga_id#step`, `aggregate_id#seq`.
Гарантируют стабильность и смысл.
3.2 Составные
`(tenant_id, type, external_id, version)` или `(user_id, event_ts_truncated, payload_hash)`.
3.3 Отпечаток (fingerprint)
Хэш детерминированного подмножества полей (нормализовать порядок/регистры), опционально `HMAC(secret, payload)`.
3.4 Последовательности/версии
Монотонные `seq` per aggregate (оптимистическая блокировка/версионирование).
Анти-паттерн: «рандомный UUID» без связи с бизнес-сущностью — дедуп невозможна.
4) Временные окна и порядок
Окно дедупликации — период, в течение которого событие может прийти повторно (обычно 24–72ч; для финансов — дольше).
Out-of-order: допускаем опоздание (lateness). В потоковых фреймворках — event time + watermarks.
Sliding/Fix-window дедуп: «видели ли ключ в последние N минут?».
Sequence-aware: если `seq` ≤ последнего обработанного — дубль/повтор.
5) Структуры данных и реализации
5.1 Точный учет (exact)
Redis SET/STRING + TTL: `SETNX key 1 EX 86400` → «впервые — обрабатываем, иначе — SKIP».
LRU/LFU кэш (in-proc): быстрый, но volatile → лучше только как первый барьер.
SQL уникальные индексы + UPSERT: «вставь или обнови» (идемпотентный эффект).
5.2 Приближенные структуры (probabilistic)
Bloom/Cuckoo filter: дешевая память, возможны ложные срабатывания (false positive). Подходит для явного «шумного» дропа (например, телеметрия), не для финансов/заказов.
Count-Min Sketch: оценка частот для защиты от «горячих» дублей.
5.3 Потоковые состояния
Kafka Streams/Flink: keyed state store c TTL, дедуп по ключу в окне; checkpoint/restore.
Watermark + allowed lateness: управляет окном опоздавших событий.
6) Транзакционные паттерны
6.1 Inbox (входящая таблица)
Сохраняем `message_id`/ключ и результат до побочных эффектов:pseudo
BEGIN;
ins = INSERT INTO inbox(id, received_at) ON CONFLICT DO NOTHING;
IF ins_not_inserted THEN RETURN cached_result;
result = handle(event);
UPSERT sink with result; -- idempotent sync
UPDATE inbox SET status='done', result_hash=... WHERE id=...;
COMMIT;
Повтор увидит запись и не повторит эффект.
6.2 Outbox
Бизнес-запись и событие в одной транзакции → публикатор отдает в брокер. Не устраняет дубль от потребителя, но исключает «дырки».
6.3 Уникальные индексы/UPSERT
sql
INSERT INTO payments(id, status, amount)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO NOTHING; -- "create once"
или контролируемое обновление версии:
sql
UPDATE orders
SET status = $new, version = version + 1
WHERE id=$id AND version = $expected; -- optimistic blocking
6.4 Версионирование агрегатов
Событие применимо, если `event.version = aggregate.version + 1`. Иначе — дубль/повтор/конфликт.
7) Дедуп и брокеры/стримы
7.1 Kafka
Idempotent Producer снижает дубли на входе.
Transactions позволяют атомарно коммитить офсеты + выходные записи.
Compaction: хранит последнее значение per key — пост-фактум дедуп/коалесинг (не для платежей).
Consumer-side: state store/Redis/DB для ключей окна.
7.2 NATS / JetStream
Ack/редоставка → at-least-once. Дедуп в потребителе (Inbox/Redis).
JetStream sequence/дюработ потребителя упрощают выявление повторов.
7.3 Очереди (Rabbit/SQS)
Visibility timeout + повторные доставки → нужен ключ + дедуп-стор.
SQS FIFO с `MessageGroupId`/`DeduplicationId` помогает, но TTL окна ограничен провайдером — храните ключи дольше, если бизнес требует.
8) Хранилища и аналитиka
8.1 ClickHouse/BigQuery
Дедуп по окну: `ORDER BY key, ts` и `argMax`/`anyLast` с условием.
ClickHouse:sql
SELECT key,
anyLast(value) AS v
FROM t
WHERE ts >= now() - INTERVAL 1 DAY
GROUP BY key;
Или материализованный слой «уникальных» событий (мердж по ключу/версии).
8.2 Логи/телеметрия
Допустим approximate-дедуп (Bloom) на ingest → экономим сеть/диск.
9) Повторная обработка, реплей и бэкфилл
Дедуп-ключи должны переживать реплей (TTL ≥ окно реплея).
Для бэкфилла используйте пространство ключей с версией (`key#source=batch2025`) или отдельные «сливы», чтобы не мешать онлайновому окну.
Храните артефакты результата (hash/версию) — это ускоряет «fast-skip» на повторах.
10) Метрики и наблюдаемость
`dedup_hit_total` / `dedup_hit_rate` — доля пойманных дублей.
`dedup_fp_rate` для вероятностных фильтров.
`window_size_seconds` фактическое (по telemetry late arrivals).
`inbox_conflict_total`, `upsert_conflict_total`.
`replayed_events_total`, `skipped_by_inbox_total`.
Профили по tenant/key/type: где больше всего дублей и почему.
Логи: `message_id`, `idempotency_key`, `seq`, `window_id`, `action=process|skip`.
11) Безопасность и приватность
Не кладите PII в ключ; используйте хэши/псевдонимы.
Для подписи отпечатка — HMAC(secret, canonical_payload), чтобы избежать коллизий/подделки.
Сроки хранения ключей согласуйте с комплаенсом (GDPR ретеншн).
12) Производительность и стоимость
In-proc LRU ≪ Redis ≪ SQL по латентности/стоимости на операцию.
Redis: дешевый и быстрый, но учитывайте объем ключей и TTL; шардируйте по `tenant/hash`.
SQL: дорого по p99, но дает сильные гарантии и аудиторность.
Probabilistic фильтры: очень дешево, но возможны FP — применяйте там, где «лишний SKIP» не критичен.
13) Анти-паттерны
«У нас Kafka exactly-once — ключ не нужен». Нужен — в синке/бизнес-слое.
Слишком короткий TTL для ключей → реплей/задержка доставит дубль.
Глобальный одиночный дедуп-стор → hotspot и SPOF; не шардирован по tenant/ключу.
Дедуп только в памяти — потеря процесса = волна дублей.
Bloom для денег/заказов — false positive лишит легитимной операции.
Несогласованная канонизация payload — разные хэши на идентичные по смыслу сообщения.
Игнорирование out-of-order — поздние события помечаются дублями ошибочно.
14) Чек-лист внедрения
- Определите естественный ключ (или составной/отпечаток).
- Установите окно дедуп и политику `lateness`.
- Выберите уровень(я): edge, consumer, sink; предусмотрите шардирование.
- Реализуйте Inbox/UPSERT; для потоков — keyed state + TTL.
- Если нужен approximate-барьер — Bloom/Cuckoo (только для некритичных доменов).
- Настройте реплей-совместимость (TTL ≥ окно реплея/бэкфилла).
- Метрики `dedup_hit_rate`, конфликты и лаги окон; дашборды per-tenant.
- Game Day: таймауты/ретраи, реплей, out-of-order, падение кэша.
- Документируйте канонизацию payload и версионирование ключей.
- Проведите нагрузочные тесты на «горячих ключах» и длинных окнаx.
15) Примеры конфигураций/кода
15.1 Redis SETNX + TTL (барьер)
lua
-- KEYS[1] = "dedup:{tenant}:{key}"
-- ARGV[1] = ttl_seconds local ok = redis. call("SET", KEYS[1], "1", "NX", "EX", ARGV[1])
if ok then return "PROCESS"
else return "SKIP"
end
15.2 PostgreSQL Inbox
sql
CREATE TABLE inbox (
id text PRIMARY KEY,
received_at timestamptz default now(),
status text default 'received',
result_hash text
);
-- In the handler: INSERT... ON CONFLICT DO NOTHING -> check, then UPSERT in blue.
15.3 Kafka Streams (дедуп в окне)
java var deduped = input
.selectKey((k,v) -> v.idempotencyKey())
.groupByKey()
.windowedBy(TimeWindows. ofSizeWithNoGrace(Duration. ofHours(24)))
.reduce((oldV,newV) -> oldV) // first wins
.toStream()
.map((wKey,val) -> KeyValue. pair(wKey. key(), val));
15.4 Flink (keyed state + TTL, псевдо)
java
ValueState<Boolean> seen;
env. enableCheckpointing(10000);
onEvent(e):
if (!seen.value()) { process(e); seen. update(true); }
15.5 NGINX/API-шлюз (Idempotency-Key на edge)
nginx map $http_idempotency_key $idkey { default ""; }
Proxy the key to the backend; backend solves deadup (Inbox/Redis).
16) FAQ
Q: Что выбрать: дедуп или чистая идемпотентность?
A: Обычно оба: дедуп — быстрый «фильтр» (экономия), идемпотентность — гарантия правильного эффекта.
Q: Какой TTL ставить?
A: ≥ максимального времени возможной повторной доставки + запас. Типично 24–72ч; для финансов и отложенных задач — дни/недели.
Q: Как обрабатывать поздние события?
A: Настройте `allowed lateness` и сигнализацию `late_event`; поздние — через отдельную ветку (recompute/skip).
Q: Можно ли дедуплицировать весь поток телеметрии?
A: Да, approximate-фильтрами (Bloom) на edge, но учитывайте FP и не применяйте к критичным бизнес-эффектам.
Q: Дедуп мешает бэкфиллу?
A: Разделяйте пространства ключей (`key#batch2025`) или отключайте барьер на время бэкфилла; TTL ключей должен покрывать только онлайновые окна.
17) Итоги
Дедупликация — это композиция: правильный ключ, окно и структура состояния + транзакционные паттерны (Inbox/Outbox/UPSERT) и осознанная работа с порядком и опоздавшими событиями. Разместите барьеры там, где это дешевле всего, обеспечьте идемпотентность в синках, измеряйте `dedup_hit_rate` и тестируйте реплей/фейлы — так вы получите «эффективно exactly-once» без лишних хвостов латентности и стоимости.