建立 Azure 數據總管的事件中樞數據連線
- 發行項
Azure 數據總管提供從 事件中樞擷取、巨量數據串流平臺和事件擷取服務。 事件中樞可以近乎即時地每秒處理數百萬個事件。
在本文中,您將連線到事件中樞,並將數據內嵌至 Azure 數據總管。 如需從事件中樞擷取的概觀,請參閱 Azure 事件中樞 數據連線。
必要條件
- Azure 訂用帳戶。 建立免費的 Azure 帳戶。
- Azure 資料總管叢集和資料庫。 建立叢集和資料庫。
- 目的地數據表。 建立數據表 或使用現有的數據表。
- 數據表的擷取對應。
- 具有 擷取數據的事件中樞 。
建立事件中樞數據連線
在本節中,您將建立事件中樞與 Azure 數據總管數據表之間的連線。 只要此連線已就緒,數據就會從事件中樞傳輸至目標數據表。 如果事件中樞移至不同的資源或訂用帳戶,您必須更新或重新建立連線。
在 Azure 入口網站 中,移至您的叢集,然後選取 [資料庫]。 然後,選取包含目標數據表的資料庫。
從左側功能表中,選取 [ 數據擷取]。 然後,在頂端列中,選取 [ 新增數據連線]。
使用下列資訊填寫窗體,然後選取 [ 建立]。
設定 建議的值 欄位描述 資料連線名稱 test-hub-connection 您想要在 Azure 資料總管中建立的連接名稱。 訂用帳戶 事件中樞資源所在的訂用帳戶標識碼。 事件中樞命名空間 唯一的命名空間名稱 您稍早選擇用來識別命名空間的名稱。 事件中樞 test-hub 您建立的事件中樞。 取用者群組 test-group 在您建立的事件中樞內定義的取用者群組。 事件系統屬性 選取相關的屬性 事件中 樞系統屬性。 如果每個事件訊息有多個記錄,系統會將系統屬性新增至第一筆記錄。 新增系統屬性時, 請建立 或 更新 數據表架構,並 對應 以包含選取的屬性。 壓縮 None 事件中樞訊息承載的壓縮類型。 支援的壓縮類型: None、Gzip。 受控識別 (建議) 系統指派 數據總管叢集用來存取從事件中樞讀取的受控識別。 建議您使用受控識別來控制事件中樞的存取權。
注意:
建立資料連線時:
* 如果系統指派的 身分識別不存在,系統指派的身分識別會自動建立
* 受控識別會自動指派 Azure 事件中樞 數據接收者角色,並新增至數據總管叢集。 建議您確認已指派角色,並將身分識別新增至叢集。注意
如果您有未使用受控識別的現有數據連線,建議您將其更新為使用受控識別。
下列步驟將引導您透過 Azure 數據總管 Web UI 中的擷取精靈建立事件中樞連線。
注意
從 Azure 數據總管 Web UI 的 [數據] 索引標籤中,從 [事件中樞] 卡片中選取 [內嵌數據]。
[內嵌數據] 視窗隨即開啟,並已選取 [目的地] 索引卷標。 [叢集] 和 [資料庫] 字段會自動填入。 您可以從下拉功能表中選取不同的叢集或資料庫。
在 [數據表] 底下,選取 [新增數據表],然後輸入新數據表的名稱。 或者,使用現有的數據表。
選取 [下一步:來源]。
在 [來源類型] 底下,選取 [事件中樞]。
在 [數據連線] 底下,填入下列字段,然後選取 [下一步:架構]。
設定 建議的值 欄位描述 訂用帳戶 事件中樞資源所在的訂用帳戶標識碼。 事件中樞命名空間 識別命名空間的名稱。 事件中樞 您想要使用的事件中樞。 資料連線名稱 TestDataConnection 識別數據連線的名稱。 取用者群組 事件中樞中定義的取用者群組。 壓縮 事件中樞訊息承載的壓縮類型。 事件系統屬性 選取相關的屬性 事件中 樞系統屬性。 如果每個事件訊息有多個記錄,會將系統屬性新增至第一個記錄。 新增系統屬性時, 請建立 或 更新 數據表架構,並 對應 以包含選取的屬性。 事件擷取開始日期 國際標準時間(UTC) 數據聯機會擷取事件擷取開始日期之後建立的現有事件中樞事件。 只能擷取事件中樞保留期間所保留的事件。 如果未指定事件擷取開始日期,則默認時間是建立數據連線的時間。 設定擷取原則。 如果 叢集已啟用串流 ,您可以選取 [串流擷取]。 如果叢集未啟用串流,請設定 Batching 時間。 針對事件中樞,建議 的批處理時間為 30秒。
選取 [ 數據格式]。 若為 CSV 格式的數據, 請忽略第一筆記錄 以忽略檔案的標題數據列。 針對 JSON 格式的數據,選取 [ 忽略數據格式錯誤 ] 以擷取 JSON 格式的數據,或將未選取為以 multijson 格式擷取數據。 選取巢 狀層級 以判斷數據表數據行數據分割。
如果您在預覽視窗中看到的數據尚未完成,您可能需要更多數據來建立具有所有必要數據欄位的數據表。 使用下列命令從事件中樞擷取新資料:
- 丟棄並擷取新資料:丟棄顯示的資料並搜尋新事件。
- 擷取更多資料:除了已找到的事件之外,還搜尋更多事件。
注意
若要查看數據的預覽,事件中樞必須傳送事件。
選取 [下一步:摘要]。
在 [ 事件中樞已建立 的連續擷取] 視窗中,當建立成功完成時,所有步驟都會標示為綠色複選標記。
建立要用於驗證的 Azure AD 應用程式主體 。 您將需要目錄(租使用者)識別碼、應用程式識別碼和客戶端密碼。
執行下列程式碼。
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID var clientSecret = "PlaceholderClientSecret"; //Client Secret var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; var authClient = ConfidentialClientApplicationBuilder.Create(clientId) .WithAuthority($"https://login.microsoftonline.com/{tenantId}") .WithClientSecret(clientSecret) .Build(); var result = authClient.AcquireTokenForClient(new[] { "https://management.core.windows.net/" }).ExecuteAsync().Result; var credentials = new TokenCredentials(result.AccessToken, result.TokenType); var kustoManagementClient = new KustoManagementClient(credentials) { SubscriptionId = subscriptionId }; var resourceGroupName = "testrg"; //The cluster and database that are created as part of the Prerequisites var clusterName = "mykustocluster"; var databaseName = "mykustodatabase"; var eventHubConnectionName = "myeventhubconnect"; //The event hub that is created as part of the Prerequisites var eventHubResourceId = "/subscriptions/<eventHubSubscriptionId>/resourceGroups/<eventHubResourceGroupName>/providers/Microsoft.EventHub/namespaces/<eventHubNamespaceName>/eventhubs/<eventHubName>"; var consumerGroup = "$Default"; var location = "Central US"; //The table and column mapping are created as part of the Prerequisites var tableName = "StormEvents"; var mappingRuleName = "StormEvents_CSV_Mapping"; var dataFormat = EventHubDataFormat.CSV; var compression = "None"; var databaseRouting = "Multi"; var eventHubConnectionData = new EventHubDataConnection( eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression, databaseRouting: databaseRouting ); await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, eventHubConnectionName, eventHubConnectionData);
設定 建議的值 欄位描述 tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 您的租用戶識別碼。 也稱為目錄標識碼。 subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 您用來建立資源的訂用帳戶標識碼。 clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 應用程式可存取租用戶中資源的用戶端標識碼。 clientSecret PlaceholderClientSecret 應用程式可存取租用戶中資源的客戶端密碼。 resourceGroupName testrg 包含叢集的資源群組名稱。 clusterName mykustocluster 您叢集的名稱。 databaseName mykustodatabase 叢集中目標資料庫的名稱。 eventHubConnectionName myeventhubconnect 數據連線所需的名稱。 tableName StormEvents 目標資料庫中的目標數據表名稱。 mappingRuleName StormEvents_CSV_Mapping 與目標數據表相關的數據行對應名稱。 dataFormat csv 訊息的數據格式。 eventHubResourceId 資源識別碼 事件中樞的資源標識碼,其中包含要擷取的數據。 consumerGroup $Default 事件中樞的取用者群組。 location Central US 數據連線資源的位置。 壓縮 Gzip 或 None 數據壓縮的類型。 databaseRouting 多重 或 單一 線上的資料庫路由。 如果您將值設定為Single,數據聯機會路由傳送至叢集中的單一資料庫,如 databaseName 設定中所指定。 如果您將值設定為 Multi,您可以使用資料庫擷取屬性覆寫預設目標資料庫。 如需詳細資訊,請參閱 事件路由。
安裝必要的程式庫。
pip install azure-common pip install azure-mgmt-kusto
建立要用於驗證的 Azure AD 應用程式主體 。 您將需要目錄(租使用者)識別碼、應用程式識別碼和客戶端密碼。
執行下列程式碼。
from azure.mgmt.kusto import KustoManagementClient from azure.mgmt.kusto.models import EventHubDataConnection from azure.identity import ClientSecretCredential #Directory (tenant) ID tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" #Application ID client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" #Client Secret client_secret = "xxxxxxxxxxxxxx" subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" credentials = ServicePrincipalCredentials( client_id=client_id, secret=client_secret, tenant=tenant_id ) kusto_management_client = KustoManagementClient(credentials, subscription_id) resource_group_name = "myresourcegroup" #The cluster and database that are created as part of the Prerequisites cluster_name = "mycluster" database_name = "mydatabase" data_connection_name = "myeventhubconnect" #The event hub that is created as part of the Prerequisites event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/myresourcegroup/providers/Microsoft.EventHub/namespaces/myeventhubnamespace/eventhubs/myeventhub""; consumer_group = "$Default" location = "Central US" #The table and column mapping that are created as part of the Prerequisites table_name = "mytable" mapping_rule_name = "mytablemappingrule" data_format = "csv" database_routing = "Multi" #Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python poller = kusto_management_client.data_connections.create_or_update( resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name, parameters=EventHubDataConnection( event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location, table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format, database_routing=database_routing ) ) poller.wait() print(poller.result())
設定 建議的值 欄位描述 tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 您的租用戶識別碼。 也稱為目錄標識碼。 subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 您用來建立資源的訂用帳戶標識碼。 client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 應用程式可存取租用戶中資源的用戶端標識碼。 client_secret xxxxxxxx 應用程式可存取租用戶中資源的客戶端密碼。 resource_group_name myresourcegroup 包含叢集的資源群組名稱。 cluster_name mycluster 您叢集的名稱。 database_name mydatabase 叢集中目標資料庫的名稱。 data_connection_name myeventhubconnect 數據連線所需的名稱。 table_name mytable 目標資料庫中的目標數據表名稱。 mapping_rule_name mytablemappingrule 與目標數據表相關的數據行對應名稱。 data_format csv 訊息的數據格式。 event_hub_resource_id 資源識別碼 事件中樞的資源標識碼,其中包含要擷取的數據。 consumer_group $Default 事件中樞的取用者群組。 location Central US 數據連線資源的位置。 databaseRouting 多重 或 單一 線上的資料庫路由。 如果您將值設定為Single,數據聯機會路由傳送至叢集中的單一資料庫,如 databaseName 設定中所指定。 如果您將值設定為 Multi,您可以使用資料庫擷取屬性覆寫預設目標資料庫。 如需詳細資訊,請參閱 事件路由。
下列範例示範用於新增事件中樞數據連線的 Azure Resource Manager 範本。 您可以使用表單,在 Azure 入口網站 中編輯和部署範本。
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"namespaces_eventhubns_name": {
"type": "string",
"defaultValue": "eventhubns",
"metadata": {
"description": "Specifies the Event Hubs Namespace name."
}
},
"EventHubs_eventhubdemo_name": {
"type": "string",
"defaultValue": "eventhubdemo",
"metadata": {
"description": "Specifies the event hub name."
}
},
"consumergroup_default_name": {
"type": "string",
"defaultValue": "$Default",
"metadata": {
"description": "Specifies the consumer group of the event hub."
}
},
"Clusters_kustocluster_name": {
"type": "string",
"defaultValue": "kustocluster",
"metadata": {
"description": "Specifies the name of the cluster"
}
},
"databases_kustodb_name": {
"type": "string",
"defaultValue": "kustodb",
"metadata": {
"description": "Specifies the name of the database"
}
},
"tables_kustotable_name": {
"type": "string",
"defaultValue": "kustotable",
"metadata": {
"description": "Specifies the name of the table"
}
},
"mapping_kustomapping_name": {
"type": "string",
"defaultValue": "kustomapping",
"metadata": {
"description": "Specifies the name of the mapping rule"
}
},
"dataformat_type": {
"type": "string",
"defaultValue": "csv",
"metadata": {
"description": "Specifies the data format"
}
},
"databaseRouting_type": {
"type": "string",
"defaultValue": "Single",
"metadata": {
"description": "The database routing for the connection. If you set the value to **Single**, the data connection will be routed to a single database in the cluster as specified in the *databaseName* setting. If you set the value to **Multi**, you can override the default target database using the *Database* EventData property."
}
},
"dataconnections_kustodc_name": {
"type": "string",
"defaultValue": "kustodc",
"metadata": {
"description": "Name of the data connection to create"
}
},
"subscriptionId": {
"type": "string",
"defaultValue": "[subscription().subscriptionId]",
"metadata": {
"description": "Specifies the subscriptionId of the event hub"
}
},
"resourceGroup": {
"type": "string",
"defaultValue": "[resourceGroup().name]",
"metadata": {
"description": "Specifies the resourceGroup of the event hub"
}
},
"location": {
"type": "string",
"defaultValue": "[resourceGroup().location]",
"metadata": {
"description": "Location for all resources."
}
}
},
"variables": {
},
"resources": [{
"type": "Microsoft.Kusto/Clusters/Databases/DataConnections",
"apiVersion": "2022-02-01",
"name": "[concat(parameters('Clusters_kustocluster_name'), '/', parameters('databases_kustodb_name'), '/', parameters('dataconnections_kustodc_name'))]",
"location": "[parameters('location')]",
"kind": "EventHub",
"properties": {
"managedIdentityResourceId": "[resourceId('Microsoft.Kusto/clusters', parameters('clusters_kustocluster_name'))]",
"eventHubResourceId": "[resourceId(parameters('subscriptionId'), parameters('resourceGroup'), 'Microsoft.EventHub/namespaces/eventhubs', parameters('namespaces_eventhubns_name'), parameters('EventHubs_eventhubdemo_name'))]",
"consumerGroup": "[parameters('consumergroup_default_name')]",
"tableName": "[parameters('tables_kustotable_name')]",
"mappingRuleName": "[parameters('mapping_kustomapping_name')]",
"dataFormat": "[parameters('dataformat_type')]",
"databaseRouting": "[parameters('databaseRouting_type')]"
}
}
]
}
拿掉事件中樞數據連線
若要從 Azure 入口網站 移除事件中樞連線,請執行下列動作:
- 移至您的叢集。 從左側功能表中,選取 [ 資料庫]。 然後,選取包含目標數據表的資料庫。
- 從左側功能表中,選取 [數據連線]。 然後,選取相關事件中樞數據連線旁的複選框。
- 從頂端功能表欄,選取 [ 刪除]。
從最後一個精靈頁面,選取 [ 管理數據連線] 卡片。 如果您已經結束精靈,請透過 Azure 入口網站 移除數據連線,如上一個索引卷標所述。
若要移除事件中樞連線,請執行下列命令:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);
若要移除事件中樞連線,請執行下列命令:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)