자습서: 데이터 레이크 캡처 패턴을 구현하여 Databricks Delta 테이블 업데이트
이 자습서에서는 계층 구조 네임스페이스가 있는 스토리지 계정에서 이벤트를 처리하는 방법을 보여 줍니다.
판매 주문을 설명하는 csv(쉼표로 구분된 값) 파일을 업로드하여 사용자가 Databricks 델타 테이블을 채울 수 있도록 하는 작은 솔루션을 구축합니다. 이 솔루션은 Azure Databricks에서 Event Grid 구독, Azure Function 및 작업을 함께 연결하여 구축됩니다.
이 자습서에서는 다음을 수행합니다.
- Azure Function을 호출하는 Event Grid 구독을 만듭니다.
- 이벤트에서 알림을 받은 후 Azure Databricks에서 작업을 실행하는 Azure Function을 만듭니다.
- 고객 주문을 삽입하는 Databricks 작업을 스토리지 계정에 있는 Databricks 델타 테이블에 만듭니다.
Azure Databricks 작업 영역에서 시작하여 이 솔루션을 역순으로 구축합니다.
필수 조건
계층 구조 네임스페이스가 있는 스토리지 계정(Azure Data Lake Storage)을 만듭니다. 이 자습서에서는
contosoorders
라는 스토리지 계정을 사용합니다.사용자 계정에 Storage Blob 데이터 기여자 역할이 할당되었는지 확인합니다.
서비스 주체를 만들고 클라이언트 암호를 만든 다음, 서비스 주체에게 스토리지 계정에 대한 액세스 권한을 부여합니다.
자습서: Azure Data Lake Storage에 연결(1~3단계)을 참조하세요. 이러한 단계를 완료한 후 테넌트 ID, 앱 ID 및 클라이언트 암호 값을 텍스트 파일에 붙여넣습니다. 곧 이 값들이 필요합니다.
Azure 구독이 없는 경우 시작하기 전에 체험 계정을 만듭니다.
판매 주문 생성
먼저 판매 주문을 설명하는 csv 파일을 만든 다음, 해당 파일을 스토리지 계정에 업로드합니다. 나중에 이 파일의 데이터를 사용하여 Databricks 델타 테이블의 첫 번째 행을 채웁니다.
Azure Portal에서 새 스토리지 계정으로 이동합니다.
Storage browser->Blob 컨테이너->컨테이너 추가를 선택하고 데이터라는 새 컨테이너를 만듭니다.
data 컨테이너에서 input이라는 디렉터리를 만듭니다.
다음 텍스트를 텍스트 편집기에 붙여넣습니다.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
이 파일을 로컬 컴퓨터에 저장하고, data.csv라는 이름을 지정합니다.
스토리지 브라우저에서 이 파일을 input 폴더에 업로드합니다.
Azure Databricks에 작업 만들기
이 섹션에서 수행하는 작업은 다음과 같습니다.
- Azure Databricks 작업 영역 만들기
- Notebook을 만듭니다.
- Databricks 델타 테이블을 만들고 채웁니다.
- 행을 Databricks 델타 테이블에 삽입하는 코드를 추가합니다.
- 작업을 만듭니다.
Azure Databricks 작업 영역 만들기
이 섹션에서는 Azure Portal을 사용하여 Azure Databricks 작업 영역을 만듭니다.
Azure Databricks 작업 영역 만들기 작업 영역의 이름을
contoso-orders
로 지정합니다. Azure Databricks 작업 영역 만들기를 참조하세요.클러스터를 만듭니다. 클러스터 이름을
customer-order-cluster
로 지정합니다. 클러스터 만들기를 참조하세요.Notebook을 만듭니다. Notebook 이름을
configure-customer-table
로 지정하고 Python을 Notebook의 기본 언어로 선택합니다. Notebook 만들기를 참조하세요.
Databricks 델타 테이블 만들기 및 채우기
만든 Notebook에서 다음 코드 블록을 복사하여 첫 번째 셀에 붙여넣습니다. 하지만 이 코드는 아직 실행하지 마세요.
appId
,password
,tenant
자리 표시자 값을 이 자습서의 필수 조건을 수행하는 동안 수집한 값으로 바꿉니다.dbutils.widgets.text('source_file', "", "Source File") spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token") adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/' inputPath = adlsPath + dbutils.widgets.get('source_file') customerTablePath = adlsPath + 'delta-tables/customers'
이 코드는 source_file이라는 위젯을 만듭니다. 나중에 이 코드를 호출하고 파일 경로를 해당 위젯에 전달하는 Azure Function을 만듭니다. 또한 이 코드는 스토리지 계정을 사용하여 서비스 주체를 인증하고, 다른 셀에서 사용할 몇 가지 변수를 만듭니다.
참고 항목
프로덕션 설정에서 Azure Databricks에서 인증 키를 저장하는 것이 좋습니다. 그런 다음, 인증 키 대신 코드 블록에 조회 키를 추가합니다.
예를 들어spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
코드 줄을 사용하는 대신spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"))
코드 줄을 사용합니다.
이 자습서 를 완료한 후 Azure Databricks 웹 사이트의 Azure Data Lake Storage 문서를 참조하여 이 방법의 예를 확인하세요.이 블록에서 코드를 실행하려면 SHIFT + ENTER 키를 누릅니다.
다음 코드 블록을 복사하여 다른 셀에 붙여넣은 다음, Shift+Enter 키를 눌러 이 블록에서 해당 코드를 실행합니다.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType inputSchema = StructType([ StructField("InvoiceNo", IntegerType(), True), StructField("StockCode", StringType(), True), StructField("Description", StringType(), True), StructField("Quantity", IntegerType(), True), StructField("InvoiceDate", StringType(), True), StructField("UnitPrice", DoubleType(), True), StructField("CustomerID", IntegerType(), True), StructField("Country", StringType(), True) ]) rawDataDF = (spark.read .option("header", "true") .schema(inputSchema) .csv(adlsPath + 'input') ) (rawDataDF.write .mode("overwrite") .format("delta") .saveAsTable("customer_data", path=customerTablePath))
이 코드는 Databricks 델타 테이블을 스토리지 계정에 만든 다음, 이전에 업로드한 csv 파일에서 몇 가지 초기 데이터를 로드합니다.
이 코드 블록이 성공적으로 실행되면 Notebook에서 이 코드 블록을 제거합니다.
행을 Databricks 델타 테이블에 삽입하는 코드 추가
다음 코드 블록을 복사하여 다른 셀에 붙여넣습니다. 하지만 이 코드는 실행하지 마세요.
upsertDataDF = (spark .read .option("header", "true") .csv(inputPath) ) upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
이 코드는 csv 파일의 데이터를 사용하여 데이터를 임시 테이블 보기에 삽입합니다. 해당 csv 파일의 경로는 이전 단계에서 만든 입력 위젯에서 가져옵니다.
다음 코드 블록을 복사하여 다른 셀에 붙여넣습니다. 이 코드는 임시 테이블 보기의 내용을 Databricks 델타 테이블과 병합합니다.
%sql MERGE INTO customer_data cd USING customer_data_to_upsert cu ON cd.CustomerID = cu.CustomerID WHEN MATCHED THEN UPDATE SET cd.StockCode = cu.StockCode, cd.Description = cu.Description, cd.InvoiceNo = cu.InvoiceNo, cd.Quantity = cu.Quantity, cd.InvoiceDate = cu.InvoiceDate, cd.UnitPrice = cu.UnitPrice, cd.Country = cu.Country WHEN NOT MATCHED THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country) VALUES ( cu.InvoiceNo, cu.StockCode, cu.Description, cu.Quantity, cu.InvoiceDate, cu.UnitPrice, cu.CustomerID, cu.Country)
작업 만들기
이전에 만든 Notebook을 실행하는 작업을 만듭니다. 나중에 이벤트가 발생할 때 이 작업을 실행하는 Azure Function을 만듭니다.
새->작업을 선택합니다.
작업에 이름을 지정하고 만든 Notebook 및 클러스터를 선택합니다. 만들기를 선택하여 작업을 만듭니다.
Azure Function 만들기
작업을 실행하는 Azure Function을 만듭니다.
Azure Databricks 작업 영역의 위쪽 표시줄에서 Azure Databricks 사용자 이름을 클릭한 다음 드롭다운에서 사용자 설정을 선택합니다.
액세스 토큰 탭에서 새 토큰 생성을 선택합니다.
표시되는 토큰을 복사한 다음 완료를 클릭합니다.
Databricks 작업 영역의 위쪽 모서리에서 사용자 아이콘을 선택한 다음, 사용자 설정을 선택합니다.
새 토큰 생성 단추, 생성 단추를 차례로 선택합니다.
토큰은 안전한 장소에 복사해야 합니다. Azure Function에는 작업을 실행할 수 있도록 Databricks를 인증하기 위해 이 토큰이 필요합니다.
Azure Portal 메뉴 또는 홈 페이지에서 리소스 만들기를 선택합니다.
새로 만들기 페이지에서 컴퓨팅>함수 앱을 선택합니다.
함수 앱 만들기 페이지의 기본 탭에서 리소스 그룹을 선택한 다음, 다음 설정을 변경하거나 확인합니다.
설정 값 함수 앱 이름 contosoorder 런타임 스택 .NET 게시 코드 운영 체제 Windows 플랜 유형 사용량(서버리스) 검토 및 생성를 선택한 후 생성를 선택합니다.
배포가 완료되면 리소스로 이동을 선택하여 함수 앱의 개요 페이지를 엽니다.
설정 그룹에서 구성을 선택합니다.
애플리케이션 설정 페이지에서 새 애플리케이션 설정 단추를 선택하여 각 설정을 추가합니다.
다음 설정을 추가합니다.
설정 이름 값 DBX_INSTANCE Databricks 작업 영역의 지역입니다. 예: westus2.azuredatabricks.net
DBX_PAT 이전에 생성한 개인 액세스 토큰입니다. DBX_JOB_ID 실행 중인 작업의 식별자입니다. 저장을 선택하여 이러한 설정을 커밋합니다.
함수 그룹에서 함수를 선택한 다음 만들기를 선택합니다.
Azure Event Grid 트리거를 선택합니다.
Microsoft.Azure.WebJobs.Extensions.EventGrid확장을 설치하라는 메시지가 표시되면 이 확장을 설치합니다. 설치해야 하는 경우 Azure Event Grid 트리거를 다시 선택하여 함수를 만들어야 합니다.
새 함수 창이 표시됩니다.
새 함수 창에서 함수 이름을 UpsertOrder로 지정한 다음, 만들기 단추를 선택합니다.
코드 파일의 내용을 다음 코드로 바꾼 다음, 저장 단추를 선택합니다.
#r "Azure.Messaging.EventGrid" #r "System.Memory.Data" #r "Newtonsoft.Json" #r "System.Text.Json" using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; using Newtonsoft.Json; using Newtonsoft.Json.Linq; private static HttpClient httpClient = new HttpClient(); public static async Task Run(EventGridEvent eventGridEvent, ILogger log) { log.LogInformation("Event Subject: " + eventGridEvent.Subject); log.LogInformation("Event Topic: " + eventGridEvent.Topic); log.LogInformation("Event Type: " + eventGridEvent.EventType); log.LogInformation(eventGridEvent.Data.ToString()); if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") { StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>(); if (fileData.Api == "FlushWithClose") { log.LogInformation("Triggering Databricks Job for file: " + fileData.Url); var fileUrl = new Uri(fileData.Url); var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))), Headers = { { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)}, { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" } }, Content = new StringContent(JsonConvert.SerializeObject(new { job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process), notebook_params = new { source_file = String.Join("", fileUrl.Segments.Skip(2)) } })) }; var response = await httpClient.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); } } }
이 코드는 발생된 스토리지 이벤트에 대한 정보를 구문 분석한 다음, 이벤트를 트리거한 파일의 URL을 사용하여 요청 메시지를 만듭니다. 메시지의 일부인 이 함수는 이전에 만든 source_file 위젯에 값을 전달합니다. 함수 코드는 Databricks 작업으로 메시지를 보내고 이전에 얻은 토큰을 인증으로 사용합니다.
Event Grid 구독 만들기
이 섹션에서는 파일이 스토리지 계정에 업로드되면 Azure Function을 호출하는 Event Grid 구독을 만듭니다.
통합을 선택한 다음 통합 페이지에서 Event Grid 트리거를 선택합니다.
트리거 편집 창에서 이벤트 이름을
eventGridEvent
로 지정한 다음, 이벤트 구독 만들기를 선택합니다.참고 항목
이름
eventGridEvent
은 Azure 함수에 전달되는 매개 변수와 일치합니다.이벤트 구독 만들기 페이지의 기본 탭에서 다음 설정을 변경하거나 확인합니다.
설정 값 속성 contoso-order-event-subscription 토픽 유형 스토리지 계정 원본 리소스 contosoorders 시스템 토픽 이름 <create any name>
이벤트 유형 필터 Blob 만들어짐, Blob 삭제됨 생성 단추를 선택합니다.
Event Grid 구독 테스트
customer-order.csv
라는 파일을 만들고, 다음 정보를 해당 파일에 붙여넣고, 로컬 컴퓨터에 이를 저장합니다.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
Storage Explorer에서 이 파일을 스토리지 계정의 input 폴더에 업로드합니다.
파일이 업로드되면 Microsoft.Storage.BlobCreated 이벤트가 발생합니다. Event Grid는 모든 구독자에게 해당 이벤트를 알립니다. 여기서는 Azure Function이 유일한 구독자입니다. Azure Function은 이벤트 매개 변수를 구문 분석하여 발생한 이벤트를 확인합니다. 그런 다음, 파일의 URL을 Databricks 작업에 전달합니다. Databricks 작업에서 파일을 읽고, 스토리지 계정에 있는 Databricks 델타 테이블에 행을 추가합니다.
작업이 성공했는지 검사하려면 작업에 대한 실행을 확인합니다. 완료 상태가 표시됩니다. 작업에 대한 실행을 보는 방법에 대한 자세한 내용은 작업에 대한 보기 실행을 참조하세요
새 통합 문서 셀의 셀에서 이 쿼리를 실행하여 업데이트된 델타 테이블을 확인합니다.
%sql select * from customer_data
반환된 테이블에서 최신 레코드를 보여 줍니다.
이 레코드를 업데이트하려면
customer-order-update.csv
라는 파일을 만들고, 다음 정보를 해당 파일에 붙여넣은 후, 로컬 컴퓨터에 이를 저장합니다.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
이 csv 파일은 주문 수량이
228
에서22
로 변경되는 것을 제외하고 이전 파일과 거의 동일합니다.Storage Explorer에서 이 파일을 스토리지 계정의 input 폴더에 업로드합니다.
select
쿼리를 다시 실행하여 업데이트된 델타 테이블을 확인합니다.%sql select * from customer_data
반환된 테이블에 최신 레코드가 표시됩니다.
리소스 정리
더 이상 필요하지 않으면 리소스 그룹 및 모든 관련 리소스를 삭제합니다. 이렇게 하려면 스토리지 계정에 대한 리소스 그룹을 선택하고 삭제를 선택합니다.