Κοινή χρήση μέσω


Χρήση του API Livy για την υποβολή και εκτέλεση εργασιών περιόδου λειτουργίας

Σημείωμα

Το Livy API για τη Μηχανική δεδομένων Fabric είναι σε προεπισκόπηση.

Ισχύει για:✅ Διαχείριση δεδομένων και Επιστήμη δεδομένων στο Microsoft Fabric

Υποβάλετε εργασίες δέσμης Spark χρησιμοποιώντας το Livy API για τη Μηχανική δεδομένων Fabric.

Προαπαιτούμενα στοιχεία

Το API Livy ορίζει ένα ενοποιημένο τελικό σημείο για λειτουργίες. Αντικαταστήστε τα σύμβολα κράτησης θέσης {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID}και {Fabric_LakehouseID} με τις κατάλληλες τιμές σας, όταν ακολουθείτε τα παραδείγματα σε αυτό το άρθρο.

Ρύθμιση παραμέτρων του Visual Studio Code για την περίοδο λειτουργίας API Livy

  1. Επιλέξτε Ρυθμίσεις lakehouse στο Fabric Lakehouse σας.

    Στιγμιότυπο οθόνης που εμφανίζει τις ρυθμίσεις του Lakehouse.

  2. Μεταβείτε στην ενότητα τελικού σημείου Livy.

    στιγμιότυπο οθόνης που εμφανίζει τη συμβολοσειρά σύνδεσης τελικού σημείου Lakehouse Livy και τη συμβολοσειρά σύνδεσης εργασίας περιόδου λειτουργίας.

  3. Αντιγράψτε τη συμβολοσειρά σύνδεσης εργασίας περιόδου λειτουργίας (πρώτο κόκκινο πλαίσιο στην εικόνα) στον κώδικά σας.

  4. Μεταβείτε στο Κέντρο διαχείρισης του Microsoft Entra και αντιγράψτε το Αναγνωριστικό εφαρμογής (πρόγραμμα-πελάτη) και το Αναγνωριστικό καταλόγου (μισθωτής) στον κώδικά σας.

    Στιγμιότυπο οθόνης που εμφανίζει την επισκόπηση της εφαρμογής API Livy στο Κέντρο διαχείρισης Microsoft Entra.

