แก้ไข

แชร์ผ่าน


Use Java to send events to or receive events from Azure Event Hubs

This quickstart shows how to send events to and receive events from an event hub using the azure-messaging-eventhubs Java package.

Tip

If you're working with Azure Event Hubs resources in a Spring application, we recommend that you consider Spring Cloud Azure as an alternative. Spring Cloud Azure is an open-source project that provides seamless Spring integration with Azure services. To learn more about Spring Cloud Azure, and to see an example using Event Hubs, see Spring Cloud Stream with Azure Event Hubs.

Prerequisites

If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

To complete this quickstart, you need the following prerequisites:

  • Microsoft Azure subscription. To use Azure services, including Azure Event Hubs, you need a subscription. If you don't have an existing Azure account, you can sign up for a free trial or use your MSDN subscriber benefits when you create an account.
  • A Java development environment. This quickstart uses Eclipse. Java Development Kit (JDK) with version 8 or above is required.
  • Create an Event Hubs namespace and an event hub. The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. To create a namespace and an event hub, follow the procedure in this article. Then, get the connection string for the Event Hubs namespace by following instructions from the article: Get connection string. You use the connection string later in this quickstart.

Send events

This section shows you how to create a Java application to send events an event hub.

Add reference to Azure Event Hubs library

First, create a new Maven project for a console/shell application in your favorite Java development environment. Update the pom.xml file as follows. The Java client library for Event Hubs is available in the Maven Central Repository.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Note

Update the version to the latest version published to the Maven repository.

Authenticate the app to Azure

This quickstart shows you two ways of connecting to Azure Event Hubs: passwordless and connection string. The first option shows you how to use your security principal in Microsoft Entra ID and role-based access control (RBAC) to connect to an Event Hubs namespace. You don't need to worry about having hard-coded connection strings in your code or in a configuration file or in a secure storage like Azure Key Vault. The second option shows you how to use a connection string to connect to an Event Hubs namespace. If you're new to Azure, you may find the connection string option easier to follow. We recommend using the passwordless option in real-world applications and production environments. For more information, see Authentication and authorization. You can also read more about passwordless authentication on the overview page.

Assign roles to your Microsoft Entra user

When developing locally, make sure that the user account that connects to Azure Event Hubs has the correct permissions. You'll need the Azure Event Hubs Data Owner role in order to send and receive messages. To assign yourself this role, you'll need the User Access Administrator role, or another role that includes the Microsoft.Authorization/roleAssignments/write action. You can assign Azure RBAC roles to a user using the Azure portal, Azure CLI, or Azure PowerShell. Learn more about the available scopes for role assignments on the scope overview page.

The following example assigns the Azure Event Hubs Data Owner role to your user account, which provides full access to Azure Event Hubs resources. In a real scenario, follow the Principle of Least Privilege to give users only the minimum permissions needed for a more secure production environment.

Azure built-in roles for Azure Event Hubs

For Azure Event Hubs, the management of namespaces and all related resources through the Azure portal and the Azure resource management API is already protected using the Azure RBAC model. Azure provides the below Azure built-in roles for authorizing access to an Event Hubs namespace:

If you want to create a custom role, see Rights required for Event Hubs operations.

Important

In most cases, it will take a minute or two for the role assignment to propagate in Azure. In rare cases, it may take up to eight minutes. If you receive authentication errors when you first run your code, wait a few moments and try again.

  1. In the Azure portal, locate your Event Hubs namespace using the main search bar or left navigation.

  2. On the overview page, select Access control (IAM) from the left-hand menu.

  3. On the Access control (IAM) page, select the Role assignments tab.

  4. Select + Add from the top menu and then Add role assignment from the resulting drop-down menu.

    A screenshot showing how to assign a role.

  5. Use the search box to filter the results to the desired role. For this example, search for Azure Event Hubs Data Owner and select the matching result. Then choose Next.

  6. Under Assign access to, select User, group, or service principal, and then choose + Select members.

  7. In the dialog, search for your Microsoft Entra username (usually your user@domain email address) and then choose Select at the bottom of the dialog.

  8. Select Review + assign to go to the final page, and then Review + assign again to complete the process.

