Cómo crear una definición de trabajo con la API de REST de Spark en Microsoft Fabric
La API REST de Microsoft Fabric proporciona un punto de conexión de servicio para el funcionamiento de CRUD del elemento de Fabric. En este tutorial, se explica un escenario completo para crear y actualizar un artefacto de definición de trabajo de Spark. Tres pasos de alto nivel son necesarios:
- Crear un elemento de definición de trabajo de Spark con algún estado inicial
- Cargar el archivo de definición principal y otros archivos lib
- Actualizar el elemento de definición de trabajo de Spark con la dirección URL de OneLake del archivo de definición principal y otros archivos lib
Requisitos previos
- Un token de Microsoft Entra es necesario para acceder a la API REST de Fabric. Se recomienda la biblioteca MSAL para obtener el token. Para más información, consulte Compatibilidad con el flujo de autenticación de MSAL.
- Se requiere un token de almacenamiento para acceder a la API de OneLake. Para más información, consulte MSAL para Python.
Creación de un elemento de definición de trabajo de Spark con estado inicial
La API REST de Microsoft Fabric define un punto de conexión de servicio unificado para el funcionamiento de CRUD de elementos de Fabric. El extremo es https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items
El detalle del elemento se especifica dentro del cuerpo de la solicitud. Este es un ejemplo del cuerpo de la solicitud para crear un elemento de definición de trabajo de Spark:
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
"path": "SparkJobDefinitionV1.json",
"payloadType": "InlineBase64"
En este ejemplo, el elemento de definición de trabajo de Spark se denomina SJDHelloWorld
. El campo payload
es el contenido codificado en base64 de la configuración de detalles, después de la descodificación; el contenido es:
Estas son dos funciones de asistencia para codificar y descodificar la configuración detallada:
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Este es el fragmento de código para crear un elemento de definición de trabajo de Spark:
import requests
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"
# Define the payload data for the POST request
payload_data = {
"displayName": "SJDHelloWorld",
"Type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
"path": "SparkJobDefinitionV1.json",
"payload": payload,
"payloadType": "InlineBase64"
# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)
Cargar el archivo de definición principal y otros archivos lib
Se requiere un token de almacenamiento para cargar el archivo a OneLake. Esta es una función auxiliar para obtener el token de almacenamiento:
import msal
def getOnelakeStorageToken():
app = msal.PublicClientApplication(
"{client id}", # this filed should be the client id
result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])
print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")
return result['access_token']
Ahora tenemos un elemento de definición de trabajo de Spark creado y, para que sea ejecutable, es necesario configurar el archivo de definición principal y las propiedades necesarias. El punto de conexión para cargar el archivo para este elemento SJD es https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}
. Se debe usar el mismo "workspaceId" del paso anterior, el valor de "sjdartifactid" podría encontrarse en el cuerpo de la respuesta del paso anterior. Este es el fragmento de código para configurar el archivo de definición principal:
import requests
# three steps are required: create file, append file, flush file
onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value
onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
# Request was successful
print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")
# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file
appendPosition = 0;
appendAction = "append";
### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents
onelakePatchRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}",
"Content-Type" : "text/plain"
onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
# Request was successful
print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")
# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file
flushAction = "flush";
### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
Siga el mismo proceso para cargar los otros archivos lib si es necesario.
Actualizar el elemento de definición de trabajo de Spark con la dirección URL de OneLake del archivo de definición principal y otros archivos lib
Hasta ahora, hemos creado un elemento de definición de trabajo de Spark con algún estado inicial y cargado el archivo de definición principal y otros archivos lib. El último paso es actualizar el elemento de definición de trabajo de Spark para establecer las propiedades URL del archivo de definición principal y otros archivos lib. El punto de conexión para actualizar el elemento de definición de trabajo de Spark es https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}
. Se deben usar los mismos "workspaceId" y "sjdartifactid" de los pasos anteriores. Este es el fragmento de código para actualizar el elemento de definición de trabajo de Spark:
mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}" # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id
updateRequestBodyJson = {
# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)
# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"
updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"
# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
# Define the payload data for the POST request
payload_data = {
"displayName": "sjdCreateTest11",
"Type": Type,
"definition": {
"format": format,
"parts": [
"path": path,
"payload": updatePayload,
"payloadType": payloadType
# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
print("Successfully updated SJD.")
Para resumir todo el proceso, se necesitan tanto la API REST de Fabric como la API de OneLake para crear y actualizar un elemento de definición de trabajo de Spark. La API de REST de Fabric se usa para crear y actualizar el elemento de definición de trabajo de Spark, y la API de OneLake se usa para cargar el archivo de definición principal y otros archivos lib. El archivo de definición principal y otros archivos lib se cargan primero en OneLake. A continuación, las propiedades url del archivo de definición principal y otros archivos lib se establecen en el elemento de definición de trabajo de Spark.