Δημιουργία περιόδου λειτουργίας Livy API Spark

  1. Δημιουργήστε ένα σημειωματάριο .ipynb στο Visual Studio Code και εισαγάγετε τον ακόλουθο κώδικα.

    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "Entra_TenantID"
    client_id = "Entra_ClientID"
    
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    app = PublicClientApplication(
        client_id,
        authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
    # If no cached tokens or user interaction needed, acquire tokens interactively
    if not result:
        result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                   "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                   "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
        print(f"Access token: {result['access_token']}")
    else:
        print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
        access_token = result['access_token']
        api_base_url_mist='https://api.fabric.microsoft.com/v1'
        livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/sessions"
        headers = {"Authorization": "Bearer " + access_token}
    
  2. Εκτελέστε το κελί σημειωματάριου και θα πρέπει να εμφανιστεί ένα αναδυόμενο παράθυρο στο πρόγραμμα περιήγησής σας, το οποίο σας επιτρέπει να επιλέξετε την ταυτότητα με την οποία θα εισέλθετε.

    Στιγμιότυπο οθόνης που εμφανίζει την οθόνη σύνδεσης στην εφαρμογή Microsoft Entra.

  3. Αφού επιλέξετε την ταυτότητα με την οποία θα εισέλθετε, θα σας ζητηθεί επίσης να εγκρίνετε τα δικαιώματα API καταχώρησης εφαρμογής Microsoft Entra.

    Στιγμιότυπο οθόνης που εμφανίζει τα δικαιώματα API της εφαρμογής Microsoft Entra.

  4. Κλείστε το παράθυρο του προγράμματος περιήγησης μετά την ολοκλήρωση του ελέγχου ταυτότητας.

    Στιγμιότυπο οθόνης που εμφανίζει τον έλεγχο ταυτότητας ολοκληρώθηκε.

  5. Στο Visual Studio Code, θα πρέπει να δείτε να επιστρέφεται το διακριτικό Microsoft Entra.

    Στιγμιότυπο οθόνης που εμφανίζει το διακριτικό Microsoft Entra που επιστράφηκε μετά την εκτέλεση του κελιού και την είσοδο.

  6. Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.

    create_livy_session = requests.post(livy_base_url, headers=headers, json={})
    print('The request to create the Livy session is submitted:' + str(create_livy_session.json()))
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    
  7. Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε μία γραμμή εκτυπωμένη καθώς δημιουργείται η περίοδος λειτουργίας Livy.

    Στιγμιότυπο οθόνης που εμφανίζει τα αποτελέσματα της πρώτης εκτέλεσης κελιού σημειωματάριου.

  8. Μπορείτε να επαληθεύσετε ότι η περίοδος λειτουργίας Livy δημιουργείται χρησιμοποιώντας την [Προβολή των εργασιών σας στο Κέντρο παρακολούθησης](#View εργασιών σας στο Κέντρο παρακολούθησης).

Υποβολή πρότασης spark.sql με χρήση της περιόδου λειτουργίας Livy API Spark

  1. Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.

    # call get session API
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data =    {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where fare_amount = 60\").show()",
        "kind": "spark"
        }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
        # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε πολλές επαυξητικές γραμμές εκτυπωμένες κατά την υποβολή της εργασίας και την επιστροφή των αποτελεσμάτων.

    Στιγμιότυπο οθόνης που εμφανίζει τα αποτελέσματα του πρώτου κελιού σημειωματάριου με Spark.sql εκτέλεση.

Υποβάλετε μια δεύτερη πρόταση spark.sql χρησιμοποιώντας την περίοδο λειτουργίας Livy API Spark

  1. Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.

    # call get session API
    
    livy_session_id = create_livy_session.json()['id']
    livy_session_url = livy_base_url + "/" + livy_session_id
    get_session_response = requests.get(livy_session_url, headers=headers)
    print(get_session_response.json())
    while get_session_response.json()["state"] != "idle":
        time.sleep(5)
        get_session_response = requests.get(livy_session_url, headers=headers)
    
    execute_statement = livy_session_url + "/statements"
    payload_data = {
        "code": "spark.sql(\"SELECT * FROM green_tripdata_2022_08 where tip_amount = 10\").show()",
        "kind": "spark"
    }
    execute_statement_response = requests.post(execute_statement, headers=headers, json=payload_data)
    print('the statement code is submitted as: ' + str(execute_statement_response.json()))
    
    statement_id = str(execute_statement_response.json()['id'])
    get_statement = livy_session_url+ "/statements/" + statement_id
    get_statement_response = requests.get(get_statement, headers=headers)
    
    while get_statement_response.json()["state"] != "available":
    # Sleep for 5 seconds before making the next request
        time.sleep(5)
        print('the statement code is submitted and running : ' + str(execute_statement_response.json()))
    
        # Make the next request
        get_statement_response = requests.get(get_statement, headers=headers)
    
    rst = get_statement_response.json()['output']['data']['text/plain']
    print(rst)
    
  2. Εκτελέστε το κελί σημειωματάριου, θα πρέπει να δείτε πολλές επαυξητικές γραμμές εκτυπωμένες κατά την υποβολή της εργασίας και την επιστροφή των αποτελεσμάτων.

    Στιγμιότυπο οθόνης που εμφανίζει τα αποτελέσματα της εκτέλεσης του δεύτερου κελιού σημειωματάριου.

Κλείσιμο της συνεδρίας Livy με τρίτη δήλωση

  1. Προσθέστε ένα άλλο κελί σημειωματάριου και εισαγάγετε αυτόν τον κώδικα.

    # call get session API with a delete session statement
    
    get_session_response = requests.get(livy_session_url, headers=headers)
    print('Livy statement URL ' + livy_session_url)
    
    response = requests.delete(livy_session_url, headers=headers)
    print (response)
    

Προβολή των εργασιών σας στο Κέντρο παρακολούθησης

Μπορείτε να αποκτήσετε πρόσβαση στο Κέντρο παρακολούθησης για να προβάλετε διάφορες δραστηριότητες Apache Spark, επιλέγοντας Παρακολούθηση στις συνδέσεις περιήγησης στην αριστερή πλευρά.

  1. Όταν η περίοδος λειτουργίας είναι σε εξέλιξη ή σε κατάσταση ολοκλήρωσης, μπορείτε να προβάλετε την κατάσταση της περιόδου λειτουργίας μεταβαίνοντας στην Παρακολούθηση.

    Στιγμιότυπο οθόνης που εμφανίζει προηγούμενες υποβολές API Livy στο Κέντρο παρακολούθησης.

  2. Επιλέξτε και ανοίξτε το πιο πρόσφατο όνομα δραστηριότητας.

    Στιγμιότυπο οθόνης που εμφανίζει την πιο πρόσφατη δραστηριότητα του Livy API στο Κέντρο παρακολούθησης.

  3. Σε αυτή την περίπτωση περιόδου λειτουργίας Livy API, μπορείτε να δείτε τις προηγούμενες υποβολές περιόδων λειτουργίας, λεπτομέρειες εκτέλεσης, εκδόσεις Spark και ρύθμιση παραμέτρων. Παρατηρήστε την κατάσταση διακοπής στην επάνω δεξιά γωνία.

    Στιγμιότυπο οθόνης που εμφανίζει τις πιο πρόσφατες λεπτομέρειες δραστηριότητας του Livy API στο Κέντρο παρακολούθησης.

Για να ανακεφαλαιώσετε ολόκληρη τη διαδικασία, χρειάζεστε ένα απομακρυσμένο πρόγραμμα-πελάτη, όπως κώδικα Visual Studio, ένα διακριτικό εφαρμογής Microsoft Entra, μια διεύθυνση URL τελικού σημείου Livy API, έλεγχο ταυτότητας για το Lakehouse σας και, τέλος, ένα API Session Livy.