Tutorial: Using daggleR
The daggleR package provides two sets of helpers: in-step functions for use inside pipeline steps, and API wrappers for controlling daggle from an R session. This tutorial shows how to migrate from raw protocol calls to daggleR, and how to use the API wrappers.
Part 1: In-step helpers
In-step helpers replace raw cat() and Sys.getenv() calls with validated, readable functions. They work inside any R step executed by daggle.
Emitting outputs
Before:
cat("::daggle-output name=row_count::", nrow(data), "\n")
After:
daggleR::daggle_output("row_count", nrow(data))
daggle_output() validates the key format, coerces the value to character, and prints the marker. It returns the value invisibly.
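To make the three steps concrete (validate, coerce, print), here is a minimal base R sketch of a helper with the same shape. It is not daggleR's source: the name sketch_output() is hypothetical, and the key rule (a letter or underscore followed by letters, digits, or underscores) is an assumption, since the exact accepted format is not given here.

```r
# Hypothetical re-implementation for illustration only; not daggleR's code.
sketch_output <- function(key, value) {
  # Assumed key rule: a letter or underscore, then letters, digits, or underscores.
  if (!is.character(key) || length(key) != 1L ||
      !grepl("^[A-Za-z_][A-Za-z0-9_]*$", key)) {
    stop("invalid output key: ", deparse(key))
  }
  text <- as.character(value)
  if (length(text) != 1L) {
    stop("output value must be a single value")
  }
  # sep = "" avoids the padding spaces that cat() inserts by default.
  cat("::daggle-output name=", key, "::", text, "\n", sep = "")
  # Return the original value invisibly so the call can end a pipeline.
  invisible(value)
}

n <- sketch_output("row_count", 1432L)
```

Returning the value invisibly lets the call sit on the right-hand side of an assignment without echoing the value a second time.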
Reading upstream outputs
Before:
n <- as.integer(Sys.getenv("DAGGLE_OUTPUT_EXTRACT_ROW_COUNT"))
After:
n <- as.integer(daggleR::daggle_get_output("extract", "row_count"))
daggle_get_output() handles the uppercasing and hyphen-to-underscore conversion. It returns the value as a character string, so you still need as.integer() or as.numeric() if you want a number.
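The name conversion can be sketched in a few lines of base R. The helper name sketch_output_env_name() and the step ID "load-data" are hypothetical; the DAGGLE_OUTPUT_ prefix and the uppercase, underscore-separated layout are inferred from the "Before" snippet above.

```r
# Hypothetical helper: derive the environment variable name for an upstream output.
sketch_output_env_name <- function(step_id, key) {
  # Replace hyphens with underscores, then uppercase.
  normalise <- function(x) toupper(gsub("-", "_", x, fixed = TRUE))
  paste("DAGGLE_OUTPUT", normalise(step_id), normalise(key), sep = "_")
}

sketch_output_env_name("extract", "row_count")
sketch_output_env_name("load-data", "n_rows")

# Simulate the variable daggle would export, then read it back as a string.
Sys.setenv(DAGGLE_OUTPUT_EXTRACT_ROW_COUNT = "1432")
value <- Sys.getenv(sketch_output_env_name("extract", "row_count"))
as.integer(value)
```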
Run metadata
Before:
run_id <- Sys.getenv("DAGGLE_RUN_ID")
dag <- Sys.getenv("DAGGLE_DAG_NAME")
dir <- Sys.getenv("DAGGLE_RUN_DIR")
After:
run_id <- daggleR::daggle_run_id()
dag <- daggleR::daggle_dag_name()
dir <- daggleR::daggle_run_dir()
Full example
A step using daggleR:
library(daggleR)

# Read the row count emitted by the upstream "extract" step (a character string).
raw_count <- daggle_get_output("extract", "n_raw")
cat(sprintf("Upstream extracted %s rows\n", raw_count))

# Load the cleaned data and report its size.
data <- readRDS("data/clean.rds")
cat(sprintf("Clean data has %d rows\n", nrow(data)))

# Publish outputs for downstream steps.
daggle_output("n_clean", nrow(data))
daggle_output("pct_retained", round(nrow(data) / as.integer(raw_count) * 100, 1))
In-step helpers are base R only. They require no network access and no running daggle server: they communicate through environment variables and stdout.
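Because upstream outputs arrive as character strings, a missing or malformed value passed to as.integer() turns into NA and then propagates silently into later arithmetic. The sketch below shows one way to fail early instead. The helper as_count() is hypothetical and not part of daggleR, and how daggle_get_output() itself reports a missing output is not covered here.

```r
# Hypothetical helper: convert an upstream output string to a count, or stop.
as_count <- function(x, name = "value") {
  n <- suppressWarnings(as.numeric(x))
  if (length(n) != 1L || is.na(n) || n < 0 || n != floor(n)) {
    stop(sprintf("%s is not a non-negative whole number: '%s'",
                 name, paste(x, collapse = ",")))
  }
  as.integer(n)
}

raw_count <- as_count("1432", "n_raw")
raw_count

# An empty string (for example, an unset environment variable) is rejected.
result <- try(as_count("", "n_raw"), silent = TRUE)
inherits(result, "try-error")
```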
Summaries, metadata, and validations
These helpers emit rich per-step metadata and were added in daggleR 0.2.0. Each one emits the corresponding marker documented in the Output Protocol reference.
daggle_summary_md()
Emit a markdown summary for display in dashboards:
daggleR::daggle_summary_md("## Results\n\nProcessed **42** rows successfully.")
Equivalent to printing ::daggle-summary format=markdown::....
daggle_meta_numeric()
Emit a numeric metadata entry:
daggleR::daggle_meta_numeric("row_count", 1542)
daggle_meta_text()
Emit a text metadata entry:
daggleR::daggle_meta_text("model_desc", "Linear regression with 3 predictors")
daggle_meta_table()
Emit a table metadata entry (serialized as JSON):
daggleR::daggle_meta_table("top5", head(results, 5))
The data frame is converted to a JSON array of objects.
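For a sense of what "a JSON array of objects" looks like, the sketch below serializes a small data frame row by row with the jsonlite package. Using jsonlite is an assumption made for illustration; which serializer daggleR uses internally is not stated here, and the results data frame is made up.

```r
library(jsonlite)

# Made-up data standing in for head(results, 5).
results <- data.frame(id = 1:2, name = c("a", "b"))

# dataframe = "rows" emits one JSON object per row.
json <- toJSON(results, dataframe = "rows")
cat(json, "\n")
```

Each row becomes one object keyed by column name, so column names should be stable and readable if a dashboard is going to display them.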
daggle_meta_image()
Emit an image metadata entry (file path):
daggleR::daggle_meta_image("residuals", "output/residuals.png")
daggle_validation()
Emit a structured validation result:
daggleR::daggle_validation("row_count", "pass", "Expected > 0, got 1542")
daggleR::daggle_validation("schema", "fail", "Column 'date' expected date, got character")
The status argument must be one of "pass", "warn", or "fail".
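In practice the status is usually computed from a check rather than hard-coded. The sketch below maps a row count to one of the three allowed statuses and uses match.arg() to guard against typos. The helper status_for_count() and its warn_below threshold are hypothetical.

```r
# Hypothetical helper: map a row count to one of the three allowed statuses.
status_for_count <- function(n, warn_below = 100) {
  status <- if (n <= 0) "fail" else if (n < warn_below) "warn" else "pass"
  # match.arg() stops with an error if the status is not an allowed value.
  match.arg(status, c("pass", "warn", "fail"))
}

status_for_count(1542)
status_for_count(50)
status_for_count(0)

# Inside a step, the result could be passed on as the status argument, e.g.
# daggleR::daggle_validation("row_count", status_for_count(n), "Expected > 0")
```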
Part 2: API wrappers
API wrappers talk to the daggle REST API (started with daggle serve --port 8787). They require the httr2 package.
The base URL is resolved from (in order): the base_url parameter, the DAGGLE_API_URL environment variable, or http://127.0.0.1:8787.
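The resolution order can be sketched as a small base R function. The name resolve_base_url() is hypothetical and the URLs other than the default are placeholders; only the order (argument, then DAGGLE_API_URL, then http://127.0.0.1:8787) comes from the description above.

```r
# Hypothetical helper mirroring the documented resolution order.
resolve_base_url <- function(base_url = NULL) {
  # 1. An explicit argument wins.
  if (!is.null(base_url) && nzchar(base_url)) {
    return(base_url)
  }
  # 2. Otherwise use the environment variable, if set and non-empty.
  env_url <- Sys.getenv("DAGGLE_API_URL", unset = "")
  if (nzchar(env_url)) {
    return(env_url)
  }
  # 3. Fall back to the local default.
  "http://127.0.0.1:8787"
}

Sys.unsetenv("DAGGLE_API_URL")
resolve_base_url()

Sys.setenv(DAGGLE_API_URL = "http://daggle.example.test:9000")
resolve_base_url()
resolve_base_url("http://localhost:8080")
```

Setting DAGGLE_API_URL once, for example in .Renviron, avoids repeating base_url in every call.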
All code below assumes a running daggle server. The blocks are shown with eval: false since there is no live instance during doc rendering.
List DAGs
library(daggleR)
dags <- daggle_list_dags()
dags
#>        name steps  schedule last_status             last_run
#> 1 etl-daily     4 0 3 * * *     success 2025-01-15T03:00:12Z
#> 2   reports     2      <NA>     running 2025-01-15T10:30:00Z
Trigger a run and poll for completion
result <- daggle_trigger("my-pipeline", params = list(date = "2026-04-04"))
run_id <- result$run_id
repeat {
  run <- daggle_get_run("my-pipeline", run_id)
  # Stop polling once the run reaches a terminal status.
  if (run$status %in% c("success", "failure", "cancelled")) break
  Sys.sleep(2)
}
cat("Final status:", run$status, "\n")
The daggle_trigger() function returns immediately with a run ID. Poll with daggle_get_run() until the status is terminal.
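A bare repeat loop never exits if the run gets stuck, so scripts and CI jobs may want a timeout. The sketch below wraps the polling pattern in a hypothetical helper, poll_until_terminal(), that takes any function returning the current status. A stub stands in for the server so the example runs without one; against a live instance the status function would presumably be function() daggle_get_run("my-pipeline", run_id)$status.

```r
# Hypothetical helper: poll a status function until it reports a terminal state.
poll_until_terminal <- function(fetch_status, interval = 2, timeout = 600,
                                terminal = c("success", "failure", "cancelled")) {
  deadline <- Sys.time() + timeout
  repeat {
    status <- fetch_status()
    if (status %in% terminal) {
      return(status)
    }
    if (Sys.time() >= deadline) {
      stop("timed out waiting for run; last status: ", status)
    }
    Sys.sleep(interval)
  }
}

# Stub that reports "running" twice and then "success".
states <- c("running", "running", "success")
i <- 0
fake_status <- function() {
  i <<- i + 1
  states[min(i, length(states))]
}

final <- poll_until_terminal(fake_status, interval = 0.01, timeout = 5)
cat("Final status:", final, "\n")
```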
Get step outputs
outputs <- daggle_get_outputs("my-pipeline", run_id)
outputs
#>   step_id       key value
#> 1 extract row_count  1432
#> 2   clean   n_clean  1398
#> 3   clean n_dropped    34
Pass "latest" as the run ID to get outputs from the most recent run:
outputs <- daggle_get_outputs("my-pipeline", "latest")
Approve a waiting step
If a pipeline has an approve: step, it pauses until someone approves or rejects:
daggle_approve("my-pipeline", run_id = "latest", step_id = "review")
To reject instead:
daggle_reject("my-pipeline", run_id = "latest", step_id = "review")
View step logs
log <- daggle_get_step_log("my-pipeline", "latest", "extract")
cat(log$stdout)
Health check
daggle_health()
#> $status
#> [1] "ok"
#>
#> $version
#> [1] "0.5.0"
Clean up old runs
daggle_cleanup("30d")
#> $removed
#> [1] 42
#>
#> $freed
#> [1] "10.0 MB"
Artifacts, summaries, metadata, validations, and comparison
These API wrappers were added in daggleR 0.2.0. They talk to endpoints from daggle’s Phase 7 analyst workflow features.
daggle_list_artifacts()
List declared artifacts for a run:
artifacts <- daggleR::daggle_list_artifacts("my-pipeline", "latest")
artifacts
#>   step_id  name             path   hash   size format
#> 1   train model output/model.rds e3b0c4 204800    rds
daggle_plan()
Preview the execution plan with cache status before triggering:
daggleR::daggle_plan("my-pipeline")
#>     step_id   status                          reason
#> 1     fetch outdated              input file changed
#> 2 transform   cached inputs unchanged since last run
#> 3    report no-cache             caching not enabled
daggle_get_summaries()
Retrieve step summaries emitted during a run:
summaries <- daggleR::daggle_get_summaries("my-pipeline", "latest")
cat(summaries$content[1])
daggle_get_metadata()
Retrieve typed metadata entries from a run:
meta <- daggleR::daggle_get_metadata("my-pipeline", "latest")
meta
#>   step_id       name    type             value
#> 1   train  row_count numeric              1542
#> 2   train model_desc    text Linear regression
daggle_get_validations()
Retrieve validation results from a run:
vals <- daggleR::daggle_get_validations("my-pipeline", "latest")
vals
#>    step_id      name status                               message
#> 1 validate row_count   pass                Expected > 0, got 1542
#> 2 validate    schema   fail Column 'date' expected date, got char
daggle_compare_runs()
Compare two runs side-by-side:
diff <- daggleR::daggle_compare_runs("my-pipeline", run1 = "20250115T030012", run2 = "20250116T030015")
diff$duration_diff
#> $run1_seconds
#> [1] 47.2
#>
#> $run2_seconds
#> [1] 52.8
#>
#> $diff_seconds
#> [1] 5.6
Annotations and impact (daggleR 0.3.0)
These API wrappers were added in daggleR 0.3.0 and cover daggle’s Phase 8 collaboration and observability endpoints.
daggle_list_annotations() and daggle_add_annotation()
Attach free-form notes to a run — useful for post-mortems ("DB was down — manual restart at 08:30") that should surface next to the run in status output and dashboards:
daggleR::daggle_add_annotation("penguin-monitor", "latest",
  note = "Restarted after upstream feed recovered",
  author = "alice")
daggleR::daggle_list_annotations("penguin-monitor", "latest")
#>              timestamp author                                    note
#> 1 2026-04-17T08:45:12Z  alice Restarted after upstream feed recovered
author defaults to Sys.getenv("USER").
daggle_get_impact()
List DAGs that depend on a given DAG (via trigger.on_dag.name) plus any exposures declared on it (Shiny apps, Quarto reports, dashboards). Purely informational — daggle does not deploy or monitor the exposures.
impact <- daggleR::daggle_get_impact("penguin-clean")
impact$downstream_dags
#>             name project trigger_on_status
#> 1 penguin-models    data         completed
#> 2 penguin-report    data               any
impact$exposures
#>            name      type                          url description
#> 1 ops-dashboard dashboard https://dash.example.com/ops    Main ops
List and filter DAGs by ownership
daggle_list_dags() accepts tag, team, and owner filters that map directly to the ?tag=&team=&owner= query params on GET /api/v1/dags:
etl_dags <- daggleR::daggle_list_dags(tag = "etl", team = "data")
When a DAG declares owner:, team:, description:, or tags:, those fields appear as columns in the response data frame.
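To illustrate how such filters could map onto the query string, the sketch below builds the request path in base R. The helper build_dag_query() is hypothetical and not part of daggleR. Dropping unset filters, rather than sending them empty, is an assumption; daggleR presumably delegates this to httr2.

```r
# Hypothetical helper, not part of daggleR: build the request path for a DAG listing.
build_dag_query <- function(tag = NULL, team = NULL, owner = NULL) {
  # Keep only the filters the caller actually supplied.
  filters <- Filter(Negate(is.null), list(tag = tag, team = team, owner = owner))
  if (length(filters) == 0L) {
    return("/api/v1/dags")
  }
  # Percent-encode each value so spaces and reserved characters survive the URL.
  encoded <- vapply(
    filters,
    function(v) utils::URLencode(as.character(v), reserved = TRUE),
    character(1)
  )
  paste0("/api/v1/dags?", paste0(names(encoded), "=", encoded, collapse = "&"))
}

build_dag_query(tag = "etl", team = "data")
build_dag_query(owner = "data eng")
build_dag_query()
```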
When to use which
| Scenario | Use |
|---|---|
| Inside a pipeline step | In-step helpers (daggle_output(), daggle_get_output()) |
| Interactive R session | API wrappers (daggle_trigger(), daggle_get_run()) |
| Shiny app controlling pipelines | API wrappers |
| CI script kicking off a pipeline | API wrappers |
| Reading upstream data in a step | In-step daggle_get_output() |
| Post-mortem notes on a failed run | daggle_add_annotation() |
| Auditing downstream consumers before a schema change | daggle_get_impact() |