# Binary Messages and JetStream Configuration

> Technical specification for NATS message handling in the AI platform

## Overview

The AI platform uses NATS with JetStream for message persistence. All messages use the MessagePack (msgpack) binary format for efficiency, especially when handling audio data.

## Message Format

### Why MessagePack?

1. **Binary efficiency**: Audio data is embedded directly, without base64 overhead
2. **Compact**: 20-50% smaller than equivalent JSON
3. **Fast**: Lower serialization/deserialization overhead
4. **Compatible**: JSON-like structure, easy debugging

### Schema

All messages follow this general structure:

```python
{
    "request_id": str,    # UUID for correlation
    "user_id": str,       # User identifier
    "timestamp": float,   # Unix timestamp
    "payload": Any,       # Type-specific data
    "metadata": dict      # Optional metadata
}
```

### Chat Message

```python
{
    "request_id": "uuid-here",
    "user_id": "user-123",
    "username": "john_doe",
    "message": "Hello, how are you?",
    "premium": False,
    "enable_streaming": True,
    "enable_rag": True,
    "enable_reranker": True,
    "top_k": 5,
    "session_id": "session-abc"
}
```

### Voice Message

```python
{
    "request_id": "uuid-here",
    "user_id": "user-123",
    "audio": b"...",  # Raw bytes, not base64!
    "format": "wav",
    "sample_rate": 16000,
    "premium": False,
    "enable_rag": True,
    "language": "en"
}
```

### Streaming Response Chunk

```python
{
    "request_id": "uuid-here",
    "type": "chunk",      # "chunk", "done", "error"
    "content": "token",
    "done": False,
    "timestamp": 1706000000.0
}
```

## JetStream Configuration

### Streams

| Stream | Subjects | Retention | Max Age | Storage | Replicas |
|--------|----------|-----------|---------|---------|----------|
| `COMPANIONS_LOGINS` | `ai.chat.user.*.login` | Limits | 7 days | File | 1 |
| `COMPANIONS_CHAT` | `ai.chat.user.*.message`, `ai.chat.user.*.greeting.*` | Limits | 30 days | File | 1 |
| `AI_CHAT_STREAM` | `ai.chat.response.stream.>` | Limits | 5 min | Memory | 1 |
| `AI_VOICE_STREAM` | `ai.voice.>` | Limits | 1 hour | File | 1 |
| `AI_VOICE_RESPONSE_STREAM` | `ai.voice.response.stream.>` | Limits | 5 min | Memory | 1 |
| `AI_PIPELINE` | `ai.pipeline.>` | Limits | 24 hours | File | 1 |

### Consumer Configuration

```yaml
# Durable consumer for chat handler
consumer:
  name: chat-handler
  durable_name: chat-handler
  filter_subjects:
    - "ai.chat.user.*.message"
  ack_policy: explicit
  ack_wait: 30s
  max_deliver: 3
  deliver_policy: new
```

### Stream Creation (CLI)

```bash
# Create chat stream
nats stream add COMPANIONS_CHAT \
  --subjects "ai.chat.user.*.message,ai.chat.user.*.greeting.*" \
  --retention limits \
  --max-age 30d \
  --storage file \
  --replicas 1

# Create ephemeral stream
nats stream add AI_CHAT_STREAM \
  --subjects "ai.chat.response.stream.>" \
  --retention limits \
  --max-age 5m \
  --storage memory \
  --replicas 1
```

## Python Implementation

### Publisher

```python
import time
import uuid

import msgpack
import nats

async def publish_chat_message(nc: nats.NATS, user_id: str, message: str):
    data = {
        "request_id": str(uuid.uuid4()),
        "user_id": user_id,
        "message": message,
        # time.time() returns a Unix timestamp directly; avoid
        # datetime.utcnow().timestamp(), which treats the naive
        # datetime as local time and skews the result.
        "timestamp": time.time(),
        "enable_streaming": True,
        "enable_rag": True,
    }
    subject = f"ai.chat.user.{user_id}.message"
    await nc.publish(subject, msgpack.packb(data))
```

### Subscriber (JetStream)

```python
import logging

logger = logging.getLogger(__name__)

async def message_handler(msg):
    try:
        data = msgpack.unpackb(msg.data, raw=False)

        # Process message
        result = await process_chat(data)

        # Publish response
        response_subject = f"ai.chat.response.{data['request_id']}"
        await nc.publish(response_subject, msgpack.packb(result))

        # Acknowledge
        await msg.ack()
    except Exception as e:
        logger.error(f"Handler error: {e}")
        await msg.nak(delay=5)  # Retry after 5s

# Subscribe with JetStream
js = nc.jetstream()
sub = await js.subscribe(
    "ai.chat.user.*.message",
    cb=message_handler,
    durable="chat-handler",
    manual_ack=True,
)
```

### Streaming Response

```python
async def stream_response(nc, request_id: str, response_generator):
    subject = f"ai.chat.response.stream.{request_id}"

    async for token in response_generator:
        chunk = {
            "request_id": request_id,
            "type": "chunk",
            "content": token,
            "done": False
        }
        await nc.publish(subject, msgpack.packb(chunk))

    # Send done marker
    done = {
        "request_id": request_id,
        "type": "done",
        "content": "",
        "done": True
    }
    await nc.publish(subject, msgpack.packb(done))
```

## Go Implementation

### Publisher

```go
import (
    "fmt"

    "github.com/google/uuid"
    "github.com/nats-io/nats.go"
    "github.com/vmihailenco/msgpack/v5"
)

type ChatMessage struct {
    RequestID string `msgpack:"request_id"`
    UserID    string `msgpack:"user_id"`
    Message   string `msgpack:"message"`
}

func PublishChat(nc *nats.Conn, userID, message string) error {
    msg := ChatMessage{
        RequestID: uuid.New().String(),
        UserID:    userID,
        Message:   message,
    }

    data, err := msgpack.Marshal(msg)
    if err != nil {
        return err
    }

    subject := fmt.Sprintf("ai.chat.user.%s.message", userID)
    return nc.Publish(subject, data)
}
```

## Error Handling

### NAK with Delay

```python
# Temporary failure - retry later
await msg.nak(delay=5)  # 5 second delay

# Permanent failure - move to dead letter
if attempt >= max_retries:
    await nc.publish("ai.dlq.chat", msg.data)
    await msg.term()  # Terminate delivery
```

### Dead Letter Queue

```yaml
stream:
  name: AI_DLQ
  subjects:
    - "ai.dlq.>"
  retention: limits
  max_age: 7d
  storage: file
```

## Monitoring

### Key Metrics

```bash
# Stream info
nats stream info COMPANIONS_CHAT

# Consumer info
nats consumer info COMPANIONS_CHAT chat-handler

# Message rate
nats stream report
```

### Prometheus Metrics

- `nats_stream_messages_total`
- `nats_consumer_pending_messages`
- `nats_consumer_ack_pending`

## Related

- [ADR-0003: Use NATS for Messaging](../decisions/0003-use-nats-for-messaging.md)
- [ADR-0004: Use MessagePack](../decisions/0004-use-messagepack-for-nats.md)
- [DOMAIN-MODEL.md](../DOMAIN-MODEL.md)
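## Appendix: Subject Wildcard Semantics

The stream subjects throughout this document rely on NATS wildcard tokens: `*` matches exactly one token, and `>` matches one or more trailing tokens. As a reference for writing tests against the routing table, here is a minimal stdlib-only sketch of those matching rules; `subject_matches` is illustrative and not part of the platform code.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Sketch of NATS subject matching: '*' matches exactly one token,
    '>' matches one or more trailing tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must match at least one remaining token
            return len(s_tokens) >= i + 1
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without a trailing '>', token counts must match exactly
    return len(p_tokens) == len(s_tokens)
```

For example, `ai.chat.user.*.message` matches `ai.chat.user.user-123.message` but not `ai.chat.user.a.b.message`, while `ai.voice.>` matches any subject at least one token deeper than `ai.voice`.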