Κοινή χρήση μέσω


Πώς μπορείτε να δημιουργήσετε και να ενημερώσετε έναν ορισμό εργασίας Spark με το Microsoft Fabric Rest API

Το Microsoft Fabric Rest API παρέχει ένα τελικό σημείο υπηρεσίας για λειτουργίες CRUD στοιχείων Fabric. Σε αυτό το εκπαιδευτικό βοήθημα, θα σας καθοδηγήσουμε σε ένα αναλυτικό σενάριο για τον τρόπο δημιουργίας και ενημέρωσης ενός τεχνουργήματος Spark Job Definition. Εμπλέκονται τρία βήματα υψηλού επιπέδου:

  1. δημιουργήστε ένα στοιχείο Spark Job Definition με κάποια αρχική κατάσταση
  2. αποστολή του αρχείου κύριου ορισμού και άλλων αρχείων lib
  3. ενημερώστε το στοιχείο Spark Job Definition με τη διεύθυνση URL OneLake του αρχείου κύριου ορισμού και άλλων αρχείων lib

Προαπαιτούμενα στοιχεία

  1. Για να αποκτήσετε πρόσβαση στο Rest API Fabric, απαιτείται ένα διακριτικό Microsoft Entra. Η βιβλιοθήκη MSAL συνιστάται για τη λήψη του διακριτικού. Για περισσότερες πληροφορίες, ανατρέξτε στο θέμα Υποστήριξη ροής ελέγχου ταυτότητας στο MSAL.
  2. Απαιτείται ένα διακριτικό χώρου αποθήκευσης για πρόσβαση στο API OneLake. Για περισσότερες πληροφορίες, ανατρέξτε στο θέμα MSAL για Python.

Δημιουργία στοιχείου ορισμού εργασίας Spark με την αρχική κατάσταση

Το Microsoft Fabric Rest API ορίζει ένα ενοποιημένο τελικό σημείο για λειτουργίες CRUD στοιχείων Fabric. Το τελικό σημείο είναι https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.

Η λεπτομέρεια του στοιχείου καθορίζεται μέσα στο σώμα της αίτησης. Ακολουθεί ένα παράδειγμα του σώματος αίτησης για τη δημιουργία ενός στοιχείου Ορισμός εργασίας Spark:

{
    "displayName": "SJDHelloWorld",
    "type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload":"eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9",
                "payloadType": "InlineBase64"
            }
        ]
    }
}

Σε αυτό το παράδειγμα, το στοιχείο ορισμού εργασίας Spark ονομάζεται ως SJDHelloWorld. Το payload πεδίο είναι το κωδικοποιημένο περιεχόμενο base64 της εγκατάστασης λεπτομερειών, μετά την αποκωδικοποίηση, το περιεχόμενο είναι:

{
    "executableFile":null,
    "defaultLakehouseArtifactId":"",
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":null,
    "commandLineArguments":"",
    "additionalLibraryUris":[],
    "language":"",
    "environmentArtifactId":null
}

Ακολουθούν δύο βοηθητικές συναρτήσεις για την κωδικοποίηση και την αποκωδικοποίηση της λεπτομερούς ρύθμισης:

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

Ακολουθεί το τμήμα κώδικα για να δημιουργήσετε ένα στοιχείο Spark Job Definition:

import requests

bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"

# Define the payload data for the POST request
payload_data = {
    "displayName": "SJDHelloWorld",
    "Type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload": payload,
                "payloadType": "InlineBase64"
            }
        ]
    }
}

# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)

Αποστολή του αρχείου κύριου ορισμού και άλλων αρχείων lib

Απαιτείται ένα διακριτικό χώρου αποθήκευσης για την αποστολή του αρχείου στο OneLake. Ακολουθεί μια συνάρτηση βοηθητικής εφαρμογής για να λάβετε το διακριτικό χώρου αποθήκευσης:


import msal

def getOnelakeStorageToken():
    app = msal.PublicClientApplication(
        "{client id}", # this filed should be the client id 
        authority="https://login.microsoftonline.com/microsoft.com")

    result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])

    print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")

    return result['access_token']

