Add this skill:

`npx mdskills install sickn33/azure-servicebus-py`

Comprehensive Azure Service Bus reference with async/sync patterns, message settlement, and sessions.
---
name: azure-servicebus-py
description: |
  Azure Service Bus SDK for Python messaging. Use for queues, topics, subscriptions, and enterprise messaging patterns.
  Triggers: "service bus", "ServiceBusClient", "queue", "topic", "subscription", "message broker".
package: azure-servicebus
---

# Azure Service Bus SDK for Python

Enterprise messaging for reliable cloud communication with queues and pub/sub topics.

## Installation

```bash
pip install azure-servicebus azure-identity
```

## Environment Variables

```bash
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=myqueue
SERVICEBUS_TOPIC_NAME=mytopic
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
```

## Authentication

```python
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient

credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"

client = ServiceBusClient(
    fully_qualified_namespace=namespace,
    credential=credential
)
```

## Client Types

| Client | Purpose | Get From |
|--------|---------|----------|
| `ServiceBusClient` | Connection management | Direct instantiation |
| `ServiceBusSender` | Send messages | `client.get_queue_sender()` / `get_topic_sender()` |
| `ServiceBusReceiver` | Receive messages | `client.get_queue_receiver()` / `get_subscription_receiver()` |

## Send Messages (Async)

```python
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential

async def send_messages():
    credential = DefaultAzureCredential()

    # Entering the async credential as a context manager closes it on exit
    async with credential, ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential
    ) as client:
        sender = client.get_queue_sender(queue_name="myqueue")

        async with sender:
            # Single message
            message = ServiceBusMessage("Hello, Service Bus!")
            await sender.send_messages(message)

            # List of messages (sent as one batch; fails if the list exceeds the size limit)
            messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
            await sender.send_messages(messages)

            # Message batch (for size control)
            batch = await sender.create_message_batch()
            for i in range(100):
                try:
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
                except ValueError:  # Batch full: send it and start a new one
                    await sender.send_messages(batch)
                    batch = await sender.create_message_batch()
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
            # Send whatever remains in the last batch
            await sender.send_messages(batch)

asyncio.run(send_messages())
```

## Receive Messages (Async)

```python
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.identity.aio import DefaultAzureCredential

async def receive_messages():
    credential = DefaultAzureCredential()

    async with credential, ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential
    ) as client:
        receiver = client.get_queue_receiver(queue_name="myqueue")

        async with receiver:
            # Receive batch
            messages = await receiver.receive_messages(
                max_message_count=10,
                max_wait_time=5  # seconds
            )

            for msg in messages:
                print(f"Received: {str(msg)}")
                await receiver.complete_message(msg)  # Remove from queue

asyncio.run(receive_messages())
```

## Receive Modes

| Mode | Behavior | Use Case |
|------|----------|----------|
| `PEEK_LOCK` (default) | Message locked, must complete/abandon | Reliable processing |
| `RECEIVE_AND_DELETE` | Removed immediately on receive | At-most-once delivery |

```python
from azure.servicebus import ServiceBusReceiveMode

receiver = client.get_queue_receiver(
    queue_name="myqueue",
    receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)
```

## Message Settlement

```python
# ProcessingError and PermanentError are placeholders for exceptions
# defined by your application; they are not part of the SDK.
async with receiver:
    messages = await receiver.receive_messages(max_message_count=1)

    for msg in messages:
        try:
            # Process message...
            await receiver.complete_message(msg)  # Success - remove from queue
        except ProcessingError:
            await receiver.abandon_message(msg)  # Release lock so it can be redelivered
        except PermanentError:
            await receiver.dead_letter_message(
                msg,
                reason="ProcessingFailed",
                error_description="Could not process"
            )
```

| Action | Effect |
|--------|--------|
| `complete_message()` | Remove from queue (success) |
| `abandon_message()` | Release lock; message is available for redelivery immediately and its delivery count increases |
| `dead_letter_message()` | Move to dead-letter queue |
| `defer_message()` | Set aside, receive by sequence number |

## Topics and Subscriptions

```python
# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
    await sender.send_messages(ServiceBusMessage("Topic message"))

# Receive from subscription
receiver = client.get_subscription_receiver(
    topic_name="mytopic",
    subscription_name="mysubscription"
)
async with receiver:
    messages = await receiver.receive_messages(max_message_count=10)
```

## Sessions (FIFO)

```python
from azure.servicebus import NEXT_AVAILABLE_SESSION

# Send with session (the queue must have sessions enabled)
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)

# Receive from specific session
receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id="order-123"
)

# Receive from next available session
receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id=NEXT_AVAILABLE_SESSION
)
```

## Scheduled Messages

```python
from datetime import datetime, timedelta, timezone

message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)

# Schedule message (returns a list of sequence numbers, one per message)
sequence_numbers = await sender.schedule_messages(message, scheduled_time)

# Cancel scheduled message
await sender.cancel_scheduled_messages(sequence_numbers)
```

## Dead-Letter Queue

```python
from azure.servicebus import ServiceBusSubQueue

# Receive from dead-letter queue
dlq_receiver = client.get_queue_receiver(
    queue_name="myqueue",
    sub_queue=ServiceBusSubQueue.DEAD_LETTER
)

async with dlq_receiver:
    messages = await dlq_receiver.receive_messages(max_message_count=10)
    for msg in messages:
        print(f"Dead-lettered: {msg.dead_letter_reason}")
        await dlq_receiver.complete_message(msg)
```

## Sync Client (for simple scripts)

```python
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential

with ServiceBusClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    credential=DefaultAzureCredential()
) as client:
    with client.get_queue_sender("myqueue") as sender:
        sender.send_messages(ServiceBusMessage("Sync message"))

    # max_wait_time ends the iteration after 5 seconds without a new message;
    # without it, the loop below waits for messages indefinitely
    with client.get_queue_receiver("myqueue", max_wait_time=5) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.complete_message(msg)
```

## Best Practices

1. **Use async client** for production workloads
2. **Use context managers** (`async with`) for proper cleanup
3. **Complete messages** after successful processing
4. **Use dead-letter queue** for poison messages
5. **Use sessions** for ordered, FIFO processing
6. **Use message batches** for high-throughput scenarios
7. **Set `max_wait_time`** so receive calls and receiver iteration do not block indefinitely

## Reference Files

| File | Contents |
|------|----------|
| [references/patterns.md](references/patterns.md) | Competing consumers, sessions, retry patterns, request-response, transactions |
| [references/dead-letter.md](references/dead-letter.md) | DLQ handling, poison messages, reprocessing strategies |
| [scripts/setup_servicebus.py](scripts/setup_servicebus.py) | CLI for queue/topic/subscription management and DLQ monitoring |
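
## Settlement Helper (Sketch)

The Message Settlement example catches `ProcessingError` and `PermanentError`, which the SDK does not provide. The sketch below shows one way an application might define them and map each outcome to a settlement call. Everything named here (`ProcessingError`, `PermanentError`, `settle`, `handler`, `RecordingReceiver`) is hypothetical; the only assumption about the receiver is that it exposes the `complete_message`, `abandon_message`, and `dead_letter_message` coroutines used earlier. `RecordingReceiver` is a stand-in so the logic can be exercised without a Service Bus namespace.

```python
import asyncio


class ProcessingError(Exception):
    """Hypothetical application error: transient failure, worth retrying."""


class PermanentError(Exception):
    """Hypothetical application error: the message can never be processed."""


async def settle(receiver, msg, handler):
    """Run handler(msg), settle msg by outcome, and return the action taken.

    `receiver` is assumed to offer the async settlement methods of a
    Service Bus receiver; `handler` is an application-supplied coroutine.
    """
    try:
        await handler(msg)
    except ProcessingError:
        # Transient failure: release the lock so the message is redelivered
        await receiver.abandon_message(msg)
        return "abandoned"
    except PermanentError as exc:
        # Poison message: move it to the dead-letter queue with a reason
        await receiver.dead_letter_message(
            msg, reason="ProcessingFailed", error_description=str(exc)
        )
        return "dead-lettered"
    # Complete outside the try block so a failure while completing is not
    # mistaken for a processing failure
    await receiver.complete_message(msg)
    return "completed"


class RecordingReceiver:
    """Stand-in receiver that records settlement calls (not an SDK class)."""

    def __init__(self):
        self.calls = []

    async def complete_message(self, msg):
        self.calls.append(("complete", msg))

    async def abandon_message(self, msg):
        self.calls.append(("abandon", msg))

    async def dead_letter_message(self, msg, reason=None, error_description=None):
        self.calls.append(("dead_letter", msg, reason, error_description))
```

With a real receiver, the loop body in the settlement example would reduce to `await settle(receiver, msg, handler)`. Any exception other than the two application errors propagates without settling, so the message stays locked until its lock expires.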