Tutorial: Using daggleR

The daggleR package provides two sets of helpers: in-step functions for use inside pipeline steps, and API wrappers for controlling daggle from an R session. This tutorial shows how to migrate from raw protocol calls to daggleR, and how to use the API wrappers.

Part 1: In-step helpers

In-step helpers replace raw cat() and Sys.getenv() calls with validated, readable functions. They work inside any R step executed by daggle.

Emitting outputs

Before:

cat("::daggle-output name=row_count::", nrow(data), "\n")

After:

daggleR::output("row_count", nrow(data))

output() validates the key format, coerces the value to character, and prints the marker. It returns the value invisibly.
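
To make those three behaviours concrete, here is a minimal base-R sketch of what a helper like output() might do internally. It is not daggleR's source: the function name emit_output_sketch(), the key pattern, and the spacing after the marker are assumptions chosen for illustration. Only the marker text itself is taken from the raw call above.

```r
# Illustrative stand-in for an output helper; NOT the daggleR implementation.
emit_output_sketch <- function(key, value) {
  # Assumed key rule: a letter or underscore, then letters, digits, underscores.
  if (!is.character(key) || length(key) != 1L ||
      !grepl("^[A-Za-z_][A-Za-z0-9_]*$", key)) {
    stop("Invalid output key: ", paste(format(key), collapse = ", "),
         call. = FALSE)
  }
  # Coerce the value to character and print the marker on a line of its own.
  cat(sprintf("::daggle-output name=%s:: %s\n", key, as.character(value)))
  # Return the original value invisibly so the call can end a step cleanly.
  invisible(value)
}

emit_output_sketch("row_count", 1432L)
```

Validating before printing means a malformed key fails inside the step that produced it, rather than surfacing later as a missing value downstream.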

Reading upstream outputs

Before:

n <- as.integer(Sys.getenv("DAGGLE_OUTPUT_EXTRACT_ROW_COUNT"))

After:

n <- daggleR::get_output("extract", "row_count")

get_output() handles the uppercasing and hyphen-to-underscore conversion. It returns the value as a character string (you still need as.integer() or as.numeric() if you want a number).
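
The name mapping can be sketched in base R as follows. The DAGGLE_OUTPUT_ prefix and the result for extract/row_count come from the raw call above. The helper names output_env_name() and read_output_int() are hypothetical, and applying the same conversion to both the step ID and the key is an assumption.

```r
# Hypothetical helper: build the environment variable name for an upstream output.
output_env_name <- function(step_id, key) {
  # Assumed rule: hyphens become underscores, then everything is uppercased.
  normalise <- function(x) toupper(gsub("-", "_", x, fixed = TRUE))
  paste("DAGGLE_OUTPUT", normalise(step_id), normalise(key), sep = "_")
}

# Hypothetical helper: read an output as an integer and fail loudly if it is
# missing or not numeric (Sys.getenv() returns "" for unset variables).
read_output_int <- function(step_id, key) {
  raw <- Sys.getenv(output_env_name(step_id, key), unset = "")
  n <- suppressWarnings(as.integer(raw))
  if (is.na(n)) {
    stop("No integer output '", key, "' from step '", step_id, "'",
         call. = FALSE)
  }
  n
}

# Simulate what daggle would set for a downstream step, then read it back.
Sys.setenv(DAGGLE_OUTPUT_EXTRACT_ROW_COUNT = "1432")
output_env_name("extract", "row_count")
read_output_int("extract", "row_count")
```

The explicit NA check matters because as.integer("") silently yields NA, which would otherwise propagate into later arithmetic.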

Run metadata

Before:

run_id <- Sys.getenv("DAGGLE_RUN_ID")
dag <- Sys.getenv("DAGGLE_DAG_NAME")
dir <- Sys.getenv("DAGGLE_RUN_DIR")

After:

run_id <- daggleR::run_id()
dag <- daggleR::dag_name()
dir <- daggleR::run_dir()

Full example

A step using daggleR:

library(daggleR)

raw_count <- get_output("extract", "n_raw")
cat(sprintf("Upstream extracted %s rows\n", raw_count))

data <- readRDS("data/clean.rds")
cat(sprintf("Clean data has %d rows\n", nrow(data)))

output("n_clean", nrow(data))
output("pct_retained", round(nrow(data) / as.integer(raw_count) * 100, 1))

In-step helpers use only base R. They need no network access and no running daggle server, because they communicate through environment variables and stdout.
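
Because the interface is just environment variables and stdout, a step's logic can be exercised outside daggle. The sketch below simulates that environment with made-up values and captures what the step prints. step_body() is a hypothetical stand-in that uses the raw protocol so the example runs without daggleR installed, and the space after the marker is an assumption. How daggle itself parses these lines is not shown here.

```r
# Simulate the environment daggle would provide to a step (values are made up).
Sys.setenv(
  DAGGLE_RUN_ID = "local-test-001",
  DAGGLE_DAG_NAME = "my-pipeline",
  DAGGLE_RUN_DIR = tempdir(),
  DAGGLE_OUTPUT_EXTRACT_N_RAW = "1432"
)

# Hypothetical step body written against the raw protocol.
step_body <- function() {
  raw_count <- as.integer(Sys.getenv("DAGGLE_OUTPUT_EXTRACT_N_RAW"))
  n_clean <- 1398L  # placeholder for nrow(data)
  cat(sprintf("Upstream extracted %d rows\n", raw_count))
  cat(sprintf("::daggle-output name=n_clean:: %d\n", n_clean))
  cat(sprintf("::daggle-output name=pct_retained:: %.1f\n",
              n_clean / raw_count * 100))
}

# Capture stdout and keep only the marker lines, dropping ordinary log output.
lines <- capture.output(step_body())
markers <- grep("^::daggle-output ", lines, value = TRUE)
print(markers)
```

A check like this fits naturally into a unit test: set the inputs, run the step body, and assert on the marker lines.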

Part 2: API wrappers

API wrappers talk to the daggle REST API (started with daggle serve --port 8787). They require the httr2 package.

The base URL is taken from the first of these that is set: the base_url argument, the DAGGLE_API_URL environment variable, or the default http://127.0.0.1:8787.
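
That precedence can be sketched as a small base-R function. resolve_base_url_sketch() is a hypothetical name, not a daggleR export, and treating an empty string the same as an unset value is an assumption.

```r
# Illustrative resolution order: argument, then environment, then default.
resolve_base_url_sketch <- function(base_url = NULL) {
  # 1. An explicit argument wins.
  if (!is.null(base_url) && nzchar(base_url)) return(base_url)
  # 2. Otherwise use the environment variable, if set and non-empty.
  env_url <- Sys.getenv("DAGGLE_API_URL", unset = "")
  if (nzchar(env_url)) return(env_url)
  # 3. Finally fall back to the local default.
  "http://127.0.0.1:8787"
}

Sys.unsetenv("DAGGLE_API_URL")
resolve_base_url_sketch()                         # default
Sys.setenv(DAGGLE_API_URL = "http://example.test:9000")  # made-up host
resolve_base_url_sketch()                         # from the environment
resolve_base_url_sketch("http://localhost:8080")  # explicit argument
```

Setting DAGGLE_API_URL once per session or deployment avoids repeating base_url in every call, while the argument remains available for one-off overrides.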

All code below assumes a running daggle server. The code blocks are not evaluated when this document is rendered (eval: false), because no live instance is available at that point, so the printed results are illustrative.

List DAGs

library(daggleR)

dags <- list_dags()
dags
#>        name steps  schedule last_status             last_run
#> 1 etl-daily     4 0 3 * * *     success 2025-01-15T03:00:12Z
#> 2   reports     2      <NA>     running 2025-01-15T10:30:00Z

Trigger a run and poll for completion

result <- trigger("my-pipeline", params = list(date = "2026-04-04"))
run_id <- result$run_id

# Poll every 2 seconds until the run reaches a terminal status.
repeat {
  run <- get_run("my-pipeline", run_id)
  if (run$status %in% c("success", "failure", "cancelled")) break
  Sys.sleep(2)
}

cat("Final status:", run$status, "\n")

The trigger() function returns immediately with a run ID. Poll with get_run() until the status is terminal.
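
A bare repeat loop never exits if the run never reaches a terminal status. One way to bound the wait is a small helper with a timeout, sketched below. poll_until_terminal() and make_fake_fetch() are hypothetical names, and the fake status source stands in for get_run() so the example runs without a server.

```r
# Hypothetical helper: call fetch_status() until it returns a terminal status
# or the timeout (in seconds) elapses.
poll_until_terminal <- function(fetch_status,
                                terminal = c("success", "failure", "cancelled"),
                                interval = 2, timeout = 600) {
  deadline <- Sys.time() + timeout
  repeat {
    status <- fetch_status()
    if (status %in% terminal) return(status)
    if (Sys.time() >= deadline) {
      stop("Timed out after ", timeout, "s; last status: ", status,
           call. = FALSE)
    }
    Sys.sleep(interval)
  }
}

# Fake status source: steps through the given statuses, then repeats the last.
make_fake_fetch <- function(statuses) {
  i <- 0L
  function() {
    i <<- min(i + 1L, length(statuses))
    statuses[i]
  }
}

fake_fetch <- make_fake_fetch(c("running", "running", "success"))
final_status <- poll_until_terminal(fake_fetch, interval = 0.01, timeout = 5)
cat("Final status:", final_status, "\n")
```

Against a live server, the status source would be something like function() get_run("my-pipeline", run_id)$status.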

Get step outputs

outputs <- get_outputs("my-pipeline", run_id)
outputs
#>     step_id          key                     value
#> 1   extract    row_count                      1432
#> 2     clean      n_clean                      1398
#> 3     clean    n_dropped                        34

Pass "latest" as the run ID to get outputs from the most recent run:

outputs <- get_outputs("my-pipeline", "latest")
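
Once the outputs are in a data frame, individual values can be pulled out with ordinary subsetting. The sketch below rebuilds the printed result by hand so it runs without a server. The column names follow the output shown above, storing value as character is an assumption, and output_value() is a hypothetical helper.

```r
# Stand-in for the data frame shown above (value as character is an assumption).
outputs <- data.frame(
  step_id = c("extract", "clean", "clean"),
  key     = c("row_count", "n_clean", "n_dropped"),
  value   = c("1432", "1398", "34"),
  stringsAsFactors = FALSE
)

# Hypothetical helper: return the single value for a step/key pair.
output_value <- function(outputs, step_id, key) {
  hit <- outputs$value[outputs$step_id == step_id & outputs$key == key]
  if (length(hit) != 1L) {
    stop("Expected exactly one output for ", step_id, "/", key,
         ", found ", length(hit), call. = FALSE)
  }
  hit
}

# Convert explicitly before doing arithmetic on the values.
n_raw   <- as.integer(output_value(outputs, "extract", "row_count"))
n_clean <- as.integer(output_value(outputs, "clean", "n_clean"))
pct_retained <- round(n_clean / n_raw * 100, 1)
pct_retained
```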

Approve a waiting step

If a pipeline has an approve: step, the run pauses at that step until someone approves or rejects it:

approve("my-pipeline", run_id = "latest", step_id = "review")

To reject instead:

reject("my-pipeline", run_id = "latest", step_id = "review")

View step logs

# Named step_log rather than log to avoid confusion with base::log().
step_log <- get_step_log("my-pipeline", "latest", "extract")
cat(step_log$stdout)

Health check

health()
#> $status
#> [1] "ok"
#>
#> $version
#> [1] "0.5.0"

Clean up old runs

cleanup("30d")
#> $removed
#> [1] 42
#>
#> $freed
#> [1] "10.0 MB"

When to use which

Scenario                         | Use
---------------------------------|------------------------------------------
Inside a pipeline step           | In-step helpers (output(), get_output())
Interactive R session            | API wrappers (trigger(), get_run())
Shiny app controlling pipelines  | API wrappers
CI script kicking off a pipeline | API wrappers
Reading upstream data in a step  | In-step get_output()