Build real-time streaming applications with Azure Event Hubs SDK for Java. Use when implementing event streaming, high-throughput data ingestion, or building event-driven architectures.
Add this skill:
npx mdskills install sickn33/azure-eventhub-java

Comprehensive SDK reference with production patterns, but it lacks agent-specific execution logic.
---
name: azure-eventhub-java
description: Build real-time streaming applications with Azure Event Hubs SDK for Java. Use when implementing event streaming, high-throughput data ingestion, or building event-driven architectures.
package: com.azure:azure-messaging-eventhubs
---

# Azure Event Hubs SDK for Java

Build real-time streaming applications using the Azure Event Hubs SDK for Java.

## Installation

```xml
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.19.0</version>
</dependency>

<!-- For checkpoint store (production) -->
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.20.0</version>
</dependency>
```

## Client Creation

### EventHubProducerClient

```java
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;

// With connection string and a separate event hub name
EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .buildProducerClient();

// Alternative: full connection string that already contains EntityPath
EventHubProducerClient producerWithEntityPath = new EventHubClientBuilder()
    .connectionString("<connection-string-with-entity-path>")
    .buildProducerClient();
```

### With DefaultAzureCredential

```java
import com.azure.identity.DefaultAzureCredentialBuilder;

EventHubProducerClient producer = new EventHubClientBuilder()
    .fullyQualifiedNamespace("<namespace>.servicebus.windows.net")
    .eventHubName("<event-hub-name>")
    .credential(new DefaultAzureCredentialBuilder().build())
    .buildProducerClient();
```

### EventHubConsumerClient

```java
import com.azure.messaging.eventhubs.EventHubConsumerClient;

EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();
```

### Async Clients

```java
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;

EventHubProducerAsyncClient asyncProducer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .buildAsyncProducerClient();

EventHubConsumerAsyncClient asyncConsumer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .buildAsyncConsumerClient();
```

## Core Patterns

### Send Single Event

```java
import com.azure.messaging.eventhubs.EventData;
import java.util.Collections;

EventData eventData = new EventData("Hello, Event Hubs!");
producer.send(Collections.singletonList(eventData));
```

### Send Event Batch

```java
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;

// Create batch
EventDataBatch batch = producer.createBatch();

// Add events (tryAdd returns false if the batch is full)
for (int i = 0; i < 100; i++) {
    EventData event = new EventData("Event " + i);
    if (!batch.tryAdd(event)) {
        // Batch is full: send it and start a new one
        producer.send(batch);
        batch = producer.createBatch();
        // An event that does not fit an empty batch can never be sent
        if (!batch.tryAdd(event)) {
            throw new IllegalArgumentException("Event is too large for an empty batch: " + i);
        }
    }
}

// Send remaining events
if (batch.getCount() > 0) {
    producer.send(batch);
}
```

### Send to Specific Partition

```java
CreateBatchOptions options = new CreateBatchOptions()
    .setPartitionId("0");

EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Partition 0 event"));
producer.send(batch);
```

### Send with Partition Key

```java
CreateBatchOptions options = new CreateBatchOptions()
    .setPartitionKey("customer-123");

EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Customer event"));
producer.send(batch);
```

### Event with Properties

```java
EventData event = new EventData("Order created");
event.getProperties().put("orderId", "ORD-123");
event.getProperties().put("customerId", "CUST-456");
event.getProperties().put("priority", 1);

producer.send(Collections.singletonList(event));
```

### Receive Events (Simple)

```java
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.time.Duration;

// Receive from specific partition
Iterable<PartitionEvent> events = consumer.receiveFromPartition(
    "0",                        // partitionId
    10,                         // maxEvents
    EventPosition.earliest(),   // startingPosition
    Duration.ofSeconds(30)      // timeout
);

for (PartitionEvent partitionEvent : events) {
    EventData event = partitionEvent.getData();
    System.out.println("Body: " + event.getBodyAsString());
    System.out.println("Sequence: " + event.getSequenceNumber());
    System.out.println("Offset: " + event.getOffset());
}
```

### EventProcessorClient (Production)

```java
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.time.Duration;

// Create checkpoint store
BlobContainerAsyncClient blobClient = new BlobContainerClientBuilder()
    .connectionString("<storage-connection-string>")
    .containerName("checkpoints")
    .buildAsyncClient();

// Create processor
EventProcessorClient processor = new EventProcessorClientBuilder()
    .connectionString("<eventhub-connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .checkpointStore(new BlobCheckpointStore(blobClient))
    .processEvent(eventContext -> {
        EventData event = eventContext.getEventData();
        System.out.println("Processing: " + event.getBodyAsString());

        // Checkpoint after processing
        eventContext.updateCheckpoint();
    })
    .processError(errorContext -> {
        System.err.println("Error: " + errorContext.getThrowable().getMessage());
        System.err.println("Partition: " + errorContext.getPartitionContext().getPartitionId());
    })
    .buildEventProcessorClient();

// Start processing
processor.start();

// Keep running... (Thread.sleep throws InterruptedException)
Thread.sleep(Duration.ofMinutes(5).toMillis());

// Stop gracefully
processor.stop();
```

### Batch Processing

```java
import java.util.List;

EventProcessorClient processor = new EventProcessorClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .checkpointStore(new BlobCheckpointStore(blobClient))
    .processEventBatch(eventBatchContext -> {
        List<EventData> events = eventBatchContext.getEvents();
        System.out.printf("Received %d events%n", events.size());

        for (EventData event : events) {
            // Process each event
            System.out.println(event.getBodyAsString());
        }

        // Checkpoint after batch
        eventBatchContext.updateCheckpoint();
    }, 50) // maxBatchSize
    .processError(errorContext -> {
        System.err.println("Error: " + errorContext.getThrowable());
    })
    .buildEventProcessorClient();
```

### Async Receiving

```java
asyncConsumer.receiveFromPartition("0", EventPosition.latest())
    .subscribe(
        partitionEvent -> {
            EventData event = partitionEvent.getData();
            System.out.println("Received: " + event.getBodyAsString());
        },
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Complete")
    );
```

### Get Event Hub Properties

```java
import com.azure.messaging.eventhubs.EventHubProperties;
import com.azure.messaging.eventhubs.PartitionProperties;

// Get hub info
EventHubProperties hubProps = producer.getEventHubProperties();
System.out.println("Hub: " + hubProps.getName());
// getPartitionIds() returns an iterable of IDs, so join it for readable output
System.out.println("Partitions: " + String.join(", ", hubProps.getPartitionIds()));

// Get partition info
PartitionProperties partitionProps = producer.getPartitionProperties("0");
System.out.println("Begin sequence: " + partitionProps.getBeginningSequenceNumber());
System.out.println("Last sequence: " + partitionProps.getLastEnqueuedSequenceNumber());
System.out.println("Last offset: " + partitionProps.getLastEnqueuedOffset());
```

## Event Positions

```java
import java.time.Duration;
import java.time.Instant;

// Start from beginning
EventPosition.earliest()

// Start from end (new events only)
EventPosition.latest()

// From specific offset
EventPosition.fromOffset(12345L)

// From specific sequence number
EventPosition.fromSequenceNumber(100L)

// From specific time
EventPosition.fromEnqueuedTime(Instant.now().minus(Duration.ofHours(1)))
```

## Error Handling

```java
import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.models.ErrorContext;

// Fragment: chain this onto an EventProcessorClientBuilder
.processError(errorContext -> {
    Throwable error = errorContext.getThrowable();
    String partitionId = errorContext.getPartitionContext().getPartitionId();

    if (error instanceof AmqpException) {
        AmqpException amqpError = (AmqpException) error;
        if (amqpError.isTransient()) {
            System.out.println("Transient error, will retry");
        }
    }

    System.err.printf("Error on partition %s: %s%n", partitionId, error.getMessage());
})
```

## Resource Cleanup

```java
// Always close clients
try {
    producer.send(batch);
} finally {
    producer.close();
}

// Or use try-with-resources
try (EventHubProducerClient producer = new EventHubClientBuilder()
        .connectionString(connectionString, eventHubName)
        .buildProducerClient()) {
    producer.send(events);
}
```

## Environment Variables

```bash
EVENT_HUBS_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
EVENT_HUBS_NAME=<event-hub-name>
STORAGE_CONNECTION_STRING=<for-checkpointing>
```

## Best Practices

1. **Use EventProcessorClient**: In production, it provides load balancing and checkpointing
2. **Batch Events**: Use `EventDataBatch` for efficient sending
3. **Partition Keys**: Use a partition key to keep related events ordered within a single partition
4. **Checkpointing**: Checkpoint after processing to avoid reprocessing
5. **Error Handling**: Handle transient errors with retries
6. **Close Clients**: Always close the producer and consumer when done

## Trigger Phrases

- "Event Hubs Java"
- "event streaming Azure"
- "real-time data ingestion"
- "EventProcessorClient"
- "event hub producer consumer"
- "partition processing"
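
## Batching Loop Sketch

The fill, flush, and retry logic from "Send Event Batch" can be tried without an Azure connection. The sketch below is only an illustration of that control flow: `BatchingSketch`, `SizeLimitedBatch`, and `sendAll` are hypothetical names, and the stand-in batch uses a simple UTF-8 byte budget, which is an assumption and not how the SDK's `EventDataBatch` measures size. Instead of sending, it collects each flushed batch into a list so the grouping can be inspected.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, SDK-free illustration of the tryAdd / flush / new-batch loop.
final class BatchingSketch {

    // Stand-in for EventDataBatch (assumption): accepts items until a byte budget is spent.
    static final class SizeLimitedBatch {
        private final int maxBytes;
        private final List<String> items = new ArrayList<>();
        private int usedBytes = 0;

        SizeLimitedBatch(int maxBytes) {
            this.maxBytes = maxBytes;
        }

        // Returns false, leaving the batch unchanged, when the item does not fit.
        boolean tryAdd(String item) {
            int size = item.getBytes(StandardCharsets.UTF_8).length;
            if (usedBytes + size > maxBytes) {
                return false;
            }
            items.add(item);
            usedBytes += size;
            return true;
        }

        int getCount() {
            return items.size();
        }

        List<String> items() {
            return items;
        }
    }

    // Groups events into batches; each inner list represents one send call.
    static List<List<String>> sendAll(List<String> events, int maxBytes) {
        List<List<String>> sent = new ArrayList<>();
        SizeLimitedBatch batch = new SizeLimitedBatch(maxBytes);
        for (String event : events) {
            if (batch.tryAdd(event)) {
                continue;
            }
            // Batch is full: flush it and start a new one.
            if (batch.getCount() > 0) {
                sent.add(batch.items());
            }
            batch = new SizeLimitedBatch(maxBytes);
            // An event that does not fit an empty batch can never be sent.
            if (!batch.tryAdd(event)) {
                throw new IllegalArgumentException("Event too large for an empty batch: " + event);
            }
        }
        // Flush whatever is left.
        if (batch.getCount() > 0) {
            sent.add(batch.items());
        }
        return sent;
    }

    public static void main(String[] args) {
        // prints [[aa, bb], [cc], [dddd]]
        System.out.println(sendAll(List.of("aa", "bb", "cc", "dddd"), 4));
    }
}
```

The second `tryAdd` check is the design point: without it, an event larger than the batch limit would be dropped silently after the flush.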