Κοινή χρήση μέσω


Χρήση του API Livy για την υποβολή και εκτέλεση εργασιών δέσμης livy

Σημείωμα

Το Livy API για τη Μηχανική δεδομένων Fabric είναι σε προεπισκόπηση.

Ισχύει για:✅ Διαχείριση δεδομένων και Επιστήμη δεδομένων στο Microsoft Fabric

Υποβάλετε εργασίες δέσμης Spark χρησιμοποιώντας το Livy API για τη Μηχανική δεδομένων Fabric.

Προαπαιτούμενα στοιχεία

Το API Livy ορίζει ένα ενοποιημένο τελικό σημείο για λειτουργίες. Αντικαταστήστε τα σύμβολα κράτησης θέσης {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}και {Fabric_LakehouseID} με τις κατάλληλες τιμές σας, όταν ακολουθείτε τα παραδείγματα σε αυτό το άρθρο.

Ρύθμιση παραμέτρων του Visual Studio Code για τη δέσμη API Livy

  1. Επιλέξτε Ρυθμίσεις lakehouse στο Fabric Lakehouse σας.

    Στιγμιότυπο οθόνης που εμφανίζει τις ρυθμίσεις του Lakehouse.

  2. Μεταβείτε στην ενότητα τελικού σημείου Livy.

    στιγμιότυπο οθόνης που εμφανίζει τη συμβολοσειρά σύνδεσης τελικού σημείου Lakehouse Livy και τη συμβολοσειρά σύνδεσης εργασίας περιόδου λειτουργίας.

  3. Αντιγράψτε τη συμβολοσειρά σύνδεσης Εργασία δέσμης (δεύτερο κόκκινο πλαίσιο στην εικόνα) στον κώδικά σας.

  4. Μεταβείτε στο Κέντρο διαχείρισης του Microsoft Entra και αντιγράψτε το Αναγνωριστικό εφαρμογής (πρόγραμμα-πελάτη) και το Αναγνωριστικό καταλόγου (μισθωτής) στον κώδικά σας.

    Στιγμιότυπο οθόνης που εμφανίζει την επισκόπηση της εφαρμογής API Livy στο Κέντρο διαχείρισης Microsoft Entra.

Δημιουργήστε ένα ωφέλιμο φορτίο Spark και κάντε αποστολή στο Lakehouse σας

  1. Δημιουργήστε ένα σημειωματάριο .ipynb στο Visual Studio Code και εισαγάγετε τον ακόλουθο κώδικα

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Αποθηκεύστε το αρχείο Python τοπικά. Αυτό το ωφέλιμο φορτίο κώδικα Python περιέχει δύο προτάσεις Spark που λειτουργούν με δεδομένα σε ένα Lakehouse και πρέπει να αποσταλούν στο Lakehouse σας. Θα χρειαστείτε τη διαδρομή ABFS του ωφέλιμου φορτίου για αναφορά στην εργασία δέσμης API Livy στο Visual Studio Code και το όνομα του πίνακα Lakehouse στην πρόταση SQL Select..

    Στιγμιότυπο οθόνης που εμφανίζει το κελί ωφέλιμου φορτίου Python.

  3. Αποστείλετε το ωφέλιμο φορτίο Python στην ενότητα αρχείων του Lakehouse σας. > Κάντε κλικ στην επιλογή Λήψη δεδομένων > Αποστολή αρχείων > στο πλαίσιο εισόδου Αρχεία/Αρχεία.

    Στιγμιότυπο οθόνης που εμφανίζει το ωφέλιμο φορτίο στην ενότητα Αρχεία του Lakehouse.

  4. Αφού το αρχείο είναι στην ενότητα Αρχεία του Lakehouse, κάντε κλικ στις τρεις τελείες στα δεξιά του ονόματος αρχείου ωφέλιμου φορτίου και επιλέξτε Ιδιότητες.

    Στιγμιότυπο οθόνης που εμφανίζει τη διαδρομή ABFS ωφέλιμου φορτίου στις Ιδιότητες του αρχείου στο Lakehouse.

  5. Αντιγράψτε αυτήν τη διαδρομή ABFS στο κελί του Σημειωματάριου στο βήμα 1.

