Python을 사용하여 Azure Synapse Data Explorer용 Event Hub 데이터 연결 만들기(미리 보기)
Azure Synapse Data Explorer는 로그 및 원격 분석 데이터를 위한 빠르고 확장성이 뛰어난 데이터 검색 서비스입니다. Azure Synapse Data Explorer는 Event Hubs, IoT Hubs 및 Blob 컨테이너에 기록된 Blob에서 수집(데이터 로딩)을 제공합니다.
이 문서에서는 Python을 사용하여 Azure Synapse Data Explorer용 Event Hub 데이터 연결을 만듭니다.
필수 구성 요소
Azure 구독 평가판 Azure 계정을 만듭니다.
Synapse Studio 또는 Azure Portal을 사용하여 Data Explorer 풀 만들기
Data Explorer 데이터베이스를 만듭니다.
Synapse Studio의 왼쪽 창에서 데이터를 선택합니다.
+(새 리소스 추가) >Data Explorer 풀을 선택하고 다음 정보를 사용합니다.
설정 제안 값 설명 풀 이름 contosodataexplorer 사용할 Data Explorer 풀의 이름 이름 TestDatabase 데이터베이스 이름은 클러스터 내에서 고유해야 합니다. 기본 보존 기간 365 데이터를 쿼리에 사용할 수 있도록 보장되는 시간 범위(일)입니다. 시간 범위는 데이터가 수집된 시간부터 측정됩니다. 기본 캐시 기간 31 자주 쿼리되는 데이터를 장기 스토리지가 아닌 SSD 스토리지 또는 RAM에 보관할 수 있는 시간 범위(일)입니다. 만들기를 선택하여 데이터베이스를 만듭니다. 만들기에는 일반적으로 채 1분이 소요되지 않습니다.
테스트 클러스터에 테이블 만들기
StormEvents.csv
파일에 있는 데이터 스키마와 일치하는 StormEvents
라는 테이블을 만듭니다.
팁
다음 코드 조각은 거의 모든 호출에 대한 클라이언트 인스턴스를 만듭니다. 이 작업은 각 조각을 개별적으로 실행할 수 있도록 하기 위해 수행됩니다. 프로덕션에서 클라이언트 인스턴스는 재진입되며 필요한 한 유지되어야 합니다. 여러 데이터베이스로 작업하는 경우에도 URI당 단일 클라이언트 인스턴스로 충분합니다(데이터베이스는 명령 수준에서 지정할 수 있음).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
수집 매핑 정의
테이블을 만들 때 사용되는 열 이름에 들어오는 CSV 데이터를 매핑합니다. 해당 테이블에서 CSV 열 매핑 개체를 프로비전합니다.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Python 패키지 설치
Azure Synapse Data Explorer용 Python 패키지를 설치하려면 경로에 Python이 있는 명령 프롬프트를 엽니다. 다음 명령 실행:
pip install azure-common
pip install azure-mgmt-kusto
인증
다음 예제를 실행하려면 리소스에 액세스할 수 있는 Microsoft Entra 애플리케이션 및 서비스 주체가 필요합니다. 무료 Microsoft Entra 애플리케이션을 만들고 구독 수준에서 역할 할당을 추가하려면 Microsoft Entra 애플리케이션 만들기를 참조하세요. 디렉터리(테넌트) ID, 애플리케이션 ID 및 클라이언트 암호도 필요합니다.
Event Hub 데이터 연결 추가
다음 예에서는 프로그래밍 방식으로 Event Hub 데이터 연결을 추가하는 방법을 보여 줍니다. Azure Portal을 사용하여 Event Hub 데이터 연결을 추가하는 방법은 이벤트 허브에 연결을 참조하세요.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
설정 | 제안 값 | 필드 설명 |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 테넌트 ID 디렉터리 ID라고도 합니다. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 리소스를 만드는 데 사용하는 구독 ID입니다. |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 테넌트의 리소스에 액세스할 수 있는 애플리케이션의 클라이언트 ID입니다. |
client_secret | xxxxxxxxxxxxxx | 테넌트의 리소스에 액세스할 수 있는 애플리케이션의 클라이언트 암호입니다. |
resource_group_name | testrg | 클러스터가 포함된 리소스 그룹의 이름입니다. |
cluster_name | mykustocluster | 클러스터의 이름입니다. |
database_name | mykustodatabase | 클러스터에 있는 대상 데이터베이스의 이름입니다. |
data_connection_name | myeventhubconnect | 원하는 데이터 연결 이름입니다. |
table_name | StormEvents | 대상 데이터베이스의 대상 테이블 이름입니다. |
mapping_rule_name | StormEvents_CSV_Mapping | 대상 테이블과 관련된 열 매핑의 이름입니다. |
data_format | csv | 메시지의 데이터 형식입니다. |
event_hub_resource_id | 리소스 ID | 수집할 데이터를 보유하는 Event Hub의 리소스 ID입니다. |
consumer_group | $Default | Event Hub의 소비자 그룹입니다. |
위치 | 미국 중부 | 데이터 연결 리소스의 위치입니다. |
리소스 정리
데이터 연결을 삭제하려면 다음 명령을 사용합니다.
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)