แชร์ผ่าน


Microsoft Spark Utilities (MSSparkUtils) สําหรับ Fabric

Microsoft Spark Utilities (MSSparkUtils) เป็นแพคเกจที่มีอยู่แล้วภายใน เพื่อช่วยให้คุณทํางานทั่วไปได้อย่างง่ายดาย คุณสามารถใช้ MSSparkUtils เพื่อทํางานกับระบบไฟล์ รับตัวแปรสภาพแวดล้อม เพื่อเกี่ยวโยงสมุดบันทึกเข้าด้วยกัน และทํางานกับข้อมูลลับ แพคเกจ MSSparkUtils มีให้ใช้งานใน PySpark (Python) Scala, SparkR notebooks และ Fabric pipelines

หมายเหตุ

  • MsSparkUtils ถูกเปลี่ยนชื่อ เป็น NotebookUtils อย่างเป็นทางการแล้ว โค้ดที่มีอยู่จะยังคง เข้ากัน ได้ย้อนกลับและจะไม่ทําให้เกิดการเปลี่ยนแปลงที่เสียหาย ขอแนะนําให้อัปเกรดเป็นสมุดบันทึกเพื่อให้แน่ใจว่าการสนับสนุนอย่างต่อเนื่องและการเข้าถึงคุณลักษณะใหม่ เนมสเปซ mssparkutils จะถูกยกเลิกในอนาคต
  • NotebookUtils ออกแบบมาเพื่อทํางานกับ Spark 3.4 (Runtime v1.2) และสูงกว่า ฟีเจอร์และการอัปเดตใหม่ทั้งหมดจะได้รับการสนับสนุนเฉพาะกับ namespace ของ notebookutils ในอนาคต

โปรแกรมอรรถประโยชน์ระบบไฟล์

mssparkutils.fs ให้บริการสาธารณูปโภคสําหรับการทํางานกับระบบไฟล์ต่าง ๆ รวมถึง Azure Data Lake Storage (ADLS) Gen2 และ Azure Blob Storage ตรวจสอบให้แน่ใจว่าคุณได้กําหนดค่าการเข้าถึงไปยัง Azure Data Lake Storage รุ่น2 และ Azure Blob Storage อย่างเหมาะสม

เรียกใช้คําสั่งต่อไปนี้สําหรับภาพรวมของวิธีการที่พร้อมใช้งาน:

from notebookutils import mssparkutils
mssparkutils.fs.help()

เอาท์พุท

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils ทํางานกับระบบไฟล์ในลักษณะเดียวกับ Spark API ใช้ mssparkuitls.fs.mkdirs() และการใช้งาน Fabric lakehouse ตัวอย่างเช่น:

การใช้งาน เส้นทางสัมพัทธ์จากราก HDFS เส้นทางสัมบูรณ์สําหรับระบบไฟล์ ABFS เส้นทางสัมบูรณ์สําหรับระบบไฟล์ภายในเครื่องในโหนดโปรแกรมควบคุม
บ้านทะเลสาบโนนเดเฟลท์ ไม่ได้รับการสนับสนุน mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")
เลคเฮ้าส์ค่าเริ่มต้น ไดเรกทอรีภายใต้ "Files" หรือ "Tables": mssparkutils.fs.mkdirs("Files/<new_dir>") mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")

แสดงรายการไฟล์

หากต้องการแสดงรายการเนื้อหาของไดเรกทอรี ให้ใช้ mssparkutils.fs.ls('เส้นทางไดเรกทอรีของคุณ') ตัวอย่างเช่น:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

ดูคุณสมบัติไฟล์

วิธีการนี้จะส่งกลับคุณสมบัติไฟล์ รวมถึงชื่อไฟล์ เส้นทางไฟล์ ขนาดไฟล์ และอาจเป็นไดเรกทอรีและไฟล์หรือไม่

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

สร้างไดเรกทอรีใหม่

วิธีนี้จะสร้างไดเรกทอรีที่ระบุถ้าไม่มีอยู่ และสร้างไดเรกทอรีหลักใด ๆ ที่จําเป็น

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

คัดลอกไฟล์

วิธีนี้จะคัดลอกไฟล์หรือไดเรกทอรี และรองรับการคัดลอกกิจกรรมข้ามระบบไฟล์

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

ไฟล์สําเนาที่มีประสิทธิภาพ

