Парсинг реестров и автоматизация
TL;DR
Надежная «сверочная» автоматика строится на трех китах: детерминированный ingestion (безопасная доставка, идемпотентность, контроль целостности), нормализованный слой данных (единая схема, ключи сопоставления, стандартизованные единицы времени/валют/знаков) и жесткая дисциплина качества (валидации, толерансы, DLQ, алерты, автокоррекции). Цель — превратить разношерстные файлы/вебхуки в стабильные таблицы для сверки, отчетности и BI с SLA по доступности.
1) Ландшафт источников и форматов
1.1 Источники
PSP/эквайеры/агрегаторы: транзакции, сеты, комиссии, диспуты.
Банки: выписки MT940, ISO 20022 CAMT.052/053/054, платежи PAIN.001/002.
APM/кошельки/выплаты (OCT/RTP/SEPA): реестры payouts, возвраты.
Крипто-кастоди/биржи: он-чейн транзакции, отчеты конвертаций/комиссий.
Налоги/гос. порталы: CSV/XLSX/PDF, иногда через скриптованный браузер.
1.2 Форматы
CSV/TSV (вариативные разделители, локали, кодировки).
XLSX (мульти-шит, объединенные ячейки).
XML (ISO 20022 CAMT/PAIN, кастомные схе-мы XSD).
SWIFT MT940/942 (позиционные поля).
JSON-API/NDJSON (инкрементальные выгрузки, курсоры).
PDF (табличные — парсер; сканы — OCR).
ZIP/TAR.GZ (батчи с несколькими файлами).
2) Архитектура ingestion-пайплайна
Контуры:1. Landing: безопасная приемка файлов (SFTP/FTPS/WebDAV/API/webhooks) → немедленно считаем checksum, сохраняем сырье неизменно.
2. Raw: раскладка по датам/провайдерам/батчам, хранение с версиями.
3. Normalize: парсинг → унификация типов/единиц → normalized таблицы.
4. Validated: пост-валидации (правила качества) → флаги, DLQ.
5. Matched: сопоставление с внутренними событиями/банком.
6. Serving/BI: витрины для сверки/финансов/операций.
Ключевые требования:- Идемпотентность ingestion: `(provider, file_name, file_size, checksum, statement_date)` → уникальный ключ.
- Повторы/ретраи: повторное прогоняние файла не создает дублей.
- DLQ (dead-letter queue): все нераспознанное/нарушившее правила — в изолированную очередь.
- Версионирование: новый файл за тот же день → новая версия с ссылкой на предыдущую.
3) Безопасность доставки и секретов
Каналы: SFTP с ограниченными ключами; FTPS — только при строгом TLS; API — OAuth2/токены с коротким TTL.
Шифрование контента: PGP/GPG при загрузке файлов; S/MIME для e-mail-инбоксов (если используются).
Контроль целостности: SHA-256/512 checksum, сравнение с хешем в манифесте.
Секреты: хранить в Vault/KMS, ротация, запрещены в конфиг-файлах/логах.
Доступы: RBAC + принцип «наименьших привилегий», отдельные сервис-аккаунты.
4) Нормализация и схема данных
4.1 Универсальные правила
Время: всегда UTC в ISO-8601; для дат settlement — `DATE` без TZ.
Суммы: `DECIMAL(p, s)` в minor units + отдельный `scale`; знак: приход/расход строго по словарю.
Валюты: ISO-4217, фиксированная таблица курсов с `fx_src`.
Локали: запрет автодетекта — явная настройка разделителей/десятичной точки/тысячных.
Кодировки: вход в UTF-8; иные — конвертация с логом.
4.2 Нормализованный «плоский» слой (пример)
json
{
"provider": "Acquirer_A",
"source_kind": "PSP_TX PSP_SETTLEMENT BANK WALLET CRYPTO",
"kind": "AUTH CAPTURE REFUND PAYOUT FEE SETTLEMENT CHARGEBACK",
"payment_id": "pay_123", // ваше
"provider_txid": "psp_abc_789", // внешнее
"merchant_ref": "mr_456",
"sequence": 0, // partial/refund line index
"amount_minor": 100000, // 1000.00
"currency": "EUR",
"fee_minor": 120, // 1.20
"fx_rate": 1.0000,
"fx_src": "PSP ECB BANK",
"event_ts": "2025-11-03T12:00:00Z",
"value_date": "2025-11-05",
"account": "PSP_MERCHANT_CARD_A",
"bin": "425000",
"last4": "1234",
"status": "APPROVED CAPTURED SUCCESS FAILED SETTLED",
"file_id": "ing_20251103_001",
"row_hash": "sha256(raw_row)"
}
5) Парсеры по форматам: приемы и грабли
5.1 CSV/TSV
Явно задавайте `delimiter`, `quotechar`, `escapechar`, `encoding`.
5.2 XLSX
Чтение по sheet whitelist; запрет автослияний — расплющивание объединенных ячеек.
Преобразование формул в значения; даты Excel → UTC с явной TZ.
5.3 XML (ISO 20022 CAMT/PAIN)
Валидация по XSD; XPath-маппинг реквизитов (`<Ntry>`, `<TxDtls>`, `<Amt>`, `<CdtDbtInd>`).
Нормализация credit/debit → знак; поддержка множественных `<Chrgs>`, `<RmtInf>`.
5.4 MT940
Разбор тегов `:61:`, `:86:`; поддержка национальных расширений; позиционные поля → правила slicing.
Консолидация нескольких `:61:` в один батч.
5.5 JSON/NDJSON/API
5.6 PDF/OCR
Сперва попытка табличного парсинга (таб-детектор), только затем OCR (Tesseract) с whitelist символов.
Пост-валидация: суммы, контрольные итоги, сверка количества строк.
5.7 Архивы/батчи
Распаковка с сохранением структуры; каждому файлу — отдельный `file_id`; манифест, контроль всех частей.
6) Валидации и правила качества данных
Обязательные проверки:- Схема: все required поля присутствуют.
- Типы: суммы — числовые, даты — парсятся.
- Контрольные суммы/итоги: сумма строк = итог в файле (если есть).
- Диапазоны: дата в разумном окне; сумма > 0 (или по словарю допустимых отрицательных).
- Уникальность: `(provider, provider_txid, sequence)` не дублируется в normalized.
- Толерансы: допустимые расхождения `amount/fx/time`.
Результат: `VALID`, `VALID_WITH_WARNINGS`, `INVALID → DLQ (reason_code)`.
7) Идемпотентность и дедупликация
Ingestion key: `(provider, file_name, filesize, checksum, statement_date)` → единственный `file_id`.
Row-level idem: `row_hash = sha256(normalized_row_compact)`; повторная загрузка не создает новых записей.
Webhooks/API: `idempotency_key` провайдера + ваши метки (`exec_id`), хранить TTL.
Дубли провайдера: дедуп по `provider_txid`+`sequence`, при расхождении — в DLQ_DUPLICATE.
8) Оркестрация и расписания
Оркестратор: Airflow/Dagster (DAG: `fetch → decrypt → parse → normalize → validate → publish → match`).
SLA/SLO: `Time-to-Availability (TtA)` от появления файла до `normalized=READY`.
Ретраи: экспоненциальный backoff + jitter; лимиты попыток; четкие статусы.
Параллелизм и изоляция: тяжелые OCR/парсинг XLSX — в отдельном пуле/воркере с лимитом CPU/RAM.
DLQ-replay: периодический reprocess при обновлении правил/маппингов.
9) Наблюдаемость и алерты
Метрики:- Ingestion Success %, Parse Success % по источникам.
- TtA p50/p95, Throughput (строк/мин).
- DLQ Rate и Aging DLQ p50/p95.
- Schema Drift Incidents (смена заголовков/формата).
- Duplicate Rate по `provider_txid`.
- `TtA p95 > SLA` → P1.
- `DLQ Rate > 2%` за час по провайдеру → P1.
- `Schema Drift detected` → P0 (остановка авто-матчинга по источнику).
- `Duplicate spike` → P2 (проверить провайдера/вебхуки).
Дашборд: воронка `files → rows_raw → rows_norm → rows_valid → rows_matched`, карта DLQ по причинам, TtA-квантили.
10) Автокоррекции и маппинги
Header aliases: словарь с версиями (e.g., `Amount`→`amt`, `AMOUNT`→`amt`).
11) Связь с «Сверкой платежей и отчетов PSP»
Готовый normalized слой — вход для матчинга (provider_txid/merchant_ref/fuzzy), расчета diff-таксономии, авто-журналов и settlement↔bank-сшивки. Ключевые поля: `provider_txid`, `sequence`, `kind`, `amount_minor`, `value_date`, `account`.
12) Модель хранилища и таблицы
Таблица landed файлов:sql
CREATE TABLE landed_files (
file_id TEXT PRIMARY KEY,
provider TEXT,
source_kind TEXT,
file_name TEXT,
file_size BIGINT,
checksum TEXT,
statement_date DATE,
received_at TIMESTAMP WITH TIME ZONE,
version INT,
status TEXT, -- RECEIVED PARSED FAILED error TEXT
);
Нормализованные строки:
sql
CREATE TABLE psp_norm (
row_id BIGSERIAL PRIMARY KEY,
file_id TEXT REFERENCES landed_files(file_id),
provider TEXT,
source_kind TEXT,
kind TEXT,
payment_id TEXT,
provider_txid TEXT,
merchant_ref TEXT,
sequence INT,
amount_minor BIGINT,
currency CHAR(3),
fee_minor BIGINT,
fx_rate NUMERIC(18,8),
fx_src TEXT,
event_ts TIMESTAMPTZ,
value_date DATE,
account TEXT,
status TEXT,
row_hash TEXT UNIQUE,
repair_flags TEXT[]
);
CREATE INDEX idx_psp_norm_txid ON psp_norm(provider, provider_txid, sequence);
13) Псевдокод парсеров
CSV/XLSX:python def parse_table(file, spec):
df = load_csv_or_xlsx(file, delimiter=spec.delim, encoding=spec.enc, sheet=spec.sheet)
df = rename_headers(df, spec.header_aliases)
df = clean_amounts(df, thousand=spec.thousand, decimal=spec.decimal, sign_policy=spec.sign)
rows = []
for r in df.itertuples():
rows.append(normalize_row(r, spec))
return rows
XML CAMT:
python def parse_camt(xml):
root = parse_xml(xml, xsd="camt053.xsd")
for ntry in root.findall('.//Ntry'):
sign = 1 if ntry.findtext('CdtDbtInd') == 'CRDT' else -1 amt = Decimal(ntry.findtext('Amt')) sign
... map to normalized fields
OCR PDF (fallback):
python def parse_pdf_ocr(pdf):
text = tesseract(pdf, lang="eng", psm=6, whitelist="0123456789.,-;:/A-Za-z")
table = detect_table(text)
return normalize_table(table)
14) GDPR/PII и редактирование логов
Маскирование/хэширование: PAN/email/телефон → `sha256+salt`, логи — без первичных значений.
Политика хранения: `retention` по типу источника (AML/бухучет).
Доступы к PII — только по роли; аудит чтений/экспортов.
15) KPI и цели (для парсинга/ingestion)
Ingestion Success % ≥ 99.5%/день на источник.
Parse Success % ≥ 99%, DLQ ≤ 1%.
TtA p95 (файл→normalized) ≤ 15 минут (CSV/XML), ≤ 60 минут (PDF/OCR).
Schema Drift Incidents: 0/месяц без алерта/фикса.
Duplicate Rate по `provider_txid` ≤ 0.05%.
16) Плейбуки инцидентов
Schema drift: стоп авто-матчинга, включить «мягкий» парсер с ML-детектом колонок, подготовить alias-патч, прогнать DLQ-replay.
Всплеск DLQ: отладка последних файлов, проверка кодировки/локали/знака, временно понизить строгость толерансов (с флагом).
Задержки SFTP: переключение на API-polling/вебхуки, увеличение ретраев, коммуникация с провайдером.
Spikes duplicates: включить доппроверку `row_hash`, блок повторов до выяснения.
17) Тест-кейс-пакет (UAT/Prod-готовность)
1. Идемпотентность: повтор той же загрузки → 1 `file_id`, 0 новых строк.
2. Локали: файлы с `,`/`.`/пробелами → корректные суммы.
3. Partial/refund: несколько `sequence` к одному `provider_txid`.
4. XML XSD: невалидный CAMT → `INVALID` + DLQ.
5. MT940 вариации: национальные расширения → корректный разбор.
6. PDF→OCR: скан с шумом → извлечение и прохождение базовых правил.
7. Schema drift: новый хэдер → alias-патч и повторная обработка исторических файлов.
8. Throughput: нагрузочный тест N файлов/час → соблюдение TtA SLA.
9. PII-редакция: логи без PAN/e-mail, только хэши.
18) Чек-лист внедрения
- Реестр источников: протокол, расписание, SLA, формат, контакт.
- Безопасные каналы (SFTP/PGP/API), Vault для секретов.
- Идемпотентный ingestion + checksum + версии.
- Парсеры по форматам, alias-словарь, sign/locale-политики.
- Нормализованный слой и индексы ключей.
- Правила валидаций, толерансы, DLQ и replay.
- Оркестратор (DAG), ретраи/backoff, пулы ресурсов.
- Наблюдаемость: метрики, дашборды, алерты.
- GDPR/PII-маскирование, аудиты доступа.
- Тест-кейсы и регулярные schema-drift drills.
Резюме
Автоматизация парсинга — это не «написать парсер», а выстроить промышленный контур: надежная доставка и шифрование, идемпотентные пайплайны, строгая нормализация, правила качества и прозрачные алерты. Такой контур превращает любые реестры в предсказуемые таблицы с гарантированным SLA по доступности данных — фундамент для сверок, казначейства и управленческой отчетности.