# Binary Messages and JetStream Configuration

Technical specification for NATS message handling in the AI platform.

## Overview

The AI platform uses NATS with JetStream for message persistence. All messages use the MessagePack (msgpack) binary format for efficiency, especially when handling audio data.
## Message Format

### Why MessagePack?

- Binary efficiency: audio data is embedded directly, without base64 overhead
- Compact: 20-50% smaller than equivalent JSON
- Fast: lower serialization/deserialization overhead
- Compatible: JSON-like structure, easy to debug
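The overhead claim can be sanity-checked with the standard library alone: JSON cannot carry raw bytes, so audio must be base64-encoded (roughly 33% larger), while msgpack's `bin 16` type (per the MessagePack spec) frames up to 64 KiB of raw bytes behind a 3-byte header.

```python
import base64

# 1 KiB of fake audio payload
audio = b"\x00" * 1024

# JSON path: base64 turns every 3 input bytes into 4 output characters.
b64 = base64.b64encode(audio)
print(len(audio), len(b64))  # 1024 1368

# msgpack path: bin 16 framing is 0xc5 + 2 length bytes, then the raw data.
msgpack_framed_len = 1 + 2 + len(audio)
print(msgpack_framed_len)  # 1027
```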
### Schema

All messages follow this general structure:

```python
{
    "request_id": str,    # UUID for correlation
    "user_id": str,       # User identifier
    "timestamp": float,   # Unix timestamp
    "payload": Any,       # Type-specific data
    "metadata": dict,     # Optional metadata
}
```
### Chat Message

```python
{
    "request_id": "uuid-here",
    "user_id": "user-123",
    "username": "john_doe",
    "message": "Hello, how are you?",
    "premium": False,
    "enable_streaming": True,
    "enable_rag": True,
    "enable_reranker": True,
    "top_k": 5,
    "session_id": "session-abc",
}
```
### Voice Message

```python
{
    "request_id": "uuid-here",
    "user_id": "user-123",
    "audio": b"...",  # Raw bytes, not base64!
    "format": "wav",
    "sample_rate": 16000,
    "premium": False,
    "enable_rag": True,
    "language": "en",
}
```
### Streaming Response Chunk

```python
{
    "request_id": "uuid-here",
    "type": "chunk",  # "chunk", "done", "error"
    "content": "token",
    "done": False,
    "timestamp": 1706000000.0,
}
```
## JetStream Configuration

### Streams

| Stream | Subjects | Retention | Max Age | Storage | Replicas |
|---|---|---|---|---|---|
| COMPANIONS_LOGINS | `ai.chat.user.*.login` | Limits | 7 days | File | 1 |
| COMPANIONS_CHAT | `ai.chat.user.*.message`, `ai.chat.user.*.greeting.*` | Limits | 30 days | File | 1 |
| AI_CHAT_STREAM | `ai.chat.response.stream.>` | Limits | 5 min | Memory | 1 |
| AI_VOICE_STREAM | `ai.voice.>` | Limits | 1 hour | File | 1 |
| AI_VOICE_RESPONSE_STREAM | `ai.voice.response.stream.>` | Limits | 5 min | Memory | 1 |
| AI_PIPELINE | `ai.pipeline.>` | Limits | 24 hours | File | 1 |
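The wildcards determine which stream a subject lands in: `*` matches exactly one token, `>` matches one or more trailing tokens. A small stdlib sketch of the matching rules:

```python
# NATS subject matching in miniature: "*" matches exactly one token,
# ">" matches one or more trailing tokens.
def subject_matches(pattern: str, subject: str) -> bool:
    p_toks, s_toks = pattern.split("."), subject.split(".")
    for i, p in enumerate(p_toks):
        if p == ">":
            return len(s_toks) > i  # ">" needs at least one more token
        if i >= len(s_toks) or (p != "*" and p != s_toks[i]):
            return False
    return len(p_toks) == len(s_toks)

print(subject_matches("ai.chat.user.*.message", "ai.chat.user.123.message"))       # True
print(subject_matches("ai.chat.response.stream.>", "ai.chat.response.stream.r1"))  # True
print(subject_matches("ai.voice.>", "ai.voice"))  # False: ">" needs a trailing token
```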
### Consumer Configuration

```yaml
# Durable consumer for the chat handler
consumer:
  name: chat-handler
  durable_name: chat-handler
  filter_subjects:
    - "ai.chat.user.*.message"
  ack_policy: explicit
  ack_wait: 30s
  max_deliver: 3
  deliver_policy: new
```
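With `ack_policy: explicit`, an unacknowledged message is redelivered after `ack_wait` elapses, up to `max_deliver` total deliveries. A quick sketch of the worst-case window before JetStream stops redelivering (a simplification that ignores explicit NAK delays):

```python
# Delivery 1 fires at t=0; each redelivery fires ack_wait after the
# previous delivery went unacked, and the final delivery still gets
# a full ack_wait before being considered exhausted.
def worst_case_redelivery_window(ack_wait_s: float, max_deliver: int) -> float:
    return max_deliver * ack_wait_s

# chat-handler: 30s ack_wait, 3 deliveries -> up to 90s of retries.
print(worst_case_redelivery_window(30, 3))  # 90
```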
### Stream Creation (CLI)

```shell
# Create the chat stream
nats stream add COMPANIONS_CHAT \
  --subjects "ai.chat.user.*.message,ai.chat.user.*.greeting.*" \
  --retention limits \
  --max-age 30d \
  --storage file \
  --replicas 1

# Create the ephemeral streaming-response stream
nats stream add AI_CHAT_STREAM \
  --subjects "ai.chat.response.stream.>" \
  --retention limits \
  --max-age 5m \
  --storage memory \
  --replicas 1
```
## Python Implementation

### Publisher

```python
import uuid
from datetime import datetime, timezone

import msgpack
import nats

async def publish_chat_message(nc: nats.NATS, user_id: str, message: str):
    data = {
        "request_id": str(uuid.uuid4()),
        "user_id": user_id,
        "message": message,
        "timestamp": datetime.now(timezone.utc).timestamp(),
        "enable_streaming": True,
        "enable_rag": True,
    }
    subject = f"ai.chat.user.{user_id}.message"
    await nc.publish(subject, msgpack.packb(data))
```
### Subscriber (JetStream)

```python
async def message_handler(msg):
    try:
        data = msgpack.unpackb(msg.data, raw=False)
        # Process the message
        result = await process_chat(data)
        # Publish the response
        response_subject = f"ai.chat.response.{data['request_id']}"
        await nc.publish(response_subject, msgpack.packb(result))
        # Acknowledge
        await msg.ack()
    except Exception as e:
        logger.error(f"Handler error: {e}")
        await msg.nak(delay=5)  # Retry after 5s

# Subscribe through JetStream
js = nc.jetstream()
sub = await js.subscribe(
    "ai.chat.user.*.message",
    cb=message_handler,
    durable="chat-handler",
    manual_ack=True,
)
```
### Streaming Response

```python
async def stream_response(nc, request_id: str, response_generator):
    subject = f"ai.chat.response.stream.{request_id}"
    async for token in response_generator:
        chunk = {
            "request_id": request_id,
            "type": "chunk",
            "content": token,
            "done": False,
        }
        await nc.publish(subject, msgpack.packb(chunk))
    # Send the done marker
    done = {
        "request_id": request_id,
        "type": "done",
        "content": "",
        "done": True,
    }
    await nc.publish(subject, msgpack.packb(done))
```
## Go Implementation

### Publisher

```go
import (
	"fmt"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"github.com/vmihailenco/msgpack/v5"
)

type ChatMessage struct {
	RequestID string `msgpack:"request_id"`
	UserID    string `msgpack:"user_id"`
	Message   string `msgpack:"message"`
}

func PublishChat(nc *nats.Conn, userID, message string) error {
	msg := ChatMessage{
		RequestID: uuid.New().String(),
		UserID:    userID,
		Message:   message,
	}
	data, err := msgpack.Marshal(msg)
	if err != nil {
		return err
	}
	subject := fmt.Sprintf("ai.chat.user.%s.message", userID)
	return nc.Publish(subject, data)
}
```
## Error Handling

### NAK with Delay

```python
# Temporary failure: retry later
await msg.nak(delay=5)  # 5-second delay

# Permanent failure: move to the dead letter queue
if attempt >= max_retries:
    await nc.publish("ai.dlq.chat", msg.data)
    await msg.term()  # Terminate delivery
```
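The nak-vs-term decision is easiest to keep correct as a pure function. A sketch, assuming the delivery count comes from JetStream message metadata (`num_delivered` in nats-py):

```python
# Sketch of the retry/DLQ policy as a pure function, so it can be
# tested without a broker. num_delivered counts deliveries so far.
def retry_action(num_delivered: int, max_deliver: int = 3) -> str:
    """Return "nak" to retry with delay, or "dlq" to terminate and dead-letter."""
    return "nak" if num_delivered < max_deliver else "dlq"

print(retry_action(1))  # nak
print(retry_action(3))  # dlq
```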
### Dead Letter Queue

```yaml
stream:
  name: AI_DLQ
  subjects:
    - "ai.dlq.>"
  retention: limits
  max_age: 7d
  storage: file
```
## Monitoring

### Key Metrics

```shell
# Stream info
nats stream info COMPANIONS_CHAT

# Consumer info
nats consumer info COMPANIONS_CHAT chat-handler

# Message rates across streams
nats stream report
```
### Prometheus Metrics

- `nats_stream_messages_total`
- `nats_consumer_pending_messages`
- `nats_consumer_ack_pending`
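These metrics lend themselves to alerting on consumer backlog. An illustrative Prometheus rule (metric and label names depend on the exporter in use, e.g. prometheus-nats-exporter, and the threshold is a placeholder):

```yaml
# Illustrative alert: chat-handler falling behind on its stream.
groups:
  - name: nats-jetstream
    rules:
      - alert: ChatConsumerBacklog
        expr: nats_consumer_pending_messages{consumer="chat-handler"} > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "chat-handler is falling behind on COMPANIONS_CHAT"
```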