Log Assembler Service
1. Executive Summary
The Log Assembler Service provides centralized log collection, correlation, and analysis capabilities for the Augment-It platform's distributed architecture. This service aggregates logs from microfrontends, microservices, containers, and external API interactions, providing unified observability across the platform's Module Federation and Docker ecosystem.
The service handles log ingestion from multiple sources, correlates related events using trace IDs, enriches log data with contextual information, and provides structured outputs for monitoring, debugging, and compliance reporting.
2. Service Overview
Responsibilities
- Centralized Log Ingestion: Collect logs from all microfrontends, microservices, and infrastructure components
- Log Correlation: Link related log events across distributed components using trace IDs and correlation tokens
- Log Enrichment: Add contextual metadata including user information, session data, and system state
- Real-time Processing: Stream processing for immediate alerting and monitoring
- Historical Analysis: Store and index logs for historical analysis and compliance
- Error Aggregation: Group and deduplicate similar errors across the distributed system
- Performance Monitoring: Track and correlate performance metrics with log events
- Security Monitoring: Detect and alert on suspicious activities and security events
Key Features
- Multi-source log collection (containers, services, frontends)
- Distributed tracing correlation
- Real-time log streaming and processing
- Structured log parsing and normalization
- Error grouping and deduplication
- Performance correlation and analysis
- Security event detection
- Compliance log retention and archival
- Integration with Report Template Service
- Monitoring and alerting capabilities
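The correlation features above hinge on every source attaching the same `traceId` to related events. As a minimal illustration (field values are invented; the shapes follow the log schema defined in Section 4):

```typescript
// Two entries from different sources sharing a traceId, which is what
// lets the assembler correlate them into one trace. Values are examples.
const frontendEntry = {
  timestamp: '2024-01-15T10:30:00.000Z',
  traceId: 'a1b2c3d4e5f60718',
  correlationId: 'sess-42',
  source: { type: 'microfrontend', name: 'prompt-manager', version: '1.0.0', environment: 'production', instance: 'host-1' },
  level: 'error',
  message: 'Failed to render prompt list',
  metadata: {},
};

const serviceEntry = {
  timestamp: '2024-01-15T10:30:00.120Z',
  traceId: 'a1b2c3d4e5f60718', // same trace: the assembler groups these together
  correlationId: 'sess-42',
  source: { type: 'microservice', name: 'user-auth-service', version: '1.0.0', environment: 'production', instance: 'pod-7' },
  level: 'warn',
  message: 'Token refresh slow',
  metadata: {},
};
```

Without a shared `traceId`, the two events would appear unrelated in the index.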
3. Technical Architecture
High-Level Architecture
graph TB
subgraph "Log Sources"
subgraph "Microfrontends"
MF1[Shell App]
MF2[Prompt Manager]
MF3[Insight Assembler]
MF4[Request Reviewer]
MF5[Record Collector]
end
subgraph "Microservices"
MS1[User Auth Service]
MS2[API Connector Service]
MS3[YAML Parser Service]
MS4[JSON Parser Service]
MS5[Markdown Parser Service]
MS6[Account Management Service]
end
subgraph "Infrastructure"
INF1[API Gateway]
INF2[Kubernetes Logs]
INF3[Container Runtime]
INF4[External API Responses]
end
end
subgraph "Log Assembler Service"
subgraph "Ingestion Layer"
COLLECTOR[Log Collector]
PARSER[Log Parser]
VALIDATOR[Log Validator]
end
subgraph "Processing Layer"
CORRELATOR[Trace Correlator]
ENRICHER[Context Enricher]
AGGREGATOR[Error Aggregator]
ANALYZER[Pattern Analyzer]
end
subgraph "Storage Layer"
STREAM[Stream Processor]
INDEXER[Log Indexer]
ARCHIVER[Log Archiver]
end
subgraph "Output Layer"
ALERTER[Alert Manager]
API[Log Query API]
EXPORTER[Report Exporter]
end
end
subgraph "External Systems"
ELASTICSEARCH[Elasticsearch]
REDIS[Redis Cache]
PROMETHEUS[Prometheus]
GRAFANA[Grafana]
REPORTS[Report Template Service]
end
%% Log Flow
MF1 --> COLLECTOR
MF2 --> COLLECTOR
MF3 --> COLLECTOR
MF4 --> COLLECTOR
MF5 --> COLLECTOR
MS1 --> COLLECTOR
MS2 --> COLLECTOR
MS3 --> COLLECTOR
MS4 --> COLLECTOR
MS5 --> COLLECTOR
MS6 --> COLLECTOR
INF1 --> COLLECTOR
INF2 --> COLLECTOR
INF3 --> COLLECTOR
INF4 --> COLLECTOR
%% Processing Flow
COLLECTOR --> PARSER
PARSER --> VALIDATOR
VALIDATOR --> CORRELATOR
CORRELATOR --> ENRICHER
ENRICHER --> AGGREGATOR
AGGREGATOR --> ANALYZER
%% Storage Flow
ANALYZER --> STREAM
STREAM --> INDEXER
INDEXER --> ARCHIVER
%% Output Flow
STREAM --> ALERTER
INDEXER --> API
ARCHIVER --> EXPORTER
%% External Integration
INDEXER --> ELASTICSEARCH
STREAM --> REDIS
ALERTER --> PROMETHEUS
API --> GRAFANA
EXPORTER --> REPORTS
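The processing flow in the diagram is a linear pipeline (collector through analyzer). A minimal sketch of chaining such stages, with placeholder implementations and assumed field names:

```typescript
// Each diagram stage becomes an async transform over a log entry.
// Stage bodies here are placeholders, not the real implementations.
type Entry = { level: string; message: string; [k: string]: unknown };
type Stage = (e: Entry) => Promise<Entry>;

const parse: Stage = async (e) => e; // PARSER (no-op placeholder)
const validate: Stage = async (e) => { // VALIDATOR
  if (!e.level || !e.message) throw new Error('invalid log entry');
  return e;
};
const enrich: Stage = async (e) => ({ ...e, env: 'production' }); // ENRICHER

// Compose stages left-to-right, mirroring the Processing Flow edges.
const pipeline = (stages: Stage[]) => (e: Entry) =>
  stages.reduce((p, s) => p.then(s), Promise.resolve(e));

const runPipeline = pipeline([parse, validate, enrich]);
```

Keeping each stage a pure transform makes it straightforward to insert, remove, or reorder stages as the diagram evolves.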
Log Collection Architecture
sequenceDiagram
participant Frontend as Microfrontend
participant Service as Microservice
participant Container as Container Runtime
participant Collector as Log Collector
participant Processor as Log Processor
participant Storage as Log Storage
participant Monitor as Monitoring
Note over Frontend, Monitor: User Action Triggers Error
Frontend->>Collector: Send client-side error log
Note right of Frontend: { traceId, userId, componentId, error, stack, timestamp }
Service->>Collector: Send service error log
Note right of Service: { traceId, serviceId, method, error, request, timestamp }
Container->>Collector: Send container log
Note right of Container: { traceId, containerId, level, message, timestamp }
Collector->>Processor: Process log batch
Note right of Collector: Correlate by traceId
Processor->>Storage: Store correlated logs
Note right of Processor: Enrich with context
Processor->>Monitor: Trigger alerts if needed
Note right of Processor: Real-time monitoring
Storage-->>Monitor: Query historical data
Note right of Storage: Trend analysis
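The "Correlate by traceId" step in the diagram reduces, at its core, to grouping and time-ordering. A minimal sketch, with field names assumed to match the schema in the next section:

```typescript
interface MinimalLog { traceId: string; timestamp: string; level: string }

// Group incoming logs by traceId and time-order each group,
// mirroring the Collector -> Processor step above.
function correlateByTrace(logs: MinimalLog[]): Map<string, MinimalLog[]> {
  const groups = new Map<string, MinimalLog[]>();
  for (const log of logs) {
    const group = groups.get(log.traceId) ?? [];
    group.push(log);
    groups.set(log.traceId, group);
  }
  for (const group of groups.values()) {
    group.sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp));
  }
  return groups;
}

// Trace duration = span between first and last correlated event.
function traceDuration(group: MinimalLog[]): number {
  if (group.length < 2) return 0;
  return Date.parse(group[group.length - 1].timestamp) - Date.parse(group[0].timestamp);
}
```

The production implementation in Section 4 adds buffering, enrichment, and severity summaries on top of this grouping.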
4. Detailed Implementation
Log Schema and Standards
typescript
// Common log schema across all sources
interface BaseLogEntry {
timestamp: string; // ISO 8601 format
traceId: string; // Distributed tracing ID
spanId?: string; // Optional span ID for detailed tracing
correlationId: string; // Request/session correlation
source: LogSource;
level: LogLevel;
message: string;
metadata: Record<string, any>;
}
interface LogSource {
type: 'microfrontend' | 'microservice' | 'infrastructure' | 'external';
name: string; // e.g., 'prompt-manager', 'user-auth-service'
version: string;
environment: 'development' | 'staging' | 'production';
instance: string; // Container/pod identifier
}
type LogLevel = 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'fatal';
// Microfrontend-specific log entry
interface MicrofrontendLogEntry extends BaseLogEntry {
source: LogSource & { type: 'microfrontend' };
user?: {
id: string;
sessionId: string;
organizationId?: string;
};
component: {
name: string;
props?: Record<string, any>;
state?: Record<string, any>;
};
browser: {
userAgent: string;
url: string;
viewport: { width: number; height: number };
};
error?: {
name: string;
message: string;
stack: string;
componentStack?: string;
};
}
// Microservice-specific log entry
interface MicroserviceLogEntry extends BaseLogEntry {
source: LogSource & { type: 'microservice' };
request?: {
method: string;
url: string;
headers: Record<string, string>;
body?: any;
userId?: string;
};
response?: {
statusCode: number;
headers: Record<string, string>;
body?: any;
duration: number; // milliseconds
};
database?: {
query?: string;
duration?: number;
affected?: number;
};
external?: {
service: string;
endpoint: string;
duration: number;
statusCode?: number;
};
}
// Infrastructure log entry
interface InfrastructureLogEntry extends BaseLogEntry {
source: LogSource & { type: 'infrastructure' };
resource: {
type: 'container' | 'kubernetes' | 'network' | 'storage';
name: string;
namespace?: string;
};
metrics?: {
cpu?: number;
memory?: number;
network?: { in: number; out: number };
disk?: { read: number; write: number };
};
}

Log Collection Implementation
typescript
// Log Collector Service
// (imports are assumptions: EventEmitter from Node core, Redis from the ioredis client)
import { EventEmitter } from 'events';
import Redis from 'ioredis';

export class LogCollectorService {
private eventStream: EventEmitter;
private logBuffer: Map<string, BaseLogEntry[]>; // Keyed by traceId
private redis: Redis;
private redis: Redis;
constructor() {
this.eventStream = new EventEmitter();
this.logBuffer = new Map();
this.redis = new Redis(process.env.REDIS_URL);
// Process buffered logs every 100ms
setInterval(() => this.flushBuffer(), 100);
}
// Collect log from various sources
async collectLog(logEntry: BaseLogEntry): Promise<void> {
try {
// Validate log entry
const validatedEntry = await this.validateLogEntry(logEntry);
// Add to buffer for correlation
this.bufferLog(validatedEntry);
// Emit for real-time processing
this.eventStream.emit('log:received', validatedEntry);
// Store in Redis for fast access
await this.cacheLog(validatedEntry);
} catch (error) {
console.error('Failed to collect log:', error);
// Don't let log processing failures break the application
}
}
private bufferLog(logEntry: BaseLogEntry): void {
const { traceId } = logEntry;
if (!this.logBuffer.has(traceId)) {
this.logBuffer.set(traceId, []);
}
this.logBuffer.get(traceId)!.push(logEntry);
}
private async flushBuffer(): Promise<void> {
for (const [traceId, logs] of this.logBuffer.entries()) {
if (logs.length > 0) {
// Process correlated logs
await this.processCorrelatedLogs(traceId, logs);
// Clear processed logs
this.logBuffer.set(traceId, []);
}
}
}
private async processCorrelatedLogs(traceId: string, logs: BaseLogEntry[]): Promise<void> {
const correlatedLog: CorrelatedLogGroup = {
traceId,
timestamp: new Date().toISOString(),
logs: logs.sort((a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()),
summary: this.generateLogSummary(logs),
severity: this.calculateGroupSeverity(logs),
duration: this.calculateTraceDuration(logs),
};
// Emit correlated log group
this.eventStream.emit('logs:correlated', correlatedLog);
}
private generateLogSummary(logs: BaseLogEntry[]): LogSummary {
const errorCount = logs.filter(log => log.level === 'error' || log.level === 'fatal').length;
const warnCount = logs.filter(log => log.level === 'warn').length;
const services = [...new Set(logs.map(log => log.source.name))];
return {
totalLogs: logs.length,
errorCount,
warnCount,
servicesInvolved: services,
timeSpan: this.calculateTraceDuration(logs),
};
}
}

Error Aggregation and Pattern Analysis
typescript
import crypto from 'crypto';
import { Client } from '@elastic/elasticsearch'; // assumed client package

export class ErrorAggregatorService {
private errorPatterns: Map<string, ErrorPattern>;
private elasticsearch: Client;
constructor() {
this.errorPatterns = new Map();
this.elasticsearch = new Client({ node: process.env.ELASTICSEARCH_URL });
}
async aggregateError(logEntry: BaseLogEntry): Promise<void> {
if (logEntry.level !== 'error' && logEntry.level !== 'fatal') {
return;
}
const errorSignature = this.generateErrorSignature(logEntry);
const existingPattern = this.errorPatterns.get(errorSignature);
if (existingPattern) {
// Update existing pattern
existingPattern.count++;
existingPattern.lastOccurrence = logEntry.timestamp;
existingPattern.affectedTraces.add(logEntry.traceId);
existingPattern.recentLogs.push(logEntry);
// Keep only recent logs (last 10)
if (existingPattern.recentLogs.length > 10) {
existingPattern.recentLogs = existingPattern.recentLogs.slice(-10);
}
// Check if this is a spike in errors
if (this.isErrorSpike(existingPattern)) {
await this.triggerErrorSpikeAlert(existingPattern);
}
} else {
// Create new error pattern
const newPattern: ErrorPattern = {
signature: errorSignature,
firstOccurrence: logEntry.timestamp,
lastOccurrence: logEntry.timestamp,
count: 1,
affectedServices: new Set([logEntry.source.name]),
affectedTraces: new Set([logEntry.traceId]),
recentLogs: [logEntry],
severity: this.calculateErrorSeverity(logEntry),
};
this.errorPatterns.set(errorSignature, newPattern);
// Trigger alert for new critical errors
if (newPattern.severity === 'critical') {
await this.triggerNewCriticalErrorAlert(newPattern);
}
}
// Store in Elasticsearch for historical analysis
await this.indexError(logEntry, errorSignature);
}
private generateErrorSignature(logEntry: BaseLogEntry): string {
const error = (logEntry as any).error || { message: logEntry.message };
// Create a signature based on error type, service, and normalized message
const normalizedMessage = this.normalizeErrorMessage(error.message);
const signature = `${logEntry.source.name}:${error.name || 'UnknownError'}:${normalizedMessage}`;
return crypto.createHash('sha256').update(signature).digest('hex').substring(0, 16);
}
private normalizeErrorMessage(message: string): string {
// Normalize error messages by removing variable parts (IDs, timestamps, etc.)
return message
.replace(/\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b/gi, 'UUID')
.replace(/\b\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{3})?Z\b/g, 'TIMESTAMP')
.replace(/\b\d+\b/g, 'NUMBER')
.replace(/\b[a-f0-9]{32,}\b/gi, 'HASH')
.toLowerCase();
}
}

Performance Correlation
typescript
export class PerformanceCorrelatorService {
private traceMetrics: Map<string, TraceMetrics>;
private prometheus: PromClient;
constructor() {
this.traceMetrics = new Map();
this.prometheus = new PromClient();
}
async correlatePerformance(correlatedLogs: CorrelatedLogGroup): Promise<void> {
const metrics = this.calculateTraceMetrics(correlatedLogs);
// Store metrics for this trace
this.traceMetrics.set(correlatedLogs.traceId, metrics);
// Send metrics to Prometheus
await this.exportMetrics(metrics);
// Check for performance issues
const issues = this.detectPerformanceIssues(metrics);
if (issues.length > 0) {
await this.triggerPerformanceAlerts(correlatedLogs.traceId, issues);
}
}
private calculateTraceMetrics(correlatedLogs: CorrelatedLogGroup): TraceMetrics {
const logs = correlatedLogs.logs;
const serviceMetrics = new Map<string, ServiceMetrics>();
// Calculate per-service metrics
for (const log of logs) {
const serviceName = log.source.name;
if (!serviceMetrics.has(serviceName)) {
serviceMetrics.set(serviceName, {
name: serviceName,
requestCount: 0,
totalDuration: 0,
errorCount: 0,
maxDuration: 0,
});
}
const service = serviceMetrics.get(serviceName)!;
service.requestCount++;
if (log.level === 'error' || log.level === 'fatal') {
service.errorCount++;
}
// Extract duration from microservice logs
if ('response' in log && log.response?.duration) {
service.totalDuration += log.response.duration;
service.maxDuration = Math.max(service.maxDuration, log.response.duration);
}
}
return {
traceId: correlatedLogs.traceId,
totalDuration: correlatedLogs.duration,
serviceMetrics: Array.from(serviceMetrics.values()),
errorRate: correlatedLogs.summary.errorCount / correlatedLogs.summary.totalLogs,
servicesInvolved: correlatedLogs.summary.servicesInvolved.length,
};
}
private detectPerformanceIssues(metrics: TraceMetrics): PerformanceIssue[] {
const issues: PerformanceIssue[] = [];
// Check overall trace duration
if (metrics.totalDuration > 5000) { // 5 seconds
issues.push({
type: 'slow_trace',
severity: 'warning',
description: `Trace took ${metrics.totalDuration}ms to complete`,
affectedServices: metrics.serviceMetrics.map(s => s.name),
});
}
// Check individual service performance
for (const service of metrics.serviceMetrics) {
const avgDuration = service.totalDuration / service.requestCount;
if (avgDuration > 2000) { // 2 seconds average
issues.push({
type: 'slow_service',
severity: 'warning',
description: `Service ${service.name} averaged ${avgDuration.toFixed(0)}ms per request`,
affectedServices: [service.name],
});
}
// ServiceMetrics tracks errorCount, so derive the rate per service here
const errorRate = service.errorCount / service.requestCount;
if (errorRate > 0.1) { // 10% error rate
issues.push({
type: 'high_error_rate',
severity: errorRate > 0.5 ? 'critical' : 'warning',
description: `Service ${service.name} has ${(errorRate * 100).toFixed(1)}% error rate`,
affectedServices: [service.name],
});
}
}
}
return issues;
}
}

5. API Interface
REST Endpoints
yaml
basePath: /api/v1/logs
paths:
/ingest:
post:
summary: Ingest log entries
requestBody:
required: true
content:
application/json:
schema:
oneOf:
- $ref: '#/components/schemas/BaseLogEntry'
- type: array
items:
$ref: '#/components/schemas/BaseLogEntry'
responses:
'202':
description: Logs accepted for processing
'400':
description: Invalid log format
/query:
post:
summary: Query logs
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/LogQuery'
responses:
'200':
description: Query results
content:
application/json:
schema:
$ref: '#/components/schemas/LogQueryResult'
/errors:
get:
summary: Get error patterns
parameters:
- name: timeRange
in: query
schema:
type: string
enum: [1h, 6h, 24h, 7d, 30d]
- name: service
in: query
schema:
type: string
- name: severity
in: query
schema:
type: string
enum: [low, medium, high, critical]
responses:
'200':
description: Error patterns
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/ErrorPattern'
/traces/{traceId}:
get:
summary: Get correlated logs for a trace
parameters:
- name: traceId
in: path
required: true
schema:
type: string
responses:
'200':
description: Correlated log group
content:
application/json:
schema:
$ref: '#/components/schemas/CorrelatedLogGroup'
'404':
description: Trace not found
/metrics:
get:
summary: Get performance metrics
parameters:
- name: timeRange
in: query
schema:
type: string
- name: service
in: query
schema:
type: string
responses:
'200':
description: Performance metrics
content:
application/json:
schema:
$ref: '#/components/schemas/PerformanceMetrics'
/health:
get:
summary: Health check
responses:
'200':
description: Service health status

WebSocket Interface
typescript
// Real-time log streaming
interface LogStreamMessage {
type: 'log' | 'error_pattern' | 'performance_alert' | 'trace_complete';
data: any;
timestamp: string;
}
// WebSocket endpoints
interface WebSocketEndpoints {
'/ws/logs': {
subscribe: {
filters: {
services?: string[];
levels?: LogLevel[];
traceIds?: string[];
};
};
messages: LogStreamMessage[];
};
'/ws/errors': {
subscribe: {
severity?: 'warning' | 'critical';
};
messages: ErrorPattern[];
};
'/ws/performance': {
subscribe: {
thresholds: {
duration?: number;
errorRate?: number;
};
};
messages: PerformanceIssue[];
};
}

6. Integration Points
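The integration clients below all ship logs through the same ingest contract. The `LogAssemblerClient` they use is not defined elsewhere in this document; one plausible minimal sketch, assuming the `/api/v1/logs/ingest` endpoint from the API section, with failures deliberately swallowed so log shipping can never break the host application:

```typescript
// Hypothetical minimal client matching the `logService.ingest(...)` calls
// used by the integrations below. The base URL is an assumed default.
class LogAssemblerClient {
  constructor(private baseUrl: string = '/api/v1/logs') {}

  // Resolves to true on a 2xx response, false on any failure.
  async ingest(entry: object): Promise<boolean> {
    try {
      const res = await fetch(`${this.baseUrl}/ingest`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(entry),
      });
      return res.ok;
    } catch {
      // Never let log shipping break the host application.
      return false;
    }
  }
}
```

A real client would also batch entries and retry with backoff, per the buffering settings in the Configuration section.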
Module Federation Integration
typescript
// Client-side logging for microfrontends
export class MicrofrontendLogger {
private logService: LogAssemblerClient;
private traceId: string;
private componentStack: string[];
constructor(moduleName: string) {
this.logService = new LogAssemblerClient();
this.traceId = this.generateTraceId();
this.componentStack = [moduleName];
}
// Module Federation error boundary integration
logModuleError(error: Error, moduleName: string, errorInfo?: any): void {
const logEntry: MicrofrontendLogEntry = {
timestamp: new Date().toISOString(),
traceId: this.traceId,
correlationId: this.getCorrelationId(),
source: {
type: 'microfrontend',
name: moduleName,
version: process.env.APP_VERSION || '1.0.0',
environment: process.env.NODE_ENV as any,
instance: window.location.hostname,
},
level: 'error',
message: `Module Federation Error: ${error.message}`,
metadata: {
errorInfo,
url: window.location.href,
userAgent: navigator.userAgent,
},
user: this.getCurrentUser(),
component: {
name: moduleName,
props: errorInfo?.componentProps,
},
browser: {
userAgent: navigator.userAgent,
url: window.location.href,
viewport: {
width: window.innerWidth,
height: window.innerHeight,
},
},
error: {
name: error.name,
message: error.message,
stack: error.stack || '',
componentStack: errorInfo?.componentStack,
},
};
this.logService.ingest(logEntry);
}
// Performance logging for module loading
logModuleLoadTime(moduleName: string, duration: number): void {
const logEntry: MicrofrontendLogEntry = {
timestamp: new Date().toISOString(),
traceId: this.traceId,
correlationId: this.getCorrelationId(),
source: {
type: 'microfrontend',
name: moduleName,
version: process.env.APP_VERSION || '1.0.0',
environment: process.env.NODE_ENV as any,
instance: window.location.hostname,
},
level: 'info',
message: `Module loaded: ${moduleName}`,
metadata: {
loadTime: duration,
performance: {
// Note: performance.navigation and performance.timing are deprecated;
// prefer performance.getEntriesByType('navigation') in new code
navigation: performance.navigation,
timing: performance.timing,
},
},
};
this.logService.ingest(logEntry);
}
}

Docker Container Integration
yaml
# docker-compose logging configuration
version: '3.8'
services:
shell-app:
logging:
driver: "fluentd"
options:
fluentd-address: "log-assembler:24224"
fluentd-async-connect: "true"
tag: "microfrontend.shell-app"
prompt-manager:
logging:
driver: "fluentd"
options:
fluentd-address: "log-assembler:24224"
tag: "microfrontend.prompt-manager"
user-auth-service:
logging:
driver: "fluentd"
options:
fluentd-address: "log-assembler:24224"
tag: "microservice.user-auth"
log-assembler:
image: augment-it/log-assembler:latest
ports:
- "24224:24224" # Fluentd port
- "9090:9090" # HTTP API
- "8080:8080" # WebSocket
environment:
- ELASTICSEARCH_URL=http://elasticsearch:9200
- REDIS_URL=redis://redis:6379
- PROMETHEUS_URL=http://prometheus:9090

Kubernetes Integration
yaml
# kubernetes logging configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: fluent-bit-config
data:
fluent-bit.conf: |
[INPUT]
Name tail
Path /var/log/containers/*augment-it*.log
Parser docker
Tag kube.*
Mem_Buf_Limit 50MB
Skip_Long_Lines On
[FILTER]
Name kubernetes
Match kube.*
Kube_URL https://kubernetes.default.svc:443
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
Merge_Log On
K8S-Logging.Parser On
K8S-Logging.Exclude Off
[OUTPUT]
Name http
Match *
Host log-assembler-service
Port 9090
URI /api/v1/logs/ingest
Format json_lines

7. Performance and Scalability
Throughput Requirements
- Log Ingestion Rate: 10,000+ logs/second during peak load
- Real-time Processing: < 100ms latency for log correlation
- Query Response Time: < 500ms for typical log queries
- Storage Retention: 90 days hot storage, 1 year cold storage
- Concurrent Users: Support 100+ concurrent dashboard users
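These targets imply substantial hot storage. A back-of-envelope check, assuming an average entry size of 1 KB (an assumption, not a measured value), and ignoring index and replication overhead:

```typescript
// Rough hot-storage sizing from the throughput targets above.
// ASSUMPTION: 1 KB (1000 bytes) average log entry; overheads excluded.
const logsPerSecond = 10_000;
const avgEntryBytes = 1_000;
const hotRetentionDays = 90;

const logsPerDay = logsPerSecond * 86_400;      // 864,000,000 logs/day
const bytesPerDay = logsPerDay * avgEntryBytes; // ~864 GB/day
const hotStorageBytes = bytesPerDay * hotRetentionDays;

console.log(`~${(bytesPerDay / 1e9).toFixed(0)} GB/day, ~${(hotStorageBytes / 1e12).toFixed(1)} TB hot`);
```

Elasticsearch replicas and index structures will multiply the raw figure, which is why the scaling and archival settings below matter.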
Scaling Strategy
yaml
# Kubernetes HPA configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: log-assembler-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: log-assembler
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: log_ingestion_rate
target:
type: AverageValue
averageValue: "1000" # logs per second per pod

8. Security and Compliance
Data Security
- Encryption in Transit: TLS 1.3 for all log transmission
- Encryption at Rest: AES-256 encryption for stored logs
- Access Control: Role-based access to log data
- Audit Trail: All log access and queries are audited
- Data Anonymization: PII scrubbing for compliance
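PII scrubbing can reuse the same regex-normalization approach shown earlier for error signatures. A minimal sketch with illustrative patterns (a real deployment needs a broader, policy-driven pattern set):

```typescript
// Illustrative PII scrubber: masks emails and UUIDs before storage.
// The patterns here are examples, not an exhaustive compliance solution.
function scrubPII(message: string): string {
  return message
    .replace(/[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}/g, '[EMAIL]')
    .replace(/\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b/gi, '[UUID]');
}
```

Running this in the enrichment layer, before indexing, keeps raw PII out of Elasticsearch entirely rather than relying on later redaction.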
Compliance Features
- GDPR Compliance: Data retention policies and right to deletion
- SOC 2 Compliance: Access controls and audit trails
- HIPAA Compliance: PHI handling and encryption (if applicable)
- Log Retention: Configurable retention policies
- Data Export: Support for compliance reporting
9. Monitoring and Operations
Service Health Monitoring
typescript
export class LogAssemblerHealthMonitor {
private healthMetrics: HealthMetrics;
async getHealth(): Promise<HealthStatus> {
const checks = await Promise.all([
this.checkElasticsearch(),
this.checkRedis(),
this.checkLogIngestion(),
this.checkProcessingLatency(),
this.checkStorage(),
]);
const overallStatus = checks.every(check => check.status === 'healthy')
? 'healthy'
: checks.some(check => check.status === 'critical')
? 'critical'
: 'degraded';
return {
status: overallStatus,
timestamp: new Date().toISOString(),
checks,
metrics: this.healthMetrics,
};
}
private async checkLogIngestion(): Promise<HealthCheck> {
const recentLogs = await this.getRecentLogCount(60000); // Last minute
const expectedRate = 100; // logs per minute minimum
return {
name: 'log_ingestion',
status: recentLogs >= expectedRate ? 'healthy' : 'warning',
message: `${recentLogs} logs ingested in the last minute`,
metrics: { logsPerMinute: recentLogs },
};
}
}

Operational Dashboards
yaml
# Grafana dashboard configuration
dashboards:
- name: "Log Assembler Overview"
panels:
- title: "Log Ingestion Rate"
type: graph
targets:
- expr: rate(log_assembler_logs_ingested_total[5m])
- title: "Error Patterns"
type: table
targets:
- expr: topk(10, log_assembler_error_patterns_count)
- title: "Service Response Times"
type: heatmap
targets:
- expr: histogram_quantile(0.95, log_assembler_processing_duration_seconds_bucket)
- title: "Alert Status"
type: stat
targets:
- expr: log_assembler_active_alerts

10. Configuration
Environment Configuration
yaml
# Environment variables
environment:
# Service Configuration
LOG_ASSEMBLER_PORT: 9090
LOG_ASSEMBLER_WS_PORT: 8080
LOG_ASSEMBLER_FLUENTD_PORT: 24224
# Storage Configuration
ELASTICSEARCH_URL: http://elasticsearch:9200
ELASTICSEARCH_INDEX_PREFIX: augment-it-logs
REDIS_URL: redis://redis:6379
REDIS_KEY_PREFIX: log-assembler
# Processing Configuration
LOG_BUFFER_SIZE: 1000
LOG_BUFFER_FLUSH_INTERVAL: 100 # milliseconds
LOG_CORRELATION_TIMEOUT: 30000 # milliseconds
LOG_RETENTION_DAYS: 90
# Alert Configuration
ALERT_ERROR_SPIKE_THRESHOLD: 10 # errors per minute
ALERT_PERFORMANCE_THRESHOLD: 5000 # milliseconds
ALERT_ERROR_RATE_THRESHOLD: 0.1 # 10%
# Security Configuration
LOG_ENCRYPTION_ENABLED: true
LOG_ANONYMIZATION_ENABLED: true
ACCESS_TOKEN_SECRET: ${ACCESS_TOKEN_SECRET}

Application Configuration
yaml
# config/log-assembler.yml
service:
name: log-assembler
version: 1.0.0
environment: production
ingestion:
sources:
- type: http
port: 9090
path: /api/v1/logs/ingest
- type: fluentd
port: 24224
buffer_size: 64MB
- type: websocket
port: 8080
max_connections: 1000
validation:
schema_validation: true
required_fields: [timestamp, traceId, source, level, message]
max_message_size: 1MB
processing:
correlation:
enabled: true
timeout: 30s
buffer_size: 10000
enrichment:
enabled: true
user_context: true
geolocation: false
aggregation:
error_patterns: true
performance_metrics: true
deduplication: true
storage:
elasticsearch:
enabled: true
index_rotation: daily
replicas: 1
shards: 3
redis:
enabled: true
ttl: 3600 # 1 hour
max_memory: 2GB
archival:
enabled: true
cold_storage_days: 90
archive_format: gzip
alerting:
channels:
- type: webhook
url: ${SLACK_WEBHOOK_URL}
- type: prometheus
enabled: true
rules:
- name: error_spike
condition: error_rate > 10/min
severity: warning
- name: critical_error
condition: level == "fatal"
severity: critical
- name: slow_performance
condition: avg_duration > 5000ms
severity: warning

This comprehensive Log Assembler Service provides the foundation for centralized logging across your distributed Module Federation architecture. It handles the complexity of correlating logs from multiple microfrontends, microservices, and infrastructure components while providing real-time monitoring, error aggregation, and performance analysis.
The service integrates seamlessly with your existing Docker and Kubernetes infrastructure and provides the data foundation needed for the Report Template Service to generate meaningful insights and reports.