Tutorial: ETL Pipeline
This tutorial builds a pipeline that extracts, cleans, summarizes, plots, and reports on the Palmer Penguins dataset. It demonstrates step dependencies, parallel execution, output markers, and lifecycle hooks.
The pipeline
Create pipeline.yaml in your project directory:
```yaml
name: penguin-report

env:
  DATA_DIR: data
  OUTPUT_DIR: output

steps:
  - id: setup
    r_expr: |
      dir.create(Sys.getenv("DATA_DIR"), showWarnings = FALSE)
      dir.create(Sys.getenv("OUTPUT_DIR"), showWarnings = FALSE)
      if (!requireNamespace("palmerpenguins", quietly = TRUE)) {
        install.packages("palmerpenguins", repos = "https://cloud.r-project.org")
      }
      cat("Setup complete\n")

  - id: extract
    r_expr: |
      library(palmerpenguins)
      raw <- penguins
      cat(sprintf("Loaded %d rows, %d columns\n", nrow(raw), ncol(raw)))
      saveRDS(raw, file.path(Sys.getenv("DATA_DIR"), "raw_penguins.rds"))
      cat("::daggle-output name=n_raw::", nrow(raw), "\n")
    depends: [setup]

  - id: clean
    r_expr: |
      raw <- readRDS(file.path(Sys.getenv("DATA_DIR"), "raw_penguins.rds"))
      clean <- raw[complete.cases(raw[, c("bill_length_mm", "bill_depth_mm",
                                          "flipper_length_mm", "body_mass_g")]), ]
      cat(sprintf("Removed %d incomplete rows (%d remaining)\n",
                  nrow(raw) - nrow(clean), nrow(clean)))
      saveRDS(clean, file.path(Sys.getenv("DATA_DIR"), "clean_penguins.rds"))
      cat("::daggle-output name=n_clean::", nrow(clean), "\n")
      cat("::daggle-output name=n_dropped::", nrow(raw) - nrow(clean), "\n")
    depends: [extract]

  - id: summarize
    r_expr: |
      penguins <- readRDS(file.path(Sys.getenv("DATA_DIR"), "clean_penguins.rds"))
      species_list <- split(penguins, penguins$species)
      summary_rows <- lapply(names(species_list), function(sp) {
        d <- species_list[[sp]]
        data.frame(
          species = sp,
          n = nrow(d),
          mean_bill_length = round(mean(d$bill_length_mm), 1),
          mean_bill_depth = round(mean(d$bill_depth_mm), 1),
          mean_flipper_length = round(mean(d$flipper_length_mm), 1),
          mean_body_mass = round(mean(d$body_mass_g), 0)
        )
      })
      summary_df <- do.call(rbind, summary_rows)
      print(summary_df, row.names = FALSE)
      write.csv(summary_df, file.path(Sys.getenv("OUTPUT_DIR"), "species_summary.csv"),
                row.names = FALSE)
      cat("::daggle-output name=n_species::", nrow(summary_df), "\n")
    depends: [clean]

  - id: plot
    r_expr: |
      penguins <- readRDS(file.path(Sys.getenv("DATA_DIR"), "clean_penguins.rds"))
      output_dir <- Sys.getenv("OUTPUT_DIR")
      png(file.path(output_dir, "bill_scatter.png"), width = 800, height = 600)
      colors <- c(Adelie = "#FF6B35", Chinstrap = "#004E89", Gentoo = "#2BA84A")
      plot(penguins$bill_length_mm, penguins$bill_depth_mm,
           col = colors[as.character(penguins$species)],
           pch = 19, cex = 1.2,
           xlab = "Bill length (mm)", ylab = "Bill depth (mm)",
           main = "Palmer Penguins: Bill Dimensions by Species")
      legend("topright", legend = names(colors), col = colors, pch = 19)
      dev.off()
      png(file.path(output_dir, "flipper_boxplot.png"), width = 800, height = 600)
      boxplot(flipper_length_mm ~ species, data = penguins,
              col = colors[levels(penguins$species)],
              xlab = "Species", ylab = "Flipper length (mm)",
              main = "Palmer Penguins: Flipper Length Distribution")
      dev.off()
      cat(sprintf("Saved 2 plots to %s\n", output_dir))
    depends: [clean]

  - id: report
    command: quarto render report.qmd -P data_dir:data -P output_dir:output
    depends: [summarize, plot]

on_success:
  command: echo "Pipeline complete -- report at output/report.html"
on_failure:
  command: echo "Pipeline failed!"
```

How the DAG executes
The dependency graph looks like this:
```
setup -> extract -> clean -> summarize -> report
                         \-> plot ---------^
```
daggle resolves dependencies into execution tiers. Validate to see the plan:
```
daggle validate penguin-report
```

The key insight: summarize and plot both depend only on clean, so they run in parallel. The report step waits for both to finish before rendering.
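Conceptually, tier resolution is a topological layering: a step can run as soon as every one of its dependencies has finished. The sketch below (plain R, not daggle's actual implementation) derives the tiers for this pipeline's dependency lists:

```r
# Hypothetical sketch of tier resolution: a step joins the earliest
# tier in which all of its dependencies are already done.
deps <- list(
  setup     = character(0),
  extract   = "setup",
  clean     = "extract",
  summarize = "clean",
  plot      = "clean",
  report    = c("summarize", "plot")
)
tiers <- list()
done <- character(0)
remaining <- names(deps)
while (length(remaining) > 0) {
  ready <- remaining[vapply(remaining,
                            function(s) all(deps[[s]] %in% done),
                            logical(1))]
  tiers[[length(tiers) + 1]] <- ready
  done <- c(done, ready)
  remaining <- setdiff(remaining, ready)
}
# The fourth tier contains both "summarize" and "plot":
# those two steps run in parallel.
```

Note this loops forever on a cyclic graph; a real scheduler would detect the cycle first, which is part of what validation checks.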
Output markers
Steps communicate values downstream using the ::daggle-output:: protocol. The extract step emits:
```r
cat("::daggle-output name=n_raw::", nrow(raw), "\n")
```

daggle strips these lines from terminal output, parses them, and injects them as environment variables into downstream steps. The clean step could read this value as:
```r
n_raw <- as.integer(Sys.getenv("DAGGLE_OUTPUT_EXTRACT_N_RAW"))
```

Step IDs are uppercased, and hyphens become underscores in the variable name.
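Because Sys.getenv() returns an empty string when a variable is unset, a defensive pattern (a sketch, not part of the tutorial pipeline) is to validate before converting:

```r
# Defensive read of an upstream output: Sys.getenv() returns ""
# when the variable is unset, and as.integer("") is NA with a warning.
raw_val <- Sys.getenv("DAGGLE_OUTPUT_EXTRACT_N_RAW")
if (!nzchar(raw_val)) {
  stop("missing output 'n_raw' from step 'extract'")
}
n_raw <- as.integer(raw_val)
```

Failing fast here gives a clearer error than letting an NA propagate into later calculations.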
Environment variables
The env: block at the top of the YAML defines variables available to every step. This avoids hardcoding paths:
```yaml
env:
  DATA_DIR: data
  OUTPUT_DIR: output
```

Steps read them with Sys.getenv(). You can override these at run time:
```
daggle run penguin-report -e DATA_DIR=/tmp/data -e OUTPUT_DIR=/tmp/output
```

Lifecycle hooks
The on_success hook at the DAG level runs after every step completes successfully; on_failure runs as soon as any step fails. Both receive all accumulated outputs and run metadata as environment variables.
Common uses: send a Slack message, write to a log file, clean up temporary files.
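For example, a variant of the pipeline's on_failure hook that appends a timestamped line to a log file (a sketch; the log path is illustrative, and `date -Is` assumes a GNU-style date command):

```yaml
on_failure:
  command: echo "penguin-report failed at $(date -Is)" >> pipeline.log
```

Since hooks are ordinary commands, anything scriptable from the shell works here.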
Running the pipeline
```
daggle run penguin-report
```

Check results:
```
daggle status penguin-report
```