Χρήση του API Livy για την υποβολή και εκτέλεση εργασιών περιόδου λειτουργίας
Σημείωμα
Το Livy API για τη Μηχανική δεδομένων Fabric είναι σε προεπισκόπηση.
Ισχύει για:✅ Διαχείριση δεδομένων και Επιστήμη δεδομένων στο Microsoft Fabric
Υποβάλετε εργασίες δέσμης Spark χρησιμοποιώντας το Livy API για τη Μηχανική δεδομένων Fabric.
Προαπαιτούμενα στοιχεία
Χωρητικότητα Premium fabric ή δοκιμαστικής έκδοσης με lakehouse.
Ένα απομακρυσμένο πρόγραμμα-πελάτη, όπως το Visual Studio Code με Jupyter Notebooks, PySpark και τη Βιβλιοθήκη ελέγχου ταυτότητας της Microsoft (MSAL) για Python.
Για να αποκτήσετε πρόσβαση στο Rest API Fabric, απαιτείται ένα διακριτικό εφαρμογής Microsoft Entra. Καταχωρήστε μια εφαρμογή στην πλατφόρμα ταυτότητας της Microsoft.
Ορισμένα δεδομένα στο lakehouse σας, αυτό το παράδειγμα χρησιμοποιεί NYC Taxi & Limousine Commission green_tripdata_2022_08 ένα αρχείο parquet που φορτώνεται στο lakehouse.
Το API Livy ορίζει ένα ενοποιημένο τελικό σημείο για λειτουργίες. Αντικαταστήστε τα σύμβολα κράτησης θέσης {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}και {Fabric_LakehouseID} με τις κατάλληλες τιμές σας, όταν ακολουθείτε τα παραδείγματα σε αυτό το άρθρο.
Ρύθμιση παραμέτρων του Visual Studio Code για την περίοδο λειτουργίας API Livy
Επιλέξτε Ρυθμίσεις lakehouse στο Fabric Lakehouse σας.
Μεταβείτε στην ενότητα τελικού σημείου Livy.
Αντιγράψτε τη συμβολοσειρά σύνδεσης εργασίας περιόδου λειτουργίας (πρώτο κόκκινο πλαίσιο στην εικόνα) στον κώδικά σας.
Μεταβείτε στο Κέντρο διαχείρισης του Microsoft Entra και αντιγράψτε το Αναγνωριστικό εφαρμογής (πρόγραμμα-πελάτη) και το Αναγνωριστικό καταλόγου (μισθωτής) στον κώδικά σας.
Δημιουργία περιόδου λειτουργίας Livy API Spark
Δημιουργήστε ένα σημειωματάριο
.ipynb
στο Visual Studio Code και εισαγάγετε τον ακόλουθο κώδικα.from msal import PublicClientApplication import requests import time tenant_id = "Entra_TenantID" client_id = "Entra_ClientID" workspace_id = "Fabric_WorkspaceID" lakehouse_id = "Fabric_LakehouseID" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions" headers = {"Authorization": "Bearer " + access_token}
Εκτελέστε το κελί σημειωματάριου και θα πρέπει να εμφανιστεί ένα αναδυόμενο παράθυρο στο πρόγραμμα περιήγησής σας, το οποίο σας επιτρέπει να επιλέξετε την ταυτότητα με την οποία θα εισέλθετε.
Αφού επιλέξετε την ταυτότητα με την οποία θα εισέλθετε, θα σας ζητηθεί επίσης να εγκρίνετε τα δικαιώματα API καταχώρησης εφαρμογής Microsoft Entra.
Κλείστε το παράθυρο του προγράμματος περιήγησης μετά την ολοκλήρωση του ελέγχου ταυτότητας.
Στο Visual Studio Code, θα πρέπει να δείτε να επιστρέφεται το διακριτικό Microsoft Entra.
Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.
create_livy_session = requests.post(livy_base_url, headers=headers, json={}) print('The request to create the Livy session is submitted:' + str(create_livy_session.json())) livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json())
Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε μία γραμμή εκτυπωμένη καθώς δημιουργείται η περίοδος λειτουργίας Livy.
Μπορείτε να επαληθεύσετε ότι η περίοδος λειτουργίας Livy δημιουργείται χρησιμοποιώντας την [Προβολή των εργασιών σας στο Κέντρο παρακολούθησης](#View εργασιών σας στο Κέντρο παρακολούθησης).
Υποβολή πρότασης spark.sql με χρήση της περιόδου λειτουργίας Livy API Spark
Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε πολλές επαυξητικές γραμμές εκτυπωμένες κατά την υποβολή της εργασίας και την επιστροφή των αποτελεσμάτων.
Υποβάλετε μια δεύτερη πρόταση spark.sql χρησιμοποιώντας την περίοδο λειτουργίας Livy API Spark
Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.
# call get session API livy_session_id = create_livy_session.json()['id'] livy_session_url = livy_base_url + "/" + livy_session_id get_session_response = requests.get(livy_session_url, headers=headers) print(get_session_response.json()) while get_session_response.json()["state"] != "idle": time.sleep(5) get_session_response = requests.get(livy_session_url, headers=headers) execute_statement = livy_session_url + "/statements" payload_data = { "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()", "kind": "spark" } execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data) print('the statement code is submitted as: ' + str(execute_statement_response.json())) statement_id = str(execute_statement_response.json()['id']) get_statement = livy_session_url+ "/statements/" + statement_id get_statement_response = requests.get(get_statement, headers=headers) while get_statement_response.json()["state"] != "available": # Sleep for 5 seconds before making the next request time.sleep(5) print('the statement code is submitted and running : ' + str(execute_statement_response.json())) # Make the next request get_statement_response = requests.get(get_statement, headers=headers) rst = get_statement_response.json()['output']['data']['text/plain'] print(rst)
Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε πολλές επαυξητικές γραμμές εκτυπωμένες κατά την υποβολή της εργασίας και την επιστροφή των αποτελεσμάτων.
Κλείσιμο της συνεδρίας Livy με τρίτη δήλωση
Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.
# call get session API with a delete session statement get_session_response = requests.get(livy_session_url, headers=headers) print('Livy statement URL ' + livy_session_url) response = requests.delete(livy_session_url, headers=headers) print (response)
Προβολή των εργασιών σας στο Κέντρο παρακολούθησης
Μπορείτε να αποκτήσετε πρόσβαση στο Κέντρο παρακολούθησης για να προβάλετε διάφορες δραστηριότητες Apache Spark, επιλέγοντας Παρακολούθηση στις συνδέσεις περιήγησης στην αριστερή πλευρά.
Όταν η περίοδος λειτουργίας είναι σε εξέλιξη ή σε κατάσταση ολοκλήρωσης, μπορείτε να προβάλετε την κατάσταση της περιόδου λειτουργίας μεταβαίνοντας στην Παρακολούθηση.
Επιλέξτε και ανοίξτε το πιο πρόσφατο όνομα δραστηριότητας.
Σε αυτή την περίπτωση περιόδου λειτουργίας Livy API, μπορείτε να δείτε τις προηγούμενες υποβολές περιόδων λειτουργίας, λεπτομέρειες εκτέλεσης, εκδόσεις Spark και ρύθμιση παραμέτρων. Παρατηρήστε την κατάσταση διακοπής στην επάνω δεξιά γωνία.
Για να ανακεφαλαιώσετε ολόκληρη τη διαδικασία, χρειάζεστε ένα απομακρυσμένο πρόγραμμα-πελάτη, όπως κώδικα Visual Studio, ένα διακριτικό εφαρμογής Microsoft Entra, μια διεύθυνση URL τελικού σημείου Livy API, έλεγχο ταυτότητας για το Lakehouse σας και, τέλος, ένα API Session Livy.