Δημιουργία περιόδου λειτουργίας δέσμης Livy API Spark

  1. Δημιουργήστε ένα σημειωματάριο .ipynb στο Visual Studio Code και εισαγάγετε τον ακόλουθο κώδικα.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Εκτελέστε το κελί σημειωματάριου και θα πρέπει να εμφανιστεί ένα αναδυόμενο παράθυρο στο πρόγραμμα περιήγησής σας, το οποίο σας επιτρέπει να επιλέξετε την ταυτότητα με την οποία θα εισέλθετε.

    Στιγμιότυπο οθόνης που εμφανίζει την οθόνη σύνδεσης στην εφαρμογή Microsoft Entra.

  3. Αφού επιλέξετε την ταυτότητα με την οποία θα εισέλθετε, θα σας ζητηθεί επίσης να εγκρίνετε τα δικαιώματα API καταχώρησης εφαρμογής Microsoft Entra.

    Στιγμιότυπο οθόνης που εμφανίζει τα δικαιώματα API της εφαρμογής Microsoft Entra.

  4. Κλείστε το παράθυρο του προγράμματος περιήγησης μετά την ολοκλήρωση του ελέγχου ταυτότητας.

    Στιγμιότυπο οθόνης που εμφανίζει τον έλεγχο ταυτότητας ολοκληρώθηκε.

  5. Στο Visual Studio Code θα πρέπει να δείτε να επιστρέφεται το διακριτικό Microsoft Entra.

    Στιγμιότυπο οθόνης που εμφανίζει το διακριτικό Microsoft Entra που επιστράφηκε μετά την εκτέλεση του κελιού και την είσοδο.

  6. Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε δύο γραμμές εκτυπωμένες κατά τη δημιουργία της μαζικής εργασίας Livy.

    Στιγμιότυπο οθόνης που εμφανίζει τα αποτελέσματα της δημιουργίας περιόδου λειτουργίας δέσμης.

Υποβολή πρότασης spark.sql με χρήση της περιόδου λειτουργίας δέσμης API Livy

  1. Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε πολλές γραμμές εκτυπωμένες καθώς δημιουργείται και εκτελείται η εργασία δέσμης Livy.

    Στιγμιότυπο οθόνης που εμφανίζει τα αποτελέσματα στον κώδικα Visual Studio μετά την επιτυχημένη υποβολή της εργασίας δέσμης Livy.

  3. Επιστρέψτε στο Lakehouse για να δείτε τις αλλαγές.

Προβολή των εργασιών σας στο Κέντρο παρακολούθησης

Μπορείτε να αποκτήσετε πρόσβαση στο Κέντρο παρακολούθησης για να προβάλετε διάφορες δραστηριότητες Apache Spark, επιλέγοντας Παρακολούθηση στις συνδέσεις περιήγησης στην αριστερή πλευρά.

  1. Όταν ολοκληρωθεί η μαζική εργασία, μπορείτε να προβάλετε την κατάσταση της περιόδου λειτουργίας μεταβαίνοντας στην Παρακολούθηση.

    Στιγμιότυπο οθόνης που εμφανίζει προηγούμενες υποβολές API Livy στο Κέντρο παρακολούθησης.

  2. Επιλέξτε και ανοίξτε το πιο πρόσφατο όνομα δραστηριότητας.

    Στιγμιότυπο οθόνης που εμφανίζει την πιο πρόσφατη δραστηριότητα του Livy API στο Κέντρο παρακολούθησης.

  3. Σε αυτήν την περίπτωση περιόδου λειτουργίας Livy API, μπορείτε να δείτε την προηγούμενη υποβολή δέσμης, λεπτομέρειες εκτέλεσης, εκδόσεις Spark και ρυθμίσεις παραμέτρων. Παρατηρήστε την κατάσταση διακοπής στην επάνω δεξιά γωνία.

    Στιγμιότυπο οθόνης που εμφανίζει τις πιο πρόσφατες λεπτομέρειες δραστηριότητας του Livy API στο Κέντρο παρακολούθησης.

Για να ανακεφαλαιώσετε ολόκληρη τη διαδικασία, χρειάζεστε ένα απομακρυσμένο πρόγραμμα-πελάτη, όπως κώδικα Visual Studio, ένα διακριτικό εφαρμογής Microsoft Entra, μια διεύθυνση URL τελικού σημείου Livy API, έλεγχο ταυτότητας για το Lakehouse σας, ένα ωφέλιμο φορτίο Spark στο Lakehouse σας και, τέλος, μια μαζική συνεδρία API Livy.