Binary Messages and JetStream Configuration

Technical specification for NATS message handling in the AI platform

Overview

The AI platform uses NATS with JetStream for message persistence. All messages use MessagePack (msgpack) binary format for efficiency, especially when handling audio data.

Message Format

Why MessagePack?

  1. Binary efficiency: Audio data embedded directly without base64 overhead
  2. Compact: 20-50% smaller than equivalent JSON
  3. Fast: Lower serialization/deserialization overhead
  4. Compatible: JSON-like structure, easy debugging
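The base64 point is easy to quantify: JSON cannot carry raw bytes, so audio must be base64-encoded, inflating it by roughly a third, while MessagePack's bin type carries the bytes as-is. A quick stdlib-only sketch of the overhead:

```python
import base64

# One second of 16 kHz, 16-bit mono audio: 32,000 raw bytes
audio = bytes(32_000)

# Embedding these bytes in JSON requires base64 encoding first
encoded = base64.b64encode(audio)

overhead = len(encoded) / len(audio)
print(len(audio), len(encoded), round(overhead, 2))  # -> 32000 42668 1.33
```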

Schema

All messages follow this general structure:

{
    "request_id": str,       # UUID for correlation
    "user_id": str,          # User identifier
    "timestamp": float,      # Unix timestamp
    "payload": Any,          # Type-specific data
    "metadata": dict         # Optional metadata
}

Chat Message

{
    "request_id": "uuid-here",
    "user_id": "user-123",
    "username": "john_doe",
    "message": "Hello, how are you?",
    "premium": False,
    "enable_streaming": True,
    "enable_rag": True,
    "enable_reranker": True,
    "top_k": 5,
    "session_id": "session-abc"
}

Voice Message

{
    "request_id": "uuid-here",
    "user_id": "user-123",
    "audio": b"...",           # Raw bytes, not base64!
    "format": "wav",
    "sample_rate": 16000,
    "premium": False,
    "enable_rag": True,
    "language": "en"
}

Streaming Response Chunk

{
    "request_id": "uuid-here",
    "type": "chunk",           # "chunk", "done", "error"
    "content": "token",
    "done": False,
    "timestamp": 1706000000.0
}

JetStream Configuration

Streams

| Stream | Subjects | Retention | Max Age | Storage | Replicas |
|--------|----------|-----------|---------|---------|----------|
| COMPANIONS_LOGINS | ai.chat.user.*.login | Limits | 7 days | File | 1 |
| COMPANIONS_CHAT | ai.chat.user.*.message, ai.chat.user.*.greeting.* | Limits | 30 days | File | 1 |
| AI_CHAT_STREAM | ai.chat.response.stream.> | Limits | 5 min | Memory | 1 |
| AI_VOICE_STREAM | ai.voice.> | Limits | 1 hour | File | 1 |
| AI_VOICE_RESPONSE_STREAM | ai.voice.response.stream.> | Limits | 5 min | Memory | 1 |
| AI_PIPELINE | ai.pipeline.> | Limits | 24 hours | File | 1 |

Consumer Configuration

# Durable consumer for chat handler
consumer:
  name: chat-handler
  durable_name: chat-handler
  filter_subjects:
    - "ai.chat.user.*.message"
  ack_policy: explicit
  ack_wait: 30s
  max_deliver: 3
  deliver_policy: new
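The same consumer can also be created programmatically. A sketch assuming the `nats-py` client, where `ConsumerConfig` fields mirror the YAML above (`ack_wait` is in seconds):

```python
from nats.js.api import AckPolicy, ConsumerConfig, DeliverPolicy

config = ConsumerConfig(
    durable_name="chat-handler",
    filter_subject="ai.chat.user.*.message",
    ack_policy=AckPolicy.EXPLICIT,
    ack_wait=30,          # seconds
    max_deliver=3,
    deliver_policy=DeliverPolicy.NEW,
)

# With an established connection `nc`:
# jsm = nc.jsm()
# await jsm.add_consumer("COMPANIONS_CHAT", config=config)
```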

Stream Creation (CLI)

# Create chat stream
nats stream add COMPANIONS_CHAT \
  --subjects "ai.chat.user.*.message,ai.chat.user.*.greeting.*" \
  --retention limits \
  --max-age 30d \
  --storage file \
  --replicas 1

# Create ephemeral stream
nats stream add AI_CHAT_STREAM \
  --subjects "ai.chat.response.stream.>" \
  --retention limits \
  --max-age 5m \
  --storage memory \
  --replicas 1
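The equivalent stream definition, again assuming the `nats-py` client (`max_age` is in seconds; the client converts internally):

```python
from nats.js.api import RetentionPolicy, StorageType, StreamConfig

chat_stream = StreamConfig(
    name="COMPANIONS_CHAT",
    subjects=["ai.chat.user.*.message", "ai.chat.user.*.greeting.*"],
    retention=RetentionPolicy.LIMITS,
    max_age=30 * 24 * 3600,   # 30 days, in seconds
    storage=StorageType.FILE,
    num_replicas=1,
)

# With an established connection `nc`:
# jsm = nc.jsm()
# await jsm.add_stream(config=chat_stream)
```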

Python Implementation

Publisher

import uuid
from datetime import datetime, timezone

import msgpack
import nats

async def publish_chat_message(nc: nats.NATS, user_id: str, message: str):
    data = {
        "request_id": str(uuid.uuid4()),
        "user_id": user_id,
        "message": message,
        "timestamp": datetime.now(timezone.utc).timestamp(),
        "enable_streaming": True,
        "enable_rag": True,
    }

    subject = f"ai.chat.user.{user_id}.message"
    await nc.publish(subject, msgpack.packb(data))

Subscriber (JetStream)

# `nc` (the NATS connection) and `logger` are assumed to be in scope
async def message_handler(msg):
    try:
        data = msgpack.unpackb(msg.data, raw=False)

        # Process message
        result = await process_chat(data)

        # Publish response
        response_subject = f"ai.chat.response.{data['request_id']}"
        await nc.publish(response_subject, msgpack.packb(result))

        # Acknowledge successful processing
        await msg.ack()

    except Exception as e:
        logger.error(f"Handler error: {e}")
        await msg.nak(delay=5)  # Redeliver after 5 seconds

# Subscribe with JetStream
js = nc.jetstream()
sub = await js.subscribe(
    "ai.chat.user.*.message",
    cb=message_handler,
    durable="chat-handler",
    manual_ack=True
)

Streaming Response

async def stream_response(nc, request_id: str, response_generator):
    subject = f"ai.chat.response.stream.{request_id}"
    
    async for token in response_generator:
        chunk = {
            "request_id": request_id,
            "type": "chunk",
            "content": token,
            "done": False
        }
        await nc.publish(subject, msgpack.packb(chunk))
    
    # Send done marker
    done = {
        "request_id": request_id,
        "type": "done",
        "content": "",
        "done": True
    }
    await nc.publish(subject, msgpack.packb(done))
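On the receiving side, chunks are accumulated until the done (or error) marker arrives. A transport-agnostic sketch of that reassembly logic — the subscriber wiring is omitted and `collect_stream` is an illustrative name:

```python
def collect_stream(chunks: list[dict]) -> str:
    """Concatenate chunk contents until a done/error marker is seen."""
    parts = []
    for chunk in chunks:
        if chunk["type"] == "error":
            raise RuntimeError(f"stream {chunk['request_id']} failed")
        if chunk["type"] == "done" or chunk["done"]:
            break
        parts.append(chunk["content"])
    return "".join(parts)


chunks = [
    {"request_id": "r1", "type": "chunk", "content": "Hel", "done": False},
    {"request_id": "r1", "type": "chunk", "content": "lo", "done": False},
    {"request_id": "r1", "type": "done", "content": "", "done": True},
]
print(collect_stream(chunks))  # -> Hello
```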

Go Implementation

Publisher

import (
    "fmt"

    "github.com/google/uuid"
    "github.com/nats-io/nats.go"
    "github.com/vmihailenco/msgpack/v5"
)

type ChatMessage struct {
    RequestID string `msgpack:"request_id"`
    UserID    string `msgpack:"user_id"`
    Message   string `msgpack:"message"`
}

func PublishChat(nc *nats.Conn, userID, message string) error {
    msg := ChatMessage{
        RequestID: uuid.New().String(),
        UserID:    userID,
        Message:   message,
    }
    
    data, err := msgpack.Marshal(msg)
    if err != nil {
        return err
    }
    
    subject := fmt.Sprintf("ai.chat.user.%s.message", userID)
    return nc.Publish(subject, data)
}

Error Handling

NAK with Delay

# Temporary failure - retry later
await msg.nak(delay=5)  # Redeliver after 5 seconds

# Permanent failure - move to dead letter
# (the delivery count is available as msg.metadata.num_delivered)
if attempt >= max_retries:
    await nc.publish("ai.dlq.chat", msg.data)
    await msg.term()  # Stop redelivery of this message
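Since JetStream already tracks delivery attempts (`msg.metadata.num_delivered` in `nats-py`), the retry-or-dead-letter decision can be isolated in a small, testable helper; the names here are illustrative:

```python
def delivery_action(num_delivered: int, max_deliver: int = 3) -> str:
    """Decide how to respond to a failed message: retry or dead-letter."""
    if num_delivered >= max_deliver:
        return "term"   # publish payload to ai.dlq.chat, then msg.term()
    return "nak"        # msg.nak(delay=5) for another attempt


print(delivery_action(1), delivery_action(3))  # -> nak term
```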

Dead Letter Queue

stream:
  name: AI_DLQ
  subjects:
    - "ai.dlq.>"
  retention: limits
  max_age: 7d
  storage: file

Monitoring

Key Metrics

# Stream info
nats stream info COMPANIONS_CHAT

# Consumer info
nats consumer info COMPANIONS_CHAT chat-handler

# Message rate
nats stream report

Prometheus Metrics

  • nats_stream_messages_total
  • nats_consumer_pending_messages
  • nats_consumer_ack_pending