วิธีนี้เป็นวิธีที่รวดเร็วกว่าในการคัดลอกหรือย้ายไฟล์ โดยเฉพาะอย่างยิ่งปริมาณข้อมูลขนาดใหญ่

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

แสดงตัวอย่างเนื้อหาไฟล์

วิธีนี้ส่งกลับถึงไบต์ 'maxBytes' แรกของไฟล์ที่ระบุเป็นสตริงที่เข้ารหัสลับใน UTF-8

mssparkutils.fs.head('file path', maxBytes to read)

ย้ายไฟล์

วิธีนี้จะย้ายไฟล์หรือไดเรกทอรี และสนับสนุนการย้ายข้ามระบบไฟล์

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

เขียนไฟล์

วิธีนี้เขียนสตริงที่ระบุออกไปยังไฟล์เข้ารหัสใน UTF-8

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

ผนวกเนื้อหาไปยังไฟล์

เมธอดนี้ผนวกสตริงที่กําหนดไปยังไฟล์ที่เข้ารหัสไว้ใน UTF-8

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

หมายเหตุ

เมื่อใช้ mssparkutils.fs.append API ในการ for วนรอบเพื่อเขียนไปยังไฟล์เดียวกัน เราขอแนะนําให้เพิ่ม sleep คําสั่งประมาณ 0.5s ~ 1s ระหว่างการเขียนที่เกิดซ้ํา ทั้งนี้เนื่องจาก mssparkutils.fs.append การดําเนินการภายใน flush ของ API เป็นแบบอะซิงโครนัส ดังนั้นการหน่วงเวลาสั้น ๆ จะช่วยรับประกันความสมบูรณ์ของข้อมูล

ลบไฟล์หรือไดเรกทอรี

วิธีนี้จะลบไฟล์หรือไดเรกทอรีออก

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

ไดเรกทอรีการต่อเชื่อม/ยกเลิกการต่อเชื่อม

ค้นหาข้อมูลเพิ่มเติมเกี่ยวกับการใช้งานโดยละเอียดในแฟ้มที่ต่อเชื่อมและยกเลิกการเปิด

อรรถประโยชน์สมุดบันทึก

ใช้โปรแกรมอรรถประโยชน์ของสมุดบันทึก MSSparkUtils เพื่อเรียกใช้สมุดบันทึกหรือออกจากสมุดบันทึกที่มีค่า เรียกใช้คําสั่งต่อไปนี้เพื่อดูภาพรวมของวิธีการที่พร้อมใช้งาน:

mssparkutils.notebook.help()

ผลิตภัณฑ์:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

หมายเหตุ

อรรถประโยชน์ของสมุดบันทึกไม่สามารถใช้ได้กับข้อกําหนดงาน Apache Spark (SJD)

การอ้างอิงสมุดบันทึก

วิธีการนี้จะอ้างอิงสมุดบันทึกและส่งกลับค่าออกจากสมุดบันทึก คุณสามารถเรียกใช้ฟังก์ชันการซ้อนกันในสมุดบันทึกแบบโต้ตอบหรือในไปป์ไลน์ได้ สมุดบันทึกที่กําลังอ้างอิงถูกอ้างอิงจะทํางานบนพูล Spark ของสมุดบันทึกที่เรียกใช้ฟังก์ชันนี้

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

ตัวอย่างเช่น:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

โน้ตบุ๊คผ้ายังสนับสนุนการอ้างอิงสมุดบันทึกทั่วทั้งพื้นที่ทํางานโดยการระบุ ID พื้นที่ทํางาน

mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

คุณสามารถเปิดลิงก์สแนปช็อตของการเรียกใช้การอ้างอิงในผลลัพธ์ของเซลล์ สแนปช็อตจะจับผลลัพธ์การเรียกใช้โค้ดและช่วยให้คุณสามารถดีบักการอ้างอิงได้อย่างง่ายดาย

สกรีนช็อตที่แสดงผลลัพธ์การเรียกใช้การอ้างอิง

สกรีนช็อตของสแนปช็อตพร้อมผลลัพธ์การเรียกใช้โค้ด

หมายเหตุ

  • สมุดบันทึกอ้างอิงข้ามพื้นที่ทํางานได้รับการสนับสนุนโดยรันไทม์เวอร์ชัน 1.2 และสูงกว่า
  • ถ้าคุณใช้ไฟล์ภายใต้ ทรัพยากรสมุดบันทึก ให้ใช้ mssparkutils.nbResPath ในสมุดบันทึกอ้างอิงเพื่อตรวจสอบให้แน่ใจว่าจะชี้ไปยังโฟลเดอร์เดียวกันกับการเรียกใช้แบบโต้ตอบ

