Build event streaming applications using Azure Event Hubs SDK for JavaScript (@azure/event-hubs). Use when implementing high-throughput event ingestion, real-time analytics, IoT telemetry, or event-driven architectures with partitioned consumers.
Add this skill:
npx mdskills install sickn33/azure-eventhub-ts
Comprehensive SDK reference with excellent code examples and production patterns.
---
name: azure-eventhub-ts
description: Build event streaming applications using Azure Event Hubs SDK for JavaScript (@azure/event-hubs). Use when implementing high-throughput event ingestion, real-time analytics, IoT telemetry, or event-driven architectures with partitioned consumers.
package: "@azure/event-hubs"
---

# Azure Event Hubs SDK for TypeScript

High-throughput event streaming and real-time data ingestion.

## Installation

```bash
npm install @azure/event-hubs @azure/identity
```

For checkpointing with consumer groups:
```bash
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
```

## Environment Variables

```bash
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
```

## Authentication

```typescript
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";

const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();

// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Consumer
const consumer = new EventHubConsumerClient(
  "$Default", // Consumer group
  fullyQualifiedNamespace,
  eventHubName,
  credential
);
```

## Core Workflow

### Send Events

```typescript
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Create batch and add events.
// tryAdd returns false when the event does not fit in the batch.
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });

await producer.sendBatch(batch);
await producer.close();
```

### Send to Specific Partition

```typescript
// By partition ID
const batchForPartition = await producer.createBatch({ partitionId: "0" });

// By partition key (events with the same key land on the same partition)
const batchForKey = await producer.createBatch({ partitionKey: "device-123" });
```

### Receive Events (Simple)

```typescript
const consumer = new EventHubConsumerClient("$Default", fullyQualifiedNamespace, eventHubName, credential);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
    }
  },
  processError: async (err, context) => {
    console.error(`Error on partition ${context.partitionId}: ${err.message}`);
  },
});

// Stop after some time
setTimeout(async () => {
  await subscription.close();
  await consumer.close();
}, 60000);
```

### Receive with Checkpointing (Production)

```typescript
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

// Storage settings from the environment variables listed above
const storageAccount = process.env.STORAGE_ACCOUNT_NAME!;
const containerName = process.env.STORAGE_CONTAINER_NAME!;

const containerClient = new ContainerClient(
  `https://${storageAccount}.blob.core.windows.net/${containerName}`,
  credential
);

const checkpointStore = new BlobCheckpointStore(containerClient);

const consumer = new EventHubConsumerClient(
  "$Default",
  fullyQualifiedNamespace,
  eventHubName,
  credential,
  checkpointStore
);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Processing: ${JSON.stringify(event.body)}`);
    }
    // Checkpoint after processing batch
    if (events.length > 0) {
      await context.updateCheckpoint(events[events.length - 1]);
    }
  },
  processError: async (err, context) => {
    console.error(`Error: ${err.message}`);
  },
});
```

### Receive from Specific Position

```typescript
import { earliestEventPosition, latestEventPosition } from "@azure/event-hubs";

const subscription = consumer.subscribe({
  processEvents: async (events, context) => { /* ... */ },
  processError: async (err, context) => { /* ... */ },
}, {
  startPosition: {
    // Start from beginning
    "0": earliestEventPosition,
    // Start from end (new events only)
    "1": latestEventPosition,
    // Start from specific offset (a string in SDK v6; earlier versions used a number)
    "2": { offset: "12345" },
    // Start from specific time
    "3": { enqueuedOn: new Date("2024-01-01") },
  },
});
```

## Event Hub Properties

```typescript
// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);

// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);
```

## Batch Processing Options

```typescript
const subscription = consumer.subscribe(
  {
    processEvents: async (events, context) => { /* ... */ },
    processError: async (err, context) => { /* ... */ },
  },
  {
    maxBatchSize: 100, // Max events per batch
    maxWaitTimeInSeconds: 30, // Max wait for batch
  }
);
```

## Key Types

```typescript
import {
  EventHubProducerClient,
  EventHubConsumerClient,
  EventData,
  ReceivedEventData,
  PartitionContext,
  Subscription,
  SubscriptionEventHandlers,
  CreateBatchOptions,
  EventPosition,
} from "@azure/event-hubs";

import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
```

## Event Properties

```typescript
// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
  body: { data: "payload" },
  properties: {
    eventType: "telemetry",
    deviceId: "sensor-1",
  },
  contentType: "application/json",
  correlationId: "request-123",
});

// Access in receiver
consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Type: ${event.properties?.eventType}`);
      console.log(`Sequence: ${event.sequenceNumber}`);
      console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
      console.log(`Offset: ${event.offset}`);
    }
  },
  // processError is a required handler
  processError: async (err, context) => {
    console.error(`Error on partition ${context.partitionId}: ${err.message}`);
  },
});
```

## Error Handling

```typescript
consumer.subscribe({
  processEvents: async (events, context) => {
    // The handler can be called with an empty array when the wait time elapses
    if (events.length === 0) {
      return;
    }
    try {
      for (const event of events) {
        await processEvent(event);
      }
      await context.updateCheckpoint(events[events.length - 1]);
    } catch (error) {
      // Don't checkpoint on error - events will be reprocessed
      console.error("Processing failed:", error);
    }
  },
  processError: async (err, context) => {
    if (err.name === "MessagingError") {
      // Transient error - SDK will retry
      console.warn("Transient error:", err.message);
    } else {
      // Fatal error
      console.error("Fatal error:", err);
    }
  },
});
```

## Best Practices

1. **Use checkpointing** - Always checkpoint in production. Checkpointing gives at-least-once processing (events after the last checkpoint are redelivered on restart), so keep handlers idempotent
2. **Batch sends** - Use `createBatch()` for efficient sending
3. **Partition keys** - Use partition keys to ensure ordering for related events
4. **Consumer groups** - Use separate consumer groups for different processing pipelines
5. **Handle errors gracefully** - Don't checkpoint on processing failures
6. **Close clients** - Always close producer/consumer when done
7. **Monitor lag** - Track `lastEnqueuedSequenceNumber` vs processed sequence
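
The lag tracking in best practice 7 can be sketched as a small pure function. This is a minimal illustration, not part of the SDK: `PartitionTail` and `computeLag` are hypothetical names, and the sketch assumes you collect `lastEnqueuedSequenceNumber` per partition from `getPartitionProperties()` and record the `sequenceNumber` of the last event your handler finished for each partition.

```typescript
// Hypothetical shape: the one field this sketch needs from the
// result of getPartitionProperties(), plus the partition ID.
interface PartitionTail {
  partitionId: string;
  lastEnqueuedSequenceNumber: number;
}

// Returns, per partition, how many sequence numbers the consumer is behind.
// A partition with no processed event yet maps to undefined (lag unknown).
function computeLag(
  tails: PartitionTail[],
  lastProcessed: Map<string, number>
): Map<string, number | undefined> {
  const lag = new Map<string, number | undefined>();
  for (const tail of tails) {
    const processed = lastProcessed.get(tail.partitionId);
    lag.set(
      tail.partitionId,
      processed === undefined
        ? undefined
        // Clamp at 0: the tail snapshot may be older than the processed value
        : Math.max(0, tail.lastEnqueuedSequenceNumber - processed)
    );
  }
  return lag;
}

// Usage with made-up numbers
const lag = computeLag(
  [
    { partitionId: "0", lastEnqueuedSequenceNumber: 120 },
    { partitionId: "1", lastEnqueuedSequenceNumber: 45 },
  ],
  new Map([["0", 100]])
);
console.log(lag.get("0")); // 20
console.log(lag.get("1")); // undefined
```

In a real consumer, `lastProcessed` would be updated inside `processEvents` after each batch, and the tails would be refreshed on a timer. Keeping the arithmetic separate from the SDK calls makes it easy to unit test.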