Tutorial: Model Training Pipeline
This tutorial fits three classification models in parallel, collects their accuracies, and produces a comparison table. It demonstrates parallel step execution and reading upstream outputs via environment variables.
The pipeline
name: penguin-models
env:
  DATA_DIR: data
  MODEL_DIR: models
steps:
  - id: prepare
    r_expr: |
      dir.create(Sys.getenv("DATA_DIR"), showWarnings = FALSE)
      dir.create(Sys.getenv("MODEL_DIR"), showWarnings = FALSE)
      if (!requireNamespace("palmerpenguins", quietly = TRUE)) {
        install.packages("palmerpenguins", repos = "https://cloud.r-project.org")
      }
      library(palmerpenguins)
      penguins <- penguins[complete.cases(penguins), ]
      penguins$species_int <- as.integer(penguins$species)
      set.seed(42)
      n <- nrow(penguins)
      train_idx <- sample(n, size = floor(0.7 * n))
      train <- penguins[train_idx, ]
      test <- penguins[-train_idx, ]
      saveRDS(train, file.path(Sys.getenv("DATA_DIR"), "train.rds"))
      saveRDS(test, file.path(Sys.getenv("DATA_DIR"), "test.rds"))
      cat(sprintf("Train: %d rows, Test: %d rows\n", nrow(train), nrow(test)))
      cat("::daggle-output name=n_train::", nrow(train), "\n")
      cat("::daggle-output name=n_test::", nrow(test), "\n")
  - id: fit-lda
    r_expr: |
      train <- readRDS(file.path(Sys.getenv("DATA_DIR"), "train.rds"))
      test <- readRDS(file.path(Sys.getenv("DATA_DIR"), "test.rds"))
      model <- MASS::lda(species ~ bill_length_mm + bill_depth_mm +
                           flipper_length_mm + body_mass_g, data = train)
      preds <- predict(model, test)$class
      accuracy <- mean(preds == test$species)
      saveRDS(model, file.path(Sys.getenv("MODEL_DIR"), "lda_model.rds"))
      cat(sprintf("LDA accuracy: %.1f%%\n", accuracy * 100))
      cat("::daggle-output name=accuracy::", round(accuracy, 4), "\n")
    depends: [prepare]
  - id: fit-tree
    r_expr: |
      train <- readRDS(file.path(Sys.getenv("DATA_DIR"), "train.rds"))
      test <- readRDS(file.path(Sys.getenv("DATA_DIR"), "test.rds"))
      model <- rpart::rpart(species ~ bill_length_mm + bill_depth_mm +
                              flipper_length_mm + body_mass_g, data = train)
      preds <- predict(model, test, type = "class")
      accuracy <- mean(preds == test$species)
      saveRDS(model, file.path(Sys.getenv("MODEL_DIR"), "tree_model.rds"))
      cat(sprintf("Decision tree accuracy: %.1f%%\n", accuracy * 100))
      cat("::daggle-output name=accuracy::", round(accuracy, 4), "\n")
    depends: [prepare]
  - id: fit-knn
    r_expr: |
      train <- readRDS(file.path(Sys.getenv("DATA_DIR"), "train.rds"))
      test <- readRDS(file.path(Sys.getenv("DATA_DIR"), "test.rds"))
      features <- c("bill_length_mm", "bill_depth_mm",
                    "flipper_length_mm", "body_mass_g")
      means <- colMeans(train[, features])
      sds <- apply(train[, features], 2, sd)
      train_scaled <- scale(train[, features], center = means, scale = sds)
      test_scaled <- scale(test[, features], center = means, scale = sds)
      preds <- class::knn(train_scaled, test_scaled, train$species, k = 5)
      accuracy <- mean(preds == test$species)
      cat(sprintf("KNN (k=5) accuracy: %.1f%%\n", accuracy * 100))
      cat("::daggle-output name=accuracy::", round(accuracy, 4), "\n")
    depends: [prepare]
  - id: compare
    r_expr: |
      lda_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_LDA_ACCURACY"))
      tree_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_TREE_ACCURACY"))
      knn_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_KNN_ACCURACY"))
      results <- data.frame(
        model = c("LDA", "Decision Tree", "KNN (k=5)"),
        accuracy = c(lda_acc, tree_acc, knn_acc)
      )
      results <- results[order(-results$accuracy), ]
      results$accuracy_pct <- sprintf("%.1f%%", results$accuracy * 100)
      cat("\n=== Model Comparison ===\n\n")
      print(results[, c("model", "accuracy_pct")], row.names = FALSE)
      cat(sprintf("\nBest model: %s (%.1f%%)\n",
                  results$model[1], results$accuracy[1] * 100))
      write.csv(results, file.path(Sys.getenv("MODEL_DIR"), "comparison.csv"),
                row.names = FALSE)
    depends: [fit-lda, fit-tree, fit-knn]
How the DAG executes
prepare -> fit-lda  -> compare
        -> fit-tree ->
        -> fit-knn  ->
After prepare finishes, all three model-fitting steps run simultaneously. The compare step waits for all three to complete, then reads their outputs.
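The execution order follows from the depends lists alone. As a rough illustration (this is not daggle's scheduler, whose internals are not described here), the sketch below groups steps into "waves": a step joins a wave once everything it depends on has finished. The `deps` list and the `execution_waves` function are hypothetical names introduced for this example.

```r
# Dependencies copied from the pipeline above: step name -> steps it depends on.
deps <- list(
  prepare    = character(0),
  `fit-lda`  = "prepare",
  `fit-tree` = "prepare",
  `fit-knn`  = "prepare",
  compare    = c("fit-lda", "fit-tree", "fit-knn")
)

# Hypothetical helper: group steps into waves that could run in parallel.
execution_waves <- function(deps) {
  done <- character(0)
  waves <- list()
  while (length(done) < length(deps)) {
    pending <- setdiff(names(deps), done)
    # A step is ready when all of its dependencies are already done.
    is_ready <- vapply(deps[pending], function(d) all(d %in% done), logical(1))
    ready <- pending[is_ready]
    if (length(ready) == 0) stop("Cycle detected in dependencies")
    waves[[length(waves) + 1]] <- ready
    done <- c(done, ready)
  }
  waves
}

waves <- execution_waves(deps)
for (i in seq_along(waves)) {
  cat(sprintf("Wave %d: %s\n", i, paste(waves[[i]], collapse = ", ")))
}
```

The second wave contains all three fit steps, which is why they can run at the same time, and compare lands alone in the third.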
Reading upstream outputs
Each fit step emits the same key (accuracy), but daggle namespaces outputs by step ID. The compare step reads them as:
lda_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_LDA_ACCURACY"))
tree_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_TREE_ACCURACY"))
knn_acc <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_FIT_KNN_ACCURACY"))
The naming convention is DAGGLE_OUTPUT_<STEP_ID>_<KEY>, with the step ID uppercased and hyphens replaced by underscores.
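To make the convention concrete, here is a minimal sketch of two helpers that build the variable name and read it defensively. Both `daggle_output_var` and `read_numeric_output` are hypothetical names, not part of daggle. Uppercasing the key is inferred from the ACCURACY examples above, and the value 0.98 is set by hand purely to simulate a run.

```r
# Hypothetical helper (not part of daggle): build the environment variable
# name from a step ID and an output key, following the stated convention.
daggle_output_var <- function(step_id, key) {
  step_part <- toupper(gsub("-", "_", step_id, fixed = TRUE))
  # Assumption: the key is uppercased too, as in the examples above.
  paste0("DAGGLE_OUTPUT_", step_part, "_", toupper(key))
}

# Hypothetical helper: read an upstream output as a number and stop with a
# clear message if it is unset or not numeric, rather than returning NA.
read_numeric_output <- function(step_id, key) {
  var <- daggle_output_var(step_id, key)
  raw <- Sys.getenv(var, unset = NA_character_)
  value <- suppressWarnings(as.numeric(raw))
  if (is.na(value)) {
    stop(sprintf("Output %s is missing or not numeric (got '%s')", var, raw))
  }
  value
}

# Simulate the variable for a local dry run; the value is illustrative only.
Sys.setenv(DAGGLE_OUTPUT_FIT_LDA_ACCURACY = "0.98")
lda_acc <- read_numeric_output("fit-lda", "accuracy")
cat(daggle_output_var("fit-lda", "accuracy"), "=", lda_acc, "\n")
```

The guard matters because `as.numeric(Sys.getenv(...))` quietly yields NA when a variable is unset, and that NA would otherwise flow into the comparison table.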
Alternative: matrix runs
If your models share the same fitting logic and differ only by a parameter, you can use a matrix step instead of writing three separate steps:
steps:
  - id: prepare
    r_expr: |
      # ... same as above ...
  - id: fit-model
    script: models/fit.R
    matrix:
      algo: [lda, tree, knn]
    depends: [prepare]
  - id: compare
    script: models/compare.R
    depends: [fit-model]
The fit.R script reads DAGGLE_MATRIX_ALGO to decide which model to fit:
algo <- Sys.getenv("DAGGLE_MATRIX_ALGO")
train <- readRDS(file.path(Sys.getenv("DATA_DIR"), "train.rds"))
test <- readRDS(file.path(Sys.getenv("DATA_DIR"), "test.rds"))
features <- c("bill_length_mm", "bill_depth_mm",
              "flipper_length_mm", "body_mass_g")
accuracy <- switch(algo,
  lda = {
    model <- MASS::lda(species ~ ., data = train[, c("species", features)])
    mean(predict(model, test)$class == test$species)
  },
  tree = {
    model <- rpart::rpart(species ~ ., data = train[, c("species", features)])
    mean(predict(model, test, type = "class") == test$species)
  },
  knn = {
    # Scale both sets with the training means and standard deviations
    means <- colMeans(train[, features])
    sds <- apply(train[, features], 2, sd)
    train_s <- scale(train[, features], center = means, scale = sds)
    test_s <- scale(test[, features], center = means, scale = sds)
    mean(class::knn(train_s, test_s, train$species, k = 5) == test$species)
  },
  # Default branch: fail clearly instead of returning NULL for an unknown value
  stop("Unknown algo: ", algo)
)
cat(sprintf("%s accuracy: %.1f%%\n", algo, accuracy * 100))
cat("::daggle-output name=accuracy::", round(accuracy, 4), "\n")
The matrix approach is cleaner when models share structure. Use explicit steps when each model has substantially different logic.
Running
daggle run penguin-models
daggle status penguin-models