다음을 통해 공유


대량 실행기 .NET 라이브러리를 사용하여 Azure Cosmos DB에서 대량 작업 수행

적용 대상: NoSQL

참고 항목

이 문서에서 설명하는 대량 실행기 라이브러리는 .NET SDK 2.x 버전을 사용하는 애플리케이션에 대해 유지 관리됩니다. 새 애플리케이션의 경우 .NET SDK 버전 3.x에서 직접 제공되는 대량 지원을 사용할 수 있으며 외부 라이브러리가 필요하지 않습니다.

현재 대량 실행기 라이브러리를 사용 중이고 최신 SDK에서 대량 지원으로 마이그레이션할 계획이라면 마이그레이션 가이드의 단계를 사용하여 애플리케이션을 마이그레이션합니다.

이 자습서에서는 대량 실행기 .NET 라이브러리를 사용하여 Azure Cosmos DB 컨테이너에 문서를 가져오고 업데이트하는 방법에 관한 지침을 제공합니다. 대량 실행기 라이브러리와 방대한 처리량 및 스토리지를 사용하는 방법에 대한 자세한 내용은 Azure Cosmos DB 대량 실행기 라이브러리 개요를 참조하세요. 이 자습서에서는 임의로 생성된 문서를 Azure Cosmos DB 컨테이너에 대량으로 가져오는 샘플 .NET 애플리케이션을 보게 됩니다. 데이터를 가져오면 라이브러리는 패치를 특정 문서 필드에서 수행할 작업으로 지정하여 가져온 데이터를 대량 업데이트하는 방법을 보여 줍니다.

현재 Azure Cosmos DB for NoSQL 및 API for Gremlin 계정에서만 대량 실행기 라이브러리가 지원됩니다. 이 문서에서는 API for NoSQL을 통해 Bulk Executor .NET 라이브러리를 사용하는 방법을 설명합니다. Gremlin 계정용 API와 함께 대량 실행기 .NET 라이브러리를 사용하는 방법을 알아보려면 대량 실행기 라이브러리를 사용하여 Azure Cosmos DB for Gremlin에서 대량으로 데이터 수집을 참조하세요.

필수 조건

샘플 애플리케이션 복제

이제 GitHub에서 샘플 .NET 애플리케이션을 다운로드하여 코드 작업으로 전환해 보겠습니다. 이 애플리케이션은 Azure Cosmos DB 계정에 저장된 데이터에 대해 대량 작업을 수행합니다. 애플리케이션을 복제하려면 명령 프롬프트를 열고 복사할 디렉터리로 이동한 후 다음 명령을 실행합니다.

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

복제된 리포지토리에는 BulkImportSampleBulkUpdateSample이라는 두 샘플이 포함되어 있습니다. 샘플 애플리케이션 중 하나를 열고, Azure Cosmos DB 계정의 연결 문자열로 App.config 파일에서 연결 문자열을 업데이트하고, 솔루션을 빌드하고 실행할 수 있습니다.

BulkImportSample 애플리케이션은 임의의 문서를 생성하고 Azure Cosmos DB 계정에 대량으로 가져옵니다. BulkUpdateSample 애플리케이션은 특정 문서 필드에서 수행할 작업으로 패치를 지정하여 가져온 문서를 대량으로 업데이트합니다. 다음 섹션에서는 이러한 각 샘플 앱에서 코드를 검토하겠습니다.

Azure Cosmos DB 계정으로 데이터 대량 가져오기

  1. BulkImportSample 폴더로 이동하고 BulkImportSample.sln 파일을 엽니다.

  2. 다음 코드에 나와 있는 것처럼 Azure Cosmos DB의 연결 문자열이 App.config 파일에서 검색됩니다.

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    대량 가져오기는 App.config 파일에 지정된 데이터베이스 이름, 컨테이너 이름 및 처리량 값이 포함된 새 데이터베이스 및 컨테이너를 만듭니다.

  3. 다음으로, DocumentClient 개체가 직접 TCP 연결 모드를 사용하여 초기화됩니다.

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. BulkExecutor 개체는 대기 시간 및 제한된 요청을 위해 높은 다시 시도 값으로 초기화됩니다. 그런 다음, 해당 수명 동안 정체 제어를 BulkExecutor로 전달하도록 0으로 설정됩니다.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. 애플리케이션은 BulkImportAsync API를 호출합니다. .NET 라이브러리는 대량 가져오기 API의 두 가지 오버로드를 제공합니다. 하나는 직렬화된 JSON 문서 목록을 허용하고 다른 하나는 역직렬화된 POCO 문서 목록을 허용합니다. 이러한 오버로드된 메서드의 정의를 자세히 알아보려면 API 설명서를 참조하세요.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    BulkImportAsync 메서드는 다음과 같은 매개 변수를 수락합니다.

    매개 변수 설명
    enableUpsert 문서에 대한 upsert 작업을 사용하도록 설정하는 플래그입니다. 지정된 ID가 있는 문서가 이미 존재하는 경우 업데이트됩니다. 기본 설정은 false입니다.
    disableAutomaticIdGeneration ID의 자동 생성을 사용하지 않도록 설정하는 플래그입니다. 기본 설정은 true입니다.
    maxConcurrencyPerPartitionKeyRange 파티션 키 범위당 최대 동시성 수준입니다. null로 설정하면 라이브러리에서 기본값인 20을 사용합니다.
    maxInMemorySortingBatchSize 각 단계에서 API 호출로 전달되는 문서 열거자에서 가져온 최대 문서 수입니다. 대량 가져오기 전에 발생하는 메모리 내 정렬 단계의 경우, 이 매개 변수를 null로 설정하면 라이브러리에서 기본 최솟값(documents.count, 1000000)을 사용합니다.
    cancellationToken 대량 가져오기 작업을 정상적으로 종료하기 위한 취소 토큰입니다.

