你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Azure 数据资源管理器 Python 库引入数据

本文将使用 Azure 数据资源管理器 Python 库引入数据。 Azure 数据资源管理器是一项快速且高度可缩放的数据探索服务,适用于日志和遥测数据。 Azure 数据资源管理器为 Python 提供了两个客户端库:引入库数据库。 使用这些库,可以从代码将数据引入或加载到群集中并查询数据。

首先,在群集中创建一个表和数据映射。 然后将引入排列到群集并验证结果。

先决条件

安装数据和引入库

安装 azure-kusto-data 和 azure-kusto-ingest 。

pip install azure-kusto-data
pip install azure-kusto-ingest

添加导入语句和常量

从 azure-kusto-data 导入类。

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

为了对应用程序进行身份验证,Azure 数据资源管理器使用Microsoft Entra租户 ID。 若要查找租户 ID,请使用以下 URL,将 YourDomain 替换为你的域。

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

例如,如果域名为 contoso.com,则该 URL 将是:https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/。 单击此 URL 以查看结果;第一行如下所示。

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

在这种情况下,租户 ID 为 6babcaad-604b-40ac-a9d7-9fd97c0b779f。 在运行此代码之前,请为 AAD_TENANT_ID、KUSTO_URI、KUSTO_INGEST_URI 和 KUSTO_DATABASE 设置值。

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

现在构造连接字符串。 下面的示例使用设备身份验证来访问群集。 还可以使用托管标识身份验证、Microsoft Entra应用程序证书Microsoft Entra应用程序密钥以及Microsoft Entra用户和密码

在后续步骤中创建目标表和映射。

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

设置源文件信息

导入其他类并设置数据源文件的常数。 此示例使用 Azure Blob 存储上托管的示例文件。 StormEvents 示例数据集包含来自国家环境信息中心与天气相关的数据。

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

在群集上创建表

创建与 StormEvents.csv 文件中的数据架构匹配的表。 运行此代码时,它会返回如下消息:若要登录,请使用 Web 浏览器打开页面 https://microsoft.com/devicelogin ,然后输入代码 F3W4VWZDM 进行身份验证。 按照步骤登录,然后返回运行下一个代码块。 建立连接的后续代码块将要求你再次登录。

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

定义引入映射

将传入的 CSV 数据映射到创建表时使用的列名称和数据类型。 这会将源数据字段映射到目标表列

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

列入一条引入消息

将一条消息排入队列,以便从 blob 存储中提取数据并将该数据引入到 Azure 数据资源管理器。

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

查询已引入表中的数据

等待五到十分钟,直到排入队列的引入已计划在 Azure 数据资源管理器中引入和加载数据。 然后运行以下代码,以获取 StormEvents 表中记录的计数。

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

运行故障排除查询

登录到 https://dataexplorer.azure.com 并连接到群集。 在数据库中运行以下命令以查看过去四个小时内是否存在任何失败引入。 在运行之前替换数据库名称。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

运行以下命令以查看过去四个小时内所有引入操作的状态。 在运行之前替换数据库名称。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

清理资源

如果计划学习我们的其他文章,请保留已创建的资源。 否则,在数据库中运行以下命令以清除 StormEvents 表。

.drop table StormEvents

后续步骤