
Migrating from Airflow to Dagster: A Practical Guide

A step-by-step migration path from Apache Airflow's task-centric model to Dagster's asset-based approach, covering code translation, testing patterns, and a realistic timeline.

Chris P


Orchestration · 11 min read

Apache Airflow has been the default orchestration tool for data teams since 2015. It works. But after running Airflow in production for a few years, most teams hit the same walls: testing is painful, local development requires Docker Compose gymnastics, and the task-centric model makes it hard to answer the question every stakeholder asks — when was this dataset last updated? Dagster addresses all three problems by centering its programming model on assets instead of tasks.

This guide is for teams that have decided to migrate. It covers the conceptual shift, side-by-side code comparisons, testing patterns, IO managers, and a realistic incremental migration strategy. If you are still evaluating whether to migrate, the short version is: if you have fewer than 10 DAGs and they work fine, stay on Airflow. If you have 30 or more DAGs, growing complexity, and engineers who dread writing integration tests, the migration pays for itself within a quarter.

The Fundamental Shift: Tasks to Assets

In Airflow, you think in tasks. A DAG is a sequence of operations: extract this, transform that, load the result. The tasks produce side effects — files on disk, rows in a database — but Airflow does not track those outputs. It only knows whether a task succeeded or failed.

In Dagster, you think in assets. An asset is a named dataset that your pipeline produces. Dagster tracks the asset itself: when it was last materialized, what upstream assets it depends on, and whether it passed its quality checks. The execution graph is inferred from asset dependencies. You never wire tasks together manually.

This distinction matters because it changes the questions you can answer. With Airflow, you can ask: did the extract task run at 6 AM? With Dagster, you can ask: is the customers dataset fresh? The second question is what your stakeholders actually care about.

Side-by-Side: Airflow DAG vs Dagster Assets

Here is a typical Airflow DAG that extracts orders from a source database, cleans them, and loads the result into a warehouse.

python
# airflow_dags/orders_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd

def extract_orders(**context):
    ds = context["ds"]
    df = pd.read_sql(
        f"SELECT * FROM orders WHERE order_date = '{ds}'",
        con=source_engine
    )
    df.to_parquet("/tmp/raw_orders.parquet")

def transform_orders(**context):
    df = pd.read_parquet("/tmp/raw_orders.parquet")
    cleaned = df.dropna(subset=["order_id", "total"])
    cleaned["total_usd"] = cleaned.apply(
        lambda r: convert_currency(r["total"], r["currency"]),
        axis=1
    )
    cleaned.to_parquet("/tmp/clean_orders.parquet")

def load_orders(**context):
    df = pd.read_parquet("/tmp/clean_orders.parquet")
    df.to_sql("fct_orders", con=warehouse_engine,
              if_exists="append", index=False)

with DAG(
    "orders_pipeline",
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    t1 = PythonOperator(task_id="extract",
                        python_callable=extract_orders)
    t2 = PythonOperator(task_id="transform",
                        python_callable=transform_orders)
    t3 = PythonOperator(task_id="load",
                        python_callable=load_orders)
    t1 >> t2 >> t3

Several things to notice. The tasks communicate through temp files on disk. There is no type safety on the intermediate data. You cannot test transform_orders without first running extract_orders to produce the temp file. And Airflow has no idea what dataset this DAG produces — it just knows three tasks ran in sequence.

Here is the equivalent in Dagster using software-defined assets.

python
# dagster_project/assets/orders.py
import dagster as dg
import pandas as pd

# context.partition_key requires a partitions definition on the asset.
daily_partitions = dg.DailyPartitionsDefinition(start_date="2026-01-01")

@dg.asset(
    partitions_def=daily_partitions,
    description="Raw orders extracted from the source database",
    group_name="orders",
    kinds={"python", "pandas"},
    metadata={"source_system": "orders_db"},
)
def raw_orders(context: dg.AssetExecutionContext) -> pd.DataFrame:
    partition_date = context.partition_key
    df = pd.read_sql(
        f"SELECT * FROM orders WHERE order_date = '{partition_date}'",
        con=source_engine
    )
    context.log.info(f"Extracted {len(df)} orders for {partition_date}")
    return df

@dg.asset(
    partitions_def=daily_partitions,
    description="Cleaned orders with currency converted to USD",
    group_name="orders",
    kinds={"python", "pandas"},
)
def clean_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    cleaned = raw_orders.dropna(subset=["order_id", "total"])
    cleaned["total_usd"] = cleaned.apply(
        lambda r: convert_currency(r["total"], r["currency"]),
        axis=1
    )
    return cleaned

@dg.asset(
    partitions_def=daily_partitions,
    description="Final orders fact table loaded to warehouse",
    group_name="orders",
    kinds={"snowflake"},
)
def fct_orders(clean_orders: pd.DataFrame) -> pd.DataFrame:
    return clean_orders

The Dagster version is more explicit. Each function declares what it produces (the function name is the asset name) and what it depends on (the function arguments). Dagster infers the dependency graph: fct_orders depends on clean_orders, which depends on raw_orders. No manual wiring required.

The intermediate data is passed directly between functions as DataFrames. No temp files. No implicit coupling through file paths. And Dagster tracks every materialization: you can see in the UI that raw_orders was last materialized at 6:02 AM, clean_orders at 6:03 AM, and fct_orders at 6:04 AM.

Dependency Inference vs Explicit Wiring

In Airflow, dependencies are explicit: t1 >> t2 >> t3. This works for simple pipelines but becomes unwieldy at scale. A 50-task DAG requires careful manual wiring, and refactoring the dependency graph means updating both the task code and the wiring code.

Dagster infers dependencies from function signatures. If clean_orders takes raw_orders as a parameter, Dagster knows clean_orders depends on raw_orders. This is not magic — it is just Python function arguments. But it means your dependency graph is always consistent with your code. You cannot accidentally wire task A to task C while the code reads from task B's output.

For cross-asset-group dependencies, use the deps parameter to declare dependencies on assets defined elsewhere.

python
@dg.asset(
    description="Revenue metrics combining orders and payments",
    group_name="finance",
    deps=["fct_orders", "fct_payments"],
    # The "warehouse" resource must be supplied in Definitions.
    required_resource_keys={"warehouse"},
)
def fct_daily_revenue(
    context: dg.AssetExecutionContext,
) -> pd.DataFrame:
    orders = context.resources.warehouse.query(
        "SELECT * FROM fct_orders"
    )
    payments = context.resources.warehouse.query(
        "SELECT * FROM fct_payments"
    )
    return compute_daily_revenue(orders, payments)

Testing: The Killer Feature

Testing is where the migration pays off most dramatically. In Airflow, testing a task function means either mocking the entire Airflow context or spinning up a local Airflow instance with Docker Compose. Most teams give up and rely on manual testing in a dev environment.

In Dagster, assets are regular Python functions. You test them by passing inputs and asserting on outputs. No framework, no infrastructure, no mocking of schedulers or databases.

python
# tests/test_orders.py
import pandas as pd
from dagster_project.assets.orders import clean_orders

def test_clean_orders_drops_null_order_id():
    raw = pd.DataFrame({
        "order_id": ["a", None, "c"],
        "total": [10.0, 20.0, 30.0],
        "currency": ["USD", "EUR", "USD"],
    })
    result = clean_orders(raw)
    assert len(result) == 2
    assert None not in result["order_id"].values

def test_clean_orders_drops_null_total():
    raw = pd.DataFrame({
        "order_id": ["a", "b"],
        "total": [10.0, None],
        "currency": ["USD", "EUR"],
    })
    result = clean_orders(raw)
    assert len(result) == 1
    assert result.iloc[0]["order_id"] == "a"

def test_clean_orders_converts_currency():
    raw = pd.DataFrame({
        "order_id": ["x"],
        "total": [100.0],
        "currency": ["EUR"],
    })
    result = clean_orders(raw)
    assert "total_usd" in result.columns
    assert result.iloc[0]["total_usd"] > 0

def test_clean_orders_handles_empty_input():
    raw = pd.DataFrame({
        "order_id": pd.Series([], dtype="str"),
        "total": pd.Series([], dtype="float64"),
        "currency": pd.Series([], dtype="str"),
    })
    result = clean_orders(raw)
    assert len(result) == 0

These are standard pytest tests. They run in milliseconds, they run in CI, and they catch real bugs. One team we worked with found three data corruption bugs within a week of adding unit tests to their migrated assets — bugs that had been silently producing incorrect data for months in Airflow.

IO Managers for Storage Abstraction

In the Airflow DAG, storage is hardcoded: temp files on disk, then a SQL insert to the warehouse. If you want to change the storage layer — say, move from Postgres to Snowflake — you rewrite every task.

Dagster solves this with IO managers. An IO manager handles how assets are stored and loaded. Your asset functions return DataFrames; the IO manager decides where to put them. In development, you might use a local Parquet IO manager. In production, you use a Snowflake IO manager. The asset code does not change.

python
# dagster_project/definitions.py
from dagster import Definitions, EnvVar
from dagster_snowflake_pandas import SnowflakePandasIOManager
from dagster_project.assets.orders import (
    raw_orders, clean_orders, fct_orders
)

snowflake_io = SnowflakePandasIOManager(
    account="myorg-account",
    user="DAGSTER_SERVICE_USER",
    password=EnvVar("SNOWFLAKE_PASSWORD"),
    database="ANALYTICS",
    warehouse="WH_ETL",
    schema="MARTS",
)

defs = Definitions(
    assets=[raw_orders, clean_orders, fct_orders],
    resources={
        "io_manager": snowflake_io,
    },
)

With this configuration, every asset that returns a DataFrame is automatically written to a Snowflake table named after the asset. raw_orders goes to ANALYTICS.MARTS.RAW_ORDERS. clean_orders goes to ANALYTICS.MARTS.CLEAN_ORDERS. No manual SQL inserts. No temp files. And if you want to test locally, swap the IO manager to a Parquet file manager and everything works on your laptop.

Incremental Migration Strategy

The worst migration strategy is a big-bang cutover. You do not want to rewrite 30 DAGs over a weekend and flip a switch on Monday morning. Instead, run Dagster alongside Airflow and migrate one pipeline at a time.

Dagster's Airflow integration package (dagster-airflow) lets you wrap existing Airflow DAGs as Dagster jobs. This gives you immediate visibility into your Airflow pipelines from Dagster's UI without changing any code. Use this as phase one: install Dagster, point it at your Airflow DAGs directory, and start monitoring.

Phase two: pick the simplest pipeline — ideally one with three to five tasks, no complex branching, and no custom operators. Rewrite it as Dagster assets. Add unit tests. Deploy it alongside the Airflow version. Run both in parallel for a week, comparing outputs. When you are confident the Dagster version produces identical results, disable the Airflow DAG.
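The parallel-run comparison itself can be automated. A small pandas helper along these lines (function name and key column are illustrative) makes the week of dual-running cheap:

```python
import pandas as pd


def parallel_run_matches(airflow_df: pd.DataFrame,
                         dagster_df: pd.DataFrame,
                         key: str = "order_id") -> bool:
    """Return True if both pipelines produced the same rows,
    ignoring row order and column order."""
    a = airflow_df.sort_values(key).reset_index(drop=True)
    b = dagster_df.sort_values(key).reset_index(drop=True)
    try:
        # check_like=True ignores column ordering differences.
        pd.testing.assert_frame_equal(a, b, check_like=True)
        return True
    except AssertionError:
        return False
```

Run it once a day against the Airflow-produced and Dagster-produced tables, and alert on False before cutting over.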

Phase three: work through the remaining pipelines in order of complexity. Simple extract-load pipelines first. Complex multi-source joins and incremental models last. Each migration follows the same pattern: rewrite, test, parallel run, cutover.

A realistic timeline for a team of two engineers migrating a 30-DAG Airflow instance: two weeks for setup and the first pipeline, then five to ten pipelines per week once patterns and IO managers are established. That puts total calendar time at four to six weeks, with the effort front-loaded: the first migration is the slowest, and each one after it goes faster.

What Trips Teams Up

Three things consistently cause friction during migration. First, Airflow's XCom pattern has no direct equivalent in Dagster. If your tasks pass small values through XCom, refactor them into asset return values. If they pass large datasets through XCom (which is an Airflow anti-pattern anyway), convert those to proper assets.

Second, custom Airflow operators need to be replaced with Dagster resources or plain Python functions. If you have a SlackOperator that sends notifications, replace it with a Dagster sensor or a post-materialization hook. If you have a custom S3Operator, replace it with the dagster-aws package.

Third, Airflow's execution date semantics are confusing, and Dagster's partition keys work differently. In classic Airflow, the execution_date (renamed logical_date in Airflow 2.2) for a daily DAG run fired on January 2nd is January 1st, the start of the data interval. In Dagster, the partition key for the same run names the data date directly. Map your Airflow date logic carefully during migration to avoid off-by-one errors in your data.
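The off-by-one is easy to pin down with plain datetimes. The helper names below are illustrative, not part of either framework:

```python
from datetime import date, timedelta


def airflow_execution_date(run_date: date) -> date:
    # Classic Airflow: a daily DAG run fired on Jan 2 carries
    # execution_date Jan 1, the *start* of the data interval.
    return run_date - timedelta(days=1)


def dagster_partition_key(data_date: date) -> str:
    # Dagster: the partition key names the data date itself.
    return data_date.isoformat()


# The run that fires on the morning of January 2nd processes
# January 1st's data in both systems, but the identifiers differ:
run = date(2026, 1, 2)
assert airflow_execution_date(run) == date(2026, 1, 1)
assert dagster_partition_key(airflow_execution_date(run)) == "2026-01-01"
```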

Scheduling and Partitioning

Airflow schedules are cron-based: run this DAG at 6 AM daily. Dagster supports cron schedules too, but its real power is in declarative freshness policies and partition-aware scheduling. You can tell Dagster that an asset should never be more than two hours stale, and Dagster will automatically trigger materializations to maintain that guarantee.

Partitioned assets are a natural fit for data pipelines that process daily or hourly batches. Instead of relying on Airflow's execution_date templating, Dagster gives each partition an explicit key that your asset function receives as a parameter.

python
# dagster_project/assets/partitioned_orders.py
import dagster as dg
import pandas as pd

daily_partitions = dg.DailyPartitionsDefinition(
    start_date="2026-01-01"
)

@dg.asset(
    partitions_def=daily_partitions,
    description="Daily order extract partitioned by date",
    group_name="orders",
)
def raw_orders_partitioned(
    context: dg.AssetExecutionContext,
) -> pd.DataFrame:
    date = context.partition_key
    context.log.info(f"Extracting orders for {date}")
    df = pd.read_sql(
        f"SELECT * FROM orders WHERE order_date = '{date}'",
        con=source_engine,
    )
    return df

@dg.asset(
    partitions_def=daily_partitions,
    description="Cleaned daily orders",
    group_name="orders",
)
def clean_orders_partitioned(
    raw_orders_partitioned: pd.DataFrame,
) -> pd.DataFrame:
    cleaned = raw_orders_partitioned.dropna(
        subset=["order_id", "total"]
    )
    cleaned["total_usd"] = cleaned.apply(
        lambda r: convert_currency(r["total"], r["currency"]),
        axis=1,
    )
    return cleaned

When you backfill a date range, Dagster materializes each partition independently and tracks their status in the UI. You can see at a glance that January 1 through January 15 are materialized, January 16 failed, and January 17 through January 20 are pending. This visibility is something Airflow's grid view provides for task runs, but Dagster ties it directly to the data partitions themselves.

Observability and the Asset Graph

One of the most underappreciated advantages of Dagster is the asset graph UI. Every asset, its dependencies, its materialization history, and its test results are visible in a single interactive graph. When a stakeholder asks why the revenue dashboard is stale, you click on the gold asset and trace backward through the graph to find the failed upstream asset. In Airflow, you would need to check multiple DAG runs across multiple DAGs to find the root cause.

Dagster also integrates with dbt, Airbyte, Fivetran, and other tools in the modern data stack. Each dbt model becomes an asset in the graph, giving you a unified view of your entire data platform — not just the Python pipelines, but the SQL transformations and ingestion jobs too.

The migration is not trivial, but it is predictable. Every team that has completed it reports the same outcome: faster development cycles, dramatically better testing, and an orchestration system that actually tells you whether your data is fresh. The four-to-six-week investment pays for itself in the first quarter through reduced incident response time alone.

Tags

dagster · airflow · orchestration · migration · data pipelines · assets
