Add this skill: `npx mdskills install sickn33/azure-eventhub-py`

Comprehensive Azure Event Hubs reference with clear examples for producers, consumers, and checkpointing.
---
name: azure-eventhub-py
description: |
  Azure Event Hubs SDK for Python streaming. Use for high-throughput event ingestion, producers, consumers, and checkpointing.
  Triggers: "event hubs", "EventHubProducerClient", "EventHubConsumerClient", "streaming", "partitions".
package: azure-eventhub
---

# Azure Event Hubs SDK for Python

Big data streaming platform for high-throughput event ingestion.

## Installation

```bash
pip install azure-eventhub azure-identity
# For checkpointing with blob storage (sync client)
pip install azure-eventhub-checkpointstoreblob
# For checkpointing with blob storage (async client)
pip install azure-eventhub-checkpointstoreblob-aio
```

## Environment Variables

```bash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpoints
```

## Authentication

```python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"

# Producer
producer = EventHubProducerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    credential=credential
)

# Consumer
consumer = EventHubConsumerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    consumer_group="$Default",
    credential=credential
)
```

## Client Types

| Client | Purpose |
|--------|---------|
| `EventHubProducerClient` | Send events to Event Hub |
| `EventHubConsumerClient` | Receive events from Event Hub |
| `BlobCheckpointStore` | Track consumer progress |

## Send Events

```python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential

producer = EventHubProducerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    credential=DefaultAzureCredential()
)

with producer:
    # Create batch (handles size limits)
    event_data_batch = producer.create_batch()

    for i in range(10):
        try:
            event_data_batch.add(EventData(f"Event {i}"))
        except ValueError:
            # Batch is full, send and create new one
            producer.send_batch(event_data_batch)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(f"Event {i}"))

    # Send remaining
    producer.send_batch(event_data_batch)
```

### Send to Specific Partition

```python
# By partition ID
event_data_batch = producer.create_batch(partition_id="0")

# By partition key (consistent hashing)
event_data_batch = producer.create_batch(partition_key="user-123")
```

## Receive Events

### Simple Receive

```python
from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential

def on_event(partition_context, event):
    print(f"Partition: {partition_context.partition_id}")
    print(f"Data: {event.body_as_str()}")
    # Not persisted unless a checkpoint store is configured (see below)
    partition_context.update_checkpoint(event)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential()
)

with consumer:
    consumer.receive(
        on_event=on_event,
        starting_position="-1",  # Beginning of stream
    )
```

### With Blob Checkpoint Store (Production)

```python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential

checkpoint_store = BlobCheckpointStore(
    blob_account_url="https://<account>.blob.core.windows.net",
    container_name="checkpoints",
    credential=DefaultAzureCredential()
)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
    checkpoint_store=checkpoint_store
)

def on_event(partition_context, event):
    print(f"Received: {event.body_as_str()}")
    # Checkpoint after processing
    partition_context.update_checkpoint(event)

with consumer:
    consumer.receive(on_event=on_event)
```

## Async Client

```python
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential

async def send_events():
    credential = DefaultAzureCredential()

    # Entering the credential as a context manager closes it on exit
    async with credential, EventHubProducerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        credential=credential
    ) as producer:
        batch = await producer.create_batch()
        batch.add(EventData("Async event"))
        await producer.send_batch(batch)

async def receive_events():
    async def on_event(partition_context, event):
        print(event.body_as_str())
        await partition_context.update_checkpoint(event)

    credential = DefaultAzureCredential()

    async with credential, EventHubConsumerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        consumer_group="$Default",
        credential=credential
    ) as consumer:
        await consumer.receive(on_event=on_event)

asyncio.run(send_events())
```

## Event Properties

```python
from azure.eventhub import EventData

event = EventData("My event body")

# Set properties
event.properties = {"custom_property": "value"}
event.content_type = "application/json"

# Read properties (populated by the service on received events)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
```

## Get Event Hub Info

```python
# `producer` is an EventHubProducerClient created as shown under Authentication
with producer:
    info = producer.get_eventhub_properties()
    print(f"Name: {info['eventhub_name']}")
    print(f"Partitions: {info['partition_ids']}")

    for partition_id in info['partition_ids']:
        partition_info = producer.get_partition_properties(partition_id)
        print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")
```

## Best Practices

1. **Use batches** for sending multiple events
2. **Use checkpoint store** in production for reliable processing
3. **Use async client** for high-throughput scenarios
4. **Use partition keys** for ordered delivery within a partition
5. **Handle batch size limits**: catch `ValueError` when the batch is full
6. **Use context managers** (`with`/`async with`) for proper cleanup
7. **Set appropriate consumer groups** for different applications

## Reference Files

| File | Contents |
|------|----------|
| [references/checkpointing.md](references/checkpointing.md) | Checkpoint store patterns, blob checkpointing, checkpoint strategies |
| [references/partitions.md](references/partitions.md) | Partition management, load balancing, starting positions |
| [scripts/setup_consumer.py](scripts/setup_consumer.py) | CLI for Event Hub info, consumer setup, and event sending/receiving |
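
## Batch Helper Sketch

The "handle batch size limits" practice from the Send Events section can be factored into a small reusable helper. The following is a minimal sketch, not part of the SDK: `send_in_batches`, `FakeBatch`, and `FakeProducer` are hypothetical names introduced here for illustration. The helper only assumes a producer object with `create_batch()` and `send_batch()`, and a batch whose `add()` raises `ValueError` when full, which is the behavior the Send Events example relies on. The in-memory fakes let the logic run without an Azure connection.

```python
from typing import Any, Iterable, List


def send_in_batches(producer: Any, events: Iterable[Any]) -> int:
    """Send events, starting a new batch whenever add() raises ValueError.

    Returns the number of batches sent. Hypothetical helper, not an SDK call.
    """
    batch = producer.create_batch()
    pending = 0  # events in the current, unsent batch
    sent = 0
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            if pending == 0:
                # The event does not fit even in an empty batch; retrying is pointless.
                raise
            producer.send_batch(batch)
            sent += 1
            batch = producer.create_batch()
            batch.add(event)
            pending = 0
        pending += 1
    if pending:
        # Flush the last partial batch; skipped when there is nothing to send.
        producer.send_batch(batch)
        sent += 1
    return sent


class FakeBatch:
    """In-memory stand-in for a batch (assumption: capacity counted in events)."""

    def __init__(self, max_events: int) -> None:
        self.max_events = max_events
        self.events: List[Any] = []

    def add(self, event: Any) -> None:
        if len(self.events) >= self.max_events:
            raise ValueError("batch is full")
        self.events.append(event)


class FakeProducer:
    """In-memory stand-in for a producer that records what was sent."""

    def __init__(self, max_events: int = 3) -> None:
        self.max_events = max_events
        self.sent: List[List[Any]] = []

    def create_batch(self) -> FakeBatch:
        return FakeBatch(self.max_events)

    def send_batch(self, batch: FakeBatch) -> None:
        self.sent.append(list(batch.events))
```

With a real client, the same helper would presumably be called as `send_in_batches(producer, (EventData(f"Event {i}") for i in range(10)))` inside a `with producer:` block. Unlike the inline loop, it skips the final send when no events are pending and re-raises if a single event cannot fit into an empty batch.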