Microsoft Spark Utilities (MSSparkUtils) สําหรับ Fabric
Microsoft Spark Utilities (MSSparkUtils) เป็นแพคเกจที่มีอยู่แล้วภายใน เพื่อช่วยให้คุณทํางานทั่วไปได้อย่างง่ายดาย คุณสามารถใช้ MSSparkUtils เพื่อทํางานกับระบบไฟล์ รับตัวแปรสภาพแวดล้อม เพื่อเกี่ยวโยงสมุดบันทึกเข้าด้วยกัน และทํางานกับข้อมูลลับ แพคเกจ MSSparkUtils มีให้ใช้งานใน PySpark (Python) Scala, SparkR notebooks และ Fabric pipelines
หมายเหตุ
- MsSparkUtils ถูกเปลี่ยนชื่อ เป็น NotebookUtils อย่างเป็นทางการแล้ว โค้ดที่มีอยู่จะยังคง เข้ากัน ได้ย้อนกลับและจะไม่ทําให้เกิดการเปลี่ยนแปลงที่เสียหาย ขอแนะนําให้อัปเกรดเป็นสมุดบันทึกเพื่อให้แน่ใจว่าการสนับสนุนอย่างต่อเนื่องและการเข้าถึงคุณลักษณะใหม่ เนมสเปซ mssparkutils จะถูกยกเลิกในอนาคต
- NotebookUtils ออกแบบมาเพื่อทํางานกับ Spark 3.4 (Runtime v1.2) และสูงกว่า ฟีเจอร์และการอัปเดตใหม่ทั้งหมดจะได้รับการสนับสนุนเฉพาะกับ namespace ของ notebookutils ในอนาคต
โปรแกรมอรรถประโยชน์ระบบไฟล์
mssparkutils.fs ให้บริการสาธารณูปโภคสําหรับการทํางานกับระบบไฟล์ต่าง ๆ รวมถึง Azure Data Lake Storage (ADLS) Gen2 และ Azure Blob Storage ตรวจสอบให้แน่ใจว่าคุณได้กําหนดค่าการเข้าถึงไปยัง Azure Data Lake Storage รุ่น2 และ Azure Blob Storage อย่างเหมาะสม
เรียกใช้คําสั่งต่อไปนี้สําหรับภาพรวมของวิธีการที่พร้อมใช้งาน:
from notebookutils import mssparkutils
mssparkutils.fs.help()
เอาท์พุท
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use mssparkutils.fs.help("methodName") for more info about a method.
MSSparkUtils ทํางานกับระบบไฟล์ในลักษณะเดียวกับ Spark API ใช้ mssparkuitls.fs.mkdirs() และการใช้งาน Fabric lakehouse ตัวอย่างเช่น:
การใช้งาน | เส้นทางสัมพัทธ์จากราก HDFS | เส้นทางสัมบูรณ์สําหรับระบบไฟล์ ABFS | เส้นทางสัมบูรณ์สําหรับระบบไฟล์ภายในเครื่องในโหนดโปรแกรมควบคุม |
---|---|---|---|
บ้านทะเลสาบโนนเดเฟลท์ | ไม่ได้รับการสนับสนุน | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
เลคเฮ้าส์ค่าเริ่มต้น | ไดเรกทอรีภายใต้ "Files" หรือ "Tables": mssparkutils.fs.mkdirs("Files/<new_dir>") | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
แสดงรายการไฟล์
หากต้องการแสดงรายการเนื้อหาของไดเรกทอรี ให้ใช้ mssparkutils.fs.ls('เส้นทางไดเรกทอรีของคุณ') ตัวอย่างเช่น:
mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
mssparkutils.fs.ls("file:/tmp") # based on local file system of driver node
ดูคุณสมบัติไฟล์
วิธีการนี้จะส่งกลับคุณสมบัติไฟล์ รวมถึงชื่อไฟล์ เส้นทางไฟล์ ขนาดไฟล์ และอาจเป็นไดเรกทอรีและไฟล์หรือไม่
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
สร้างไดเรกทอรีใหม่
วิธีนี้จะสร้างไดเรกทอรีที่ระบุถ้าไม่มีอยู่ และสร้างไดเรกทอรีหลักใด ๆ ที่จําเป็น
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs. mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
mssparkutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
คัดลอกไฟล์
วิธีนี้จะคัดลอกไฟล์หรือไดเรกทอรี และรองรับการคัดลอกกิจกรรมข้ามระบบไฟล์
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
ไฟล์สําเนาที่มีประสิทธิภาพ
วิธีนี้เป็นวิธีที่รวดเร็วกว่าในการคัดลอกหรือย้ายไฟล์ โดยเฉพาะอย่างยิ่งปริมาณข้อมูลขนาดใหญ่
mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
แสดงตัวอย่างเนื้อหาไฟล์
วิธีนี้ส่งกลับถึงไบต์ 'maxBytes' แรกของไฟล์ที่ระบุเป็นสตริงที่เข้ารหัสลับใน UTF-8
mssparkutils.fs.head('file path', maxBytes to read)
ย้ายไฟล์
วิธีนี้จะย้ายไฟล์หรือไดเรกทอรี และสนับสนุนการย้ายข้ามระบบไฟล์
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.
เขียนไฟล์
วิธีนี้เขียนสตริงที่ระบุออกไปยังไฟล์เข้ารหัสใน UTF-8
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
ผนวกเนื้อหาไปยังไฟล์
เมธอดนี้ผนวกสตริงที่กําหนดไปยังไฟล์ที่เข้ารหัสไว้ใน UTF-8
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
หมายเหตุ
เมื่อใช้ mssparkutils.fs.append
API ในการ for
วนรอบเพื่อเขียนไปยังไฟล์เดียวกัน เราขอแนะนําให้เพิ่ม sleep
คําสั่งประมาณ 0.5s ~ 1s ระหว่างการเขียนที่เกิดซ้ํา ทั้งนี้เนื่องจาก mssparkutils.fs.append
การดําเนินการภายใน flush
ของ API เป็นแบบอะซิงโครนัส ดังนั้นการหน่วงเวลาสั้น ๆ จะช่วยรับประกันความสมบูรณ์ของข้อมูล
ลบไฟล์หรือไดเรกทอรี
วิธีนี้จะลบไฟล์หรือไดเรกทอรีออก
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
ไดเรกทอรีการต่อเชื่อม/ยกเลิกการต่อเชื่อม
ค้นหาข้อมูลเพิ่มเติมเกี่ยวกับการใช้งานโดยละเอียดในแฟ้มที่ต่อเชื่อมและยกเลิกการเปิด
อรรถประโยชน์สมุดบันทึก
ใช้โปรแกรมอรรถประโยชน์ของสมุดบันทึก MSSparkUtils เพื่อเรียกใช้สมุดบันทึกหรือออกจากสมุดบันทึกที่มีค่า เรียกใช้คําสั่งต่อไปนี้เพื่อดูภาพรวมของวิธีการที่พร้อมใช้งาน:
mssparkutils.notebook.help()
ผลิตภัณฑ์:
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
หมายเหตุ
อรรถประโยชน์ของสมุดบันทึกไม่สามารถใช้ได้กับข้อกําหนดงาน Apache Spark (SJD)
การอ้างอิงสมุดบันทึก
วิธีการนี้จะอ้างอิงสมุดบันทึกและส่งกลับค่าออกจากสมุดบันทึก คุณสามารถเรียกใช้ฟังก์ชันการซ้อนกันในสมุดบันทึกแบบโต้ตอบหรือในไปป์ไลน์ได้ สมุดบันทึกที่กําลังอ้างอิงถูกอ้างอิงจะทํางานบนพูล Spark ของสมุดบันทึกที่เรียกใช้ฟังก์ชันนี้
mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
ตัวอย่างเช่น:
mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
โน้ตบุ๊คผ้ายังสนับสนุนการอ้างอิงสมุดบันทึกทั่วทั้งพื้นที่ทํางานโดยการระบุ ID พื้นที่ทํางาน
mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
คุณสามารถเปิดลิงก์สแนปช็อตของการเรียกใช้การอ้างอิงในผลลัพธ์ของเซลล์ สแนปช็อตจะจับผลลัพธ์การเรียกใช้โค้ดและช่วยให้คุณสามารถดีบักการอ้างอิงได้อย่างง่ายดาย
หมายเหตุ
- สมุดบันทึกอ้างอิงข้ามพื้นที่ทํางานได้รับการสนับสนุนโดยรันไทม์เวอร์ชัน 1.2 และสูงกว่า
- ถ้าคุณใช้ไฟล์ภายใต้ ทรัพยากรสมุดบันทึก ให้ใช้
mssparkutils.nbResPath
ในสมุดบันทึกอ้างอิงเพื่อตรวจสอบให้แน่ใจว่าจะชี้ไปยังโฟลเดอร์เดียวกันกับการเรียกใช้แบบโต้ตอบ
การอ้างอิงจะเรียกใช้สมุดบันทึกหลายเล่มพร้อมกัน
สำคัญ
คุณลักษณะนี้อยู่ในตัวอย่าง
วิธีนี้ mssparkutils.notebook.runMultiple()
ช่วยให้คุณสามารถเรียกใช้สมุดบันทึกหลายเล่มพร้อมกันหรือด้วยโครงสร้างโทโพโลยีที่กําหนดไว้ล่วงหน้า API กําลังใช้กลไกการใช้งานแบบหลายเธรดภายในเซสชัน Spark ซึ่งหมายความว่าทรัพยากรการคํานวณจะถูกแชร์โดยเรียกใช้สมุดบันทึกอ้างอิง
ด้วย mssparkutils.notebook.runMultiple()
คุณสามารถ:
ใช้สมุดบันทึกหลายเล่มพร้อมกันโดยไม่ต้องรอให้แต่ละสมุดบันทึกเสร็จสิ้น
ระบุการขึ้นต่อกันและลําดับการดําเนินการสําหรับสมุดบันทึกของคุณ โดยใช้รูปแบบ JSON แบบง่าย
ปรับการใช้ทรัพยากรการคํานวณ Spark ให้เหมาะสมและลดค่าใช้จ่ายของโครงการ Fabric ของคุณ
ดูสแนปช็อตของแต่ละบันทึกการเรียกใช้สมุดบันทึกในผลลัพธ์ และแก้จุดบกพร่อง/ตรวจสอบงานสมุดบันทึกของคุณอย่างสะดวก
รับค่าออกจากแต่ละกิจกรรมของผู้บริหาร และใช้ในงานปลายทาง
คุณยังสามารถลองเรียกใช้ mssparkutils.notebook.help("runMultiple") เพื่อค้นหาตัวอย่างและการใช้งานโดยละเอียด
นี่คือตัวอย่างง่าย ๆ ของการเรียกใช้รายการของสมุดบันทึกในแบบขนานโดยใช้วิธีนี้:
mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
ผลลัพธ์การดําเนินการจากสมุดบันทึกรากมีดังนี้:
ต่อไปนี้คือตัวอย่างของการเรียกใช้สมุดบันทึกที่มีโครงสร้าง mssparkutils.notebook.runMultiple()
ด้านบนโดยใช้ ใช้วิธีนี้เพื่อปรับโน้ตบุ๊กผ่านประสบการณ์การใช้รหัสได้อย่างง่ายดาย
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
ผลลัพธ์การดําเนินการจากสมุดบันทึกรากมีดังนี้:
หมายเหตุ
- ระดับการทํางานแบบขนานของการเรียกใช้สมุดบันทึกหลายรายการถูกจํากัดไว้ที่ทรัพยากรการคํานวณที่พร้อมใช้งานทั้งหมดของเซสชัน Spark
- ขีดจํากัดสูงสุดสําหรับกิจกรรมสมุดบันทึกหรือสมุดบันทึกพร้อมกันคือ 50 การใช้งานเกินขีดจํากัดนี้อาจนําไปสู่ความเสถียรและปัญหาด้านประสิทธิภาพเนื่องจากการใช้ทรัพยากรการคํานวณสูง ถ้ามีปัญหาเกิดขึ้น ให้ลองแยกสมุดบันทึกออกเป็นการโทรหลาย
runMultiple
รายการ หรือลดภาวะพร้อมกันโดยการปรับ เขตข้อมูลภาวะ พร้อมกันในพารามิเตอร์ DAG - การหมดเวลาเริ่มต้นสําหรับ DAG ทั้งหมดคือ 12 ชั่วโมง และการหมดเวลาเริ่มต้นสําหรับแต่ละเซลล์ในสมุดบันทึกย่อยคือ 90 วินาที คุณสามารถเปลี่ยนการหมดเวลาได้โดยการตั้งค่า เขตข้อมูล timeoutInSeconds และ timeoutPerCellInSeconds ในพารามิเตอร์ DAG ได้
ออกจากสมุดบันทึก
วิธีนี้จะออกจากสมุดบันทึกที่มีค่า คุณสามารถเรียกใช้ฟังก์ชันการซ้อนกันในสมุดบันทึกแบบโต้ตอบหรือในไปป์ไลน์ได้
เมื่อคุณเรียกใช้ ฟังก์ชัน exit() จากสมุดบันทึกแบบโต้ตอบ สมุดบันทึก Fabric จะแสดงข้อยกเว้น ข้ามการเรียกใช้เซลล์ที่ตามมา และทําให้เซสชัน Spark มีชีวิต
เมื่อคุณแก้ไขสมุดบันทึกในไปป์ไลน์ที่เรียกใช้ ฟังก์ชัน exit() กิจกรรมสมุดบันทึกจะส่งกลับด้วยค่าการออก ทําให้การทํางานของไปป์ไลน์เสร็จสมบูรณ์ และหยุดเซสชัน Spark
เมื่อคุณเรียกใช้ฟังก์ชัน exit() ในสมุดบันทึกที่กําลังอ้างอิง Fabric Spark จะหยุดการดําเนินการเพิ่มเติมของสมุดบันทึกที่อ้างอิง และดําเนินการต่อเพื่อเรียกใช้เซลล์ถัดไปในสมุดบันทึกหลักที่เรียกใช้ฟังก์ชัน run() ตัวอย่างเช่น: Notebook1 มีเซลล์สามเซลล์และเรียก ฟังก์ชัน exit() ในเซลล์ที่สอง Notebook2 มีเซลล์ห้าเซลล์และเรียกฟังก์ชัน run(notebook1) ในเซลล์ที่สาม เมื่อคุณเรียกใช้ Notebook2 Notebook2 Notebook1 จะหยุดที่เซลล์ที่สองเมื่อกดฟังก์ชัน exit() Notebook2 ยังคงเรียกใช้เซลล์ที่สี่และเซลล์ที่ห้า
mssparkutils.notebook.exit("value string")
ตัวอย่างเช่น:
ตัวอย่าง 1 สมุดบันทึกที่มีเซลล์สองเซลล์ต่อไปนี้:
Cell 1 กําหนดพารามิเตอร์การ ป้อนข้อมูล ด้วยค่าเริ่มต้นที่ตั้งค่าเป็น 10
เซลล์ 2 ออกจากสมุดบันทึกที่มี ข้อมูลป้อนเข้า เป็นค่าออก
คุณสามารถเรียกใช้ Sample1 ในสมุดบันทึกอื่นด้วยค่าเริ่มต้น:
exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)
ผลิตภัณฑ์:
Notebook executed successfully with exit value 10
คุณสามารถเรียกใช้ Sample1 ในสมุดบันทึกอื่น และตั้งค่าการป้อนข้อมูลเป็น 20:
exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
ผลิตภัณฑ์:
Notebook executed successfully with exit value 20
อรรถประโยชน์ข้อมูลประจําตัว
คุณสามารถใช้โปรแกรมอรรถประโยชน์ข้อมูลประจําตัว MSSparkUtils เพื่อรับโทเค็นการเข้าถึง และจัดการความลับใน Azure Key Vault ได้
เรียกใช้คําสั่งต่อไปนี้เพื่อดูภาพรวมของวิธีการที่พร้อมใช้งาน:
mssparkutils.credentials.help()
ผลิตภัณฑ์:
getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name
รับโทเค็น
getToken จะส่งกลับโทเค็น Microsoft Entra สําหรับผู้ชมและชื่อที่กําหนด (ไม่บังคับ) รายการต่อไปนี้แสดงคีย์ผู้ชมที่พร้อมใช้งานในปัจจุบัน:
- ทรัพยากรผู้ชมที่เก็บข้อมูล: "ที่เก็บข้อมูล"
- ทรัพยากร Power BI: "pbi"
- ทรัพยากรชุดเก็บคีย์ของ Azure: "keyvault"
- ทรัพยากร Synapse RTA KQL DB: "kusto"
เรียกใช้คําสั่งต่อไปนี้เพื่อรับโทเค็น:
mssparkutils.credentials.getToken('audience Key')
รับข้อมูลลับโดยใช้ข้อมูลประจําตัวผู้ใช้
getSecret ส่งกลับข้อมูลลับชุดเก็บคีย์ของ Azure สําหรับจุดสิ้นสุด Azure Key Vault และชื่อลับที่กําหนดโดยใช้ข้อมูลประจําตัวของผู้ใช้
mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
การต่อเชื่อมไฟล์และยกเลิกการเปิดไฟล์
Fabric รองรับสถานการณ์การเมาท์ต่อไปนี้ในแพคเกจ Microsoft Spark Utilities คุณสามารถใช้เมาท์ unmount getMountPath() และ mounts() API เพื่อแนบพื้นที่เก็บข้อมูลระยะไกล (ADLS Gen2) กับโหนดการทํางานทั้งหมด (โหนดไดรเวอร์และโหนดผู้ปฏิบัติงาน) หลังจากจุดต่อเชื่อมที่เก็บข้อมูลอยู่ในตําแหน่งให้ใช้ API ไฟล์ภายในเครื่องเพื่อเข้าถึงข้อมูลราวกับว่าจัดเก็บไว้ในระบบไฟล์ภายใน
วิธีการต่อเชื่อมบัญชี ADLS Gen2
ตัวอย่างต่อไปนี้แสดงให้เห็นวิธีการกําหนดใช้ Azure Data Lake Storage รุ่น2 การติดตั้งที่เก็บข้อมูล Blob ในทํานองเดียวกัน
ตัวอย่างนี้ถือว่า คุณมีบัญชี Data Lake Storage รุ่น2 หนึ่งบัญชีที่ชื่อว่า storegen2 และบัญชีมีคอนเทนเนอร์หนึ่งชื่อว่า mycontainer ที่คุณต้องการติดตั้ง /test ลงในเซสชัน Spark ของสมุดบันทึกของคุณ
ในการติดภาชนะที่เรียกว่า mycontainer, mssparkutils ก่อนอื่นจําเป็นต้องตรวจสอบว่าคุณมีสิทธิ์ในการเข้าถึงคอนเทนเนอร์หรือไม่ ในขณะนี้ Fabric สนับสนุนวิธีการรับรองความถูกต้องสองวิธีสําหรับการดําเนินการเมานท์ทริกเกอร์: accountKey และ sastoken
เมานท์ผ่านโทเค็นลายเซ็นสําหรับการเข้าถึงที่ใช้ร่วมกันหรือคีย์บัญชี
MSSparkUtils สนับสนุนอย่างชัดเจนผ่านคีย์บัญชีหรือ โทเค็นลายเซ็นการเข้าถึงที่ใช้ร่วมกัน (SAS) เป็นพารามิเตอร์เพื่อยึดเป้าหมาย
เพื่อเหตุผลด้านความปลอดภัย เราขอแนะนําให้คุณจัดเก็บคีย์บัญชีหรือโทเค็น SAS ใน Azure Key Vault (ตามสกรีนช็อตต่อไปนี้แสดง) จากนั้นคุณสามารถเรียกใช้ได้โดยใช้ mssparkutils.credentials.getSecret API สําหรับข้อมูลเพิ่มเติมเกี่ยวกับ Azure Key Vault โปรดดู เกี่ยวกับคีย์บัญชีที่จัดการโดย Azure Key Vault
รหัสตัวอย่างสําหรับ เมธอด accountKey :
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
โค้ดตัวอย่างสําหรับ sastoken:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
หมายเหตุ
คุณอาจจําเป็นต้องนําเข้า mssparkutils
หากไม่พร้อมใช้งาน:
from notebookutils import mssparkutils
ติดพารามิเตอร์:
- fileCacheTimeout: Blobs จะถูกแคชในโฟลเดอร์ temp ภายในเครื่องเป็นเวลา 120 วินาทีตามค่าเริ่มต้น ในช่วงเวลานี้ blobfuse จะไม่ตรวจสอบว่าไฟล์นั้นเป็นไฟล์ล่าสุดหรือไม่ สามารถตั้งค่าพารามิเตอร์เพื่อเปลี่ยนการหมดเวลาเริ่มต้นได้ เมื่อไคลเอ็นต์หลายไคลเอ็นต์ปรับเปลี่ยนแฟ้มในเวลาเดียวกัน เพื่อหลีกเลี่ยงความไม่สอดคล้องกันระหว่างแฟ้มภายในเครื่องและระยะไกล เราขอแนะนําให้ย่อเวลาแคช หรือแม้กระทั่งเปลี่ยนเป็น 0 และรับไฟล์ล่าสุดจากเซิร์ฟเวอร์เสมอ
- หมดเวลา: การหมดเวลาของการดําเนินการเมาท์คือ 120 วินาทีตามค่าเริ่มต้น สามารถตั้งค่าพารามิเตอร์เพื่อเปลี่ยนการหมดเวลาเริ่มต้นได้ เมื่อมีผู้ปฏิบัติการมากเกินไปหรือเมื่อหมดเวลาในการติดเราขอแนะนําให้เพิ่มค่า
คุณสามารถใช้พารามิเตอร์เหล่านี้ได้ดังนี้:
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
หมายเหตุ
เพื่อเหตุผลด้านความปลอดภัย เราไม่แนะนําให้คุณจัดเก็บข้อมูลประจําตัวไว้ในรหัส เพื่อปกป้องข้อมูลประจําตัวของคุณเพิ่มเติม เราจะทําสําเนาข้อมูลลับของคุณในผลลัพธ์สมุดบันทึก สําหรับข้อมูลเพิ่มเติม ให้ดู การปฏิกริยาลับ
วิธีติดตั้งเลคเฮ้าส์
ตัวอย่างรหัสสําหรับการติดตั้งเลคเฮ้าส์เพื่อ /ทดสอบ:
from notebookutils import mssparkutils
mssparkutils.fs.mount(
"abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>",
"/test"
)
หมายเหตุ
ไม่รองรับการติดตั้งจุดสิ้นสุดภูมิภาค ผ้าสนับสนุนเฉพาะการติดตั้งจุดสิ้นสุดสากล, onelake.dfs.fabric.microsoft.com
ของ
เข้าถึงไฟล์ภายใต้จุดต่อเชื่อมโดยใช้ mssparktuils fs API
วัตถุประสงค์หลักของการดําเนินการติดตั้งคือการอนุญาตให้ลูกค้าเข้าถึงข้อมูลที่จัดเก็บไว้ในบัญชีเก็บข้อมูลระยะไกลด้วย API ระบบไฟล์ภายในเครื่อง คุณยังสามารถเข้าถึงข้อมูลโดยใช้ mssparkutils fs API ด้วยเส้นทางที่ติดอยู่เป็นพารามิเตอร์ได้ รูปแบบเส้นทางนี้จะแตกต่างกันเล็กน้อย
สมมติว่าคุณได้ติดตั้งคอนเทนเนอร์ Data Lake Storage รุ่น2 mycontainer to /test โดยใช้ api การต่อเชื่อม เมื่อคุณเข้าถึงข้อมูลด้วย API ระบบไฟล์ภายใน รูปแบบเส้นทางจะมีลักษณะดังนี้:
/synfs/notebook/{sessionId}/test/{filename}
เมื่อคุณต้องการเข้าถึงข้อมูล โดยใช้ mssparkutils fs API เราขอแนะนําให้ใช้ getMountPath() เพื่อให้ได้เส้นทางที่ถูกต้อง:
path = mssparkutils.fs.getMountPath("/test")
ไดเรกทอรีรายการ:
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
อ่านเนื้อหาไฟล์:
mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
สร้างไดเรกทอรี:
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
เข้าถึงไฟล์ภายใต้จุดยึดผ่านเส้นทางท้องถิ่น
คุณสามารถอ่านและเขียนไฟล์ได้อย่างง่ายดายในจุดต่อเชื่อมโดยใช้ระบบไฟล์มาตรฐาน นี่คือตัวอย่าง Python:
#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
วิธีการตรวจสอบจุดยึดที่มีอยู่
คุณสามารถใช้ mssparkutils.fs.mounts() API เพื่อตรวจสอบข้อมูลจุดต่อเชื่อมที่มีอยู่ทั้งหมด:
mssparkutils.fs.mounts()
วิธีการยกเลิกการต่อเชื่อมจุดยึด
ใช้รหัสต่อไปนี้เพื่อยกเลิกการต่อเชื่อมจุด ยึดของคุณ (/ทดสอบ ในตัวอย่างนี้):
mssparkutils.fs.unmount("/test")
ข้อจำกัดที่ทราบ
การติดปัจจุบันคือการกําหนดค่าระดับงาน เราขอแนะนําให้คุณใช้เ มาท์ API เพื่อตรวจสอบว่ามีจุดต่อเชื่อมอยู่หรือไม่พร้อมใช้งาน
กลไกยกเลิกการเมาท์ไม่ได้เป็นไปโดยอัตโนมัติ เมื่อแอปพลิเคชันทํางานเสร็จสิ้นหากต้องการยกเลิกเมาท์จุดและปล่อยพื้นที่ดิสก์ คุณต้องเรียกใช้ API ที่ไม่ได้ลงทะเบียนในโค้ดของคุณอย่างชัดเจน มิฉะนั้นจุดยึดจะยังคงอยู่ในโหนดหลังจากการทํางานของแอปพลิเคชันเสร็จสิ้น
ไม่รองรับการต่อเชื่อมบัญชีที่เก็บข้อมูล ADLS Gen1
สาธารณูปโภคของเลคเฮ้าส์
mssparkutils.lakehouse
ให้บริการสาธารณูปโภคที่ปรับให้เหมาะสมสําหรับการจัดการวัตถุของ Lakehouse โดยเฉพาะ โปรแกรมอรรถประโยชน์เหล่านี้ช่วยให้ผู้ใช้สร้าง เรียกใช้ อัปเดต และลบวัตถุ Lakehouse ได้อย่างง่ายดาย
หมายเหตุ
Lakehouse API ได้รับการสนับสนุนเฉพาะในรันไทม์เวอร์ชัน 1.2+ เท่านั้น
ภาพรวมของวิธีการ
ด้านล่างคือภาพรวมของวิธีการที่พร้อมใช้งานซึ่งจัดหาโดย mssparkutils.lakehouse
:
# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]
ตัวอย่างการใช้
หากต้องการใช้วิธีการเหล่านี้อย่างมีประสิทธิภาพ ให้พิจารณาตัวอย่างการใช้งานต่อไปนี้:
การสร้างสิ่งประดิษฐ์ของเลคเฮ้าส์
artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
เรียกข้อมูลวัตถุของเลคเฮ้าส์
artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")
ปรับปรุงสิ่งประดิษฐ์ของเลคเฮ้าส์
updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
การลบสิ่งประดิษฐ์ของ Lakehouse
is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")
รายการวัตถุของเลคเฮ้าส์
artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")
ข้อมูลเพิ่มเติม
สําหรับข้อมูลรายละเอียดเพิ่มเติมเกี่ยวกับแต่ละวิธีและพารามิเตอร์ ให้ใช้ mssparkutils.lakehouse.help("methodName")
ฟังก์ชัน
ด้วยโปรแกรมอรรถประโยชน์ของเลคเฮ้าส์ของ MSSparkUtils การจัดการวัตถุ Lakehouse จะมีประสิทธิภาพมากขึ้นและรวมเข้ากับไปป์ไลน์ Fabric ของคุณ เพิ่มประสบการณ์การจัดการข้อมูลโดยรวมของคุณ
อย่าลังเลที่จะสํารวจโปรแกรมอรรถประโยชน์เหล่านี้และรวมเข้ากับเวิร์กโฟลว์ Fabric ของคุณเพื่อการจัดการอาร์ติแฟกต์ของ Lakehouse แบบไร้รอยต่อ
โปรแกรมอรรถประโยชน์รันไทม์
แสดงข้อมูลบริบทเซสชัน
ด้วย mssparkutils.runtime.context
คุณสามารถรับข้อมูลบริบทของเซสชันสดปัจจุบัน รวมถึงชื่อสมุดบันทึก ค่าเริ่มต้นของ lakehouse ข้อมูลพื้นที่ทํางาน ถ้าเป็นไปป์ไลน์ที่เรียกใช้ ฯลฯ
mssparkutils.runtime.context
หมายเหตุ
mssparkutils.env
ไม่ได้รับการสนับสนุนอย่างเป็นทางการบน Fabric โปรดใช้ notebookutils.runtime.context
เป็นทางเลือก
ปัญหาที่ทราบแล้ว
เมื่อใช้รันไทม์เวอร์ชันที่สูงกว่า 1.2 และเรียกใช้ mssparkutils.help()
, fabricClient, warehouse และพื้นที่ทํางาน APIs ไม่ได้รับการสนับสนุนสําหรับตอนนี้จะพร้อมใช้งานเพิ่มเติม