การอ้างอิงจะเรียกใช้สมุดบันทึกหลายเล่มพร้อมกัน

สำคัญ

คุณลักษณะนี้อยู่ในตัวอย่าง

วิธีนี้ mssparkutils.notebook.runMultiple() ช่วยให้คุณสามารถเรียกใช้สมุดบันทึกหลายเล่มพร้อมกันหรือด้วยโครงสร้างโทโพโลยีที่กําหนดไว้ล่วงหน้า API กําลังใช้กลไกการใช้งานแบบหลายเธรดภายในเซสชัน Spark ซึ่งหมายความว่าทรัพยากรการคํานวณจะถูกแชร์โดยเรียกใช้สมุดบันทึกอ้างอิง

ด้วย mssparkutils.notebook.runMultiple()คุณสามารถ:

  • ใช้สมุดบันทึกหลายเล่มพร้อมกันโดยไม่ต้องรอให้แต่ละสมุดบันทึกเสร็จสิ้น

  • ระบุการขึ้นต่อกันและลําดับการดําเนินการสําหรับสมุดบันทึกของคุณ โดยใช้รูปแบบ JSON แบบง่าย

  • ปรับการใช้ทรัพยากรการคํานวณ Spark ให้เหมาะสมและลดค่าใช้จ่ายของโครงการ Fabric ของคุณ

  • ดูสแนปช็อตของแต่ละบันทึกการเรียกใช้สมุดบันทึกในผลลัพธ์ และแก้จุดบกพร่อง/ตรวจสอบงานสมุดบันทึกของคุณอย่างสะดวก

  • รับค่าออกจากแต่ละกิจกรรมของผู้บริหาร และใช้ในงานปลายทาง

คุณยังสามารถลองเรียกใช้ mssparkutils.notebook.help("runMultiple") เพื่อค้นหาตัวอย่างและการใช้งานโดยละเอียด

นี่คือตัวอย่างง่าย ๆ ของการเรียกใช้รายการของสมุดบันทึกในแบบขนานโดยใช้วิธีนี้:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

ผลลัพธ์การดําเนินการจากสมุดบันทึกรากมีดังนี้:

ภาพหน้าจอของการอ้างอิงรายการสมุดบันทึก

ต่อไปนี้คือตัวอย่างของการเรียกใช้สมุดบันทึกที่มีโครงสร้าง mssparkutils.notebook.runMultiple()ด้านบนโดยใช้ ใช้วิธีนี้เพื่อปรับโน้ตบุ๊กผ่านประสบการณ์การใช้รหัสได้อย่างง่ายดาย

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

ผลลัพธ์การดําเนินการจากสมุดบันทึกรากมีดังนี้:

สกรีนช็อตของรายการอ้างอิงของสมุดบันทึกที่มีพารามิเตอร์

หมายเหตุ

  • ระดับการทํางานแบบขนานของการเรียกใช้สมุดบันทึกหลายรายการถูกจํากัดไว้ที่ทรัพยากรการคํานวณที่พร้อมใช้งานทั้งหมดของเซสชัน Spark
  • ขีดจํากัดสูงสุดสําหรับกิจกรรมสมุดบันทึกหรือสมุดบันทึกพร้อมกันคือ 50 การใช้งานเกินขีดจํากัดนี้อาจนําไปสู่ความเสถียรและปัญหาด้านประสิทธิภาพเนื่องจากการใช้ทรัพยากรการคํานวณสูง ถ้ามีปัญหาเกิดขึ้น ให้ลองแยกสมุดบันทึกออกเป็นการโทรหลาย runMultiple รายการ หรือลดภาวะพร้อมกันโดยการปรับ เขตข้อมูลภาวะ พร้อมกันในพารามิเตอร์ DAG
  • การหมดเวลาเริ่มต้นสําหรับ DAG ทั้งหมดคือ 12 ชั่วโมง และการหมดเวลาเริ่มต้นสําหรับแต่ละเซลล์ในสมุดบันทึกย่อยคือ 90 วินาที คุณสามารถเปลี่ยนการหมดเวลาได้โดยการตั้งค่า เขตข้อมูล timeoutInSeconds และ timeoutPerCellInSeconds ในพารามิเตอร์ DAG ได้

