Tutorial: Data Quality Monitoring
This tutorial builds a scheduled data quality pipeline that checks completeness and distributions, logs results over time, and alerts on failures.
The pipeline
name: penguin-monitor

trigger:
  schedule: "@every 6h"

env:
  REPORT_DIR: reports

steps:
  - id: load
    r_expr: |
      dir.create(Sys.getenv("REPORT_DIR"), showWarnings = FALSE)
      if (!requireNamespace("palmerpenguins", quietly = TRUE)) {
        install.packages("palmerpenguins", repos = "https://cloud.r-project.org")
      }
      library(palmerpenguins)
      # Materialize the lazily loaded dataset as a local object
      penguins <- penguins
      saveRDS(penguins, "penguins_snapshot.rds")
      cat(sprintf("Loaded %d rows at %s\n", nrow(penguins), Sys.time()))
      cat("::daggle-output name=n_rows::", nrow(penguins), "\n")

  - id: check-completeness
    r_expr: |
      penguins <- readRDS("penguins_snapshot.rds")
      missing <- colSums(is.na(penguins))
      total <- nrow(penguins)
      pct_complete <- round((1 - sum(missing) / (total * ncol(penguins))) * 100, 1)
      cat("Missing values per column:\n")
      print(missing[missing > 0])
      cat(sprintf("\nOverall completeness: %.1f%%\n", pct_complete))
      if (pct_complete < 90) {
        stop(sprintf("Data completeness %.1f%% is below 90%% threshold!", pct_complete))
      }
      cat("::daggle-output name=completeness::", pct_complete, "\n")
    depends: [load]

  - id: check-distributions
    r_expr: |
      penguins <- readRDS("penguins_snapshot.rds")
      penguins <- penguins[complete.cases(penguins), ]
      numeric_cols <- c("bill_length_mm", "bill_depth_mm",
                        "flipper_length_mm", "body_mass_g")
      outlier_count <- 0
      for (col in numeric_cols) {
        vals <- penguins[[col]]
        z <- abs((vals - mean(vals)) / sd(vals))
        n_outliers <- sum(z > 3)
        if (n_outliers > 0) {
          cat(sprintf("WARNING: %d outliers in %s (> 3 SD)\n", n_outliers, col))
        }
        outlier_count <- outlier_count + n_outliers
      }
      cat(sprintf("\nTotal outliers detected: %d\n", outlier_count))
      cat("::daggle-output name=outlier_count::", outlier_count, "\n")
      counts <- table(penguins$species)
      cat("\nSpecies distribution:\n")
      print(counts)
      min_pct <- min(prop.table(counts)) * 100
      if (min_pct < 10) {
        warning(sprintf("Smallest species group is only %.1f%% -- possible imbalance", min_pct))
      }
    depends: [load]

  - id: report
    r_expr: |
      completeness <- Sys.getenv("DAGGLE_OUTPUT_CHECK_COMPLETENESS_COMPLETENESS")
      outliers <- Sys.getenv("DAGGLE_OUTPUT_CHECK_DISTRIBUTIONS_OUTLIER_COUNT")
      n_rows <- Sys.getenv("DAGGLE_OUTPUT_LOAD_N_ROWS")
      run_id <- Sys.getenv("DAGGLE_RUN_ID")
      report <- data.frame(
        timestamp = Sys.time(),
        run_id = run_id,
        rows = as.integer(n_rows),
        completeness_pct = as.numeric(completeness),
        outliers = as.integer(outliers),
        status = "OK"
      )
      report_file <- file.path(Sys.getenv("REPORT_DIR"), "quality_log.csv")
      # Only write the header on the first run, when the log does not exist yet
      write_header <- !file.exists(report_file)
      write.table(report, report_file, append = TRUE, sep = ",",
                  row.names = FALSE, col.names = write_header)
      cat("\n=== Data Quality Report ===\n")
      cat(sprintf("Run: %s\n", run_id))
      cat(sprintf("Rows: %s\n", n_rows))
      cat(sprintf("Completeness: %s%%\n", completeness))
      cat(sprintf("Outliers: %s\n", outliers))
      cat("Status: OK\n")
    depends: [check-completeness, check-distributions]

on_failure:
  r_expr: |
    cat(sprintf("DATA QUALITY CHECK FAILED at %s\n", Sys.time()))
    cat("Review the step logs for details.\n")

How the DAG executes
        +-> check-completeness  ---+
load ---+                          +--> report
        +-> check-distributions ---+
The two check steps run in parallel after load. Both must complete before report runs.
The check-completeness step calls stop() if completeness drops below 90%. This causes the step to fail, which blocks report and triggers the on_failure hook.
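This fail-fast pattern generalizes: any stop() inside an r_expr fails the step, blocks its dependents, and fires on_failure. A small helper keeps the checks uniform (a sketch; require_at_least is a hypothetical name, not part of the pipeline above):

require_at_least <- function(value, threshold, label) {
  # stop() marks the step as failed; warning() only logs and lets the run continue
  if (value < threshold) {
    stop(sprintf("%s is %.1f, below the %.1f threshold", label, value, threshold))
  }
  invisible(value)
}

# Usage inside check-completeness:
#   require_at_least(pct_complete, 90, "Completeness (%)")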
The cron trigger
trigger:
  schedule: "@every 6h"

Start the scheduler to activate the trigger:

daggle serve

The pipeline runs every 6 hours. In a real scenario, the load step would fetch a live data snapshot (from a database, API, or file drop) rather than loading a static package dataset.
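A live load step might look like the sketch below; the URL is purely illustrative, and only the snapshot filename must match what the downstream checks read:

# Hypothetical live load step: fetch the latest CSV export instead of
# using the static palmerpenguins dataset. The URL is an assumption.
url <- "https://example.com/exports/penguins_latest.csv"
penguins <- read.csv(url, stringsAsFactors = FALSE)

# Keep the same snapshot path so check-completeness and
# check-distributions work unchanged
saveRDS(penguins, "penguins_snapshot.rds")
cat(sprintf("Fetched %d rows at %s\n", nrow(penguins), Sys.time()))
cat("::daggle-output name=n_rows::", nrow(penguins), "\n")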
The quality log
Each successful run appends a row to reports/quality_log.csv:
timestamp,run_id,rows,completeness_pct,outliers,status
2026-04-04 06:00:12,20260404T060012,344,97.2,2,OK
2026-04-04 12:00:15,20260404T120015,344,97.2,2,OK
This CSV accumulates over time, giving you a historical record of data quality.
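From an interactive R session, the log is easy to explore. A minimal sketch, assuming the column names written by the report step above:

# Load the accumulated quality log and sort it chronologically
log <- read.csv("reports/quality_log.csv")
log$timestamp <- as.POSIXct(log$timestamp)
log <- log[order(log$timestamp), ]

# Flag runs where completeness dropped relative to the previous run
dropped <- c(FALSE, diff(log$completeness_pct) < 0)
print(log[dropped, c("timestamp", "run_id", "completeness_pct")])

# Quick base-graphics trend plot
plot(log$timestamp, log$completeness_pct, type = "b",
     xlab = "Run time", ylab = "Completeness (%)")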
Tracking quality over time
Use daggle history to see recent runs:
daggle history penguin-monitor --last 10

Run ID           Status     Duration   DAG Hash
20260404T120015  completed  12s        c4a8f2
20260404T060012  completed  11s        c4a8f2
20260403T180009  completed  13s        c4a8f2
20260403T120011  failed     3s         c4a8f2
Use daggle stats to see duration trends and success rates:
daggle stats penguin-monitor --last 30

Step                 Avg   P50   P95
load                 2s    2s    3s
check-completeness   4s    4s    6s
check-distributions  5s    5s    7s
report               1s    1s    2s
Overall success rate: 96% (29/30)
A drop in success rate is a signal that the data itself changed: a new batch of missing values, a shifted distribution, or an unbalanced group. The quality log CSV and the step logs together give you the full picture.
Failure handling
The on_failure hook runs when any step fails. In production, you would replace the cat() call with a real notification:
on_failure:
  r_expr: |
    slackr::slackr_msg(
      sprintf("Data quality check FAILED (run %s) at %s",
              Sys.getenv("DAGGLE_RUN_ID"), Sys.time())
    )

Running manually
You do not need to wait for the cron trigger. Run the pipeline any time:
daggle run penguin-monitor
daggle status penguin-monitor