ใช้ Livy API เพื่อส่งและดําเนินการงานเซสชัน
หมายเหตุ
Livy API สําหรับ Fabric วิศวกรข้อมูล ing อยู่ในตัวอย่าง
นําไปใช้กับ:✅ วิศวกรข้อมูลและวิทยาศาสตร์ข้อมูลใน Microsoft Fabric
ส่งชุดงาน Spark โดยใช้ Livy API สําหรับ Fabric วิศวกรข้อมูล
ข้อกำหนดเบื้องต้น
ไคลเอ็นต์ระยะไกล เช่น Visual Studio Code พร้อม Jupyter Notebooks, PySpark และไลบรารีการรับรองความถูกต้องของ Microsoft (MSAL) สําหรับ Python
โทเค็นแอป Microsoft Entra จําเป็นสําหรับการเข้าถึง Fabric Rest API ลงทะเบียนแอปพลิเคชันด้วยแพลตฟอร์มข้อมูลประจำตัวของ Microsoft
ข้อมูลบางอย่างในเลคเฮ้าส์ของคุณ ตัวอย่างนี้ใช้ แท็กซี่ NYC และ Limousine Commission green_tripdata_2022_08 ไฟล์ปาร์เกตที่โหลดไปยังเลคเฮาส์
Livy API กําหนดจุดสิ้นสุดแบบรวมสําหรับการดําเนินการ แทนที่พื้นที่ที่สํารองไว้ {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} และ {Fabric_LakehouseID} ด้วยค่าที่เหมาะสมเมื่อคุณทําตามตัวอย่างในบทความนี้
กําหนดค่ารหัส Visual Studio สําหรับเซสชัน Livy API ของคุณ
เลือก การตั้งค่า เลคเฮ้าส์ใน Fabric Lakehouse ของคุณ
นําทางไปยังส่วน จุด สิ้นสุด Livy
คัดลอกงานเซสชันสายอักขระการเชื่อมต่อ (กล่องสีแดงแรกในรูปภาพ) ไปยังโค้ดของคุณ
ไปที่ ศูนย์การจัดการ Microsoft Entra และคัดลอกทั้ง ID แอปพลิเคชัน (ไคลเอนต์) และ ID ไดเรกทอรี (ผู้เช่า) ไปยังรหัสของคุณ
สร้างเซสชัน Livy API Spark
.ipynb
สร้างสมุดบันทึกใน Visual Studio Code และแทรกโค้ดต่อไปนี้from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
เรียกใช้เซลล์ของสมุดบันทึก ป็อปอัพควรปรากฏในเบราว์เซอร์ของคุณ เพื่อให้คุณสามารถเลือกข้อมูลประจําตัวที่จะลงชื่อเข้าใช้ได้
หลังจากที่คุณเลือกข้อมูลประจําตัวที่จะลงชื่อเข้าใช้แล้ว คุณจะถูกขอให้อนุมัติสิทธิ์ API การลงทะเบียนแอป Microsoft Entra
ปิดหน้าต่างเบราว์เซอร์หลังจากเสร็จสิ้นการรับรองความถูกต้อง
ใน Visual Studio Code คุณควรเห็นโทเค็น Microsoft Entra ที่ส่งกลับมา
เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้
create_livy_session = requests.post(livy_base_url, headers=headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json())
เรียกใช้เซลล์ของสมุดบันทึก คุณควรเห็นบรรทัดหนึ่งบรรทัดที่พิมพ์เมื่อเซสชัน Livy ถูกสร้างขึ้น
คุณสามารถตรวจสอบว่าเซสชัน Livy ถูกสร้างขึ้นโดยใช้ [ดูงานของคุณในฮับการตรวจสอบ] (#View งานของคุณในฮับการตรวจสอบ)
ส่งคําสั่ง spark.sql โดยใช้เซสชัน Livy API Spark
เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
เรียกใช้เซลล์ของสมุดบันทึก คุณควรเห็นบรรทัดแบบเพิ่มหน่วยหลายบรรทัดที่พิมพ์เมื่อส่งงานและผลลัพธ์ที่ส่งกลับ
ส่งคําสั่ง spark.sql ที่สองโดยใช้เซสชัน Livy API Spark
เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
เรียกใช้เซลล์ของสมุดบันทึก คุณควรเห็นบรรทัดแบบเพิ่มหน่วยหลายบรรทัดที่พิมพ์เมื่อส่งงานและผลลัพธ์ที่ส่งกลับ
ปิดเซสชัน Livy ด้วยคําสั่งที่สาม
เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, headers=headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers=headers) print (response)
ดูงานของคุณในฮับการตรวจสอบ
คุณสามารถเข้าถึงฮับการตรวจสอบเพื่อดูกิจกรรม Apache Spark ต่าง ๆ ได้โดยการเลือก การตรวจสอบ ในลิงก์การนําทางด้านซ้าย
เมื่อเซสชันกําลังดําเนินการหรืออยู่ในสถานะเสร็จสมบูรณ์ คุณสามารถดูสถานะเซสชันโดยนําทางไปยังการตรวจสอบได้
เลือกและเปิดชื่อกิจกรรมล่าสุด
ในกรณีเซสชัน Livy API นี้ คุณสามารถดูการส่งเซสชันก่อนหน้าของคุณ เรียกใช้รายละเอียด เวอร์ชัน Spark และการกําหนดค่าของคุณได้ สังเกตสถานะหยุดที่ด้านบนขวา
ในการสรุปกระบวนการทั้งหมด คุณต้องมีไคลเอ็นต์ระยะไกล เช่น Visual Studio Code โทเค็นแอป Microsoft Entra URL จุดสิ้นสุด Livy API การรับรองความถูกต้องกับเลคเฮ้าส์ของคุณ และสุดท้ายคือ Session Livy API