Build real-time streaming applications with Azure Event Hubs SDK for Java. Use when implementing event streaming, high-throughput data ingestion, or building event-driven architectures.
Add this skill
npx mdskills install sickn33/azure-eventhub-java
Comprehensive SDK reference with production patterns, but it lacks agent-specific execution logic.
Build real-time streaming applications using the Azure Event Hubs SDK for Java.
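<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.19.0</version>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.20.0</version>
</dependency>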
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
// Option 1: namespace-level connection string plus the Event Hub name.
// Replace the angle-bracket placeholders with real values before running.
EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .buildProducerClient();
// Option 2: full connection string that already carries EntityPath=<event-hub-name>.
// Use one option or the other; both declare `producer`.
EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString("<connection-string-with-entity-path>")
    .buildProducerClient();
import com.azure.identity.DefaultAzureCredentialBuilder;
// Authenticate with DefaultAzureCredential instead of a connection string.
// Replace the angle-bracket placeholders with the namespace and hub names.
EventHubProducerClient producer = new EventHubClientBuilder()
    .fullyQualifiedNamespace("<namespace>.servicebus.windows.net")
    .eventHubName("<event-hub-name>")
    .credential(new DefaultAzureCredentialBuilder().build())
    .buildProducerClient();
import com.azure.messaging.eventhubs.EventHubConsumerClient;
// Synchronous consumer reading through the default consumer group.
// Replace the angle-bracket placeholders with real values before running.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
// Asynchronous (reactive) producer.
// Replace the angle-bracket placeholders with real values before running.
EventHubProducerAsyncClient asyncProducer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .buildAsyncProducerClient();
// Asynchronous consumer; the constant is the SDK's name for the "$Default" consumer group,
// used here for consistency with the synchronous consumer example.
EventHubConsumerAsyncClient asyncConsumer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();
import com.azure.messaging.eventhubs.EventData;
// Wrap one payload in a single-element collection and publish it.
EventData helloEvent = new EventData("Hello, Event Hubs!");
Iterable<EventData> singleEvent = Collections.singletonList(helloEvent);
producer.send(singleEvent);
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
// Create batch
EventDataBatch batch = producer.createBatch();
// Add events; tryAdd returns false when the batch is full.
// The count of 100 is illustrative; substitute your own event source.
for (int i = 0; i < 100; i++) {
    EventData event = new EventData("Event " + i);
    if (!batch.tryAdd(event)) {
        // Batch is full: send it, start a fresh batch, and retry the event that did not fit.
        producer.send(batch);
        batch = producer.createBatch();
        if (!batch.tryAdd(event)) {
            // Do not silently drop an event that cannot fit even into an empty batch.
            throw new IllegalArgumentException("Event " + i + " is too large for an empty batch");
        }
    }
}
// Send whatever is left in the last, partially filled batch.
if (batch.getCount() > 0) {
    producer.send(batch);
}
// Target an explicit partition by its ID.
CreateBatchOptions byPartitionId = new CreateBatchOptions();
byPartitionId.setPartitionId("0");
EventDataBatch partitionBatch = producer.createBatch(byPartitionId);
EventData partitionZeroEvent = new EventData("Partition 0 event");
partitionBatch.tryAdd(partitionZeroEvent);
producer.send(partitionBatch);
// Route by partition key instead of an explicit partition ID.
CreateBatchOptions byPartitionKey = new CreateBatchOptions();
byPartitionKey.setPartitionKey("customer-123");
EventDataBatch customerBatch = producer.createBatch(byPartitionKey);
EventData customerEvent = new EventData("Customer event");
customerBatch.tryAdd(customerEvent);
producer.send(customerBatch);
// Attach application-defined metadata to the event, then publish it.
EventData orderCreated = new EventData("Order created");
orderCreated.getProperties().put("orderId", "ORD-123");
orderCreated.getProperties().put("customerId", "CUST-456");
orderCreated.getProperties().put("priority", 1);
Iterable<EventData> payload = Collections.singletonList(orderCreated);
producer.send(payload);
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
// Receive from a specific partition.
// The element type must be declared: a raw Iterable yields Object and the
// for-each loop over PartitionEvent below would not compile.
Iterable<PartitionEvent> events = consumer.receiveFromPartition(
    "0",                       // partitionId
    10,                        // maxEvents
    EventPosition.earliest(),  // startingPosition
    Duration.ofSeconds(30)     // timeout
);
for (PartitionEvent partitionEvent : events) {
    EventData event = partitionEvent.getData();
    System.out.println("Body: " + event.getBodyAsString());
    System.out.println("Sequence: " + event.getSequenceNumber());
    System.out.println("Offset: " + event.getOffset());
}
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
// Create checkpoint store (replace the placeholder with the storage account connection string)
BlobContainerAsyncClient blobClient = new BlobContainerClientBuilder()
    .connectionString("<storage-connection-string>")
    .containerName("checkpoints")
    .buildAsyncClient();
// Create processor (replace the placeholders with the Event Hubs connection string and hub name)
EventProcessorClient processor = new EventProcessorClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .checkpointStore(new BlobCheckpointStore(blobClient))
    .processEvent(eventContext -> {
        EventData event = eventContext.getEventData();
        System.out.println("Processing: " + event.getBodyAsString());
        // Checkpoint after processing
        eventContext.updateCheckpoint();
    })
    .processError(errorContext -> {
        System.err.println("Error: " + errorContext.getThrowable().getMessage());
        System.err.println("Partition: " + errorContext.getPartitionContext().getPartitionId());
    })
    .buildEventProcessorClient();
// Start processing
processor.start();
try {
    // Keep running...
    Thread.sleep(Duration.ofMinutes(5).toMillis());
} finally {
    // Stop gracefully, even if the sleep above is interrupted
    processor.stop();
}
// Batch-oriented processor: the handler receives up to maxBatchSize events per call.
// Replace the angle-bracket placeholders with real values; blobClient is the
// BlobContainerAsyncClient that backs the checkpoint store.
EventProcessorClient processor = new EventProcessorClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .checkpointStore(new BlobCheckpointStore(blobClient))
    .processEventBatch(eventBatchContext -> {
        // The element type must be declared: a raw List yields Object and the
        // for-each loop over EventData below would not compile.
        List<EventData> events = eventBatchContext.getEvents();
        System.out.printf("Received %d events%n", events.size());
        for (EventData event : events) {
            // Process each event
            System.out.println(event.getBodyAsString());
        }
        // Checkpoint after batch
        eventBatchContext.updateCheckpoint();
    }, 50) // maxBatchSize
    .processError(errorContext -> {
        System.err.println("Error: " + errorContext.getThrowable());
    })
    .buildEventProcessorClient();
asyncConsumer.receiveFromPartition("0", EventPosition.latest())
.subscribe(
partitionEvent -> {
EventData event = partitionEvent.getData();
System.out.println("Received: " + event.getBodyAsString());
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Complete")
);
// Hub-level metadata: name and partition IDs.
EventHubProperties hubInfo = producer.getEventHubProperties();
System.out.println("Hub: " + hubInfo.getName());
System.out.println("Partitions: " + hubInfo.getPartitionIds());
// Metadata for partition "0": sequence-number range and last enqueued offset.
PartitionProperties partitionInfo = producer.getPartitionProperties("0");
System.out.println("Begin sequence: " + partitionInfo.getBeginningSequenceNumber());
System.out.println("Last sequence: " + partitionInfo.getLastEnqueuedSequenceNumber());
System.out.println("Last offset: " + partitionInfo.getLastEnqueuedOffset());
// Each expression below is an alternative starting position to pass to a receive call;
// they are standalone expressions, not a runnable sequence of statements.
// Start from the beginning of the partition
EventPosition.earliest()
// Start from the end of the partition (new events only)
EventPosition.latest()
// Start from a specific offset (12345 is an example value)
EventPosition.fromOffset(12345L)
// Start from a specific sequence number (100 is an example value)
EventPosition.fromSequenceNumber(100L)
// Start from a specific enqueued time (here: one hour before now)
EventPosition.fromEnqueuedTime(Instant.now().minus(Duration.ofHours(1)))
import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.models.ErrorContext;
// Builder fragment: chain this onto an EventProcessorClientBuilder.
// The handler reports which partition failed and flags transient AMQP errors.
.processError(errorContext -> {
    Throwable error = errorContext.getThrowable();
    String partitionId = errorContext.getPartitionContext().getPartitionId();
    if (error instanceof AmqpException) {
        AmqpException amqpError = (AmqpException) error;
        if (amqpError.isTransient()) {
            System.out.println("Transient error, will retry");
        }
    }
    System.err.printf("Error on partition %s: %s%n", partitionId, error.getMessage());
})
// Always close clients: the finally block closes the producer whether or not send() throws
try {
producer.send(batch);
} finally {
producer.close();
}
// Or use try-with-resources, which closes the producer when the block exits
// (connectionString, eventHubName and events are assumed to be defined by the surrounding code)
try (EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient()) {
producer.send(events);
}
EVENT_HUBS_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
EVENT_HUBS_NAME=<event-hub-name>
STORAGE_CONNECTION_STRING=<storage-connection-string>
EventDataBatch for efficient sending
Install via CLI
npx mdskills install sickn33/azure-eventhub-java
Azure Eventhub Java is a free, open-source AI agent skill. Build real-time streaming applications with Azure Event Hubs SDK for Java. Use when implementing event streaming, high-throughput data ingestion, or building event-driven architectures.
Install Azure Eventhub Java with a single command:
npx mdskills install sickn33/azure-eventhub-java
This downloads the skill files into your project and your AI agent picks them up automatically.
Azure Eventhub Java works with Claude Code, Claude Desktop, Cursor, Vscode Copilot, Windsurf, Continue Dev, Codex, Gemini Cli, Amp, Roo Code, Goose, Opencode, Trae, Qodo, Command Code. Skills use the open SKILL.md format which is compatible with any AI coding agent that reads markdown instructions.