대량 가져오기 응답 개체 정의
대량 가져오기 API 호출의 결과에는 다음과 같은 특성이 포함되어 있습니다.

매개 변수 설명
NumberOfDocumentsImported(long) 대량 가져오기 API 호출에 적용된 총 문서 중에서 성공적으로 가져온 총 문서 수입니다.
TotalRequestUnitsConsumed(double) 대량 가져오기 API 호출에서 사용된 총 요청 단위(RU)입니다.
TotalTimeTaken(TimeSpan) 실행을 완료하기까지 대량 가져오기 API 호출에서 사용하는 총 시간입니다.
BadInputDocuments(List<object>) 대량으로 가져오기 API 호출에 성공적으로 가져오지 못한 잘못된 형식의 문서 목록입니다. 반환된 문서를 수정하고 가져오기를 다시 시도하세요. 잘못된 형식의 문서에는 ID 값이 문자열이 아닌 문서가 포함됩니다(null 또는 다른 데이터 형식이 잘못된 것으로 간주됨).

Azure Cosmos DB 계정의 대량 업데이트 데이터

BulkUpdateAsync API를 사용하여 기존 문서를 업데이트할 수 있습니다. 이 예제에서는 Name 필드를 새 값으로 설정하고 기존 문서에서 Description 필드를 제거합니다. 지원되는 업데이트 작업의 전체 집합은 API 설명서를 참조하세요.

  1. BulkUpdateSample 폴더로 이동하고 BulkUpdateSample.sln 파일을 엽니다.

  2. 해당 필드 업데이트 작업과 함께 업데이트 항목을 정의합니다. 이 예제에서는 SetUpdateOperation을 사용하여 Name 필드를 업데이트하고, UnsetUpdateOperation을 사용하여 모든 문서에서 Description 필드를 제거합니다. 또한 특정 값으로 문서 필드 증가와 같은 다른 작업을 수행하거나, 배열 필드에 특정 값을 푸시하거나, 배열 필드에서 특정 값을 제거할 수 있습니다. 대량 업데이트 API에서 제공하는 다른 방법을 알아보려면 API 설명서를 참조하세요.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. 애플리케이션은 BulkUpdateAsync API를 호출합니다. BulkUpdateAsync 메서드의 정의에 대해 알아보려면 API 문서를 참조하세요.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    BulkUpdateAsync 메서드는 다음과 같은 매개 변수를 수락합니다.

    매개 변수 설명
    maxConcurrencyPerPartitionKeyRange 파티션 키 범위당 최대 동시성 수준입니다. 이 매개 변수를 null로 설정하면 라이브러리에서 기본값(20)을 사용합니다.
    maxInMemorySortingBatchSize 각 단계에서 API 호출에 전달된 업데이트 항목 열거자에서 가져온 업데이트 항목의 최대 수입니다. 대량 업데이트 전에 발생하는 메모리 내 정렬 단계의 경우, 이 매개 변수를 null로 설정하면 라이브러리에서 기본 최솟값(updateItems.count, 1000000)을 사용합니다.
    cancellationToken 대랑 업데이트 작업을 정상적으로 종료하기 위한 취소 토큰입니다.

대량 업데이트 응답 개체 정의
대량 업데이트 API 호출의 결과에는 다음과 같은 특성이 포함되어 있습니다.

매개 변수 설명
NumberOfDocumentsUpdated(long) 대량 업데이트 API 호출에 적용된 총 문서 중에서 성공적으로 업데이트된 문서 수입니다.
TotalRequestUnitsConsumed(double) 대량 업데이트 API 호출에서 사용한 총 RU(요청 단위)입니다.
TotalTimeTaken(TimeSpan) 실행을 완료하기까지 대량 업데이트 API 호출에서 사용하는 총 시간입니다.

성능 팁

대량 실행기 라이브러리를 사용하는 경우 성능 향상을 위해 다음 사항을 고려합니다.

  • 최상의 성능을 위해 Azure Cosmos DB 계정의 쓰기 지역과 동일한 지역에 있는 Azure 가상 머신에서 애플리케이션을 실행합니다.

  • 특정 Azure Cosmos DB 컨테이너에 해당하는 단일 가상 머신 내에서 전체 애플리케이션에 대해 단일 BulkExecutor 개체를 인스턴스화하는 것이 좋습니다.

  • 단일 대량 작업 API 실행은 내부적으로 여러 작업을 생성할 때 클라이언트 컴퓨터의 CPU 및 네트워크 IO의 상당한 청크를 소비합니다. 대량 작업 API 호출을 실행하는 애플리케이션 프로세스 내에서 동시 작업을 여러 개 생성하지 않는 것이 좋습니다. 단일 가상 머신에서 실행되는 단일 대량 작업 API 호출에서 전체 컨테이너의 처리량을 소비할 수 없는 경우(컨테이너의 처리량 > 1백만 RU/s인 경우) 대량 작업 API 호출을 동시에 실행하도록 별도의 가상 머신을 만드는 것이 좋습니다.

  • InitializeAsync() 메서드가 대상 Azure Cosmos DB 컨테이너 파티션 맵을 가져오기 위해 BulkExecutor 개체를 인스턴스화한 후 호출되는지 확인합니다.

  • 애플리케이션의 App.Config에서 성능 향상을 위해 gcServer를 사용할 수 있도록 설정되었는지 확인합니다.

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • 라이브러리는 로그 파일 또는 콘솔에 수집할 수 있는 추적을 내보냅니다. 둘 다 사용하도록 설정하려면 애플리케이션의 App.Config 파일에 다음 코드를 추가합니다.

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

다음 단계