Data Pipelines with Kedro-Style DAGs and Dagster
Building reproducible data pipelines using Kedro-inspired catalog patterns, orchestrated with Dagster and Prefect.
The Pipeline Problem
ML projects accumulate data processing steps -- scraping, cleaning, feature engineering, training, evaluation. Without structure, these become a tangled mess of scripts with implicit dependencies and no reproducibility.
Agentic Assistants borrows Kedro's pipeline and catalog abstractions, then adds Dagster and Prefect as orchestration backends for scheduling, monitoring, and recovery.
The Data Catalog
The catalog (src/agentic_assistants/data/catalog.py) is a registry of typed datasets organized into layers:
```yaml
# conf/base/catalog.yaml
raw_market_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/market_data.csv
  layer: raw

processed_features:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/features.parquet
  layer: primary

trained_model:
  type: pickle.PickleDataSet
  filepath: data/06_models/model.pkl
  layer: model_output
```
Each dataset has lineage tracking built in -- the catalog records which pipeline node produced it, when, and from which inputs. This makes debugging data issues straightforward: trace any artifact back to its source.
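The catalog in src/agentic_assistants/data/catalog.py isn't reproduced here. To illustrate how lineage tracking of this kind can work, here is a minimal in-memory sketch. The names `InMemoryCatalog`, `LineageRecord`, `save`, `load` and `trace` are hypothetical, and file I/O is replaced by a plain dict so the example runs on its own.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class LineageRecord:
    """Which node produced a dataset, when, and from which inputs."""
    produced_by: str
    inputs: tuple[str, ...]
    created_at: datetime


@dataclass
class InMemoryCatalog:
    """Hypothetical catalog that keeps data in memory instead of on disk."""
    layers: dict[str, str]  # dataset name -> layer, as declared in catalog.yaml
    _data: dict[str, Any] = field(default_factory=dict)
    _lineage: dict[str, LineageRecord] = field(default_factory=dict)

    def save(self, name: str, data: Any, produced_by: str, inputs: list[str]) -> None:
        # Store the data and record where it came from.
        self._data[name] = data
        self._lineage[name] = LineageRecord(
            produced_by, tuple(inputs), datetime.now(timezone.utc)
        )

    def load(self, name: str) -> Any:
        return self._data[name]

    def layer(self, name: str) -> str:
        return self.layers[name]

    def lineage(self, name: str) -> LineageRecord:
        return self._lineage[name]

    def trace(self, name: str) -> list[str]:
        """Walk lineage upstream and return the source datasets, de-duplicated."""
        record = self._lineage.get(name)
        if record is None or not record.inputs:
            return [name]  # nothing upstream: this is a source
        sources: list[str] = []
        for upstream in record.inputs:
            for source in self.trace(upstream):
                if source not in sources:
                    sources.append(source)
        return sources


catalog = InMemoryCatalog(layers={
    "raw_market_data": "raw",
    "processed_features": "primary",
    "trained_model": "model_output",
})
catalog.save("raw_market_data", [3.0, 4.0], produced_by="ingest", inputs=[])
catalog.save("processed_features", [0.3, 0.4], produced_by="engineer_features",
             inputs=["raw_market_data"])
catalog.save("trained_model", {"w": 1.0}, produced_by="train_model",
             inputs=["processed_features", "parameters"])
print(catalog.trace("trained_model"))  # ['raw_market_data', 'parameters']
```

Tracing `trained_model` walks back through `processed_features` and stops at the two inputs that no node produced, which is exactly the "trace any artifact back to its source" workflow.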
Pipeline Nodes and DAGs
Pipelines are built from pure functions (nodes) connected by their inputs and outputs:
```python
from agentic_assistants.pipelines import Pipeline, node

pipeline = Pipeline([
    node(func=load_data, inputs="raw_market_data", outputs="clean_data"),
    node(func=engineer_features, inputs="clean_data", outputs="features"),
    node(func=train_model, inputs=["features", "parameters"], outputs="model"),
    node(func=evaluate, inputs=["model", "features"], outputs="metrics"),
])
```
The framework resolves the DAG automatically from input/output names. Nodes with no dependencies between them can run in parallel.
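To make "resolves the DAG from input/output names" concrete, here is a minimal sketch built on Python's standard `graphlib`. The `Node` class and `resolve_stages` function are hypothetical stand-ins, not the framework's API. Each returned stage holds nodes that don't depend on one another and so could run in parallel. The `summarize` node is an invented extra branch, added only to show two nodes landing in the same stage.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter


@dataclass(frozen=True)
class Node:
    """Hypothetical node: only its name and I/O names matter for resolution."""
    name: str
    inputs: tuple[str, ...]
    outputs: tuple[str, ...]


def resolve_stages(nodes: list[Node]) -> list[list[str]]:
    """Order nodes by their data dependencies, grouped into parallelizable stages."""
    # Map each dataset name to the node that produces it.
    producer = {out: n.name for n in nodes for out in n.outputs}
    # A node depends on the producers of its inputs; inputs with no producer
    # (raw datasets, parameters) are external and add no edge.
    graph = {n.name: {producer[i] for i in n.inputs if i in producer} for n in nodes}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the names form a cycle
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


nodes = [
    Node("load_data", ("raw_market_data",), ("clean_data",)),
    Node("engineer_features", ("clean_data",), ("features",)),
    Node("train_model", ("features", "parameters"), ("model",)),
    Node("evaluate", ("model", "features"), ("metrics",)),
    Node("summarize", ("clean_data",), ("summary",)),  # hypothetical extra branch
]
print(resolve_stages(nodes))
# [['load_data'], ['engineer_features', 'summarize'], ['train_model'], ['evaluate']]
```

Without the extra branch the four nodes form a strict chain, because each one consumes the previous node's output; parallelism only appears when two nodes share an upstream dataset but not each other's outputs.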
Pipeline Runners
The framework provides multiple execution backends:
| Runner | Use Case |
|---|---|
| SequentialRunner | Local development, debugging |
| ThreadRunner | I/O-bound work (network, disk) on a single machine |
| ParallelRunner | Process-based parallelism for CPU-heavy workloads |
| DagsterRunner | Production orchestration with Dagster |
| KubernetesRunner | Distributed execution across a cluster |
Switching runners is a single parameter change -- the pipeline definition stays the same.
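The runner classes belong to the framework and their interfaces aren't shown here. As a hypothetical sketch of why swapping runners leaves the pipeline untouched, the two functions below execute the same node list, one node at a time or in batches on a thread pool. All names (`Node`, `run_sequential`, `run_threaded`, `run`) are invented for illustration. Threads suit I/O-bound nodes; for CPU-heavy pure-Python work, CPython's GIL makes a process pool the usual choice.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Node:
    """Hypothetical node: a function, the names it reads, and the name it writes."""
    func: Callable[..., Any]
    inputs: tuple[str, ...]
    output: str


def _ready(nodes: list[Node], data: dict[str, Any]) -> list[Node]:
    # Nodes whose inputs all exist and whose output has not been produced yet.
    return [n for n in nodes
            if n.output not in data and all(i in data for i in n.inputs)]


def run_sequential(nodes: list[Node], data: dict[str, Any]) -> dict[str, Any]:
    data = dict(data)
    while ready := _ready(nodes, data):
        node = ready[0]  # one node at a time, easy to step through in a debugger
        data[node.output] = node.func(*(data[i] for i in node.inputs))
    return data


def run_threaded(nodes: list[Node], data: dict[str, Any]) -> dict[str, Any]:
    data = dict(data)
    with ThreadPoolExecutor(max_workers=4) as pool:
        while ready := _ready(nodes, data):
            # Ready nodes never consume each other's outputs, so submit them together.
            futures = {n.output: pool.submit(n.func, *(data[i] for i in n.inputs))
                       for n in ready}
            for name, future in futures.items():
                data[name] = future.result()
    return data


def run(nodes: list[Node], data: dict[str, Any],
        runner: Callable[..., dict[str, Any]] = run_sequential) -> dict[str, Any]:
    # The pipeline definition is untouched; only the runner argument changes.
    return runner(nodes, data)


nodes = [
    Node(lambda raw: [x * 2 for x in raw], ("raw",), "clean"),
    Node(sum, ("clean",), "total"),
    Node(len, ("clean",), "count"),
    Node(lambda total, count: total / count, ("total", "count"), "mean"),
]
sequential = run(nodes, {"raw": [1, 2, 3]}, runner=run_sequential)
threaded = run(nodes, {"raw": [1, 2, 3]}, runner=run_threaded)
print(sequential == threaded, sequential["mean"])  # True 4.0
```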
Dagster Integration
The Dagster adapter (orchestration/dagster_code/) translates pipeline definitions into Dagster assets, jobs, and schedules. Each catalog dataset becomes a Dagster asset, each pipeline becomes a job, and you get Dagster's full observability UI for free.
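```python
from dagster import asset
# engineer_features and train are the project's own functions (not shown).

@asset
def processed_features(raw_market_data):
    # Dagster wires upstream assets by matching parameter names to asset keys.
    return engineer_features(raw_market_data)

@asset
def trained_model(processed_features, parameters):
    # `parameters` must itself be defined as an asset for this to resolve.
    return train(processed_features, parameters)
```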
Dagster handles scheduling, retries, and alerting, and provides a web UI for monitoring pipeline runs.
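The adapter code in orchestration/dagster_code/ isn't shown here. One plausible shape for the translation is sketched below in plain Python, so it runs without Dagster installed. It builds an intermediate plan in which every node output becomes an asset keyed by its catalog name, inputs with no producer become source assets, and a pipeline becomes a job selecting the assets it computes. `PipelineNode`, `AssetPlan`, `plan_assets` and `plan_job` are hypothetical names. The final step of turning each plan into a real Dagster asset is left out.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineNode:
    name: str
    inputs: tuple[str, ...]
    outputs: tuple[str, ...]


@dataclass(frozen=True)
class AssetPlan:
    """Hypothetical intermediate form: one entry per catalog dataset."""
    key: str
    computed_by: str | None  # None marks a source asset read straight from the catalog
    deps: tuple[str, ...]


def plan_assets(nodes: list[PipelineNode], catalog_names: set[str]) -> dict[str, AssetPlan]:
    plans: dict[str, AssetPlan] = {}
    # Every node output becomes an asset that depends on the node's inputs.
    for node in nodes:
        for out in node.outputs:
            if out in plans:
                raise ValueError(f"{out!r} is produced by more than one node")
            plans[out] = AssetPlan(out, node.name, node.inputs)
    # Inputs nobody produces must be declared in the catalog; they become sources.
    for node in nodes:
        for inp in node.inputs:
            if inp in plans:
                continue
            if inp not in catalog_names:
                raise ValueError(f"input {inp!r} has no producer and no catalog entry")
            plans[inp] = AssetPlan(inp, None, ())
    return plans


def plan_job(pipeline_name: str, plans: dict[str, AssetPlan]) -> dict[str, object]:
    """A pipeline becomes a job that selects the assets its nodes compute."""
    computed = sorted(key for key, plan in plans.items() if plan.computed_by is not None)
    return {"name": pipeline_name, "assets": computed}


nodes = [
    PipelineNode("engineer_features", ("raw_market_data",), ("processed_features",)),
    PipelineNode("train", ("processed_features", "parameters"), ("trained_model",)),
]
catalog_names = {"raw_market_data", "processed_features", "trained_model", "parameters"}
plans = plan_assets(nodes, catalog_names)
print(plans["trained_model"].deps)  # ('processed_features', 'parameters')
print(plan_job("training", plans))
# {'name': 'training', 'assets': ['processed_features', 'trained_model']}
```

Failing fast on an input with neither a producer nor a catalog entry catches naming typos before anything reaches the orchestrator.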
Pipeline Templates
The framework ships with pre-built pipeline templates under pipelines/templates/:
- Dataset pipeline -- Load, validate, transform, and store tabular data
- Document pipeline -- Ingest documents, chunk, embed, and store them in a vector DB
- Web scraping pipeline -- Crawl URLs, extract content, clean, and catalog
- RSS pipeline -- Monitor feeds, deduplicate, process new entries
These templates are starting points -- fork and customize for your specific needs.
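The template internals aren't described here. As an illustration of the deduplication step an RSS-style pipeline needs, here is a hypothetical sketch. It assumes each feed entry is a dict with optional `guid`, `link` and `title` keys, and that the set of seen keys is persisted between runs (for example, as a catalog dataset). `entry_key` and `new_entries` are invented names.

```python
import hashlib
from typing import Iterable


def entry_key(entry: dict) -> str:
    """Stable identity for a feed entry: prefer guid, fall back to link + title."""
    raw = entry.get("guid") or f"{entry.get('link', '')}|{entry.get('title', '')}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def new_entries(entries: Iterable[dict], seen: set[str]) -> list[dict]:
    """Return entries not seen before, in feed order, and record them in `seen`."""
    fresh = []
    for entry in entries:
        key = entry_key(entry)
        if key in seen:
            continue  # duplicate within this batch or from an earlier run
        seen.add(key)
        fresh.append(entry)
    return fresh


seen: set[str] = set()  # assumed to be loaded from and saved back to storage
first_run = [{"guid": "a", "title": "First"}, {"guid": "b", "title": "Second"},
             {"guid": "a", "title": "First"}]
second_run = [{"guid": "b"}, {"guid": "c"}]
print([e["guid"] for e in new_entries(first_run, seen)])   # ['a', 'b']
print([e["guid"] for e in new_entries(second_run, seen)])  # ['c']
```

Hashing the key keeps the stored set compact and uniform even when guids are long URLs.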