Поделиться через


Отправка сообщений в очереди Служебной шины и получение сообщений из них (Java)

В этом кратком руководстве вы создадите приложение Java для отправки сообщений и получения сообщений из очереди Служебная шина Azure.

Примечание.

В этом кратком руководстве содержатся пошаговые инструкции для простого сценария отправки сообщений в очередь Служебной шины и получения сообщений из них. Готовые примеры Java для Служебной шины Azure вы найдете в репозитории пакета SDK Azure для Java на GitHub.

Совет

Если вы работаете с ресурсами Служебная шина Azure в приложении Spring, рекомендуется рассмотреть Azure Spring Cloud как альтернативу. Spring Cloud Azure — это проект с открытым исходным кодом, который обеспечивает простую интеграцию Spring со службами Azure. Дополнительные сведения о Spring Cloud Azure и пример использования служебная шина см. в статье Spring Cloud Stream с Служебная шина Azure.

Необходимые компоненты

Создание пространства имен на портале Azure

Чтобы приступить к использованию сущностей обмена сообщениями в служебной шине в Azure, сначала необходимо создать пространство имен с уникальным для Azure именем. Пространство имен предоставляет контейнер области для служебная шина ресурсов (очередей, тем и т. д.) в приложении.

Создание пространства имен службы:

  1. Войдите на портал Azure.

  2. Перейдите на страницу "Все службы".

  3. На панели навигации слева выберите "Интеграция" из списка категорий, наведите указатель мыши на служебная шина, а затем нажмите + кнопку на плитке служебная шина.

    Изображение: выбор

  4. В теге Основные сведения на странице Создание пространства имен выполните следующие действия:

    1. Выберите подписку Azure, в которой будет создано пространство имен.

    2. Для группы ресурсов выберите существующую группу ресурсов или создайте новую.

    3. Введите имя для пространства имен. В имени пространства имен должны соблюдаться следующие соглашения об именовании:

      • Это имя должно быть уникальным в пределах Azure. Система немедленно проверяет, доступно ли оно.
      • Длина имени составляет не менее 6 и не более 50 символов.
      • Имя может содержать только буквы, цифры, дефисы -.
      • Имя должно начинаться с буквы или цифры и заканчиваться буквой или цифрой.
      • Имя не заканчивается или -mgmtне заканчивается-sb.
    4. Укажите расположение — регион для размещения пространства имен.

    5. Для параметра Ценовая категория выберите ценовую категорию ("Базовый", "Стандартный" или "Премиум") для пространства имен. Для работы с этим кратким руководством выберите вариант Стандартный.

    6. Если выбрать уровень "Премиум" , выберите, можно ли включить георепликацию для пространства имен. Функция георепликации гарантирует, что метаданные и данные пространства имен постоянно реплицируются из основного региона в один или несколько дополнительных регионов.

      Внимание

      Чтобы использовать разделы и подписки, выберите категорию "Стандартный" или "Премиум". Разделы и подписки не поддерживаются в ценовой категории "Базовый".

      Если выбрана ценовая категория Премиум, укажите число единиц обмена сообщениями. В категории "Премиум" обеспечивается изоляция ресурсов на уровне ЦП и памяти, так что рабочая нагрузка выполняется изолированно от других. Контейнер ресурса называется единицей обмена сообщениями. Пространству имен ценовой категории "Премиум" выделяется по крайней мере одна единица обмена сообщениями. Для каждого пространства имен служебной шины Premium можно выбрать 1, 2, 4, 8 или 16 единиц обмена сообщениями. Дополнительные сведения см. в статье Уровни обмена сообщениями через служебную шину Premium и Standard.

    7. В нижней части страницы выберите Review + create (Проверить и создать).

      Изображение: страница

    8. На странице Проверить и создать проверьте параметры и нажмите кнопку Создать.

  5. После успешного развертывания ресурса выберите "Перейти к ресурсу " на странице развертывания.

    Изображение: страница

  6. Вы увидите домашнюю страницу пространства имен служебной шины.

    Изображение: домашняя страница созданного пространства имен Служебной шины.

Создание очереди на портале Azure

  1. На странице пространства имен служебная шина разверните сущности в меню навигации слева и выберите "Очереди".

  2. На странице Очереди на панели инструментов выберите + Очередь.

  3. Введите имя очереди, остальные значения по умолчанию не изменяйте.

  4. Выберите Создать.

    Снимок экрана: страница создания очереди.

Проверка подлинности приложения в Azure

В этом кратком руководстве показано два способа подключения к Служебная шина Azure: без пароля и строка подключения.