Write code to send messages to the event hub

Add a class named Sender, and add the following code to the class:

Important

  • Update <NAMESPACE NAME> with the name of your Event Hubs namespace.
  • Update <EVENT HUB NAME> with the name of your event hub.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Build the program, and ensure that there are no errors. You'll run this program after you run the receiver program.

Receive events

The code in this tutorial is based on the EventProcessorClient sample on GitHub, which you can examine to see the full working application.

Follow these recommendations when using Azure Blob Storage as a checkpoint store:

  • Use a separate container for each consumer group. You can use the same storage account, but use one container per each group.
  • Don't use the container for anything else, and don't use the storage account for anything else.
  • Storage account should be in the same region as the deployed application is located in. If the application is on-premises, try to choose the closest region possible.

On the Storage account page in the Azure portal, in the Blob service section, ensure that the following settings are disabled.

  • Hierarchical namespace
  • Blob soft delete
  • Versioning

Create an Azure Storage and a blob container

In this quickstart, you use Azure Storage (specifically, Blob Storage) as the checkpoint store. Checkpointing is a process by which an event processor marks or commits the position of the last successfully processed event within a partition. Marking a checkpoint is typically done within the function that processes the events. To learn more about checkpointing, see Event processor.

Follow these steps to create an Azure Storage account.

  1. Create an Azure Storage account
  2. Create a blob container
  3. Authenticate to the blob container

When developing locally, make sure that the user account that is accessing blob data has the correct permissions. You'll need Storage Blob Data Contributor to read and write blob data. To assign yourself this role, you'll need to be assigned the User Access Administrator role, or another role that includes the Microsoft.Authorization/roleAssignments/write action. You can assign Azure RBAC roles to a user using the Azure portal, Azure CLI, or Azure PowerShell. You can learn more about the available scopes for role assignments on the scope overview page.

In this scenario, you'll assign permissions to your user account, scoped to the storage account, to follow the Principle of Least Privilege. This practice gives users only the minimum permissions needed and creates more secure production environments.

The following example will assign the Storage Blob Data Contributor role to your user account, which provides both read and write access to blob data in your storage account.

Important

In most cases it will take a minute or two for the role assignment to propagate in Azure, but in rare cases it may take up to eight minutes. If you receive authentication errors when you first run your code, wait a few moments and try again.

  1. In the Azure portal, locate your storage account using the main search bar or left navigation.

  2. On the storage account overview page, select Access control (IAM) from the left-hand menu.

  3. On the Access control (IAM) page, select the Role assignments tab.

  4. Select + Add from the top menu and then Add role assignment from the resulting drop-down menu.

    A screenshot showing how to assign a storage account role.

  5. Use the search box to filter the results to the desired role. For this example, search for Storage Blob Data Contributor and select the matching result and then choose Next.

  6. Under Assign access to, select User, group, or service principal, and then choose + Select members.

  7. In the dialog, search for your Microsoft Entra username (usually your user@domain email address) and then choose Select at the bottom of the dialog.

  8. Select Review + assign to go to the final page, and then Review + assign again to complete the process.

Add Event Hubs libraries to your Java project

Add the following dependencies in the pom.xml file.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Add the following import statements at the top of the Java file.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Create a class named Receiver, and add the following string variables to the class. Replace the placeholders with the correct values.

    Important

    Replace the placeholders with the correct values.

    • <NAMESPACE NAME> with the name of your Event Hubs namespace.
    • <EVENT HUB NAME> with the name of your event hub in the namespace.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Add the following main method to the class.

    Important

    Replace the placeholders with the correct values.

    • <STORAGE ACCOUNT NAME> with the name of your Azure Storage account.
    • <CONTAINER NAME> with the name of the blob container in the storage account
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Add the two helper methods (PARTITION_PROCESSOR and ERROR_HANDLER) that process events and errors to the Receiver class.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Build the program, and ensure that there are no errors.

Run the applications

  1. Run the Receiver application first.

  2. Then, run the Sender application.

  3. In the Receiver application window, confirm that you see the events that were published by the Sender application.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Press ENTER in the receiver application window to stop the application.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Next steps

See the following samples on GitHub: