Tutorial: Model Training Pipeline

This tutorial fits three classification models in parallel, collects their accuracies, and produces a comparison table. It demonstrates parallel step execution and reading upstream outputs via environment variables.

The pipeline

name: penguin-models
env:
  DATA_DIR: data
  MODEL_DIR: models

steps:
  - id: prepare
    r_expr: |
      dir.create(Sys.getenv("DATA_DIR"), showWarnings = FALSE)
      dir.create(Sys.getenv("MODEL_DIR"), showWarnings = FALSE)

      if (!requireNamespace("palmerpenguins", quietly = TRUE)) {
        install.packages("palmerpenguins", repos = "https://cloud.r-project.org")
      }

      library(palmerpenguins)
      penguins <- penguins[complete.cases(penguins), ]
      penguins$species_int <- as.integer(penguins$species)

      set.seed(42)
      n <- nrow(penguins)
      train_idx <- sample(n, size = floor(0.7 * n))

      train <- penguins[train_idx, ]
      test <- penguins[-train_idx, ]

      saveRDS(train, file.path(Sys.getenv("DATA_DIR"), "train.rds"))
      saveRDS(test, file.path(Sys.getenv("DATA_DIR"), "test.rds"))

      cat(sprintf("Train: %d rows, Test: %d rows\n", nrow(train), nrow(test)))
      cat("::daggle-output name=n_train::", nrow(train), "\n")
      cat("::daggle-output name=n_test::", nrow(test), "\n")

  - id: fit-lda
    r_expr: |
      train <- readRDS(file.path(Sys.getenv("DATA_DIR"), "train.rds"))
      test <- readRDS(file.path(Sys.getenv("DATA_DIR"), "test.rds"))

      model <- MASS::lda(species ~ bill_length_mm + bill_depth_mm +
                          flipper_length_mm + body_mass_g, data = train)
      preds <- predict(model, test)$class
      accuracy <- mean(preds == test$species)

      saveRDS(model, file.path(Sys.getenv("MODEL_DIR"), "lda_model.rds"))
      cat(sprintf("LDA accuracy: %.1f%%\n", accuracy * 100))
      cat("::daggle-output name=accuracy::", round(accuracy, 4), "\n")
    depends: [prepare]

  - id: fit-tree
    r_expr: |
      train <- readRDS(file.path(Sys.getenv("DATA_DIR"), "train.rds"))
      test <- readRDS(file.path(Sys.getenv("DATA_DIR"), "test.rds"))

      model <- rpart::rpart(species ~ bill_length_mm + bill_depth_mm +
                             flipper_length_mm + body_mass_g, data = train)
      preds <- predict(model, test, type = "class")
      accuracy <- mean(preds == test$species)

      saveRDS(model, file.path(Sys.getenv("MODEL_DIR"), "tree_model.rds"))
      cat(sprintf("Decision tree accuracy: %.1f%%\n", accuracy * 100))
      cat("::daggle-output name=accuracy::", round(accuracy, 4), "\n")
    depends: [prepare]

  - id: fit-knn
    r_expr: |
      train <- readRDS(file.path(Sys.getenv("DATA_DIR"), "train.rds"))
      test <- readRDS(file.path(Sys.getenv("DATA_DIR"), "test.rds"))

      features <- c("bill_length_mm", "bill_depth_mm",
                     "flipper_length_mm", "body_mass_g")

      means <- colMeans(train[, features])
      sds <- apply(train[, features], 2, sd)
      train_scaled <- scale(train[, features], center = means, scale = sds)
      test_scaled <- scale(test[, features], center = means, scale = sds)

      preds <- class::knn(train_scaled, test_scaled, train$species, k = 5)
      accuracy <- mean(preds == test$species)

      cat(sprintf("KNN (k=5) accuracy: %.1f%%\n", accuracy * 100))
      cat("::daggle-output name=accuracy::", round(accuracy, 4), "\n")
    depends: [prepare]

  - id: compare
    r_expr: |
      lda_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_LDA_ACCURACY"))
      tree_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_TREE_ACCURACY"))
      knn_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_KNN_ACCURACY"))

      results <- data.frame(
        model = c("LDA", "Decision Tree", "KNN (k=5)"),
        accuracy = c(lda_acc, tree_acc, knn_acc)
      )
      results <- results[order(-results$accuracy), ]
      results$accuracy_pct <- sprintf("%.1f%%", results$accuracy * 100)

      cat("\n=== Model Comparison ===\n\n")
      print(results[, c("model", "accuracy_pct")], row.names = FALSE)
      cat(sprintf("\nBest model: %s (%.1f%%)\n",
                  results$model[1], results$accuracy[1] * 100))

      write.csv(results, file.path(Sys.getenv("MODEL_DIR"), "comparison.csv"),
                row.names = FALSE)
    depends: [fit-lda, fit-tree, fit-knn]

How the DAG executes

prepare -> fit-lda  -> compare
        -> fit-tree ->
        -> fit-knn  ->

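Because fit-lda, fit-tree, and fit-knn each depend only on prepare, all three become runnable as soon as prepare finishes and run in parallel. The compare step lists all three in its depends field, so it starts only after every one of them has completed, then reads their outputs.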
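
To make the ordering concrete, the sketch below groups the steps of this pipeline into waves, where every step in a wave has all of its dependencies satisfied by earlier waves. It only illustrates how the depends lists determine what can run in parallel. It is not daggle's scheduler, and deps, wave_of, and waves are names made up for this example.

```r
# Dependencies copied from the `depends` fields of the pipeline above.
deps <- list(
  "prepare"  = character(0),
  "fit-lda"  = "prepare",
  "fit-tree" = "prepare",
  "fit-knn"  = "prepare",
  "compare"  = c("fit-lda", "fit-tree", "fit-knn")
)

wave_of <- integer(0)      # named vector: step id -> wave number
remaining <- names(deps)   # steps not yet assigned to a wave
wave <- 0L

while (length(remaining) > 0) {
  # A step is ready once every step it depends on has been assigned a wave.
  is_ready <- vapply(
    remaining,
    function(step) all(deps[[step]] %in% names(wave_of)),
    logical(1)
  )
  ready <- remaining[is_ready]
  if (length(ready) == 0) stop("Dependency cycle detected")

  wave <- wave + 1L
  wave_of[ready] <- wave
  remaining <- setdiff(remaining, ready)
}

# Group step ids by wave number.
waves <- split(names(wave_of), wave_of)
print(waves)
```

Steps that land in the same wave (here the three fit steps) do not depend on each other, which is why they can run at the same time.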

Reading upstream outputs

Each fit step emits the same key (accuracy), but daggle namespaces outputs by step ID. The compare step reads them as:

lda_acc  <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_LDA_ACCURACY"))
tree_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_TREE_ACCURACY"))
knn_acc  <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_KNN_ACCURACY"))

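The naming convention is DAGGLE_OUTPUT_<STEP_ID>_<KEY>, with the step ID and the key uppercased and hyphens in the step ID replaced by underscores. For example, the accuracy output of step fit-lda becomes DAGGLE_OUTPUT_FIT_LDA_ACCURACY.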
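
The convention is mechanical enough to wrap in a small helper, which avoids typos in long variable names and fails loudly when an upstream value is missing (as.numeric("") silently returns NA). The following is a minimal sketch, not part of daggle: output_var() and read_output_num() are hypothetical helper names, uppercasing the key is inferred from the variable names used in this tutorial, and the value set with Sys.setenv() is a placeholder.

```r
# Hypothetical helper: build the variable name for a step's output,
# following DAGGLE_OUTPUT_<STEP_ID>_<KEY> as described in this tutorial.
output_var <- function(step_id, key) {
  step_part <- toupper(gsub("-", "_", step_id, fixed = TRUE))
  paste0("DAGGLE_OUTPUT_", step_part, "_", toupper(key))
}

# Hypothetical helper: read an upstream output as a number, stopping with a
# clear message if the variable is unset, empty, or not numeric.
read_output_num <- function(step_id, key) {
  var <- output_var(step_id, key)
  raw <- trimws(Sys.getenv(var, unset = ""))
  value <- suppressWarnings(as.numeric(raw))
  if (!nzchar(raw) || is.na(value)) {
    stop(sprintf("Output %s is missing or not numeric: '%s'", var, raw))
  }
  value
}

# Simulate what the compare step would see. Under daggle the variable is
# already set when the step starts; 0.9 is a placeholder, not a real result.
Sys.setenv(DAGGLE_OUTPUT_FIT_LDA_ACCURACY = "0.9")

print(output_var("fit-lda", "accuracy"))
print(read_output_num("fit-lda", "accuracy"))
```

In the compare step, the three Sys.getenv() lines could then become calls such as read_output_num("fit-lda", "accuracy"), so that a failed or skipped upstream step stops the comparison instead of producing a row with NA.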

Alternative: matrix runs

If your models share the same fitting logic and differ only by a parameter, you can use a matrix step instead of writing three separate steps:

steps:
  - id: prepare
    r_expr: |
      # ... same as above ...

  - id: fit-model
    script: models/fit.R
    matrix:
      algo: [lda, tree, knn]
    depends: [prepare]

  - id: compare
    script: models/compare.R
    depends: [fit-model]

Each matrix instance runs models/fit.R with one value of algo. The script reads that value from DAGGLE_MATRIX_ALGO to decide which model to fit:

algo <- Sys.getenv("DAGGLE_MATRIX_ALGO")

train <- readRDS(file.path(Sys.getenv("DATA_DIR"), "train.rds"))
test <- readRDS(file.path(Sys.getenv("DATA_DIR"), "test.rds"))

features <- c("bill_length_mm", "bill_depth_mm",
               "flipper_length_mm", "body_mass_g")

accuracy <- switch(algo,
  lda = {
    model <- MASS::lda(species ~ ., data = train[, c("species", features)])
    mean(predict(model, test)$class == test$species)
  },
  tree = {
    model <- rpart::rpart(species ~ ., data = train[, c("species", features)])
    mean(predict(model, test, type = "class") == test$species)
  },
  knn = {
    means <- colMeans(train[, features])
    sds <- apply(train[, features], 2, sd)
    train_s <- scale(train[, features], center = means, scale = sds)
    test_s <- scale(test[, features], center = means, scale = sds)
    mean(class::knn(train_s, test_s, train$species, k = 5) == test$species)
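  },
  # Default branch: fail instead of silently returning NULL for an unknown value.
  stop("Unknown algo: ", algo)
)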

cat(sprintf("%s accuracy: %.1f%%\n", algo, accuracy * 100))
cat("::daggle-output name=accuracy::", round(accuracy, 4), "\n")

The matrix approach is cleaner when models share structure. Use explicit steps when each model has substantially different logic.

Running

daggle run penguin-models
daggle status penguin-models