Pozyskiwanie przykładowych danych w formacie JSON do usługi Azure Data Explorer
Artykuł
W tym artykule pokazano, jak pozyskiwać dane sformatowane w formacie JSON do bazy danych usługi Azure Data Explorer. Zaczniesz od prostych przykładów nieprzetworzonych i zamapowanych danych JSON, będziesz nadal korzystać z wielowierszowego kodu JSON, a następnie zajmiesz się bardziej złożonymi schematami JSON zawierającymi tablice i słowniki. W przykładach szczegółowo omówiono proces pozyskiwania danych sformatowanych w formacie JSON przy użyciu języka język zapytań Kusto (KQL), C# lub Python.
Uwaga
Nie zalecamy używania .ingest poleceń zarządzania w scenariuszach produkcyjnych. Zamiast tego należy użyć łącznikadanych lub programowego pozyskiwania danych przy użyciu jednej z bibliotek klienckich usługi Kusto.
Wymagania wstępne
Konto Microsoft lub tożsamość użytkownika Microsoft Entra. Subskrypcja platformy Azure nie jest wymagana.
Usługa Azure Data Explorer obsługuje dwa formaty plików JSON:
json: Oddzielony wierszem kod JSON. Każdy wiersz w danych wejściowych ma dokładnie jeden rekord JSON. Ten format obsługuje analizowanie komentarzy i właściwości z pojedynczym cudzysłów. Aby uzyskać więcej informacji, zobacz Wiersze JSON.
multijson: wielowierszowy kod JSON. Analizator ignoruje separatory wierszy i odczytuje rekord z poprzedniej pozycji na końcu prawidłowego kodu JSON.
Uwaga
Podczas pozyskiwania przy użyciu środowiska pobierania danych domyślny format to multijson. Format może obsługiwać wielowierszowe rekordy JSON i tablice rekordów JSON. Po napotkaniu błędu analizowania cały plik zostanie odrzucony. Aby zignorować nieprawidłowe rekordy JSON, wybierz opcję "Ignoruj błędy formatu danych". Spowoduje to przełączenie formatu na json (linie JSON).
Jeśli używasz formatu linii JSON (json), wiersze, które nie reprezentują prawidłowych rekordów JSON, są pomijane podczas analizowania.
Pozyskiwanie i mapowanie sformatowanych danych JSON
Pozyskiwanie danych sformatowanych w formacie JSON wymaga określenia formatu przy użyciu właściwości pozyskiwania. Pozyskiwanie danych JSON wymaga mapowania, które mapuje wpis źródłowy JSON na kolumnę docelową. Podczas pozyskiwania danych należy użyć IngestionMapping właściwości z właściwością ingestionMappingReference (dla wstępnie zdefiniowanego mapowania) pozyskiwania lub jej IngestionMappings właściwości. W tym artykule zostanie użyta właściwość pozyskiwania ingestionMappingReference , która jest wstępnie zdefiniowana w tabeli używanej do pozyskiwania. W poniższych przykładach zaczniemy od pozyskiwania rekordów JSON jako danych pierwotnych do tabeli z jedną kolumną. Następnie użyjemy mapowania, aby pozyskać każdą właściwość do jej zamapowanej kolumny.
Prosty przykład kodu JSON
Poniższy przykład to prosty kod JSON z płaską strukturą. Dane zawierają informacje o temperaturze i wilgotności zebrane przez kilka urządzeń. Każdy rekord jest oznaczony identyfikatorem i znacznikiem czasu.
W tym przykładzie pozyskasz rekordy JSON jako nieprzetworzone dane do pojedynczej tabeli kolumn. Manipulowanie danymi przy użyciu zapytań i zasad aktualizacji odbywa się po pozyskiwaniu danych.
W oknie dialogowym Dodawanie klastra wprowadź adres URL klastra w formularzu https://<ClusterName>.<Region>.kusto.windows.net/, a następnie wybierz pozycję Dodaj.
Wklej następujące polecenie i wybierz pozycję Uruchom , aby utworzyć tabelę.
.create table RawEvents (Event: dynamic)
To zapytanie tworzy tabelę z jedną Event kolumną dynamicznego typu danych.
To polecenie tworzy mapowanie i mapuje ścieżkę $ JSON główną do kolumny Event .
Pozyskiwanie danych do RawEvents tabeli.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
W tym mapowaniu, zgodnie ze schematem tabeli, timestamp wpisy zostaną pozyskane do kolumny Time jako datetime typy danych.
Pozyskiwanie danych do Events tabeli.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
Plik "simple.json" zawiera kilka rozdzielonych wierszami rekordów JSON. Format to json, a mapowanie używane w poleceniu pozyskiwania FlatEventMapping jest utworzone.
Utwórz nową tabelę z podobnym schematem do danych wejściowych JSON. Użyjemy tej tabeli dla wszystkich poniższych przykładów i pozyskamy polecenia.
W tym mapowaniu, zgodnie ze schematem tabeli, timestamp wpisy zostaną pozyskane do kolumny Time jako datetime typy danych.
Pozyskiwanie danych do Events tabeli.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Plik "simple.json" zawiera kilka rozdzielonych wierszami rekordów JSON. Format to json, a mapowanie używane w poleceniu pozyskiwania FlatEventMapping jest utworzone.
Utwórz nową tabelę z podobnym schematem do danych wejściowych JSON. Użyjemy tej tabeli dla wszystkich poniższych przykładów i pozyskamy polecenia.
Plik "simple.json" zawiera kilka wierszy rozdzielonych rekordami JSON. Format to json, a mapowanie używane w poleceniu pozyskiwania FlatEventMapping jest utworzone.
Pozyskiwanie wielowierszowych rekordów JSON
W tym przykładzie pozyskasz wielowierszowe rekordy JSON. Każda właściwość JSON jest mapowana na jedną kolumnę w tabeli. Plik "multilined.json" ma kilka wcięć rekordów JSON. Format multijson wskazuje odczytywanie rekordów przez strukturę JSON.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Pozyskiwanie danych do Events tabeli.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Typy danych tablicy to uporządkowana kolekcja wartości. Pozyskiwanie tablicy JSON odbywa się za pomocą zasad aktualizacji. Kod JSON jest pozyskiwany zgodnie z tabelą pośrednią. Zasady aktualizacji uruchamia wstępnie zdefiniowaną funkcję w RawEvents tabeli, ponownie pozyskając wyniki do tabeli docelowej. Pozyskamy dane z następującą strukturą:
update policy Utwórz funkcję, która rozszerza kolekcjęrecords, tak aby każda wartość w kolekcji odbierała oddzielny wiersz przy użyciu mv-expand operatora . Użyjemy tabeli RawEvents jako tabeli źródłowej i Events jako tabeli docelowej.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
Schemat odebrany przez funkcję musi być zgodny ze schematem tabeli docelowej. Użyj getschema operatora , aby przejrzeć schemat.
EventRecordsExpand() | getschema
Dodaj zasady aktualizacji do tabeli docelowej. Te zasady automatycznie uruchamiają zapytanie dotyczące wszystkich nowo pozyskanych danych w tabeli pośredniej RawEvents i pozyskują wyniki w Events tabeli. Zdefiniuj zasady zerowego przechowywania, aby uniknąć utrwalania tabeli pośredniej.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Przejrzyj dane w Events tabeli.
Events
Utwórz funkcję aktualizacji, która rozszerza kolekcję records , tak aby każda wartość w kolekcji odbierała oddzielny wiersz przy użyciu mv-expand operatora . Użyjemy tabeli RawEvents jako tabeli źródłowej i Events jako tabeli docelowej.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Uwaga
Schemat odebrany przez funkcję musi być zgodny ze schematem tabeli docelowej.
Dodaj zasady aktualizacji do tabeli docelowej. Te zasady automatycznie uruchamiają zapytanie dotyczące wszystkich nowo pozyskanych danych w tabeli pośredniej RawEvents i pozyskują wyniki w Events tabeli. Zdefiniuj zasady zerowego przechowywania, aby uniknąć utrwalania tabeli pośredniej.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Przejrzyj dane w Events tabeli.
Utwórz funkcję aktualizacji, która rozszerza kolekcję records , tak aby każda wartość w kolekcji odbierała oddzielny wiersz przy użyciu mv-expand operatora . Użyjemy tabeli RawEvents jako tabeli źródłowej i Events jako tabeli docelowej.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Uwaga
Schemat odebrany przez funkcję musi być zgodny ze schematem tabeli docelowej.
Dodaj zasady aktualizacji do tabeli docelowej. Te zasady automatycznie uruchamiają zapytanie dotyczące wszystkich nowo pozyskanych danych w tabeli pośredniej RawEvents i pozyskują wyniki w Events tabeli. Zdefiniuj zasady zerowego przechowywania, aby uniknąć utrwalania tabeli pośredniej.