Tutorial: ETL Pipeline

This tutorial builds an ETL-style pipeline that extracts, cleans, summarizes, plots, and reports on the Palmer Penguins dataset. It demonstrates step dependencies, parallel execution, output markers, and lifecycle hooks.

The pipeline

Create pipeline.yaml in your project directory:

name: penguin-report
env:
  DATA_DIR: data
  OUTPUT_DIR: output

steps:
  - id: setup
    r_expr: |
      dir.create(Sys.getenv("DATA_DIR"), showWarnings = FALSE)
      dir.create(Sys.getenv("OUTPUT_DIR"), showWarnings = FALSE)
      if (!requireNamespace("palmerpenguins", quietly = TRUE)) {
        install.packages("palmerpenguins", repos = "https://cloud.r-project.org")
      }
      cat("Setup complete\n")

  - id: extract
    r_expr: |
      library(palmerpenguins)
      raw <- penguins
      cat(sprintf("Loaded %d rows, %d columns\n", nrow(raw), ncol(raw)))
      saveRDS(raw, file.path(Sys.getenv("DATA_DIR"), "raw_penguins.rds"))
      cat("::daggle-output name=n_raw::", nrow(raw), "\n")
    depends: [setup]

  - id: clean
    r_expr: |
      raw <- readRDS(file.path(Sys.getenv("DATA_DIR"), "raw_penguins.rds"))
      clean <- raw[complete.cases(raw[, c("bill_length_mm", "bill_depth_mm",
                                           "flipper_length_mm", "body_mass_g")]), ]
      cat(sprintf("Removed %d incomplete rows (%d remaining)\n",
                  nrow(raw) - nrow(clean), nrow(clean)))
      saveRDS(clean, file.path(Sys.getenv("DATA_DIR"), "clean_penguins.rds"))
      cat("::daggle-output name=n_clean::", nrow(clean), "\n")
      cat("::daggle-output name=n_dropped::", nrow(raw) - nrow(clean), "\n")
    depends: [extract]

  - id: summarize
    r_expr: |
      penguins <- readRDS(file.path(Sys.getenv("DATA_DIR"), "clean_penguins.rds"))
      species_list <- split(penguins, penguins$species)
      summary_rows <- lapply(names(species_list), function(sp) {
        d <- species_list[[sp]]
        data.frame(
          species = sp,
          n = nrow(d),
          mean_bill_length = round(mean(d$bill_length_mm), 1),
          mean_bill_depth = round(mean(d$bill_depth_mm), 1),
          mean_flipper_length = round(mean(d$flipper_length_mm), 1),
          mean_body_mass = round(mean(d$body_mass_g), 0)
        )
      })
      summary_df <- do.call(rbind, summary_rows)
      print(summary_df, row.names = FALSE)
      write.csv(summary_df, file.path(Sys.getenv("OUTPUT_DIR"), "species_summary.csv"),
                row.names = FALSE)
      cat("::daggle-output name=n_species::", nrow(summary_df), "\n")
    depends: [clean]

  - id: plot
    r_expr: |
      penguins <- readRDS(file.path(Sys.getenv("DATA_DIR"), "clean_penguins.rds"))
      output_dir <- Sys.getenv("OUTPUT_DIR")

      png(file.path(output_dir, "bill_scatter.png"), width = 800, height = 600)
      colors <- c(Adelie = "#FF6B35", Chinstrap = "#004E89", Gentoo = "#2BA84A")
      plot(penguins$bill_length_mm, penguins$bill_depth_mm,
           col = colors[as.character(penguins$species)],
           pch = 19, cex = 1.2,
           xlab = "Bill length (mm)", ylab = "Bill depth (mm)",
           main = "Palmer Penguins: Bill Dimensions by Species")
      legend("topright", legend = names(colors), col = colors, pch = 19)
      dev.off()

      png(file.path(output_dir, "flipper_boxplot.png"), width = 800, height = 600)
      boxplot(flipper_length_mm ~ species, data = penguins,
              col = colors[levels(penguins$species)],
              xlab = "Species", ylab = "Flipper length (mm)",
              main = "Palmer Penguins: Flipper Length Distribution")
      dev.off()

      cat(sprintf("Saved 2 plots to %s\n", output_dir))
    depends: [clean]

  - id: report
    command: quarto render report.qmd -P data_dir:data -P output_dir:output
    depends: [summarize, plot]

on_success:
  command: echo "Pipeline complete -- report at output/report.html"
on_failure:
  command: echo "Pipeline failed!"

How the DAG executes

The dependency graph looks like this:

setup -> extract -> clean -+-> summarize -+-> report
                           +-> plot ------+

daggle resolves dependencies into execution tiers. Validate to see the plan:

daggle validate penguin-report

The key insight: summarize and plot both depend only on clean, so they run in parallel. The report step waits for both to finish before rendering.
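Conceptually, tier resolution is a topological layering: at each tier, schedule every step whose dependencies have all been scheduled already. A minimal sketch of that idea in plain R, using this pipeline's dependency list (illustrative only; daggle's actual scheduler is not shown here):

```r
# Dependencies of each step in pipeline.yaml.
deps <- list(
  setup     = character(0),
  extract   = "setup",
  clean     = "extract",
  summarize = "clean",
  plot      = "clean",
  report    = c("summarize", "plot")
)

tiers <- list()
done  <- character(0)
while (length(done) < length(deps)) {
  # A step is ready once all of its dependencies are done.
  ready <- setdiff(
    names(deps)[vapply(deps, function(d) all(d %in% done), logical(1))],
    done
  )
  tiers[[length(tiers) + 1]] <- ready
  done <- c(done, ready)
}
str(tiers)
# Tier 4 contains both summarize and plot: they can run in parallel.
```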

Output markers

Steps communicate values downstream using the ::daggle-output:: protocol. The extract step emits:

cat("::daggle-output name=n_raw::", nrow(raw), "\n")

daggle strips these lines from terminal output, parses them, and injects them as environment variables into downstream steps. The clean step could read this value as:

n_raw <- as.integer(Sys.getenv("DAGGLE_OUTPUT_EXTRACT_N_RAW"))

Variable names are derived by uppercasing both the step ID and the output name and replacing hyphens with underscores.
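The naming rule can be sketched as a small helper (the `DAGGLE_OUTPUT_` prefix matches the variable shown above; the `output_var` helper itself is ours, not part of daggle):

```r
# Map a step id and output name to the injected environment variable:
# uppercase both parts, then replace hyphens with underscores.
output_var <- function(step_id, name) {
  norm <- function(x) gsub("-", "_", toupper(x), fixed = TRUE)
  sprintf("DAGGLE_OUTPUT_%s_%s", norm(step_id), norm(name))
}

output_var("extract", "n_raw")    # "DAGGLE_OUTPUT_EXTRACT_N_RAW"
output_var("my-step", "n_rows")   # "DAGGLE_OUTPUT_MY_STEP_N_ROWS"
```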

Environment variables

The env: block at the top of the YAML defines variables available to every step. This avoids hardcoding paths:

env:
  DATA_DIR: data
  OUTPUT_DIR: output

Steps read them with Sys.getenv(). You can override these at run time:

daggle run penguin-report -e DATA_DIR=/tmp/data -e OUTPUT_DIR=/tmp/output
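Inside a step, Sys.getenv() also accepts an unset= fallback, which keeps the step runnable when you source its code interactively outside daggle. A small defensive pattern, not required by the pipeline above:

```r
# Fall back to the tutorial defaults when the variables are not set,
# e.g. when running a step's code by hand in an R session.
data_dir   <- Sys.getenv("DATA_DIR", unset = "data")
output_dir <- Sys.getenv("OUTPUT_DIR", unset = "output")
```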

Lifecycle hooks

The DAG-level on_success hook runs after all steps complete successfully; on_failure runs if any step fails. Either hook receives all accumulated outputs and run metadata as environment variables.

Common uses: send a Slack message, write to a log file, clean up temporary files.
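For example, assuming accumulated outputs are exposed to hooks under the same DAGGLE_OUTPUT_* naming rule described above (a sketch, not verified against daggle's exact hook environment), a logging hook might look like:

```yaml
on_success:
  command: echo "clean kept $DAGGLE_OUTPUT_CLEAN_N_CLEAN of $DAGGLE_OUTPUT_EXTRACT_N_RAW rows" >> pipeline.log
```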

Running the pipeline

daggle run penguin-report

Check results:

daggle status penguin-report