Поделиться через


Как использовать хранилище очередей в Java

Обзор

В этом руководстве показано, как кодировать распространенные сценарии с помощью службы хранилища очередей Azure. Примеры написаны на Java и используют пакет SDK службы хранилища Azure для Java. Сценарии включают вставление, просмотр, получение и удаление сообщений очереди. Также рассматривается код для создания и удаления очередей. Дополнительные сведения о очередях см. в разделе "Дальнейшие действия ".

Что представляет собой хранилище очередей?

Хранилище очередей Azure — это служба для хранения большого количества сообщений, к которым можно обращаться из любой точки мира через вызовы, прошедшие проверку подлинности с помощью HTTP или HTTPS. Одно сообщение очереди может превышать 64 КБ, а очередь может содержать миллионы сообщений до общего предела емкости учетной записи хранения. Хранилище очередей часто используется для создания накопления задач для асинхронной обработки.

Основные понятия службы очередей

Служба очередей Azure содержит следующие компоненты:

Компоненты службы очередей Azure

  • Учетная запись хранения: Все доступ к службе хранилища Azure осуществляется с помощью учетной записи хранения. Дополнительные сведения об учетных записях хранения см. в обзоре учетной записи хранения.

  • Очередь: Очередь содержит набор сообщений. Все сообщения должны находиться в очереди. Обратите внимание: имя очереди должно быть в нижнем регистре. Дополнительные сведения об именовании очередей см. в разделе Именование очередей и метаданных.

  • Сообщение: Сообщение в любом формате до 64 КБ. Максимальное время, когда сообщение может остаться в очереди, составляет 7 дней. Начиная с версии 2017-07-29, максимальный срок жизни может быть задан любым положительным числом или значением -1, свидетельствующим о том, что срок жизни сообщения неограничен. Если этот параметр не указан, срок жизни по умолчанию составляет семь дней.

  • Формат URL-адреса: Очереди можно адресировать с помощью следующего формата URL-адреса: http://<storage account>.queue.core.windows.net/<queue>

    Следующий URL-адрес адресует очередь на схеме:

    http://myaccount.queue.core.windows.net/incoming-orders

Создание учетной записи хранилища Azure

Проще всего создать свою первую учетную запись хранения Azure, используя портал Azure . Дополнительные сведения см. в статье "Создание учетной записи хранения".

Вы также можете создать учетную запись хранения Azure с помощью Azure PowerShell, Azure CLI или поставщика ресурсов службы хранилища Azure для .NET.

Если вы предпочитаете не создавать учетную запись хранения в Azure в настоящее время, вы также можете использовать эмулятор хранилища Azurite для запуска и тестирования кода в локальной среде. Дополнительные сведения см. в статье "Использование эмулятора Azurite для локальной разработки службы хранилища Azure".

Создание приложения Java

Сначала убедитесь, что ваша система разработки соответствует предварительным требованиям, перечисленным в клиентской библиотеке хранилища очередей Azure версии 12 для Java.

