使用 Python 建立 Azure Synapse 數據總管的事件中樞數據連線 (預覽)
記錄和遙測資料時,Azure Synapse 資料總管是快速且調整彈性高的資料探索服務。 Azure Synapse 資料總管提供事件中樞、IoT 中樞的擷取 (資料載入),及在 blob 容器寫入 blob。
在本文中,您會使用 Python 建立 Azure Synapse 數據總管的事件中樞數據連線。
必要條件
Azure 訂用帳戶。 建立免費的 Azure 帳戶。
使用 Synapse Studio 或 Azure 入口網站建立資料總管集區
建立資料總管資料庫。
在 Synapse Studio 的左側窗格上,選取 [資料]。
選取 + (新增資源) >[資料總管集區],並使用下列資訊:
設定 建議的值 描述 集區名稱 contosodataexplorer 要使用的資料總管集區名稱 名稱 TestDatabase 資料庫名稱在叢集內必須是唯一而不重複。 預設保留期限 365 保證資料持續可供查詢的時間範圍 (天)。 系統會從內嵌資料的時間開始測量時間範圍。 預設快取期間 31 在 SSD 儲存裝置或 RAM 中 (而非長期儲存裝置),讓受到頻繁查詢的資料維持可用狀態的時間範圍 (天)。 選取 [建立] 以建立資料庫。 建立時間通常不到一分鐘。
- 具有擷取數據的事件中樞。
- Python 3.4+。
在測試叢集上建立數據表
建立名為 StormEvents
的數據表,其符合檔案中 StormEvents.csv
數據的架構。
提示
下列代碼段會針對幾乎每個呼叫建立客戶端的實例。 這樣做是為了讓每個代碼段個別執行。 在生產環境中,客戶端實例會重新進入,而且應該視需要保留。 每個 URI 的單一用戶端實例就已足夠,即使使用多個資料庫(可以在命令層級上指定資料庫)。
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
定義擷取對應
將傳入 CSV 數據對應至建立資料表時所使用的數據行名稱。 在該數據表上布建 CSV 數據行對應物件 。
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
安裝 Python 套件
若要安裝適用於 Azure Synapse 資料總管的 Python 套件,請開啟在其路徑中有 Python 的命令提示字元。 執行以下命令:
pip install azure-common
pip install azure-mgmt-kusto
驗證
若要執行下列範例,您需要可存取資源的Microsoft Entra 應用程式和服務主體。 若要建立免費的Microsoft Entra 應用程式,並在訂用帳戶層級新增角色指派,請參閱 建立Microsoft Entra 應用程式。 您也需要目錄(租使用者)識別碼、應用程式識別碼和客戶端密碼。
新增事件中樞數據連線
下列範例示範如何以程序設計方式新增事件中樞數據連線。 請參閱連線至事件中樞,以使用 Azure 入口網站 新增事件中樞數據連線。
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
設定 | 建議的值 | 欄位描述 |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 您的租用戶識別碼。 也稱為目錄標識碼。 |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 您用來建立資源的訂用帳戶標識碼。 |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 應用程式可存取租用戶中資源的用戶端標識碼。 |
client_secret | xxxxxxxx | 應用程式可存取租用戶中資源的客戶端密碼。 |
resource_group_name | testrg | 包含叢集的資源群組名稱。 |
cluster_name | mykustocluster | 您叢集的名稱。 |
database_name | mykustodatabase | 叢集中目標資料庫的名稱。 |
data_connection_name | myeventhubconnect | 數據連線所需的名稱。 |
table_name | StormEvents | 目標資料庫中的目標數據表名稱。 |
mapping_rule_name | StormEvents_CSV_Mapping | 與目標數據表相關的數據行對應名稱。 |
data_format | csv | 訊息的數據格式。 |
event_hub_resource_id | 資源識別碼 | 保留擷取數據的事件中樞資源標識碼。 |
consumer_group | $Default | 事件中樞的取用者群組。 |
location | Central US | 數據連線資源的位置。 |
清除資源
若要刪除資料連線,請使用下列命令:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)