แชร์ผ่าน


วิธีการสร้างและอัปเดต Spark Job Definition ด้วย Microsoft Fabric Rest API

Microsoft Fabric Rest API ให้บริการจุดสิ้นสุดการบริการสําหรับการดําเนินงาน CRUD ของรายการ Fabric ในบทช่วยสอนนี้ เราจะแนะนําผ่านสถานการณ์แบบครอบคลุมของวิธีการสร้างและอัปเดตสิ่งประดิษฐ์ Spark Job Definition มีสามขั้นตอนระดับสูงที่เกี่ยวข้อง:

  1. สร้างรายการ ข้อกําหนดงาน Spark ที่มีสถานะเริ่มต้นบางอย่าง
  2. อัปโหลดแฟ้มข้อกําหนดหลักและแฟ้ม Lib อื่นๆ
  3. ปรับปรุงรายการข้อกําหนดงาน Spark ด้วย URL ของ OneLake ของไฟล์ข้อกําหนดหลักและไฟล์ Lib อื่นๆ

ข้อกำหนดเบื้องต้น

  1. โทเค็น Microsoft Entra จําเป็นสําหรับการเข้าถึง Fabric Rest API แนะนําให้ใช้ไลบรารี MSAL เพื่อรับโทเค็น สําหรับข้อมูลเพิ่มเติม ดูการสนับสนุนโฟลว์การรับรองความถูกต้องใน MSAL
  2. โทเค็นที่เก็บข้อมูลจําเป็นสําหรับการเข้าถึง OneLake API สําหรับข้อมูลเพิ่มเติม ให้ดู MSAL สําหรับ Python

สร้างรายการข้อกําหนดงาน Spark ที่มีสถานะเริ่มต้น

Microsoft Fabric Rest API กําหนดจุดสิ้นสุดแบบรวมสําหรับการดําเนินการ CRUD ของรายการ Fabric ปลายทางคือhttps://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items

มีการระบุรายละเอียดรายการภายในเนื้อความคําขอ นี่คือตัวอย่างของเนื้อความการร้องขอสําหรับการสร้างรายการ ข้อกําหนดงานของ Spark:

{
    "displayName": "SJDHelloWorld",
    "type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload":"eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9",
                "payloadType": "InlineBase64"
            }
        ]
    }
}

ในตัวอย่างนี้ รายการข้อกําหนดงาน Spark มีชื่อว่าSJDHelloWorld เขตข้อมูล payload คือเนื้อหาที่เข้ารหัส base64 ของการตั้งค่ารายละเอียดหลังจากถอดรหัส เนื้อหาคือ:

{
    "executableFile":null,
    "defaultLakehouseArtifactId":"",
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":null,
    "commandLineArguments":"",
    "additionalLibraryUris":[],
    "language":"",
    "environmentArtifactId":null
}

ต่อไปนี้เป็นฟังก์ชันตัวช่วยเหลือสองฟังก์ชันในการเข้ารหัสและถอดรหัสการตั้งค่าโดยละเอียด:

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

นี่คือส่วนย่อยของโค้ดเพื่อสร้างรายการ ข้อกําหนดงานของ Spark:

import requests

bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"

# Define the payload data for the POST request
payload_data = {
    "displayName": "SJDHelloWorld",
    "Type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload": payload,
                "payloadType": "InlineBase64"
            }
        ]
    }
}

# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)

อัปโหลดไฟล์ข้อกําหนดหลักและไฟล์ Lib อื่นๆ

โทเค็นที่เก็บข้อมูลจําเป็นต้องอัปโหลดไฟล์ไปยัง OneLake นี่คือฟังก์ชันผู้ช่วยเหลือเพื่อรับโทเค็นที่เก็บข้อมูล:


import msal

def getOnelakeStorageToken():
    app = msal.PublicClientApplication(
        "{client id}", # this filed should be the client id 
        authority="https://login.microsoftonline.com/microsoft.com")

    result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])

    print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")

    return result['access_token']

