Files
pipeline-bridge/tests/conftest.py

91 lines
2.1 KiB
Python

"""
Pytest configuration and fixtures for pipeline-bridge tests.
"""
import asyncio
import os
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# Provide test environment defaults before importing; setdefault leaves any
# variable that is already set in the environment untouched.
for _env_name, _env_default in (
    ("NATS_URL", "nats://localhost:4222"),
    ("OTEL_ENABLED", "false"),
    ("MLFLOW_ENABLED", "false"),
):
    os.environ.setdefault(_env_name, _env_default)
# Drop the loop temporaries so they do not linger in the module namespace.
del _env_name, _env_default
@pytest.fixture(scope="session")
def event_loop():
    """Provide a fresh asyncio event loop for async tests.

    The fixture is session-scoped.

    Yields:
        A loop created with ``asyncio.new_event_loop()``; it is closed when
        the fixture is torn down.
    """
    session_loop = asyncio.new_event_loop()
    yield session_loop
    # Teardown: runs after the yielded loop is no longer needed.
    session_loop.close()
@pytest.fixture
def mock_nats_message():
    """Build a ``MagicMock`` standing in for a NATS message.

    Returns:
        A mock whose ``subject`` is ``"ai.pipeline.trigger"`` and whose
        ``reply`` is ``"ai.pipeline.status.test-123"``.
    """
    # Keyword arguments to MagicMock configure attributes on the new mock.
    return MagicMock(
        subject="ai.pipeline.trigger",
        reply="ai.pipeline.status.test-123",
    )
@pytest.fixture
def argo_pipeline_request():
    """Return a sample Argo trigger request for ``document-ingestion``.

    Returns:
        A dict with ``request_id``, ``pipeline`` and a ``parameters`` mapping
        holding ``source_path`` and ``collection_name``.
    """
    ingestion_parameters = {
        "source_path": "s3://bucket/documents",
        "collection_name": "test_collection",
    }
    return {
        "request_id": "test-request-123",
        "pipeline": "document-ingestion",
        "parameters": ingestion_parameters,
    }
@pytest.fixture
def kubeflow_pipeline_request():
    """Return a sample Kubeflow trigger request for ``rag-query``.

    Returns:
        A dict with ``request_id``, ``pipeline`` and a ``parameters`` mapping
        holding ``query`` and ``collection``.
    """
    query_parameters = {
        "query": "What is AI?",
        "collection": "documents",
    }
    return {
        "request_id": "test-request-456",
        "pipeline": "rag-query",
        "parameters": query_parameters,
    }
@pytest.fixture
def unknown_pipeline_request():
    """Return a trigger request that names an unknown pipeline.

    Returns:
        A dict whose ``pipeline`` is ``"nonexistent-pipeline"`` and whose
        ``parameters`` mapping is empty.
    """
    payload = {
        "request_id": "test-request-789",
        "pipeline": "nonexistent-pipeline",
        "parameters": {},
    }
    return payload
@pytest.fixture
def mock_argo_response():
    """Return a canned Argo Workflows API response.

    Returns:
        A dict with a ``metadata`` section (``name``, ``namespace``) and a
        ``status`` section whose ``phase`` is ``"Pending"``.
    """
    workflow_metadata = {
        "name": "document-ingestion-abc123",
        "namespace": "ai-ml",
    }
    workflow_status = {"phase": "Pending"}
    return {
        "metadata": workflow_metadata,
        "status": workflow_status,
    }
@pytest.fixture
def mock_kubeflow_response():
    """Return a canned Kubeflow Pipelines API response.

    Returns:
        A dict with a single ``run`` entry carrying ``id``, ``name`` and a
        ``status`` of ``"Running"``.
    """
    run_details = {
        "id": "run-xyz-789",
        "name": "rag-query-test",
        "status": "Running",
    }
    return {"run": run_details}