Jak utworzyć i zaktualizować definicję zadania platformy Spark przy użyciu interfejsu API REST usługi Microsoft Fabric
Interfejs API REST usługi Microsoft Fabric udostępnia punkt końcowy usługi dla operacji CRUD elementów sieci szkieletowej. W tym samouczku przedstawiono kompleksowe scenariusze tworzenia i aktualizowania artefaktu definicji zadania platformy Spark. Obejmuje to trzy ogólne kroki:
- tworzenie elementu definicji zadania platformy Spark z pewnym stanem początkowym
- przekaż plik definicji głównej i inne pliki lib
- zaktualizuj element definicji zadania platformy Spark przy użyciu adresu URL oneLake głównego pliku definicji i innych plików lib
Wymagania wstępne
- Token entra firmy Microsoft jest wymagany do uzyskania dostępu do interfejsu API REST sieci szkieletowej. Zaleca się pobranie tokenu przez bibliotekę MSAL. Aby uzyskać więcej informacji, zobacz Obsługa przepływu uwierzytelniania w usłudze MSAL.
- Token magazynu jest wymagany do uzyskania dostępu do interfejsu API OneLake. Aby uzyskać więcej informacji, zobacz BIBLIOTEKA MSAL dla języka Python.
Tworzenie elementu definicji zadania platformy Spark ze stanem początkowym
Interfejs API REST usługi Microsoft Fabric definiuje ujednolicony punkt końcowy dla operacji CRUD elementów sieci szkieletowej. Punkt końcowy to https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items
.
Szczegóły elementu są określone wewnątrz treści żądania. Oto przykład treści żądania tworzenia elementu definicji zadania platformy Spark:
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload":"eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9",
"payloadType": "InlineBase64"
}
]
}
}
W tym przykładzie element Definicji zadania platformy Spark ma nazwę SJDHelloWorld
. Pole payload
to zakodowana w formacie base64 zawartość konfiguracji szczegółów po dekodowaniu zawartości:
{
"executableFile":null,
"defaultLakehouseArtifactId":"",
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":null,
"commandLineArguments":"",
"additionalLibraryUris":[],
"language":"",
"environmentArtifactId":null
}
Poniżej przedstawiono dwie funkcje pomocnicze do kodowania i dekodowania szczegółowej konfiguracji:
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Oto fragment kodu umożliwiający utworzenie elementu definicji zadania platformy Spark:
import requests
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"
# Define the payload data for the POST request
payload_data = {
"displayName": "SJDHelloWorld",
"Type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": payload,
"payloadType": "InlineBase64"
}
]
}
}
# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)
Przekazywanie pliku definicji głównej i innych plików lib
Token magazynu jest wymagany do przekazania pliku do usługi OneLake. Oto funkcja pomocnika umożliwiająca uzyskanie tokenu magazynu:
import msal
def getOnelakeStorageToken():
app = msal.PublicClientApplication(
"{client id}", # this filed should be the client id
authority="https://login.microsoftonline.com/microsoft.com")
result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])
print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")
return result['access_token']
Teraz mamy utworzony element definicji zadania platformy Spark, aby można go było uruchomić, musimy skonfigurować plik definicji głównej i wymagane właściwości. Punkt końcowy do przekazywania pliku dla tego elementu SJD to https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}
. Należy użyć tego samego identyfikatora "workspaceId" z poprzedniego kroku. Wartość "sjdartifactid" można znaleźć w treści odpowiedzi poprzedniego kroku. Oto fragment kodu, który umożliwia skonfigurowanie pliku definicji głównej:
import requests
# three steps are required: create file, append file, flush file
onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value
onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
}
onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
# Request was successful
print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")
# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file
appendPosition = 0;
appendAction = "append";
### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents
onelakePatchRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}",
"Content-Type" : "text/plain"
}
onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
# Request was successful
print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")
# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file
flushAction = "flush";
### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
else:
print(onelakeFlushMainFileResponse.json())
Postępuj zgodnie z tym samym procesem, aby w razie potrzeby przekazać inne pliki lib.
Zaktualizuj element Definicji zadania platformy Spark przy użyciu adresu URL oneLake głównego pliku definicji i innych plików lib
Do tej pory utworzyliśmy element definicji zadania platformy Spark z pewnym stanem początkowym, przekazano plik definicji głównej i inne pliki lib. Ostatnim krokiem jest zaktualizowanie elementu definicji zadania platformy Spark w celu ustawienia właściwości adresu URL głównego pliku definicji i innych plików lib. Punkt końcowy służą do aktualizowania elementu definicji zadania platformy Spark to https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}
. Należy użyć tych samych parametrów "workspaceId" i "sjdartifactid" z poprzednich kroków. Oto fragment kodu służący do aktualizowania elementu definicji zadania platformy Spark:
mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}" # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id
updateRequestBodyJson = {
"executableFile":mainAbfssPath,
"defaultLakehouseArtifactId":defaultLakehouseId,
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":None,
"commandLineArguments":"",
"additionalLibraryUris":[libsAbfssPath],
"language":"Python",
"environmentArtifactId":None}
# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)
# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)
# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"
updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"
# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
# Define the payload data for the POST request
payload_data = {
"displayName": "sjdCreateTest11",
"Type": Type,
"definition": {
"format": format,
"parts": [
{
"path": path,
"payload": updatePayload,
"payloadType": payloadType
}
]
}
}
# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
print("Successfully updated SJD.")
else:
print(response.json())
print(response.status_code)
Aby podsumować cały proces, zarówno interfejs API REST sieci szkieletowej, jak i interfejs API OneLake są potrzebne do utworzenia i zaktualizowania elementu definicji zadania platformy Spark. Interfejs API REST sieci szkieletowej służy do tworzenia i aktualizowania elementu definicji zadania platformy Spark. Interfejs API OneLake służy do przekazywania głównego pliku definicji i innych plików lib. Główny plik definicji i inne pliki lib są najpierw przekazywane do usługi OneLake. Następnie właściwości adresu URL głównego pliku definicji i innych plików lib są ustawiane w elemencie Definicji zadania platformy Spark.