Τώρα έχουμε δημιουργήσει ένα στοιχείο Spark Job Definition, το οποίο μπορεί να εκτελεστεί, πρέπει να ρυθμίσουμε το αρχείο κύριου ορισμού και τις απαιτούμενες ιδιότητες. Το τελικό σημείο για την αποστολή του αρχείου για αυτό το στοιχείο SJD είναι https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}. Θα πρέπει να χρησιμοποιηθεί το ίδιο "workspaceId" από το προηγούμενο βήμα. Η τιμή "sjdartifactid" μπορεί να βρεθεί στο σώμα απόκρισης του προηγούμενου βήματος. Ακολουθεί το τμήμα κώδικα για να ρυθμίσετε το αρχείο κύριου ορισμού:

import requests

# three steps are required: create file, append file, flush file

onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value


onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
}

onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
    # Request was successful
    print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")

# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file

appendPosition = 0;
appendAction = "append";

### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents

onelakePatchRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}",
    "Content-Type" : "text/plain"
}

onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
    # Request was successful
    print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")

# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file

flushAction = "flush";

### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
    print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
else:
    print(onelakeFlushMainFileResponse.json())

Ακολουθήστε την ίδια διαδικασία για να αποστείλετε τα άλλα αρχεία lib, εάν είναι απαραίτητο.

Ενημερώστε το στοιχείο Spark Job Definition με τη διεύθυνση URL OneLake του αρχείου κύριου ορισμού και άλλων αρχείων lib

Μέχρι τώρα, δημιουργήσαμε ένα στοιχείο Spark Job Definition με κάποια αρχική κατάσταση, αποστείλαμε το αρχείο κύριου ορισμού και άλλα αρχεία lib. Το τελευταίο βήμα είναι να ενημερώσετε το στοιχείο Spark Job Definition για να ορίσετε τις ιδιότητες διεύθυνσης URL του αρχείου κύριου ορισμού και άλλων αρχείων lib. Το τελικό σημείο για την ενημέρωση του στοιχείου Ορισμός εργασίας Spark είναι https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}. Θα πρέπει να χρησιμοποιείται η ίδια "workspaceId" και "sjdartifactid" από τα προηγούμενα βήματα. Ακολουθεί το τμήμα κώδικα για να ενημερώσετε το στοιχείο Spark Job Definition:


mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}"  # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id

updateRequestBodyJson = {
    "executableFile":mainAbfssPath,
    "defaultLakehouseArtifactId":defaultLakehouseId,
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":None,
    "commandLineArguments":"",
    "additionalLibraryUris":[libsAbfssPath],
    "language":"Python",
    "environmentArtifactId":None}

# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)

# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)

# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"

updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"

# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

# Define the payload data for the POST request
payload_data = {
    "displayName": "sjdCreateTest11",
    "Type": Type,
    "definition": {
        "format": format,
        "parts": [
            {
                "path": path,
                "payload": updatePayload,
                "payloadType": payloadType
            }
        ]
    }
}


# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
    print("Successfully updated SJD.")
else:
    print(response.json())
    print(response.status_code)

Για να ανακεφαλαιώσουμε ολόκληρη τη διαδικασία, απαιτούνται και τα δύο Fabric REST API και OneLake API για τη δημιουργία και ενημέρωση ενός στοιχείου Spark Job Definition. Το REST API Fabric χρησιμοποιείται για τη δημιουργία και την ενημέρωση του στοιχείου Spark Job Definition, το API OneLake χρησιμοποιείται για την αποστολή του αρχείου κύριου ορισμού και άλλων αρχείων lib. Το αρχείο κύριου ορισμού και άλλα αρχεία lib αποστέλλονται πρώτα στο OneLake. Στη συνέχεια, οι ιδιότητες διεύθυνσης URL του αρχείου κύριου ορισμού και άλλων αρχείων lib ορίζονται στο στοιχείο Ορισμός εργασίας Spark.