Создание приложения Java с именем queues-how-to-v12:

  1. В окне консоли (например, cmd, PowerShell или Bash) используйте Maven для создания нового консольного приложения с именем queues-how-to-v12. Введите следующую mvn команду, чтобы создать проект Java hello world.

     mvn archetype:generate \
         --define interactiveMode=n \
         --define groupId=com.queues.howto \
         --define artifactId=queues-howto-v12 \
         --define archetypeArtifactId=maven-archetype-quickstart \
         --define archetypeVersion=1.4
    
    mvn archetype:generate `
        --define interactiveMode=n `
        --define groupId=com.queues.howto `
        --define artifactId=queues-howto-v12 `
        --define archetypeArtifactId=maven-archetype-quickstart `
        --define archetypeVersion=1.4
    
  2. Примерный результат создания проекта показан ниже.

    [INFO] Scanning for projects...
    [INFO]
    [INFO] ------------------< org.apache.maven:standalone-pom >-------------------
    [INFO] Building Maven Stub Project (No POM) 1
    [INFO] --------------------------------[ pom ]---------------------------------
    [INFO]
    [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
    [INFO]
    [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
    [INFO]
    [INFO]
    [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
    [INFO] Generating project in Batch mode
    [INFO] ----------------------------------------------------------------------------
    [INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
    [INFO] ----------------------------------------------------------------------------
    [INFO] Parameter: groupId, Value: com.queues.howto
    [INFO] Parameter: artifactId, Value: queues-howto-v12
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: com.queues.howto
    [INFO] Parameter: packageInPathFormat, Value: com/queues/howto
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: com.queues.howto
    [INFO] Parameter: groupId, Value: com.queues.howto
    [INFO] Parameter: artifactId, Value: queues-howto-v12
    [INFO] Project created from Archetype in dir: C:\queues\queues-howto-v12
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  6.775 s
    [INFO] Finished at: 2020-08-17T15:27:31-07:00
    [INFO] ------------------------------------------------------------------------
    
  3. Перейдите в только что созданный каталог queues-howto-v12.

    cd queues-howto-v12
    

Установите пакет

Откройте файл pom.xml в текстовом редакторе. Добавьте приведенный ниже элемент зависимости в группу зависимостей.

<dependency>
  <groupId>com.azure</groupId>
  <artifactId>azure-storage-queue</artifactId>
  <version>12.6.0</version>
</dependency>

Настройте ваше приложение для доступа к хранилищу очередей

Добавьте следующие инструкции импорта в начало файла Java, где вы хотите использовать API службы хранилища Azure для доступа к очередям:

// Include the following imports to use queue APIs
import com.azure.core.util.*;
import com.azure.storage.queue.*;
import com.azure.storage.queue.models.*;

Настройка строки подключения к службе хранилища Azure

Клиент службы хранилища Azure использует строку подключения к хранилищу для доступа к службам управления данными. Получите имя и первичный ключ доступа для учетной записи хранения, указанной на портале Azure. Используйте их как AccountName и AccountKey значения в строке подключения. В этом примере показано, как объявить статическое поле для размещения строки подключения:

// Define the connection-string with your values
final String connectStr = 
    "DefaultEndpointsProtocol=https;" +
    "AccountName=your_storage_account;" +
    "AccountKey=your_storage_account_key";

В следующих примерах предполагается, что у вас есть String объект, содержащий строку подключения к хранилищу.

Как создать очередь

Объект QueueClient содержит операции для взаимодействия с очередью. Следующий код создает QueueClient объект. QueueClient Используйте объект для создания очереди, которую вы хотите использовать.

public static String createQueue(String connectStr)
{
    try
    {
        // Create a unique name for the queue
        String queueName = "queue-" + java.util.UUID.randomUUID();

        System.out.println("Creating queue: " + queueName);

        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queue = new QueueClientBuilder()
                                .connectionString(connectStr)
                                .queueName(queueName)
                                .buildClient();

        // Create the queue
        queue.create();
        return queue.getQueueName();
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println("Error code: " + e.getErrorCode() + "Message: " + e.getMessage());
        return null;
    }
}

Практическое руководство. Добавление сообщения в очередь

Чтобы вставить сообщение в существующую очередь, вызовите метод sendMessage. Сообщение может быть строкой (в формате UTF-8) или массивом байтов. Ниже приведен код, который отправляет строковое сообщение в очередь.

public static void addQueueMessage
    (String connectStr, String queueName, String messageText)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        System.out.println("Adding message to the queue: " + messageText);

        // Add a message to the queue
        queueClient.sendMessage(messageText);
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Как взглянуть на следующее сообщение

Вы можете просмотреть сообщение в начале очереди, не удаляя его из очереди, вызвав peekMessage.

public static void peekQueueMessage
    (String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Peek at the first message
        PeekedMessageItem peekedMessageItem = queueClient.peekMessage();
        System.out.println("Peeked message: " + peekedMessageItem.getMessageText());
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Как изменить содержимое сообщения в очереди

Содержимое сообщения можно изменить на месте в очереди. Если сообщение представляет рабочую задачу, эту функцию можно использовать для обновления состояния. Следующий код обновляет сообщение очереди с новым содержимым и задает время ожидания видимости, чтобы продлить еще 30 секунд. Расширение времени ожидания видимости дает клиенту еще 30 секунд для продолжения работы над сообщением. Вы также можете сохранить счетчик повторных попыток. Если сообщение повторно выполняется более n раз, удалите его. Этот сценарий защищает от сообщения, которое вызывает ошибку приложения каждый раз при обработке.

Следующий пример кода выполняет поиск по очереди сообщений, находит первое содержимое сообщения, соответствующее строке поиска, изменяет содержимое сообщения и завершает работу.

public static void updateQueueMessage
    (String connectStr, String queueName,
    String searchString, String updatedContents)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // The maximum number of messages to retrieve is 32
        final int MAX_MESSAGES = 32;

        // Iterate through the queue messages
        for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES))
        {
            // Check for a specific string
            if (message.getMessageText().equals(searchString))
            {
                // Update the message to be visible in 30 seconds
                queueClient.updateMessage(message.getMessageId(),
                                          message.getPopReceipt(),
                                          updatedContents,
                                          Duration.ofSeconds(30));
                System.out.println(
                    String.format("Found message: \'%s\' and updated it to \'%s\'",
                                    searchString,
                                    updatedContents)
                                  );
                break;
            }
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Следующий пример кода просто обновляет первое видимое сообщение в очереди.

public static void updateFirstQueueMessage
    (String connectStr, String queueName, String updatedContents)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Get the first queue message
        QueueMessageItem message = queueClient.receiveMessage();

        // Check for a specific string
        if (null != message)
        {
            // Update the message to be visible in 30 seconds
            UpdateMessageResult result = queueClient.updateMessage(message.getMessageId(),
                                                                   message.getPopReceipt(),
                                                                   updatedContents,
                                                                   Duration.ofSeconds(30));
            System.out.println("Updated the first message with the receipt: " +
                    result.getPopReceipt());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Как получить длину очереди

Вы можете узнать приблизительное количество сообщений в очереди.

Метод getProperties возвращает несколько значений, включая количество сообщений в очереди. Количество указано приблизительно, так как сообщения могут добавляться или удаляться после вашего запроса. Метод getApproximateMessageCount возвращает последнее значение, полученное при вызове метода getProperties, без вызова хранилища очередей.

public static void getQueueLength(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        QueueProperties properties = queueClient.getProperties();
        long messageCount = properties.getApproximateMessagesCount();

        System.out.println(String.format("Queue length: %d", messageCount));
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Как выполнить: Извлечение следующего сообщения

Код удаляет сообщение из очереди в два этапа. При вызове receiveMessageвы получите следующее сообщение в очереди. Сообщение, возвращаемое методом receiveMessage, становится невидимым для другого кода, считывающего сообщения из этой очереди. По умолчанию это сообщение остается невидимым в течение 30 секунд. Чтобы завершить удаление сообщения из очереди, необходимо также вызвать deleteMessage. Если код не может обработать сообщение, этот двухэтапный процесс гарантирует, что вы можете получить одно и то же сообщение и повторить попытку. Код вызывает deleteMessage сразу после обработки сообщения.

public static void dequeueMessage(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Get the first queue message
        QueueMessageItem message = queueClient.receiveMessage();

        // Check for a specific string
        if (null != message)
        {
            System.out.println("Dequeing message: " + message.getMessageText());

            // Delete the message
            queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
        }
        else
        {
            System.out.println("No visible messages in queue");
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Дополнительные варианты удаления сообщений из очереди

Существует два способа настройки извлечения сообщений из очереди. Сначала получите пакет сообщений (до 32). Во-вторых, задайте более длительное или меньшее время ожидания невидимости, что позволяет коду больше или меньше времени полностью обрабатывать каждое сообщение.

В следующем примере кода используется метод receiveMessages для получения 20 сообщений в одном вызове. Затем он обрабатывает каждое сообщение с помощью цикла for. Также устанавливает, что время, в течение которого сообщение будет невидимым, составляет пять минут (300 секунд) для каждого сообщения. Время ожидания начинается для всех сообщений одновременно. Когда пять минут прошло с момента вызова receiveMessages, все сообщения, не удаленные, снова станут видимыми.

public static void dequeueMessages(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // The maximum number of messages to retrieve is 20
        final int MAX_MESSAGES = 20;

        // Retrieve 20 messages from the queue with a
        // visibility timeout of 300 seconds (5 minutes)
        for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES,
                Duration.ofSeconds(300), Duration.ofSeconds(1), new Context("key1", "value1")))
        {
            // Do processing for all messages in less than 5 minutes,
            // deleting each message after processing.
            System.out.println("Dequeing message: " + message.getMessageText());
            queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Как вывести список очередей

Чтобы получить список текущих очередей, вызовите QueueServiceClient.listQueues() метод, который вернет коллекцию QueueItem объектов.

public static void listQueues(String connectStr)
{
    try
    {
        // Instantiate a QueueServiceClient which will be
        // used to list the queues
        QueueServiceClient queueServiceClient = new QueueServiceClientBuilder()
                                    .connectionString(connectStr)
                                    .buildClient();

        // Loop through the collection of queues.
        for (QueueItem queue : queueServiceClient.listQueues())
        {
            // Output each queue name.
            System.out.println(queue.getName());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Как удалить очередь

Чтобы удалить очередь и все сообщения, содержащиеся в нем, вызовите метод delete в объекте QueueClient.

public static void deleteMessageQueue(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        System.out.println("Deleting queue: " + queueClient.getQueueName());

        // Delete the queue
        queueClient.delete();
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

Подсказка

Ознакомьтесь с репозиторием примеров кода службы хранилища Azure

Чтобы легко использовать комплексные примеры кода службы хранилища Azure, которые можно скачать и запустить, ознакомьтесь со списком примеров службы хранилища Azure.

Дальнейшие шаги

Теперь, когда вы узнали основы очереди хранения, следуйте этим ссылкам, чтобы узнать о более сложных задачах, связанных с хранением.

Примеры использования устаревших пакетов SDK для Java версии 8 см. примеры кода с использованием Java версии 8.