Поделиться через


Использование библиотеки .NET исполнителя массовых операций в Azure Cosmos DB

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

Примечание.

Библиотека массового исполнителя, описанная в этой статье, поддерживается для приложений, использующих версию пакета SDK для .NET 2.x. Для новых приложений можно использовать массовую поддержку , доступную непосредственно в пакете SDK для .NET версии 3.x, и для нее нет внешней библиотеки.

Если в настоящее время вы используете библиотеку массового исполнителя и планируете выполнить миграцию на массовую поддержку в новом пакете SDK, выполните действия, описанные в руководстве по миграции, чтобы перенести приложение.

В этом руководстве приводятся инструкции по использованию библиотеки массового исполнителя .NET для импорта и обновления документов в контейнер Azure Cosmos DB. Сведения о библиотеке массового исполнителя и о том, как она помогает использовать массовую пропускную способность и хранилище, см. в обзоре библиотеки массового исполнителя Azure Cosmos DB. В этом руководстве вы увидите пример приложения .NET, в котором массовый импорт случайно созданных документов в контейнер Azure Cosmos DB. После импорта данных библиотека показывает, как можно массово обновлять импортированные данные, указывая исправления в качестве операций для выполнения в определенных полях документов.

В настоящее время библиотека массового исполнителя поддерживается только учетными записями Gremlin для Azure Cosmos DB для NoSQL и API для учетных записей Gremlin. В этой статье описывается, как использовать библиотеку .NET массового исполнителя с api для учетных записей NoSQL. Чтобы узнать, как использовать библиотеку .NET массового исполнителя с API для учетных записей Gremlin, см . сведения о массовом приеме данных в Azure Cosmos DB для Gremlin с помощью библиотеки массового исполнителя.

Необходимые компоненты

Клонирование примера приложения

Теперь перейдем к работе с кодом, скачав пример приложения .NET из GitHub. Это приложение выполняет массовые операции с данными, хранящимися в учетной записи Azure Cosmos DB. Чтобы клонировать приложение, откройте командную строку, перейдите в каталог, в котором нужно скопировать его, и выполните следующую команду:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

Клонированный репозиторий содержит два примера: BulkImportSample и BulkUpdateSample. Вы можете открыть любой из примеров приложений, обновить строка подключения в файле App.config с помощью строка подключения учетной записи Azure Cosmos DB, создать решение и запустить его.

Приложение BulkImportSample создает случайные документы и массовый импортирует их в учетную запись Azure Cosmos DB. Приложение BulkUpdateSample массово обновляет импортированные документы, указывая исправления в качестве операций для выполнения в определенных полях документов. В следующих разделах вы рассмотрите код в каждом из этих примеров приложений.

Массовый импорт данных в учетную запись Azure Cosmos DB

  1. Перейдите в папку BulkImportSample и откройте файл BulkImportSample.sln .

  2. Строки подключения Azure Cosmos DB извлечены из файла App.config, как показано в следующем коде:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    Средство массового импорта создает базу данных и коллекцию с именем базы данных, именем контейнера и значениями пропускной способности, указанными в файле App.config.

  3. Затем объект DocumentClient инициализируется с режимом подключения Direct TCP:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. Объект BulkExecutor инициализирован с высоким значением повторных попыток для времени ожидания и регулирования запросов. Затем для них задано значение 0, чтобы передать элемент управления перегрузки в BulkExecutor в течение своего существования.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. Приложение вызывает API BulkImportAsync . Библиотека .NET предоставляет две перегрузки API массового импорта— одну из которых принимает список сериализованных документов JSON, а другой — список десериализированных документов POCO. Дополнительные сведения об определениях каждого из этих перегруженных методов см. в документации по API.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Метод BulkImportAsync принимает следующие параметры:

    Параметр Description
    enableUpsert Флаг для включения операций Upsert в документах. Если документ с заданным идентификатором уже существует, он обновляется. Значение по умолчанию — false.
    disableAutomaticIdGeneration Параметр, отключающий автоматическое создание идентификатора. Значение по умолчанию — true.
    maxConcurrencyPerPartitionKeyRange Максимальная степень параллелизма для одного диапазона ключей разделов. Значение NULL приводит к тому, что библиотека будет использовать значение по умолчанию 20.
    maxInMemorySortingBatchSize Максимальное количество документов, выведенных из перечислителя документов, которое передается вызову API на каждом этапе. Для этапа сортировки в памяти, который происходит перед массовым импортом. Если этот параметр имеет значение NULL, библиотека будет использовать минимальное значение по умолчанию (documents.count, 1000000).
    отменаToken Токен отмены для корректного выхода из операции массового импорта.

