library(daggleR)
dags <- list_dags()
dags
#> name steps schedule last_status last_run
#> 1 etl-daily 4 0 3 * * * success 2025-01-15T03:00:12Z
#> 2 reports 2 <NA> running 2025-01-15T10:30:00ZTutorial: Using daggleR
The daggleR package provides two sets of helpers: in-step functions for use inside pipeline steps, and API wrappers for controlling daggle from an R session. This tutorial shows how to migrate from raw protocol calls to daggleR, and how to use the API wrappers.
Part 1: In-step helpers
In-step helpers replace raw cat() and Sys.getenv() calls with validated, readable functions. They work inside any R step executed by daggle.
Emitting outputs
Before:
cat("::daggle-output name=row_count::", nrow(data), "\n")After:
daggleR::output("row_count", nrow(data))output() validates the key format, coerces the value to character, and prints the marker. It returns the value invisibly.
Reading upstream outputs
Before:
n <- as.integer(Sys.getenv("DAGGLE_OUTPUT_EXTRACT_ROW_COUNT"))After:
n <- daggleR::get_output("extract", "row_count")get_output() handles the uppercasing and hyphen-to-underscore conversion. It returns the value as a character string (you still need as.integer() or as.numeric() if you want a number).
Run metadata
Before:
run_id <- Sys.getenv("DAGGLE_RUN_ID")
dag <- Sys.getenv("DAGGLE_DAG_NAME")
dir <- Sys.getenv("DAGGLE_RUN_DIR")After:
run_id <- daggleR::run_id()
dag <- daggleR::dag_name()
dir <- daggleR::run_dir()Full example
A step using daggleR:
library(daggleR)
raw_count <- get_output("extract", "n_raw")
cat(sprintf("Upstream extracted %s rows\n", raw_count))
data <- readRDS("data/clean.rds")
cat(sprintf("Clean data has %d rows\n", nrow(data)))
output("n_clean", nrow(data))
output("pct_retained", round(nrow(data) / as.integer(raw_count) * 100, 1))In-step helpers are base R only. They require no network access and no running daggle server – they communicate through environment variables and stdout.
Part 2: API wrappers
API wrappers talk to the daggle REST API (started with daggle serve --port 8787). They require the httr2 package.
The base URL is resolved from (in order): the base_url parameter, the DAGGLE_API_URL environment variable, or http://127.0.0.1:8787.
All code below assumes a running daggle server. The blocks are shown with eval: false since there is no live instance during doc rendering.
List DAGs
Trigger a run and poll for completion
result <- trigger("my-pipeline", params = list(date = "2026-04-04"))
run_id <- result$run_id
repeat {
run <- get_run("my-pipeline", run_id)
if (run$status %in% c("success", "failure", "cancelled")) break
Sys.sleep(2)
}
cat("Final status:", run$status, "\n")The trigger() function returns immediately with a run ID. Poll with get_run() until the status is terminal.
Get step outputs
outputs <- get_outputs("my-pipeline", run_id)
outputs
#> step_id key value
#> 1 extract row_count 1432
#> 2 clean n_clean 1398
#> 3 clean n_dropped 34Pass "latest" as the run ID to get outputs from the most recent run:
outputs <- get_outputs("my-pipeline", "latest")Approve a waiting step
If a pipeline has an approve: step, it pauses until someone approves or rejects:
approve("my-pipeline", run_id = "latest", step_id = "review")To reject instead:
reject("my-pipeline", run_id = "latest", step_id = "review")View step logs
log <- get_step_log("my-pipeline", "latest", "extract")
cat(log$stdout)Health check
health()
#> $status
#> [1] "ok"
#>
#> $version
#> [1] "0.5.0"Clean up old runs
cleanup("30d")
#> $removed
#> [1] 42
#>
#> $freed
#> [1] "10.0 MB"When to use which
| Scenario | Use |
|---|---|
| Inside a pipeline step | In-step helpers (output(), get_output()) |
| Interactive R session | API wrappers (trigger(), get_run()) |
| Shiny app controlling pipelines | API wrappers |
| CI script kicking off a pipeline | API wrappers |
| Reading upstream data in a step | In-step get_output() |