Migration monitoring, CDC, and observability infrastructure

Add this skill: `npx mdskills install sickn33/database-migrations-migration-observability`

Comprehensive CDC and monitoring infrastructure with production-ready code patterns.
---
name: database-migrations-migration-observability
description: Migration monitoring, CDC, and observability infrastructure
allowed-tools: Read Write Edit Bash WebFetch
metadata:
  version: 1.0.0
  tags: database, cdc, debezium, kafka, prometheus, grafana, monitoring
---

# Migration Observability and Real-time Monitoring

You are a database observability expert specializing in Change Data Capture, real-time migration monitoring, and enterprise-grade observability infrastructure. Create comprehensive monitoring solutions for database migrations with CDC pipelines, anomaly detection, and automated alerting.

## Use this skill when

- Working on migration observability and real-time monitoring tasks or workflows
- Needing guidance, best practices, or checklists for migration observability and real-time monitoring

## Do not use this skill when

- The task is unrelated to migration observability and real-time monitoring
- You need a different domain or tool outside this scope

## Context

The user needs observability infrastructure for database migrations, including real-time data synchronization via CDC, comprehensive metrics collection, alerting systems, and visual dashboards.

## Requirements

$ARGUMENTS

## Instructions

### 1. Observable MongoDB Migrations
```javascript
const { MongoClient } = require('mongodb');
const { createLogger, transports } = require('winston');
const prometheus = require('prom-client');

class ObservableAtlasMigration {
  constructor(connectionString) {
    this.client = new MongoClient(connectionString);
    // version -> { up(db, session, onProgress) }; populated by callers
    this.migrations = new Map();
    this.logger = createLogger({
      transports: [
        new transports.File({ filename: 'migrations.log' }),
        new transports.Console()
      ]
    });
    this.metrics = this.setupMetrics();
  }

  setupMetrics() {
    const register = new prometheus.Registry();

    return {
      migrationDuration: new prometheus.Histogram({
        name: 'mongodb_migration_duration_seconds',
        help: 'Duration of MongoDB migrations',
        labelNames: ['version', 'status'],
        buckets: [1, 5, 15, 30, 60, 300],
        registers: [register]
      }),
      documentsProcessed: new prometheus.Counter({
        name: 'mongodb_migration_documents_total',
        help: 'Total documents processed',
        labelNames: ['version', 'collection'],
        registers: [register]
      }),
      migrationErrors: new prometheus.Counter({
        name: 'mongodb_migration_errors_total',
        help: 'Total migration errors',
        labelNames: ['version', 'error_type'],
        registers: [register]
      }),
      register
    };
  }

  async migrate() {
    await this.client.connect();
    const db = this.client.db();

    for (const [version, migration] of this.migrations) {
      await this.executeMigrationWithObservability(db, version, migration);
    }
  }

  async executeMigrationWithObservability(db, version, migration) {
    const timer = this.metrics.migrationDuration.startTimer({ version });
    const session = this.client.startSession();

    try {
      this.logger.info(`Starting migration ${version}`);

      await session.withTransaction(async () => {
        await migration.up(db, session, (collection, count) => {
          this.metrics.documentsProcessed.inc({
            version,
            collection
          }, count);
        });
      });

      timer({ status: 'success' });
      this.logger.info(`Migration ${version} completed`);

    } catch (error) {
      this.metrics.migrationErrors.inc({
        version,
        error_type: error.name
      });
      timer({ status: 'failed' });
      throw error;
    } finally {
      await session.endSession();
    }
  }
}
```

### 2. Change Data Capture with Debezium

```python
import asyncio
import json
from datetime import datetime

import requests
from kafka import KafkaConsumer, KafkaProducer
from prometheus_client import Counter, Histogram, Gauge

class CDCObservabilityManager:
    def __init__(self, config):
        self.config = config
        self.metrics = self.setup_metrics()

    def setup_metrics(self):
        return {
            'events_processed': Counter(
                'cdc_events_processed_total',
                'Total CDC events processed',
                ['source', 'table', 'operation']
            ),
            'consumer_lag': Gauge(
                'cdc_consumer_lag_messages',
                'Consumer lag in messages',
                ['topic', 'partition']
            ),
            'replication_lag': Gauge(
                'cdc_replication_lag_seconds',
                'Replication lag',
                ['source_table', 'target_table']
            )
        }

    async def setup_cdc_pipeline(self):
        self.consumer = KafkaConsumer(
            'database.changes',
            bootstrap_servers=self.config['kafka_brokers'],
            group_id='migration-consumer',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        self.producer = KafkaProducer(
            bootstrap_servers=self.config['kafka_brokers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    async def process_cdc_events(self):
        for message in self.consumer:
            event = self.parse_cdc_event(message.value)

            self.metrics['events_processed'].labels(
                source=event.source_db,
                table=event.table,
                operation=event.operation
            ).inc()

            await self.apply_to_target(
                event.table,
                event.operation,
                event.data,
                event.timestamp
            )

    async def setup_debezium_connector(self, source_config):
        connector_config = {
            "name": f"migration-connector-{source_config['name']}",
            "config": {
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "database.hostname": source_config['host'],
                "database.port": source_config['port'],
                "database.dbname": source_config['database'],
                "plugin.name": "pgoutput",
                "heartbeat.interval.ms": "10000"
            }
        }

        response = requests.post(
            f"{self.config['kafka_connect_url']}/connectors",
            json=connector_config
        )
        response.raise_for_status()
```
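The `replication_lag` gauge above is declared but never fed. One way to compute it, assuming each Debezium event carries the source transaction time in `ts_ms` (epoch milliseconds) and that the target table shares the source table's name, is a small helper like this (function and field names here are illustrative):

```python
import time
from prometheus_client import CollectorRegistry, Gauge

def record_replication_lag(gauge, event, now_ms=None):
    """Set the replication-lag gauge from a Debezium-style event payload."""
    now_ms = now_ms if now_ms is not None else time.time() * 1000
    # Clamp at zero: clock skew can make the event appear to come from the future.
    lag_seconds = max(0.0, (now_ms - event['ts_ms']) / 1000.0)
    gauge.labels(
        source_table=event['source']['table'],
        target_table=event['source']['table']  # assumes same-name target table
    ).set(lag_seconds)
    return lag_seconds

registry = CollectorRegistry()
replication_lag = Gauge(
    'cdc_replication_lag_seconds', 'Replication lag',
    ['source_table', 'target_table'], registry=registry
)

# Event committed 5 seconds before it was applied on the target.
event = {'ts_ms': 1_700_000_000_000, 'source': {'table': 'orders'}}
lag = record_replication_lag(replication_lag, event, now_ms=1_700_000_005_000)
# lag == 5.0
```

Calling this from `process_cdc_events` after `apply_to_target` would keep the gauge current on every applied change.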
### 3. Enterprise Monitoring and Alerting

```python
import asyncio

import requests
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram

class EnterpriseMigrationMonitor:
    def __init__(self, config):
        self.config = config
        self.registry = CollectorRegistry()
        self.metrics = self.setup_metrics()
        self.alerting = AlertingSystem(config.get('alerts', {}))

    def setup_metrics(self):
        return {
            'migration_duration': Histogram(
                'migration_duration_seconds',
                'Migration duration',
                ['migration_id'],
                buckets=[60, 300, 600, 1800, 3600],
                registry=self.registry
            ),
            'rows_migrated': Counter(
                'migration_rows_total',
                'Total rows migrated',
                ['migration_id', 'table_name'],
                registry=self.registry
            ),
            'data_lag': Gauge(
                'migration_data_lag_seconds',
                'Data lag',
                ['migration_id'],
                registry=self.registry
            )
        }

    async def track_migration_progress(self, migration_id):
        migration = await self.get_migration(migration_id)  # look up migration state

        while migration.status == 'running':
            stats = await self.calculate_progress_stats(migration)

            self.metrics['rows_migrated'].labels(
                migration_id=migration_id,
                table_name=migration.table
            ).inc(stats.rows_processed)

            anomalies = await self.detect_anomalies(migration_id, stats)
            if anomalies:
                await self.handle_anomalies(migration_id, anomalies)

            await asyncio.sleep(30)

    async def detect_anomalies(self, migration_id, stats):
        anomalies = []

        if stats.rows_per_second < stats.expected_rows_per_second * 0.5:
            anomalies.append({
                'type': 'low_throughput',
                'severity': 'warning',
                'message': f'Throughput {stats.rows_per_second:.0f} rows/s is below '
                           f'50% of expected {stats.expected_rows_per_second:.0f} rows/s'
            })

        if stats.error_rate > 0.01:
            anomalies.append({
                'type': 'high_error_rate',
                'severity': 'critical',
                'message': f'Error rate {stats.error_rate:.2%} exceeds the 1% threshold'
            })

        return anomalies

    async def setup_migration_dashboard(self):
        dashboard_config = {
            "dashboard": {
                "title": "Database Migration Monitoring",
                "panels": [
                    {
                        "title": "Migration Progress",
                        "targets": [{
                            "expr": "rate(migration_rows_total[5m])"
                        }]
                    },
                    {
                        "title": "Data Lag",
                        "targets": [{
                            "expr": "migration_data_lag_seconds"
                        }]
                    }
                ]
            }
        }

        response = requests.post(
            f"{self.config['grafana_url']}/api/dashboards/db",
            json=dashboard_config,
            headers={'Authorization': f"Bearer {self.config['grafana_token']}"}
        )
        response.raise_for_status()

class AlertingSystem:
    def __init__(self, config):
        self.config = config

    async def send_alert(self, title, message, severity, **kwargs):
        if 'slack' in self.config:
            await self.send_slack_alert(title, message, severity)

        if 'email' in self.config:
            await self.send_email_alert(title, message, severity)

    async def send_slack_alert(self, title, message, severity):
        color = {
            'critical': 'danger',
            'warning': 'warning',
            'info': 'good'
        }.get(severity, 'warning')

        payload = {
            'text': title,
            'attachments': [{
                'color': color,
                'text': message
            }]
        }

        requests.post(self.config['slack']['webhook_url'], json=payload)
```
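The fixed thresholds in `detect_anomalies` can be complemented with the statistical analysis this skill promises: for example, a rolling z-score baseline over recent throughput samples. The window size, 3-sigma cutoff, and minimum-sample count below are illustrative choices, not values prescribed by the skill:

```python
from collections import deque
from statistics import mean, stdev

class ThroughputBaseline:
    """Flag throughput samples that deviate sharply from a rolling baseline."""

    def __init__(self, window=60, z_threshold=3.0):
        self.samples = deque(maxlen=window)
        self.z_threshold = z_threshold

    def observe(self, rows_per_second):
        """Return an anomaly dict when the sample deviates > z_threshold sigmas."""
        anomaly = None
        if len(self.samples) >= 10:  # require a minimal baseline first
            mu, sigma = mean(self.samples), stdev(self.samples)
            if sigma > 0 and abs(rows_per_second - mu) / sigma > self.z_threshold:
                anomaly = {
                    'type': 'throughput_deviation',
                    'severity': 'warning',
                    'message': f'{rows_per_second:.0f} rows/s deviates from '
                               f'baseline {mu:.0f}±{sigma:.0f}'
                }
        self.samples.append(rows_per_second)
        return anomaly

baseline = ThroughputBaseline()
for rps in [1000, 1010, 990, 1005, 995, 1002, 998, 1008, 992, 1001]:
    assert baseline.observe(rps) is None  # still building the baseline
alert = baseline.observe(100)  # sudden collapse gets flagged
```

A `ThroughputBaseline` per migration could be kept alongside the fixed checks in `detect_anomalies`, so the monitor catches gradual degradation that never crosses the absolute 50% threshold.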
### 4. Grafana Dashboard Configuration

```python
dashboard_panels = [
    {
        "id": 1,
        "title": "Migration Progress",
        "type": "graph",
        "targets": [{
            "expr": "rate(migration_rows_total[5m])",
            "legendFormat": "{{migration_id}} - {{table_name}}"
        }]
    },
    {
        "id": 2,
        "title": "Data Lag",
        "type": "stat",
        "targets": [{
            "expr": "migration_data_lag_seconds"
        }],
        "fieldConfig": {
            "thresholds": {
                "steps": [
                    {"value": 0, "color": "green"},
                    {"value": 60, "color": "yellow"},
                    {"value": 300, "color": "red"}
                ]
            }
        }
    },
    {
        "id": 3,
        "title": "Error Rate",
        "type": "graph",
        "targets": [{
            "expr": "rate(migration_errors_total[5m])"
        }]
    }
]
```

### 5. CI/CD Integration

```yaml
name: Migration Monitoring

on:
  push:
    branches: [main]

jobs:
  monitor-migration:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Start Monitoring
        run: |
          python migration_monitor.py start \
            --migration-id ${{ github.sha }} \
            --prometheus-url ${{ secrets.PROMETHEUS_URL }}

      - name: Run Migration
        run: |
          python migrate.py --environment production

      - name: Check Migration Health
        run: |
          python migration_monitor.py check \
            --migration-id ${{ github.sha }} \
            --max-lag 300
```

## Output Format

1. **Observable MongoDB Migrations**: Atlas framework with metrics and validation
2. **CDC Pipeline with Monitoring**: Debezium integration with Kafka
3. **Enterprise Metrics Collection**: Prometheus instrumentation
4. **Anomaly Detection**: Statistical analysis
5. **Multi-channel Alerting**: Email, Slack, PagerDuty integrations
6. **Grafana Dashboard Automation**: Programmatic dashboard creation
7. **Replication Lag Tracking**: Source-to-target lag monitoring
8. **Health Check Systems**: Continuous pipeline monitoring

Focus on real-time visibility, proactive alerting, and comprehensive observability for zero-downtime migrations.

## Cross-Plugin Integration

This plugin integrates with:

- **sql-migrations**: Provides observability for SQL migrations
- **nosql-migrations**: Monitors NoSQL transformations
- **migration-integration**: Coordinates monitoring across workflows
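As a closing sketch, the `migration_monitor.py check --max-lag` health gate invoked in the CI workflow could be a thin wrapper over a Prometheus instant query on `migration_data_lag_seconds`. The response parsing below follows the Prometheus HTTP API's vector shape; the function name and the treat-missing-as-unhealthy policy are assumptions of this sketch:

```python
def evaluate_health(prom_response, max_lag_seconds):
    """Return (healthy, lag) from a Prometheus instant-query JSON response."""
    results = prom_response['data']['result']
    if not results:
        return False, None  # no metric reported yet: treat as unhealthy
    # Each vector sample's value is a [timestamp, "stringified number"] pair.
    lag = float(results[0]['value'][1])
    return lag <= max_lag_seconds, lag

# Example response for: migration_data_lag_seconds{migration_id="abc123"}
response = {
    'status': 'success',
    'data': {'result': [{'metric': {'migration_id': 'abc123'},
                         'value': [1700000000, '120']}]}
}
healthy, lag = evaluate_health(response, max_lag_seconds=300)
# healthy is True, lag == 120.0
```

The CI step would exit non-zero when `healthy` is false, failing the pipeline before a lagging migration is declared complete.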