Определение объекта ответа массового импорта
Результат вызова API массового импорта содержит следующие атрибуты:

Параметр Description
NumberOfDocumentsImported (long) Общее число документов, которые были успешно импортированы из общего количества документов, предоставленных для вызова API массового импорта.
TotalRequestUnitsConsumed (double) Общее число единиц запросов (ЕЗ), потребляемых вызовом API массового импорта.
TotalTimeTaken (TimeSpan) Общее время, затрачиваемое на выполнение вызова API массового импорта.
BadInputDocuments (объект> List<) Список документов с неверным форматом, которые не были успешно импортированы при вызове API массового импорта. Необходимо исправить возвращенные документы и повторить импорт. Документы с неверным форматом включают в себя документы, значение идентификаторов которых не является строкой (NULL или любой другой тип данных считается недействительным).

Массовое обновление данных в учетной записи Azure Cosmos DB

Существующие документы можно обновить с помощью API BulkUpdateAsync . В этом примере вы задайте Name для поля новое значение и удалите Description поле из существующих документов. Полный набор поддерживаемых операций обновления см. в документации по API.

  1. Перейдите в папку BulkUpdateSample и откройте файл BulkUpdateSample.sln .

  2. Определите элементы для обновления вместе с соответствующими операциями обновления полей. В этом примере вы используете SetUpdateOperation для обновления Name поля и UnsetUpdateOperation, чтобы удалить Description поле из всех документов. Вы также можете выполнять другие операции, такие как увеличение поля документа по определенному значению, отправка определенных значений в поле массива или удаление определенного значения из поля массива. Дополнительные сведения о различных методах, предоставляемых API массового обновления, см. в документации по API.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. Приложение вызывает API BulkUpdateAsync . Сведения об определении метода BulkUpdateAsync см. в документации по API.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    Метод BulkUpdateAsync принимает следующие параметры:

    Параметр Description
    maxConcurrencyPerPartitionKeyRange Максимальная степень параллелизма для одного диапазона ключей разделов. Если этот параметр имеет значение NULL, библиотека использует значение по умолчанию (20).
    maxInMemorySortingBatchSize Максимальное число элементов обновления, извлеченных из перечислителя элементов обновления, переданного в вызов API на каждом этапе. Для этапа сортировки в памяти, который происходит перед массовым обновлением. Установка этого параметра на значение NULL приводит к тому, что библиотека использует минимальное значение по умолчанию (updateItems.count, 1000000).
    отменаToken Токен отмены для корректного выхода из операции массового обновления.

Определение объекта ответа массового обновления
Результат вызова API массового обновления содержит следующие атрибуты:

Параметр Description
NumberOfDocumentsUpdated (long) Общее число документов, которые были успешно обновлены из общего количества документов, предоставленных для вызова API массового обновления.
TotalRequestUnitsConsumed (double) Общее число единиц запросов (ЕЗ), потребляемых вызовом API массового обновления.
TotalTimeTaken (TimeSpan) Общее время, затрачиваемое на выполнение вызова API массового обновления.

Советы по производительности

При использовании библиотеки массового исполнителя следует учитывать следующие моменты для повышения производительности.

  • Для повышения производительности запустите приложение из виртуальной машины Azure, которая находится в том же регионе, что и область записи учетной записи Azure Cosmos DB.

  • Рекомендуется создать экземпляр одного объекта BulkExecutor для всего приложения в пределах одной виртуальной машины, соответствующей конкретному контейнеру Azure Cosmos DB.

  • Одно выполнение API массовых операций использует большой блок ЦП и сетевого ввода-вывода клиентского компьютера при внутренней работе нескольких задач. Не рекомендуется порождать в области памяти процесса приложения несколько параллельных задач, выполняющих вызовы API массовых операций. Если один вызов API массовой операции, выполняющийся на одной виртуальной машине, не может использовать пропускную способность всего контейнера (если пропускная способность > контейнера составляет 1 миллион ЕЗ/с), рекомендуется создать отдельные виртуальные машины для параллельного выполнения вызовов API массовой операции.

  • Убедитесь, InitializeAsync() что метод вызывается после создания экземпляра объекта BulkExecutor, чтобы получить карту секционирования целевого контейнера Azure Cosmos DB.

  • Убедитесь, что в файле App.Config приложения включен параметр gcServer для обеспечения лучшей производительности.

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • Библиотека создает трассировки, которые могут записываться в файл журнала, либо в консоль. Чтобы включить оба, добавьте следующий код в файл App.Config приложения:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Следующие шаги