Tutorial: ETL Pipeline
This tutorial builds a pipeline that extracts, cleans, summarizes, plots, and reports on the Palmer Penguins dataset. Along the way it demonstrates step dependencies, parallel execution, output markers, environment variables, and lifecycle hooks.
Examples below use the daggleR companion package for in-step helpers; install it with pak::pkg_install("cynkra/daggleR").
The pipeline
Create pipeline.yaml in your project directory:
```yaml
name: penguin-report

env:
  DATA_DIR: data
  OUTPUT_DIR: output

steps:
  - id: setup
    r_expr: |
      dir.create(Sys.getenv("DATA_DIR"), showWarnings = FALSE)
      dir.create(Sys.getenv("OUTPUT_DIR"), showWarnings = FALSE)
      if (!requireNamespace("palmerpenguins", quietly = TRUE)) {
        install.packages("palmerpenguins", repos = "https://cloud.r-project.org")
      }
      cat("Setup complete\n")

  - id: extract
    r_expr: |
      library(palmerpenguins)
      raw <- penguins
      cat(sprintf("Loaded %d rows, %d columns\n", nrow(raw), ncol(raw)))
      saveRDS(raw, file.path(Sys.getenv("DATA_DIR"), "raw_penguins.rds"))
      daggleR::daggle_output("n_raw", nrow(raw))
    depends: [setup]

  - id: clean
    r_expr: |
      raw <- readRDS(file.path(Sys.getenv("DATA_DIR"), "raw_penguins.rds"))
      clean <- raw[complete.cases(raw[, c("bill_length_mm", "bill_depth_mm",
                                          "flipper_length_mm", "body_mass_g")]), ]
      cat(sprintf("Removed %d incomplete rows (%d remaining)\n",
                  nrow(raw) - nrow(clean), nrow(clean)))
      saveRDS(clean, file.path(Sys.getenv("DATA_DIR"), "clean_penguins.rds"))
      daggleR::daggle_output("n_clean", nrow(clean))
      daggleR::daggle_output("n_dropped", nrow(raw) - nrow(clean))
    depends: [extract]

  - id: summarize
    r_expr: |
      penguins <- readRDS(file.path(Sys.getenv("DATA_DIR"), "clean_penguins.rds"))
      species_list <- split(penguins, penguins$species)
      summary_rows <- lapply(names(species_list), function(sp) {
        d <- species_list[[sp]]
        data.frame(
          species = sp,
          n = nrow(d),
          mean_bill_length = round(mean(d$bill_length_mm), 1),
          mean_bill_depth = round(mean(d$bill_depth_mm), 1),
          mean_flipper_length = round(mean(d$flipper_length_mm), 1),
          mean_body_mass = round(mean(d$body_mass_g), 0)
        )
      })
      summary_df <- do.call(rbind, summary_rows)
      print(summary_df, row.names = FALSE)
      write.csv(summary_df, file.path(Sys.getenv("OUTPUT_DIR"), "species_summary.csv"),
                row.names = FALSE)
      daggleR::daggle_output("n_species", nrow(summary_df))
    depends: [clean]

  - id: plot
    r_expr: |
      penguins <- readRDS(file.path(Sys.getenv("DATA_DIR"), "clean_penguins.rds"))
      output_dir <- Sys.getenv("OUTPUT_DIR")

      png(file.path(output_dir, "bill_scatter.png"), width = 800, height = 600)
      colors <- c(Adelie = "#FF6B35", Chinstrap = "#004E89", Gentoo = "#2BA84A")
      plot(penguins$bill_length_mm, penguins$bill_depth_mm,
           col = colors[as.character(penguins$species)],
           pch = 19, cex = 1.2,
           xlab = "Bill length (mm)", ylab = "Bill depth (mm)",
           main = "Palmer Penguins: Bill Dimensions by Species")
      legend("topright", legend = names(colors), col = colors, pch = 19)
      dev.off()

      png(file.path(output_dir, "flipper_boxplot.png"), width = 800, height = 600)
      boxplot(flipper_length_mm ~ species, data = penguins,
              col = colors[levels(penguins$species)],
              xlab = "Species", ylab = "Flipper length (mm)",
              main = "Palmer Penguins: Flipper Length Distribution")
      dev.off()

      cat(sprintf("Saved 2 plots to %s\n", output_dir))
    depends: [clean]

  - id: report
    command: quarto render report.qmd -P data_dir:data -P output_dir:output
    depends: [summarize, plot]

on_success:
  command: echo "Pipeline complete -- report at output/report.html"

on_failure:
  command: echo "Pipeline failed!"
```

How the DAG executes
The dependency graph looks like this:
```
setup -> extract -> clean -+-> summarize -+-> report
                           |              |
                           +-> plot ------+
```
daggle resolves dependencies into execution tiers. Validate to see the plan:
```bash
daggle validate penguin-report
```

Because summarize and plot both depend only on clean, they run in parallel once clean finishes. The report step waits for both to finish before rendering.
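To make the idea of tiers concrete, here is a minimal base-R sketch that layers the penguin-report graph with a level-by-level topological sort (Kahn's algorithm). It assumes a tier is "every remaining step whose dependencies all finished in earlier tiers"; execution_tiers() is a hypothetical helper for illustration, not daggle's implementation, and daggle's own planner may differ in detail.

```r
# Hypothetical helper: group steps into tiers with a level-by-level
# topological sort (Kahn's algorithm). Not daggle's actual code.
execution_tiers <- function(deps) {
  remaining <- names(deps)
  done <- character(0)
  tiers <- list()
  while (length(remaining) > 0) {
    # A step is ready once every step it depends on is in an earlier tier.
    ready <- remaining[vapply(remaining,
                              function(s) all(deps[[s]] %in% done),
                              logical(1))]
    if (length(ready) == 0) {
      stop("Cycle detected among: ", paste(remaining, collapse = ", "))
    }
    tiers[[length(tiers) + 1]] <- ready
    done <- c(done, ready)
    remaining <- setdiff(remaining, ready)
  }
  tiers
}

# Dependencies copied from the depends: fields in pipeline.yaml
deps <- list(
  setup     = character(0),
  extract   = "setup",
  clean     = "extract",
  summarize = "clean",
  plot      = "clean",
  report    = c("summarize", "plot")
)

tiers <- execution_tiers(deps)
for (i in seq_along(tiers)) {
  cat(sprintf("Tier %d: %s\n", i, paste(tiers[[i]], collapse = ", ")))
}
```

Running it prints five tiers, with summarize and plot sharing tier 4. The same loop also reports a dependency cycle instead of looping forever, which is the kind of problem validation is meant to catch.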
Output markers
Steps pass values downstream by printing ::daggle-output:: marker lines to stdout. In R, daggleR::daggle_output() writes the marker for you and validates the output name:
```r
daggleR::daggle_output("n_raw", nrow(raw))
```

daggle strips these lines from the terminal output, parses them, and injects the values as environment variables into downstream steps. A downstream step such as clean can read the value with daggle_get_output():
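```r
n_raw <- as.integer(daggleR::daggle_get_output("extract", "n_raw"))
```

Under the hood this reads DAGGLE_OUTPUT_EXTRACT_N_RAW from the environment; the helper handles the uppercasing and the hyphen-to-underscore conversion. Environment variables are always strings, so convert the value (here with as.integer()) before doing arithmetic with it. See Output Marker Protocol for the wire format.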
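The naming rule can be sketched in base R. This is an illustration only: output_env_name() and read_output() are hypothetical helpers, and they assume the variable name is DAGGLE_OUTPUT_ followed by the step id, an underscore, and the output name, each uppercased with hyphens turned into underscores, as the DAGGLE_OUTPUT_EXTRACT_N_RAW example suggests. In real steps, use daggleR::daggle_get_output().

```r
# Hypothetical helpers illustrating the naming rule described above;
# daggleR::daggle_get_output() is the supported way to read outputs.
output_env_name <- function(step_id, output_name) {
  # Uppercase and replace hyphens with underscores (assumed convention).
  normalize <- function(x) toupper(gsub("-", "_", x, fixed = TRUE))
  paste0("DAGGLE_OUTPUT_", normalize(step_id), "_", normalize(output_name))
}

read_output <- function(step_id, output_name) {
  # Returns NA if the variable is not set; values are always strings.
  Sys.getenv(output_env_name(step_id, output_name), unset = NA)
}

output_env_name("extract", "n_raw")      # "DAGGLE_OUTPUT_EXTRACT_N_RAW"
output_env_name("my-step", "row-count")  # "DAGGLE_OUTPUT_MY_STEP_ROW_COUNT"

# Simulate the variable daggle would inject before a downstream step runs.
Sys.setenv(DAGGLE_OUTPUT_EXTRACT_N_RAW = "344")
n_raw <- as.integer(read_output("extract", "n_raw"))
```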
Environment variables
The env: block at the top of the YAML defines variables available to every step. This avoids hardcoding paths:
```yaml
env:
  DATA_DIR: data
  OUTPUT_DIR: output
```

Steps read them with Sys.getenv(). DAG-level env: values may also reference shell environment variables with the ${env:VARNAME} secret-reference syntax:
```yaml
env:
  DATA_DIR: "${env:DATA_DIR}"
  OUTPUT_DIR: "${env:OUTPUT_DIR}"
```

Then set those variables in the shell when invoking the pipeline:
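```bash
DATA_DIR=/tmp/data OUTPUT_DIR=/tmp/output daggle run penguin-report
```

The report step above passes the literal paths data and output to Quarto, so it would also need updating for the report to find files under /tmp.

For values that should appear in meta.json and be changeable per run without editing the YAML, use params: instead and pass -p key=value on the command line.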
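To illustrate what the ${env:VARNAME} references resolve to, here is a minimal base-R sketch of the substitution. resolve_env_refs() is a hypothetical helper, not daggle's resolver, and treating an unset variable as an empty string is an assumption; check the daggle reference for how missing variables are actually handled.

```r
# Hypothetical sketch of ${env:VARNAME} substitution; daggle's real
# resolver (and its handling of unset variables) may differ.
resolve_env_refs <- function(value) {
  pattern <- "\\$\\{env:([A-Za-z_][A-Za-z0-9_]*)\\}"
  refs <- regmatches(value, gregexpr(pattern, value))[[1]]
  for (ref in unique(refs)) {
    var <- sub(pattern, "\\1", ref)
    # Assumption: an unset variable resolves to an empty string.
    value <- gsub(ref, Sys.getenv(var), value, fixed = TRUE)
  }
  value
}

# DAG-level env: block from the example above, as an R list
dag_env <- list(DATA_DIR = "${env:DATA_DIR}", OUTPUT_DIR = "${env:OUTPUT_DIR}")

# Equivalent of DATA_DIR=/tmp/data OUTPUT_DIR=/tmp/output in the shell
Sys.setenv(DATA_DIR = "/tmp/data", OUTPUT_DIR = "/tmp/output")
resolved <- lapply(dag_env, resolve_env_refs)
str(resolved)
```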
Lifecycle hooks
The DAG-level on_success hook runs after every step has completed successfully; on_failure runs if any step fails. Both receive all accumulated outputs and the run metadata as environment variables.
Common uses: send a Slack message, write to a log file, clean up temporary files.
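As a sketch of the log-file use case, a hook could run an R script from its command: field (for example, command: Rscript hooks/log_outputs.R; the file name is hypothetical). The functions below are hypothetical too, and they assume outputs reach the hook with the DAGGLE_OUTPUT_ prefix described earlier; run-metadata variables are not covered because their names are not given here. A real hook script would end with a call such as log_outputs("output/pipeline.log"); the example below writes to a temporary file instead.

```r
# hooks/log_outputs.R (hypothetical): append every DAGGLE_OUTPUT_* variable
# visible to the hook to a log file. Assumes the naming convention above.
collect_outputs <- function(env = Sys.getenv()) {
  vals <- env[startsWith(names(env), "DAGGLE_OUTPUT_")]
  vals[order(names(vals))]
}

log_outputs <- function(path, outputs = collect_outputs()) {
  stamp <- format(Sys.time(), "%Y-%m-%dT%H:%M:%S")
  lines <- sprintf("%s %s=%s", stamp, names(outputs), outputs)
  cat(lines, file = path, sep = "\n", append = TRUE)
  invisible(lines)
}

# Example run with a simulated output value and a temporary log file.
log_file <- file.path(tempdir(), "penguin-report.log")
Sys.setenv(DAGGLE_OUTPUT_CLEAN_N_CLEAN = "342")  # simulated, not a real run
log_outputs(log_file)
```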
Running the pipeline
```bash
daggle run penguin-report
```

Check results:

```bash
daggle status penguin-report
```
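Beyond daggle status, you can confirm that the files the steps write actually exist. missing_artifacts() is a hypothetical helper; the file names come from pipeline.yaml above, and report.html is included on the assumption that report.qmd renders into the output directory, as the on_success message implies.

```r
# Hypothetical post-run check: which expected artifacts are missing?
missing_artifacts <- function(output_dir = Sys.getenv("OUTPUT_DIR", "output")) {
  expected <- c("species_summary.csv",  # written by the summarize step
                "bill_scatter.png",     # written by the plot step
                "flipper_boxplot.png",  # written by the plot step
                "report.html")          # assumed location of the Quarto report
  expected[!file.exists(file.path(output_dir, expected))]
}

# Example against a scratch directory that contains only the CSV.
scratch <- file.path(tempdir(), "penguin-output")
dir.create(scratch, showWarnings = FALSE)
invisible(file.create(file.path(scratch, "species_summary.csv")))
missing <- missing_artifacts(scratch)
if (length(missing) > 0) {
  message("Missing: ", paste(missing, collapse = ", "))
}
```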