Gateway overview

Gateway

The gateway is the single HTTP + WebSocket process that powers Open Astra. It runs an Express server with a shared http.Server for REST, SSE streaming, and full-duplex WebSocket — all on the same port. Every agent conversation, channel webhook, admin operation, and diagnostic check flows through this process.

What the gateway does

  • Authenticates users via JWT (HS256 with access + refresh token rotation)
  • Routes requests across 17 route groups and 80+ endpoints
  • Streams agent responses over SSE (HTTP) and WebSocket (full-duplex)
  • Bootstraps all 12 channel adapters (Telegram, Discord, Slack, WhatsApp, etc.) at startup
  • Runs DB migrations, seeds agents from astra.yml, and starts background workers
  • Manages the plugin/skill hot-reload file watcher
  • Starts the cron scheduler, self-healing monitor, and presence tracker
  • Optionally starts a gRPC sidecar on a separate port for inter-agent spawn communication

Source layout

| File | Purpose |
| --- | --- |
| gateway/index.ts | Express app, bootstrap, start/shutdown lifecycle |
| gateway/ws.ts | WebSocket server: auth, connection map, push helpers |
| gateway/ws-handler.ts | Bidirectional WS message handling and agent streaming |
| gateway/commands.ts | In-chat slash commands (/reset, /status, /help, etc.) |
| gateway/presence.ts | Agent presence state: real-time status pushed over WS |
| gateway/errors.ts | Structured error class hierarchy (AppError subclasses) |
| gateway/middleware/ | 15 middleware modules (auth, RBAC, rate limiting, CSRF, brute-force, RLS, etc.) |
| gateway/routes/ | 17 route files, one per domain (chat, memory, agents, etc.) |

Middleware stack

Every request passes through the global middleware in the order listed. Agent-scoped routes add six more layers covering auth, token budgets, workspace resolution, rate limiting, and request timeouts.

```text
# Request lifecycle: every HTTP request passes through these in order
 1. securityHeaders()      X-Content-Type-Options, HSTS, CSP, X-Frame-Options, Referrer-Policy
 2. correlationId()        Reads or generates X-Request-ID, echoes it in the response
 3. cors()                 Production: ALLOWED_ORIGINS allowlist; dev: allow all
 4. express.json()         1 MB limit, captures rawBody for HMAC verification
 5. csrfProtection()       Double-submit cookie (Bearer token holders exempt)
 6. requestLogger()        Structured JSON log: method / path / status / duration / uid

# Agent-scoped routes add these after the global middleware:
 7. jwtAuth()              Validates Bearer JWT, attaches req.uid
 8. tokenBudget()          Per-user token budget: hard cap → 429, soft cap → override model
 9. workspaceResolver()    X-Workspace-Id header → validates membership, attaches req.workspace
10. rateLimiter()          Postgres-backed sliding window (30 req / 60s), in-memory fallback
11. userRateLimit()        In-memory per-uid: USER_RATE_LIMIT_RPM (default 60) + burst (10)
12. requestTimeout()       Hard 120s timeout; SSE streams are closed gracefully
```

Postgres-backed rate limiting. The main rate limiter stores timestamps in a rate_limit_entries table so limits survive restarts and work across horizontal replicas. If Postgres is unreachable, it falls back to an in-memory sliding window transparently.
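
To make the sliding-window idea concrete, here is a minimal in-memory sketch of the kind of fallback described above. The class name `SlidingWindowLimiter`, its method, and the `retryAfterSec` field are illustrative assumptions, not the gateway's actual code; only the 30 requests per 60 seconds default comes from the middleware listing.

```typescript
// Hypothetical in-memory sliding-window limiter (names are illustrative).
class SlidingWindowLimiter {
  private readonly hits = new Map<string, number[]>();
  private readonly limit: number;
  private readonly windowMs: number;

  constructor(limit: number = 30, windowMs: number = 60_000) {
    this.limit = limit;       // max requests per window
    this.windowMs = windowMs; // window length in milliseconds
  }

  /** Records a hit for `key` if allowed and reports the decision. */
  check(key: string, now: number = Date.now()): { allowed: boolean; retryAfterSec: number } {
    const cutoff = now - this.windowMs;
    // Keep only timestamps that still fall inside the window.
    const recent = (this.hits.get(key) ?? []).filter((t) => t > cutoff);

    if (recent.length >= this.limit) {
      this.hits.set(key, recent);
      // The oldest hit leaves the window at recent[0] + windowMs.
      const retryAfterSec = Math.ceil((recent[0] + this.windowMs - now) / 1000);
      return { allowed: false, retryAfterSec };
    }

    recent.push(now);
    this.hits.set(key, recent);
    return { allowed: true, retryAfterSec: 0 };
  }
}
```

A Postgres-backed variant would presumably apply the same window arithmetic to rows in rate_limit_entries instead of an in-process map, which is what lets the limit hold across replicas.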

Dual transport — SSE and WebSocket

The gateway provides the same agent streaming over two transports. Use whichever fits your client.

WebSocket

Connect to ws://host/ws?token=<jwt>. The gateway authenticates via the query-string JWT, supports multi-device (one uid can have N concurrent sockets), and sends a heartbeat ping every 30 seconds — sockets that don't pong back are terminated.

```typescript
// Inbound: client → server
{ type: "message", id: "req-1", agentId: "chad", message: "Hello",
  surface: "chat", surfaceId: "main" }
{ type: "ping" }

// Outbound: server → client
{ type: "connected",        timestamp }
{ type: "session_resolved", id, sessionId, isNew }
{ type: "stream_chunk",     id, content }
{ type: "tool_call",        id, index, toolCallId, name }
{ type: "tool_result",      id, name, durationMs, error }
{ type: "stream_done",      id, content, sessionId, usage, toolsUsed }
{ type: "presence",         agentId, sessionId, status, currentTool }
{ type: "error",            id, message }
{ type: "pong" }
```

Concurrency guard. Max 1 in-flight agent request per socket. If a second message arrives while one is streaming, the gateway returns an error frame instead of queueing. Each in-flight request has an AbortController, so closing the socket aborts the request.
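
The guard described above can be sketched as a small per-socket object. This is an assumption-laden illustration: `InFlightGuard` and its method names are hypothetical, and the real handler may track more state. It relies only on the standard `AbortController` global.

```typescript
// Hypothetical per-socket guard: at most one in-flight agent request.
class InFlightGuard {
  private current: { id: string; controller: AbortController } | null = null;

  /** Returns an AbortSignal if the request may start, or null if one is already running. */
  begin(id: string): AbortSignal | null {
    if (this.current) return null; // caller would reply with an error frame
    const controller = new AbortController();
    this.current = { id, controller };
    return controller.signal;
  }

  /** Marks the request as finished, e.g. after stream_done or an error frame. */
  end(id: string): void {
    if (this.current?.id === id) this.current = null;
  }

  /** Intended for the socket's close handler: aborts whatever is still running. */
  abortAll(): void {
    this.current?.controller.abort();
    this.current = null;
  }
}
```

In this sketch the agent loop would receive the returned signal and stop streaming once it is aborted, so a dropped connection does not keep consuming tokens.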

Back-pressure: the gateway monitors socket.bufferedAmount and pauses streaming when it exceeds 64 KB, polling every 50ms until drained.
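
The back-pressure loop above can be approximated as follows. `BufferedSocket` is a minimal stand-in type written for this example (any socket object exposing `bufferedAmount` and `send` would fit), and the helper names are assumptions; the 64 KB threshold and 50ms poll interval are the values stated above.

```typescript
// Minimal stand-in for a WebSocket that exposes its outgoing buffer size.
interface BufferedSocket {
  bufferedAmount: number;
  send(data: string): void;
}

const HIGH_WATER_MARK = 64 * 1024; // 64 KB
const POLL_INTERVAL_MS = 50;

/** True while the outgoing buffer is above the threshold. */
function shouldPause(socket: BufferedSocket): boolean {
  return socket.bufferedAmount > HIGH_WATER_MARK;
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

/** Sends one chunk, first polling until the socket's buffer has drained. */
async function sendWithBackPressure(socket: BufferedSocket, chunk: string): Promise<void> {
  while (shouldPause(socket)) {
    await sleep(POLL_INTERVAL_MS);
  }
  socket.send(chunk);
}
```

Polling keeps the sketch simple; an implementation could equally wait on a drain-style callback if the socket library provides one.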

SSE (Server-Sent Events)

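POST /agent/chat/stream returns an SSE stream with typed events. The transport is stateless: each request carries everything needed to produce its own stream, so no long-lived connection has to be kept open between messages.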

```text
event: session_resolved
data: { "sessionId": "ses_abc", "isNew": true }

event: content
data: { "content": "Here's what I found..." }

event: tool_call
data: { "index": 0, "name": "web_search", "toolCallId": "tc_1" }

event: tool_call_complete
data: { "index": 0, "name": "web_search", "arguments": "{...}" }

event: tool_result
data: { "name": "web_search", "durationMs": 320 }

event: done
data: { "content": "...", "sessionId": "ses_abc", "usage": {...}, "toolsUsed": [...] }
```
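
A client has to split the stream above into events before it can react to them. The parser below is a rough sketch under two assumptions: the whole buffer is available as one string, and every data payload is valid JSON (the `{...}` placeholders in the listing are abbreviations, not literal output). `parseSse` and `SseEvent` are names invented for this example.

```typescript
interface SseEvent {
  event: string;
  data: unknown;
}

/** Parses SSE text into events. Blank lines separate events. */
function parseSse(buffer: string): SseEvent[] {
  const events: SseEvent[] = [];
  for (const block of buffer.split(/\r?\n\r?\n/)) {
    let event = "message"; // default event name when no "event:" line is present
    const dataLines: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith("event:")) event = line.slice(6).trim();
      else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
    }
    if (dataLines.length === 0) continue; // skip empty or comment-only blocks
    events.push({ event, data: JSON.parse(dataLines.join("\n")) });
  }
  return events;
}
```

A streaming client would feed this function only complete blocks, keeping any trailing partial block in a buffer until the next chunk arrives.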

Presence system

The gateway tracks real-time agent status per user per session. Every state change pushes a presence frame to all of the user's WebSocket connections and emits to the event bus. Stale entries are cleaned up every 60 seconds (5-minute threshold).

```text
starting → streaming → tool_calling → completed
                                              ↘ error
```
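
The stale-entry cleanup mentioned above might be implemented along these lines. The `PresenceEntry` shape, the `updatedAt` field, and the function name are assumptions made for illustration; the status names and the 5-minute threshold come from this section.

```typescript
type PresenceStatus = "starting" | "streaming" | "tool_calling" | "completed" | "error";

// Hypothetical shape of one tracked presence record.
interface PresenceEntry {
  agentId: string;
  sessionId: string;
  status: PresenceStatus;
  updatedAt: number; // epoch milliseconds of the last state change
}

const STALE_THRESHOLD_MS = 5 * 60_000; // 5-minute threshold

/** Removes entries not updated within the threshold; returns how many were removed. */
function sweepStale(entries: Map<string, PresenceEntry>, now: number = Date.now()): number {
  let removed = 0;
  entries.forEach((entry, key) => {
    if (now - entry.updatedAt > STALE_THRESHOLD_MS) {
      entries.delete(key);
      removed++;
    }
  });
  return removed;
}
```

Running a sweep like this from a 60-second timer would match the cleanup cadence described above.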

Route groups

The gateway organizes endpoints into 17 route groups. Each group has its own auth and middleware requirements.

| Group | Auth | Key endpoints |
| --- | --- | --- |
| Auth | Public | /auth/register, /login, /refresh, /logout |
| Chat | JWT + budget + workspace | /agent/chat, /agent/chat/stream, /agent/reset |
| Memory | JWT + budget + workspace | /agent/memory/profile, /agent/memory/search |
| Agents | JWT + workspace | CRUD /agents, /agents/:id/capabilities, versioning, rollback |
| Workspaces | JWT | CRUD /workspaces, members, stats, grants, restrictions |
| Sessions | JWT | /sessions, /sessions/:id, search, handoff |
| Webhooks | JWT | CRUD /webhooks, deliveries, test |
| Costs | JWT + workspace | /costs, /costs/breakdown |
| Heartbeat | JWT + workspace | CRUD /heartbeat, run history |
| Skills | Public (cache mgmt: JWT) | /skills, /skills/:id, metrics, cache |
| Traces | JWT + workspace | /traces, /traces/:id |
| Jobs | JWT + workspace | CRUD /jobs |
| Memory Profiles | JWT + workspace | CRUD /memory-profiles, agent assignment |
| Onboarding | JWT | /onboarding/setup |
| Autonomous | Internal API key | /autonomous/run, /autonomous/batch |
| Admin | Internal API key | /admin/health, usage, sessions, audit |
| Diagnostics | Internal API key | /diagnostics, circuit breakers |
| GDPR | JWT (self-only) | DELETE /memory/user/:uid (full data purge) |

Idempotency. POST /agent/chat accepts an X-Idempotency-Key header. Responses are cached in-process for 5 minutes, and duplicate requests return the cached response without running the agent again.
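
An in-process idempotency cache with a 5-minute TTL can be sketched as below. The class and method names are hypothetical, and the sketch ignores concerns a real implementation may handle, such as scoping keys per user or evicting expired entries in the background.

```typescript
interface CachedResponse<T> {
  body: T;
  expiresAt: number; // epoch milliseconds
}

// Hypothetical cache keyed by the X-Idempotency-Key header value.
class IdempotencyCache<T> {
  private readonly entries = new Map<string, CachedResponse<T>>();
  private readonly ttlMs: number;

  constructor(ttlMs: number = 5 * 60_000) {
    this.ttlMs = ttlMs;
  }

  /** Returns the cached body, or undefined if the key is unknown or expired. */
  get(key: string, now: number = Date.now()): T | undefined {
    const hit = this.entries.get(key);
    if (!hit) return undefined;
    if (hit.expiresAt <= now) {
      this.entries.delete(key); // lazily evict expired entries
      return undefined;
    }
    return hit.body;
  }

  /** Stores a response body under the given key. */
  set(key: string, body: T, now: number = Date.now()): void {
    this.entries.set(key, { body, expiresAt: now + this.ttlMs });
  }
}
```

A handler would call `get` before running the agent and `set` after a successful response. Because the cache lives in process memory, it would not deduplicate requests that land on different replicas.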

Health, readiness, and metrics

Three public endpoints support monitoring. None of them requires authentication.

```json
// GET /health: no auth required
{
  "status": "ok",           // "ok" | "degraded"
  "checks": {
    "typesense": true,
    "postgres": true
  },
  "wsConnections": 42,
  "timestamp": "2026-02-27T12:00:00.000Z"
}

// GET /readiness: Kubernetes probe
{ "ready": true }           // 200 if both Postgres + Typesense reachable, 503 otherwise

// GET /metrics: operational dashboard
{
  "uptime": 86400,
  "wsConnections": 42,
  "tools": 106,
  "skills": 48,
  "sessions": { "active": 15, "total": 1240 },
  "messages": 28400,
  "billing": { "month": "2026-02", "totalCost": 12.40 }
}
```
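
The relationship between the dependency checks and the two probe responses can be written down as a pair of pure functions. This is a sketch: the function names are invented, the rule that any failed check yields "degraded" is an assumption, and so is the `{ "ready": false }` body on a 503. Only the 200/503 rule for readiness is stated above.

```typescript
interface DependencyChecks {
  typesense: boolean;
  postgres: boolean;
}

/** Assumption: any failing check downgrades the health status to "degraded". */
function healthStatus(checks: DependencyChecks): "ok" | "degraded" {
  return checks.typesense && checks.postgres ? "ok" : "degraded";
}

/** Readiness is 200 only when both Postgres and Typesense are reachable. */
function readinessResponse(checks: DependencyChecks): { status: 200 | 503; body: { ready: boolean } } {
  const ready = checks.postgres && checks.typesense;
  return { status: ready ? 200 : 503, body: { ready } };
}
```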

Structured errors

Every domain error extends AppError and serializes to a consistent JSON shape. The errorHandler() middleware catches them automatically.

```json
// All domain errors extend AppError → structured JSON response
{
  "error": "Rate limit exceeded",
  "code": "RATE_LIMIT",
  "status": 429,
  "details": {
    "retryAfter": 12,
    "limit": 30,
    "window": "60s"
  }
}

// Error subclasses:
// AuthError (401)  ForbiddenError (403)  ValidationError (400)
// NotFoundError (404)  ConflictError (409)  RateLimitError (429)
// AgentError (500)  ToolError (500)  QuotaExceededError (429)
// InferenceError (502)  ConfigError (500)
```
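
A class hierarchy that serializes to the JSON shape above could look like this. The constructor signatures and the "NOT_FOUND" code are assumptions for illustration; the class names, status codes, and the "RATE_LIMIT" payload mirror the example above.

```typescript
class AppError extends Error {
  readonly code: string;
  readonly status: number;
  readonly details?: Record<string, unknown>;

  constructor(message: string, code: string, status: number, details?: Record<string, unknown>) {
    super(message);
    // Keep instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = new.target.name;
    this.code = code;
    this.status = status;
    this.details = details;
  }

  /** Shape used when the error is passed through JSON.stringify. */
  toJSON(): { error: string; code: string; status: number; details?: Record<string, unknown> } {
    return { error: this.message, code: this.code, status: this.status, details: this.details };
  }
}

class RateLimitError extends AppError {
  constructor(retryAfter: number, limit: number, window: string) {
    super("Rate limit exceeded", "RATE_LIMIT", 429, { retryAfter, limit, window });
  }
}

class NotFoundError extends AppError {
  constructor(resource: string) {
    super(`${resource} not found`, "NOT_FOUND", 404); // code string is an assumption
  }
}
```

With this shape, an error handler only needs to check `err instanceof AppError`, set the HTTP status from `err.status`, and send `err.toJSON()` as the body.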

RBAC

Four capability roles form a hierarchy: owner > editor > tool_runner > viewer. They are mapped from workspace_members.role values, and the requireRole(minimum) middleware can be applied to any route to enforce a minimum role.
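
The hierarchy check behind a middleware like requireRole(minimum) reduces to a rank comparison. The sketch below uses hand-written stand-in types instead of Express's own, and it assumes the resolved role is available as `req.role`; that property name, the "FORBIDDEN" code string, and the response body are illustrative assumptions.

```typescript
type Role = "owner" | "editor" | "tool_runner" | "viewer";

// Higher rank means more capability: owner > editor > tool_runner > viewer.
const ROLE_RANK: Record<Role, number> = { viewer: 0, tool_runner: 1, editor: 2, owner: 3 };

function hasAtLeast(actual: Role, minimum: Role): boolean {
  return ROLE_RANK[actual] >= ROLE_RANK[minimum];
}

// Minimal Express-like shapes so the example stays self-contained.
type Req = { role?: Role };
type Res = { status(code: number): Res; json(body: unknown): void };
type Next = () => void;

/** Returns a middleware that rejects callers below the given role with a 403. */
function requireRole(minimum: Role) {
  return (req: Req, res: Res, next: Next): void => {
    if (req.role && hasAtLeast(req.role, minimum)) {
      next();
      return;
    }
    res.status(403).json({ error: "Forbidden", code: "FORBIDDEN", status: 403 });
  };
}
```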

Notable features

| Feature | Detail |
| --- | --- |
| Session handoff | POST /sessions/:id/handoff transfers a session to a different agent, preserving full history |
| Slash commands | /reset, /new, /compact, /status, /help; intercepted before the agent loop on both HTTP and WS |
| gRPC sidecar | Optional second server on GRPC_PORT (50051) for inter-agent spawn communication. Supports mTLS |
| Hot reload | File watcher on the plugins and skills directories: 1000ms debounce, syntax errors roll back |
| Self-healing | Monitors sessions for consecutive failures, restarts with exponential backoff, triggers compaction |
| SSRF guard | DNS-pinning fetch with blocklists for private IPs, cloud metadata, IPv4-mapped IPv6, hex/octal encodings. Optional ALLOWED_OUTBOUND_HOSTS allowlist |
| Intrusion detection | Tracks SSRF, SQLi, path traversal, auth failures, sandbox escapes in a sliding 10-minute window. Escalates to SECURITY_WEBHOOK_URL when thresholds are breached |
| GDPR purge | DELETE /memory/user/:uid purges all user data across 16 tables (sessions, memory, tokens, traces, billing, audit). Self-only access |
| Brute-force protection | Progressive delay (1–60s) after 5 failed logins, full IP block after 20. Applied on /auth/login |
| WebSocket rate limiting | 10 connections/IP/min, 100 messages/connection/min. Violations close the socket (code 1008) |
| Row-Level Security | Per-request RLS context middleware on sessions, traces, and memory-profiles routes |
| ETag caching | GET /agents and GET /skills return weak ETags; 304 when If-None-Match matches |
| HMAC verification | Shared middleware for all channel webhooks: SHA256/SHA1, hex/base64, timingSafeEqual |
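
As one worked example from the table, the brute-force thresholds can be expressed as a pure decision function. The table only gives the endpoints of the delay range (1 to 60 seconds) and the two thresholds (5 and 20 failures); the doubling curve below is an assumption chosen for illustration, as are the `loginGate` and `LoginGate` names.

```typescript
type LoginGate =
  | { action: "allow" }
  | { action: "delay"; seconds: number }
  | { action: "block" };

/** Decides how to treat the next login attempt given prior failures from one IP. */
function loginGate(failedAttempts: number): LoginGate {
  if (failedAttempts >= 20) return { action: "block" };
  if (failedAttempts < 5) return { action: "allow" };
  // Assumed curve: 1s at the 5th failure, doubling per failure, capped at 60s.
  const seconds = Math.min(60, 2 ** (failedAttempts - 5));
  return { action: "delay", seconds };
}
```

Under this assumed curve the delay reaches the 60-second cap at the 11th failure and stays there until the block threshold.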

Key configuration

| Variable | Default | Purpose |
| --- | --- | --- |
| PORT | 8080 | HTTP listen port |
| ALLOWED_ORIGINS | http://localhost:3000 | Comma-separated CORS allowlist |
| JWT_SECRET | | HS256 (HMAC-SHA256) signing key (min 32 chars in production) |
| JWT_ACCESS_EXPIRES | 15m | Access token TTL |
| JWT_REFRESH_EXPIRES | 7d | Refresh token TTL |
| JWT_SECRET_PREV | | Previous signing key for rotation |
| INTERNAL_API_KEY | | API key for admin, autonomous, diagnostics routes |
| USER_RATE_LIMIT_RPM | 60 | Per-user requests per minute |
| USER_RATE_LIMIT_BURST | 10 | Burst allowance above RPM |
| GRPC_ENABLED | false | Enable gRPC sidecar |
| GRPC_PORT | 50051 | gRPC listen port |
| GATEWAY_URL | | Public URL for webhook self-registration |
| SECURITY_WEBHOOK_URL | | POST target for intrusion detection alerts |
| ALLOWED_OUTBOUND_HOSTS | | Comma-separated allowlist for SSRF guard (if unset, blocklist-only mode) |
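
To show how the defaults in the table might be applied, here is a small loader sketch. It takes the environment as a plain object so it can be exercised without a running process. The `GatewayConfig` shape, the function name, and the parsing rules (for example, treating only the string "true" as enabled) are assumptions; the variable names and default values come from the table.

```typescript
interface GatewayConfig {
  port: number;
  allowedOrigins: string[];
  jwtAccessExpires: string;
  jwtRefreshExpires: string;
  userRateLimitRpm: number;
  userRateLimitBurst: number;
  grpcEnabled: boolean;
  grpcPort: number;
}

type Env = Record<string, string | undefined>;

function loadGatewayConfig(env: Env): GatewayConfig {
  // Reads an integer variable, falling back to the documented default.
  const int = (name: string, fallback: number): number => {
    const raw = env[name];
    if (raw === undefined || raw === "") return fallback;
    const parsed = parseInt(raw, 10);
    if (isNaN(parsed)) throw new Error(`${name} must be an integer, got "${raw}"`);
    return parsed;
  };

  return {
    port: int("PORT", 8080),
    allowedOrigins: (env["ALLOWED_ORIGINS"] ?? "http://localhost:3000")
      .split(",")
      .map((origin) => origin.trim())
      .filter((origin) => origin.length > 0),
    jwtAccessExpires: env["JWT_ACCESS_EXPIRES"] ?? "15m",
    jwtRefreshExpires: env["JWT_REFRESH_EXPIRES"] ?? "7d",
    userRateLimitRpm: int("USER_RATE_LIMIT_RPM", 60),
    userRateLimitBurst: int("USER_RATE_LIMIT_BURST", 10),
    grpcEnabled: env["GRPC_ENABLED"] === "true", // assumption: only "true" enables it
    grpcPort: int("GRPC_PORT", 50051),
  };
}
```

In a Node process this would typically be called once at startup as `loadGatewayConfig(process.env)`.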

Explore in depth

| Topic | What it covers |
| --- | --- |
| Middleware | Full middleware reference: auth, CSRF, rate limiting, RBAC, security headers |
| WebSocket | Connection lifecycle, frame types, back-pressure, multi-device |
| Routes | Complete endpoint reference across all 17 route groups |
| Channel Adapters | How channel webhooks register on the gateway, HMAC verification |
| Auth Hardening | JWT rotation, CSRF, device binding, key management |
| API Reference | Full REST API documentation |