Первый вариант показывает, как использовать субъект безопасности в идентификаторе Microsoft Entra ID и управлении доступом на основе ролей (RBAC) для подключения к пространству имен служебная шина. Вам не нужно беспокоиться о наличии жестко закодированных строка подключения в коде или в файле конфигурации или в безопасном хранилище, например Azure Key Vault.

Второй вариант показывает, как использовать строка подключения для подключения к пространству имен служебная шина. Если вы не знакомы с Azure, вы можете найти вариант строка подключения проще следовать. Мы рекомендуем использовать параметр без пароля в реальных приложениях и рабочих средах. Дополнительные сведения см. в разделе "Проверка подлинности и авторизация". Дополнительные сведения о проверке подлинности без пароля см. на странице обзора.

Назначение ролей пользователю Microsoft Entra

При локальной разработке убедитесь, что учетная запись пользователя, которая подключается к Служебная шина Azure имеет правильные разрешения. Для отправки и получения сообщений вам потребуется роль владельца данных Служебная шина Azure. Чтобы назначить себе эту роль, вам потребуется роль администратора доступа пользователей или другая роль, которая включает Microsoft.Authorization/roleAssignments/write действие. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения о доступных областях назначения ролей на странице обзора области.

В следующем примере роль назначается Azure Service Bus Data Owner учетной записи пользователя, которая предоставляет полный доступ к ресурсам Служебная шина Azure. В реальном сценарии следуйте принципу наименьших привилегий , чтобы предоставить пользователям только минимальные разрешения, необходимые для более безопасной рабочей среды.

Встроенные роли Azure для служебной шины Azure

Для служебной шины Azure управление пространствами имен и всеми связанными ресурсами через портал Azure и API управления ресурсами Azure уже защищено с помощью модели Azure RBAC. Azure предоставляет следующие встроенные роли Azure для авторизации доступа к пространству имен служебной шины:

  • Служебная шина Azure Владелец данных: включает доступ к пространству имен служебная шина и его сущностям (очереди, разделы, подписки и фильтры). Участник этой роли может отправлять и получать сообщения из очередей или разделов или подписок.
  • Служебная шина Azure Отправителю данных: используйте эту роль, чтобы предоставить доступ к пространству имен служебная шина и его сущностям.
  • Служебная шина Azure приемник данных: используйте эту роль, чтобы предоставить доступ к пространству имен служебная шина и его сущностям.

Если вы хотите создать пользовательскую роль, см. раздел "Права", необходимые для операций служебная шина.

Добавление пользователя Microsoft Entra в роль владельца Служебная шина Azure

Добавьте имя пользователя Microsoft Entra в роль владельца данных Служебная шина Azure на уровне пространства имен служебная шина. Это позволит приложению, работающему в контексте учетной записи пользователя, отправлять сообщения в очередь или раздел, а также получать сообщения из очереди или подписки раздела.

Внимание

В большинстве случаев для распространения назначения ролей в Azure потребуется несколько минут. В редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.

  1. Если в портал Azure нет страницы пространства имен служебная шина, найдите пространство имен служебная шина с помощью главной панели поиска или навигации слева.

  2. На странице обзора выберите элемент управления доступом (IAM) в меню слева.

  3. На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.

  4. Выберите + Добавить в верхнем меню, а затем выберите Добавить назначение роли в появившемся раскрывающемся меню.

    Снимок экрана, на котором продемонстрировано назначение роли.

  5. Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите Azure Service Bus Data Owner и выберите соответствующий результат. Теперь щелкните Далее.

  6. В разделе Назначение доступа для выберите Пользователь, группа или субъект-служба и + Выбрать членов.

  7. В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты), а затем выберите в нижней части диалогового окна.

  8. Нажмите кнопку Проверить и назначить, чтобы перейти на последнюю страницу, а затем еще раз Проверить и назначить, чтобы завершить процесс.

Отправка сообщений в очередь

В рамках этого раздела вы создадите консольный проект Java и добавите код для отправки сообщений в созданную вами ранее очередь.

Создание консольного проекта Java

Создайте проект Java с помощью Eclipse или инструмента по своему усмотрению.

Настройка приложения для использования служебной шины

Добавьте ссылку на библиотеки Azure Core и Служебной шины Azure.

Если вы используете Eclipse и создали консольное приложение Java, преобразуйте проект Java в Maven: щелкните проект правой кнопкой мыши в окне обозревателя пакетов, выберите "Настроить ->Преобразовать в проект Maven". Затем добавьте зависимости в эти две библиотеки, как показано в следующем примере.

