# Architecture
CitizenAI is built with a modern, scalable architecture designed for government-scale citizen engagement. This document outlines the system design, components, and architectural decisions.
## System Overview
CitizenAI follows a layered architecture pattern with clear separation of concerns:
```mermaid
graph TB
    subgraph "Presentation Layer"
        A[Web UI]
        B[Mobile UI]
        C[API Gateway]
    end

    subgraph "Application Layer"
        D[Flask Application]
        E[Authentication Service]
        F[Chat Engine]
        G[Analytics Engine]
        H[Concern Manager]
    end

    subgraph "Integration Layer"
        I[IBM Watson API]
        J[External APIs]
        K[Message Queue]
    end

    subgraph "Data Layer"
        L[Session Store]
        M[File Storage]
        N[Analytics DB]
        O[Audit Logs]
    end

    A --> D
    B --> D
    C --> D
    D --> E
    D --> F
    D --> G
    D --> H
    F --> I
    G --> K
    H --> J
    E --> L
    F --> M
    G --> N
    H --> O
```
## Technology Stack

### Core Technologies

| Component | Technology | Purpose |
|---|---|---|
| Web Framework | Flask 2.3+ | HTTP server and routing |
| AI Engine | IBM Watson, IBM Granite | Natural language processing |
| Frontend | HTML5, CSS3, JavaScript | User interface |
| Visualization | Plotly.js | Interactive charts and graphs |
| Session Management | Flask-Session | User session handling |
| Authentication | Custom + OAuth | User authentication |
### Development Stack

```python
# Core dependencies
CORE_STACK = {
    "runtime": "Python 3.11+",
    "web_framework": "Flask 2.3+",
    "ai_platform": "IBM Watson",
    "data_processing": "Pandas, NumPy",
    "visualization": "Plotly",
    "http_client": "Requests",
    "configuration": "python-dotenv"
}

# Development dependencies
DEV_STACK = {
    "testing": "pytest",
    "linting": "flake8, black",
    "documentation": "MkDocs Material",
    "type_checking": "mypy",
    "pre_commit": "pre-commit hooks"
}
```
## Application Architecture

### Modular Design
CitizenAI uses a modular architecture with clear boundaries:
```
src/
├── core/                 # Core application logic
│   ├── __init__.py
│   ├── app.py            # Flask application factory
│   ├── config.py         # Configuration management
│   └── extensions.py     # Extension initialization
├── auth/                 # Authentication module
│   ├── __init__.py
│   ├── models.py         # User models
│   ├── views.py          # Auth endpoints
│   └── utils.py          # Auth utilities
├── chat/                 # Chat functionality
│   ├── __init__.py
│   ├── engine.py         # AI chat engine
│   ├── watson.py         # IBM Watson integration
│   └── handlers.py       # Message handlers
├── analytics/            # Analytics and reporting
│   ├── __init__.py
│   ├── processor.py      # Data processing
│   ├── visualizer.py     # Chart generation
│   └── exporters.py      # Data export
├── concerns/             # Concern management
│   ├── __init__.py
│   ├── models.py         # Concern models
│   ├── manager.py        # Business logic
│   └── notifications.py  # Notification system
└── utils/                # Shared utilities
    ├── __init__.py
    ├── helpers.py        # Helper functions
    ├── validators.py     # Input validation
    └── decorators.py     # Custom decorators
```
### Component Interaction

```mermaid
sequenceDiagram
    participant U as User
    participant W as Web UI
    participant A as Flask App
    participant C as Chat Engine
    participant I as IBM Watson
    participant D as Data Store

    U->>W: Send message
    W->>A: HTTP request
    A->>C: Process message
    C->>I: AI query
    I->>C: AI response
    C->>D: Store interaction
    C->>A: Return response
    A->>W: JSON response
    W->>U: Display answer
```
## Data Architecture

### Data Flow

```mermaid
flowchart TD
    A[User Input] --> B[Validation Layer]
    B --> C[Business Logic]
    C --> D[Data Processing]
    D --> E[Storage Layer]
    E --> F[Analytics Pipeline]
    F --> G[Aggregation Engine]
    G --> H[Visualization Layer]
    H --> I[Dashboard]

    C --> J[AI Processing]
    J --> K[Watson API]
    K --> L[Response Generation]
    L --> M[User Interface]
```
### Storage Strategy

#### Session Storage

```python
# File-based session storage
SESSION_CONFIG = {
    "type": "filesystem",
    "directory": "./flask_session",
    "threshold": 500,
    "mode": 0o600,  # Read/write for owner only
    "permanent": False,
    "use_signer": True
}
```
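Flask-Session reads its settings from the Flask config under `SESSION_*` keys, so a dictionary like the one above has to be translated before use. The helper below is a sketch, not part of the codebase (`to_flask_session_settings` is a hypothetical name):

```python
# Hypothetical helper: translate the SESSION_CONFIG dictionary above into
# the SESSION_* keys that Flask-Session expects in the Flask config.
def to_flask_session_settings(cfg: dict) -> dict:
    return {
        "SESSION_TYPE": cfg["type"],                 # "filesystem"
        "SESSION_FILE_DIR": cfg["directory"],        # where session files live
        "SESSION_FILE_THRESHOLD": cfg["threshold"],  # max files before pruning
        "SESSION_FILE_MODE": cfg["mode"],            # 0o600: owner-only access
        "SESSION_PERMANENT": cfg["permanent"],
        "SESSION_USE_SIGNER": cfg["use_signer"],     # sign the session cookie
    }

settings = to_flask_session_settings({
    "type": "filesystem",
    "directory": "./flask_session",
    "threshold": 500,
    "mode": 0o600,
    "permanent": False,
    "use_signer": True,
})
```

An application would then call `app.config.update(settings)` before initializing `Session(app)`.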
#### Analytics Data

```python
# In-memory analytics with persistence
ANALYTICS_CONFIG = {
    "storage": "json_files",
    "directory": "./data/analytics",
    "retention_days": 90,
    "aggregation_levels": ["hourly", "daily", "weekly"],
    "real_time_buffer": 1000  # Keep last 1000 events in memory
}
```
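The `real_time_buffer` behaves like a bounded ring buffer: once the limit is reached, the oldest event is dropped. A minimal sketch using a `collections.deque` (class and method names are illustrative, not from the codebase):

```python
from collections import deque

class RealTimeBuffer:
    """Bounded in-memory event buffer: keeps only the most recent N events."""

    def __init__(self, size: int = 1000):
        self._events = deque(maxlen=size)  # deque evicts the oldest on overflow

    def record(self, event: dict) -> None:
        self._events.append(event)

    def recent(self, n: int) -> list:
        """Return up to the n most recent events, newest last."""
        return list(self._events)[-n:]

buf = RealTimeBuffer(size=3)
for i in range(5):
    buf.record({"id": i})
# Only events 2, 3, 4 remain; 0 and 1 were evicted.
```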
#### File Storage

```python
# Secure file handling
FILE_CONFIG = {
    "upload_folder": "./data/uploads",
    "max_file_size": 10 * 1024 * 1024,  # 10MB
    "allowed_extensions": [".jpg", ".png", ".pdf", ".txt"],
    "virus_scanning": True,
    "encryption": "AES-256"
}
```
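The extension and size limits are the first gate a file should pass before virus scanning or encryption are even considered. A standalone sketch of that check (the helper name is illustrative):

```python
import os

ALLOWED_EXTENSIONS = {".jpg", ".png", ".pdf", ".txt"}
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

def validate_upload(filename: str, size_bytes: int) -> bool:
    """Reject files with a disallowed extension or an out-of-range size."""
    ext = os.path.splitext(filename)[1].lower()
    return ext in ALLOWED_EXTENSIONS and 0 < size_bytes <= MAX_FILE_SIZE
```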
## Security Architecture

### Security Layers

```mermaid
graph TB
    A[HTTPS/TLS] --> B[Web Application Firewall]
    B --> C[Rate Limiting]
    C --> D[Authentication]
    D --> E[Authorization]
    E --> F[Input Validation]
    F --> G[Output Encoding]
    G --> H[Audit Logging]
```
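Of these layers, rate limiting is simple enough to sketch in-process. The token bucket below is illustrative only; a production deployment would typically enforce limits at the gateway or against a shared store such as Redis:

```python
import time

class TokenBucket:
    """Allow `rate` requests per second, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill tokens proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=1.0, capacity=2)
results = [bucket.allow() for _ in range(3)]  # burst of 3 immediate requests
```

With a capacity of 2, the first two requests in the burst pass and the third is rejected until the bucket refills.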
### Security Implementation

#### Authentication Flow

```python
class AuthenticationManager:
    """Handles user authentication and session management."""

    def authenticate(self, username: str, password: str) -> dict:
        """Authenticate user and create session."""
        # 1. Validate input
        if not self.validate_credentials(username, password):
            raise AuthenticationError("Invalid credentials")

        # 2. Check user exists and is active
        user = self.get_user(username)
        if not user or not user.is_active:
            raise AuthenticationError("User not found or inactive")

        # 3. Verify password
        if not self.verify_password(password, user.password_hash):
            self.log_failed_attempt(username)
            raise AuthenticationError("Invalid password")

        # 4. Create session
        session = self.create_session(user)
        self.log_successful_login(user)

        return {
            "user": user.to_dict(),
            "session": session.to_dict()
        }
```
#### Authorization System

```python
class AuthorizationManager:
    """Role-based access control."""

    PERMISSIONS = {
        "viewer": ["read"],
        "staff": ["read", "write"],
        "manager": ["read", "write", "manage"],
        "admin": ["read", "write", "manage", "admin"]
    }

    def check_permission(self, user: User, resource: str, action: str) -> bool:
        """Check if the user's role grants the action on the resource."""
        # Roles grant actions globally; a finer-grained scheme could key
        # on "resource:action" strings instead.
        role_actions = self.PERMISSIONS.get(user.role, [])
        return action in role_actions
```
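The role table above can also back a small guard decorator. The sketch below is standalone (no Flask, no `User` model) and the names are illustrative:

```python
PERMISSIONS = {
    "viewer": ["read"],
    "staff": ["read", "write"],
    "manager": ["read", "write", "manage"],
    "admin": ["read", "write", "manage", "admin"],
}

def has_permission(role: str, action: str) -> bool:
    """True if the role grants the requested action."""
    return action in PERMISSIONS.get(role, [])

def require(action: str):
    """Decorator sketch: guard a function behind a permission check."""
    def wrapper(func):
        def guarded(role, *args, **kwargs):
            if not has_permission(role, action):
                raise PermissionError(f"{role!r} may not {action!r}")
            return func(role, *args, **kwargs)
        return guarded
    return wrapper

@require("manage")
def close_concern(role, concern_id):
    return f"concern {concern_id} closed"
```

A view function decorated this way fails fast with `PermissionError` before any business logic runs.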
## API Architecture

### RESTful Design
CitizenAI follows REST principles with consistent patterns:
```python
# URL patterns
API_PATTERNS = {
    "collections": "/api/v1/concerns",           # GET, POST
    "resources": "/api/v1/concerns/{id}",        # GET, PUT, DELETE
    "actions": "/api/v1/concerns/{id}/resolve",  # POST
    "filters": "/api/v1/concerns?status=open",   # GET with query params
}

# HTTP methods mapping
HTTP_METHODS = {
    "GET": "retrieve",
    "POST": "create",
    "PUT": "update",
    "PATCH": "partial_update",
    "DELETE": "destroy"
}
```
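The `{id}` placeholders in these patterns map naturally onto a regex-based matcher. The stdlib-only sketch below (names illustrative; the `filters` pattern is omitted because query strings are not part of the path) shows how a request path could be resolved:

```python
import re

API_PATTERNS = {
    "collections": "/api/v1/concerns",
    "resources": "/api/v1/concerns/{id}",
    "actions": "/api/v1/concerns/{id}/resolve",
}

def match(path: str):
    """Return (pattern_name, params) for the first pattern matching `path`."""
    for name, pattern in API_PATTERNS.items():
        # Turn "{id}"-style placeholders into named regex groups.
        regex = "^" + re.sub(r"\{(\w+)\}", r"(?P<\1>[^/]+)", pattern) + "$"
        m = re.match(regex, path)
        if m:
            return name, m.groupdict()
    return None, {}
```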
### API Versioning Strategy

```python
class APIVersioning:
    """Handle API versioning and backwards compatibility."""

    SUPPORTED_VERSIONS = ["v1"]
    DEFAULT_VERSION = "v1"

    def get_version(self, request):
        """Extract API version from request."""
        # 1. Check URL path: /api/v1/endpoint
        parts = request.path.strip("/").split("/")
        if len(parts) >= 2 and parts[0] == "api" and parts[1] in self.SUPPORTED_VERSIONS:
            return parts[1]

        # 2. Check header: X-API-Version: v1
        if "X-API-Version" in request.headers:
            return request.headers["X-API-Version"]

        # 3. Fall back to the default version
        return self.DEFAULT_VERSION
```
## AI Integration Architecture

### Watson Integration

```mermaid
graph LR
    A[User Message] --> B[Preprocessing]
    B --> C[Intent Detection]
    C --> D[Watson Assistant]
    D --> E[Response Generation]
    E --> F[Post-processing]
    F --> G[User Response]

    subgraph "Watson Services"
        D --> H[Natural Language Understanding]
        D --> I[Natural Language Generation]
        D --> J[Discovery Service]
    end
```
### AI Pipeline Implementation

```python
class AIEngine:
    """Core AI processing engine."""

    def __init__(self, watson_config: dict):
        self.watson = WatsonAssistant(**watson_config)
        self.preprocessor = MessagePreprocessor()
        self.postprocessor = ResponsePostprocessor()

    async def process_message(self, message: str, context: dict) -> dict:
        """Process user message through AI pipeline."""
        # 1. Preprocess message
        processed_message = self.preprocessor.clean(message)

        # 2. Send to Watson
        watson_response = await self.watson.message(
            message=processed_message,
            context=context
        )

        # 3. Post-process response
        final_response = self.postprocessor.format(watson_response)

        # 4. Extract metadata
        metadata = {
            "confidence": watson_response.get("confidence", 0),
            "intent": watson_response.get("intent", "unknown"),
            "entities": watson_response.get("entities", []),
            "sentiment": self.analyze_sentiment(message)
        }

        return {
            "response": final_response,
            "metadata": metadata
        }
```
## Performance Architecture

### Caching Strategy

```python
# Multi-level caching
CACHE_CONFIG = {
    "levels": {
        "memory": {
            "type": "in_memory",
            "size": "100MB",
            "ttl": 300  # 5 minutes
        },
        "redis": {
            "type": "redis",
            "host": "localhost",
            "ttl": 3600  # 1 hour
        },
        "file": {
            "type": "filesystem",
            "directory": "./cache",
            "ttl": 86400  # 24 hours
        }
    }
}
```
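The in-memory level can be modeled as a dictionary of (value, expiry) pairs. The standalone sketch below (not the production cache) shows the TTL mechanics, including lazy eviction on read:

```python
import time

class TTLCache:
    """Minimal in-memory cache with a per-entry time-to-live."""

    def __init__(self, ttl: float):
        self.ttl = ttl
        self._store = {}

    def set(self, key, value):
        self._store[key] = (value, time.monotonic() + self.ttl)

    def get(self, key, default=None):
        entry = self._store.get(key)
        if entry is None:
            return default
        value, expires = entry
        if time.monotonic() >= expires:
            del self._store[key]  # lazy eviction on read
            return default
        return value

cache = TTLCache(ttl=0.05)
cache.set("greeting", "hello")
fresh = cache.get("greeting")   # within the TTL
time.sleep(0.06)
stale = cache.get("greeting")   # past the TTL: evicted, returns None
```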
### Asynchronous Processing

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncProcessor:
    """Handle asynchronous operations."""

    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=10)

    async def process_chat_message(self, message: str) -> dict:
        """Process chat message asynchronously."""
        loop = asyncio.get_running_loop()

        # Run blocking AI processing in the thread pool
        result = await loop.run_in_executor(
            self.executor,
            self.ai_engine.process,
            message
        )
        return result
```
## Scalability Considerations

### Horizontal Scaling

```mermaid
graph TB
    A[Load Balancer] --> B[App Instance 1]
    A --> C[App Instance 2]
    A --> D[App Instance N]

    B --> E[Shared Session Store]
    C --> E
    D --> E

    B --> F[Shared File Storage]
    C --> F
    D --> F
```
### Database Scaling

```python
from sqlalchemy import create_engine

# Database connection pooling
DATABASE_CONFIG = {
    "pool_size": 20,
    "max_overflow": 50,
    "pool_timeout": 30,
    "pool_recycle": 3600,
    "pool_pre_ping": True
}

# Read/write splitting for large deployments
class DatabaseManager:
    def __init__(self):
        self.write_db = create_engine(WRITE_DB_URL, **DATABASE_CONFIG)
        self.read_db = create_engine(READ_DB_URL, **DATABASE_CONFIG)

    def get_engine(self, operation: str):
        """Route database operations to the appropriate instance."""
        if operation in ["INSERT", "UPDATE", "DELETE"]:
            return self.write_db
        return self.read_db
```
## Monitoring and Observability

### Health Monitoring

```python
from datetime import datetime

class HealthMonitor:
    """System health monitoring."""

    def check_system_health(self) -> dict:
        """Comprehensive system health check."""
        return {
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "components": {
                "database": self.check_database(),
                "watson_api": self.check_watson(),
                "file_storage": self.check_storage(),
                "memory": self.check_memory(),
                "disk": self.check_disk()
            },
            "metrics": self.get_performance_metrics()
        }
```
### Logging Architecture

```python
# Structured logging configuration
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
            "class": "pythonjsonlogger.jsonlogger.JsonFormatter"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json"
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",
            "filename": "logs/citizenai.log",
            "maxBytes": 10485760,  # 10MB
            "backupCount": 5,
            "formatter": "json"
        }
    },
    "loggers": {
        "citizenai": {
            "handlers": ["console", "file"],
            "level": "INFO",
            "propagate": False
        }
    }
}
```
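Activating such a configuration is a single `logging.config.dictConfig` call at startup. The snippet below uses a trimmed, stdlib-only variant (plain formatter, console handler only) so it runs without `pythonjsonlogger` or a `logs/` directory:

```python
import logging
import logging.config

# Trimmed variant of LOGGING_CONFIG: console-only, plain formatter.
logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "plain"},
    },
    "loggers": {
        "citizenai": {"handlers": ["console"], "level": "INFO", "propagate": False},
    },
})

log = logging.getLogger("citizenai")
log.info("logging configured")
```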
## Deployment Architecture

### Container Strategy

```dockerfile
# Multi-stage Docker build
FROM python:3.11-slim AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

FROM python:3.11-slim AS runtime

WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY . .

EXPOSE 5000
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
```
### Environment Configuration

```yaml
# docker-compose.yml
version: '3.8'

services:
  citizenai:
    build: .
    ports:
      - "5000:5000"
    environment:
      - FLASK_ENV=production
      - WATSON_API_KEY=${WATSON_API_KEY}
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - citizenai
```
## Future Architecture Considerations

### Microservices Migration

```mermaid
graph TB
    A[API Gateway] --> B[Auth Service]
    A --> C[Chat Service]
    A --> D[Analytics Service]
    A --> E[Concern Service]

    B --> F[User DB]
    C --> G[Watson API]
    D --> H[Analytics DB]
    E --> I[Concern DB]

    subgraph "Message Bus"
        J[Event Stream]
    end

    B --> J
    C --> J
    D --> J
    E --> J
```
### Cloud-Native Features
- Service mesh for inter-service communication
- Event-driven architecture with message queues
- Auto-scaling based on demand
- Circuit breakers for resilience
- Distributed tracing for debugging
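Of these patterns, circuit breakers are the easiest to illustrate in isolation. The sketch below is illustrative and not tied to any specific library: it opens after a threshold of consecutive failures and then fails fast instead of calling the broken dependency:

```python
class CircuitBreaker:
    """Open the circuit after `threshold` consecutive failures; fail fast while open."""

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures = 0
        self.open = False

    def call(self, func, *args, **kwargs):
        if self.open:
            raise RuntimeError("circuit open: failing fast")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.open = True
            raise
        self.failures = 0  # a success resets the failure count
        return result

breaker = CircuitBreaker(threshold=2)

def flaky():
    raise ConnectionError("upstream down")

for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass  # two failures trip the breaker
```

A full implementation would also add a half-open state that retries the dependency after a cool-down period.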
## Best Practices

### Code Organization

Architecture best practices:

- **Separation of concerns**: Each module has a single responsibility
- **Dependency injection**: Use factories and dependency injection
- **Configuration management**: Centralized configuration
- **Error handling**: Consistent error handling patterns
- **Testing**: Unit tests for each component
- **Documentation**: Clear API documentation
### Performance Optimization

- **Lazy loading**: Load resources only when needed
- **Connection pooling**: Reuse database connections
- **Response compression**: Gzip compression for API responses
- **CDN usage**: Static assets served via CDN
- **Database indexing**: Proper database indexes
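Response compression trades CPU time for bandwidth. The stdlib sketch below shows the size win on a repetitive JSON payload; the exact ratio depends on the content:

```python
import gzip
import json

# A typical repetitive API payload: 100 small records.
payload = json.dumps([{"id": i, "status": "open"} for i in range(100)]).encode()
compressed = gzip.compress(payload)

# Repetitive JSON compresses well; decompressing recovers the exact bytes.
savings = 1 - len(compressed) / len(payload)
```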
## Next Steps

- **Testing** - Learn about testing strategies and implementation
- **Deployment** - Detailed deployment guide
- **Contributing** - How to contribute to the codebase