ออกจากสมุดบันทึก

วิธีนี้จะออกจากสมุดบันทึกที่มีค่า คุณสามารถเรียกใช้ฟังก์ชันการซ้อนกันในสมุดบันทึกแบบโต้ตอบหรือในไปป์ไลน์ได้

  • เมื่อคุณเรียกใช้ ฟังก์ชัน exit() จากสมุดบันทึกแบบโต้ตอบ สมุดบันทึก Fabric จะแสดงข้อยกเว้น ข้ามการเรียกใช้เซลล์ที่ตามมา และทําให้เซสชัน Spark มีชีวิต

  • เมื่อคุณแก้ไขสมุดบันทึกในไปป์ไลน์ที่เรียกใช้ ฟังก์ชัน exit() กิจกรรมสมุดบันทึกจะส่งกลับด้วยค่าการออก ทําให้การทํางานของไปป์ไลน์เสร็จสมบูรณ์ และหยุดเซสชัน Spark

  • เมื่อคุณเรียกใช้ฟังก์ชัน exit() ในสมุดบันทึกที่กําลังอ้างอิง Fabric Spark จะหยุดการดําเนินการเพิ่มเติมของสมุดบันทึกที่อ้างอิง และดําเนินการต่อเพื่อเรียกใช้เซลล์ถัดไปในสมุดบันทึกหลักที่เรียกใช้ฟังก์ชัน run() ตัวอย่างเช่น: Notebook1 มีเซลล์สามเซลล์และเรียก ฟังก์ชัน exit() ในเซลล์ที่สอง Notebook2 มีเซลล์ห้าเซลล์และเรียกฟังก์ชัน run(notebook1) ในเซลล์ที่สาม เมื่อคุณเรียกใช้ Notebook2 Notebook2 Notebook1 จะหยุดที่เซลล์ที่สองเมื่อกดฟังก์ชัน exit() Notebook2 ยังคงเรียกใช้เซลล์ที่สี่และเซลล์ที่ห้า

mssparkutils.notebook.exit("value string")

ตัวอย่างเช่น:

ตัวอย่าง 1 สมุดบันทึกที่มีเซลล์สองเซลล์ต่อไปนี้:

  • Cell 1 กําหนดพารามิเตอร์การ ป้อนข้อมูล ด้วยค่าเริ่มต้นที่ตั้งค่าเป็น 10

  • เซลล์ 2 ออกจากสมุดบันทึกที่มี ข้อมูลป้อนเข้า เป็นค่าออก

สกรีนช็อตที่แสดงสมุดบันทึกตัวอย่างของฟังก์ชันจบการทํางาน

คุณสามารถเรียกใช้ Sample1 ในสมุดบันทึกอื่นด้วยค่าเริ่มต้น:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

ผลิตภัณฑ์:

Notebook executed successfully with exit value 10

คุณสามารถเรียกใช้ Sample1 ในสมุดบันทึกอื่น และตั้งค่าการป้อนข้อมูลเป็น 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

ผลิตภัณฑ์:

Notebook executed successfully with exit value 20

อรรถประโยชน์ข้อมูลประจําตัว

คุณสามารถใช้โปรแกรมอรรถประโยชน์ข้อมูลประจําตัว MSSparkUtils เพื่อรับโทเค็นการเข้าถึง และจัดการความลับใน Azure Key Vault ได้

เรียกใช้คําสั่งต่อไปนี้เพื่อดูภาพรวมของวิธีการที่พร้อมใช้งาน:

mssparkutils.credentials.help()

ผลิตภัณฑ์:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

รับโทเค็น

getToken จะส่งกลับโทเค็น Microsoft Entra สําหรับผู้ชมและชื่อที่กําหนด (ไม่บังคับ) รายการต่อไปนี้แสดงคีย์ผู้ชมที่พร้อมใช้งานในปัจจุบัน:

  • ทรัพยากรผู้ชมที่เก็บข้อมูล: "ที่เก็บข้อมูล"
  • ทรัพยากร Power BI: "pbi"
  • ทรัพยากรชุดเก็บคีย์ของ Azure: "keyvault"
  • ทรัพยากร Synapse RTA KQL DB: "kusto"

เรียกใช้คําสั่งต่อไปนี้เพื่อรับโทเค็น:

mssparkutils.credentials.getToken('audience Key')

รับข้อมูลลับโดยใช้ข้อมูลประจําตัวผู้ใช้

