C#을 사용하여 Azure Synapse Data Explorer에 대한 Event Hubs 데이터 연결 만들기(미리 보기)
Azure Synapse Data Explorer는 로그 및 원격 분석 데이터를 위한 빠르고 확장성이 뛰어난 데이터 검색 서비스입니다. Azure Synapse Data Explorer는 Event Hubs, IoT Hubs 및 Blob 컨테이너에 기록된 Blob에서 수집(데이터 로딩)을 제공합니다.
이 문서에서는 C#을 사용하여 Azure Synapse Data Explorer에 대한 Event Hubs 데이터 연결을 만듭니다.
필수 구성 요소
Azure 구독 평가판 Azure 계정을 만듭니다.
Synapse Studio 또는 Azure Portal을 사용하여 Data Explorer 풀 만들기
Data Explorer 데이터베이스를 만듭니다.
Synapse Studio의 왼쪽 창에서 데이터를 선택합니다.
+(새 리소스 추가) >Data Explorer 풀을 선택하고 다음 정보를 사용합니다.
설정 제안 값 설명 풀 이름 contosodataexplorer 사용할 Data Explorer 풀의 이름 이름 TestDatabase 데이터베이스 이름은 클러스터 내에서 고유해야 합니다. 기본 보존 기간 365 데이터를 쿼리에 사용할 수 있도록 보장되는 시간 범위(일)입니다. 시간 범위는 데이터가 수집된 시간부터 측정됩니다. 기본 캐시 기간 31 자주 쿼리되는 데이터를 장기 스토리지가 아닌 SSD 스토리지 또는 RAM에 보관할 수 있는 시간 범위(일)입니다. 만들기를 선택하여 데이터베이스를 만듭니다. 만들기에는 일반적으로 채 1분이 소요되지 않습니다.
참고 항목
Synapse 작업 영역에서 데이터 반출 보호가 사용된 관리형 가상 네트워크를 사용하는 경우 Event Hub에서 데이터 탐색기 풀로의 데이터 수집이 작동하지 않습니다.
- Visual Studio 2019, 무료Visual Studio 2019 Community Edition을 다운로드하여 사용합니다. Visual Studio 설정 중에 Azure 개발을 사용하도록 설정합니다.
테스트 클러스터에 테이블 만들기
StormEvents.csv
파일에 있는 데이터 스키마와 일치하는 StormEvents
라는 테이블을 만듭니다.
팁
다음 코드 조각은 거의 모든 호출에 대한 클라이언트 인스턴스를 만듭니다. 이 작업은 각 조각을 개별적으로 실행할 수 있도록 하기 위해 수행됩니다. 프로덕션에서 클라이언트 인스턴스는 재진입되며 필요한 한 유지되어야 합니다. 여러 데이터베이스로 작업하는 경우에도 URI당 단일 클라이언트 인스턴스로 충분합니다(데이터베이스는 명령 수준에서 지정할 수 있음).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
수집 매핑 정의
테이블을 만들 때 사용되는 열 이름에 들어오는 CSV 데이터를 매핑합니다. 해당 테이블에서 CSV 열 매핑 개체를 프로비전합니다.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
C# NuGet 설치
인증
다음 예제를 실행하려면 리소스에 액세스할 수 있는 Microsoft Entra 애플리케이션 및 서비스 주체가 필요합니다. 무료 Microsoft Entra 애플리케이션을 만들고 구독 수준에서 역할 할당을 추가하려면 Microsoft Entra 애플리케이션 만들기를 참조하세요. 디렉터리(테넌트) ID, 애플리케이션 ID 및 클라이언트 암호도 필요합니다.
Event Hubs 데이터 연결 추가
다음 예에서는 프로그래밍 방식으로 Event Hubs 데이터 연결을 추가하는 방법을 보여 줍니다. Azure Portal을 사용하여 Event Hubs 데이터 연결을 추가하는 방법에 대한 자세한 내용은 Event Hubs에 대한 연결을 참조하세요.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
설정 | 제안 값 | 필드 설명 |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 테넌트 ID 디렉터리 ID라고도 합니다. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 리소스를 만드는 데 사용하는 구독 ID입니다. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 테넌트의 리소스에 액세스할 수 있는 애플리케이션의 클라이언트 ID입니다. |
clientSecret | xxxxxxxxxxxxxx | 테넌트의 리소스에 액세스할 수 있는 애플리케이션의 클라이언트 암호입니다. |
resourceGroupName | testrg | 클러스터가 포함된 리소스 그룹의 이름입니다. |
clusterName | mykustocluster | 클러스터의 이름입니다. |
databaseName | mykustodatabase | 클러스터에 있는 대상 데이터베이스의 이름입니다. |
dataConnectionName | myeventhubconnect | 원하는 데이터 연결 이름입니다. |
tableName | StormEvents | 대상 데이터베이스의 대상 테이블 이름입니다. |
mappingRuleName | StormEvents_CSV_Mapping | 대상 테이블과 관련된 열 매핑의 이름입니다. |
dataFormat | csv | 메시지의 데이터 형식입니다. |
eventHubResourceId | 리소스 ID | 수집할 데이터를 보유하는 이벤트 허브의 리소스 ID입니다. |
consumerGroup | $Default | 이벤트 허브의 소비자 그룹입니다. |
위치 | 미국 중부 | 데이터 연결 리소스의 위치입니다. |
압축 | Gzip 또는 없음 | 데이터 압축 형식입니다. |
데이터 생성
데이터를 생성하고 이벤트 허브로 보내는 샘플 앱을 참조하세요.
이벤트는 크기 한도까지 하나 이상의 레코드를 포함할 수 있습니다. 다음 샘플에서는 각각 5개의 레코드가 추가된 두 개의 이벤트를 보냅니다.
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
리소스 정리
데이터 연결을 삭제하려면 다음 명령을 사용합니다.
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);