pom.xml Обновите файл, чтобы добавить зависимости для Служебная шина Azure и пакетов удостоверений Azure.

    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

Добавление кода для отправки сообщений в очередь

  1. Добавьте в раздел Java-файла следующие инструкции import.

    import com.azure.messaging.servicebus.*;
    import com.azure.identity.*;
    
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. В классе определите переменные для хранения строка подключения и имени очереди.

    static String queueName = "<QUEUE NAME>";
    

    Внимание

    Замените <QUEUE NAME> именем очереди.

  3. Добавьте метод с именем sendMessage, в класс для отправки одного сообщения в очередь.

    Внимание

    Замените NAMESPACENAME именем пространства имен используемой служебной шины.

    static void sendMessage()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // send one message to the queue
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the queue: " + queueName);
    }
    
    
  4. Добавьте метод с именем createMessages в класс, чтобы создать список сообщений. Как правило, эти сообщения поступают из различных частей приложения. Здесь мы создадим список примеров сообщений.

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. Добавьте метод с именем sendMessageBatch для отправки сообщений в созданную вами очередь. Этот метод создает ServiceBusSenderClient для очереди, вызывает метод createMessages для получения списка сообщений, подготавливает один или несколько пакетов и отправляет пакеты в очередь.

    Внимание

    Замените NAMESPACENAME именем пространства имен используемой служебной шины.

    static void sendMessageBatch()
    {
        // create a token using the default Azure credential
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
        }
    
        //close the client
        senderClient.close();
    }
    

Получение сообщений из очереди

В этом разделе вы добавите код для получения сообщений из очереди.

  1. Добавьте метод с именем receiveMessages для получения сообщений из очереди. Этот метод создает ServiceBusProcessorClient для очереди, указывая один обработчик для обработки сообщений и другой — для обработки ошибок. Затем он запускает процессор, ждет несколько секунд, выводит полученные сообщения, а затем останавливает и закрывает процессор.

    Внимание

    • Замените NAMESPACENAME именем пространства имен используемой служебной шины.
    • Замените QueueTest в QueueTest::processMessage в коде именем класса.
    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .build();
    
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net")
                .credential(credential)
                .processor()
                .queueName(queueName)
                .processMessage(context -> processMessage(context))
                .processError(context -> processError(context))
                .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();
    }
    
  2. Добавьте метод processMessage для обработки сообщения, полученного от подписки Служебной шины.

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }
    
  3. Добавьте метод processError для обработки сообщений об ошибках.

    private static void processError(ServiceBusErrorContext context) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }
    
  4. Обновите метод main, чтобы вызвать методы sendMessage, sendMessageBatch и receiveMessages и вызвать InterruptedException.

    public static void main(String[] args) throws InterruptedException {
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }
    

Выполнить приложение

  1. Если вы используете Eclipse, щелкните проект правой кнопкой мыши, выберите "Экспорт", разверните Java, выберите JAR-файл с возможностью запуска и выполните действия, чтобы создать исполняемый JAR-файл.

  2. Если вы вошли на компьютер с помощью учетной записи пользователя, отличной от учетной записи пользователя, добавленной в роль владельца данных Служебная шина Azure, выполните следующие действия. В противном случае пропустите этот шаг и перейдите к запуску JAR-файла на следующем шаге.

    1. Установите Azure CLI на компьютере.

    2. Выполните следующую команду CLI, чтобы войти в Azure. Используйте ту же учетную запись пользователя, которую вы добавили в роль владельца данных Служебная шина Azure.

      az login
      
  3. Запустите JAR-файл с помощью следующей команды.

    java -jar <JAR FILE NAME>
    
  4. В окне консоли вы увидите приведенные ниже выходные данные.

    Sent a single message to the queue: myqueue
    Sent a batch of messages to the queue: myqueue
    Starting the processor
    Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World!
    Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message
    Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message
    Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message
    Stopping and closing the processor
    

На странице Обзор для пространства имен служебной шины в портале Azure можно видеть количество входящих и исходящих сообщений. Подождите минуту или около того, а затем обновите страницу, чтобы просмотреть последние значения.

Количество входящих и исходящих сообщений

Выберите очередь на этой странице Обзор, чтобы перейти на страницу Очередь служебной шины. На этой странице отображаются количество входящих и исходящих сообщений. Вы также можете увидеть другие сведения, например текущий размер очереди, максимальный размер, количество активных сообщений и т. д.

Сведения об очереди

Next Steps

Ознакомьтесь со следующими примерами и документацией: