
Data Pipeline Observability: From Alerts to Root Cause

Building observability into data pipelines for fast incident detection and root cause analysis — covering freshness, volume, schema, distribution, and lineage.

Chris P


Quality · 11 min read
Data quality dashboard with pass rate metrics

Your dashboard is showing yesterday's numbers. An analyst pings you on Slack. You check Airflow — all DAGs are green. You check Snowflake — the tables exist. But somewhere between the source system and the dashboard, data stopped flowing, and nobody was alerted. This is the observability gap that most data teams live with until it causes a serious incident.

Data pipeline observability is the practice of instrumenting your pipelines so that you detect issues within minutes, understand their impact immediately, and trace them to root cause without guesswork. This article covers the five pillars of data observability, practical implementations for each, and the build-versus-buy decision.

The Five Pillars of Data Observability

Every data observability system monitors five dimensions: freshness (is data arriving on time?), volume (is the expected amount of data present?), schema (has the structure changed unexpectedly?), distribution (do values look statistically normal?), and lineage (which upstream change caused this downstream issue?). Skip any one and you have a blind spot.

Pillar 1: Freshness Monitoring

Freshness measures the time between now and the most recent record in a table. If your orders table should update every hour but the latest record is from 6 hours ago, something is broken.

```sql
-- Freshness monitor: alert when a table falls behind its SLA
WITH freshness AS (
    SELECT
        'analytics.fct_orders'       AS table_name,
        MAX(updated_at)              AS latest_record,
        CURRENT_TIMESTAMP()          AS checked_at,
        DATEDIFF('minute', MAX(updated_at), CURRENT_TIMESTAMP()) AS minutes_stale,
        120                          AS sla_minutes
    FROM analytics.fct_orders
    UNION ALL
    SELECT
        'analytics.fct_payments',
        MAX(created_at),
        CURRENT_TIMESTAMP(),
        DATEDIFF('minute', MAX(created_at), CURRENT_TIMESTAMP()),
        60
    FROM analytics.fct_payments
)
SELECT
    table_name, latest_record, minutes_stale, sla_minutes,
    CASE
        WHEN minutes_stale > sla_minutes THEN 'BREACH'
        WHEN minutes_stale > sla_minutes * 0.8 THEN 'WARNING'
        ELSE 'OK'
    END AS status
FROM freshness
ORDER BY minutes_stale DESC;
```

Store the results of each freshness check in a monitoring table. Over time, this gives you historical freshness trends — invaluable for SLA reporting and for identifying tables that are gradually degrading before they breach.
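One lightweight way to persist and trend these results, sketched here with SQLite for portability (the table and column names are illustrative; in practice the monitoring table would live in your warehouse):

```python
import sqlite3

# Illustrative monitoring table; in production this would be a warehouse table.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE freshness_checks (
        table_name    TEXT,
        checked_at    TEXT,
        minutes_stale INTEGER,
        sla_minutes   INTEGER,
        status        TEXT
    )
""")

def record_freshness_check(table_name, checked_at, minutes_stale, sla_minutes):
    # Same thresholds as the SQL monitor: breach past SLA, warn at 80% of it.
    status = "BREACH" if minutes_stale > sla_minutes else (
        "WARNING" if minutes_stale > sla_minutes * 0.8 else "OK")
    conn.execute(
        "INSERT INTO freshness_checks VALUES (?, ?, ?, ?, ?)",
        (table_name, checked_at, minutes_stale, sla_minutes, status),
    )
    return status

# Hypothetical results from three consecutive runs
record_freshness_check("analytics.fct_orders", "2026-04-15T08:00:00Z", 45, 120)
record_freshness_check("analytics.fct_orders", "2026-04-15T09:00:00Z", 100, 120)
record_freshness_check("analytics.fct_orders", "2026-04-15T10:00:00Z", 130, 120)

# Trend query: average staleness and breach count — rising averages flag
# tables that are degrading before they ever breach.
row = conn.execute(
    "SELECT AVG(minutes_stale), SUM(status = 'BREACH') FROM freshness_checks"
).fetchone()
```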

Pillar 2: Volume Anomaly Detection

Volume monitoring checks whether the number of rows loaded in each pipeline run falls within expected bounds. A table that normally receives 500,000 rows per day suddenly receiving 50,000 is a strong signal that something went wrong upstream.

```python
import pandas as pd
from datetime import datetime, timedelta

def detect_volume_anomaly(
    table_name: str,
    current_count: int,
    history_df: pd.DataFrame,
    lookback_days: int = 30,
    z_threshold: float = 2.5,
) -> dict:
    cutoff = datetime.now() - timedelta(days=lookback_days)
    recent = history_df[history_df["run_date"] >= cutoff]["row_count"]

    if len(recent) < 7:
        return {"table": table_name, "status": "INSUFFICIENT_DATA"}

    mean = recent.mean()
    std = recent.std()
    z_score = 0.0 if std == 0 else (current_count - mean) / std

    if abs(z_score) > z_threshold:
        status = "ANOMALY"
    elif abs(z_score) > z_threshold * 0.7:
        status = "WARNING"
    else:
        status = "OK"

    return {
        "table": table_name,
        "status": status,
        "current_count": current_count,
        "mean": int(mean),
        "z_score": round(z_score, 2),
        "lower_bound": max(0, int(mean - z_threshold * std)),
        "upper_bound": int(mean + z_threshold * std),
    }
```

The z-score approach catches both drops and spikes. A sudden 10x volume increase is just as suspicious as a drop — it could mean duplicate ingestion or a source system replaying historical data.
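A quick self-contained sanity check of that symmetry, using a synthetic 30-day history (the counts and seed are illustrative):

```python
import numpy as np

# Synthetic history: ~500k rows/day with modest day-to-day noise
rng = np.random.default_rng(42)
history = rng.normal(500_000, 20_000, size=30)
mean, std = history.mean(), history.std()

def z(count: float) -> float:
    return (count - mean) / std

# Both a 10x spike and a 90% drop land far outside |z| <= 2.5,
# so the same threshold catches replayed data and upstream outages alike.
assert abs(z(5_000_000)) > 2.5  # duplicate ingestion / replay
assert abs(z(50_000)) > 2.5     # upstream drop
```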

Pillar 3: Schema Change Detection with Elementary

Schema changes are the most common cause of silent pipeline failures. The best approach for dbt projects is the Elementary package, which integrates schema monitoring directly into your dbt workflow.

```yaml
# packages.yml — add Elementary to your dbt project
packages:
  - package: elementary-data/elementary
    version: "0.16.0"

# models/staging/schema.yml — add Elementary monitors
version: 2
models:
  - name: stg_orders
    tests:
      - elementary.schema_changes:
          severity: warn
      - elementary.volume_anomalies:
          timestamp_column: updated_at
          severity: warn
      - elementary.freshness_anomalies:
          timestamp_column: updated_at
          severity: error
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: order_total
        tests:
          - elementary.column_anomalies:
              column_anomalies:
                - zero_count
                - null_count
                - average
```

Run dbt test on every pipeline execution. When Elementary detects a schema change — a new column, a dropped column, a type change — it logs the change and can fail the pipeline if configured with severity: error.

Pillar 4: Distribution Drift

Distribution monitoring catches issues that pass all other checks. The table has fresh data, the right number of rows, and the correct schema — but values are wrong. Revenue is 10x higher because a source system switched from cents to dollars. Statistical tests detect these drifts. For numeric columns, compare distributions against a historical baseline. Elementary handles this through column-level anomaly tests.

Pillar 5: Lineage-Powered Root Cause Analysis

When an alert fires, the first question is: what changed upstream? Without lineage, answering this requires manual investigation. With lineage, the answer is a graph traversal. dbt provides compile-time lineage through ref() and source(). Dagster extends this with runtime lineage. Tools like DataHub and OpenLineage capture cross-system lineage.
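To make the graph traversal concrete, here is a minimal upstream walk over the parent_map that dbt writes into manifest.json (the map below is a hand-built stand-in for real manifest output):

```python
from collections import deque

# Stand-in for manifest["parent_map"]: node -> its direct upstream dependencies
parent_map = {
    "model.analytics.fct_orders": [
        "model.analytics.stg_orders",
        "model.analytics.stg_payments",
    ],
    "model.analytics.stg_orders": ["source.analytics.shop.orders"],
    "model.analytics.stg_payments": ["source.analytics.shop.payments"],
    "source.analytics.shop.orders": [],
    "source.analytics.shop.payments": [],
}

def upstream_ancestors(node: str, parent_map: dict) -> set:
    """Breadth-first walk: every node whose change can affect `node`."""
    seen, queue = set(), deque([node])
    while queue:
        current = queue.popleft()
        for parent in parent_map.get(current, []):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return seen

# When fct_orders alerts, this is the candidate set for root cause.
ancestors = upstream_ancestors("model.analytics.fct_orders", parent_map)
```

The same traversal run in the other direction (over child_map) answers the impact question: which dashboards and models are downstream of a broken source.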

Slack Alerting Integration

```python
import requests

SLACK_WEBHOOK = "https://hooks.slack.com/services/T00/B00/xxxx"

def send_observability_alert(table_name, check_type, status, details, dashboard_url=None):
    emoji = {"BREACH": "\U0001f7e5", "ANOMALY": "\U0001f7e5", "WARNING": "\U0001f7e8", "OK": "\u2705"}
    detail_lines = "\n".join(f"  *{k}*: {v}" for k, v in details.items())
    message = {
        "blocks": [
            {"type": "header", "text": {"type": "plain_text", "text": f"{emoji.get(status, '?')} Data Observability Alert"}},
            {"type": "section", "text": {"type": "mrkdwn", "text": f"*Table:* `{table_name}`\n*Check:* {check_type}\n*Status:* {status}\n\n{detail_lines}"}},
        ]
    }
    if dashboard_url:
        message["blocks"].append({"type": "actions", "elements": [{"type": "button", "text": {"type": "plain_text", "text": "View Dashboard"}, "url": dashboard_url}]})
    requests.post(SLACK_WEBHOOK, json=message, timeout=10)
```

SLO/SLI Framework for Data Teams

Observability without SLOs is just noise. Borrow the SLO/SLI framework from site reliability engineering. An SLI is a measurement (freshness in minutes, volume deviation percentage). An SLO is a target (fct_orders will be no more than 120 minutes stale 99.5% of the time). An error budget is the amount of failure the target permits over its window; once it is spent, reliability work takes priority over new features.

```yaml
# data_slos.yml — SLO definitions for critical data products
data_products:
  - name: fct_orders
    owner: analytics-engineering
    tier: 1
    slos:
      freshness:
        sli: minutes_since_last_update
        target: 120
        compliance: 99.5%
        window: 30d
      volume:
        sli: row_count_z_score
        target_range: [-2.5, 2.5]
        compliance: 99.0%
        window: 30d
  - name: dim_customers
    owner: analytics-engineering
    tier: 2
    slos:
      freshness:
        sli: minutes_since_last_update
        target: 360
        compliance: 99.0%
        window: 30d
```
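The error budget falls out of these definitions by simple arithmetic; a minimal sketch (the function names are mine). A 99.5% compliance target over a 30-day window allows roughly 216 minutes of breach:

```python
def error_budget_minutes(compliance_pct: float, window_days: int) -> float:
    """Minutes the SLI may be out of target over the window."""
    window_minutes = window_days * 24 * 60
    return window_minutes * (1 - compliance_pct / 100)

def budget_remaining(compliance_pct: float, window_days: int,
                     breach_minutes_so_far: float) -> float:
    """How much budget is left; negative means the SLO is already blown."""
    return error_budget_minutes(compliance_pct, window_days) - breach_minutes_so_far

# 99.5% over 30 days: 43,200 window minutes * 0.5% ~= 216 breach minutes allowed
budget = error_budget_minutes(99.5, 30)
```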

Automated Incident Response Playbooks

Observability signals become truly valuable when they trigger repeatable actions instead of ad hoc firefighting. When a freshness SLO breaches, the first questions are whether upstream extraction finished, whether the orchestrator task failed silently, and whether downstream consumers should be paused. Encoding those steps in code reduces mean time to recovery and preserves audit context for postmortems. A practical pattern is to classify the breach, query orchestration metadata, attempt a bounded auto-remediation such as retriggering a single failed task, and emit a structured Slack message with links to logs and run IDs. Guardrails matter: cap retries, require idempotent tasks, and escalate to humans when upstream systems are unhealthy.

```python
import os
import requests
from typing import Optional

SLACK_WEBHOOK = os.environ["SLACK_WEBHOOK_URL"]
AIRFLOW_URL = os.environ["AIRFLOW_URL"].rstrip("/")
AIRFLOW_USER = os.environ["AIRFLOW_USER"]
AIRFLOW_PASS = os.environ["AIRFLOW_PASS"]

def _airflow_get(path: str) -> dict:
    r = requests.get(f"{AIRFLOW_URL}{path}", auth=(AIRFLOW_USER, AIRFLOW_PASS), timeout=30)
    r.raise_for_status()
    return r.json()

def _airflow_post(path: str, json_body: Optional[dict] = None) -> dict:
    r = requests.post(f"{AIRFLOW_URL}{path}", auth=(AIRFLOW_USER, AIRFLOW_PASS), json=json_body or {}, timeout=30)
    r.raise_for_status()
    return r.json() if r.content else {}

def auto_remediate_freshness_breach(dag_id: str, dag_run_id: str, table: str) -> dict:
    # Inspect the DAG run and its task instances via the Airflow 2 stable REST API.
    dag_run = _airflow_get(f"/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}")
    tis = _airflow_get(f"/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances")
    failed_tasks = [ti["task_id"] for ti in tis["task_instances"] if ti["state"] == "failed"]
    if failed_tasks:
        # Clearing failed task instances causes the scheduler to re-run them;
        # the stable API has no separate "run this task" endpoint.
        _airflow_post(
            f"/api/v1/dags/{dag_id}/clearTaskInstances",
            {"dry_run": False, "dag_run_id": dag_run_id, "task_ids": failed_tasks, "only_failed": True},
        )
        action = f"cleared_failed_tasks:{','.join(failed_tasks)}"
    else:
        action = "no_failed_upstream_task_detected"
    msg = {"text": f"Freshness breach on {table}: {action} for {dag_id} ({dag_run_id})"}
    requests.post(SLACK_WEBHOOK, json=msg, timeout=15)
    return {"table": table, "action": action, "dag_run_state": dag_run.get("state")}
```

OpenLineage Integration for Cross-System Tracing

OpenLineage gives you a vendor-neutral event vocabulary for runs, jobs, datasets, and facets so lineage is not trapped inside a single scheduler or engine. That matters when Airflow coordinates ingestion, Spark transforms land curated tables, dbt builds semantic models, and Flink powers near-real-time features. Emitting OpenLineage events at task boundaries creates a coherent graph across those hops: dataset inputs and outputs link to job definitions, run state transitions capture success and failure, and facets carry SQL text, schema snapshots, and cluster identifiers without bespoke integrations per tool.

```python
import json
from datetime import datetime, timezone
from uuid import uuid4

def emit_openlineage_complete_run_event() -> dict:
    run_id = str(uuid4())
    event_time = datetime.now(timezone.utc).isoformat()
    event = {
        "eventType": "COMPLETE",
        "eventTime": event_time,
        "run": {"runId": run_id, "facets": {}},
        "job": {
            "namespace": "analytics",
            "name": "dbt.build.fct_orders",
            "facets": {
                "sql": {"query": "select * from ref('stg_orders')"}
            },
        },
        "inputs": [{"namespace": "warehouse", "name": "analytics.stg.stg_orders"}],
        "outputs": [{"namespace": "warehouse", "name": "analytics.marts.fct_orders"}],
        "producer": "https://example.com",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json",
    }
    return event

print(json.dumps(emit_openlineage_complete_run_event(), indent=2))
```

Distribution Monitoring Deep Dive

Distribution monitoring catches subtle shifts that pass all other checks. For numeric columns, a practical approach is to compare the latest window of values against a trusted baseline using a two-sample Kolmogorov-Smirnov test, which is nonparametric and sensitive to changes in both shape and location. As a second line of defense, bin both samples into histograms and compute the symmetric Kullback-Leibler divergence between the normalized bin probabilities; it highlights probability mass moving between buckets. Always exclude known one-off campaigns from baselines, and log sample sizes: tiny samples produce noisy p-values.

```python
import numpy as np
from scipy.stats import ks_2samp

def symmetric_kl_divergence(p: np.ndarray, q: np.ndarray, eps: float = 1e-12) -> float:
    # Smooth with eps before normalizing so empty bins don't produce log(0).
    p = np.asarray(p, dtype=float) + eps
    q = np.asarray(q, dtype=float) + eps
    p = p / p.sum()
    q = q / q.sum()
    return float((np.sum(p * np.log(p / q)) + np.sum(q * np.log(q / p))) / 2.0)

def distribution_monitor(current: np.ndarray, baseline: np.ndarray, ks_alpha: float = 0.01) -> dict:
    ks = ks_2samp(baseline, current, alternative="two-sided", method="auto")
    # Shared equal-width bins so both histograms are directly comparable.
    bins = np.histogram_bin_edges(np.concatenate([baseline, current]), bins="auto")
    bc, _ = np.histogram(baseline, bins=bins)
    cc, _ = np.histogram(current, bins=bins)
    kl = symmetric_kl_divergence(bc, cc)
    return {
        "ks_statistic": float(ks.statistic),
        "ks_pvalue": float(ks.pvalue),
        "ks_alert": bool(ks.pvalue < ks_alpha),
        "symmetric_kl": kl,
    }
```

Dagster Asset Checks for Observable Pipelines

Dagster pushes observability closer to the data product by attaching checks to software-defined assets rather than bolting validation onto opaque operators. Asset checks run in the same execution context as materializations, which makes failures attributable to a specific asset version. Typical checks encode freshness expectations relative to an upstream cursor, row-count floors compared to a trailing median, and schema constraints that fail fast before expensive downstream assets start. Treat checks like unit tests for data: keep them fast, deterministic, and narrowly scoped.

```python
from datetime import datetime, timedelta, timezone
from dagster import asset, asset_check, AssetCheckResult, AssetCheckSeverity, Definitions

@asset(key_prefix=["warehouse"])
def fct_orders():
    return "fct_orders"

@asset_check(asset=fct_orders)
def fct_orders_freshness(fct_orders):
    # Placeholder values; in practice query MAX(updated_at) from the table.
    latest_ts = datetime(2026, 4, 15, 10, 0, tzinfo=timezone.utc)
    max_lag = timedelta(hours=6)
    passed = datetime.now(timezone.utc) - latest_ts <= max_lag
    return AssetCheckResult(
        passed=passed,
        severity=AssetCheckSeverity.ERROR,
        metadata={
            "latest_row_timestamp": float(latest_ts.timestamp()),
            "max_lag_hours": max_lag.total_seconds() / 3600.0,
        },
    )

@asset_check(asset=fct_orders)
def fct_orders_volume(fct_orders):
    # Placeholder count; in practice run SELECT COUNT(*) against the asset.
    row_count = 1_250_000
    minimum_rows = 1_000_000
    passed = row_count >= minimum_rows
    return AssetCheckResult(
        passed=passed,
        severity=AssetCheckSeverity.WARN,
        metadata={"row_count": row_count, "minimum_rows": minimum_rows},
    )

defs = Definitions(assets=[fct_orders], asset_checks=[fct_orders_freshness, fct_orders_volume])
```

Cost of Downtime: Quantifying Data Quality Issues

Quantifying the cost of downtime helps observability investments compete for roadmap time against feature work. A lightweight model combines analyst rework hours, downstream SLA penalties, incremental cloud spend from failed retries, and a trust erosion factor for repeated misses on executive-facing metrics. Analyst hours are straightforward: incident duration multiplied by people pulled in multiplied by a loaded hourly rate. Once you have rough costs per incident class, prioritize monitoring breadth by ranking tables using consumer count and business criticality rather than engineering familiarity.

```yaml
formula:
  priority_score: "consumer_count * business_criticality_weight"
table_prioritization_examples:
  - table: finance.gl_postings
    consumer_count: 38
    business_criticality_weight: 1.0
    priority_score: 38.0
    rationale: "Regulatory reporting and revenue recognition"
  - table: marketing.email_events
    consumer_count: 120
    business_criticality_weight: 0.35
    priority_score: 42.0
    rationale: "High fan-out, lower direct revenue impact"
  - table: product.experiment_assignments
    consumer_count: 22
    business_criticality_weight: 0.9
    priority_score: 19.8
    rationale: "Fewer consumers, but irreversible decisions"
```
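The rework-hours component of the cost model, sketched with hypothetical rates (the function name, rates, and defaults are mine; trust erosion is deliberately left qualitative):

```python
def incident_cost(
    duration_hours: float,
    people_involved: int,
    loaded_hourly_rate: float,
    sla_penalty: float = 0.0,
    extra_cloud_spend: float = 0.0,
) -> float:
    """Direct cost of one data incident: analyst rework plus hard penalties."""
    rework = duration_hours * people_involved * loaded_hourly_rate
    return rework + sla_penalty + extra_cloud_spend

# Hypothetical: a 4-hour incident pulling in 3 people at a $120 loaded rate,
# plus $500 of cloud spend from failed retries -> $1,940
cost = incident_cost(4, 3, 120.0, extra_cloud_spend=500.0)
```

Multiplying a per-incident cost by the incident rate for each table class gives the dollar figure that competes directly against feature work.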

Build vs. Buy

For teams with fewer than five engineers monitoring under 50 tables, the homegrown approach in this article covers 80 percent of cases. For larger teams, consider Monte Carlo (end-to-end with ML anomaly detection), Elementary Cloud (dbt-native), Soda (checks-as-code across tools), or Datadog (integrated with infrastructure monitoring). Start homegrown for your most critical tables; if you spend more than 20 percent of your time maintaining monitoring instead of improving quality, evaluate a vendor.

Tags

Quality · data observability · data quality · pipeline monitoring · data lineage · freshness SLO · anomaly detection
