Partilhar via


Executar operações em massa em dados do Azure Cosmos DB

APLICA-SE A: NoSQL

Este tutorial fornece instruções sobre como executar operações em massa no SDK Java V4 do Azure Cosmos DB. Esta versão do SDK vem com a biblioteca de executores em massa integrada. Se você estiver usando uma versão mais antiga do Java SDK, é recomendável migrar para a versão mais recente. O SDK Java V4 do Azure Cosmos DB é a solução recomendada atualmente para suporte em massa a Java.

Atualmente, a biblioteca de executores em massa é suportada apenas pelo Azure Cosmos DB para NoSQL e API para contas Gremlin. Para saber mais sobre como usar a biblioteca .NET do executor em massa com a API para Gremlin, consulte Executar operações em massa no Azure Cosmos DB para Gremlin.

Pré-requisitos

  • Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.

  • Você pode experimentar o Azure Cosmos DB gratuitamente sem uma assinatura do Azure, gratuitamente e compromissos. Ou, você pode usar o emulador do Azure Cosmos DB com o ponto de https://localhost:8081 extremidade. A Chave Primária é fornecida em Autenticar pedidos.

  • Java Development Kit (JDK) 1.8+

    • No Ubuntu, execute apt-get install default-jdk para instalar o JDK.

    • Certifique-se de que define a variável de ambiente JAVA_HOME para apontar para a pasta onde está instalado o JDK.

  • Transferir e instalar um arquivo binário Maven

    • No Ubuntu, pode executar apt-get install maven para instalar o Maven.
  • Crie uma conta do Azure Cosmos DB para NoSQL usando as etapas descritas na seção criar conta de banco de dados do artigo de início rápido Java.

Clonar a aplicação de exemplo

Agora vamos mudar para trabalhar com código baixando um repositório de exemplos genéricos para Java V4 SDK para Azure Cosmos DB do GitHub. Esses aplicativos de exemplo executam operações CRUD e outras operações comuns no Azure Cosmos DB. Para clonar o repositório, abra um prompt de comando, navegue até o diretório onde deseja copiar o aplicativo e execute o seguinte comando:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

O repositório clonado /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async contém um exemplo SampleBulkQuickStartAsync.java na pasta. O aplicativo gera documentos e executa operações para criar, atualizar, substituir e excluir itens em massa no Azure Cosmos DB. Nas próximas seções, analisaremos o código no aplicativo de exemplo.

Execução em massa no Azure Cosmos DB

  1. As cadeias de conexão do Azure Cosmos DB são lidas como argumentos e atribuídas a variáveis definidas em /examples/common/AccountSettings.java file. Essas variáveis de ambiente devem ser definidas
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Para executar a amostra global, especifique sua classe principal:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. O CosmosAsyncClient objeto é inicializado usando as seguintes instruções:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. O exemplo cria um banco de dados e um contêiner assíncronos. Em seguida, ele cria vários documentos nos quais as operações em massa serão executadas. Ele adiciona estes documentos a um Flux<Family> objeto de fluxo reativo:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. O exemplo contém métodos para criar, atualizar, substituir e excluir em massa. Em cada método, mapeamos os documentos das famílias no fluxo do BulkWriter Flux<Family> para várias chamadas de método no CosmosBulkOperations. Essas operações são adicionadas a outro objeto Flux<CosmosItemOperation>de fluxo reativo. O fluxo é então passado para o executeBulkOperations método do assíncrono container que criamos no início, e as operações são executadas em massa. Veja o método de criação em massa abaixo como exemplo:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Há também uma classe BulkWriter.java no mesmo diretório que o aplicativo de exemplo. Esta classe demonstra como lidar com erros de limitação de taxa (429) e tempo limite (408) que podem ocorrer durante a execução em massa e tentar novamente essas operações de forma eficaz. Ele é implementado nos métodos abaixo, mostrando também como implementar o controle de taxa de transferência local e global.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Além disso, há métodos de criação em massa no exemplo, que ilustram como adicionar processamento de resposta e definir opções de execução:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

Sugestões de desempenho

Considere os seguintes pontos para obter um melhor desempenho ao usar a biblioteca de executores em massa:

  • Para obter o melhor desempenho, execute seu aplicativo a partir de uma VM do Azure na mesma região que sua região de gravação de conta do Azure Cosmos DB.

  • Para alcançar uma taxa de transferência mais elevada:

    • Defina o tamanho da pilha da JVM para um número grande o suficiente para evitar qualquer problema de memória no processamento de um grande número de documentos. Tamanho de heap sugerido: max(3 GB, 3 * sizeof(todos os documentos passados para a API de importação em massa em um lote)).
    • Há um tempo de pré-processamento, devido ao qual você obterá uma taxa de transferência mais alta ao executar operações em massa com um grande número de documentos. Portanto, se você quiser importar 10.000.000 de documentos, executar a importação em massa 10 vezes em 10 grandes documentos cada um de tamanho 1.000.000 é preferível do que executar a importação em massa 100 vezes em 100 grandes documentos de tamanho 100.000.
  • É recomendável instanciar um único objeto CosmosAsyncClient para todo o aplicativo em uma única máquina virtual que corresponde a um contêiner específico do Azure Cosmos DB.

  • Como uma única operação em massa, a execução da API consome uma grande parte da CPU e da E/S da rede da máquina cliente. Isso acontece gerando várias tarefas internamente, evitando gerar várias tarefas simultâneas dentro do seu processo de aplicativo, cada uma executando chamadas de API de operação em massa. Se uma única operação em massa chamadas de API em execução em uma única máquina virtual não puder consumir toda a taxa de transferência do contêiner (se a taxa > de transferência do contêiner for de 1 milhão de RU/s), é preferível criar máquinas virtuais separadas para executar simultaneamente chamadas de API de operação em massa.

Próximos passos

  • Para obter uma visão geral da funcionalidade do executor em massa, consulte Visão geral do executor em massa.