NotebookUtils (เดิม MSSparkUtils) สําหรับ Fabric
Notebook Utilities (NotebookUtils) เป็นแพคเกจในตัวเพื่อช่วยให้คุณทํางานทั่วไปได้อย่างง่ายดายใน Fabric Notebook คุณสามารถใช้ NotebookUtils เพื่อทํางานกับระบบไฟล์ เพื่อรับตัวแปรสภาพแวดล้อม เพื่อเกี่ยวโยงสมุดบันทึกเข้าด้วยกัน และทํางานกับข้อมูลลับ แพคเกจ NotebookUtils มีให้ใช้งานใน PySpark (Python) Scala, SparkR notebooks และ Fabric pipelines
หมายเหตุ
- MsSparkUtils ถูกเปลี่ยนชื่อ เป็น NotebookUtils อย่างเป็นทางการแล้ว โค้ดที่มีอยู่จะยังคง เข้ากัน ได้ย้อนกลับและจะไม่ทําให้เกิดการเปลี่ยนแปลงที่เสียหาย ขอแนะนําให้อัปเกรดเป็นสมุดบันทึกเพื่อให้แน่ใจว่าการสนับสนุนอย่างต่อเนื่องและการเข้าถึงคุณลักษณะใหม่ เนมสเปซ mssparkutils จะถูกยกเลิกในอนาคต
- NotebookUtils ออกแบบมาเพื่อทํางานกับ Spark 3.4 (Runtime v1.2) และสูงกว่า ฟีเจอร์และการอัปเดตใหม่ทั้งหมดจะได้รับการสนับสนุนเฉพาะกับ namespace ของ notebookutils ในอนาคต
โปรแกรมอรรถประโยชน์ระบบไฟล์
notebookutils.fs ให้บริการสาธารณูปโภคสําหรับการทํางานกับระบบไฟล์ต่าง ๆ รวมถึง Azure Data Lake Storage (ADLS) Gen2 และ Azure Blob Storage ตรวจสอบให้แน่ใจว่าคุณได้กําหนดค่าการเข้าถึงไปยัง Azure Data Lake Storage รุ่น2 และ Azure Blob Storage อย่างเหมาะสม
เรียกใช้คําสั่งต่อไปนี้สําหรับภาพรวมของวิธีการที่พร้อมใช้งาน:
notebookutils.fs.help()
เอาท์พุท
notebookutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use notebookutils.fs.help("methodName") for more info about a method.
NotebookUtils ทํางานกับระบบไฟล์ในลักษณะเดียวกับ Spark API ใช้ notebookutils.fs.mkdirs() และการใช้งาน Fabric lakehouse ตัวอย่างเช่น:
การใช้งาน | เส้นทางสัมพัทธ์จากราก HDFS | เส้นทางสัมบูรณ์สําหรับระบบไฟล์ ABFS | เส้นทางสัมบูรณ์สําหรับระบบไฟล์ภายในเครื่องในโหนดโปรแกรมควบคุม |
---|---|---|---|
เลคเฮ้าส์ที่ไม่ได้เป็นค่าเริ่มต้น | ไม่ได้รับการสนับสนุน | notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") | notebookutils.fs.mkdirs("file:/<new_dir>") |
เลคเฮ้าส์ค่าเริ่มต้น | ไดเรกทอรีภายใต้ "Files" หรือ "Tables": notebookutils.fs.mkdirs("Files/<new_dir>") | notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") | notebookutils.fs.mkdirs("file:/<new_dir>") |
แสดงรายการไฟล์
หากต้องการแสดงรายการเนื้อหาของไดเรกทอรี ให้ใช้ notebookutils.fs.ls('เส้นทางไดเรกทอรีของคุณ') ตัวอย่างเช่น:
notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp") # The full path of the local file system of driver node
API notebookutils.fs.ls()
จะทํางานแตกต่างกันเมื่อใช้เส้นทางสัมพัทธ์ โดยขึ้นอยู่กับชนิดของสมุดบันทึก
ในสมุดบันทึก Spark : เส้นทางสัมพัทธ์จะสัมพันธ์กับเส้นทาง ABFSS ของ Lakehouse ตามค่าเริ่มต้น ตัวอย่างเช่น
notebookutils.fs.ls("Files")
ชี้ไปที่ไดเรกทอรีFiles
ในเลคเฮ้าส์เริ่มต้นตัวอย่างเช่น:
notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
ในสมุดบันทึก Python : เส้นทางสัมพัทธ์จะสัมพันธ์กับไดเรกทอรีการทํางานของระบบไฟล์ภายในเครื่อง ซึ่งค่าเริ่มต้นคือ /home/trusted-service-user/work ดังนั้น คุณควรใช้เส้นทางแบบเต็มแทนที่จะเป็นเส้นทางสัมพัทธ์
notebookutils.fs.ls("/lakehouse/default/Files")
ในการเข้าถึงไดเรกทอรีFiles
ในเลคเฮ้าส์เริ่มต้นตัวอย่างเช่น:
notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
ดูคุณสมบัติไฟล์
วิธีการนี้จะส่งกลับคุณสมบัติไฟล์ รวมถึงชื่อไฟล์ เส้นทางไฟล์ ขนาดไฟล์ และอาจเป็นไดเรกทอรีและไฟล์หรือไม่
files = notebookutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
สร้างไดเรกทอรีใหม่
วิธีนี้จะสร้างไดเรกทอรีที่ระบุถ้าไม่มีอยู่ และสร้างไดเรกทอรีหลักใด ๆ ที่จําเป็น
notebookutils.fs.mkdirs('new directory name')
notebookutils.fs.mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
notebookutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
คัดลอกไฟล์
วิธีนี้จะคัดลอกไฟล์หรือไดเรกทอรี และรองรับการคัดลอกกิจกรรมข้ามระบบไฟล์
notebookutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
ไฟล์สําเนาที่มีประสิทธิภาพ
วิธีนี้เป็นวิธีที่มีประสิทธิภาพในการคัดลอกหรือย้ายไฟล์โดยเฉพาะอย่างยิ่งเมื่อทํางานกับข้อมูลขนาดใหญ่ เพื่อเพิ่มประสิทธิภาพการทํางานที่ดีขึ้นบน Fabric จะแนะนําให้ใช้fastcp
แทนวิธีการแบบดั้งเดิมcp
หมายเหตุ
notebookutils.fs.fastcp()
ไม่รองรับการคัดลอกไฟล์ใน OneLake ทั่วทั้งภูมิภาค ในกรณีนี้ คุณสามารถใช้ notebookutils.fs.cp()
แทนได้
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
แสดงตัวอย่างเนื้อหาไฟล์
วิธีนี้ส่งกลับถึงไบต์ 'maxBytes' แรกของไฟล์ที่ระบุเป็นสตริงที่เข้ารหัสลับใน UTF-8
notebookutils.fs.head('file path', maxBytes to read)
ย้ายไฟล์
วิธีนี้จะย้ายไฟล์หรือไดเรกทอรี และสนับสนุนการย้ายข้ามระบบไฟล์
notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.
เขียนไฟล์
วิธีนี้เขียนสตริงที่ระบุออกไปยังไฟล์เข้ารหัสใน UTF-8
notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
ผนวกเนื้อหาไปยังไฟล์
เมธอดนี้ผนวกสตริงที่กําหนดไปยังไฟล์ที่เข้ารหัสไว้ใน UTF-8
notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
หมายเหตุ
-
notebookutils.fs.append()
และnotebookutils.fs.put()
ไม่สนับสนุนการเขียนพร้อมกันในไฟล์เดียวกันเนื่องจากขาดการรับประกันความเป็นอะตอม - เมื่อใช้
notebookutils.fs.append
API ในการfor
วนรอบเพื่อเขียนไปยังไฟล์เดียวกัน เราขอแนะนําให้เพิ่มsleep
คําสั่งประมาณ 0.5s ~ 1s ระหว่างการเขียนที่เกิดซ้ํา ทั้งนี้เนื่องจากnotebookutils.fs.append
การดําเนินการภายในflush
ของ API เป็นแบบอะซิงโครนัส ดังนั้นการหน่วงเวลาสั้น ๆ จะช่วยรับประกันความสมบูรณ์ของข้อมูล
ลบไฟล์หรือไดเรกทอรี
วิธีนี้จะลบไฟล์หรือไดเรกทอรีออก
notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
ไดเรกทอรีการต่อเชื่อม/ยกเลิกการต่อเชื่อม
ค้นหาข้อมูลเพิ่มเติมเกี่ยวกับการใช้งานโดยละเอียดในแฟ้มที่ต่อเชื่อมและยกเลิกการเปิด
อรรถประโยชน์สมุดบันทึก
ใช้อรรถประโยชน์สมุดบันทึกเพื่อเรียกใช้สมุดบันทึกหรือออกจากสมุดบันทึกด้วยค่า เรียกใช้คําสั่งต่อไปนี้เพื่อดูภาพรวมของวิธีการที่พร้อมใช้งาน:
notebookutils.notebook.help()
ผลิตภัณฑ์:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.
[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.
Use notebookutils.notebook.help("methodName") for more info about a method.
หมายเหตุ
อรรถประโยชน์ของสมุดบันทึกไม่สามารถใช้ได้กับข้อกําหนดงาน Apache Spark (SJD)
การอ้างอิงสมุดบันทึก
วิธีการนี้จะอ้างอิงสมุดบันทึกและส่งกลับค่าออกจากสมุดบันทึก คุณสามารถเรียกใช้ฟังก์ชันการซ้อนกันในสมุดบันทึกแบบโต้ตอบหรือในไปป์ไลน์ได้ สมุดบันทึกที่กําลังอ้างอิงถูกอ้างอิงจะทํางานบนพูล Spark ของสมุดบันทึกที่เรียกใช้ฟังก์ชันนี้
notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
ตัวอย่างเช่น:
notebookutils.notebook.run("Sample1", 90, {"input": 20 })
โน้ตบุ๊คผ้ายังสนับสนุนการอ้างอิงสมุดบันทึกทั่วทั้งพื้นที่ทํางานโดยการระบุ ID พื้นที่ทํางาน
notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
คุณสามารถเปิดลิงก์สแนปช็อตของการเรียกใช้การอ้างอิงในผลลัพธ์ของเซลล์ สแนปช็อตจะจับผลลัพธ์การเรียกใช้โค้ดและช่วยให้คุณสามารถดีบักการอ้างอิงได้อย่างง่ายดาย
หมายเหตุ
- สมุดบันทึกอ้างอิงข้ามพื้นที่ทํางานได้รับการสนับสนุนโดยรันไทม์เวอร์ชัน 1.2 และสูงกว่า
- ถ้าคุณใช้ไฟล์ภายใต้ ทรัพยากรสมุดบันทึก ให้ใช้
notebookutils.nbResPath
ในสมุดบันทึกอ้างอิงเพื่อตรวจสอบให้แน่ใจว่าจะชี้ไปยังโฟลเดอร์เดียวกันกับการเรียกใช้แบบโต้ตอบ
การอ้างอิงจะเรียกใช้สมุดบันทึกหลายเล่มพร้อมกัน
สำคัญ
คุณลักษณะนี้อยู่ในตัวอย่าง
วิธีนี้ notebookutils.notebook.runMultiple()
ช่วยให้คุณสามารถเรียกใช้สมุดบันทึกหลายเล่มพร้อมกันหรือด้วยโครงสร้างโทโพโลยีที่กําหนดไว้ล่วงหน้า API กําลังใช้กลไกการใช้งานแบบหลายเธรดภายในเซสชัน Spark ซึ่งหมายความว่าสมุดบันทึกอ้างอิงจะเรียกใช้การแชร์ทรัพยากรการคํานวณ
ด้วย notebookutils.notebook.runMultiple()
คุณสามารถ:
ใช้สมุดบันทึกหลายเล่มพร้อมกันโดยไม่ต้องรอให้แต่ละสมุดบันทึกเสร็จสิ้น
ระบุการขึ้นต่อกันและลําดับการดําเนินการสําหรับสมุดบันทึกของคุณ โดยใช้รูปแบบ JSON แบบง่าย
ปรับการใช้ทรัพยากรการคํานวณ Spark ให้เหมาะสมและลดค่าใช้จ่ายของโครงการ Fabric ของคุณ
ดูสแนปช็อตของแต่ละบันทึกการเรียกใช้สมุดบันทึกในผลลัพธ์ และแก้จุดบกพร่อง/ตรวจสอบงานสมุดบันทึกของคุณอย่างสะดวก
รับค่าออกจากแต่ละกิจกรรมของผู้บริหาร และใช้ในงานปลายทาง
คุณยังสามารถลองเรียกใช้ notebookutils.notebook.help("runMultiple") เพื่อค้นหาตัวอย่างและการใช้งานโดยละเอียด
นี่คือตัวอย่างง่าย ๆ ของการเรียกใช้รายการของสมุดบันทึกในแบบขนานโดยใช้วิธีนี้:
notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
ผลลัพธ์การดําเนินการจากสมุดบันทึกรากมีดังนี้:
ต่อไปนี้คือตัวอย่างของการเรียกใช้สมุดบันทึกที่มีโครงสร้าง notebookutils.notebook.runMultiple()
ด้านบนโดยใช้ ใช้วิธีนี้เพื่อปรับโน้ตบุ๊กผ่านประสบการณ์การใช้รหัสได้อย่างง่ายดาย
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
ผลลัพธ์การดําเนินการจากสมุดบันทึกรากมีดังนี้:
นอกจากนี้เรายังมีวิธีการตรวจสอบว่ามีการกําหนด DAG อย่างถูกต้องหรือไม่
notebookutils.notebook.validateDAG(DAG)
หมายเหตุ
- ระดับการทํางานแบบขนานของการเรียกใช้สมุดบันทึกหลายรายการถูกจํากัดไว้ที่ทรัพยากรการคํานวณที่พร้อมใช้งานทั้งหมดของเซสชัน Spark
- ขีดจํากัดสูงสุดสําหรับกิจกรรมสมุดบันทึกหรือสมุดบันทึกพร้อมกันคือ 50 การใช้งานเกินขีดจํากัดนี้อาจนําไปสู่ความเสถียรและปัญหาด้านประสิทธิภาพเนื่องจากการใช้ทรัพยากรการคํานวณสูง ถ้ามีปัญหาเกิดขึ้น ให้ลองแยกสมุดบันทึกออกเป็นการโทรหลาย
runMultiple
รายการ หรือลดภาวะพร้อมกันโดยการปรับ เขตข้อมูลภาวะ พร้อมกันในพารามิเตอร์ DAG - การหมดเวลาเริ่มต้นสําหรับ DAG ทั้งหมดคือ 12 ชั่วโมง และการหมดเวลาเริ่มต้นสําหรับแต่ละเซลล์ในสมุดบันทึกย่อยคือ 90 วินาที คุณสามารถเปลี่ยนการหมดเวลาได้โดยการตั้งค่า เขตข้อมูล timeoutInSeconds และ timeoutPerCellInSeconds ในพารามิเตอร์ DAG ได้
ออกจากสมุดบันทึก
วิธีนี้จะออกจากสมุดบันทึกที่มีค่า คุณสามารถเรียกใช้ฟังก์ชันการซ้อนกันในสมุดบันทึกแบบโต้ตอบหรือในไปป์ไลน์ได้
เมื่อคุณเรียกใช้ ฟังก์ชัน exit() จากสมุดบันทึกแบบโต้ตอบ สมุดบันทึก Fabric จะแสดงข้อยกเว้น ข้ามการเรียกใช้เซลล์ที่ตามมา และทําให้เซสชัน Spark มีชีวิต
เมื่อคุณแก้ไขสมุดบันทึกในไปป์ไลน์ที่เรียกใช้ ฟังก์ชัน exit() กิจกรรมสมุดบันทึกจะส่งกลับด้วยค่าการออก ทําให้การทํางานของไปป์ไลน์เสร็จสมบูรณ์ และหยุดเซสชัน Spark
เมื่อคุณเรียกใช้ฟังก์ชัน exit() ในสมุดบันทึกที่กําลังอ้างอิง Fabric Spark จะหยุดการดําเนินการเพิ่มเติมของสมุดบันทึกที่อ้างอิง และดําเนินการต่อเพื่อเรียกใช้เซลล์ถัดไปในสมุดบันทึกหลักที่เรียกใช้ฟังก์ชัน run() ตัวอย่างเช่น: Notebook1 มีเซลล์สามเซลล์และเรียก ฟังก์ชัน exit() ในเซลล์ที่สอง Notebook2 มีเซลล์ห้าเซลล์และเรียกฟังก์ชัน run(notebook1) ในเซลล์ที่สาม เมื่อคุณเรียกใช้ Notebook2 Notebook2 Notebook1 จะหยุดที่เซลล์ที่สองเมื่อกดฟังก์ชัน exit() Notebook2 ยังคงเรียกใช้เซลล์ที่สี่และเซลล์ที่ห้า
notebookutils.notebook.exit("value string")
หมายเหตุ
ฟังก์ชัน exit() จะเขียนทับเอาต์พุตเซลล์ปัจจุบันเพื่อหลีกเลี่ยงการสูญเสียเอาต์พุตของคําสั่งรหัสอื่น ๆ โปรดเรียกใช้ notebookutils.notebook.exit()
ในเซลล์แยกต่างหาก
ตัวอย่างเช่น:
ตัวอย่าง 1 สมุดบันทึกที่มีเซลล์สองเซลล์ต่อไปนี้:
Cell 1 กําหนดพารามิเตอร์การ ป้อนข้อมูล ด้วยค่าเริ่มต้นที่ตั้งค่าเป็น 10
เซลล์ 2 ออกจากสมุดบันทึกที่มี ข้อมูลป้อนเข้า เป็นค่าออก
คุณสามารถเรียกใช้ Sample1 ในสมุดบันทึกอื่นด้วยค่าเริ่มต้น:
exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)
ผลิตภัณฑ์:
Notebook is executed successfully with exit value 10
คุณสามารถเรียกใช้ Sample1 ในสมุดบันทึกอื่น และตั้งค่าการป้อนข้อมูลเป็น 20:
exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
ผลิตภัณฑ์:
Notebook is executed successfully with exit value 20
จัดการสิ่งประดิษฐ์สมุดบันทึก
notebookutils.notebook
มีโปรแกรมอรรถประโยชน์พิเศษสําหรับการจัดการรายการสมุดบันทึกทางโปรแกรม API เหล่านี้สามารถช่วยให้คุณสร้าง รับ อัปเดต และลบรายการสมุดบันทึกได้อย่างง่ายดาย
หากต้องการใช้วิธีการเหล่านี้อย่างมีประสิทธิภาพ ให้พิจารณาตัวอย่างการใช้งานต่อไปนี้:
การสร้างสมุดบันทึก
with open("/path/to/notebook.ipynb", "r") as f:
content = f.read()
artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")
การรับเนื้อหาของสมุดบันทึก
artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")
การปรับปรุงสมุดบันทึก
updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name", "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")
การลบสมุดบันทึก
is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")
การแสดงรายการสมุดบันทึกในพื้นที่ทํางาน
artifacts_list = notebookutils.notebook.list("optional_workspace_id")
อรรถประโยชน์ข้อมูลประจําตัว
คุณสามารถใช้ยูทิลิตี้ข้อมูลประจําตัวเพื่อรับโทเค็นการเข้าถึงและจัดการความลับใน Azure Key Vault ได้
เรียกใช้คําสั่งต่อไปนี้เพื่อดูภาพรวมของวิธีการที่พร้อมใช้งาน:
notebookutils.credentials.help()
ผลิตภัณฑ์:
Help on module notebookutils.credentials in notebookutils:
NAME
notebookutils.credentials - Utility for credentials operations in Fabric
FUNCTIONS
getSecret(akvName, secret) -> str
Gets a secret from the given Azure Key Vault.
:param akvName: The name of the Azure Key Vault.
:param secret: The name of the secret.
:return: The secret value.
getToken(audience) -> str
Gets a token for the given audience.
:param audience: The audience for the token.
:return: The token.
help(method_name=None)
Provides help for the notebookutils.credentials module or the specified method.
Examples:
notebookutils.credentials.help()
notebookutils.credentials.help("getToken")
:param method_name: The name of the method to get help with.
DATA
creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...
FILE
/home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py
รับโทเค็น
getToken จะส่งกลับโทเค็น Microsoft Entra สําหรับผู้ชมและชื่อที่กําหนด (ไม่บังคับ) รายการต่อไปนี้แสดงคีย์ผู้ชมที่พร้อมใช้งานในปัจจุบัน:
- ทรัพยากรผู้ชมที่เก็บข้อมูล: "ที่เก็บข้อมูล"
- ทรัพยากร Power BI: "pbi"
- ทรัพยากรชุดเก็บคีย์ของ Azure: "keyvault"
- ทรัพยากร Synapse RTA KQL DB: "kusto"
เรียกใช้คําสั่งต่อไปนี้เพื่อรับโทเค็น:
notebookutils.credentials.getToken('audience Key')
รับข้อมูลลับโดยใช้ข้อมูลประจําตัวผู้ใช้
getSecret ส่งกลับข้อมูลลับชุดเก็บคีย์ของ Azure สําหรับจุดสิ้นสุด Azure Key Vault และชื่อลับที่กําหนดโดยใช้ข้อมูลประจําตัวของผู้ใช้
notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
การต่อเชื่อมไฟล์และยกเลิกการเปิดไฟล์
Fabric รองรับสถานการณ์การเมาท์ต่อไปนี้ในแพคเกจ Microsoft Spark Utilities คุณสามารถใช้เมาท์ unmount getMountPath() และ mounts() API เพื่อแนบพื้นที่เก็บข้อมูลระยะไกล (ADLS Gen2) กับโหนดการทํางานทั้งหมด (โหนดไดรเวอร์และโหนดผู้ปฏิบัติงาน) หลังจากจุดต่อเชื่อมที่เก็บข้อมูลอยู่ในตําแหน่งให้ใช้ API ไฟล์ภายในเครื่องเพื่อเข้าถึงข้อมูลราวกับว่าจัดเก็บไว้ในระบบไฟล์ภายใน
วิธีการต่อเชื่อมบัญชี ADLS Gen2
ตัวอย่างต่อไปนี้แสดงให้เห็นวิธีการกําหนดใช้ Azure Data Lake Storage รุ่น2 การติดตั้งที่เก็บข้อมูล Blob ในทํานองเดียวกัน
ตัวอย่างนี้ถือว่า คุณมีบัญชี Data Lake Storage รุ่น2 หนึ่งบัญชีที่ชื่อว่า storegen2 และบัญชีมีคอนเทนเนอร์หนึ่งชื่อว่า mycontainer ที่คุณต้องการติดตั้ง /test ลงในเซสชัน Spark ของสมุดบันทึกของคุณ
ในการติดภาชนะที่เรียกว่า mycontainer, notebookutils ก่อนจําเป็นต้องตรวจสอบว่าคุณมีสิทธิ์ในการเข้าถึงคอนเทนเนอร์หรือไม่ ในขณะนี้ Fabric สนับสนุนวิธีการรับรองความถูกต้องสองวิธีสําหรับการดําเนินการเมานท์ทริกเกอร์: accountKey และ sastoken
เมานท์ผ่านโทเค็นลายเซ็นสําหรับการเข้าถึงที่ใช้ร่วมกันหรือคีย์บัญชี
NotebookUtils สนับสนุนอย่างชัดเจนผ่านคีย์บัญชีหรือ โทเค็นลายเซ็นการเข้าถึงที่ใช้ร่วมกัน (SAS) เป็นพารามิเตอร์เพื่อต่อเชื่อมเป้าหมาย
เพื่อเหตุผลด้านความปลอดภัย เราขอแนะนําให้คุณจัดเก็บคีย์บัญชีหรือโทเค็น SAS ใน Azure Key Vault (ตามสกรีนช็อตต่อไปนี้แสดง) จากนั้นคุณสามารถเรียกใช้ได้โดยใช้ notebookutils.credentials.getSecret API สําหรับข้อมูลเพิ่มเติมเกี่ยวกับ Azure Key Vault โปรดดู เกี่ยวกับคีย์บัญชีที่จัดการโดย Azure Key Vault
รหัสตัวอย่างสําหรับ เมธอด accountKey :
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
โค้ดตัวอย่างสําหรับ sastoken:
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
ติดพารามิเตอร์:
- fileCacheTimeout: Blobs จะถูกแคชในโฟลเดอร์ temp ภายในเครื่องเป็นเวลา 120 วินาทีตามค่าเริ่มต้น ในช่วงเวลานี้ blobfuse ไม่ได้ตรวจสอบว่าไฟล์นั้นเป็นไฟล์ล่าสุดหรือไม่ สามารถตั้งค่าพารามิเตอร์เพื่อเปลี่ยนการหมดเวลาเริ่มต้นได้ เมื่อไคลเอ็นต์หลายไคลเอ็นต์ปรับเปลี่ยนแฟ้มในเวลาเดียวกัน เพื่อหลีกเลี่ยงความไม่สอดคล้องกันระหว่างแฟ้มภายในเครื่องและระยะไกล เราขอแนะนําให้ย่อเวลาแคช หรือแม้กระทั่งเปลี่ยนเป็น 0 และรับไฟล์ล่าสุดจากเซิร์ฟเวอร์เสมอ
- หมดเวลา: การหมดเวลาของการดําเนินการเมาท์คือ 120 วินาทีตามค่าเริ่มต้น สามารถตั้งค่าพารามิเตอร์เพื่อเปลี่ยนการหมดเวลาเริ่มต้นได้ เมื่อมีผู้ปฏิบัติการมากเกินไปหรือเมื่อหมดเวลาในการติดเราขอแนะนําให้เพิ่มค่า
คุณสามารถใช้พารามิเตอร์เหล่านี้ได้ดังนี้:
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
หมายเหตุ
เพื่อวัตถุประสงค์ด้านความปลอดภัย ขอแนะนําให้หลีกเลี่ยงการฝังข้อมูลประจําตัวโดยตรงในโค้ด เพื่อปกป้องข้อมูลประจําตัวของคุณเพิ่มเติม ข้อมูลลับใด ๆ ที่แสดงในผลลัพธ์สมุดบันทึกจะเปลี่ยนแปลง สําหรับข้อมูลเพิ่มเติม ให้ดู การปฏิกริยาลับ
วิธีติดตั้งเลคเฮ้าส์
รหัสตัวอย่างสําหรับการติดตั้งเลคเฮ้าส์ไปยัง /<mount_name>:
notebookutils.fs.mount(
"abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse",
"/<mount_name>"
)
เข้าถึงไฟล์ภายใต้จุดต่อเชื่อมโดยใช้ notebookutils fs API
วัตถุประสงค์หลักของการดําเนินการติดตั้งคือการอนุญาตให้ลูกค้าเข้าถึงข้อมูลที่จัดเก็บไว้ในบัญชีเก็บข้อมูลระยะไกลด้วย API ระบบไฟล์ภายในเครื่อง คุณยังสามารถเข้าถึงข้อมูลโดยใช้ notebookutils fs API ด้วยเส้นทางที่ติดอยู่เป็นพารามิเตอร์ได้ รูปแบบเส้นทางนี้จะแตกต่างกันเล็กน้อย
สมมติว่าคุณได้ติดตั้งคอนเทนเนอร์ Data Lake Storage รุ่น2 mycontainer to /test โดยใช้ api การต่อเชื่อม เมื่อคุณเข้าถึงข้อมูลด้วย API ระบบไฟล์ภายใน รูปแบบเส้นทางจะมีลักษณะดังนี้:
/synfs/notebook/{sessionId}/test/{filename}
เมื่อคุณต้องการเข้าถึงข้อมูลโดยใช้ notebookutils fs API เราขอแนะนําให้ใช้ getMountPath() เพื่อให้ได้เส้นทางที่ถูกต้อง:
path = notebookutils.fs.getMountPath("/test")
ไดเรกทอรีรายการ:
notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
อ่านเนื้อหาไฟล์:
notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
สร้างไดเรกทอรี:
notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
เข้าถึงไฟล์ภายใต้จุดยึดผ่านเส้นทางท้องถิ่น
คุณสามารถอ่านและเขียนไฟล์ได้อย่างง่ายดายในจุดต่อเชื่อมโดยใช้ระบบไฟล์มาตรฐาน นี่คือตัวอย่าง Python:
#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
วิธีการตรวจสอบจุดยึดที่มีอยู่
คุณสามารถใช้ notebookutils.fs.mounts() API เพื่อตรวจสอบข้อมูลจุดต่อเชื่อมที่มีอยู่ทั้งหมด:
notebookutils.fs.mounts()
วิธีการยกเลิกการต่อเชื่อมจุดยึด
ใช้รหัสต่อไปนี้เพื่อยกเลิกการต่อเชื่อมจุด ยึดของคุณ (/ทดสอบ ในตัวอย่างนี้):
notebookutils.fs.unmount("/test")
ข้อจำกัดที่ทราบ
การติดปัจจุบันคือการกําหนดค่าระดับงาน เราขอแนะนําให้คุณใช้เ มาท์ API เพื่อตรวจสอบว่ามีจุดต่อเชื่อมอยู่หรือไม่พร้อมใช้งาน
กลไกยกเลิกการเมาท์จะไม่ถูกนําไปใช้โดยอัตโนมัติ เมื่อแอปพลิเคชันทํางานเสร็จสิ้นหากต้องการยกเลิกเมาท์จุดและปล่อยพื้นที่ดิสก์ คุณต้องเรียกใช้ API ที่ไม่ได้ลงทะเบียนในโค้ดของคุณอย่างชัดเจน มิฉะนั้นจุดยึดจะยังคงอยู่ในโหนดหลังจากการทํางานของแอปพลิเคชันเสร็จสิ้น
ไม่รองรับการต่อเชื่อมบัญชีที่เก็บข้อมูล ADLS Gen1
สาธารณูปโภคของเลคเฮ้าส์
notebookutils.lakehouse
ให้บริการสาธารณูปโภคที่ออกแบบมาสําหรับการจัดการรายการในเลคเฮ้าส์ สาธารณูปโภคเหล่านี้ช่วยให้คุณสามารถสร้าง รับ อัปเดต และลบวัตถุของเลคเฮ้าส์ได้อย่างง่ายดาย
ภาพรวมของวิธีการ
ด้านล่างคือภาพรวมของวิธีการที่พร้อมใช้งานซึ่งจัดหาโดย notebookutils.lakehouse
:
# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]
# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table]
# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table]
ตัวอย่างการใช้
หากต้องการใช้วิธีการเหล่านี้อย่างมีประสิทธิภาพ ให้พิจารณาตัวอย่างการใช้งานต่อไปนี้:
การสร้างเลคเฮ้าส์
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
การเข้าเลคเฮ้าส์
artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")
ปรับปรุงเลคเฮ้าส์
updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
การลบเลคเฮ้าส์
is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")
รายชื่อเลคเฮ้าส์ในพื้นที่ทํางาน
artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")
แสดงรายการตารางทั้งหมดในเลคเฮ้าส์
artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")
เริ่มต้นการดําเนินการโหลดตารางในเลคเฮ้าส์
notebookutils.lakehouse.loadTable(
{
"relativePath": "Files/myFile.csv",
"pathType": "File",
"mode": "Overwrite",
"recursive": False,
"formatOptions": {
"format": "Csv",
"header": True,
"delimiter": ","
}
}, "table_name", "artifact_name", "optional_workspace_id")
ข้อมูลเพิ่มเติม
สําหรับข้อมูลรายละเอียดเพิ่มเติมเกี่ยวกับแต่ละวิธีและพารามิเตอร์ ให้ใช้ notebookutils.lakehouse.help("methodName")
ฟังก์ชัน
โปรแกรมอรรถประโยชน์รันไทม์
แสดงข้อมูลบริบทเซสชัน
ด้วย notebookutils.runtime.context
คุณสามารถรับข้อมูลบริบทของเซสชันสดปัจจุบัน รวมถึงชื่อสมุดบันทึก ค่าเริ่มต้นของ lakehouse ข้อมูลพื้นที่ทํางาน ถ้าเป็นไปป์ไลน์ที่เรียกใช้ ฯลฯ
notebookutils.runtime.context
การจัดการเซสชัน
หยุดเซสชันแบบโต้ตอบ
แทนที่จะคลิกปุ่มหยุดด้วยตนเอง บางครั้งการหยุดเซสชันแบบโต้ตอบจะสะดวกกว่าโดยการเรียก API ในโค้ด ในกรณีดังกล่าว เรามี API notebookutils.session.stop()
เพื่อสนับสนุนการหยุดเซสชันแบบโต้ตอบผ่านโค้ด ซึ่งพร้อมใช้งานสําหรับ Scala และ PySpark
notebookutils.session.stop()
notebookutils.session.stop()
API จะหยุดเซสชันการโต้ตอบปัจจุบันแบบอะซิงโครนัสในพื้นหลัง ซึ่งจะหยุดเซสชัน Spark และปล่อยทรัพยากรที่ครอบครองโดยเซสชันเพื่อให้เซสชันอื่น ๆ ในกลุ่มเดียวกันพร้อมใช้งาน
รีสตาร์ทตัวแปลภาษา Python
notebookutils.session utility มีวิธีการรีสตาร์ทตัวแปลภาษา Python
notebookutils.session.restartPython()
หมายเหตุ
- ในกรณีการเรียกใช้การอ้างอิงสมุดบันทึก
restartPython()
จะรีสตาร์ตตัวแปล Python ของสมุดบันทึกปัจจุบันที่กําลังอ้างอิงอยู่เท่านั้น - ในบางกรณีคําสั่งอาจล้มเหลวเนื่องจากกลไกการสะท้อน Spark การเพิ่มลองใหม่สามารถบรรเทาปัญหาได้
ปัญหาที่ทราบแล้ว
เมื่อใช้รันไทม์เวอร์ชันที่สูงกว่า 1.2 และเรียกใช้
notebookutils.help()
, fabricClient ที่ระบุไว้ PBIClient API จะไม่ได้รับการรองรับสําหรับตอนนี้จะพร้อมใช้งานเพิ่มเติม นอกจากนี้ API ข้อมูลประจําตัว ไม่ได้รับการรองรับใน Scala notebook สําหรับตอนนี้สมุดบันทึก Python ไม่สนับสนุน หยุดrestartPython API เมื่อใช้ยูทิลิตี้ notebookutils.session สําหรับการจัดการเซสชัน