feat: add comprehensive architecture documentation
- Add AGENT-ONBOARDING.md for AI agents - Add ARCHITECTURE.md with full system overview - Add TECH-STACK.md with complete technology inventory - Add DOMAIN-MODEL.md with entities and bounded contexts - Add CODING-CONVENTIONS.md with patterns and practices - Add GLOSSARY.md with terminology reference - Add C4 diagrams (Context and Container levels) - Add 10 ADRs documenting key decisions: - Talos Linux, NATS, MessagePack, Multi-GPU strategy - GitOps with Flux, KServe, Milvus, Dual workflow engines - Envoy Gateway - Add specs directory with JetStream configuration - Add diagrams for GPU allocation and data flows Based on analysis of homelab-k8s2 and llm-workflows repositories and kubectl cluster-info dump data.
This commit is contained in:
287
specs/BINARY_MESSAGES_AND_JETSTREAM.md
Normal file
287
specs/BINARY_MESSAGES_AND_JETSTREAM.md
Normal file
@@ -0,0 +1,287 @@
|
||||
# Binary Messages and JetStream Configuration
|
||||
|
||||
> Technical specification for NATS message handling in the AI platform
|
||||
|
||||
## Overview
|
||||
|
||||
The AI platform uses NATS with JetStream for message persistence. All messages use MessagePack (msgpack) binary format for efficiency, especially when handling audio data.
|
||||
|
||||
## Message Format
|
||||
|
||||
### Why MessagePack?
|
||||
|
||||
1. **Binary efficiency**: Audio data embedded directly without base64 overhead
|
||||
2. **Compact**: 20-50% smaller than equivalent JSON
|
||||
3. **Fast**: Lower serialization/deserialization overhead
|
||||
4. **Compatible**: JSON-like structure, easy debugging
|
||||
|
||||
### Schema
|
||||
|
||||
All messages follow this general structure:
|
||||
|
||||
```python
|
||||
{
|
||||
"request_id": str, # UUID for correlation
|
||||
"user_id": str, # User identifier
|
||||
"timestamp": float, # Unix timestamp
|
||||
"payload": Any, # Type-specific data
|
||||
"metadata": dict # Optional metadata
|
||||
}
|
||||
```
|
||||
|
||||
### Chat Message
|
||||
|
||||
```python
|
||||
{
|
||||
"request_id": "uuid-here",
|
||||
"user_id": "user-123",
|
||||
"username": "john_doe",
|
||||
"message": "Hello, how are you?",
|
||||
"premium": False,
|
||||
"enable_streaming": True,
|
||||
"enable_rag": True,
|
||||
"enable_reranker": True,
|
||||
"top_k": 5,
|
||||
"session_id": "session-abc"
|
||||
}
|
||||
```
|
||||
|
||||
### Voice Message
|
||||
|
||||
```python
|
||||
{
|
||||
"request_id": "uuid-here",
|
||||
"user_id": "user-123",
|
||||
"audio": b"...", # Raw bytes, not base64!
|
||||
"format": "wav",
|
||||
"sample_rate": 16000,
|
||||
"premium": False,
|
||||
"enable_rag": True,
|
||||
"language": "en"
|
||||
}
|
||||
```
|
||||
|
||||
### Streaming Response Chunk
|
||||
|
||||
```python
|
||||
{
|
||||
"request_id": "uuid-here",
|
||||
"type": "chunk", # "chunk", "done", "error"
|
||||
"content": "token",
|
||||
"done": False,
|
||||
"timestamp": 1706000000.0
|
||||
}
|
||||
```
|
||||
|
||||
## JetStream Configuration
|
||||
|
||||
### Streams
|
||||
|
||||
| Stream | Subjects | Retention | Max Age | Storage | Replicas |
|
||||
|--------|----------|-----------|---------|---------|----------|
|
||||
| `COMPANIONS_LOGINS` | `ai.chat.user.*.login` | Limits | 7 days | File | 1 |
|
||||
| `COMPANIONS_CHAT` | `ai.chat.user.*.message`, `ai.chat.user.*.greeting.*` | Limits | 30 days | File | 1 |
|
||||
| `AI_CHAT_STREAM` | `ai.chat.response.stream.>` | Limits | 5 min | Memory | 1 |
|
||||
| `AI_VOICE_STREAM` | `ai.voice.>` | Limits | 1 hour | File | 1 |
|
||||
| `AI_VOICE_RESPONSE_STREAM` | `ai.voice.response.stream.>` | Limits | 5 min | Memory | 1 |
|
||||
| `AI_PIPELINE` | `ai.pipeline.>` | Limits | 24 hours | File | 1 |
|
||||
|
||||
### Consumer Configuration
|
||||
|
||||
```yaml
|
||||
# Durable consumer for chat handler
|
||||
consumer:
|
||||
name: chat-handler
|
||||
durable_name: chat-handler
|
||||
filter_subjects:
|
||||
- "ai.chat.user.*.message"
|
||||
ack_policy: explicit
|
||||
ack_wait: 30s
|
||||
max_deliver: 3
|
||||
deliver_policy: new
|
||||
```
|
||||
|
||||
### Stream Creation (CLI)
|
||||
|
||||
```bash
|
||||
# Create chat stream
|
||||
nats stream add COMPANIONS_CHAT \
|
||||
--subjects "ai.chat.user.*.message,ai.chat.user.*.greeting.*" \
|
||||
--retention limits \
|
||||
--max-age 30d \
|
||||
--storage file \
|
||||
--replicas 1
|
||||
|
||||
# Create ephemeral stream
|
||||
nats stream add AI_CHAT_STREAM \
|
||||
--subjects "ai.chat.response.stream.>" \
|
||||
--retention limits \
|
||||
--max-age 5m \
|
||||
--storage memory \
|
||||
--replicas 1
|
||||
```
|
||||
|
||||
## Python Implementation
|
||||
|
||||
### Publisher
|
||||
|
||||
```python
|
||||
import nats
|
||||
import msgpack
|
||||
from datetime import datetime
|
||||
|
||||
async def publish_chat_message(nc: nats.NATS, user_id: str, message: str):
|
||||
data = {
|
||||
"request_id": str(uuid.uuid4()),
|
||||
"user_id": user_id,
|
||||
"message": message,
|
||||
"timestamp": datetime.utcnow().timestamp(),
|
||||
"enable_streaming": True,
|
||||
"enable_rag": True,
|
||||
}
|
||||
|
||||
subject = f"ai.chat.user.{user_id}.message"
|
||||
await nc.publish(subject, msgpack.packb(data))
|
||||
```
|
||||
|
||||
### Subscriber (JetStream)
|
||||
|
||||
```python
|
||||
async def message_handler(msg):
|
||||
try:
|
||||
data = msgpack.unpackb(msg.data, raw=False)
|
||||
|
||||
# Process message
|
||||
result = await process_chat(data)
|
||||
|
||||
# Publish response
|
||||
response_subject = f"ai.chat.response.{data['request_id']}"
|
||||
await nc.publish(response_subject, msgpack.packb(result))
|
||||
|
||||
# Acknowledge
|
||||
await msg.ack()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Handler error: {e}")
|
||||
await msg.nak(delay=5) # Retry after 5s
|
||||
|
||||
# Subscribe with JetStream
|
||||
js = nc.jetstream()
|
||||
sub = await js.subscribe(
|
||||
"ai.chat.user.*.message",
|
||||
cb=message_handler,
|
||||
durable="chat-handler",
|
||||
manual_ack=True
|
||||
)
|
||||
```
|
||||
|
||||
### Streaming Response
|
||||
|
||||
```python
|
||||
async def stream_response(nc, request_id: str, response_generator):
|
||||
subject = f"ai.chat.response.stream.{request_id}"
|
||||
|
||||
async for token in response_generator:
|
||||
chunk = {
|
||||
"request_id": request_id,
|
||||
"type": "chunk",
|
||||
"content": token,
|
||||
"done": False
|
||||
}
|
||||
await nc.publish(subject, msgpack.packb(chunk))
|
||||
|
||||
# Send done marker
|
||||
done = {
|
||||
"request_id": request_id,
|
||||
"type": "done",
|
||||
"content": "",
|
||||
"done": True
|
||||
}
|
||||
await nc.publish(subject, msgpack.packb(done))
|
||||
```
|
||||
|
||||
## Go Implementation
|
||||
|
||||
### Publisher
|
||||
|
||||
```go
|
||||
import (
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
)
|
||||
|
||||
type ChatMessage struct {
|
||||
RequestID string `msgpack:"request_id"`
|
||||
UserID string `msgpack:"user_id"`
|
||||
Message string `msgpack:"message"`
|
||||
}
|
||||
|
||||
func PublishChat(nc *nats.Conn, userID, message string) error {
|
||||
msg := ChatMessage{
|
||||
RequestID: uuid.New().String(),
|
||||
UserID: userID,
|
||||
Message: message,
|
||||
}
|
||||
|
||||
data, err := msgpack.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
subject := fmt.Sprintf("ai.chat.user.%s.message", userID)
|
||||
return nc.Publish(subject, data)
|
||||
}
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
### NAK with Delay
|
||||
|
||||
```python
|
||||
# Temporary failure - retry later
|
||||
await msg.nak(delay=5) # 5 second delay
|
||||
|
||||
# Permanent failure - move to dead letter
|
||||
if attempt >= max_retries:
|
||||
await nc.publish("ai.dlq.chat", msg.data)
|
||||
await msg.term() # Terminate delivery
|
||||
```
|
||||
|
||||
### Dead Letter Queue
|
||||
|
||||
```yaml
|
||||
stream:
|
||||
name: AI_DLQ
|
||||
subjects:
|
||||
- "ai.dlq.>"
|
||||
retention: limits
|
||||
max_age: 7d
|
||||
storage: file
|
||||
```
|
||||
|
||||
## Monitoring
|
||||
|
||||
### Key Metrics
|
||||
|
||||
```bash
|
||||
# Stream info
|
||||
nats stream info COMPANIONS_CHAT
|
||||
|
||||
# Consumer info
|
||||
nats consumer info COMPANIONS_CHAT chat-handler
|
||||
|
||||
# Message rate
|
||||
nats stream report
|
||||
```
|
||||
|
||||
### Prometheus Metrics
|
||||
|
||||
- `nats_stream_messages_total`
|
||||
- `nats_consumer_pending_messages`
|
||||
- `nats_consumer_ack_pending`
|
||||
|
||||
## Related
|
||||
|
||||
- [ADR-0003: Use NATS for Messaging](../decisions/0003-use-nats-for-messaging.md)
|
||||
- [ADR-0004: Use MessagePack](../decisions/0004-use-messagepack-for-nats.md)
|
||||
- [DOMAIN-MODEL.md](../DOMAIN-MODEL.md)
|
||||
36
specs/README.md
Normal file
36
specs/README.md
Normal file
@@ -0,0 +1,36 @@
|
||||
# Specifications
|
||||
|
||||
This directory contains feature-level specifications and technical designs.
|
||||
|
||||
## Contents
|
||||
|
||||
- [BINARY_MESSAGES_AND_JETSTREAM.md](BINARY_MESSAGES_AND_JETSTREAM.md) - MessagePack format and JetStream configuration
|
||||
- Future specs will be added here
|
||||
|
||||
## Spec Template
|
||||
|
||||
```markdown
|
||||
# Feature Name
|
||||
|
||||
## Overview
|
||||
Brief description of the feature
|
||||
|
||||
## Requirements
|
||||
- Requirement 1
|
||||
- Requirement 2
|
||||
|
||||
## Design
|
||||
Technical design details
|
||||
|
||||
## API
|
||||
Interface definitions
|
||||
|
||||
## Implementation Notes
|
||||
Key implementation considerations
|
||||
|
||||
## Testing
|
||||
Test strategy
|
||||
|
||||
## Open Questions
|
||||
Unresolved items
|
||||
```
|
||||
Reference in New Issue
Block a user