Azure Data Explorer Go SDK 使用してデータを取り込む
Azure Data Explorer は、ログと利用統計情報データのための高速で拡張性に優れたデータ探索サービスです。 Azure Data Explorer サービスとやり取りするための Go SDK クライアント ライブラリが提供されます。 Go SDK を使用して、Azure Data Explorer クラスター内のデータを取り込み、制御し、クエリを実行できます。
この記事ではまず、テスト クラスター内にテーブルとデータ マッピングを作成します。 その後、Go SDK を使用してクラスターに対するインジェストをキューに登録し、結果を確認します。
前提条件
- Microsoft アカウントまたは Microsoft Entra ユーザー ID。 Azure サブスクリプションは不要です。
- Azure Data Explorer クラスターとデータベース。 クラスターとデータベースを作成します。
- Git のインストール。
- この Go SDK の最小要件に従って、Go をインストールします。
- アプリの登録を作成し、データベースに対するアクセス許可を付与します。 後で使用するために、クライアント ID とクライアント シークレットを保存します。
Go SDK をインストールする
Go モジュールを使用するサンプル アプリケーションを実行すると、Azure Data Explorer Go SDK が自動的にインストールされます。 別のアプリケーション用の Go SDK をインストールした場合は、次の例のように、Go モジュールを作成し、(go get
を使用して) Azure Data Explorer パッケージをフェッチします。
go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto
パッケージの依存関係は go.mod
ファイルに追加されます。 これは Go アプリケーションで使用します。
コードの確認
この「コードの確認」セクションは省略可能です。 コードのしくみに関心がある場合は、次のコード スニペットで確認できます。 それ以外の場合は、「アプリケーションの実行」に進んでください。
認証
何らかの操作を実行する前に、プログラムから Azure Data Explorer サービスに対して認証を行う必要があります。
auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)
kusto.Authorization のインスタンスは、サービス プリンシパルの資格情報を使用して作成されます。 その後、kusto.Client を作成するために使用されます。その際に、クラスター エンドポイントも受け入れる新しい関数が使用されます。
テーブルを作成する
create table コマンドは、Kusto ステートメントによって表されます。Mgmt 関数は、管理コマンドを実行するために使用されます。 これは、コマンドを実行してテーブルを作成するために使用されます。
func createTable(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
if err != nil {
log.Fatal("failed to create table", err)
}
log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}
ヒント
セキュリティを強化するため、既定では、Kusto ステートメントは定数となります。 NewStmt
で文字列定数が受け入れられます。 UnsafeStmt
API で非定数のステートメント セグメントを使用できますが、推奨されません。
Kusto の create table コマンドは、次のようになります。
.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)
マッピングを作成する
インジェスト中にデータ マッピングが使用され、受信データが Azure Data Explorer テーブル内の列にマップされます。 詳細については、「データ マッピング」を参照してください。 テーブルと同じ方法でマッピングが作成されます。その場合、Mgmt
関数を使用し、データベース名と適切なコマンドを指定します。 完全なコマンドは、サンプルの GitHub リポジトリで入手できます。
func createMapping(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
if err != nil {
log.Fatal("failed to create mapping - ", err)
}
log.Printf("Mapping %s created\n", kustoMappingRefName)
}
データを取り込む
インジェストは、既存の Azure Blob Storage コンテナーのファイルを使用してキューに登録されます。
func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
kIngest, err := ingest.New(kc, kustoDB, kustoTable)
if err != nil {
log.Fatal("failed to create ingestion client", err)
}
blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))
if err != nil {
log.Fatal("failed to ingest file", err)
}
log.Println("Ingested file from -", blobStorePath)
}
インジェスト クライアントは、ingest.New を使用して作成されます。 FromFile 関数は、Azure Blob Storage URI を参照するために使用されます。 マッピング参照名とデータ型は FileOption の形式で渡されます。
アプリケーションの実行
GitHub から次のサンプル コードをクローンします。
git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
main.go
のこのスニペットに示されているサンプル コードを実行します。func main { ... dropTable(kc, kustoDB) createTable(kc, kustoDB) createMapping(kc, kustoDB) ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable) ... }
ヒント
操作のさまざまな組み合わせを試すために、
main.go
内の各関数をコメント解除したり、コメント化したりすることができます。サンプル コードを実行すると、次のアクションが行われます。
- テーブルを削除する:
StormEvents
テーブルが削除されます (存在する場合)。 - テーブルの作成:
StormEvents
テーブルが作成されます。 - マッピングの作成:
StormEvents_CSV_Mapping
マッピングが作成されます。 - ファイル インジェスト: (Azure Blob Storage 内の) CSV ファイルがインジェストのためにキューに入れられます。
- テーブルを削除する:
認証用のサービス プリンシパルを作成するには、Azure CLI を使用して az ad sp create-for-rbac コマンドを指定します。 プログラムによって使用される環境変数の形式で、クラスター エンドポイントとデータベース名を指定してサービス プリンシパル情報を設定します。
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export AZURE_SP_TENANT_ID="<replace with tenant>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
以下のプログラムを実行します。
go run main.go
次のような出力が表示されます。
Connected to Azure Data Explorer Using database - testkustodb Failed to drop StormEvents table. Maybe it does not exist? Table StormEvents created in DB testkustodb Mapping StormEvents_CSV_Mapping created Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
確認してトラブルシューティングを行う
キューに登録されたインジェストでインジェスト プロセスがスケジュールされ、Azure Data Explorer にデータが読み込まれるまで、5 分から 10 分待ちます。
https://dataexplorer.azure.com にサインインして、クラスターに接続します。 その後、次のコマンドを実行して、
StormEvents
テーブル内のレコードの数を取得します。StormEvents | count
データベースで次のコマンドを実行し、過去 4 時間以内にインジェスト エラーがあったかどうかを調べます。 実行する前にデータベース名を置き換えてください。
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
次のコマンドを実行し、過去 4 時間以内のすべてのインジェスト操作の状態を表示します。 実行する前にデータベース名を置き換えてください。
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
リソースをクリーンアップする
他の記事に進む場合は、作成したリソースをそのままにします。 そのようにしない場合は、データベースで次のコマンドを実行し、StormEvents
テーブルを削除します。
.drop table StormEvents