다음을 통해 공유


Azure Data Explorer Go SDK를 사용하여 데이터 수집

Azure 데이터 탐색기는 로그 및 원격 분석 데이터에 사용 가능한 빠르고 확장성이 우수한 데이터 탐색 서비스입니다. Azure Data Explorer 서비스와 상호 작용하기 위한 Go SDK 클라이언트 라이브러리를 제공합니다. Go SDK를 사용하여 Azure Data Explorer 클러스터에서 데이터를 수집, 제어 및 쿼리할 수 있습니다.

이 문서에서는 먼저 테스트 클러스터에서 테이블 및 데이터 매핑을 만듭니다. 그런 다음, Go SDK를 사용하여 클러스터 큐에 수집을 넣고 결과의 유효성을 검사합니다.

필수 조건

Go SDK 설치

Azure Data Explorer Go SDK는 Go 모듈을 사용하는 샘플 애플리케이션을 실행할 때 자동으로 설치됩니다. 다른 애플리케이션에 대한 Go SDK를 설치한 경우 Go 모듈을 만들고 go get을 사용하여 Azure Data Explorer 패키지를 가져옵니다. 예를 들면 다음과 같습니다.

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

패키지 종속성이 go.mod 파일에 추가됩니다. Go 애플리케이션에서 사용합니다.

코드 검토

코드 검토 섹션은 선택 사항입니다. 코드의 작동 방식에 관심이 있으면 다음 코드 조각을 살펴보세요. 그렇지 않다면 애플리케이션 실행으로 건너뛰어도 됩니다.

Authenticate

프로그램은 작업을 실행하기 전에 Azure Data Explorer 서비스에 인증해야 합니다.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

kusto.Authorization의 인스턴스는 서비스 주체 자격 증명을 사용하여 생성됩니다. 그런 다음, 클러스터 엔드포인트를 허용하는 함수를 포함하는 kusto.Client를 만드는 데 사용됩니다.

테이블 만들기

create table 명령은 Kusto 문으로 표현됩니다. Mgmt 함수는 management 명령을 실행하는 데 사용됩니다. 명령을 실행하여 테이블을 만드는 데 사용됩니다.

func createTable(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
  if err != nil {
    log.Fatal("failed to create table", err)
  }
  log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Kusto 문은 보안을 강화하기 위해 기본적으로 상수입니다. NewStmt는 문자열 상수를 허용합니다. UnsafeStmt API는 비상수 문 세그먼트를 사용할 수 있지만 권장되지는 않습니다.

Kusto create table 명령은 다음과 같습니다.

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

매핑 생성

데이터 매핑은 수집 중에 들어오는 데이터를 Azure Data Explorer 테이블 내의 열에 매핑하기 위해 사용됩니다. 자세한 내용은 데이터 매핑을 참조하세요. 데이터베이스 이름 및 적절한 명령과 함께 Mgmt 함수를 사용하여 테이블과 동일한 방식으로 매핑을 만듭니다. 전체 명령은 샘플의 GitHub 리포지토리에서 사용할 수 있습니다.

func createMapping(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
  if err != nil {
    log.Fatal("failed to create mapping - ", err)
  }
  log.Printf("Mapping %s created\n", kustoMappingRefName)
}

데이터 수집

수집은 기존 Azure Blob Storage 컨테이너의 파일을 사용하여 큐에 대기됩니다.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
  kIngest, err := ingest.New(kc, kustoDB, kustoTable)
  if err != nil {
    log.Fatal("failed to create ingestion client", err)
  }
  blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
  err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

  if err != nil {
    log.Fatal("failed to ingest file", err)
  }
  log.Println("Ingested file from -", blobStorePath)
}

수집 클라이언트는 ingest.New를 사용하여 생성됩니다. FromFile 함수는 Azure Blob Storage URI를 참조하는 데 사용됩니다. 매핑 참조 이름과 데이터 형식은 FileOption 형식으로 전달됩니다.

애플리케이션 실행

  1. GitHub에서 샘플 코드를 복제합니다.

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. main.go에서 이 코드 조각에 표시된 샘플 코드를 실행합니다.

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    다양한 작업 조합을 시도하려면 main.go에서 각 함수의 주석을 처리하거나 제거할 있습니다.

    샘플 코드를 실행하면 다음 작업이 수행됩니다.

    1. 테이블 삭제: StormEvents 테이블이 있는 경우 삭제됩니다.
    2. 테이블 만들기: StormEvents 테이블이 생성됩니다.
    3. 매핑 만들기: StormEvents_CSV_Mapping 매핑이 만들어집니다.
    4. 파일 수집: CSV 파일(Azure Blob Storage)은 수집을 위해 큐에 대기됩니다.
  3. 인증에 대한 서비스 주체를 만들려면 az ad sp create-for-rbac 명령과 함께 Azure CLI를 사용합니다. 프로그램에서 사용되는 환경 변수 형식으로 클러스터 엔드포인트와 데이터베이스 이름을 사용하여 서비스 주체 정보를 설정합니다.

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. 다음과 같이 프로그램을 실행합니다.

    go run main.go
    

    유사한 결과를 얻게 됩니다.

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

유효성 검사 및 문제 해결

큐에 넣은 수집 프로세스가 예약되고 데이터가 Azure Data Explorer에 로드될 때까지 5~10분 정도 기다립니다.

  1. https://dataexplorer.azure.com에 로그인하고 클러스터에 연결합니다. 그런 다음, 다음 명령을 실행하여 StormEvents 테이블의 레코드 수를 가져옵니다.

    StormEvents | count
    
  2. 데이터베이스에서 다음 명령을 실행하여 지난 4시간 동안 수집 실패가 있었는지 확인합니다. 실행하기 전에 데이터베이스 이름을 바꿉니다.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. 다음 명령을 실행하여 지난 4시간 동안 진행된 모든 수집 작업의 상태를 확인합니다. 실행하기 전에 데이터베이스 이름을 바꿉니다.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

리소스 정리

다른 문서를 따르려는 경우 만든 리소스를 유지합니다. 그렇지 않으면 데이터베이스에서 다음 명령을 실행하여 StormEvents 테이블을 삭제합니다.

.drop table StormEvents

다음 단계