Поделиться через


Использование хранилища очередей из Java

Обзор

В этом руководстве показано, как кодировать типичные сценарии с использованием службы хранения очередей Azure. Примеры написаны на Java и используют пакет SDK службы хранилища Azure для Java. К таким сценариям относятся Вставка, Просмотр, Получение и Удаление сообщений очереди. Также рассматривается код для создания и удаления очередей. Дополнительные сведения об очередях см. в разделе Дальнейшие действия.

Что такое хранилище очередей?

Хранилище очередей Azure — это служба для хранения большого количества сообщений, к которым можно получить доступ практически из любой точки мира с помощью вызовов с проверкой подлинности по протоколам HTTP или HTTPS. Одно сообщение очереди может быть размером до 64 КБ, а очередь может содержать миллионы сообщений до общего ограничения емкости учетной записи хранения. Хранилище очередей часто используется для создания списка невыполненных заданий для асинхронной обработки.

Основные понятия службы очередей

Служба очередей Azure содержит следующие компоненты:

Компоненты служба очередей Azure

  • Учетная запись хранения. Весь доступ к хранилищу Azure осуществляется с помощью учетной записи хранения. См. сведения об учетных записях хранения.

  • Очередь. Очередь содержит набор сообщений. Все сообщения должны находиться в очереди. Обратите внимание: имя очереди должно содержать только строчные символы. Дополнительные сведения см. в статье о присвоении имен очередям и метаданным.

  • Сообщение. Сообщение в любом формате размером до 64 КБ. Сообщение может оставаться в очереди не более 7 дней. Начиная с версии 2017-07-29, максимальный срок жизни может быть задан любым положительным числом или значением -1, свидетельствующим о том, что срок жизни сообщения неограничен. Если этот параметр не указан, срок жизни по умолчанию составляет семь дней.

  • Формат URL-адреса: обращаться к очередям можно с помощью URL-адреса следующего вида: http://<storage account>.queue.core.windows.net/<queue>

    Следующий URL-адрес позволяет обратиться к очереди на схеме:

    http://myaccount.queue.core.windows.net/incoming-orders

Создание учетной записи хранения Azure

Самый простой способ создать первую учетную запись хранения Azure — воспользоваться порталом Azure. Дополнительную информацию см. в статье Об учетных записях хранения Azure.

Кроме того, создать учетную запись хранения Azure можно с помощью Azure PowerShell, Azure CLI или поставщика ресурсов службы хранилища Azure для .NET.

Если вы не хотите сейчас создавать учетную запись хранения в Azure, код можно запустить и протестировать в локальной среде с помощью эмулятора хранилища Azurite. Дополнительные сведения см. в статье Использование эмулятора Azurite для разработки и тестирования службы хранилища Azure.

Создание приложения Java

Сначала убедитесь, что ваша система разработки соответствует предварительным требованиям, перечисленным в документе Использование клиентской библиотеки Хранилища очередей Azure версии 12 для Java.