getSecret ส่งกลับข้อมูลลับชุดเก็บคีย์ของ Azure สําหรับจุดสิ้นสุด Azure Key Vault และชื่อลับที่กําหนดโดยใช้ข้อมูลประจําตัวของผู้ใช้

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

การต่อเชื่อมไฟล์และยกเลิกการเปิดไฟล์

Fabric รองรับสถานการณ์การเมาท์ต่อไปนี้ในแพคเกจ Microsoft Spark Utilities คุณสามารถใช้เมาท์ unmount getMountPath() และ mounts() API เพื่อแนบพื้นที่เก็บข้อมูลระยะไกล (ADLS Gen2) กับโหนดการทํางานทั้งหมด (โหนดไดรเวอร์และโหนดผู้ปฏิบัติงาน) หลังจากจุดต่อเชื่อมที่เก็บข้อมูลอยู่ในตําแหน่งให้ใช้ API ไฟล์ภายในเครื่องเพื่อเข้าถึงข้อมูลราวกับว่าจัดเก็บไว้ในระบบไฟล์ภายใน

วิธีการต่อเชื่อมบัญชี ADLS Gen2

ตัวอย่างต่อไปนี้แสดงให้เห็นวิธีการกําหนดใช้ Azure Data Lake Storage รุ่น2 การติดตั้งที่เก็บข้อมูล Blob ในทํานองเดียวกัน

ตัวอย่างนี้ถือว่า คุณมีบัญชี Data Lake Storage รุ่น2 หนึ่งบัญชีที่ชื่อว่า storegen2 และบัญชีมีคอนเทนเนอร์หนึ่งชื่อว่า mycontainer ที่คุณต้องการติดตั้ง /test ลงในเซสชัน Spark ของสมุดบันทึกของคุณ

สกรีนช็อตแสดงตําแหน่งที่จะเลือกคอนเทนเนอร์ที่จะต่อเชื่อม

ในการติดภาชนะที่เรียกว่า mycontainer, mssparkutils ก่อนอื่นจําเป็นต้องตรวจสอบว่าคุณมีสิทธิ์ในการเข้าถึงคอนเทนเนอร์หรือไม่ ในขณะนี้ Fabric สนับสนุนวิธีการรับรองความถูกต้องสองวิธีสําหรับการดําเนินการเมานท์ทริกเกอร์: accountKey และ sastoken

เมานท์ผ่านโทเค็นลายเซ็นสําหรับการเข้าถึงที่ใช้ร่วมกันหรือคีย์บัญชี

MSSparkUtils สนับสนุนอย่างชัดเจนผ่านคีย์บัญชีหรือ โทเค็นลายเซ็นการเข้าถึงที่ใช้ร่วมกัน (SAS) เป็นพารามิเตอร์เพื่อยึดเป้าหมาย

เพื่อเหตุผลด้านความปลอดภัย เราขอแนะนําให้คุณจัดเก็บคีย์บัญชีหรือโทเค็น SAS ใน Azure Key Vault (ตามสกรีนช็อตต่อไปนี้แสดง) จากนั้นคุณสามารถเรียกใช้ได้โดยใช้ mssparkutils.credentials.getSecret API สําหรับข้อมูลเพิ่มเติมเกี่ยวกับ Azure Key Vault โปรดดู เกี่ยวกับคีย์บัญชีที่จัดการโดย Azure Key Vault

สกรีนช็อตแสดงตําแหน่งที่จัดเก็บข้อมูลลับในชุดเก็บคีย์ของ Azure

รหัสตัวอย่างสําหรับ เมธอด accountKey :

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

โค้ดตัวอย่างสําหรับ sastoken:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

หมายเหตุ

คุณอาจจําเป็นต้องนําเข้า mssparkutils หากไม่พร้อมใช้งาน:

from notebookutils import mssparkutils

