- Add AGENT-ONBOARDING.md for AI agents - Add ARCHITECTURE.md with full system overview - Add TECH-STACK.md with complete technology inventory - Add DOMAIN-MODEL.md with entities and bounded contexts - Add CODING-CONVENTIONS.md with patterns and practices - Add GLOSSARY.md with terminology reference - Add C4 diagrams (Context and Container levels) - Add 10 ADRs documenting key decisions: - Talos Linux, NATS, MessagePack, Multi-GPU strategy - GitOps with Flux, KServe, Milvus, Dual workflow engines - Envoy Gateway - Add specs directory with JetStream configuration - Add diagrams for GPU allocation and data flows Based on analysis of homelab-k8s2 and llm-workflows repositories and kubectl cluster-info dump data.
288 lines
6.4 KiB
Markdown
288 lines
6.4 KiB
Markdown
# Binary Messages and JetStream Configuration
|
|
|
|
> Technical specification for NATS message handling in the AI platform
|
|
|
|
## Overview
|
|
|
|
The AI platform uses NATS with JetStream for message persistence. All messages use MessagePack (msgpack) binary format for efficiency, especially when handling audio data.
|
|
|
|
## Message Format
|
|
|
|
### Why MessagePack?
|
|
|
|
1. **Binary efficiency**: Audio data embedded directly without base64 overhead
|
|
2. **Compact**: 20-50% smaller than equivalent JSON
|
|
3. **Fast**: Lower serialization/deserialization overhead
|
|
4. **Compatible**: JSON-like structure, easy debugging
|
|
|
|
### Schema
|
|
|
|
All messages follow this general structure:
|
|
|
|
```python
|
|
{
|
|
"request_id": str, # UUID for correlation
|
|
"user_id": str, # User identifier
|
|
"timestamp": float, # Unix timestamp
|
|
"payload": Any, # Type-specific data
|
|
"metadata": dict # Optional metadata
|
|
}
|
|
```
|
|
|
|
### Chat Message
|
|
|
|
```python
|
|
{
|
|
"request_id": "uuid-here",
|
|
"user_id": "user-123",
|
|
"username": "john_doe",
|
|
"message": "Hello, how are you?",
|
|
"premium": False,
|
|
"enable_streaming": True,
|
|
"enable_rag": True,
|
|
"enable_reranker": True,
|
|
"top_k": 5,
|
|
"session_id": "session-abc"
|
|
}
|
|
```
|
|
|
|
### Voice Message
|
|
|
|
```python
|
|
{
|
|
"request_id": "uuid-here",
|
|
"user_id": "user-123",
|
|
"audio": b"...", # Raw bytes, not base64!
|
|
"format": "wav",
|
|
"sample_rate": 16000,
|
|
"premium": False,
|
|
"enable_rag": True,
|
|
"language": "en"
|
|
}
|
|
```
|
|
|
|
### Streaming Response Chunk
|
|
|
|
```python
|
|
{
|
|
"request_id": "uuid-here",
|
|
"type": "chunk", # "chunk", "done", "error"
|
|
"content": "token",
|
|
"done": False,
|
|
"timestamp": 1706000000.0
|
|
}
|
|
```
|
|
|
|
## JetStream Configuration
|
|
|
|
### Streams
|
|
|
|
| Stream | Subjects | Retention | Max Age | Storage | Replicas |
|
|
|--------|----------|-----------|---------|---------|----------|
|
|
| `COMPANIONS_LOGINS` | `ai.chat.user.*.login` | Limits | 7 days | File | 1 |
|
|
| `COMPANIONS_CHAT` | `ai.chat.user.*.message`, `ai.chat.user.*.greeting.*` | Limits | 30 days | File | 1 |
|
|
| `AI_CHAT_STREAM` | `ai.chat.response.stream.>` | Limits | 5 min | Memory | 1 |
|
|
| `AI_VOICE_STREAM` | `ai.voice.>` | Limits | 1 hour | File | 1 |
|
|
| `AI_VOICE_RESPONSE_STREAM` | `ai.voice.response.stream.>` | Limits | 5 min | Memory | 1 |
|
|
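| `AI_PIPELINE` | `ai.pipeline.>` | Limits | 24 hours | File | 1 |

> **Note:** JetStream does not allow two streams to capture overlapping subjects. `ai.voice.>` (`AI_VOICE_STREAM`) also matches `ai.voice.response.stream.>` (`AI_VOICE_RESPONSE_STREAM`), so the `AI_VOICE_STREAM` subject filter must be narrowed to exclude the response subjects for both streams to coexist.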
|
|
|
|
### Consumer Configuration
|
|
|
|
```yaml
|
|
# Durable consumer for chat handler
|
|
consumer:
|
|
name: chat-handler
|
|
durable_name: chat-handler
|
|
filter_subjects:
|
|
- "ai.chat.user.*.message"
|
|
ack_policy: explicit
|
|
ack_wait: 30s
|
|
max_deliver: 3
|
|
deliver_policy: new
|
|
```
|
|
|
|
### Stream Creation (CLI)
|
|
|
|
```bash
|
|
# Create chat stream
|
|
nats stream add COMPANIONS_CHAT \
|
|
--subjects "ai.chat.user.*.message,ai.chat.user.*.greeting.*" \
|
|
--retention limits \
|
|
--max-age 30d \
|
|
--storage file \
|
|
--replicas 1
|
|
|
|
# Create short-lived in-memory stream for streaming chat responses
|
|
nats stream add AI_CHAT_STREAM \
|
|
--subjects "ai.chat.response.stream.>" \
|
|
--retention limits \
|
|
--max-age 5m \
|
|
--storage memory \
|
|
--replicas 1
|
|
```
|
|
|
|
## Python Implementation
|
|
|
|
### Publisher
|
|
|
|
```python
|
|
import uuid
from datetime import datetime, timezone

import msgpack
import nats


async def publish_chat_message(nc: nats.NATS, user_id: str, message: str) -> None:
    """Publish a chat request as a MessagePack-encoded NATS message.

    Args:
        nc: Connected NATS client used to publish.
        user_id: User identifier; stored in the payload and used as the
            user token of the subject ``ai.chat.user.<user_id>.message``.
        message: Chat text to send.
    """
    data = {
        "request_id": str(uuid.uuid4()),
        "user_id": user_id,
        "message": message,
        # Use an aware UTC datetime: .timestamp() on a naive utcnow() value is
        # interpreted as local time and yields a skewed Unix timestamp.
        "timestamp": datetime.now(timezone.utc).timestamp(),
        "enable_streaming": True,
        "enable_rag": True,
    }

    subject = f"ai.chat.user.{user_id}.message"
    await nc.publish(subject, msgpack.packb(data))
|
|
```
|
|
|
|
### Subscriber (JetStream)
|
|
|
|
```python
|
|
async def message_handler(msg):
    """Decode one chat request, process it, publish the reply, then ack.

    Any exception raised along the way is logged and the message is NAK'd
    with a 5 second delay so it is retried.
    """
    try:
        request = msgpack.unpackb(msg.data, raw=False)

        # Process message
        reply = await process_chat(request)

        # Publish response on a subject keyed by the request id
        await nc.publish(
            f"ai.chat.response.{request['request_id']}",
            msgpack.packb(reply),
        )

        # Acknowledge only after the response has been published
        await msg.ack()

    except Exception as exc:
        logger.error(f"Handler error: {exc}")
        await msg.nak(delay=5)  # Retry after 5s
|
|
|
|
# Subscribe with JetStream
|
|
js = nc.jetstream()
|
|
sub = await js.subscribe(
|
|
"ai.chat.user.*.message",
|
|
cb=message_handler,
|
|
durable="chat-handler",
|
|
manual_ack=True
|
|
)
|
|
```
|
|
|
|
### Streaming Response
|
|
|
|
```python
|
|
import time


async def stream_response(nc, request_id: str, response_generator):
    """Publish generated tokens as MessagePack chunks, then a done marker.

    Every message is published on ``ai.chat.response.stream.<request_id>``
    and carries ``request_id``, ``type``, ``content``, ``done`` and
    ``timestamp``, matching the streaming response chunk schema.

    Args:
        nc: NATS client used to publish.
        request_id: Correlation id; part of the subject and of every chunk.
        response_generator: Async iterable yielding the content of each chunk.
    """
    subject = f"ai.chat.response.stream.{request_id}"

    def build_chunk(chunk_type, content, done):
        # Single place that defines the wire format, so the token chunks and
        # the final marker cannot drift apart.
        return {
            "request_id": request_id,
            "type": chunk_type,
            "content": content,
            "done": done,
            "timestamp": time.time(),  # Unix timestamp, as in the schema
        }

    async for token in response_generator:
        await nc.publish(subject, msgpack.packb(build_chunk("chunk", token, False)))

    # Send done marker
    await nc.publish(subject, msgpack.packb(build_chunk("done", "", True)))
|
|
```
|
|
|
|
## Go Implementation
|
|
|
|
### Publisher
|
|
|
|
```go
|
|
import (
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/vmihailenco/msgpack/v5"
|
|
)
|
|
|
|
type ChatMessage struct {
|
|
RequestID string `msgpack:"request_id"`
|
|
UserID string `msgpack:"user_id"`
|
|
Message string `msgpack:"message"`
|
|
}
|
|
|
|
func PublishChat(nc *nats.Conn, userID, message string) error {
|
|
msg := ChatMessage{
|
|
RequestID: uuid.New().String(),
|
|
UserID: userID,
|
|
Message: message,
|
|
}
|
|
|
|
data, err := msgpack.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
subject := fmt.Sprintf("ai.chat.user.%s.message", userID)
|
|
return nc.Publish(subject, data)
|
|
}
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
### NAK with Delay
|
|
|
|
```python
|
|
# Temporary failure - retry later
|
|
await msg.nak(delay=5) # 5 second delay
|
|
|
|
# Permanent failure - move to dead letter
|
|
if attempt >= max_retries:
|
|
await nc.publish("ai.dlq.chat", msg.data)
|
|
await msg.term() # Terminate delivery
|
|
```
|
|
|
|
### Dead Letter Queue
|
|
|
|
```yaml
|
|
stream:
|
|
name: AI_DLQ
|
|
subjects:
|
|
- "ai.dlq.>"
|
|
retention: limits
|
|
max_age: 7d
|
|
storage: file
|
|
```
|
|
|
|
## Monitoring
|
|
|
|
### Key Metrics
|
|
|
|
```bash
|
|
# Stream info
|
|
nats stream info COMPANIONS_CHAT
|
|
|
|
# Consumer info
|
|
nats consumer info COMPANIONS_CHAT chat-handler
|
|
|
|
# Message rate
|
|
nats stream report
|
|
```
|
|
|
|
### Prometheus Metrics
|
|
|
|
- `nats_stream_messages_total`
|
|
- `nats_consumer_pending_messages`
|
|
- `nats_consumer_ack_pending`
|
|
|
|
## Related
|
|
|
|
- [ADR-0003: Use NATS for Messaging](../decisions/0003-use-nats-for-messaging.md)
|
|
- [ADR-0004: Use MessagePack](../decisions/0004-use-messagepack-for-nats.md)
|
|
- [DOMAIN-MODEL.md](../DOMAIN-MODEL.md)
|