แชร์ผ่าน


ใช้ Livy API เพื่อส่งและดําเนินการงานเซสชัน

หมายเหตุ

Livy API สําหรับ Fabric วิศวกรข้อมูล ing อยู่ในตัวอย่าง

นําไปใช้กับ:✅ วิศวกรข้อมูลและวิทยาศาสตร์ข้อมูลใน Microsoft Fabric

ส่งชุดงาน Spark โดยใช้ Livy API สําหรับ Fabric วิศวกรข้อมูล

ข้อกำหนดเบื้องต้น

Livy API กําหนดจุดสิ้นสุดแบบรวมสําหรับการดําเนินการ แทนที่พื้นที่ที่สํารองไว้ {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} และ {Fabric_LakehouseID} ด้วยค่าที่เหมาะสมเมื่อคุณทําตามตัวอย่างในบทความนี้

กําหนดค่ารหัส Visual Studio สําหรับเซสชัน Livy API ของคุณ

  1. เลือก การตั้งค่า เลคเฮ้าส์ใน Fabric Lakehouse ของคุณ

    สกรีนช็อตที่แสดงการตั้งค่าของเลคเฮ้าส์

  2. นําทางไปยังส่วน จุด สิ้นสุด Livy

    สกรีนช็อตที่แสดงจุดสิ้นสุดของ Lakehouse Livy และสายอักขระการเชื่อมต่องานเซสชัน

  3. คัดลอกงานเซสชันสายอักขระการเชื่อมต่อ (กล่องสีแดงแรกในรูปภาพ) ไปยังโค้ดของคุณ

  4. ไปที่ ศูนย์การจัดการ Microsoft Entra และคัดลอกทั้ง ID แอปพลิเคชัน (ไคลเอนต์) และ ID ไดเรกทอรี (ผู้เช่า) ไปยังรหัสของคุณ

    สกรีนช็อตแสดงภาพรวมแอป Livy API ในศูนย์การจัดการ Microsoft Entra

สร้างเซสชัน Livy API Spark

  1. .ipynbสร้างสมุดบันทึกใน Visual Studio Code และแทรกโค้ดต่อไปนี้

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. เรียกใช้เซลล์ของสมุดบันทึก ป็อปอัพควรปรากฏในเบราว์เซอร์ของคุณ เพื่อให้คุณสามารถเลือกข้อมูลประจําตัวที่จะลงชื่อเข้าใช้ได้

    สกรีนช็อตที่แสดงหน้าจอเข้าสู่ระบบไปยังแอป Microsoft Entra

  3. หลังจากที่คุณเลือกข้อมูลประจําตัวที่จะลงชื่อเข้าใช้แล้ว คุณจะถูกขอให้อนุมัติสิทธิ์ API การลงทะเบียนแอป Microsoft Entra

    สกรีนช็อตที่แสดงสิทธิ์ API ของแอป Microsoft Entra

  4. ปิดหน้าต่างเบราว์เซอร์หลังจากเสร็จสิ้นการรับรองความถูกต้อง

    สกรีนช็อตที่แสดงการรับรองความถูกต้องเสร็จสมบูรณ์

  5. ใน Visual Studio Code คุณควรเห็นโทเค็น Microsoft Entra ที่ส่งกลับมา

    สกรีนช็อตแสดงโทเค็น Microsoft Entra ที่ส่งกลับหลังจากเรียกใช้เซลล์และเข้าสู่ระบบ

  6. เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. เรียกใช้เซลล์ของสมุดบันทึก คุณควรเห็นบรรทัดหนึ่งบรรทัดที่พิมพ์เมื่อเซสชัน Livy ถูกสร้างขึ้น

    สกรีนช็อตแสดงผลลัพธ์ของการดําเนินการเซลล์ของสมุดบันทึกแรก

  8. คุณสามารถตรวจสอบว่าเซสชัน Livy ถูกสร้างขึ้นโดยใช้ [ดูงานของคุณในฮับการตรวจสอบ] (#View งานของคุณในฮับการตรวจสอบ)

ส่งคําสั่ง spark.sql โดยใช้เซสชัน Livy API Spark

  1. เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. เรียกใช้เซลล์ของสมุดบันทึก คุณควรเห็นบรรทัดแบบเพิ่มหน่วยหลายบรรทัดที่พิมพ์เมื่อส่งงานและผลลัพธ์ที่ส่งกลับ

    สกรีนช็อตแสดงผลลัพธ์ของเซลล์สมุดบันทึกแรกที่มีการดําเนินการ Spark.sql

ส่งคําสั่ง spark.sql ที่สองโดยใช้เซสชัน Livy API Spark

  1. เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. เรียกใช้เซลล์ของสมุดบันทึก คุณควรเห็นบรรทัดแบบเพิ่มหน่วยหลายบรรทัดที่พิมพ์เมื่อส่งงานและผลลัพธ์ที่ส่งกลับ

    สกรีนช็อตแสดงผลลัพธ์ของการดําเนินการเซลล์สมุดบันทึกที่สอง

ปิดเซสชัน Livy ด้วยคําสั่งที่สาม

  1. เพิ่มเซลล์สมุดบันทึกอีกเซลล์และแทรกโค้ดนี้

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

ดูงานของคุณในฮับการตรวจสอบ

คุณสามารถเข้าถึงฮับการตรวจสอบเพื่อดูกิจกรรม Apache Spark ต่าง ๆ ได้โดยการเลือก การตรวจสอบ ในลิงก์การนําทางด้านซ้าย

  1. เมื่อเซสชันกําลังดําเนินการหรืออยู่ในสถานะเสร็จสมบูรณ์ คุณสามารถดูสถานะเซสชันโดยนําทางไปยังการตรวจสอบได้

    สกรีนช็อตที่แสดงการส่ง Livy API ก่อนหน้านี้ในฮับการตรวจสอบ

  2. เลือกและเปิดชื่อกิจกรรมล่าสุด

    สกรีนช็อตแสดงกิจกรรม Livy API ล่าสุดในฮับการตรวจสอบ

  3. ในกรณีเซสชัน Livy API นี้ คุณสามารถดูการส่งเซสชันก่อนหน้าของคุณ เรียกใช้รายละเอียด เวอร์ชัน Spark และการกําหนดค่าของคุณได้ สังเกตสถานะหยุดที่ด้านบนขวา

    สกรีนช็อตที่แสดงรายละเอียดกิจกรรม Livy API ล่าสุดในฮับการตรวจสอบ

ในการสรุปกระบวนการทั้งหมด คุณต้องมีไคลเอ็นต์ระยะไกล เช่น Visual Studio Code โทเค็นแอป Microsoft Entra URL จุดสิ้นสุด Livy API การรับรองความถูกต้องกับเลคเฮ้าส์ของคุณ และสุดท้ายคือ Session Livy API