Χρήση του API Livy για την υποβολή και εκτέλεση εργασιών δέσμης livy
Σημείωμα
Το Livy API για τη Μηχανική δεδομένων Fabric είναι σε προεπισκόπηση.
Ισχύει για:✅ Διαχείριση δεδομένων και Επιστήμη δεδομένων στο Microsoft Fabric
Υποβάλετε εργασίες δέσμης Spark χρησιμοποιώντας το Livy API για τη Μηχανική δεδομένων Fabric.
Προαπαιτούμενα στοιχεία
Χωρητικότητα Premium fabric ή δοκιμαστικής έκδοσης με lakehouse.
Ένα απομακρυσμένο πρόγραμμα-πελάτη, όπως το Visual Studio Code με Jupyter Notebooks, PySpark και τη Βιβλιοθήκη ελέγχου ταυτότητας της Microsoft (MSAL) για Python.
Για να αποκτήσετε πρόσβαση στο Rest API Fabric, απαιτείται ένα διακριτικό εφαρμογής Microsoft Entra. Καταχωρήστε μια εφαρμογή στην πλατφόρμα ταυτότητας της Microsoft.
Ορισμένα δεδομένα στο lakehouse σας, αυτό το παράδειγμα χρησιμοποιεί NYC Taxi & Limousine Commission green_tripdata_2022_08 ένα αρχείο parquet που φορτώνεται στο lakehouse.
Το API Livy ορίζει ένα ενοποιημένο τελικό σημείο για λειτουργίες. Αντικαταστήστε τα σύμβολα κράτησης θέσης {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}και {Fabric_LakehouseID} με τις κατάλληλες τιμές σας, όταν ακολουθείτε τα παραδείγματα σε αυτό το άρθρο.
Ρύθμιση παραμέτρων του Visual Studio Code για τη δέσμη API Livy
Επιλέξτε Ρυθμίσεις lakehouse στο Fabric Lakehouse σας.
Μεταβείτε στην ενότητα τελικού σημείου Livy.
Αντιγράψτε τη συμβολοσειρά σύνδεσης Εργασία δέσμης (δεύτερο κόκκινο πλαίσιο στην εικόνα) στον κώδικά σας.
Μεταβείτε στο Κέντρο διαχείρισης του Microsoft Entra και αντιγράψτε το Αναγνωριστικό εφαρμογής (πρόγραμμα-πελάτη) και το Αναγνωριστικό καταλόγου (μισθωτής) στον κώδικά σας.
Δημιουργήστε ένα ωφέλιμο φορτίο Spark και κάντε αποστολή στο Lakehouse σας
Δημιουργήστε ένα σημειωματάριο
.ipynb
στο Visual Studio Code και εισαγάγετε τον ακόλουθο κώδικαimport sys import os from pyspark.sql import SparkSession from pyspark.conf import SparkConf from pyspark.sql.functions import col if __name__ == "__main__": #Spark session builder spark_session = (SparkSession .builder .appName("livybatchdemo") .getOrCreate()) spark_context = spark_session.sparkContext spark_context.setLogLevel("DEBUG") targetLakehouse = spark_context.getConf().get("spark.targetLakehouse") if targetLakehouse is not None: print("targetLakehouse: " + str(targetLakehouse)) else: print("targetLakehouse is None") df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0") df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4)) deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions" df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
Αποθηκεύστε το αρχείο Python τοπικά. Αυτό το ωφέλιμο φορτίο κώδικα Python περιέχει δύο προτάσεις Spark που λειτουργούν με δεδομένα σε ένα Lakehouse και πρέπει να αποσταλούν στο Lakehouse σας. Θα χρειαστείτε τη διαδρομή ABFS του ωφέλιμου φορτίου για αναφορά στην εργασία δέσμης API Livy στο Visual Studio Code και το όνομα του πίνακα Lakehouse στην πρόταση SQL Select..
Αποστείλετε το ωφέλιμο φορτίο Python στην ενότητα αρχείων του Lakehouse σας. > Κάντε κλικ στην επιλογή Λήψη δεδομένων > Αποστολή αρχείων > στο πλαίσιο εισόδου Αρχεία/Αρχεία.
Αφού το αρχείο είναι στην ενότητα Αρχεία του Lakehouse, κάντε κλικ στις τρεις τελείες στα δεξιά του ονόματος αρχείου ωφέλιμου φορτίου και επιλέξτε Ιδιότητες.
Αντιγράψτε αυτήν τη διαδρομή ABFS στο κελί του Σημειωματάριου στο βήμα 1.
Δημιουργία περιόδου λειτουργίας δέσμης Livy API Spark
Δημιουργήστε ένα σημειωματάριο
.ipynb
στο Visual Studio Code και εισαγάγετε τον ακόλουθο κώδικα.from msal import PublicClientApplication import requests import time tenant_id = "<Entra_TenantID>" client_id = "<Entra_ClientID>" workspace_id = "<Fabric_WorkspaceID>" lakehouse_id = "<Fabric_LakehouseID>" app = PublicClientApplication( client_id, authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48" ) result = None # If no cached tokens or user interaction needed, acquire tokens interactively if not result: result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"]) # Print the access token (you can use it to call APIs) if "access_token" in result: print(f"Access token: {result['access_token']}") else: print("Authentication failed or no access token obtained.") if "access_token" in result: access_token = result['access_token'] api_base_url_mist='https://api.fabric.microsoft.com/v1' livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches" headers = {"Authorization": "Bearer " + access_token}
Εκτελέστε το κελί σημειωματάριου και θα πρέπει να εμφανιστεί ένα αναδυόμενο παράθυρο στο πρόγραμμα περιήγησής σας, το οποίο σας επιτρέπει να επιλέξετε την ταυτότητα με την οποία θα εισέλθετε.
Αφού επιλέξετε την ταυτότητα με την οποία θα εισέλθετε, θα σας ζητηθεί επίσης να εγκρίνετε τα δικαιώματα API καταχώρησης εφαρμογής Microsoft Entra.
Κλείστε το παράθυρο του προγράμματος περιήγησης μετά την ολοκλήρωση του ελέγχου ταυτότητας.
Στο Visual Studio Code θα πρέπει να δείτε να επιστρέφεται το διακριτικό Microsoft Entra.
Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.
# call get batch API get_livy_get_batch = livy_base_url get_batch_response = requests.get(get_livy_get_batch, headers=headers) if get_batch_response.status_code == 200: print("API call successful") print(get_batch_response.json()) else: print(f"API call failed with status code: {get_batch_response.status_code}") print(get_batch_response.text)
Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε δύο γραμμές εκτυπωμένες κατά τη δημιουργία της μαζικής εργασίας Livy.
Υποβολή πρότασης spark.sql με χρήση της περιόδου λειτουργίας δέσμης API Livy
Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.
# submit payload to existing batch session print('Submit a spark job via the livy batch API to ') newlakehouseName = "YourNewLakehouseName" create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items" create_lakehouse_payload = { "displayName": newlakehouseName, "type": 'Lakehouse' } create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload) print(create_lakehouse_response.json()) payload_data = { "name":"livybatchdemo_with"+ newlakehouseName, "file":"abfss://YourABFSPathToYourPayload.py", "conf": { "spark.targetLakehouse": "Fabric_LakehouseID" } } get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data) print("The Livy batch job submitted successful") print(get_batch_response.json())
Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε πολλές γραμμές εκτυπωμένες καθώς δημιουργείται και εκτελείται η εργασία δέσμης Livy.
Επιστρέψτε στο Lakehouse για να δείτε τις αλλαγές.
Προβολή των εργασιών σας στο Κέντρο παρακολούθησης
Μπορείτε να αποκτήσετε πρόσβαση στο Κέντρο παρακολούθησης για να προβάλετε διάφορες δραστηριότητες Apache Spark, επιλέγοντας Παρακολούθηση στις συνδέσεις περιήγησης στην αριστερή πλευρά.
Όταν ολοκληρωθεί η μαζική εργασία, μπορείτε να προβάλετε την κατάσταση της περιόδου λειτουργίας μεταβαίνοντας στην Παρακολούθηση.
Επιλέξτε και ανοίξτε το πιο πρόσφατο όνομα δραστηριότητας.
Σε αυτήν την περίπτωση περιόδου λειτουργίας Livy API, μπορείτε να δείτε την προηγούμενη υποβολή δέσμης, λεπτομέρειες εκτέλεσης, εκδόσεις Spark και ρυθμίσεις παραμέτρων. Παρατηρήστε την κατάσταση διακοπής στην επάνω δεξιά γωνία.
Για να ανακεφαλαιώσετε ολόκληρη τη διαδικασία, χρειάζεστε ένα απομακρυσμένο πρόγραμμα-πελάτη, όπως κώδικα Visual Studio, ένα διακριτικό εφαρμογής Microsoft Entra, μια διεύθυνση URL τελικού σημείου Livy API, έλεγχο ταυτότητας για το Lakehouse σας, ένα ωφέλιμο φορτίο Spark στο Lakehouse σας και, τέλος, μια μαζική συνεδρία API Livy.