共用方式為


使用 Python 建立 Azure Synapse 數據總管的事件中樞數據連線 (預覽)

記錄和遙測資料時,Azure Synapse 資料總管是快速且調整彈性高的資料探索服務。 Azure Synapse 資料總管提供事件中樞、IoT 中樞的擷取 (資料載入),及在 blob 容器寫入 blob。

在本文中,您會使用 Python 建立 Azure Synapse 數據總管的事件中樞數據連線。

必要條件

  • Azure 訂用帳戶。 建立免費的 Azure 帳戶

  • 使用 Synapse StudioAzure 入口網站建立資料總管集區

  • 建立資料總管資料庫。

    1. 在 Synapse Studio 的左側窗格上,選取 [資料]

    2. 選取 + (新增資源) >[資料總管集區],並使用下列資訊:

      設定 建議的值 描述
      集區名稱 contosodataexplorer 要使用的資料總管集區名稱
      名稱 TestDatabase 資料庫名稱在叢集內必須是唯一而不重複。
      預設保留期限 365 保證資料持續可供查詢的時間範圍 (天)。 系統會從內嵌資料的時間開始測量時間範圍。
      預設快取期間 31 在 SSD 儲存裝置或 RAM 中 (而非長期儲存裝置),讓受到頻繁查詢的資料維持可用狀態的時間範圍 (天)。
    3. 選取 [建立] 以建立資料庫。 建立時間通常不到一分鐘。

在測試叢集上建立數據表

建立名為 StormEvents 的數據表,其符合檔案中 StormEvents.csv 數據的架構。

提示

下列代碼段會針對幾乎每個呼叫建立客戶端的實例。 這樣做是為了讓每個代碼段個別執行。 在生產環境中,客戶端實例會重新進入,而且應該視需要保留。 每個 URI 的單一用戶端實例就已足夠,即使使用多個資料庫(可以在命令層級上指定資料庫)。

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

定義擷取對應

將傳入 CSV 數據對應至建立資料表時所使用的數據行名稱。 在該數據表上布建 CSV 數據行對應物件

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

安裝 Python 套件

若要安裝適用於 Azure Synapse 資料總管的 Python 套件,請開啟在其路徑中有 Python 的命令提示字元。 執行以下命令:

pip install azure-common
pip install azure-mgmt-kusto

驗證

若要執行下列範例,您需要可存取資源的Microsoft Entra 應用程式和服務主體。 若要建立免費的Microsoft Entra 應用程式,並在訂用帳戶層級新增角色指派,請參閱 建立Microsoft Entra 應用程式。 您也需要目錄(租使用者)識別碼、應用程式識別碼和客戶端密碼。

新增事件中樞數據連線

下列範例示範如何以程序設計方式新增事件中樞數據連線。 請參閱連線至事件中樞,以使用 Azure 入口網站 新增事件中樞數據連線。

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
設定 建議的值 欄位描述
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 您的租用戶識別碼。 也稱為目錄標識碼。
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 您用來建立資源的訂用帳戶標識碼。
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 應用程式可存取租用戶中資源的用戶端標識碼。
client_secret xxxxxxxx 應用程式可存取租用戶中資源的客戶端密碼。
resource_group_name testrg 包含叢集的資源群組名稱。
cluster_name mykustocluster 您叢集的名稱。
database_name mykustodatabase 叢集中目標資料庫的名稱。
data_connection_name myeventhubconnect 數據連線所需的名稱。
table_name StormEvents 目標資料庫中的目標數據表名稱。
mapping_rule_name StormEvents_CSV_Mapping 與目標數據表相關的數據行對應名稱。
data_format csv 訊息的數據格式。
event_hub_resource_id 資源識別碼 保留擷取數據的事件中樞資源標識碼。
consumer_group $Default 事件中樞的取用者群組。
location Central US 數據連線資源的位置。

清除資源

若要刪除資料連線,請使用下列命令:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

下一步