Tutorial: Data Quality Monitoring

This tutorial builds a scheduled data quality pipeline that checks completeness and distributions, logs results over time, and alerts on failures.

The pipeline

name: penguin-monitor
trigger:
  schedule: "@every 6h"

env:
  REPORT_DIR: reports

steps:
  - id: load
    r_expr: |
      dir.create(Sys.getenv("REPORT_DIR"), showWarnings = FALSE)

      if (!requireNamespace("palmerpenguins", quietly = TRUE)) {
        install.packages("palmerpenguins", repos = "https://cloud.r-project.org")
      }

      library(palmerpenguins)
      penguins <- penguins  # materialise palmerpenguins' lazily loaded dataset
      saveRDS(penguins, "penguins_snapshot.rds")

      cat(sprintf("Loaded %d rows at %s\n", nrow(penguins), Sys.time()))
      cat("::daggle-output name=n_rows::", nrow(penguins), "\n")

  - id: check-completeness
    r_expr: |
      penguins <- readRDS("penguins_snapshot.rds")

      missing <- colSums(is.na(penguins))
      total <- nrow(penguins)
      pct_complete <- round((1 - sum(missing) / (total * ncol(penguins))) * 100, 1)

      cat("Missing values per column:\n")
      print(missing[missing > 0])
      cat(sprintf("\nOverall completeness: %.1f%%\n", pct_complete))

      if (pct_complete < 90) {
        stop(sprintf("Data completeness %.1f%% is below 90%% threshold!", pct_complete))
      }

      cat("::daggle-output name=completeness::", pct_complete, "\n")
    depends: [load]

  - id: check-distributions
    r_expr: |
      penguins <- readRDS("penguins_snapshot.rds")
      penguins <- penguins[complete.cases(penguins), ]

      numeric_cols <- c("bill_length_mm", "bill_depth_mm",
                        "flipper_length_mm", "body_mass_g")

      outlier_count <- 0
      for (col in numeric_cols) {
        vals <- penguins[[col]]
        z <- abs((vals - mean(vals)) / sd(vals))
        n_outliers <- sum(z > 3)
        if (n_outliers > 0) {
          cat(sprintf("WARNING: %d outliers in %s (> 3 SD)\n", n_outliers, col))
        }
        outlier_count <- outlier_count + n_outliers
      }

      cat(sprintf("\nTotal outliers detected: %d\n", outlier_count))
      cat("::daggle-output name=outlier_count::", outlier_count, "\n")

      counts <- table(penguins$species)
      cat("\nSpecies distribution:\n")
      print(counts)

      min_pct <- min(prop.table(counts)) * 100
      if (min_pct < 10) {
        warning(sprintf("Smallest species group is only %.1f%% -- possible imbalance", min_pct))
      }
    depends: [load]

  - id: report
    r_expr: |
      completeness <- Sys.getenv("DAGGLE_OUTPUT_CHECK_COMPLETENESS_COMPLETENESS")
      outliers <- Sys.getenv("DAGGLE_OUTPUT_CHECK_DISTRIBUTIONS_OUTLIER_COUNT")
      n_rows <- Sys.getenv("DAGGLE_OUTPUT_LOAD_N_ROWS")
      run_id <- Sys.getenv("DAGGLE_RUN_ID")

      report <- data.frame(
        timestamp = Sys.time(),
        run_id = run_id,
        rows = as.integer(n_rows),
        completeness_pct = as.numeric(completeness),
        outliers = as.integer(outliers),
        status = "OK"
      )

      report_file <- file.path(Sys.getenv("REPORT_DIR"), "quality_log.csv")
      write_header <- !file.exists(report_file)
      # append = !write_header avoids R's "appending column names to file"
      # warning on the first run, when the header row is written
      write.table(report, report_file, append = !write_header, sep = ",",
                  row.names = FALSE, col.names = write_header)

      cat("\n=== Data Quality Report ===\n")
      cat(sprintf("Run:          %s\n", run_id))
      cat(sprintf("Rows:         %s\n", n_rows))
      cat(sprintf("Completeness: %s%%\n", completeness))
      cat(sprintf("Outliers:     %s\n", outliers))
      cat(sprintf("Status:       OK\n"))
    depends: [check-completeness, check-distributions]

on_failure:
  r_expr: |
    cat(sprintf("DATA QUALITY CHECK FAILED at %s\n", Sys.time()))
    cat("Review the step logs for details.\n")

How the DAG executes

       +-> check-completeness ----+
load --+                          +-> report
       +-> check-distributions ---+

The two check steps run in parallel after load. Both must complete before report runs.

The check-completeness step calls stop() if completeness drops below 90%. This causes the step to fail, which blocks report and triggers the on_failure hook.
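The same mechanics can be seen in plain R: stop() signals an error condition, which is what a runner traps to mark a step failed. A minimal sketch (check_completeness is a hypothetical helper for illustration, not part of the pipeline):

```r
# Hypothetical helper mirroring the check-completeness threshold logic
check_completeness <- function(pct, threshold = 90) {
  if (pct < threshold) {
    stop(sprintf("Data completeness %.1f%% is below %d%% threshold!",
                 pct, threshold))
  }
  invisible(pct)
}

# A runner marks the step failed by catching the error condition
result <- tryCatch(
  { check_completeness(85.5); "OK" },
  error = function(e) paste("FAILED:", conditionMessage(e))
)
cat(result, "\n")  # FAILED: Data completeness 85.5% is below 90% threshold!
```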

The cron trigger

trigger:
  schedule: "@every 6h"

Start the scheduler to activate the trigger:

daggle serve

The pipeline runs every 6 hours. In a real scenario, the load step would fetch a live data snapshot (from a database, API, or file drop) rather than loading a static package dataset.
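As one illustration of that pattern, a hedged sketch of a live-data load step: load_snapshot is an assumed helper name, and the commented-out URL is a placeholder, not a real endpoint.

```r
# Hypothetical live-data load: read a CSV snapshot (URL or file path)
# and persist it for downstream steps, failing loudly on fetch errors
load_snapshot <- function(snapshot_source, dest = "penguins_snapshot.rds") {
  penguins <- tryCatch(
    read.csv(snapshot_source, stringsAsFactors = FALSE),
    error = function(e) stop("Snapshot fetch failed: ", conditionMessage(e))
  )
  saveRDS(penguins, dest)
  cat(sprintf("Loaded %d rows at %s\n", nrow(penguins), Sys.time()))
  invisible(penguins)
}

# load_snapshot("https://data.example.com/penguins_latest.csv")  # placeholder URL
```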

The quality log

Each successful run appends a row to reports/quality_log.csv:

timestamp,run_id,rows,completeness_pct,outliers,status
2026-04-04 06:00:12,20260404T060012,344,97.2,2,OK
2026-04-04 12:00:15,20260404T120015,344,97.2,2,OK

This CSV accumulates over time, giving you a historical record of data quality.
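Once a few runs have accumulated, the log is easy to summarise from R. A sketch, assuming the schema shown above (summarise_quality_log is an illustrative helper, not a daggle feature):

```r
# Summarise the last n runs recorded in quality_log.csv
summarise_quality_log <- function(path = file.path("reports", "quality_log.csv"),
                                  n = 10) {
  log <- read.csv(path, stringsAsFactors = FALSE)
  recent <- tail(log, n)
  data.frame(
    runs             = nrow(recent),
    min_completeness = min(recent$completeness_pct),
    max_completeness = max(recent$completeness_pct),
    total_outliers   = sum(recent$outliers)
  )
}
```

For a quick visual of drift, plot(log$completeness_pct, type = "l") over the full log works as well.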

Tracking quality over time

Use daggle history to see recent runs:

daggle history penguin-monitor --last 10
Run ID               Status     Duration  DAG Hash
20260404T120015      completed  12s       c4a8f2
20260404T060012      completed  11s       c4a8f2
20260403T180009      completed  13s       c4a8f2
20260403T120011      failed     3s        c4a8f2

Use daggle stats to see duration trends and success rates:

daggle stats penguin-monitor --last 30
Step                  Avg      P50      P95
load                  2s       2s       3s
check-completeness    4s       4s       6s
check-distributions   5s       5s       7s
report                1s       1s       2s

Overall success rate: 96% (29/30)

A drop in success rate tells you something changed in the data. The quality log CSV and the step logs together give you the full picture.
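One concrete check on that history: flag run-over-run completeness drops. flag_drops is an illustrative helper operating on the completeness_pct column of the log, not a daggle feature.

```r
# Return the positions of runs whose completeness fell by more than
# `tolerance` percentage points relative to the previous run
flag_drops <- function(completeness_pct, tolerance = 1.0) {
  drops <- diff(completeness_pct)
  which(drops < -tolerance) + 1L
}

flag_drops(c(97.2, 97.2, 95.8, 97.1))  # run 3 fell by 1.4 points
```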

Failure handling

The on_failure hook runs when any step fails. In production, you would replace the cat() call with a real notification:

on_failure:
  r_expr: |
    slackr::slackr_msg(
      sprintf("Data quality check FAILED (run %s) at %s",
              Sys.getenv("DAGGLE_RUN_ID"), Sys.time())
    )

Running manually

You do not need to wait for the cron trigger. Run the pipeline any time:

daggle run penguin-monitor
daggle status penguin-monitor