ติดพารามิเตอร์:

  • fileCacheTimeout: Blobs จะถูกแคชในโฟลเดอร์ temp ภายในเครื่องเป็นเวลา 120 วินาทีตามค่าเริ่มต้น ในช่วงเวลานี้ blobfuse จะไม่ตรวจสอบว่าไฟล์นั้นเป็นไฟล์ล่าสุดหรือไม่ สามารถตั้งค่าพารามิเตอร์เพื่อเปลี่ยนการหมดเวลาเริ่มต้นได้ เมื่อไคลเอ็นต์หลายไคลเอ็นต์ปรับเปลี่ยนแฟ้มในเวลาเดียวกัน เพื่อหลีกเลี่ยงความไม่สอดคล้องกันระหว่างแฟ้มภายในเครื่องและระยะไกล เราขอแนะนําให้ย่อเวลาแคช หรือแม้กระทั่งเปลี่ยนเป็น 0 และรับไฟล์ล่าสุดจากเซิร์ฟเวอร์เสมอ
  • หมดเวลา: การหมดเวลาของการดําเนินการเมาท์คือ 120 วินาทีตามค่าเริ่มต้น สามารถตั้งค่าพารามิเตอร์เพื่อเปลี่ยนการหมดเวลาเริ่มต้นได้ เมื่อมีผู้ปฏิบัติการมากเกินไปหรือเมื่อหมดเวลาในการติดเราขอแนะนําให้เพิ่มค่า

คุณสามารถใช้พารามิเตอร์เหล่านี้ได้ดังนี้:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

หมายเหตุ

เพื่อเหตุผลด้านความปลอดภัย เราไม่แนะนําให้คุณจัดเก็บข้อมูลประจําตัวไว้ในรหัส เพื่อปกป้องข้อมูลประจําตัวของคุณเพิ่มเติม เราจะทําสําเนาข้อมูลลับของคุณในผลลัพธ์สมุดบันทึก สําหรับข้อมูลเพิ่มเติม ให้ดู การปฏิกริยาลับ

วิธีติดตั้งเลคเฮ้าส์

ตัวอย่างรหัสสําหรับการติดตั้งเลคเฮ้าส์เพื่อ /ทดสอบ:

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

หมายเหตุ

ไม่รองรับการติดตั้งจุดสิ้นสุดภูมิภาค ผ้าสนับสนุนเฉพาะการติดตั้งจุดสิ้นสุดสากล, onelake.dfs.fabric.microsoft.comของ

เข้าถึงไฟล์ภายใต้จุดต่อเชื่อมโดยใช้ mssparktuils fs API

วัตถุประสงค์หลักของการดําเนินการติดตั้งคือการอนุญาตให้ลูกค้าเข้าถึงข้อมูลที่จัดเก็บไว้ในบัญชีเก็บข้อมูลระยะไกลด้วย API ระบบไฟล์ภายในเครื่อง คุณยังสามารถเข้าถึงข้อมูลโดยใช้ mssparkutils fs API ด้วยเส้นทางที่ติดอยู่เป็นพารามิเตอร์ได้ รูปแบบเส้นทางนี้จะแตกต่างกันเล็กน้อย

สมมติว่าคุณได้ติดตั้งคอนเทนเนอร์ Data Lake Storage รุ่น2 mycontainer to /test โดยใช้ api การต่อเชื่อม เมื่อคุณเข้าถึงข้อมูลด้วย API ระบบไฟล์ภายใน รูปแบบเส้นทางจะมีลักษณะดังนี้:

/synfs/notebook/{sessionId}/test/{filename}

เมื่อคุณต้องการเข้าถึงข้อมูล โดยใช้ mssparkutils fs API เราขอแนะนําให้ใช้ getMountPath() เพื่อให้ได้เส้นทางที่ถูกต้อง:

path = mssparkutils.fs.getMountPath("/test")
  • ไดเรกทอรีรายการ:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • อ่านเนื้อหาไฟล์:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • สร้างไดเรกทอรี:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

เข้าถึงไฟล์ภายใต้จุดยึดผ่านเส้นทางท้องถิ่น

คุณสามารถอ่านและเขียนไฟล์ได้อย่างง่ายดายในจุดต่อเชื่อมโดยใช้ระบบไฟล์มาตรฐาน นี่คือตัวอย่าง Python:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

วิธีการตรวจสอบจุดยึดที่มีอยู่

คุณสามารถใช้ mssparkutils.fs.mounts() API เพื่อตรวจสอบข้อมูลจุดต่อเชื่อมที่มีอยู่ทั้งหมด:

mssparkutils.fs.mounts()

วิธีการยกเลิกการต่อเชื่อมจุดยึด

ใช้รหัสต่อไปนี้เพื่อยกเลิกการต่อเชื่อมจุด ยึดของคุณ (/ทดสอบ ในตัวอย่างนี้):

mssparkutils.fs.unmount("/test")

