Nenjo Docs

Executions

Pipeline execution context, step results, data flow, and real-time monitoring via events.

When a pipeline runs, the executor creates a PipelineContext that flows through every step. Each step produces a StepResult, and events are broadcast in real time for monitoring.

PipelineContext

The execution context is created at the start of a pipeline run and passed to every step. Steps can read from it (e.g., to access ticket metadata or gate feedback), and the executor writes to it after each step completes.

Core fields

| Field | Type | Description |
| --- | --- | --- |
| pipeline_id | UUID | The pipeline being executed |
| project_id | UUID | The project this execution belongs to |
| initial_input | string | The input text that started the pipeline |
| max_retries | integer | Maximum retry attempts (from pipeline config) |
| step_results | map | Results keyed by step ID, accumulated during execution |
| execution_run_id | UUID? | The execution run record ID (null for chat pipelines) |
| ticket_id | UUID? | The ticket being processed (null for chat/cron pipelines) |
| is_cron_trigger | boolean | When true, agent steps use the role's cron_task template |

Ticket metadata

These fields are populated from the ticket being executed and are available in prompt templates via {{ ticket.* }}.

| Field | Template variable | Description |
| --- | --- | --- |
| ticket_title | {{ ticket.title }} | Ticket title |
| ticket_description | {{ ticket.description }} | Ticket description |
| ticket_acceptance_criteria | {{ ticket.acceptance_criteria }} | Acceptance criteria |
| ticket_tags | {{ ticket.tags }} | List of tags |
| ticket_source | {{ ticket.source }} | Who created/triggered: "user" or a role name |
| ticket_status | {{ ticket.status }} | Status: open, ready, in_progress, done |
| ticket_priority | {{ ticket.priority }} | Priority: low, medium, high, critical |
| ticket_type | {{ ticket.type }} | Type: bug, feature, task |
| ticket_slug | {{ ticket.slug }} | URL-safe identifier |
| ticket_complexity | {{ ticket.complexity }} | Estimated complexity score |
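
To make the mapping from context fields to template variables concrete, here is a minimal sketch of how a prompt template with {{ ticket.* }} placeholders resolves. The renderTemplate function is a toy written for this example and is not Nenjo's template engine; the ticket values are invented.

```javascript
// Toy renderer for {{ dotted.path }} placeholders. This is NOT Nenjo's
// template engine; it only illustrates how the variables resolve.
function renderTemplate(template, vars) {
  return template.replace(/\{\{\s*([\w.]+)\s*\}\}/g, (match, path) => {
    // Walk the dotted path, e.g. "ticket.title" -> vars.ticket.title
    const value = path.split('.').reduce(
      (obj, key) => (obj == null ? undefined : obj[key]),
      vars
    );
    // Leave unknown variables untouched so they are easy to spot.
    if (value === undefined || value === null) return match;
    return Array.isArray(value) ? value.join(', ') : String(value);
  });
}

// Hypothetical ticket values, for illustration only.
const vars = {
  ticket: {
    title: 'Fix login redirect',
    priority: 'high',
    type: 'bug',
    tags: ['auth', 'frontend'],
  },
};

const prompt = renderTemplate(
  '[{{ ticket.priority }} {{ ticket.type }}] {{ ticket.title }} (tags: {{ ticket.tags }})',
  vars
);
console.log(prompt); // [high bug] Fix login redirect (tags: auth, frontend)
```

How the real engine formats lists such as ticket.tags or handles missing values may differ; check a rendered prompt before relying on either behavior.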

Git context

| Field | Template variable | Description |
| --- | --- | --- |
| git_branch | {{ git.branch }} | Current working branch (e.g., agent/<run>/<slug>) |
| git_target_branch | {{ git.target_branch }} | Target branch for merges/PRs (defaults to main) |
| git_work_dir | {{ git.work_dir }} | Absolute path to the worktree directory |
| git_repo_url | {{ git.repo_url }} | Remote clone URL |

Pipeline and step context

| Field | Template variable | Description |
| --- | --- | --- |
| pipeline_name | {{ pipeline.name }} | Name of the pipeline |
| current_step_name | {{ step.name }} | Name of the currently executing step |
| step_metadata | {{ step.metadata }} | JSON from step.config.metadata |
| gate_feedback | {{ gate_feedback }} | Output from the most recent failed gate/cron/lambda step |

Project context

| Field | Template variable | Description |
| --- | --- | --- |
| project_name | {{ project.name }} | Project name |
| project_description | {{ project.description }} | Project description |

StepResult

Every step produces a StepResult that is stored in the context and used for edge condition evaluation.

| Field | Type | Description |
| --- | --- | --- |
| passed | boolean | Whether the step succeeded |
| output | string | The step's output text |
| data | JSON | Structured data (optional) |
| step_id | UUID | ID of the step that produced this result |
| step_name | string | Name of the step |
| input_tokens | u64 | Total input tokens consumed (LLM steps) |
| output_tokens | u64 | Total output tokens consumed (LLM steps) |
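
As an illustration of how a StepResult might drive edge condition evaluation, the sketch below picks the next step from the passed flag. The edge shape ({ condition, target }) and the on_pass name are assumptions made for this example; only on_fail is named on this page, and real targets would be step IDs rather than the readable names used here.

```javascript
// Hypothetical edge shape: { condition: 'on_pass' | 'on_fail', target: <step> }.
// Only on_fail is named in the docs; on_pass is an assumed counterpart.
function nextStep(result, edges) {
  const wanted = result.passed ? 'on_pass' : 'on_fail';
  const edge = edges.find((e) => e.condition === wanted);
  return edge ? edge.target : null; // null means no matching edge
}

// A StepResult with the documented fields (all values are made up).
const reviewResult = {
  passed: false,
  output: 'Missing null check in parseConfig()',
  data: null,
  step_id: '00000000-0000-0000-0000-000000000002',
  step_name: 'review',
  input_tokens: 1200,
  output_tokens: 150,
};

const edges = [
  { condition: 'on_pass', target: 'testing' },
  { condition: 'on_fail', target: 'implementation' },
];

console.log(nextStep(reviewResult, edges)); // implementation
```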

Data flow between steps

Data flows through the pipeline context:

  1. Initial input -- The initial_input string (ticket description or user message) is available to all steps.
  2. Step results -- Each step's StepResult is stored in step_results keyed by step ID. The most recent result is used as input for gate steps.
  3. Gate feedback -- When a gate, cron, or lambda step fails, its output is stored as gate_feedback. The next agent step (reached via an on_fail edge) can use this to understand what went wrong.
  4. Source tracking -- When a step fails, ticket_source is updated to the failing step's role name, so downstream steps know where feedback originated.
  5. Metrics -- Token usage and execution counts are accumulated per step in PipelineMetrics.
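
The five points above can be sketched as one bookkeeping function that runs after each step. This is an illustrative model, not the executor's code: the step.type and step.role_name fields and the ctx.metrics map (standing in for PipelineMetrics) are assumptions.

```javascript
// Hypothetical executor-side bookkeeping, mirroring the list above.
function recordStepResult(ctx, result, step) {
  // 2. Store the result under its step ID.
  ctx.step_results[result.step_id] = result;

  if (!result.passed) {
    // 3. Failed gate, cron, and lambda steps leave feedback for the next agent step.
    if (['gate', 'cron', 'lambda'].includes(step.type)) {
      ctx.gate_feedback = result.output;
    }
    // 4. Record which role the feedback came from.
    ctx.ticket_source = step.role_name;
  }

  // 5. Accumulate per-step metrics (stand-in for PipelineMetrics).
  const m = ctx.metrics[result.step_id] ??
    { execution_count: 0, input_tokens: 0, output_tokens: 0 };
  m.execution_count += 1;
  m.input_tokens += result.input_tokens;
  m.output_tokens += result.output_tokens;
  ctx.metrics[result.step_id] = m;
  return ctx;
}

const ctx = {
  initial_input: 'Add rate limiting to the API', // 1. available to every step
  step_results: {},
  gate_feedback: null,
  ticket_source: 'user',
  metrics: {},
};

recordStepResult(
  ctx,
  { passed: false, output: '2 tests failed', step_id: 'step-2',
    step_name: 'tests', input_tokens: 0, output_tokens: 0 },
  { type: 'gate', role_name: 'tester' }
);
console.log(ctx.gate_feedback, ctx.ticket_source); // 2 tests failed tester
```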

Pipeline events

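Pipeline events are broadcast via tokio::sync::broadcast for real-time monitoring. The event channel buffers up to 512 events; as with any tokio broadcast channel, a receiver that falls further behind than the buffer misses the oldest events.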

PipelineEvent structure

{
  "ticket_id": "uuid",
  "execution_run_id": "uuid",
  "event_type": "phase_started",
  "phase": "implementation",
  "data": {},
  "timestamp": "2024-01-01T00:00:00Z"
}

Event types

| Event type | Description |
| --- | --- |
| phase_started | A phase has started executing |
| phase_completed | A phase completed successfully |
| phase_failed | A phase failed (may retry) |
| status_changed | The ticket's status changed |
| retry_scheduled | A retry has been scheduled after a failure |
| pipeline_completed | The entire pipeline completed successfully |
| pipeline_failed | The pipeline failed permanently (max retries exceeded) |
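
A consumer typically folds these events into a single view of the run. The sketch below shows one way to do that; the event_type and phase fields come from the PipelineEvent structure, while the summary shape (state, phase, failures) is an assumption made for this example.

```javascript
// Folds one PipelineEvent into a small run summary.
// The summary shape is hypothetical; only event_type and phase are documented.
function applyEvent(summary, event) {
  switch (event.event_type) {
    case 'phase_started':
      return { ...summary, state: 'running', phase: event.phase };
    case 'phase_completed':
      return { ...summary, phase: event.phase };
    case 'phase_failed':
      return { ...summary, failures: summary.failures + 1 };
    case 'retry_scheduled':
      return { ...summary, state: 'retrying' };
    case 'pipeline_completed':
      return { ...summary, state: 'completed' };
    case 'pipeline_failed':
      return { ...summary, state: 'failed' };
    default:
      // status_changed and unknown types do not alter this summary.
      return summary;
  }
}

// A made-up event sequence: one failed attempt, one retry, then success.
const events = [
  { event_type: 'phase_started', phase: 'implementation' },
  { event_type: 'phase_failed', phase: 'implementation' },
  { event_type: 'retry_scheduled', phase: 'implementation' },
  { event_type: 'phase_started', phase: 'implementation' },
  { event_type: 'phase_completed', phase: 'implementation' },
  { event_type: 'pipeline_completed' },
];

const summary = events.reduce(applyEvent, { state: 'pending', phase: null, failures: 0 });
console.log(summary); // { state: 'completed', phase: 'implementation', failures: 1 }
```

Returning the summary unchanged for unrecognized types keeps the consumer working if new event types are added later.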

Pipeline phases

| Phase | Description |
| --- | --- |
| implementation | Code generation, branch creation, commit |
| review | LLM code review of the diff |
| testing | Test execution and acceptance verification |

WebSocket step events

In addition to pipeline events, the executor emits richer step-level events via WebSocket for frontend execution tracking.

Step started

Emitted when a step begins execution. Includes role info (ID, color, name) for frontend identicon rendering.

Step completed

Emitted when a step finishes. Includes:

  • Event type: step_completed (pass), step_warning (fail with on_fail edge available), or step_failed (fail with no recovery path)
  • Metrics: execution_count, input_tokens, output_tokens, total_tokens
  • Duration: Execution time in milliseconds

The distinction between step_warning and step_failed is important: a gate failure with an on_fail edge is expected (the pipeline has a recovery path), while a failure with no matching edge is terminal.
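
The rule for choosing among the three event types can be written as a small function. This is a sketch of the logic as described here, with a hypothetical function name and parameters, not the executor's actual code.

```javascript
// passed: the StepResult's passed flag.
// hasOnFailEdge: whether the failed step has an on_fail edge to follow.
function stepEventType(passed, hasOnFailEdge) {
  if (passed) return 'step_completed';
  // A failure with a recovery path is a warning; without one it is terminal.
  return hasOnFailEdge ? 'step_warning' : 'step_failed';
}

console.log(stepEventType(true, false));  // step_completed
console.log(stepEventType(false, true));  // step_warning
console.log(stepEventType(false, false)); // step_failed
```

A frontend can use the same split to render step_warning as a recoverable state and reserve error styling for step_failed.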

Monitoring

Subscribe to pipeline events by connecting to the WebSocket endpoint. Events are fire-and-forget -- if no receivers are connected, events are silently dropped.

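// Subscribe to execution events (replace the host with your own instance)
const ws = new WebSocket('wss://your-instance.com/ws');

ws.onmessage = (event) => {
  let data;
  try {
    data = JSON.parse(event.data);
  } catch {
    return; // ignore frames that are not valid JSON
  }
  if (data.event_type === 'step_started') {
    console.log(`Step started: ${data.step_name}`);
  }
};

ws.onerror = (err) => console.error('WebSocket error', err);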
