Skip to content

Consume Stream (SSE)

Recommended path: Use createAgentApp from @agentrail/app to mount the /stream endpoint. createStreamRoute is available as a lower-level escape hatch when you need direct control over route mounting.

createAgentApp (and the underlying createStreamRoute) returns a streaming HTTP response in newline-delimited JSON format. This guide shows how to consume that stream from a browser client or a Node.js service.

Prerequisites

Read this guide after:

How the Stream Works

When you POST to a stream endpoint:

  1. The server immediately begins executing the agent and starts streaming.
  2. The response body is a series of newline-separated JSON objects, one event per line.
  3. The response header X-Session-Id contains the session ID for subsequent requests.
  4. The stream closes after the session.end event and any final host events (context_usage).

Each line in the body is a serialized AgentrailEvent from @agentrail/app. The event's type field identifies what happened.

Request Format

ts
POST /api/stream
Content-Type: application/json

{
  "message": "Hello, how can you help me?",
  "agentId": "default",         // optional, uses server default if omitted
  "sessionId": "session-uuid",  // optional, creates new session if omitted
  "tenantId": "tenant-1",       // optional
  "userId": "user-1"            // optional
}

Minimal Browser Example

ts
async function chat(message: string, sessionId?: string) {
  const response = await fetch("/api/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message, sessionId }),
  });

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}`);
  }

  // Extract sessionId from response header for future requests
  const nextSessionId = response.headers.get("X-Session-Id") ?? sessionId;

  // Read the stream line by line
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      if (!line.trim()) continue;
      const event = JSON.parse(line);
      handleEvent(event);
    }
  }

  return nextSessionId;
}

Key Event Types

Handle these events to build a complete streaming UI:

Event typeWhenKey fields
message.startLLM starts generating
message.updateText delta from LLMdelta: string
message.endLLM finished generating
tool.beforeAgent begins a tool calltoolName, toolCallId
tool.afterTool call completestoolCallId, result
context_compaction_startHistory compaction begins
context_compaction_endCompaction finished
context_usageToken budget after turninputTokens, outputTokens, budgetUsedPct
session.endAgent loop finishedmessages, usage
errorUnrecoverable errorerror.message

Example Event Handler

ts
import type { AgentrailEvent } from "@agentrail/app";

function handleEvent(event: AgentrailEvent) {
  switch (event.type) {
    case "message.update":
      // Append text delta to the UI
      appendText(event.delta ?? "");
      break;

    case "tool.before":
      // Show a tool-in-progress indicator
      showToolIndicator(event.toolName, "running");
      break;

    case "tool.after":
      // Update indicator to completed
      showToolIndicator(event.toolCallId, "done");
      break;

    case "context_compaction_start":
      showCompactionBadge(true);
      break;

    case "context_compaction_end":
      showCompactionBadge(false);
      break;

    case "context_usage":
      // Update token budget bar
      updateBudgetBar(event.budgetUsedPct ?? 0);
      break;

    case "session.end":
      // Final usage stats
      console.log("Total tokens:", event.usage?.totalTokens);
      break;

    case "error":
      showError(event.error.message);
      break;
  }
}

Session Continuity

The stream endpoint returns the session ID in the X-Session-Id response header. Store this and include it in subsequent requests to continue the conversation:

ts
let currentSessionId: string | undefined;

async function sendMessage(text: string) {
  const nextSessionId = await chat(text, currentSessionId);
  currentSessionId = nextSessionId;
}

On first request, omit sessionId — the server creates a new session. On all subsequent requests, pass the stored sessionId to resume the same conversation.

Node.js / Server-Side Consumption

When consuming the stream from another Node.js service (for example, a BFF layer):

ts
import { Readable } from "node:stream";

async function streamFromServer(message: string, sessionId?: string) {
  const response = await fetch("http://agent-server/api/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message, sessionId }),
  });

  const nextSessionId = response.headers.get("X-Session-Id");
  const events: AgentrailEvent[] = [];

  const readable = Readable.fromWeb(response.body as ReadableStream);
  let buffer = "";

  for await (const chunk of readable) {
    buffer += chunk.toString();
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";
    for (const line of lines) {
      if (line.trim()) events.push(JSON.parse(line));
    }
  }

  return { events, nextSessionId };
}

Orchestration Events

If the server uses multi-agent orchestration, additional events appear in the stream:

Event typeMeaning
orchestration_run_startA multi-agent run began
subagent_spawnedA sub-agent was created
subagent_statusA sub-agent's state changed
subagent_job_startedA sub-agent started processing
subagent_job_completedA sub-agent finished a job
subagent_closedA sub-agent was closed
orchestration_run_completeThe multi-agent run finished

These events are especially useful for building orchestration dashboards or progress views.

TypeScript Types

Import the full event union from @agentrail/app:

ts
import type { AgentrailEvent, AgentrailHostEvent } from "@agentrail/app";

AgentrailEvent combines runtime events, skill events, and host events — it is the right type for a general-purpose event consumer.

Released under the Apache 2.0 License.