---
name: azure-eventhub-dotnet
description: |
  Azure Event Hubs SDK for .NET. Use for high-throughput event streaming: sending events (EventHubProducerClient, EventHubBufferedProducerClient), receiving events (EventProcessorClient with checkpointing), partition management, and real-time data ingestion. Triggers: "Event Hubs", "event streaming", "EventHubProducerClient", "EventProcessorClient", "send events", "receive events", "checkpointing", "partition".
package: Azure.Messaging.EventHubs
---

# Azure.Messaging.EventHubs (.NET)

High-throughput event streaming SDK for sending and receiving events via Azure Event Hubs.

## Installation

```bash
# Core package (sending and simple receiving)
dotnet add package Azure.Messaging.EventHubs

# Processor package (production receiving with checkpointing)
dotnet add package Azure.Messaging.EventHubs.Processor

# Authentication
dotnet add package Azure.Identity

# For checkpointing (required by EventProcessorClient)
dotnet add package Azure.Storage.Blobs
```

**Current Versions**: Azure.Messaging.EventHubs v5.12.2, Azure.Messaging.EventHubs.Processor v5.12.2

## Environment Variables

```bash
EVENTHUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=<event-hub-name>

# For checkpointing (EventProcessorClient)
BLOB_STORAGE_CONNECTION_STRING=<storage-connection-string>
BLOB_CONTAINER_NAME=<checkpoint-container>

# Alternative: Connection string auth (not recommended for production)
EVENTHUB_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
```

## Authentication

```csharp
using System;
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

// Always use DefaultAzureCredential for production
var credential = new DefaultAzureCredential();

var fullyQualifiedNamespace = Environment.GetEnvironmentVariable("EVENTHUB_FULLY_QUALIFIED_NAMESPACE");
var eventHubName = Environment.GetEnvironmentVariable("EVENTHUB_NAME");

var producer = new EventHubProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    credential);
```

**Required RBAC Roles**:
- **Sending**: `Azure Event Hubs Data Sender`
- **Receiving**: `Azure Event Hubs Data Receiver`
- **Both**: `Azure Event Hubs Data Owner`

## Client Types

| Client | Purpose | When to Use |
|--------|---------|-------------|
| `EventHubProducerClient` | Send events immediately in batches | Real-time sending, full control over batching |
| `EventHubBufferedProducerClient` | Automatic batching with background sending | High-volume, fire-and-forget scenarios |
| `EventHubConsumerClient` | Simple event reading | Prototyping only, NOT for production |
| `EventProcessorClient` | Production event processing | **Always use this for receiving in production** |

## Core Workflow

### 1. Send Events (Batch)

```csharp
using System;
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

await using var producer = new EventHubProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential());

// Create a batch (respects size limits automatically).
// Not declared with `using` because the variable is reassigned below;
// a `using` variable is read-only, so it is disposed explicitly instead.
EventDataBatch batch = await producer.CreateBatchAsync();

try
{
    // Add events to batch
    var events = new[]
    {
        new EventData(BinaryData.FromString("{\"id\": 1, \"message\": \"Hello\"}")),
        new EventData(BinaryData.FromString("{\"id\": 2, \"message\": \"World\"}"))
    };

    foreach (var eventData in events)
    {
        if (!batch.TryAdd(eventData))
        {
            // Batch is full: send it, dispose it, and create a new one
            await producer.SendAsync(batch);
            batch.Dispose();
            batch = await producer.CreateBatchAsync();

            if (!batch.TryAdd(eventData))
            {
                throw new InvalidOperationException("Event too large for empty batch");
            }
        }
    }

    // Send remaining events
    if (batch.Count > 0)
    {
        await producer.SendAsync(batch);
    }
}
finally
{
    batch.Dispose();
}
```

### 2. Send Events (Buffered - High Volume)

```csharp
using System;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

var options = new EventHubBufferedProducerClientOptions
{
    MaximumWaitTime = TimeSpan.FromSeconds(1)
};

await using var producer = new EventHubBufferedProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential(),
    options);

// Handle send success/failure (register handlers before enqueuing)
producer.SendEventBatchSucceededAsync += args =>
{
    Console.WriteLine($"Batch sent: {args.EventBatch.Count} events");
    return Task.CompletedTask;
};

producer.SendEventBatchFailedAsync += args =>
{
    Console.WriteLine($"Batch failed: {args.Exception.Message}");
    return Task.CompletedTask;
};

// Enqueue events (sent automatically in background)
for (int i = 0; i < 1000; i++)
{
    await producer.EnqueueEventAsync(new EventData($"Event {i}"));
}

// Flush remaining events before disposing
await producer.FlushAsync();
```

### 3. Receive Events (Production - EventProcessorClient)

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;

// Blob container for checkpointing
var blobClient = new BlobContainerClient(
    Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STRING"),
    Environment.GetEnvironmentVariable("BLOB_CONTAINER_NAME"));

await blobClient.CreateIfNotExistsAsync();

// Create processor
var processor = new EventProcessorClient(
    blobClient,
    EventHubConsumerClient.DefaultConsumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential());

// Handle events
processor.ProcessEventAsync += async args =>
{
    Console.WriteLine($"Partition: {args.Partition.PartitionId}");
    Console.WriteLine($"Data: {args.Data.EventBody}");

    // Checkpoint after processing (or batch checkpoints)
    await args.UpdateCheckpointAsync();
};

// Handle errors
processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    Console.WriteLine($"Partition: {args.PartitionId}");
    return Task.CompletedTask;
};

// Start processing
await processor.StartProcessingAsync();

// Run until cancelled; cancellationToken is supplied by the host application
try
{
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
    // Expected on shutdown; fall through so the processor stops gracefully
}

// Stop gracefully
await processor.StopProcessingAsync();
```

### 4. Partition Operations

```csharp
// `producer` and `events` are the EventHubProducerClient and EventData[] from step 1

// Get partition IDs
string[] partitionIds = await producer.GetPartitionIdsAsync();

// Send to specific partition (use sparingly)
var options = new SendEventOptions
{
    PartitionId = "0"
};
await producer.SendAsync(events, options);

// Use partition key (recommended for ordering)
var batchOptions = new CreateBatchOptions
{
    PartitionKey = "customer-123" // Events with same key go to same partition
};
using var batch = await producer.CreateBatchAsync(batchOptions);
```

## EventPosition Options

Control where to start reading:

```csharp
// Start from beginning
EventPosition.Earliest

// Start from end (new events only)
EventPosition.Latest

// Start from specific offset
EventPosition.FromOffset(12345)

// Start from specific sequence number
EventPosition.FromSequenceNumber(100)

// Start from specific time
EventPosition.FromEnqueuedTime(DateTimeOffset.UtcNow.AddHours(-1))
```

## ASP.NET Core Integration

```csharp
// Program.cs (requires the Microsoft.Extensions.Azure package)
using System;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Azure;

builder.Services.AddAzureClients(clientBuilder =>
{
    // The "WithNamespace" variant takes a fully qualified namespace;
    // AddEventHubProducerClient(string, string) expects a connection string instead.
    clientBuilder.AddEventHubProducerClientWithNamespace(
        builder.Configuration["EventHub:FullyQualifiedNamespace"],
        builder.Configuration["EventHub:Name"]);

    clientBuilder.UseCredential(new DefaultAzureCredential());
});

// Inject in controller/service
public class EventService
{
    private readonly EventHubProducerClient _producer;

    public EventService(EventHubProducerClient producer)
    {
        _producer = producer;
    }

    public async Task SendAsync(string message)
    {
        using var batch = await _producer.CreateBatchAsync();

        if (!batch.TryAdd(new EventData(message)))
        {
            throw new InvalidOperationException("Event too large for a single batch");
        }

        await _producer.SendAsync(batch);
    }
}
```

## Best Practices

1. **Use `EventProcessorClient` for receiving**: never use `EventHubConsumerClient` in production
2. **Checkpoint strategically**: after N events or a time interval, not after every event
3. **Use partition keys**: for ordering guarantees within a partition
4. **Reuse clients**: create once, use as a singleton (thread-safe)
5. **Use `await using`**: ensures proper disposal
6. **Handle `ProcessErrorAsync`**: always register an error handler
7. **Batch events**: use `CreateBatchAsync()` to respect size limits
8. **Use buffered producer**: for high-volume scenarios with automatic batching

## Error Handling

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;

try
{
    await producer.SendAsync(batch);
}
catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ServiceBusy)
{
    // Service is throttling: back off before retrying the send
    await Task.Delay(TimeSpan.FromSeconds(5));
}
catch (EventHubsException ex) when (ex.IsTransient)
{
    // Transient error - safe to retry
    Console.WriteLine($"Transient error: {ex.Message}");
}
catch (EventHubsException ex)
{
    // Non-transient error
    Console.WriteLine($"Error: {ex.Reason} - {ex.Message}");
}
```

## Checkpointing Strategies

| Strategy | When to Use |
|----------|-------------|
| Every event | Low volume, critical data |
| Every N events | Balanced throughput/reliability |
| Time-based | Consistent checkpoint intervals |
| Batch completion | After processing a logical batch |

```csharp
using System.Collections.Concurrent;

// Checkpoint every 100 events, counted per partition.
// Partitions are processed concurrently, so a single shared counter would race.
var eventCounts = new ConcurrentDictionary<string, int>();

processor.ProcessEventAsync += async args =>
{
    // Process event...

    var partitionId = args.Partition.PartitionId;
    var count = eventCounts.AddOrUpdate(partitionId, 1, (_, current) => current + 1);

    if (count >= 100)
    {
        await args.UpdateCheckpointAsync();
        eventCounts[partitionId] = 0;
    }
};
```

## Related SDKs

| SDK | Purpose | Install |
|-----|---------|---------|
| `Azure.Messaging.EventHubs` | Core sending/receiving | `dotnet add package Azure.Messaging.EventHubs` |
| `Azure.Messaging.EventHubs.Processor` | Production processing | `dotnet add package Azure.Messaging.EventHubs.Processor` |
| `Azure.ResourceManager.EventHubs` | Management plane (create hubs) | `dotnet add package Azure.ResourceManager.EventHubs` |
| `Microsoft.Azure.WebJobs.Extensions.EventHubs` | Azure Functions binding | `dotnet add package Microsoft.Azure.WebJobs.Extensions.EventHubs` |
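
The "Every N events" and "Time-based" rows of the Checkpointing Strategies table can be combined into a single decision helper. The following is a minimal sketch, not part of the Azure SDK: `CheckpointPolicy`, `ShouldCheckpoint`, and the thresholds are hypothetical names chosen for illustration. It assumes events for a given partition are handled one at a time, so the per-partition state is never updated concurrently for the same key.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not an Azure SDK type): decides, per partition, whether
// enough events or enough time has passed since the last checkpoint.
public sealed class CheckpointPolicy
{
    private readonly int _maxEvents;
    private readonly TimeSpan _maxInterval;

    // Per-partition state: events seen since the last checkpoint, and when that checkpoint happened.
    private readonly ConcurrentDictionary<string, (int Count, DateTimeOffset Last)> _state = new();

    public CheckpointPolicy(int maxEvents, TimeSpan maxInterval)
    {
        _maxEvents = maxEvents;
        _maxInterval = maxInterval;
    }

    // Call once per processed event. Returns true when the caller should checkpoint now.
    // `now` is passed in so the policy can be tested without a real clock.
    public bool ShouldCheckpoint(string partitionId, DateTimeOffset now)
    {
        var (count, last) = _state.GetOrAdd(partitionId, _ => (0, now));
        count++;

        if (count >= _maxEvents || now - last >= _maxInterval)
        {
            // Threshold reached: reset the counter and the timer for this partition.
            _state[partitionId] = (0, now);
            return true;
        }

        _state[partitionId] = (count, last);
        return false;
    }
}
```

Inside a `ProcessEventAsync` handler this could be used as `if (policy.ShouldCheckpoint(args.Partition.PartitionId, DateTimeOffset.UtcNow)) await args.UpdateCheckpointAsync();`. Keeping the decision separate from the SDK call makes the thresholds easy to unit test and tune.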