ข้อจำกัดที่ทราบ

  • การติดปัจจุบันคือการกําหนดค่าระดับงาน เราขอแนะนําให้คุณใช้เ มาท์ API เพื่อตรวจสอบว่ามีจุดต่อเชื่อมอยู่หรือไม่พร้อมใช้งาน

  • กลไกยกเลิกการเมาท์ไม่ได้เป็นไปโดยอัตโนมัติ เมื่อแอปพลิเคชันทํางานเสร็จสิ้นหากต้องการยกเลิกเมาท์จุดและปล่อยพื้นที่ดิสก์ คุณต้องเรียกใช้ API ที่ไม่ได้ลงทะเบียนในโค้ดของคุณอย่างชัดเจน มิฉะนั้นจุดยึดจะยังคงอยู่ในโหนดหลังจากการทํางานของแอปพลิเคชันเสร็จสิ้น

  • ไม่รองรับการต่อเชื่อมบัญชีที่เก็บข้อมูล ADLS Gen1

สาธารณูปโภคของเลคเฮ้าส์

mssparkutils.lakehouse ให้บริการสาธารณูปโภคที่ปรับให้เหมาะสมสําหรับการจัดการวัตถุของ Lakehouse โดยเฉพาะ โปรแกรมอรรถประโยชน์เหล่านี้ช่วยให้ผู้ใช้สร้าง เรียกใช้ อัปเดต และลบวัตถุ Lakehouse ได้อย่างง่ายดาย

หมายเหตุ

Lakehouse API ได้รับการสนับสนุนเฉพาะในรันไทม์เวอร์ชัน 1.2+ เท่านั้น

ภาพรวมของวิธีการ

ด้านล่างคือภาพรวมของวิธีการที่พร้อมใช้งานซึ่งจัดหาโดย mssparkutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

ตัวอย่างการใช้

หากต้องการใช้วิธีการเหล่านี้อย่างมีประสิทธิภาพ ให้พิจารณาตัวอย่างการใช้งานต่อไปนี้:

การสร้างสิ่งประดิษฐ์ของเลคเฮ้าส์

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

เรียกข้อมูลวัตถุของเลคเฮ้าส์

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

ปรับปรุงสิ่งประดิษฐ์ของเลคเฮ้าส์

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

การลบสิ่งประดิษฐ์ของ Lakehouse

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

รายการวัตถุของเลคเฮ้าส์

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

ข้อมูลเพิ่มเติม

สําหรับข้อมูลรายละเอียดเพิ่มเติมเกี่ยวกับแต่ละวิธีและพารามิเตอร์ ให้ใช้ mssparkutils.lakehouse.help("methodName") ฟังก์ชัน

ด้วยโปรแกรมอรรถประโยชน์ของเลคเฮ้าส์ของ MSSparkUtils การจัดการวัตถุ Lakehouse จะมีประสิทธิภาพมากขึ้นและรวมเข้ากับไปป์ไลน์ Fabric ของคุณ เพิ่มประสบการณ์การจัดการข้อมูลโดยรวมของคุณ

อย่าลังเลที่จะสํารวจโปรแกรมอรรถประโยชน์เหล่านี้และรวมเข้ากับเวิร์กโฟลว์ Fabric ของคุณเพื่อการจัดการอาร์ติแฟกต์ของ Lakehouse แบบไร้รอยต่อ

โปรแกรมอรรถประโยชน์รันไทม์

แสดงข้อมูลบริบทเซสชัน

ด้วย mssparkutils.runtime.context คุณสามารถรับข้อมูลบริบทของเซสชันสดปัจจุบัน รวมถึงชื่อสมุดบันทึก ค่าเริ่มต้นของ lakehouse ข้อมูลพื้นที่ทํางาน ถ้าเป็นไปป์ไลน์ที่เรียกใช้ ฯลฯ

mssparkutils.runtime.context

หมายเหตุ

mssparkutils.env ไม่ได้รับการสนับสนุนอย่างเป็นทางการบน Fabric โปรดใช้ notebookutils.runtime.context เป็นทางเลือก

ปัญหาที่ทราบแล้ว

เมื่อใช้รันไทม์เวอร์ชันที่สูงกว่า 1.2 และเรียกใช้ mssparkutils.help(), fabricClient, warehouse และพื้นที่ทํางาน APIs ไม่ได้รับการสนับสนุนสําหรับตอนนี้จะพร้อมใช้งานเพิ่มเติม