AI pipelines and training automation
1) Purpose and principles
Purpose: reliably and reproducibly turn data → features → models → decisions → feedback, with minimum time-to-value and controlled risk/cost.
Principles:
- Pipeline-as-Code: everything (DAG, configs, tests, policies) lives in Git and goes through PR and review.
- Determinism: fixed versions of data/code/containers/dependencies.
- Separation of Concerns: DataOps, FeatureOps, TrainOps, DeployOps, MonitorOps.
- Guarded Automation: we automate, but with "gates" of quality, safety and compliance.
- Privacy by Design: PII minimization, residency, audit.
2) Pipeline layers and architecture
1. Ingest & Bronze: reliable ingestion of events/batches (CDC, message buses, retries, DLQ).
2. Silver (normalization/enrichment): SCD, currency/time conversion, cleansing, dedup.
3. Gold (marts): subject-area tables and datasets for training/reporting.
4. Feature Store: unified feature definitions for online/offline, versions and SLOs.
5. Train & Validate: sample preparation, training, calibration, evaluation and gate checks.
6. Registry & Promotion: model registry, quality cards, promotion policy.
7. Serving: REST/gRPC/Batch, feature caches, feature flags, canary/shadow.
8. Monitor & Feedback: SLI/SLO, drift/calibration, online labels, auto-retrain.
3) Orchestration: DAG patterns
Daily CT (D + 1): nightly data cycle → features → training → validation → registry candidate.
Event-Driven Retrain: triggered by PSI/ECE/expected-cost drift or a schema release.
Rolling Windows: weekly/monthly retraining with a "sliding window" of data.
Blue/Green Artifacts: all artifacts are immutable (hash), parallel versions.
Dual-write v1/v2: schema/feature migrations through double write and equivalence comparison.
```python
# Daily CT pipeline: Bronze → Silver → Gold → Feature Store → dataset → train → evaluate → register (Staging).
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("ct_daily", schedule="@daily", start_date=..., catchup=False) as dag:  # set a concrete anchor date
    bronze = BashOperator(task_id="ingest_cdc", bash_command="ingest.sh")
    silver = BashOperator(task_id="silver_norm", bash_command="dbt run --models silver")
    gold = BashOperator(task_id="gold_marts", bash_command="dbt run --models gold")
    feats = BashOperator(task_id="feature_store_publish", bash_command="features publish")
    ds = BashOperator(task_id="build_dataset", bash_command="dataset build --asof {{ ds }}")
    train = BashOperator(task_id="train", bash_command="trainer run --config conf.yaml")
    evaluate = BashOperator(task_id="evaluate", bash_command="eval run --gate conf/gates.yaml")
    reg = BashOperator(task_id="register", bash_command="registry add --stage Staging")

    bronze >> silver >> gold >> feats >> ds >> train >> evaluate >> reg
```
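A minimal sketch of the event-driven retrain trigger, assuming a hypothetical drift report written by the monitoring job; the path, metric names and thresholds are illustrative.
```python
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.python import ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

PSI_MAX, ECE_MAX = 0.2, 0.05  # illustrative retrain thresholds

def drift_exceeded() -> bool:
    # Read the latest drift report (hypothetical path); skip downstream tasks if all is green.
    with open("/data/monitoring/drift_report.json") as f:
        report = json.load(f)
    return report["psi"] > PSI_MAX or report["ece"] > ECE_MAX

with DAG("ct_event_driven", schedule="@hourly",
         start_date=datetime(2024, 1, 1), catchup=False) as dag:
    check = ShortCircuitOperator(task_id="check_drift", python_callable=drift_exceeded)
    retrain = TriggerDagRunOperator(task_id="trigger_ct_daily", trigger_dag_id="ct_daily")
    check >> retrain
```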
4) Datasets and sampling
Point-in-time joins and a "no future data" rule for features/labels.
Stratification by market/tenant/time; holdout plus a "gap" window against leakage.
Versioning: `data_version`, `logic_version`, `asof_date`; WORM snapshots.
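A minimal point-in-time join sketch in pandas; the file paths and column names (`player_id`, `asof_ts`, `computed_at`) are illustrative assumptions. Each label row only sees feature values computed strictly before its as-of timestamp.
```python
import pandas as pd

# Both frames must be sorted by their time key for merge_asof.
labels = pd.read_parquet("gold/labels.parquet").sort_values("asof_ts")
features = pd.read_parquet("feature_store/bets_sum_7d.parquet").sort_values("computed_at")

dataset = pd.merge_asof(
    labels,
    features,
    left_on="asof_ts",
    right_on="computed_at",
    by="player_id",
    direction="backward",       # take the latest feature value at or before asof_ts
    allow_exact_matches=False,  # strictly before: the "no future" guarantee
)
# Stamp versions so the sample is reproducible.
dataset = dataset.assign(data_version="v3", logic_version="pt_join_1",
                         asof_date=dataset["asof_ts"].dt.date)
```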
5) Feature Store and online/offline equivalence
Unified specification of features (name, formula, owner, SLO, tests).
Online = offline: common transformation code; equivalence test (MAE/MAPE).
TTL and cache: 10m/1h/1d windows; timeouts/retries; "last_known_good" fallbacks.
```yaml
name: bets_sum_7d
owner: ml-risk
offline: {source: silver.fact_bets, window: "[-7d,0)"}
online: {compute: "streaming_window: 7d", ttl: "10m"}
slo: {latency_ms_p95: 20, availability: 0.999}
tests:
  - compare_online_offline_max_abs_diff: 0.5
```
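A sketch of the online/offline equivalence test for this spec: join the two materializations and fail when the p95 absolute difference exceeds the tolerance from the YAML above. Join keys and frame layout are assumptions.
```python
import numpy as np
import pandas as pd

def check_equivalence(offline: pd.DataFrame, online: pd.DataFrame,
                      feature: str = "bets_sum_7d",
                      max_p95_abs_diff: float = 0.5) -> float:
    # Align the two computations of the same feature on entity and event time.
    joined = offline.merge(online, on=["player_id", "event_ts"], suffixes=("_off", "_on"))
    abs_diff = (joined[f"{feature}_off"] - joined[f"{feature}_on"]).abs()
    p95 = float(np.percentile(abs_diff, 95))
    assert p95 <= max_p95_abs_diff, f"equivalence gate failed: p95 abs diff = {p95:.3f}"
    return p95
```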
6) Training Automation (CT) and Quality Gates
CT cycle: preparation → training → calibration → evaluation → registration of the candidate.
Gates (example):
- Offline: PR-AUC ≥ benchmark − δ; ECE ≤ 0.05; expected cost ≤ limit.
- Slice/Fairness: metric drop on any slice ≤ Y%; disparate impact within the accepted norm.
- Feature equivalence: online/offline difference within tolerance (p95 absolute diff).
- Cost: time/resources ≤ budget.
```yaml
gates:
  pr_auc_min: 0.42
  ece_max: 0.05
  expected_cost_delta_max: 0.0
  slice_drop_max_pct: 10
  features_equivalence_p95_abs_diff_max: 0.5
```
7) Model registry and promotion
Model card: data, windows, features, offline/online metrics, calibration, risks, owner.
Stages: `Staging → Production → Archived`; promotion only through proven gates.
Rollback policy: keep ≥N of the latest production versions; one-click rollback.
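A hedged sketch of a registry entry with its model card; the fields mirror the bullet above, while paths, feature names and metric values are illustrative placeholders, not tied to any specific registry product.
```python
import hashlib
import json
import pathlib

model_card = {
    "model_id": "risk_scorer",            # illustrative names and values
    "version": "2025-01-15-1",
    "stage": "Staging",                   # Staging -> Production -> Archived
    "data": {"data_version": "v3", "window": "2024-01-01..2024-12-31"},
    "features": ["bets_sum_7d", "deposits_cnt_30d"],
    "metrics_offline": {"pr_auc": 0.44, "ece": 0.031, "expected_cost_delta": -0.02},
    "calibration": "isotonic",
    "risks": ["sparse tail markets"],
    "owner": "ml-risk",
}
# Pin the artifact by content hash so the registry entry is immutable.
model_card["artifact_sha256"] = hashlib.sha256(
    pathlib.Path("artifacts/model.bin").read_bytes()).hexdigest()
pathlib.Path("registry/risk_scorer-2025-01-15-1.json").write_text(json.dumps(model_card, indent=2))
```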
8) CI/CD/CT: how they connect
CI (code/tests): unit/integration/contract tests, linters, security scans.
CD (serving): Docker/K8s/Helm/feature flags, canary/shadow/blue-green.
CT (data/training): schedule/event orchestrator; artifacts → registry.
Promotion Gates: auto-release to production once online SLOs are green (canary ≥ X hours).
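A minimal sketch of that promotion gate; SLO targets, metric names and the shape of the `metrics` dict are illustrative assumptions.
```python
MIN_CANARY_HOURS = 24
SLO = {"latency_p95_ms": 150, "error_rate": 0.001, "coverage": 0.98}  # illustrative targets

def can_promote(metrics: dict) -> bool:
    # Promote only after the canary has run long enough with all SLOs green.
    if metrics["canary_hours"] < MIN_CANARY_HOURS:
        return False
    return (metrics["latency_p95_ms"] <= SLO["latency_p95_ms"]
            and metrics["error_rate"] <= SLO["error_rate"]
            and metrics["coverage"] >= SLO["coverage"])
```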
9) Multi-tenancy and residency
Tenants/regions: isolated pipelines and encryption keys (EEA/UK/BR); cross-region joins prohibited without justification.
Secrets: KMS/CMK, Secret Manager; tokenized IDs in logs.
DSAR/RTBF policies: computable projections and selective redaction in features and logs; Legal Hold for cases.
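A minimal sketch of ID tokenization for logs, assuming a per-tenant secret fetched from the secret manager; HMAC keeps tokens stable for joins while raw IDs never reach log storage.
```python
import hashlib
import hmac

def tokenize_id(raw_id: str, tenant_secret: bytes) -> str:
    # Deterministic pseudonym: same ID + same secret -> same token, but not reversible.
    return hmac.new(tenant_secret, raw_id.encode("utf-8"), hashlib.sha256).hexdigest()[:16]

# logger.info("decision made", extra={"player": tokenize_id(player_id, secret)})
```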
10) Monitoring → feedback → retrain
SLI/SLO: latency p95/p99, 5xx, coverage, cost/request; drift PSI/KL, ECE, expected-cost.
Online labels: proxy (hour/day) and delayed (D + 7/D + 30/D + 90).
Auto-actions: recalibration/threshold update → shadow retrain → canary → promotion.
Runbooks: degradation scenarios (drift, calibration, feature cache, providers).
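A sketch of the PSI computation behind the drift SLI: bins are built on the reference sample and reused for the live sample. The 10-bin quantile scheme and the usual 0.1/0.2 warning levels are conventions, not fixed standards.
```python
import numpy as np

def psi(reference: np.ndarray, live: np.ndarray, bins: int = 10) -> float:
    # Quantile bin edges from the reference distribution, open-ended at both tails.
    edges = np.quantile(reference, np.linspace(0, 1, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    live_pct = np.histogram(live, bins=edges)[0] / len(live)
    # Avoid log(0) on empty bins.
    ref_pct = np.clip(ref_pct, 1e-6, None)
    live_pct = np.clip(live_pct, 1e-6, None)
    return float(np.sum((live_pct - ref_pct) * np.log(live_pct / ref_pct)))
```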
11) Security, RG/AML and decision policy
Guardrails: pre/post filters, frequency caps, cooldowns, ban lists.
Policy shielding: Model → Decision → Policy Filter → Action.
Audit: `model_id/version`, `feature_version`, `threshold`, `policy_id`, reasons.
WORM archive: releases, quality reports, test/promotion logs.
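A sketch of the Model → Decision → Policy Filter → Action chain above; ban list, cooldown and frequency-cap values are illustrative.
```python
from datetime import datetime, timedelta

BAN_LIST = {"player_123"}            # illustrative guardrail parameters
COOLDOWN = timedelta(hours=24)
MAX_ACTIONS_PER_WEEK = 3

def policy_filter(player_id: str, decision: str, action_history: list[datetime]) -> str:
    # The model proposes a decision; the policy layer can only suppress it, never escalate.
    now = datetime.utcnow()
    if player_id in BAN_LIST:
        return "suppress"
    if action_history and now - max(action_history) < COOLDOWN:
        return "suppress"
    if sum(1 for t in action_history if now - t < timedelta(days=7)) >= MAX_ACTIONS_PER_WEEK:
        return "suppress"
    return decision  # e.g. "send_offer" or "no_action"
```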
12) Cost and performance
Profile the cost path: features (30-60%), inference (20-40%), IO/network.
Cost dashboards: cost/request, cost/feature, GPU/CPU hours, small-files problem.
Optimization: materialize heavy offline features, cache hot windows, INT8/FP16, backfill/replay quotas.
Chargeback: allocate the budget per team/market and keep "expensive" features under control.
13) Examples (fragments)
Argo Workflow:
```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata: {name: ct-daily}
spec:
  entrypoint: pipeline
  templates:
    - name: pipeline
      dag:
        tasks:
          - name: gold
            template: task
            arguments: {parameters: [{name: cmd, value: "dbt run --models gold"}]}
          - name: features
            dependencies: [gold]
            template: task
            arguments: {parameters: [{name: cmd, value: "features publish"}]}
          - name: train
            dependencies: [features]
            template: task
            arguments: {parameters: [{name: cmd, value: "trainer run --config conf.yaml"}]}
          - name: eval
            dependencies: [train]
            template: task
            arguments: {parameters: [{name: cmd, value: "eval run --gate conf/gates.yaml"}]}
    - name: task
      inputs: {parameters: [{name: cmd}]}
      container: {image: "ml/ct:latest", command: ["/bin/bash","-lc"], args: ["{{inputs.parameters.cmd}}"]}
```
Gate script (pseudocode):
```python
import sys

ok = (
    pr_auc >= gate.pr_auc_min
    and ece <= gate.ece_max
    and expected_cost_delta <= gate.expected_cost_delta_max
    and slice_drop_pct <= gate.slice_drop_max_pct
    and features_equivalence_p95_abs_diff <= gate.features_equivalence_p95_abs_diff_max
)
sys.exit(0 if ok else 1)  # non-zero exit fails the CT run
```
Promotion policy (idea):
```yaml
promotion:
  require:
    - offline_gates_passed
    - canary_online_hours >= 24
    - slo_green: [latency_p95, error_rate, coverage]
    - drift_warn_rate <= 5%
```
14) Processes and RACI
R (Responsible):
- Data Eng: Ingest/Silver/Gold, Feature Store, CDC/backfill;
- Data Science: datasets/sampling, training, calibration, gates;
- MLOps: orchestration, registry, serving, observability.
- A (Accountable): Head of Data / CDO.
- C (Consulted): Compliance/DPO (PII/RG/AML/DSAR), Security (KMS/audit), SRE (SLO/cost), Finance (budgets/ROI), Product.
- I (Informed): Marketing/Operations/Support.
15) Implementation Roadmap
MVP (3-6 weeks):
1. DAG "daily CT": Bronze→Silver→Gold→Feature Store→Train→Eval→Registry (Staging).
2. Feature Store v1 and online/offline equivalence test.
3. Quality gates (PR-AUC/ECE/expected-cost/slice).
4. Model register, card and WORM release archive.
Phase 2 (6-12 weeks):
- Auto-recalibration/threshold updates, canary promotion via online SLOs.
- Event-driven retrain by drift; dual-write v1/v2 for migrations.
- Cost-dashboards and backfill/replay quotas; multi-tenant isolation.
- Fairness policies on slices and auto-reporting.
- Multi-regional residency (EEA/UK/BR) with individual keys.
- Auto-retrain on schedule and events, auto-generated pipeline documentation.
16) Delivery checklist
- Pipeline-as-Code in Git; CI tests (unit/integration/contracts/security).
- Bronze/Silver/Gold and the Feature Store are stable; the feature equivalence test is green.
- Offline gates passed; the model card is complete; the WORM release archive is created.
- Canary ≥ 24 h with green SLOs; rollback button and kill-switch verified.
- Drift/ECE/expected-cost and online label monitoring is enabled.
- PII/residency/DSAR/RTBF/Legal Hold met; audit configured.
- Cost within budget; feature caches, quotas and backfill/replay limits are active.
17) Anti-patterns and risks
Training without gates or model cards; manual promotion.
Manual "one-shot" steps outside the orchestrator; no Git history.
Inconsistent online/offline features → discrepancies in production.
Ignoring drift/calibration/expected-cost; ROC-AUC for appearances only.
No residency/PII policies; logging of raw IDs.
Unlimited backfill/replays → cost explosion and impact on SLA.
18) The bottom line
AI pipelines are a value pipeline, not a pile of notebooks. Formalize the data layers, the Feature Store and CT/CI/CD, add quality and safety gates, automate retraining on drift, maintain online/offline equivalence and a transparent cost model. This gives you a fast, predictable and compliant data → model → effect cycle that scales across markets and time.