Udostępnij za pośrednictwem


Jak utworzyć i zaktualizować definicję zadania platformy Spark przy użyciu interfejsu API REST usługi Microsoft Fabric

Interfejs API REST usługi Microsoft Fabric udostępnia punkt końcowy usługi dla operacji CRUD elementów sieci szkieletowej. W tym samouczku przedstawiono kompleksowe scenariusze tworzenia i aktualizowania artefaktu definicji zadania platformy Spark. Obejmuje to trzy ogólne kroki:

  1. tworzenie elementu definicji zadania platformy Spark z pewnym stanem początkowym
  2. przekaż plik definicji głównej i inne pliki lib
  3. zaktualizuj element definicji zadania platformy Spark przy użyciu adresu URL oneLake głównego pliku definicji i innych plików lib

Wymagania wstępne

  1. Token entra firmy Microsoft jest wymagany do uzyskania dostępu do interfejsu API REST sieci szkieletowej. Zaleca się pobranie tokenu przez bibliotekę MSAL. Aby uzyskać więcej informacji, zobacz Obsługa przepływu uwierzytelniania w usłudze MSAL.
  2. Token magazynu jest wymagany do uzyskania dostępu do interfejsu API OneLake. Aby uzyskać więcej informacji, zobacz BIBLIOTEKA MSAL dla języka Python.

Tworzenie elementu definicji zadania platformy Spark ze stanem początkowym

Interfejs API REST usługi Microsoft Fabric definiuje ujednolicony punkt końcowy dla operacji CRUD elementów sieci szkieletowej. Punkt końcowy to https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.

Szczegóły elementu są określone wewnątrz treści żądania. Oto przykład treści żądania tworzenia elementu definicji zadania platformy Spark:

{
    "displayName": "SJDHelloWorld",
    "type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload":"eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9",
                "payloadType": "InlineBase64"
            }
        ]
    }
}

W tym przykładzie element Definicji zadania platformy Spark ma nazwę SJDHelloWorld. Pole payload to zakodowana w formacie base64 zawartość konfiguracji szczegółów po dekodowaniu zawartości:

{
    "executableFile":null,
    "defaultLakehouseArtifactId":"",
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":null,
    "commandLineArguments":"",
    "additionalLibraryUris":[],
    "language":"",
    "environmentArtifactId":null
}

Poniżej przedstawiono dwie funkcje pomocnicze do kodowania i dekodowania szczegółowej konfiguracji:

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

Oto fragment kodu umożliwiający utworzenie elementu definicji zadania platformy Spark:

import requests

bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"

# Define the payload data for the POST request
payload_data = {
    "displayName": "SJDHelloWorld",
    "Type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload": payload,
                "payloadType": "InlineBase64"
            }
        ]
    }
}

# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)

Przekazywanie pliku definicji głównej i innych plików lib

Token magazynu jest wymagany do przekazania pliku do usługi OneLake. Oto funkcja pomocnika umożliwiająca uzyskanie tokenu magazynu:


import msal

def getOnelakeStorageToken():
    app = msal.PublicClientApplication(
        "{client id}", # this filed should be the client id 
        authority="https://login.microsoftonline.com/microsoft.com")

    result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])

    print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")

    return result['access_token']

Teraz mamy utworzony element definicji zadania platformy Spark, aby można go było uruchomić, musimy skonfigurować plik definicji głównej i wymagane właściwości. Punkt końcowy do przekazywania pliku dla tego elementu SJD to https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}. Należy użyć tego samego identyfikatora "workspaceId" z poprzedniego kroku. Wartość "sjdartifactid" można znaleźć w treści odpowiedzi poprzedniego kroku. Oto fragment kodu, który umożliwia skonfigurowanie pliku definicji głównej:

import requests

# three steps are required: create file, append file, flush file

onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value


onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
}

onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
    # Request was successful
    print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")

# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file

appendPosition = 0;
appendAction = "append";

### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents

onelakePatchRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}",
    "Content-Type" : "text/plain"
}

onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
    # Request was successful
    print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")

# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file

flushAction = "flush";

### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
    print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
else:
    print(onelakeFlushMainFileResponse.json())

Postępuj zgodnie z tym samym procesem, aby w razie potrzeby przekazać inne pliki lib.

Zaktualizuj element Definicji zadania platformy Spark przy użyciu adresu URL oneLake głównego pliku definicji i innych plików lib

Do tej pory utworzyliśmy element definicji zadania platformy Spark z pewnym stanem początkowym, przekazano plik definicji głównej i inne pliki lib. Ostatnim krokiem jest zaktualizowanie elementu definicji zadania platformy Spark w celu ustawienia właściwości adresu URL głównego pliku definicji i innych plików lib. Punkt końcowy służą do aktualizowania elementu definicji zadania platformy Spark to https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}. Należy użyć tych samych parametrów "workspaceId" i "sjdartifactid" z poprzednich kroków. Oto fragment kodu służący do aktualizowania elementu definicji zadania platformy Spark:


mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}"  # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id

updateRequestBodyJson = {
    "executableFile":mainAbfssPath,
    "defaultLakehouseArtifactId":defaultLakehouseId,
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":None,
    "commandLineArguments":"",
    "additionalLibraryUris":[libsAbfssPath],
    "language":"Python",
    "environmentArtifactId":None}

# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)

# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)

# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"

updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"

# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

# Define the payload data for the POST request
payload_data = {
    "displayName": "sjdCreateTest11",
    "Type": Type,
    "definition": {
        "format": format,
        "parts": [
            {
                "path": path,
                "payload": updatePayload,
                "payloadType": payloadType
            }
        ]
    }
}


# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
    print("Successfully updated SJD.")
else:
    print(response.json())
    print(response.status_code)

Aby podsumować cały proces, zarówno interfejs API REST sieci szkieletowej, jak i interfejs API OneLake są potrzebne do utworzenia i zaktualizowania elementu definicji zadania platformy Spark. Interfejs API REST sieci szkieletowej służy do tworzenia i aktualizowania elementu definicji zadania platformy Spark. Interfejs API OneLake służy do przekazywania głównego pliku definicji i innych plików lib. Główny plik definicji i inne pliki lib są najpierw przekazywane do usługi OneLake. Następnie właściwości adresu URL głównego pliku definicji i innych plików lib są ustawiane w elemencie Definicji zadania platformy Spark.