[アーティクル] 10/15/2023
22 人の共同作成者
フィードバック
この記事の内容
この記事では、Azure Data Explorer データベースに JSON 書式付きデータを取り込む方法を示します。 まず、未加工の JSON とマップされた JSON の単純な例を紹介します。次に、複数行の JSON に進み、その後、配列とディクショナリを含むさらに複雑な JSON スキーマに取り組みます。 この例では、Kusto 照会言語 (KQL)、C#、または Python を使用して JSON 形式のデータを取り込むプロセスについて詳しく説明します。
前提条件
Microsoft アカウントまたは Microsoft Entra ユーザー ID。 Azure サブスクリプションは不要です。
Azure Data Explorer クラスターとデータベース。 クラスターとデータベースを作成します 。
Azure Data Explorer は、次の 2 つの JSON ファイル形式をサポートしています。
json
: 行区切り JSON。 入力データの各行には、JSON レコードが 1 つだけ含まれています。 この形式では、コメントと一重引用符で囲まれたプロパティの解析がサポートされます。 詳細については、「JSON Lines 」を参照してください。
multijson
: 複数行の JSON。 パーサーは行区切り記号を無視し、前の位置から有効な JSON の末尾までのレコードを読み取ります。
Note
get データ エクスペリエンスを使用して取り込む場合 既定の形式はmultijson
。 この形式では、複数行の JSON レコードと JSON レコードの配列を処理できます。 解析エラーが発生すると、ファイル全体が破棄されます。 無効な JSON レコードを無視するには、[データ形式のエラーを無視する] オプションを選択します。このオプションを選択すると、形式が json
(JSON Lines) に切り替わります。
JSON 行形式 (json
) を使用している場合、有効な JSON レコードを表していない行は解析中にスキップされます。
JSON 書式付きデータを取り込むには、インジェスト プロパティ を使用して "書式 " を指定する必要があります。 JSON データを取り込むには、JSON ソース エントリをターゲット列にマップするマッピング が必要です。 データを取り込むときは、IngestionMapping
プロパティと ingestionMappingReference
(事前定義されたマッピングの場合) インジェスト プロパティを使用するか、IngestionMappings
プロパティを使用します。 この記事では、インジェストに使用するテーブルで事前に定義されている ingestionMappingReference
インジェスト プロパティを使用します。 次の例では、最初に JSON レコードを生データとして 1 列のテーブルに取り込みます。 次に、マッピングを使用して、各プロパティを、そのマップされた列に取り込みます。
単純な JSON の例
フラットな構造体を持つ単純な JSON の例を次に示します。 データには、複数のデバイスによって収集された温度と湿度の情報が含まれています。 各レコードには、ID とタイムスタンプが付いています。
{
"timestamp": "2019-05-02 15:23:50.0369439",
"deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
}
未加工の JSON レコードを取り込む
この例では、JSON レコードを生データとして 1 列のテーブルに取り込みます。 データ操作、クエリの使用、および更新ポリシーは、データが取り込まれた後に行われます。
Kusto 照会言語を使用して、生の JSON 形式でデータを取り込みます 。
https://dataexplorer.azure.com にサインインします。
[Add cluster]\(クラスターの追加\) を選択します。
[Add cluster] ダイアログ ボックスで https://<ClusterName>.<Region>.kusto.windows.net/
の形式でラスターの URL を入力して、[追加] を選択します。
次のコマンドを貼り付け、 [実行] を選択してテーブルを作成します。
.create table RawEvents (Event: dynamic)
このクエリでは、動的 データ型の 1 つの Event
列を含むテーブルを作成します。
JSON マッピングを作成します。
.create table RawEvents ingestion json mapping 'RawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'
このコマンドは、マッピングを作成し、JSON ルート パス $
を Event
列にマップします。
RawEvents
テーブルにデータを取り込みます。
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
C# を使用して、未加工の JSON 形式 でデータを取り込みます。
RawEvents
テーブルを作成します。
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
JSON マッピングを作成します。
var tableMappingName = "RawEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Events", Properties = new Dictionary<string, string> { { "path", "$" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
このコマンドは、マッピングを作成し、JSON ルート パス $
を Event
列にマップします。
RawEvents
テーブルにデータを取り込みます。
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net/";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Note
データはバッチ処理ポリシー に従って集約され、その結果、数分の待機時間が発生します。
Python を使用して、未加工の JSON 形式 でデータを取り込みます。
RawEvents
テーブルを作成します。
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)
KUSTO_CLIENT = KustoClient(KCSB_DATA)
TABLE = "RawEvents"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Events: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
JSON マッピングを作成します。
MAPPING = "RawEventMapping"
CREATE_MAPPING_COMMAND = ".create table " + TABLE + " ingestion json mapping '" + MAPPING + """' '[{"column":"Event","path":"$"}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
RawEvents
テーブルにデータを取り込みます。
INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(INGEST_URI, AAD_TENANT_ID)
INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Note
データはバッチ処理ポリシー に従って集約され、その結果、数分の待機時間が発生します。
マップされた JSON レコードを取り込む
この例では、JSON レコード データを取り込みます。 各 JSON プロパティは、テーブル内の 1 つの列にマップされます。
JSON 入力データに類似したスキーマを使用して、新しいテーブルを作成します。 このテーブルは、次のすべての例とインジェスト コマンドで使用します。
.create table Events (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)
JSON マッピングを作成します。
.create table Events ingestion json mapping 'FlatEventMapping' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'
このマッピングでは、テーブル スキーマによって定義されているように、timestamp
エントリは datetime
データ型として列 Time
に取り込まれます。
Events
テーブルにデータを取り込みます。
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
ファイル 'simple.json' には、行区切りの JSON レコードがいくつか含まれています。 形式は json
であり、インジェスト コマンドで使用されるマッピングは、作成した FlatEventMapping
です。
JSON 入力データに類似したスキーマを使用して、新しいテーブルを作成します。 このテーブルは、次のすべての例とインジェスト コマンドで使用します。
var tableName = "Events";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("Time", "System.DateTime"),
Tuple.Create("Device", "System.String"),
Tuple.Create("MessageId", "System.String"),
Tuple.Create("Temperature", "System.Double"),
Tuple.Create("Humidity", "System.Double")
}
);
await kustoClient.ExecuteControlCommandAsync(command);
JSON マッピングを作成します。
var tableMappingName = "FlatEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Time", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.timestamp" } } },
new() { ColumnName = "Device", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.deviceId" } } },
new() { ColumnName = "MessageId", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.messageId" } } },
new() { ColumnName = "Temperature", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.temperature" } } },
new() { ColumnName = "Humidity", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.humidity" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
このマッピングでは、テーブル スキーマによって定義されているように、timestamp
エントリは datetime
データ型として列 Time
に取り込まれます。
Events
テーブルにデータを取り込みます。
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
ファイル 'simple.json' には、行区切りの JSON レコードがいくつか含まれています。 形式は json
であり、インジェスト コマンドで使用されるマッピングは、作成した FlatEventMapping
です。
JSON 入力データに類似したスキーマを使用して、新しいテーブルを作成します。 このテーブルは、次のすべての例とインジェスト コマンドで使用します。
TABLE = "Events"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
JSON マッピングを作成します。
MAPPING = "FlatEventMapping"
CREATE_MAPPING_COMMAND = ".create table Events ingestion json mapping '" + MAPPING + """' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Events
テーブルにデータを取り込みます。
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
ファイル 'simple.json' には、行区切りの JSON レコードがいくつか含まれています。 形式は json
であり、インジェスト コマンドで使用されるマッピングは、作成した FlatEventMapping
です。
複数行の JSON レコードを取り込む
この例では、複数行の JSON レコードを取り込みます。 各 JSON プロパティは、テーブル内の 1 つの列にマップされます。 ファイル 'multilined.json' には、インデントされた JSON レコードがいくつか含まれています。 multijson
形式は、JSON 構造によってレコードを読み取ります。
Events
テーブルにデータを取り込みます。
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Events
テーブルにデータを取り込みます。
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Events
テーブルにデータを取り込みます。
MAPPING = "FlatEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
配列を含む JSON レコードを取り込む
配列データ型は、順序が付けられた値のコレクションです。 JSON 配列の取り込みは、更新ポリシー によって行われます。 JSON はそのまま中間テーブルに取り込まれます。 更新ポリシーは、RawEvents
テーブルに対して定義済みの関数を実行し、結果をターゲット テーブルに再度取り込みます。 次の構造体でデータを取り込みます。
{
"records":
[
{
"timestamp": "2019-05-02 15:23:50.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
},
{
"timestamp": "2019-05-02 15:23:51.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "57de2821-7581-40e4-861e-ea3bde102364",
"temperature": 33.7529423105311,
"humidity": 75.4787976739364
}
]
}
mv-expand
演算子を使用して、コレクション内の各値が個別の行を受け取るように records
のコレクションを展開する update policy
関数を作成します。 テーブル RawEvents
をソース テーブルとして使用し、Events
をターゲット テーブルとして使用します。
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
関数が受け取るスキーマは、ターゲット テーブルのスキーマと一致している必要があります。 getschema
演算子を使用してスキーマを確認します。
EventRecordsExpand() | getschema
更新ポリシーをターゲット テーブルに追加します。 このポリシーでは、RawEvents
中間テーブルに新しく取り込まれたデータに対してクエリが自動的に実行され、その結果が Events
テーブルに取り込まれます。 中間テーブルが保持されないようにするために、ゼロ保持ポリシーを定義します。
.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'
RawEvents
テーブルにデータを取り込みます。
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Events
テーブル内のデータを確認します。
Events
mv-expand
演算子を使用して、コレクション内の各値が個別の行を受け取るように records
のコレクションを展開する update 関数を作成します。 テーブル RawEvents
をソース テーブルとして使用し、Events
をターゲット テーブルとして使用します。
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Note
関数が受け取るスキーマは、ターゲット テーブルのスキーマと一致している必要があります。
更新ポリシーをターゲット テーブルに追加します。 このポリシーでは、RawEvents
中間テーブルに新しく取り込まれたデータに対してクエリが自動的に実行され、その結果が Events
テーブルに取り込まれます。 中間テーブルが保持されないようにするために、ゼロ保持ポリシーを定義します。
command = ".alter table Events policy update @'[{'Source': 'RawEvents', 'Query': 'EventRecordsExpand()', 'IsEnabled': 'True'}]";
await kustoClient.ExecuteControlCommandAsync(command);
RawEvents
テーブルにデータを取り込みます。
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Events
テーブル内のデータを確認します。
mv-expand
演算子を使用して、コレクション内の各値が個別の行を受け取るように records
のコレクションを展開する update 関数を作成します。 テーブル RawEvents
をソース テーブルとして使用し、Events
をターゲット テーブルとして使用します。
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Note
関数が受け取るスキーマは、ターゲット テーブルのスキーマと一致している必要があります。
更新ポリシーをターゲット テーブルに追加します。 このポリシーでは、RawEvents
中間テーブルに新しく取り込まれたデータに対してクエリが自動的に実行され、その結果が Events
テーブルに取り込まれます。 中間テーブルが保持されないようにするために、ゼロ保持ポリシーを定義します。
CREATE_UPDATE_POLICY_COMMAND =
""".alter table Events policy update @'[{'Source': 'RawEvents', 'Query': 'EventRecordsExpand()', 'IsEnabled': 'True'}]"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_UPDATE_POLICY_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
RawEvents
テーブルにデータを取り込みます。
TABLE = "RawEvents"
MAPPING = "RawEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Events
テーブル内のデータを確認します。
関連するコンテンツ