Tutorial: Using daggleR

The daggleR package provides two sets of helpers: in-step functions for use inside pipeline steps, and API wrappers for controlling daggle from an R session. This tutorial shows how to migrate from raw protocol calls to daggleR, and how to use the API wrappers.

Part 1: In-step helpers

In-step helpers replace raw cat() and Sys.getenv() calls with validated, readable functions. They work inside any R step executed by daggle.

Emitting outputs

Before:

cat("::daggle-output name=row_count::", nrow(data), "\n", sep = "")

After:

daggleR::daggle_output("row_count", nrow(data))

daggle_output() validates the key format, coerces the value to character, and prints the marker. It returns the value invisibly.

Reading upstream outputs

Before:

n <- as.integer(Sys.getenv("DAGGLE_OUTPUT_EXTRACT_ROW_COUNT"))

After:

n <- daggleR::daggle_get_output("extract", "row_count")

daggle_get_output() handles the uppercasing and hyphen-to-underscore conversion. It returns the value as a character string (you still need as.integer() or as.numeric() if you want a number).
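The name mapping it performs can be sketched in base R. This is an illustration of the documented behavior, not the package's actual implementation:

```r
# Sketch of the environment variable name that
# daggle_get_output("extract", "row-count") would read:
# uppercase everything, turn hyphens into underscores,
# and add the DAGGLE_OUTPUT_ prefix.
output_env_name <- function(step_id, key) {
  norm <- function(x) toupper(gsub("-", "_", x, fixed = TRUE))
  paste0("DAGGLE_OUTPUT_", norm(step_id), "_", norm(key))
}

output_env_name("extract", "row-count")
#> [1] "DAGGLE_OUTPUT_EXTRACT_ROW_COUNT"
```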

Run metadata

Before:

run_id <- Sys.getenv("DAGGLE_RUN_ID")
dag <- Sys.getenv("DAGGLE_DAG_NAME")
dir <- Sys.getenv("DAGGLE_RUN_DIR")

After:

run_id <- daggleR::daggle_run_id()
dag <- daggleR::daggle_dag_name()
dir <- daggleR::daggle_run_dir()

Full example

A step using daggleR:

library(daggleR)

raw_count <- daggle_get_output("extract", "n_raw")
cat(sprintf("Upstream extracted %s rows\n", raw_count))

data <- readRDS("data/clean.rds")
cat(sprintf("Clean data has %d rows\n", nrow(data)))

daggle_output("n_clean", nrow(data))
daggle_output("pct_retained", round(nrow(data) / as.integer(raw_count) * 100, 1))

In-step helpers are base R only. They require no network access and no running daggle server – they communicate through environment variables and stdout.
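To see why no server is needed, here is a minimal base R sketch of what an output helper boils down to (illustrative only; the real daggle_output() also validates the key format):

```r
# Minimal sketch of an output emitter: coerce the value to
# character, print the marker line to stdout, and return the
# value invisibly, mirroring what daggle_output() documents.
emit_output <- function(key, value) {
  cat(sprintf("::daggle-output name=%s::%s\n", key, as.character(value)))
  invisible(value)
}

emit_output("row_count", 1432)
#> ::daggle-output name=row_count::1432
```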

Summaries, metadata, and validations

Rich per-step metadata helpers, added in daggleR 0.2.0. Each emits the corresponding marker from the protocol documented in the Output Protocol reference.

daggle_summary_md()

Emit a markdown summary for display in dashboards:

daggleR::daggle_summary_md("## Results\n\nProcessed **42** rows successfully.")

Equivalent to printing ::daggle-summary format=markdown::....

daggle_meta_numeric()

Emit a numeric metadata entry:

daggleR::daggle_meta_numeric("row_count", 1542)

daggle_meta_text()

Emit a text metadata entry:

daggleR::daggle_meta_text("model_desc", "Linear regression with 3 predictors")

daggle_meta_table()

Emit a table metadata entry (serialized as JSON):

daggleR::daggle_meta_table("top5", head(results, 5))

The data frame is converted to a JSON array of objects.
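For illustration, a small data frame with (hypothetical) columns species and n would come out roughly as:

```json
[
  {"species": "Adelie", "n": 152},
  {"species": "Chinstrap", "n": 68}
]
```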

daggle_meta_image()

Emit an image metadata entry (file path):

daggleR::daggle_meta_image("residuals", "output/residuals.png")

daggle_validation()

Emit a structured validation result:

daggleR::daggle_validation("row_count", "pass", "Expected > 0, got 1542")
daggleR::daggle_validation("schema", "fail", "Column 'date' expected date, got character")

The status argument must be one of "pass", "warn", or "fail".
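In base R terms, that restriction behaves like match.arg() over the three values (a sketch, not daggleR's actual code):

```r
# Sketch: accept only the three documented statuses;
# match.arg() raises an error for anything else.
check_status <- function(status) {
  match.arg(status, choices = c("pass", "warn", "fail"))
}

check_status("warn")
#> [1] "warn"
# check_status("ok") raises an error
```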

Part 2: API wrappers

API wrappers talk to the daggle REST API (started with daggle serve --port 8787). They require the httr2 package.

The base URL is resolved from (in order): the base_url parameter, the DAGGLE_API_URL environment variable, or http://127.0.0.1:8787.
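That resolution order can be sketched as follows (an illustration of the documented lookup, not daggleR's internal code):

```r
# Resolve the API base URL: explicit argument first, then the
# DAGGLE_API_URL environment variable, then the local default.
resolve_base_url <- function(base_url = NULL) {
  if (!is.null(base_url)) return(base_url)
  env <- Sys.getenv("DAGGLE_API_URL", unset = "")
  if (nzchar(env)) return(env)
  "http://127.0.0.1:8787"
}
```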

All code below assumes a running daggle server. The blocks are shown with eval: false since there is no live instance during doc rendering.

List DAGs

library(daggleR)

dags <- daggle_list_dags()
dags
#>       name steps   schedule last_status            last_run
#> 1 etl-daily     4 0 3 * * *     success 2025-01-15T03:00:12Z
#> 2   reports     2       <NA>     running 2025-01-15T10:30:00Z

Trigger a run and poll for completion

result <- daggle_trigger("my-pipeline", params = list(date = "2026-04-04"))
run_id <- result$run_id

repeat {
  run <- daggle_get_run("my-pipeline", run_id)
  if (run$status %in% c("success", "failure", "cancelled")) break
  Sys.sleep(2)
}

cat("Final status:", run$status, "\n")

The daggle_trigger() function returns immediately with a run ID. Poll with daggle_get_run() until the status is terminal.
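If you poll in several places, the loop is worth wrapping in a small helper. wait_for_run() below is not part of daggleR; it takes the status-fetching function as an argument, which also makes it easy to test without a server:

```r
# Poll until a terminal status is reached or the timeout expires.
# get_status: a zero-argument function returning the current run status.
wait_for_run <- function(get_status, interval = 2, timeout = 600) {
  terminal <- c("success", "failure", "cancelled")
  deadline <- Sys.time() + timeout
  repeat {
    status <- get_status()
    if (status %in% terminal) return(status)
    if (Sys.time() >= deadline) stop("timed out waiting for run")
    Sys.sleep(interval)
  }
}

# Usage with the API wrappers:
# status <- wait_for_run(function() daggle_get_run("my-pipeline", run_id)$status)
```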

Get step outputs

outputs <- daggle_get_outputs("my-pipeline", run_id)
outputs
#>     step_id          key                     value
#> 1   extract    row_count                      1432
#> 2     clean      n_clean                      1398
#> 3     clean    n_dropped                        34

Pass "latest" as the run ID to get outputs from the most recent run:

outputs <- daggle_get_outputs("my-pipeline", "latest")

Approve a waiting step

If a pipeline has an approve: step, it pauses until someone approves or rejects:

daggle_approve("my-pipeline", run_id = "latest", step_id = "review")

To reject instead:

daggle_reject("my-pipeline", run_id = "latest", step_id = "review")

View step logs

log <- daggle_get_step_log("my-pipeline", "latest", "extract")
cat(log$stdout)

Health check

daggle_health()
#> $status
#> [1] "ok"
#>
#> $version
#> [1] "0.5.0"

Clean up old runs

daggle_cleanup("30d")
#> $removed
#> [1] 42
#>
#> $freed
#> [1] "10.0 MB"

Artifacts, summaries, metadata, validations, and comparison

API wrappers added in daggleR 0.2.0. They talk to endpoints from daggle’s Phase 7 analyst workflow features.

daggle_list_artifacts()

List declared artifacts for a run:

artifacts <- daggleR::daggle_list_artifacts("my-pipeline", "latest")
artifacts
#>   step_id   name              path   hash      size format
#> 1   train  model output/model.rds  e3b0c4   204800    rds

daggle_plan()

Preview the execution plan with cache status before triggering:

daggleR::daggle_plan("my-pipeline")
#>      step_id   status                      reason
#> 1      fetch outdated          input file changed
#> 2  transform   cached inputs unchanged since last run
#> 3     report no-cache         caching not enabled

daggle_get_summaries()

Retrieve step summaries emitted during a run:

summaries <- daggleR::daggle_get_summaries("my-pipeline", "latest")
cat(summaries$content[1])

daggle_get_metadata()

Retrieve typed metadata entries from a run:

meta <- daggleR::daggle_get_metadata("my-pipeline", "latest")
meta
#>   step_id       name    type  value
#> 1   train  row_count numeric   1542
#> 2   train model_desc    text Linear regression

daggle_get_validations()

Retrieve validation results from a run:

vals <- daggleR::daggle_get_validations("my-pipeline", "latest")
vals
#>     step_id       name status                               message
#> 1  validate  row_count   pass              Expected > 0, got 1542
#> 2  validate     schema   fail Column 'date' expected date, got char

daggle_compare_runs()

Compare two runs side-by-side:

diff <- daggleR::daggle_compare_runs("my-pipeline", run1 = "20250115T030012", run2 = "20250116T030015")
diff$duration_diff
#> $run1_seconds
#> [1] 47.2
#>
#> $run2_seconds
#> [1] 52.8
#>
#> $diff_seconds
#> [1] 5.6

Annotations and impact (daggleR 0.3.0)

API wrappers added in daggleR 0.3.0, covering daggle’s Phase 8 collaboration and observability endpoints.

daggle_list_annotations() and daggle_add_annotation()

Attach free-form notes to a run — useful for post-mortems ("DB was down — manual restart at 08:30") that should surface next to the run in status output and dashboards:

daggleR::daggle_add_annotation("penguin-monitor", "latest",
  note = "Restarted after upstream feed recovered",
  author = "alice")

daggleR::daggle_list_annotations("penguin-monitor", "latest")
#>              timestamp  author                                     note
#> 1 2026-04-17T08:45:12Z   alice  Restarted after upstream feed recovered

The author argument defaults to Sys.getenv("USER").

daggle_get_impact()

List DAGs that depend on a given DAG (via trigger.on_dag.name) plus any exposures declared on it (Shiny apps, Quarto reports, dashboards). Purely informational — daggle does not deploy or monitor the exposures.

impact <- daggleR::daggle_get_impact("penguin-clean")
impact$downstream_dags
#>              name project trigger_on_status
#> 1 penguin-models    data          completed
#> 2 penguin-report    data                any

impact$exposures
#>             name      type                             url description
#> 1  ops-dashboard dashboard  https://dash.example.com/ops Main ops

List and filter DAGs by ownership

daggle_list_dags() accepts tag, team, and owner filters that map directly to the ?tag=&team=&owner= query params on GET /api/v1/dags:

etl_dags <- daggleR::daggle_list_dags(tag = "etl", team = "data")

When a DAG declares owner:, team:, description:, or tags:, those fields appear as columns in the response data frame.

When to use which

Scenario                                              Use
Inside a pipeline step                                In-step helpers (daggle_output(), daggle_get_output())
Interactive R session                                 API wrappers (daggle_trigger(), daggle_get_run())
Shiny app controlling pipelines                       API wrappers
CI script kicking off a pipeline                      API wrappers
Reading upstream data in a step                       In-step daggle_get_output()
Post-mortem notes on a failed run                     daggle_add_annotation()
Auditing downstream consumers before a schema change  daggle_get_impact()