ในตอนนี้เรามีรายการ Spark Job Definition ที่สร้างเพื่อให้สามารถใช้งานได้ เราจําเป็นต้องตั้งค่าไฟล์ข้อกําหนดหลักและคุณสมบัติที่จําเป็น จุดสิ้นสุดสําหรับการอัปโหลดไฟล์สําหรับรายการ SJD นี้คือhttps://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid} ควรใช้ "workspaceId" เดียวกันจากขั้นตอนก่อนหน้า ค่าของ "sjdartifactid" สามารถพบได้ในเนื้อหาคําตอบของขั้นตอนก่อนหน้า นี่คือส่วนย่อยของโค้ดเพื่อตั้งค่าไฟล์ข้อกําหนดหลัก:

import requests

# three steps are required: create file, append file, flush file

onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value


onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
}

onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
    # Request was successful
    print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")

# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file

appendPosition = 0;
appendAction = "append";

### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents

onelakePatchRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}",
    "Content-Type" : "text/plain"
}

onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
    # Request was successful
    print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")

# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file

flushAction = "flush";

### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
    print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
else:
    print(onelakeFlushMainFileResponse.json())

ทําตามขั้นตอนเดียวกันเพื่ออัปโหลดไฟล์ Lib อื่น ๆ ถ้าจําเป็น

อัปเดตรายการข้อกําหนดงาน Spark ด้วย URL ของ OneLake ของไฟล์ข้อกําหนดหลักและไฟล์ Lib อื่นๆ

จนถึงตอนนี้ เราได้สร้างรายการ Spark Job Definition โดยมีสถานะเริ่มต้นบางอย่างอัปโหลดไฟล์ข้อกําหนดหลักและไฟล์ Lib อื่นๆ ขั้นตอนสุดท้ายคือการอัปเดตรายการ Spark Job Definition เพื่อตั้งค่าคุณสมบัติ URL ของไฟล์ข้อกําหนดหลักและไฟล์ Lib อื่น ๆ จุดสิ้นสุดสําหรับการอัปเดตรายการข้อกําหนดงาน Spark คือhttps://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} ควรใช้ "workspaceId" และ "sjdartifactid" เดียวกันจากขั้นตอนก่อนหน้านี้ นี่คือส่วนย่อยของโค้ดเพื่ออัปเดตรายการ ข้อกําหนดงานของ Spark:


mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}"  # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id

updateRequestBodyJson = {
    "executableFile":mainAbfssPath,
    "defaultLakehouseArtifactId":defaultLakehouseId,
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":None,
    "commandLineArguments":"",
    "additionalLibraryUris":[libsAbfssPath],
    "language":"Python",
    "environmentArtifactId":None}

# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)

# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)

# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"

updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"

# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

# Define the payload data for the POST request
payload_data = {
    "displayName": "sjdCreateTest11",
    "Type": Type,
    "definition": {
        "format": format,
        "parts": [
            {
                "path": path,
                "payload": updatePayload,
                "payloadType": payloadType
            }
        ]
    }
}


# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
    print("Successfully updated SJD.")
else:
    print(response.json())
    print(response.status_code)

เพื่อย้อนกลับกระบวนการทั้งหมด จําเป็นต้องใช้ทั้ง Fabric REST API และ OneLake API เพื่อสร้างและอัปเดตรายการ Spark Job Definition Fabric REST API ถูกใช้เพื่อสร้างและอัปเดตรายการ Spark Job Definition การใช้ OneLake API เพื่ออัปโหลดไฟล์ข้อกําหนดหลักและไฟล์ Lib อื่นๆ ไฟล์ข้อกําหนดหลักและไฟล์ Lib อื่นๆ ถูกอัปโหลดไปยัง OneLake ก่อน จากนั้นคุณสมบัติ URL ของไฟล์ข้อกําหนดหลักและไฟล์ Lib อื่น ๆ จะถูกตั้งค่าในรายการ ข้อกําหนดงานของ Spark