연습 - Azure Synapse 파이프라인 내에 Notebook 통합
이 단원에서는 매핑 데이터 흐름에서 로드한 데이터를 분석 및 변환하고 데이터를 데이터 레이크에 저장하는 Azure Synapse Spark Notebook을 만듭니다. Notebook이 데이터 레이크에 쓰는 데이터의 폴더 이름을 정의하는 문자열 매개 변수를 허용하는 매개 변수 셀을 만듭니다.
그런 다음 Synapse 파이프라인에 이 Notebook을 추가하고 고유한 파이프라인 실행 ID를 Notebook 매개 변수에 전달하여 나중에 파이프라인 실행을 Notebook 작업에서 저장한 데이터와 상호 연결할 수 있게 합니다.
마지막으로 Synapse Studio에서 모니터 허브를 사용하여 파이프라인 실행을 모니터링하고 실행 ID를 가져온 다음 데이터 레이크에 저장된 해당 파일을 찾습니다.
Apache Spark 및 Notebook 정보
Apache Spark는 메모리 내 처리를 지원하여 빅 데이터 분석 애플리케이션의 성능을 향상하는 병렬 처리 프레임워크입니다. Azure Synapse Analytics의 Apache Spark는 Microsoft가 구현한 클라우드의 Apache Spark 중 하나입니다.
Synapse Studio의 Apache Spark Notebook은 라이브 코드, 시각화, 내레이션 텍스트를 포함하는 파일을 만들 수 있는 웹 인터페이스입니다. Notebook은 아이디어를 검증하고 빠른 실험을 사용하여 데이터를 통해 인사이트를 확보하기 좋은 도구입니다. Notebook은 데이터 준비, 데이터 시각화, 기계 학습 및 기타 빅 데이터 시나리오에서도 널리 사용됩니다.
Synapse Spark Notebook 만들기
Synapse Analytics에서 사용자 프로필 데이터를 처리, 조인, 가져오는 매핑 데이터 흐름을 만들었다고 가정해보겠습니다. 이제 지난 12개월 중 가장 선호되어 선택 받은 제품과 최다 구매된 제품 둘 다를 기준으로 각 사용자의 상위 5개 제품을 찾으려고 합니다. 그런 다음 전체 상위 5개 제품을 계산하려고 합니다.
이 연습에서는 Synapse Spark Notebook을 만들어 이러한 계산을 수행합니다.
Synapse Analytics Studio(https://web.azuresynapse.net/)를 열고 데이터 허브로 이동합니다.
연결됨 탭(1)을 선택하고 Azure Data Lake Storage Gen2 아래에서 기본 데이터 레이크 스토리지 계정(2)을 확장합니다. wwi-02 컨테이너(3)를 선택하고 top-products 폴더(4)를 엽니다. Parquet 파일(5)을 마우스 오른쪽 단추로 클릭하고 새 Notebook 메뉴 항목(6), DataFrame에 로드(7)를 차례로 선택합니다. 폴더가 표시되지 않으면
Refresh
를 선택합니다.Notebook이 Spark 풀에 연결되어 있는지 확인합니다.
Parquet 파일 이름을
*.parquet
(1)로 바꿔서top-products
폴더의 모든 Parquet 파일을 선택합니다. 예를 들어 경로는abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
와 유사합니다.Notebook 도구 모음에서 모두 실행을 선택하여 Notebook을 실행합니다.
참고
Spark 풀에서 Notebook을 처음 실행하면 Synapse가 새 세션을 생성합니다. 이 작업은 약 3~5분이 걸릴 수 있습니다.
참고
셀만 실행하려면 셀을 마우스로 가리키고 셀 왼쪽에 있는 ‘셀 실행’ 아이콘을 선택하거나 셀을 선택한 후 Ctrl+Enter를 누릅니다.
+ 단추, 코드 셀 항목을 차례로 선택하여 아래에 새 셀을 만듭니다. + 단추는 왼쪽의 Notebook 셀 아래에 있습니다. 또는 Notebook 도구 모음에서 + 셀 메뉴를 펼치고 코드 셀 항목을 선택할 수도 있습니다.
새 셀에서 다음 명령을 실행하여
topPurchases
라는 새 데이터 프레임을 채우고,top_purchases
라는 새 임시 보기를 만들고, 처음 100개 행을 표시합니다.topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
출력은 다음과 비슷해야 합니다.
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
SQL을 사용하여 새 셀에서 다음 명령을 실행하여 새 임시 보기를 만듭니다.
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
참고
이 쿼리는 출력이 없습니다.
이 쿼리는
top_purchases
임시 뷰를 원본으로 사용하고row_number() over
메서드를 적용하여ItemsPurchasedLast12Months
가 가장 큰 각 사용자 레코드의 행 번호를 적용합니다.where
절은 결과를 필터링하므로IsTopProduct
및IsPreferredProduct
가 모두 true로 설정된 제품을 최대 5개만 검색합니다. 따라서 Azure Cosmos DB에 저장된 사용자 프로필에 따라 사용자가 즐겨 사용하는 제품으로‘도’ 식별되는 제품 중 각 사용자의 최다 구매 상위 5개 제품을 알 수 있습니다.새 셀에서 다음 명령을 실행하여 이전 셀에서 만든
top_5_products
임시 보기의 결과를 저장하는 새 DataFrame을 만들고 표시합니다.top5Products = sqlContext.table("top_5_products") top5Products.show(100)
다음과 유사하게 사용자별 상위 5개 선호 제품을 표시하는 출력이 표시됩니다.
고객이 선호하고 가장 많이 구매한 제품을 기준으로 전체 상위 5개 제품을 계산합니다. 이렇게 하려면 새 셀에서 다음 명령을 실행합니다.
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
이 셀에서는 제품 ID별로 상위 5개 선호 제품을 그룹화하고, 지난 12개월 동안 구매한 총 항목을 합산하고, 해당 값을 내림차순으로 정렬하고, 상위 5개 결과를 반환했습니다. 다음과 유사하게 출력될 것입니다.
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
매개 변수 셀 만들기
Azure Synapse 파이프라인은 매개 변수 셀을 찾고 이 셀을 실행할 때 전달되는 매개 변수의 기본값으로 처리합니다. 실행 엔진은 기본값을 덮어쓰기 위해 입력 매개 변수를 사용하여 매개 변수 셀 아래에 새 셀을 추가합니다. 매개 변수 셀이 지정되지 않은 경우 삽입된 셀이 Notebook의 맨 위에 삽입됩니다.
이 Notebook을 파이프라인에서 실행할 것입니다. Parquet 파일의 이름을 지정하는 데 사용할
runId
변수 값을 설정하는 매개 변수를 전달하려고 합니다. 새 셀에서 다음 명령을 실행합니다.import uuid # Generate random GUID runId = uuid.uuid4()
Spark와 함께 제공되는
uuid
라이브러리를 사용하여 임의 GUID를 생성합니다. 파이프라인에서 전달하는 매개 변수로runId
변수를 재정의하려고 합니다. 이렇게 하려면 이 변수를 매개 변수 셀로 토글해야 합니다.셀의 오른쪽 맨 위 모서리에서 작업 줄임표(...)(1)를 선택하고 Toggle parameter cell(매개 변수 셀 설정/해제)(2)를 선택합니다.
이 옵션을 토글하면 셀에 매개 변수 태그가 표시됩니다.
다음 코드를 새 셀에 붙여넣어
runId
변수를 기본 데이터 레이크 계정의/top5-products/
경로에 있는 Parquet 파일 이름으로 사용합니다. 경로의YOUR_DATALAKE_NAME
을 기본 데이터 레이크 계정의 이름으로 바꿉니다. 이 이름을 찾으려면 페이지 맨 위에 있는 셀 1(1)까지 위로 스크롤합니다. 경로에서 데이터 레이크 스토리지 계정(2)을 복사합니다. 이 값을 새 셀 안의 경로 (3)에서YOUR_DATALAKE_NAME
을 대체하는 값으로 붙여넣고 셀에서 명령을 실행합니다.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
파일이 데이터 레이크에 기록되었는지 확인합니다. 데이터 허브로 이동하여 연결됨 탭(1)을 선택합니다. 기본 데이터 레이크 스토리지 계정을 확장한 다음 wwi-02 컨테이너(2)를 선택합니다. top5-products 폴더(3)로 이동합니다. GUID를 파일 이름으로 사용하는 디렉터리(4)에 Parquet 파일의 폴더가 표시됩니다.
이 디렉터리가 이전에 없었으므로 Notebook 셀의 데이터 프레임에서 Parquet 쓰기 메서드가 해당 디렉터리를 만들었습니다.
Synapse 파이프라인에 Notebook 추가
연습 시작 부분에서 설명한 매핑 데이터 흐름을 다시 참조하여 이 Notebook을 데이터 흐름 실행 후 오케스트레이션 프로세스의 일부로 실행한다고 가정해보겠습니다. 이렇게 하려면 이 Notebook을 파이프라인에 새 Notebook 작업으로 추가합니다.
Notebook으로 돌아갑니다. Notebook의 오른쪽 위 모서리에서 속성(1)을 선택한 다음 이름(2)에
Calculate Top 5 Products
를 입력합니다.Notebook의 오른쪽 위 모서리에 있는 파이프라인에 추가(1)를 선택한 다음 기존 파이프라인(2)을 선택합니다.
Write User Profile Data to ASA 파이프라인(1)을 선택하고 추가*(2)를 선택합니다.
Synapse Studio에서 Notebook 작업을 파이프라인에 추가합니다. Notebook 작업이 데이터 흐름 작업 오른쪽에 오게 다시 정렬합니다. 데이터 흐름 작업을 선택하고 성공 작업 파이프라인 연결 녹색 상자를 Notebook 작업으로 끕니다.
성공 작업 화살표는 데이터 흐름 작업이 성공적으로 실행된 후 Notebook 작업을 실행하도록 파이프라인에 지시합니다.
Notebook 작업(1), 설정 탭(2)을 차례로 선택하고 기본 매개 변수(3)를 확장한 다음 + 새로 만들기(4)를 선택합니다. 이름 필드(5)에
runId
를 입력합니다. 형식(6)에서 문자열을 선택합니다. 값에서 동적 콘텐츠 추가(7)를 선택합니다.시스템 변수(1)에서 파이프라인 실행 ID를 선택합니다. 그러면 동적 콘텐츠 상자(2)에
@pipeline().RunId
가 추가됩니다. 마침(3)을 선택하여 대화 상자를 닫습니다.파이프라인 실행 ID 값은 각 파이프라인 실행에 할당된 고유 GUID입니다. 여기서는 이 값을
runId
Notebook 매개 변수로 전달하여 Parquet 파일의 이름으로 사용합니다. 그러면 파이프라인 실행 기록에서 각 파이프라인 실행에 대해 생성된 특정 Parquet 파일을 찾을 수 있습니다.모두 게시, 게시를 차례로 선택하여 변경 내용을 저장합니다.
게시가 완료된 후 트리거 추가(1), 지금 트리거(2)를 차례로 선택하여 업데이트된 파이프라인을 실행합니다.
확인을 선택하여 트리거를 실행합니다.
파이프라인 실행 모니터링
모니터 허브를 사용하여 SQL, Apache Spark, Pipelines에 대한 현재 및 이전 작업을 모니터링할 수 있습니다.
모니터 허브로 이동합니다.
파이프라인 실행(1)을 선택하고 파이프라인 실행이 완료(2)될 때까지 기다립니다. 뷰를 새로 고쳐야 할 수도 있습니다(3).
파이프라인의 작업 실행을 보려는 파이프라인의 이름을 선택합니다.
데이터 흐름 작업과 새 Notebook 작업(1)을 확인합니다. 파이프라인 실행 ID 값(2)을 적어 둡니다. 이 이름을 Notebook에서 생성되는 Parquet 파일 이름과 비교할 것입니다. 세부 정보를 보려면 Calculate Top 5 Products Notebook 이름을 선택합니다(3).
여기서 Notebook 실행 세부 정보를 확인합니다. 재생(1)을 선택하여 작업(2)의 진행을 재생할 수 있습니다. 아래쪽에서 다른 필터 옵션이 있는 진단 및 로그를 볼 수 있습니다(3). 오른쪽에서는 기간, Livy ID, Spark 풀 세부 정보와 같은 실행 세부 정보를 볼 수 있습니다. 작업에서 세부 정보 보기 링크를 선택하면 해당 세부 정보(5)를 볼 수 있습니다.
Spark 애플리케이션 UI가 새 탭에서 열리고, 여기서 스테이지 세부 정보를 볼 수 있습니다. 스테이지 세부 정보를 보려면 DAG 시각화를 확장합니다.
데이터 허브로 다시 돌아갑니다.
연결됨 탭(1)을 선택한 다음 기본 데이터 레이크 계정에서 wwi-02 컨테이너(2)를 선택하고 top5-products 폴더(3)로 이동하고, 이름이 파이프라인 실행 ID와 일치하는 Parquet 파일의 폴더가 있는지 확인합니다.
여기에서 볼 수 있듯이 이름이 앞에서 메모한 파이프라인 실행 ID와 일치하는 파일이 있습니다.
해당 값이 일치하는 이유는 파이프라인 실행 ID를 Notebook 작업의
runId
매개 변수에 전달했기 때문입니다.