你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 SDK 为 Azure 数据资源管理器创建事件网格数据连接

本文介绍如何使用事件网格数据连接将 blob 从存储帐户引入到 Azure 数据资源管理器。 你将创建一个用于设置 Azure 事件网格订阅的事件网格数据连接。 事件网格订阅通过 Azure 事件中心将事件从存储帐户路由到 Azure 数据资源管理器。

若要了解如何在 Azure 门户中或使用 ARM 模板创建连接,请参阅创建事件网格数据连接

有关如何从事件网格引入 Azure 数据资源管理器的一般信息,请参阅连接到事件网格

注意

若要使用事件网格连接实现最佳性能,请通过 Blob 元数据设置 rawSizeBytes 引入属性。 有关详细信息,请参阅引入属性

有关基于以前的 SDK 版本的代码示例,请参阅存档的文章

先决条件

创建事件网格数据连接

在本部分,我们将在事件网格与 Azure 数据资源管理器表之间建立连接。

  1. 安装 Microsoft.Azure.Management.Kusto NuGet 包

  2. 创建用于身份验证的 Microsoft Entra 应用程序主体。 需要目录(租户)ID、应用程序 ID 和客户端机密。

  3. 运行以下代码。

    var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID
    var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID
    var clientSecret = "PlaceholderClientSecret"; //Client Secret
    var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
    var credentials = new ClientSecretCredential(tenantId, clientId, clientSecret);
    var resourceManagementClient = new ArmClient(credentials, subscriptionId);
    var resourceGroupName = "testrg";
    //The cluster and database that are created as part of the Prerequisites
    var clusterName = "mykustocluster";
    var databaseName = "mykustodatabase";
    var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
    var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
    var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
    var database = (await cluster.GetKustoDatabaseAsync(databaseName)).Value;
    var dataConnections = database.GetKustoDataConnections();
    var eventGridConnectionName = "myeventgridconnect";
    //The event hub and storage account that are created as part of the Prerequisites
    var eventHubResourceId = new ResourceIdentifier("/subscriptions/<storageAccountSubscriptionId>/resourceGroups/<storageAccountResourceGroupName>/providers/Microsoft.Storage/storageAccounts/<storageAccountName>");
    var storageAccountResourceId = new ResourceIdentifier("/subscriptions/<eventHubSubscriptionId>/resourceGroups/<eventHubResourceGroupName>/providers/Microsoft.EventHub/namespaces/<eventHubNamespaceName>/eventhubs/<eventHubName>");
    var consumerGroup = "$Default";
    var location = AzureLocation.CentralUS;
    //The table and column mapping are created as part of the Prerequisites
    var tableName = "StormEvents";
    var mappingRuleName = "StormEvents_CSV_Mapping";
    var dataFormat = KustoEventGridDataFormat.Csv;
    var blobStorageEventType = BlobStorageEventType.MicrosoftStorageBlobCreated;
    var databaseRouting = KustoDatabaseRouting.Multi;
    var eventGridConnectionData = new KustoEventGridDataConnection
    {
        StorageAccountResourceId = storageAccountResourceId, EventHubResourceId = eventHubResourceId,
        ConsumerGroup = consumerGroup, TableName = tableName, Location = location, MappingRuleName = mappingRuleName,
        DataFormat = dataFormat, BlobStorageEventType = blobStorageEventType, DatabaseRouting = databaseRouting
    };
    await dataConnections.CreateOrUpdateAsync(WaitUntil.Completed, eventGridConnectionName, eventGridConnectionData);
    
    设置 建议的值 字段说明
    tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 租户 ID。 也称为目录 ID。
    subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 用于创建资源的订阅 ID。
    clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 可以访问租户中资源的应用程序的客户端 ID。
    clientSecret PlaceholderClientSecret 可以访问租户中资源的应用程序的客户端密码。
    resourceGroupName testrg 包含群集的资源组的名称。
    clusterName mykustocluster 群集的名称。
    databaseName mykustodatabase 群集中目标数据库的名称。
    eventGridConnectionName myeventgridconnect 所需的数据连接名称。
    tableName StormEvents 目标数据库中目标表的名称。
    mappingRuleName StormEvents_CSV_Mapping 与目标表相关的列映射的名称。
    dataFormat csv 消息的数据格式。
    eventHubResourceId 资源 ID 将事件网格配置为发送事件的事件中心的资源 ID。
    storageAccountResourceId 资源 ID 包含要引入数据的存储帐户的资源 ID。
    consumerGroup $Default 事件中心的使用者组。
    location 美国中部 数据连接资源的位置。
    blobStorageEventType Microsoft.Storage.BlobCreated 触发引入的事件类型。 支持的事件为:Microsoft.Storage.BlobCreated 或 Microsoft.Storage.BlobRenamed。 仅 ADLSv2 存储支持 Blob 重命名。
    databaseRouting 多或单 连接的数据库路由。 如果将此值设置为“单”,数据连接将按“databaseName”设置中指定的那样路由到群集中的单个数据库。 如果将此值设置为“多”,可使用数据库引入属性重写默认目标数据库。 有关详细信息,请参阅事件路由

使用事件网格数据连接

本部分介绍如何在创建 Blob 或重命名 Blob 后,触发从 Azure Blob 存储或 Azure Data Lake Gen 2 到群集的引入。

根据用于上传 Blob 的存储 SDK 类型选择相关的选项卡。

以下代码示例使用 Azure Blob 存储 SDK 将文件上传到 Azure Blob 存储。 上传会触发事件网格数据连接,该连接将数据引入到 Azure 数据资源管理器中。

var azureStorageAccountConnectionString=<storage_account_connection_string>;
var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mappingReference>;
// Create a new container in your storage account.
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();
// Set metadata and upload a file to the blob.
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);
// Confirm success of the upload by listing the blobs in your container.
var blobs = container.ListBlobs();

注意

Azure 数据资源管理器在引入后不会删除 blob。 使用 Azure Blob 存储生命周期管理 blob 删除,将 blob 保留三到五天。

注意

启用了分层命名空间功能的存储帐户不支持在 CopyBlob 操作后触发引入。

重要

我们强烈建议不要从自定义代码生成存储事件并将其发送到事件中心。 如果选择这样做,请确保生成的事件严格遵循相应的存储事件架构和 JSON 格式规范。

删除事件网格数据连接

若要删除事件网格连接,请运行以下命令:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);