
Your dashboard is showing yesterday's numbers. An analyst pings you on Slack. You check Airflow — all DAGs are green. You check Snowflake — the tables exist. But somewhere between the source system and the dashboard, data stopped flowing, and nobody was alerted. This is the observability gap that most data teams live with until it causes a serious incident.
Data pipeline observability is the practice of instrumenting your pipelines so that you detect issues within minutes, understand their impact immediately, and trace them to root cause without guesswork. This article covers the five pillars of data observability, practical implementations for each, and the build-versus-buy decision.
The Five Pillars of Data Observability
Every data observability system monitors five dimensions: freshness (is data arriving on time?), volume (is the expected amount of data present?), schema (has the structure changed unexpectedly?), distribution (do values look statistically normal?), and lineage (which upstream change caused this downstream issue?). Skip any one and you have a blind spot.
Pillar 1: Freshness Monitoring
Freshness measures the time between now and the most recent record in a table. If your orders table should update every hour but the latest record is from 6 hours ago, something is broken.
```sql
-- Freshness monitor: alert when a table falls behind its SLA
WITH freshness AS (
    SELECT
        'analytics.fct_orders' AS table_name,
        MAX(updated_at) AS latest_record,
        CURRENT_TIMESTAMP() AS checked_at,
        DATEDIFF('minute', MAX(updated_at), CURRENT_TIMESTAMP()) AS minutes_stale,
        120 AS sla_minutes
    FROM analytics.fct_orders
    UNION ALL
    SELECT
        'analytics.fct_payments',
        MAX(created_at),
        CURRENT_TIMESTAMP(),
        DATEDIFF('minute', MAX(created_at), CURRENT_TIMESTAMP()),
        60
    FROM analytics.fct_payments
)
SELECT
    table_name, latest_record, minutes_stale, sla_minutes,
    CASE
        WHEN minutes_stale > sla_minutes THEN 'BREACH'
        WHEN minutes_stale > sla_minutes * 0.8 THEN 'WARNING'
        ELSE 'OK'
    END AS status
FROM freshness
ORDER BY minutes_stale DESC;
```

Store the results of each freshness check in a monitoring table. Over time, this gives you historical freshness trends — invaluable for SLA reporting and for identifying tables that are gradually degrading before they breach.
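As a sketch of that pattern, each check result can be appended to a small history table with the same BREACH/WARNING/OK thresholds as the SQL monitor. The table and column names here are hypothetical, and SQLite stands in for the warehouse:

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical history table; in practice this lives in the warehouse.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE freshness_history (
        table_name TEXT, checked_at TEXT, minutes_stale INTEGER,
        sla_minutes INTEGER, status TEXT
    )"""
)

def record_freshness_check(table_name: str, minutes_stale: int, sla_minutes: int) -> str:
    # Same thresholds as the SQL monitor: breach past the SLA,
    # warn once 80% of the SLA window is consumed.
    if minutes_stale > sla_minutes:
        status = "BREACH"
    elif minutes_stale > sla_minutes * 0.8:
        status = "WARNING"
    else:
        status = "OK"
    conn.execute(
        "INSERT INTO freshness_history VALUES (?, ?, ?, ?, ?)",
        (table_name, datetime.now(timezone.utc).isoformat(),
         minutes_stale, sla_minutes, status),
    )
    conn.commit()
    return status

record_freshness_check("analytics.fct_orders", 130, 120)   # past the 120-minute SLA
record_freshness_check("analytics.fct_payments", 50, 60)   # inside SLA but past 80%
```

A simple GROUP BY over this history gives per-table breach rates for SLA reporting.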
Pillar 2: Volume Anomaly Detection
Volume monitoring checks whether the number of rows loaded in each pipeline run falls within expected bounds. A table that normally receives 500,000 rows per day suddenly receiving 50,000 is a strong signal that something went wrong upstream.
```python
import pandas as pd
from datetime import datetime, timedelta

def detect_volume_anomaly(
    table_name: str,
    current_count: int,
    history_df: pd.DataFrame,
    lookback_days: int = 30,
    z_threshold: float = 2.5,
) -> dict:
    cutoff = datetime.now() - timedelta(days=lookback_days)
    recent = history_df[history_df["run_date"] >= cutoff]["row_count"]

    if len(recent) < 7:
        return {"table": table_name, "status": "INSUFFICIENT_DATA"}

    mean = recent.mean()
    std = recent.std()
    z_score = 0.0 if std == 0 else (current_count - mean) / std

    if abs(z_score) > z_threshold:
        status = "ANOMALY"
    elif abs(z_score) > z_threshold * 0.7:
        status = "WARNING"
    else:
        status = "OK"

    return {
        "table": table_name,
        "status": status,
        "current_count": current_count,
        "mean": int(mean),
        "z_score": round(z_score, 2),
        "lower_bound": max(0, int(mean - z_threshold * std)),
        "upper_bound": int(mean + z_threshold * std),
    }
```

The z-score approach catches both drops and spikes. A sudden 10x volume increase is just as suspicious as a drop — it could mean duplicate ingestion or a source system replaying historical data.
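To make the thresholds concrete, here is a self-contained sketch of the same z-score classification against a synthetic 30-day history (all numbers are illustrative):

```python
import numpy as np

rng = np.random.default_rng(42)
# Synthetic history: ~500k rows/day with mild day-to-day noise.
history = rng.normal(500_000, 20_000, size=30)

def classify(current_count: float, counts: np.ndarray, z_threshold: float = 2.5) -> str:
    """Same decision rule as detect_volume_anomaly, on a plain array."""
    mean, std = counts.mean(), counts.std()
    z = 0.0 if std == 0 else (current_count - mean) / std
    if abs(z) > z_threshold:
        return "ANOMALY"
    if abs(z) > z_threshold * 0.7:
        return "WARNING"
    return "OK"

print(classify(50_000, history))     # a 10x drop sits far outside 2.5 sigma
print(classify(5_000_000, history))  # a 10x spike is flagged the same way
print(classify(505_000, history))    # within normal day-to-day variation
```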
Pillar 3: Schema Change Detection with Elementary
Schema changes are the most common cause of silent pipeline failures. The best approach for dbt projects is the Elementary package, which integrates schema monitoring directly into your dbt workflow.
```yaml
# packages.yml — add Elementary to your dbt project
packages:
  - package: elementary-data/elementary
    version: "0.16.0"
```

```yaml
# models/staging/schema.yml — add Elementary monitors
version: 2
models:
  - name: stg_orders
    tests:
      - elementary.schema_changes:
          severity: warn
      - elementary.volume_anomalies:
          timestamp_column: updated_at
          severity: warn
      - elementary.freshness_anomalies:
          timestamp_column: updated_at
          severity: error
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: order_total
        tests:
          - elementary.column_anomalies:
              column_anomalies:
                - zero_count
                - null_count
                - average
```

Run dbt test on every pipeline execution. When Elementary detects a schema change — a new column, a dropped column, a type change — it logs the change and can fail the pipeline if configured with severity: error.
Pillar 4: Distribution Drift
Distribution monitoring catches issues that pass all other checks. The table has fresh data, the right number of rows, and the correct schema — but values are wrong. Revenue is 10x higher because a source system switched from cents to dollars. Statistical tests detect these drifts. For numeric columns, compare distributions against a historical baseline. Elementary handles this through column-level anomaly tests.
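A cheap first line of defense along these lines is a mean-ratio check against the trailing baseline, which catches unit changes like cents-to-dollars immediately. This is a sketch, and the 3x threshold is a hypothetical choice:

```python
import statistics

def mean_ratio_drift(current: list[float], baseline: list[float], max_ratio: float = 3.0) -> dict:
    """Flag when the current mean diverges from the baseline mean by more than max_ratio."""
    cur_mean = statistics.fmean(current)
    base_mean = statistics.fmean(baseline)
    ratio = cur_mean / base_mean if base_mean else float("inf")
    # A source switching from dollars to cents shows up as a ~100x ratio.
    drifted = ratio > max_ratio or ratio < 1 / max_ratio
    return {"ratio": round(ratio, 2), "drifted": drifted}

baseline = [42.50, 38.00, 51.25, 47.10]     # order totals in dollars
current = [4250.0, 3800.0, 5125.0, 4710.0]  # same orders, reported in cents
print(mean_ratio_drift(current, baseline))
```

This deliberately trades sensitivity for robustness: it misses subtle shape changes (those need the statistical tests covered later) but never misfires on ordinary noise.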
Pillar 5: Lineage-Powered Root Cause Analysis
When an alert fires, the first question is: what changed upstream? Without lineage, answering this requires manual investigation. With lineage, the answer is a graph traversal. dbt provides compile-time lineage through ref() and source(). Dagster extends this with runtime lineage. Tools like DataHub and OpenLineage capture cross-system lineage.
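That graph traversal can be sketched as a breadth-first walk over an upstream-edges map, yielding candidate root causes nearest-first. The dependency map below is hypothetical; in practice it would come from the dbt manifest or a lineage service:

```python
from collections import deque

# Hypothetical lineage: table -> direct upstream dependencies.
UPSTREAM = {
    "analytics.fct_orders": ["analytics.stg_orders", "analytics.stg_payments"],
    "analytics.stg_orders": ["raw.shop_orders"],
    "analytics.stg_payments": ["raw.psp_events"],
}

def upstream_candidates(table: str) -> list[str]:
    """Breadth-first traversal: the nearest upstream tables come first."""
    seen, order, queue = set(), [], deque([table])
    while queue:
        node = queue.popleft()
        for parent in UPSTREAM.get(node, []):
            if parent not in seen:
                seen.add(parent)
                order.append(parent)
                queue.append(parent)
    return order

print(upstream_candidates("analytics.fct_orders"))
```

Ordering candidates nearest-first matters: the direct parent is usually the table to inspect before raw sources.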
Slack Alerting Integration
```python
import requests

SLACK_WEBHOOK = "https://hooks.slack.com/services/T00/B00/xxxx"

def send_observability_alert(table_name, check_type, status, details, dashboard_url=None):
    emoji = {"BREACH": "\U0001f7e5", "ANOMALY": "\U0001f7e5", "WARNING": "\U0001f7e8", "OK": "\u2705"}
    detail_lines = "\n".join(f"  *{k}*: {v}" for k, v in details.items())
    message = {
        "blocks": [
            {"type": "header", "text": {"type": "plain_text",
             "text": f"{emoji.get(status, '?')} Data Observability Alert"}},
            {"type": "section", "text": {"type": "mrkdwn",
             "text": f"*Table:* `{table_name}`\n*Check:* {check_type}\n*Status:* {status}\n\n{detail_lines}"}},
        ]
    }
    if dashboard_url:
        message["blocks"].append({"type": "actions", "elements": [
            {"type": "button", "text": {"type": "plain_text", "text": "View Dashboard"},
             "url": dashboard_url}]})
    requests.post(SLACK_WEBHOOK, json=message, timeout=10)
```

SLO/SLI Framework for Data Teams
Observability without SLOs is just noise. Borrow the SLO/SLI framework from site reliability engineering. An SLI is a measurement (freshness in minutes, volume deviation %). An SLO is a target (fct_orders will be no more than 120 minutes stale 99.5% of the time). An error budget is the failure the target still allows: at 99.5% compliance over a 30-day window, roughly 216 minutes may be out of SLO before the budget is exhausted.
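The error-budget arithmetic follows directly from those definitions; this sketch uses the fct_orders target as its example:

```python
def error_budget(compliance_target: float, window_minutes: int, breach_minutes: int) -> dict:
    """How much of the window may be out of SLO, and how much is already spent."""
    budget = window_minutes * (1.0 - compliance_target)
    consumed = breach_minutes / budget if budget else float("inf")
    return {
        "budget_minutes": round(budget, 1),
        "consumed_pct": round(100 * consumed, 1),
        "exhausted": breach_minutes >= budget,
    }

# fct_orders: 99.5% freshness compliance over a 30-day window.
window = 30 * 24 * 60  # 43,200 minutes
print(error_budget(0.995, window, breach_minutes=90))
```

Publishing consumed-budget percentages per table turns vague "the pipeline feels flaky" conversations into a number both engineering and stakeholders can act on.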
```yaml
# data_slos.yml — SLO definitions for critical data products
data_products:
  - name: fct_orders
    owner: analytics-engineering
    tier: 1
    slos:
      freshness:
        sli: minutes_since_last_update
        target: 120
        compliance: 99.5%
        window: 30d
      volume:
        sli: row_count_z_score
        target_range: [-2.5, 2.5]
        compliance: 99.0%
        window: 30d
  - name: dim_customers
    owner: analytics-engineering
    tier: 2
    slos:
      freshness:
        sli: minutes_since_last_update
        target: 360
        compliance: 99.0%
        window: 30d
```

Automated Incident Response Playbooks
Observability signals become truly valuable when they trigger repeatable actions instead of ad hoc firefighting. When a freshness SLO breaches, the first questions are whether upstream extraction finished, whether the orchestrator task failed silently, and whether downstream consumers should be paused. Encoding those steps in code reduces mean time to recovery and preserves audit context for postmortems. A practical pattern is to classify the breach, query orchestration metadata, attempt a bounded auto-remediation such as retriggering a single failed task, and emit a structured Slack message with links to logs and run IDs. Guardrails matter: cap retries, require idempotent tasks, and escalate to humans when upstream systems are unhealthy.
```python
import os
import requests
from typing import Optional

SLACK_WEBHOOK = os.environ["SLACK_WEBHOOK_URL"]
AIRFLOW_URL = os.environ["AIRFLOW_URL"].rstrip("/")
AIRFLOW_USER = os.environ["AIRFLOW_USER"]
AIRFLOW_PASS = os.environ["AIRFLOW_PASS"]

def _airflow_get(path: str) -> dict:
    r = requests.get(f"{AIRFLOW_URL}{path}", auth=(AIRFLOW_USER, AIRFLOW_PASS), timeout=30)
    r.raise_for_status()
    return r.json()

def _airflow_post(path: str, json_body: Optional[dict] = None) -> dict:
    r = requests.post(f"{AIRFLOW_URL}{path}", auth=(AIRFLOW_USER, AIRFLOW_PASS),
                      json=json_body or {}, timeout=30)
    r.raise_for_status()
    return r.json() if r.content else {}

def auto_remediate_freshness_breach(dag_id: str, task_id: str, dag_run_id: str, table: str) -> dict:
    dag_run = _airflow_get(f"/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}")
    tis = _airflow_get(f"/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances")
    upstream_failed = any(ti["state"] == "failed" for ti in tis["task_instances"])
    if upstream_failed:
        # Clearing a failed task instance makes the scheduler re-run it;
        # this is the bounded auto-remediation step.
        _airflow_post(
            f"/api/v1/dags/{dag_id}/clearTaskInstances",
            {"dry_run": False, "task_ids": [task_id],
             "dag_run_id": dag_run_id, "only_failed": True},
        )
        action = "retriggered_failed_task"
    else:
        action = "no_failed_upstream_task_detected"
    msg = {"text": f"Freshness breach on {table}: {action} for {dag_id}.{task_id} ({dag_run_id})"}
    requests.post(SLACK_WEBHOOK, json=msg, timeout=15)
    return {"table": table, "action": action, "dag_run_state": dag_run.get("state")}
```

OpenLineage Integration for Cross-System Tracing
OpenLineage gives you a vendor-neutral event vocabulary for runs, jobs, datasets, and facets so lineage is not trapped inside a single scheduler or engine. That matters when Airflow coordinates ingestion, Spark transforms land curated tables, dbt builds semantic models, and Flink powers near-real-time features. Emitting OpenLineage events at task boundaries creates a coherent graph across those hops: dataset inputs and outputs link to job definitions, run state transitions capture success and failure, and facets carry SQL text, schema snapshots, and cluster identifiers without bespoke integrations per tool.
```python
import json
from datetime import datetime, timezone
from uuid import uuid4

def emit_openlineage_complete_run_event() -> dict:
    run_id = str(uuid4())
    event_time = datetime.now(timezone.utc).isoformat()
    event = {
        "eventType": "COMPLETE",
        "eventTime": event_time,
        "run": {"runId": run_id, "facets": {}},
        "job": {
            "namespace": "analytics",
            "name": "dbt.build.fct_orders",
            "facets": {
                "sql": {"query": "select * from ref('stg_orders')"}
            },
        },
        "inputs": [{"namespace": "warehouse", "name": "analytics.stg.stg_orders"}],
        "outputs": [{"namespace": "warehouse", "name": "analytics.marts.fct_orders"}],
        "producer": "https://example.com",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json",
    }
    return event

print(json.dumps(emit_openlineage_complete_run_event(), indent=2))
```

Distribution Monitoring Deep Dive
Distribution monitoring catches subtle shifts that pass all other checks. For numeric columns, a practical approach is to compare the latest window of values against a trusted baseline using a two-sample Kolmogorov-Smirnov test, which is nonparametric and sensitive to changes in both shape and location. As a second line of defense, computing a symmetric Kullback-Leibler divergence between binned, normalized histograms highlights probability mass moving between buckets. Always exclude known one-off campaigns from baselines, and log sample sizes, because tiny samples produce noisy p-values.
```python
import numpy as np
from scipy.stats import ks_2samp

def symmetric_kl_divergence(p: np.ndarray, q: np.ndarray, eps: float = 1e-12) -> float:
    p = np.clip(p, eps, 1.0)
    q = np.clip(q, eps, 1.0)
    p = p / p.sum()
    q = q / q.sum()
    return float((np.sum(p * np.log(p / q)) + np.sum(q * np.log(q / p))) / 2.0)

def distribution_monitor(current: np.ndarray, baseline: np.ndarray, ks_alpha: float = 0.01) -> dict:
    ks = ks_2samp(baseline, current, alternative="two-sided", method="auto")
    bins = np.histogram_bin_edges(np.concatenate([baseline, current]), bins="auto")
    bc, _ = np.histogram(baseline, bins=bins, density=True)
    cc, _ = np.histogram(current, bins=bins, density=True)
    kl = symmetric_kl_divergence(bc.astype(float), cc.astype(float))
    return {
        "ks_statistic": float(ks.statistic),
        "ks_pvalue": float(ks.pvalue),
        "ks_alert": bool(ks.pvalue < ks_alpha),
        "symmetric_kl": kl,
    }
```

Dagster Asset Checks for Observable Pipelines
Dagster pushes observability closer to the data product by attaching checks to software-defined assets rather than bolting validation onto opaque operators. Asset checks run in the same execution context as materializations, which makes failures attributable to a specific asset version. Typical checks encode freshness expectations relative to an upstream cursor, row-count floors compared to a trailing median, and schema constraints that fail fast before expensive downstream assets start. Treat checks like unit tests for data: keep them fast, deterministic, and narrowly scoped.
```python
from datetime import datetime, timedelta, timezone
from dagster import asset, asset_check, AssetCheckResult, AssetCheckSeverity, Definitions

@asset(key_prefix=["warehouse"])
def fct_orders():
    return "fct_orders"

@asset_check(asset=fct_orders)
def fct_orders_freshness(fct_orders):
    latest_ts = datetime(2026, 4, 15, 10, 0, tzinfo=timezone.utc)
    max_lag = timedelta(hours=6)
    passed = datetime.now(timezone.utc) - latest_ts <= max_lag
    return AssetCheckResult(
        passed=passed,
        severity=AssetCheckSeverity.ERROR,
        metadata={"latest_row_timestamp": float(latest_ts.timestamp()),
                  "max_lag_hours": max_lag.total_seconds() / 3600.0},
    )

@asset_check(asset=fct_orders)
def fct_orders_volume(fct_orders):
    row_count = 1_250_000
    minimum_rows = 1_000_000
    passed = row_count >= minimum_rows
    return AssetCheckResult(
        passed=passed,
        severity=AssetCheckSeverity.WARN,
        metadata={"row_count": row_count, "minimum_rows": minimum_rows},
    )

defs = Definitions(assets=[fct_orders], asset_checks=[fct_orders_freshness, fct_orders_volume])
```

Cost of Downtime: Quantifying Data Quality Issues
Quantifying the cost of downtime helps observability investments compete for roadmap time against feature work. A lightweight model combines analyst rework hours, downstream SLA penalties, incremental cloud spend from failed retries, and a trust erosion factor for repeated misses on executive-facing metrics. Analyst hours are straightforward: incident duration multiplied by people pulled in multiplied by a loaded hourly rate. Once you have rough costs per incident class, prioritize monitoring breadth by ranking tables using consumer count and business criticality rather than engineering familiarity.
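The per-incident arithmetic described above can be sketched as follows; all rates, durations, and the trust multiplier are hypothetical placeholders:

```python
def incident_cost(
    duration_hours: float,
    people_involved: int,
    loaded_hourly_rate: float,
    sla_penalty: float = 0.0,
    retry_cloud_spend: float = 0.0,
    trust_erosion_factor: float = 1.0,
) -> float:
    """Rework hours plus hard costs, scaled by a trust multiplier for exec-facing metrics."""
    rework = duration_hours * people_involved * loaded_hourly_rate
    return round((rework + sla_penalty + retry_cloud_spend) * trust_erosion_factor, 2)

# A 4-hour incident pulling in 3 people at a $150/hr loaded rate,
# with $500 of wasted retries, on an executive-facing metric (1.25x multiplier).
print(incident_cost(4, 3, 150.0, retry_cloud_spend=500.0, trust_erosion_factor=1.25))
```

Even a rough number per incident class is enough: multiplied by last quarter's incident count, it usually dwarfs the cost of the monitoring work.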
```yaml
formula:
  priority_score: "consumer_count * business_criticality_weight"
table_prioritization_examples:
  - table: finance.gl_postings
    consumer_count: 38
    business_criticality_weight: 1.0
    priority_score: 38.0
    rationale: "Regulatory reporting and revenue recognition"
  - table: marketing.email_events
    consumer_count: 120
    business_criticality_weight: 0.35
    priority_score: 42.0
    rationale: "High fan-out, lower direct revenue impact"
  - table: product.experiment_assignments
    consumer_count: 22
    business_criticality_weight: 0.9
    priority_score: 19.8
    rationale: "Fewer consumers, but irreversible decisions"
```

Build vs. Buy

For teams with fewer than five engineers monitoring under 50 tables, the approach in this article covers 80 percent of cases. For larger teams, consider Monte Carlo (end-to-end with ML anomaly detection), Elementary Cloud (dbt-native), Soda (check-as-code across tools), or Datadog (integrated with infrastructure monitoring). Start homegrown for your most critical tables. If you spend more than 20 percent of your time maintaining monitoring instead of improving quality, evaluate a vendor.