Чтобы создать приложение Java с именем queues-how-to-v12:

  1. В окне консоли (командная строка, PowerShell или Bash) с помощью Maven создайте консольное приложение с именем queues-how-to-v12. Введите следующую команду mvn, чтобы создать проект Java hello world.

     mvn archetype:generate \
         --define interactiveMode=n \
         --define groupId=com.queues.howto \
         --define artifactId=queues-howto-v12 \
         --define archetypeArtifactId=maven-archetype-quickstart \
         --define archetypeVersion=1.4
    
    mvn archetype:generate `
        --define interactiveMode=n `
        --define groupId=com.queues.howto `
        --define artifactId=queues-howto-v12 `
        --define archetypeArtifactId=maven-archetype-quickstart `
        --define archetypeVersion=1.4
    
  2. Примерный результат создания проекта показан ниже.

    [INFO] Scanning for projects...
    [INFO]
    [INFO] ------------------< org.apache.maven:standalone-pom >-------------------
    [INFO] Building Maven Stub Project (No POM) 1
    [INFO] --------------------------------[ pom ]---------------------------------
    [INFO]
    [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
    [INFO]
    [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
    [INFO]
    [INFO]
    [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
    [INFO] Generating project in Batch mode
    [INFO] ----------------------------------------------------------------------------
    [INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
    [INFO] ----------------------------------------------------------------------------
    [INFO] Parameter: groupId, Value: com.queues.howto
    [INFO] Parameter: artifactId, Value: queues-howto-v12
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: com.queues.howto
    [INFO] Parameter: packageInPathFormat, Value: com/queues/howto
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: com.queues.howto
    [INFO] Parameter: groupId, Value: com.queues.howto
    [INFO] Parameter: artifactId, Value: queues-howto-v12
    [INFO] Project created from Archetype in dir: C:\queues\queues-howto-v12
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  6.775 s
    [INFO] Finished at: 2020-08-17T15:27:31-07:00
    [INFO] ------------------------------------------------------------------------
    
  3. Переключитесь на только что созданный каталог queues-howto-v12.

    cd queues-howto-v12
    

Установка пакета

Откройте файл pom.xml в текстовом редакторе. Добавьте приведенный ниже элемент зависимости в группу зависимостей.

<dependency>
  <groupId>com.azure</groupId>
  <artifactId>azure-storage-queue</artifactId>
  <version>12.6.0</version>
</dependency>

Настройка приложения для доступа к хранилищу очередей

Если нужно использовать API-интерфейсы Azure для доступа к очередям, добавьте следующие инструкции импорта в верхнюю часть файла Java.

// Include the following imports to use queue APIs
import com.azure.core.util.*;
import com.azure.storage.queue.*;
import com.azure.storage.queue.models.*;

Настройка строки подключения к службе хранилища Azure

Клиент хранилища Azure использует строку подключения хранилища для доступа к службам управления данными. Получите имя и первичный ключ доступа для учетной записи хранения, указанной на Портале Azure. Используйте их в качестве значений AccountName и AccountKey в строке подключения. В этом примере показано, как объявить статическое поле для размещения строки подключения:

// Define the connection-string with your values
final String connectStr = 
    "DefaultEndpointsProtocol=https;" +
    "AccountName=your_storage_account;" +
    "AccountKey=your_storage_account_key";

В следующих примерах предполагается, что у вас есть объект String, содержащий строку подключения к хранилищу.

Практическое руководство. Создание очереди

Объект QueueClient содержит операции для взаимодействия с очередью. Следующий код создает объект QueueClient. Используйте объект QueueClient для создания необходимой очереди.

public static String createQueue(String connectStr)
{
    try
    {
        // Create a unique name for the queue
        String queueName = "queue-" + java.util.UUID.randomUUID();

        System.out.println("Creating queue: " + queueName);

        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queue = new QueueClientBuilder()
                                .connectionString(connectStr)
                                .queueName(queueName)
                                .buildClient();

        // Create the queue
        queue.create();
        return queue.getQueueName();
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println("Error code: " + e.getErrorCode() + "Message: " + e.getMessage());
        return null;
    }
}

Практическое руководство. Добавление сообщения в очередь

Чтобы вставить сообщение в существующую очередь, вызовите метод sendMessage. Для создания объекта можно использовать либо строку (в формате UTF-8), либо массив байтов. Ниже приведен код, который отправляет строковое сообщение в очередь.

public static void addQueueMessage
    (String connectStr, String queueName, String messageText)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        System.out.println("Adding message to the queue: " + messageText);

        // Add a message to the queue
        queueClient.sendMessage(messageText);
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Практическое руководство. Просмотр следующего сообщения

Вы можете просмотреть сообщение в начале очереди, не удаляя его из очереди, вызвав peekMessage.

public static void peekQueueMessage
    (String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Peek at the first message
        PeekedMessageItem peekedMessageItem = queueClient.peekMessage();
        System.out.println("Peeked message: " + peekedMessageItem.getMessageText());
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Практическое руководство. Изменение содержимого сообщения в очереди

Вы можете изменить содержимое сообщения непосредственно в очереди. Если сообщение представляет собой рабочую задачу, можно использовать эту функцию для обновления состояния. Следующий код добавляет новое содержимое в очередь сообщений и продлевает время ожидания видимости еще на 30 секунд. Увеличение времени ожидания позволяет клиенту продолжать работу с сообщением через 30 секунд. Также можно сократить число повторных попыток. Если сообщение будет повторено более n раз, удалите его. Этот сценарий обеспечивает защиту от сообщений, которые инициируют ошибку приложения при каждой попытке обработки.

Следующий пример кода выполняет поиск в очереди сообщений, находит первое содержимое сообщения, которое соответствует строке поиска, изменяет содержимое сообщения и завершает работу.

public static void updateQueueMessage
    (String connectStr, String queueName,
    String searchString, String updatedContents)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // The maximum number of messages to retrieve is 32
        final int MAX_MESSAGES = 32;

        // Iterate through the queue messages
        for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES))
        {
            // Check for a specific string
            if (message.getMessageText().equals(searchString))
            {
                // Update the message to be visible in 30 seconds
                queueClient.updateMessage(message.getMessageId(),
                                          message.getPopReceipt(),
                                          updatedContents,
                                          Duration.ofSeconds(30));
                System.out.println(
                    String.format("Found message: \'%s\' and updated it to \'%s\'",
                                    searchString,
                                    updatedContents)
                                  );
                break;
            }
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Следующий пример кода просто обновляет первое видимое сообщение в очереди.

public static void updateFirstQueueMessage
    (String connectStr, String queueName, String updatedContents)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Get the first queue message
        QueueMessageItem message = queueClient.receiveMessage();

        // Check for a specific string
        if (null != message)
        {
            // Update the message to be visible in 30 seconds
            UpdateMessageResult result = queueClient.updateMessage(message.getMessageId(),
                                                                   message.getPopReceipt(),
                                                                   updatedContents,
                                                                   Duration.ofSeconds(30));
            System.out.println("Updated the first message with the receipt: " +
                    result.getPopReceipt());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Практическое руководство. Получение длины очереди

Вы можете узнать приблизительное количество сообщений в очереди.

Метод getProperties возвращает несколько значений, включая количество сообщений, находящихся в очереди в данный момент. Счетчик указывает число лишь приблизительно, так как сообщения могут добавляться или удаляться из вашего запроса. Метод getApproximateMessageCount возвращает последнее значение, полученное при вызове метода getProperties, без вызова хранилища очередей.

public static void getQueueLength(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        QueueProperties properties = queueClient.getProperties();
        long messageCount = properties.getApproximateMessagesCount();

        System.out.println(String.format("Queue length: %d", messageCount));
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Практическое руководство. Удаление следующего сообщения из очереди

Код удаляет сообщение из очереди в два этапа. При вызове метода receiveMessage вы получаете следующее сообщение в очереди. Сообщение, возвращаемое методом receiveMessage, становится невидимым для другого кода, считывающего сообщения из этой очереди. По умолчанию это сообщение остается невидимым в течение 30 секунд. Чтобы завершить удаление сообщения из очереди, необходимо также вызвать метод deleteMessage. Если код не может обработать сообщение, такой двухэтапный процесс гарантирует, что вы получите то же сообщение и сможете повторить попытку. Код вызывает deleteMessage сразу после обработки сообщения.

public static void dequeueMessage(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Get the first queue message
        QueueMessageItem message = queueClient.receiveMessage();

        // Check for a specific string
        if (null != message)
        {
            System.out.println("Dequeing message: " + message.getMessageText());

            // Delete the message
            queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
        }
        else
        {
            System.out.println("No visible messages in queue");
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Дополнительные варианты удаления сообщений из очереди

Извлечение сообщений из очереди можно настроить двумя способами. Во-первых, можно получить пакет сообщений (до 32 сообщений). Во-вторых, можно задать более длительное или короткое время ожидания невидимости, чтобы предоставить коду больше или меньше времени на полную обработку каждого сообщения.

В следующем примере кода метод receiveMessages используется для получения 20 сообщений в одном вызове. Затем он обрабатывает каждое сообщение с помощью цикла for. Пример также задает время ожидания невидимости в размере пяти минут (300 секунд) для каждого сообщения. Время ожидания начинается для всех сообщений одновременно. По прошествии пяти минут с момента вызова receiveMessages все неудаленные сообщения снова станут видимыми.

public static void dequeueMessages(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // The maximum number of messages to retrieve is 20
        final int MAX_MESSAGES = 20;

        // Retrieve 20 messages from the queue with a
        // visibility timeout of 300 seconds (5 minutes)
        for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES,
                Duration.ofSeconds(300), Duration.ofSeconds(1), new Context("key1", "value1")))
        {
            // Do processing for all messages in less than 5 minutes,
            // deleting each message after processing.
            System.out.println("Dequeing message: " + message.getMessageText());
            queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Как перечислять очереди

Чтобы получить список текущей очереди, вызовите метод QueueServiceClient.listQueues(), который возвращает коллекцию объектов QueueItem.

public static void listQueues(String connectStr)
{
    try
    {
        // Instantiate a QueueServiceClient which will be
        // used to list the queues
        QueueServiceClient queueServiceClient = new QueueServiceClientBuilder()
                                    .connectionString(connectStr)
                                    .buildClient();

        // Loop through the collection of queues.
        for (QueueItem queue : queueServiceClient.listQueues())
        {
            // Output each queue name.
            System.out.println(queue.getName());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Практическое руководство. Удаление очереди

Чтобы удалить очередь и все сообщения в ней, вызовите метод delete для объекта QueueClient.

public static void deleteMessageQueue(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        System.out.println("Deleting queue: " + queueClient.getQueueName());

        // Delete the queue
        queueClient.delete();
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Совет

См. примеры кода в репозитории службы хранилища Azure

Полные и простые в применении примеры кода для службы хранилища Azure можно скачать и запустить отсюда.

Дальнейшие действия

Вы изучили основные сведения о хранилище очередей. Дополнительные сведения о более сложных задачах по использованию хранилища можно найти по следующим ссылкам.

Связанные примеры кода с использованием устаревших пакетов SDK для Java версии 8 см. в статье Примеры кода с использованием Java версии 8.