Azure Data Explorer の Python ライブラリを使用してデータを取り込む
この記事では、Azure Data Explorer の Python ライブラリを使用してデータを取り込みます。 Azure Data Explorer は、ログと利用統計情報データのための高速で拡張性に優れたデータ探索サービスです。 Azure Data Explorer では、Python 用のクライアント ライブラリとして、取り込みライブラリとデータ ライブラリの 2 つが用意されています。 これらのライブラリを使用すると、クラスターにデータを取り込み (読み込み)、コードからデータのクエリを行うことができます。
まず、クラスター内にテーブルとデータ マッピングを作成します。 その後、クラスターに対するインジェストをキューに入れて、結果を検証します。
前提条件
- Microsoft アカウントまたはMicrosoft Entraユーザー ID。 Azure サブスクリプションは不要です。
- Azure Data Explorer クラスターとデータベース。 クラスターとデータベースを作成します。
- Python 3.4 以上。
データ ライブラリと取り込みライブラリをインストールする
azure-kusto-data と azure-kusto-ingest をインストールします。
pip install azure-kusto-data
pip install azure-kusto-ingest
import ステートメントおよび定数を追加する
azure-kusto-data からクラスをインポートします。
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
アプリケーションを認証するために、Azure Data ExplorerはMicrosoft Entraテナント ID を使用します。 テナント ID を検索するには、YourDomain をお使いのドメインに置き換えて、次の URL を使用します。
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
たとえば、ドメインが contoso.com の場合、URL は https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/ になります。 結果を表示するには、この URL をクリックします。最初の行は次のとおりです。
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
この場合のテナント ID は 6babcaad-604b-40ac-a9d7-9fd97c0b779f
です。 このコードを実行する前に、AAD_TENANT_ID、KUSTO_URI、KUSTO_INGEST_URI、KUSTO_DATABASE の値を設定します。
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
では、接続文字列を作成します。 以下の例では、デバイス認証を使用してクラスターにアクセスします。 マネージド ID 認証、アプリケーション証明書のMicrosoft Entra、アプリケーション キーのMicrosoft Entra、ユーザーとパスワードのMicrosoft Entraを使用することもできます。
ターゲット テーブルとマッピングは後のステップで作成します。
KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_INGEST_URI)
KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_URI)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
ソース ファイルの情報を設定する
他のクラスをインポートし、データ ソース ファイルに対する定数を設定します。 この例では、Azure Blob Storage でホストされているサンプル ファイルを使います。 StormEvents サンプル データセットには、国立環境情報センターの気象関連データが含まれています。
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = "" # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
クラスターにテーブルを作成する
StormEvents.csv ファイル内のデータのスキーマと一致するテーブルを作成します。 このコードを実行すると、次のようなメッセージが返されます: "サインインするには、Web ブラウザーを使用して https://microsoft.com/devicelogin ページを開き、認証するためのコード F3W4VWZDM を入力します。" この手順に従ってサインインし、元のページに戻って次のコード ブロックを実行します。 接続を行う後続のコード ブロックでは、再びサインインする必要があります。
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
インジェストのマッピングを定義する
受信した CSV データを、テーブル作成時に使用される列名とデータ型にマップします。 これにより、ソース データのフィールドがターゲットのテーブル列にマップされます。
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
取り込みのためにメッセージをキューに入れる
BLOB ストレージからデータをプルし、そのデータを Azure Data Explorer に取り込むために、メッセージをキューに入れます。
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
テーブルに取り込まれたデータを照会する
キューに入れられたインジェストで取り込みがスケジュールされて、Azure Data Explorer にデータが読み込まれるまで、5 から 10 分待ちます。 その後、次のコードを実行して、StormEvents テーブル内のレコードの数を取得します。
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
トラブルシューティングのクエリを実行する
https://dataexplorer.azure.com にサインインして、クラスターに接続します。 データベースで次のコマンドを実行し、過去 4 時間以内にインジェスト エラーがあったかどうかを調べます。 実行する前にデータベース名を置き換えてください。
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
次のコマンドを実行し、過去 4 時間以内のすべてのインジェスト操作の状態を表示します。 実行する前にデータベース名を置き換えてください。
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
リソースをクリーンアップする
他の記事に進む場合は、作成したリソースをそのままにします。 行わない場合は、データベースで次のコマンドを実行して、StormEvents テーブルをクリーンアップします。
.drop table StormEvents