다음을 통해 공유


Azure Data Explorer Python 라이브러리를 사용하여 데이터 수집

이 문서에서는 Azure Data Explorer Python 라이브러리를 사용하여 데이터를 수집합니다. Azure 데이터 탐색기는 로그 및 원격 분석 데이터에 사용 가능한 빠르고 확장성이 우수한 데이터 탐색 서비스입니다. Azure 데이터 탐색기는 2개의 Python용 클라이언트 라이브러리, 수집 라이브러리데이터 라이브러리를 제공합니다. 이러한 라이브러리를 사용하면 데이터를 클러스터로 수집하거나 로드하고 코드에서 데이터를 쿼리할 수 있습니다.

먼저 클러스터에 테이블과 데이터 매핑을 만듭니다. 그런 다음, 클러스터 큐에 수집을 넣고 결과의 유효성을 검사합니다.

사전 요구 사항

데이터 및 수집 라이브러리 설치

azure-kusto-dataazure-kusto-ingest를 설치합니다.

pip install azure-kusto-data
pip install azure-kusto-ingest

import 문 및 상수 추가

azure-kusto-data에서 클래스를 가져옵니다.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

애플리케이션을 인증하기 위해 Azure Data Explorer Microsoft Entra 테넌트 ID를 사용합니다. 테넌트 ID를 찾으려면 다음 URL을 사용하여 YourDomain을 사용자 도메인으로 바꿉니다.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

예를 들어 도메인이 contoso.com인 경우 URL은 https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/입니다. 결과를 보려면 이 URL을 클릭합니다. 첫 번째 줄은 다음과 같습니다.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

이 경우의 테넌트 ID는 6babcaad-604b-40ac-a9d7-9fd97c0b779f입니다. 이 코드를 실행하기 전에 AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI 및 KUSTO_DATABASE의 값을 설정합니다.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

이제 연결 문자열을 구성합니다. 다음 예제에서는 디바이스 인증을 사용하여 클러스터에 액세스합니다. 관리 ID 인증, Microsoft Entra 애플리케이션 인증서, Microsoft Entra 애플리케이션 키Microsoft Entra 사용자 및 암호를 사용할 수도 있습니다.

이후 단계에서 대상 테이블 및 매핑을 만듭니다.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

소스 파일 정보 설정

추가 클래스를 가져오고 데이터 원본 파일에 대한 상수를 설정합니다. 이 예제에서는 Azure Blob Storage에 호스트된 예제 파일을 사용합니다. StormEvents 샘플 데이터 세트에는 국립 환경 정보 센터의 날씨 관련 데이터가 포함되어 있습니다.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

클러스터에 테이블 만들기

StormEvents.csv 파일에 있는 데이터 스키마와 일치하는 테이블을 만듭니다. 이 코드가 실행되면 다음 메시지와 같은 메시지가 반환됩니다. 로그인하려면 웹 브라우저를 사용하여 https://microsoft.com/devicelogin 페이지를 열고 코드 F3W4VWZDM을 입력하여 인증하세요. 단계에 따라 로그인한 후 돌아가서 다음 코드 블록을 실행합니다. 연결을 만드는 후속 코드 블록을 위해 다시 로그인해야 합니다.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

수집 매핑 정의

테이블을 만들 때 사용되는 데이터 형식과 열 이름에 들어오는 CSV 데이터를 매핑합니다. 원본 데이터 필드를 대상 테이블 열에 매핑합니다.

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

수집을 위해 메시지를 큐에 넣음

Blob Storage에서 데이터를 끌어온 후 Azure 데이터 탐색기에 수집하기 위해 메시지를 큐에 넣습니다.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

테이블에 수집된 데이터 쿼리

큐에 넣은 수집이 예약되고 데이터가 Azure Data Explorer에 로드될 때까지 5~10분 정도 기다립니다. 그런 후, 다음 코드를 실행하여 StormEvents 테이블의 레코드 수를 가져옵니다.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

쿼리 문제 해결 실행

https://dataexplorer.azure.com에 로그인하고 클러스터에 연결합니다. 데이터베이스에서 다음 명령을 실행하여 지난 4시간 동안 수집 실패가 있었는지 확인합니다. 실행하기 전에 데이터베이스 이름을 바꿉니다.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

다음 명령을 실행하여 지난 4시간 동안 진행된 모든 수집 작업의 상태를 확인합니다. 실행하기 전에 데이터베이스 이름을 바꿉니다.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

리소스 정리

다른 문서를 따르려는 경우 만든 리소스를 유지합니다. 그렇지 않으면 데이터베이스에서 다음 명령을 실행하여 StormEvents 테이블을 정리합니다.

.drop table StormEvents

다음 단계