Озера данных и агрегация потоков
1) Назначение и ценность
Data Lake/Lakehouse — опорный слой длительного хранения и масштабного чтения, где:- Потоки из продуктов/игр/платежей приземляются в Bronze «как есть».
- Silver нормализует и обогащает, обеспечивая согласованные ключи и качество.
- Gold — агрегированные витрины (в т.ч. real-/near-real-time) для BI, регуляторки, антифрода/RG.
Агрегация потоков на Lakehouse дает: низкую задержку отчетов, предсказуемую стоимость, воспроизводимость и форензику.
2) Референс-архитектура
1. Ingest/Edge: HTTP/gRPC, OTel, batch endpoints → шина (Kafka/Redpanda).
2. Bronze (append-only): объектное хранилище + ACID-таблицы (Delta/Iceberg/Hudi), партиции by date/market/tenant; хранение исходного payload.
3. Stream Compute: Flink/Spark/Beam — оконные агрегаты, CEP, дедуп, online-lookups.
4. Silver (clean/conform): нормализация валют/таймзон, FK/справочники, SCD для измерений.
5. Serving/OLAP: ClickHouse/Pinot/Druid — материализованные минутные/секундные агрегаты для панелей.
6. Gold (serve): дневные/часовые витрины, регуляторные срезы, неизменяемые экспортные пакеты (WORM).
7. Контуры управления: Schema Registry, DQ-как-код, lineage, каталоги, секреты/KMS, RBAC/ABAC.
3) Контракты и схемы
Schema-first: JSON/Avro/Protobuf; обязательные поля: `event_time (UTC)`, `event_id`, `trace_id`, `user_pseudo_id`, `market`, `schema_version`.
Эволюция: back-compatible → добавления nullable; breaking → `/v2` + двойная запись.
Каталог: описание домена, владелец, SLA свежести, DQ-правила, lineage.
4) Приземление потоков в озеро
Exactly-once на дне: at-least-once публикация + идемпотентный sink (MERGE/upsert по `event_id`).
Дедуп: stateful в стриме + уникальность в Silver.
Компакция файлов: small files → регулярные OPTIMIZE/VACUUM для чтения и стоимости.
Time-travel: включает отладку, реплей и audit.
sql
CREATE TABLE bronze. payment_events (
event_id STRING, user_pseudo_id STRING, currency STRING,
amount DECIMAL(18,2), market STRING, event_time TIMESTAMP, payload STRING
)
PARTITIONED BY (days(event_time), market);
5) Агрегация потоков: окна и watermarks
Окна:- Tumbling — фиксированные (например, 1 мин/5 мин) для стабильных панелей.
- Hopping — перекрывающиеся (шаг < окно) для «гладких» метрик.
- Session — поведенческие разрывы по неактивности.
- Watermarks: управление late data (обычно 2–5 минут), правила доэмиссий/коррекции.
sql
SELECT market,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS ts_min,
COUNT() AS deposits_1m,
SUM(amount_base) AS sum_1m
FROM silver. payments
GROUP BY market, TUMBLE(event_time, INTERVAL '1' MINUTE);
6) Материализация агрегатов
OLAP-движок (ClickHouse/Pinot/Druid): хранит минутные/секундные агрегаты для дашбордов и оперативной аналитики.
Lakehouse Gold: хранит суточные/часовые срезы для отчетности и сверок (воспроизводимость).
sql
CREATE MATERIALIZED VIEW mv_ggr_1m
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(event_time)
ORDER BY (toStartOfMinute(event_time), market, provider_id) AS
SELECT toStartOfMinute(event_time) AS ts_min,
market,
provider_id,
sumState(stake_base) AS s_stake,
sumState(payout_base) AS s_payout
FROM stream. game_events
GROUP BY ts_min, market, provider_id;
Gold — дневной срез (Lakehouse):
sql
CREATE OR REPLACE VIEW gold. ggr_daily AS
SELECT
DATE(event_time) AS event_date,
market, provider_id,
SUM(stake_base) AS stakes_eur,
SUM(payout_base) AS payouts_eur,
SUM(stake_base) - SUM(payout_base) AS ggr_eur
FROM silver. fact_game_financials
GROUP BY 1,2,3;
7) Silver: нормализация и согласование
Время и валюта: `event_time (UTC)`, `amount_base`, `fx_rate_used`, `fx_source`.
Ключи/справочники: `user_pseudo_id`, `game_id`, `provider_id`, `market`.
SCD II: историзация измерений (users/games/providers/RG/KYC).
DQ-правила: уникальность ключей, справочники, диапазоны сумм, temporal-валидность.
8) Реестр агрегатов и «правильные» определения
Semantic Layer: единые формулы GGR/NGR, ставки/выигрыши, конверсия, ARPPU, latency p95.
Версионирование метрик: `metric_version` и «as-of» вычисления.
Док-карточки: owner, формула, источники, SLA готовности.
9) Exactly-once/идемпотентность и порядок
Шина: at-least-once + партиционирование (локальный порядок).
Обработка: дедуп по `event_id` (TTL 24–72ч), CEP/оконные операторы с корректировками.
Sink: транзакционные коммиты или idempotent upsert/merge.
Outbox/Inbox: публикация доменных событий из OLTP с гарантией.
10) Late data и корректировки
Allowed lateness: 2–5 мин для оперативных витрин; суточные пересборки для Gold.
Коррекции: доэмиссии в OLAP и пересбор Gold (idempotent).
Флаги: `late=true`, `correction_of=<event_id>` для аудита.
11) Наблюдаемость и DQ
SLI/SLO (ориентиры):- p95 ingest→1-мин витрина ≤ 2–5 c; Gold daily готово до 06:00 лок.
- Completeness ≥ 99.5%; Schema validity ≥ 99.9%; Trace coverage ≥ 98%.
- Метрики пайплайнов: lag/throughput/busy time/state size, late-ratio, dup-rate.
- DQ-дашборды: Freshness/Completeness/Validity, воронка потерь, карта «горячих» ключей.
- Lineage: путь от Bronze до Gold/экспортов; impact-анализ при изменениях.
12) Приватность, резидентность, безопасность
PII-минимизация: псевдонимизация, отдельный защищенный маппинг.
Residency: EEA/UK/BR — отдельные каталоги и ключи шифрования; запрет кросс-региональных join’ов без оснований.
Шифрование: TLS in-transit; KMS/CMK at-rest; подписи экспортов + WORM при регуляторке.
DSAR/RTBF/Legal Hold: селективные редактирования, заморозка удалений, аудируемые доступы.
13) Производительность и стоимость
Партиционирование: по дате/рынку/тенанту; кластеризация/Z-order по часто-фильтруемым атрибутам.
Компакция: устранение small files, регулярный OPTIMIZE/VACUUM.
Материализация: минуты/секунды — в OLAP; сутки/часы — в Gold.
Tiered storage: hot/warm/cold, SLA восстановления, chargeback по командам (cost/GB, cost/query).
Предагрегации/скетчи: HyperLogLog/approx-distinct там, где приемлемо.
14) Примеры (фрагменты)
Flink CEP — структурирование депозитов (10 мин):python if count_deposits(window=10MIN) >= 3 \
and sum_deposits(window=10MIN) > THRESH \
and all(d. amount < REPORTING_LIMIT for d in window_events):
emit_alert("AML_STRUCTURING", user_id, snapshot())
SQL — дедуп при загрузке в Silver:
sql
CREATE TABLE silver. payments AS
SELECT EXCEPT(rn) FROM (
SELECT p., ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_time) rn
FROM bronze. payment_events p
) WHERE rn = 1;
Iceberg/Delta — MERGE идемпотентный:
sql
MERGE INTO silver. fact_bets s
USING stage. fact_bets_delta d
ON s. bet_id = d. bet_id
WHEN MATCHED THEN UPDATE SET
WHEN NOT MATCHED THEN INSERT;
15) Процессы и RACI
R (Responsible):- Data Platform (Lakehouse/каталог/ACID, компакция),
- Streaming (агрегаты/CEP/dedup),
- Domain Analytics (метрики/Gold).
- A (Accountable): Head of Data/CDO.
- C (Consulted): Compliance/Legal/DPO (PII/residency/Legal Hold), Finance (FX/GGR), SRE (SLO/стоимость), Security.
- I (Informed): BI/Продукт/Маркетинг/Операции.
16) Дорожная карта внедрения
MVP (3–5 недель):1. Lakehouse Bronze/Silver (ACID-таблицы), ingest из Kafka, registry схем.
2. Базовые стрим-агрегаты (1–5 мин) в OLAP; витрина Gold.ggr_daily (D+1 до 06:00).
3. DQ-как-код для Payments/Gameplay, дашборды Freshness/Completeness.
4. Компакция/OPTIMIZE, минимальные cost-метрики и алерты lag/late/dup.
Фаза 2 (5–10 недель):- Расширение Silver (SCD II для users/games/providers), lineage и impact-анализ.
- Асинхронные lookups (RG/KYC/ASN/BIN), управление late-коррекциями.
- Семантический слой метрик, регламент экспортов (WORM/подписи).
- Мульти-регион, DR/реплей-симулятор, auto-tuning окон и watermarks.
- Cost-дашборды, chargeback/квоты, tiered storage и архивация.
- Автогенерация документации витрин и карточек метрик.
17) Чек-лист перед продом
- Схемы и контракты в реестре; back-compat тесты зеленые.
- Включены дедуп, watermark/allowed lateness, DLQ.
- Компакция/OPTIMIZE/VACUUM настроены по расписанию.
- SLO: p95 ingest→minute-view, Gold до 06:00; алерты lag/late/dup/state size.
- DQ-правила активны; lineage виден от Bronze до экспортов.
- RBAC/ABAC и KMS; резидентность и DSAR/RTBF/Legal Hold протестированы.
- Стоимость под контролем (cost/GB, cost/query, доля cold), лимиты на реплеи.
18) Анти-паттерны и риски
Смешение сырых и отчетных данных в одной таблице: нарушает reproducibility.
Отсутствие компакции: взрыв small files → дорогие запросы.
Вычисление FX «задним числом»: ломает историю и отчеты.
Нет watermarks/late-политик: «плывут» витрины и алерты.
Full reload без нужды: используйте инкременты/MERGE и корректировки.
PII в аналитике: держите маппинги отдельно, включайте CLS/RLS.
19) Глоссарий (кратко)
Lakehouse — data lake + ACID-таблицы и SQL-движок.
Bronze/Silver/Gold — сырые/нормализованные/сервинговые слои.
Watermark — граница готовности окон по event-time.
Materialized View — предвычисленная витрина для быстрого чтения.
Time-travel — чтение исторических версий таблиц.
WORM — неизменяемое хранение экспортных артефактов.
20) Итог
Озеро данных с правильной стрим-агрегацией — это дисциплина слоев и контрактов: Bronze «как есть», Silver для нормализации и качества, OLAP для минутных панелей, Gold для воспроизводимых отчетов. Управляя окнами и watermarks, дедупом и компакцией, приватностью и стоимостью, вы получаете быстрые, проверяемые и комплаентные витрины для продукта, комплаенса и операционного управления.