แชร์ผ่าน


NotebookUtils (เดิม MSSparkUtils) สําหรับ Fabric

Notebook Utilities (NotebookUtils) เป็นแพคเกจในตัวเพื่อช่วยให้คุณทํางานทั่วไปได้อย่างง่ายดายใน Fabric Notebook คุณสามารถใช้ NotebookUtils เพื่อทํางานกับระบบไฟล์ เพื่อรับตัวแปรสภาพแวดล้อม เพื่อเกี่ยวโยงสมุดบันทึกเข้าด้วยกัน และทํางานกับข้อมูลลับ แพคเกจ NotebookUtils มีให้ใช้งานใน PySpark (Python) Scala, SparkR notebooks และ Fabric pipelines

หมายเหตุ

  • MsSparkUtils ถูกเปลี่ยนชื่อ เป็น NotebookUtils อย่างเป็นทางการแล้ว โค้ดที่มีอยู่จะยังคง เข้ากัน ได้ย้อนกลับและจะไม่ทําให้เกิดการเปลี่ยนแปลงที่เสียหาย ขอแนะนําให้อัปเกรดเป็นสมุดบันทึกเพื่อให้แน่ใจว่าการสนับสนุนอย่างต่อเนื่องและการเข้าถึงคุณลักษณะใหม่ เนมสเปซ mssparkutils จะถูกยกเลิกในอนาคต
  • NotebookUtils ออกแบบมาเพื่อทํางานกับ Spark 3.4 (Runtime v1.2) และสูงกว่า ฟีเจอร์และการอัปเดตใหม่ทั้งหมดจะได้รับการสนับสนุนเฉพาะกับ namespace ของ notebookutils ในอนาคต

โปรแกรมอรรถประโยชน์ระบบไฟล์

notebookutils.fs ให้บริการสาธารณูปโภคสําหรับการทํางานกับระบบไฟล์ต่าง ๆ รวมถึง Azure Data Lake Storage (ADLS) Gen2 และ Azure Blob Storage ตรวจสอบให้แน่ใจว่าคุณได้กําหนดค่าการเข้าถึงไปยัง Azure Data Lake Storage รุ่น2 และ Azure Blob Storage อย่างเหมาะสม

เรียกใช้คําสั่งต่อไปนี้สําหรับภาพรวมของวิธีการที่พร้อมใช้งาน:

notebookutils.fs.help()

เอาท์พุท

notebookutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use notebookutils.fs.help("methodName") for more info about a method.

NotebookUtils ทํางานกับระบบไฟล์ในลักษณะเดียวกับ Spark API ใช้ notebookutils.fs.mkdirs() และการใช้งาน Fabric lakehouse ตัวอย่างเช่น:

การใช้งาน เส้นทางสัมพัทธ์จากราก HDFS เส้นทางสัมบูรณ์สําหรับระบบไฟล์ ABFS เส้นทางสัมบูรณ์สําหรับระบบไฟล์ภายในเครื่องในโหนดโปรแกรมควบคุม
เลคเฮ้าส์ที่ไม่ได้เป็นค่าเริ่มต้น ไม่ได้รับการสนับสนุน notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
เลคเฮ้าส์ค่าเริ่มต้น ไดเรกทอรีภายใต้ "Files" หรือ "Tables": notebookutils.fs.mkdirs("Files/<new_dir>") notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")

แสดงรายการไฟล์

หากต้องการแสดงรายการเนื้อหาของไดเรกทอรี ให้ใช้ notebookutils.fs.ls('เส้นทางไดเรกทอรีของคุณ') ตัวอย่างเช่น:

notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp")  # The full path of the local file system of driver node

API notebookutils.fs.ls() จะทํางานแตกต่างกันเมื่อใช้เส้นทางสัมพัทธ์ โดยขึ้นอยู่กับชนิดของสมุดบันทึก

  • ในสมุดบันทึก Spark : เส้นทางสัมพัทธ์จะสัมพันธ์กับเส้นทาง ABFSS ของ Lakehouse ตามค่าเริ่มต้น ตัวอย่างเช่น notebookutils.fs.ls("Files") ชี้ไปที่ไดเรกทอรี Files ในเลคเฮ้าส์เริ่มต้น

    ตัวอย่างเช่น:

    notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
    
  • ในสมุดบันทึก Python : เส้นทางสัมพัทธ์จะสัมพันธ์กับไดเรกทอรีการทํางานของระบบไฟล์ภายในเครื่อง ซึ่งค่าเริ่มต้นคือ /home/trusted-service-user/work ดังนั้น คุณควรใช้เส้นทางแบบเต็มแทนที่จะเป็นเส้นทางสัมพัทธ์ notebookutils.fs.ls("/lakehouse/default/Files") ในการเข้าถึงไดเรกทอรี Files ในเลคเฮ้าส์เริ่มต้น

    ตัวอย่างเช่น:

    notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
    

ดูคุณสมบัติไฟล์

วิธีการนี้จะส่งกลับคุณสมบัติไฟล์ รวมถึงชื่อไฟล์ เส้นทางไฟล์ ขนาดไฟล์ และอาจเป็นไดเรกทอรีและไฟล์หรือไม่

files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

สร้างไดเรกทอรีใหม่

วิธีนี้จะสร้างไดเรกทอรีที่ระบุถ้าไม่มีอยู่ และสร้างไดเรกทอรีหลักใด ๆ ที่จําเป็น

notebookutils.fs.mkdirs('new directory name')  
notebookutils.fs.mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
notebookutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

คัดลอกไฟล์

วิธีนี้จะคัดลอกไฟล์หรือไดเรกทอรี และรองรับการคัดลอกกิจกรรมข้ามระบบไฟล์

notebookutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

ไฟล์สําเนาที่มีประสิทธิภาพ

วิธีนี้เป็นวิธีที่มีประสิทธิภาพในการคัดลอกหรือย้ายไฟล์โดยเฉพาะอย่างยิ่งเมื่อทํางานกับข้อมูลขนาดใหญ่ เพื่อเพิ่มประสิทธิภาพการทํางานที่ดีขึ้นบน Fabric จะแนะนําให้ใช้fastcpแทนวิธีการแบบดั้งเดิมcp

หมายเหตุ

notebookutils.fs.fastcp() ไม่รองรับการคัดลอกไฟล์ใน OneLake ทั่วทั้งภูมิภาค ในกรณีนี้ คุณสามารถใช้ notebookutils.fs.cp() แทนได้

notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

แสดงตัวอย่างเนื้อหาไฟล์

วิธีนี้ส่งกลับถึงไบต์ 'maxBytes' แรกของไฟล์ที่ระบุเป็นสตริงที่เข้ารหัสลับใน UTF-8

notebookutils.fs.head('file path', maxBytes to read)

ย้ายไฟล์

วิธีนี้จะย้ายไฟล์หรือไดเรกทอรี และสนับสนุนการย้ายข้ามระบบไฟล์

notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

เขียนไฟล์

วิธีนี้เขียนสตริงที่ระบุออกไปยังไฟล์เข้ารหัสใน UTF-8

notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

ผนวกเนื้อหาไปยังไฟล์

เมธอดนี้ผนวกสตริงที่กําหนดไปยังไฟล์ที่เข้ารหัสไว้ใน UTF-8

notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

หมายเหตุ

  • notebookutils.fs.append() และ notebookutils.fs.put() ไม่สนับสนุนการเขียนพร้อมกันในไฟล์เดียวกันเนื่องจากขาดการรับประกันความเป็นอะตอม
  • เมื่อใช้ notebookutils.fs.append API ในการ for วนรอบเพื่อเขียนไปยังไฟล์เดียวกัน เราขอแนะนําให้เพิ่ม sleep คําสั่งประมาณ 0.5s ~ 1s ระหว่างการเขียนที่เกิดซ้ํา ทั้งนี้เนื่องจาก notebookutils.fs.append การดําเนินการภายใน flush ของ API เป็นแบบอะซิงโครนัส ดังนั้นการหน่วงเวลาสั้น ๆ จะช่วยรับประกันความสมบูรณ์ของข้อมูล

ลบไฟล์หรือไดเรกทอรี

วิธีนี้จะลบไฟล์หรือไดเรกทอรีออก

notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

ไดเรกทอรีการต่อเชื่อม/ยกเลิกการต่อเชื่อม

ค้นหาข้อมูลเพิ่มเติมเกี่ยวกับการใช้งานโดยละเอียดในแฟ้มที่ต่อเชื่อมและยกเลิกการเปิด

อรรถประโยชน์สมุดบันทึก

ใช้อรรถประโยชน์สมุดบันทึกเพื่อเรียกใช้สมุดบันทึกหรือออกจากสมุดบันทึกด้วยค่า เรียกใช้คําสั่งต่อไปนี้เพื่อดูภาพรวมของวิธีการที่พร้อมใช้งาน:

notebookutils.notebook.help()

ผลิตภัณฑ์:


The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.

[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.

Use notebookutils.notebook.help("methodName") for more info about a method.

หมายเหตุ

อรรถประโยชน์ของสมุดบันทึกไม่สามารถใช้ได้กับข้อกําหนดงาน Apache Spark (SJD)

การอ้างอิงสมุดบันทึก

วิธีการนี้จะอ้างอิงสมุดบันทึกและส่งกลับค่าออกจากสมุดบันทึก คุณสามารถเรียกใช้ฟังก์ชันการซ้อนกันในสมุดบันทึกแบบโต้ตอบหรือในไปป์ไลน์ได้ สมุดบันทึกที่กําลังอ้างอิงถูกอ้างอิงจะทํางานบนพูล Spark ของสมุดบันทึกที่เรียกใช้ฟังก์ชันนี้

notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

ตัวอย่างเช่น:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

โน้ตบุ๊คผ้ายังสนับสนุนการอ้างอิงสมุดบันทึกทั่วทั้งพื้นที่ทํางานโดยการระบุ ID พื้นที่ทํางาน

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

คุณสามารถเปิดลิงก์สแนปช็อตของการเรียกใช้การอ้างอิงในผลลัพธ์ของเซลล์ สแนปช็อตจะจับผลลัพธ์การเรียกใช้โค้ดและช่วยให้คุณสามารถดีบักการอ้างอิงได้อย่างง่ายดาย

สกรีนช็อตของผลลัพธ์การเรียกใช้การอ้างอิง

สกรีนช็อตของตัวอย่างสแนปช็อต

หมายเหตุ

  • สมุดบันทึกอ้างอิงข้ามพื้นที่ทํางานได้รับการสนับสนุนโดยรันไทม์เวอร์ชัน 1.2 และสูงกว่า
  • ถ้าคุณใช้ไฟล์ภายใต้ ทรัพยากรสมุดบันทึก ให้ใช้ notebookutils.nbResPath ในสมุดบันทึกอ้างอิงเพื่อตรวจสอบให้แน่ใจว่าจะชี้ไปยังโฟลเดอร์เดียวกันกับการเรียกใช้แบบโต้ตอบ

การอ้างอิงจะเรียกใช้สมุดบันทึกหลายเล่มพร้อมกัน

สำคัญ

คุณลักษณะนี้อยู่ในตัวอย่าง

วิธีนี้ notebookutils.notebook.runMultiple() ช่วยให้คุณสามารถเรียกใช้สมุดบันทึกหลายเล่มพร้อมกันหรือด้วยโครงสร้างโทโพโลยีที่กําหนดไว้ล่วงหน้า API กําลังใช้กลไกการใช้งานแบบหลายเธรดภายในเซสชัน Spark ซึ่งหมายความว่าสมุดบันทึกอ้างอิงจะเรียกใช้การแชร์ทรัพยากรการคํานวณ

ด้วย notebookutils.notebook.runMultiple()คุณสามารถ:

  • ใช้สมุดบันทึกหลายเล่มพร้อมกันโดยไม่ต้องรอให้แต่ละสมุดบันทึกเสร็จสิ้น

  • ระบุการขึ้นต่อกันและลําดับการดําเนินการสําหรับสมุดบันทึกของคุณ โดยใช้รูปแบบ JSON แบบง่าย

  • ปรับการใช้ทรัพยากรการคํานวณ Spark ให้เหมาะสมและลดค่าใช้จ่ายของโครงการ Fabric ของคุณ

  • ดูสแนปช็อตของแต่ละบันทึกการเรียกใช้สมุดบันทึกในผลลัพธ์ และแก้จุดบกพร่อง/ตรวจสอบงานสมุดบันทึกของคุณอย่างสะดวก

  • รับค่าออกจากแต่ละกิจกรรมของผู้บริหาร และใช้ในงานปลายทาง

คุณยังสามารถลองเรียกใช้ notebookutils.notebook.help("runMultiple") เพื่อค้นหาตัวอย่างและการใช้งานโดยละเอียด

นี่คือตัวอย่างง่าย ๆ ของการเรียกใช้รายการของสมุดบันทึกในแบบขนานโดยใช้วิธีนี้:


notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

ผลลัพธ์การดําเนินการจากสมุดบันทึกรากมีดังนี้:

ภาพหน้าจอของการอ้างอิงรายการสมุดบันทึก

ต่อไปนี้คือตัวอย่างของการเรียกใช้สมุดบันทึกที่มีโครงสร้าง notebookutils.notebook.runMultiple()ด้านบนโดยใช้ ใช้วิธีนี้เพื่อปรับโน้ตบุ๊กผ่านประสบการณ์การใช้รหัสได้อย่างง่ายดาย

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

ผลลัพธ์การดําเนินการจากสมุดบันทึกรากมีดังนี้:

สกรีนช็อตของรายการอ้างอิงของสมุดบันทึกที่มีพารามิเตอร์

นอกจากนี้เรายังมีวิธีการตรวจสอบว่ามีการกําหนด DAG อย่างถูกต้องหรือไม่

notebookutils.notebook.validateDAG(DAG)

หมายเหตุ

  • ระดับการทํางานแบบขนานของการเรียกใช้สมุดบันทึกหลายรายการถูกจํากัดไว้ที่ทรัพยากรการคํานวณที่พร้อมใช้งานทั้งหมดของเซสชัน Spark
  • ขีดจํากัดสูงสุดสําหรับกิจกรรมสมุดบันทึกหรือสมุดบันทึกพร้อมกันคือ 50 การใช้งานเกินขีดจํากัดนี้อาจนําไปสู่ความเสถียรและปัญหาด้านประสิทธิภาพเนื่องจากการใช้ทรัพยากรการคํานวณสูง ถ้ามีปัญหาเกิดขึ้น ให้ลองแยกสมุดบันทึกออกเป็นการโทรหลาย runMultiple รายการ หรือลดภาวะพร้อมกันโดยการปรับ เขตข้อมูลภาวะ พร้อมกันในพารามิเตอร์ DAG
  • การหมดเวลาเริ่มต้นสําหรับ DAG ทั้งหมดคือ 12 ชั่วโมง และการหมดเวลาเริ่มต้นสําหรับแต่ละเซลล์ในสมุดบันทึกย่อยคือ 90 วินาที คุณสามารถเปลี่ยนการหมดเวลาได้โดยการตั้งค่า เขตข้อมูล timeoutInSeconds และ timeoutPerCellInSeconds ในพารามิเตอร์ DAG ได้

ออกจากสมุดบันทึก

วิธีนี้จะออกจากสมุดบันทึกที่มีค่า คุณสามารถเรียกใช้ฟังก์ชันการซ้อนกันในสมุดบันทึกแบบโต้ตอบหรือในไปป์ไลน์ได้

  • เมื่อคุณเรียกใช้ ฟังก์ชัน exit() จากสมุดบันทึกแบบโต้ตอบ สมุดบันทึก Fabric จะแสดงข้อยกเว้น ข้ามการเรียกใช้เซลล์ที่ตามมา และทําให้เซสชัน Spark มีชีวิต

  • เมื่อคุณแก้ไขสมุดบันทึกในไปป์ไลน์ที่เรียกใช้ ฟังก์ชัน exit() กิจกรรมสมุดบันทึกจะส่งกลับด้วยค่าการออก ทําให้การทํางานของไปป์ไลน์เสร็จสมบูรณ์ และหยุดเซสชัน Spark

  • เมื่อคุณเรียกใช้ฟังก์ชัน exit() ในสมุดบันทึกที่กําลังอ้างอิง Fabric Spark จะหยุดการดําเนินการเพิ่มเติมของสมุดบันทึกที่อ้างอิง และดําเนินการต่อเพื่อเรียกใช้เซลล์ถัดไปในสมุดบันทึกหลักที่เรียกใช้ฟังก์ชัน run() ตัวอย่างเช่น: Notebook1 มีเซลล์สามเซลล์และเรียก ฟังก์ชัน exit() ในเซลล์ที่สอง Notebook2 มีเซลล์ห้าเซลล์และเรียกฟังก์ชัน run(notebook1) ในเซลล์ที่สาม เมื่อคุณเรียกใช้ Notebook2 Notebook2 Notebook1 จะหยุดที่เซลล์ที่สองเมื่อกดฟังก์ชัน exit() Notebook2 ยังคงเรียกใช้เซลล์ที่สี่และเซลล์ที่ห้า

notebookutils.notebook.exit("value string")

หมายเหตุ

ฟังก์ชัน exit() จะเขียนทับเอาต์พุตเซลล์ปัจจุบันเพื่อหลีกเลี่ยงการสูญเสียเอาต์พุตของคําสั่งรหัสอื่น ๆ โปรดเรียกใช้ notebookutils.notebook.exit() ในเซลล์แยกต่างหาก

ตัวอย่างเช่น:

ตัวอย่าง 1 สมุดบันทึกที่มีเซลล์สองเซลล์ต่อไปนี้:

  • Cell 1 กําหนดพารามิเตอร์การ ป้อนข้อมูล ด้วยค่าเริ่มต้นที่ตั้งค่าเป็น 10

  • เซลล์ 2 ออกจากสมุดบันทึกที่มี ข้อมูลป้อนเข้า เป็นค่าออก

สกรีนช็อตที่แสดงสมุดบันทึกตัวอย่างของฟังก์ชันจบการทํางาน

คุณสามารถเรียกใช้ Sample1 ในสมุดบันทึกอื่นด้วยค่าเริ่มต้น:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

ผลิตภัณฑ์:

Notebook is executed successfully with exit value 10

คุณสามารถเรียกใช้ Sample1 ในสมุดบันทึกอื่น และตั้งค่าการป้อนข้อมูลเป็น 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

ผลิตภัณฑ์:

Notebook is executed successfully with exit value 20

จัดการสิ่งประดิษฐ์สมุดบันทึก

notebookutils.notebook มีโปรแกรมอรรถประโยชน์พิเศษสําหรับการจัดการรายการสมุดบันทึกทางโปรแกรม API เหล่านี้สามารถช่วยให้คุณสร้าง รับ อัปเดต และลบรายการสมุดบันทึกได้อย่างง่ายดาย

หากต้องการใช้วิธีการเหล่านี้อย่างมีประสิทธิภาพ ให้พิจารณาตัวอย่างการใช้งานต่อไปนี้:

การสร้างสมุดบันทึก

with open("/path/to/notebook.ipynb", "r") as f:
    content = f.read()

artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")

การรับเนื้อหาของสมุดบันทึก

artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")

การปรับปรุงสมุดบันทึก

updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name",  "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")

การลบสมุดบันทึก

is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")

การแสดงรายการสมุดบันทึกในพื้นที่ทํางาน

artifacts_list = notebookutils.notebook.list("optional_workspace_id")

อรรถประโยชน์ข้อมูลประจําตัว

คุณสามารถใช้ยูทิลิตี้ข้อมูลประจําตัวเพื่อรับโทเค็นการเข้าถึงและจัดการความลับใน Azure Key Vault ได้

เรียกใช้คําสั่งต่อไปนี้เพื่อดูภาพรวมของวิธีการที่พร้อมใช้งาน:

notebookutils.credentials.help()

ผลิตภัณฑ์:

Help on module notebookutils.credentials in notebookutils:

NAME
    notebookutils.credentials - Utility for credentials operations in Fabric

FUNCTIONS
    getSecret(akvName, secret) -> str
        Gets a secret from the given Azure Key Vault.
        :param akvName: The name of the Azure Key Vault.
        :param secret: The name of the secret.
        :return: The secret value.
    
    getToken(audience) -> str
        Gets a token for the given audience.
        :param audience: The audience for the token.
        :return: The token.
    
    help(method_name=None)
        Provides help for the notebookutils.credentials module or the specified method.
        
        Examples:
        notebookutils.credentials.help()
        notebookutils.credentials.help("getToken")
        :param method_name: The name of the method to get help with.

DATA
    creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...

FILE
    /home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py

รับโทเค็น

getToken จะส่งกลับโทเค็น Microsoft Entra สําหรับผู้ชมและชื่อที่กําหนด (ไม่บังคับ) รายการต่อไปนี้แสดงคีย์ผู้ชมที่พร้อมใช้งานในปัจจุบัน:

  • ทรัพยากรผู้ชมที่เก็บข้อมูล: "ที่เก็บข้อมูล"
  • ทรัพยากร Power BI: "pbi"
  • ทรัพยากรชุดเก็บคีย์ของ Azure: "keyvault"
  • ทรัพยากร Synapse RTA KQL DB: "kusto"

เรียกใช้คําสั่งต่อไปนี้เพื่อรับโทเค็น:

notebookutils.credentials.getToken('audience Key')

รับข้อมูลลับโดยใช้ข้อมูลประจําตัวผู้ใช้

getSecret ส่งกลับข้อมูลลับชุดเก็บคีย์ของ Azure สําหรับจุดสิ้นสุด Azure Key Vault และชื่อลับที่กําหนดโดยใช้ข้อมูลประจําตัวของผู้ใช้

notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

การต่อเชื่อมไฟล์และยกเลิกการเปิดไฟล์

Fabric รองรับสถานการณ์การเมาท์ต่อไปนี้ในแพคเกจ Microsoft Spark Utilities คุณสามารถใช้เมาท์ unmount getMountPath() และ mounts() API เพื่อแนบพื้นที่เก็บข้อมูลระยะไกล (ADLS Gen2) กับโหนดการทํางานทั้งหมด (โหนดไดรเวอร์และโหนดผู้ปฏิบัติงาน) หลังจากจุดต่อเชื่อมที่เก็บข้อมูลอยู่ในตําแหน่งให้ใช้ API ไฟล์ภายในเครื่องเพื่อเข้าถึงข้อมูลราวกับว่าจัดเก็บไว้ในระบบไฟล์ภายใน

วิธีการต่อเชื่อมบัญชี ADLS Gen2

ตัวอย่างต่อไปนี้แสดงให้เห็นวิธีการกําหนดใช้ Azure Data Lake Storage รุ่น2 การติดตั้งที่เก็บข้อมูล Blob ในทํานองเดียวกัน

ตัวอย่างนี้ถือว่า คุณมีบัญชี Data Lake Storage รุ่น2 หนึ่งบัญชีที่ชื่อว่า storegen2 และบัญชีมีคอนเทนเนอร์หนึ่งชื่อว่า mycontainer ที่คุณต้องการติดตั้ง /test ลงในเซสชัน Spark ของสมุดบันทึกของคุณ

สกรีนช็อตแสดงตําแหน่งที่จะเลือกคอนเทนเนอร์ที่จะต่อเชื่อม

ในการติดภาชนะที่เรียกว่า mycontainer, notebookutils ก่อนจําเป็นต้องตรวจสอบว่าคุณมีสิทธิ์ในการเข้าถึงคอนเทนเนอร์หรือไม่ ในขณะนี้ Fabric สนับสนุนวิธีการรับรองความถูกต้องสองวิธีสําหรับการดําเนินการเมานท์ทริกเกอร์: accountKey และ sastoken

เมานท์ผ่านโทเค็นลายเซ็นสําหรับการเข้าถึงที่ใช้ร่วมกันหรือคีย์บัญชี

NotebookUtils สนับสนุนอย่างชัดเจนผ่านคีย์บัญชีหรือ โทเค็นลายเซ็นการเข้าถึงที่ใช้ร่วมกัน (SAS) เป็นพารามิเตอร์เพื่อต่อเชื่อมเป้าหมาย

เพื่อเหตุผลด้านความปลอดภัย เราขอแนะนําให้คุณจัดเก็บคีย์บัญชีหรือโทเค็น SAS ใน Azure Key Vault (ตามสกรีนช็อตต่อไปนี้แสดง) จากนั้นคุณสามารถเรียกใช้ได้โดยใช้ notebookutils.credentials.getSecret API สําหรับข้อมูลเพิ่มเติมเกี่ยวกับ Azure Key Vault โปรดดู เกี่ยวกับคีย์บัญชีที่จัดการโดย Azure Key Vault

สกรีนช็อตแสดงตําแหน่งที่จัดเก็บข้อมูลลับในชุดเก็บคีย์ของ Azure

รหัสตัวอย่างสําหรับ เมธอด accountKey :

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

โค้ดตัวอย่างสําหรับ sastoken:

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

ติดพารามิเตอร์:

  • fileCacheTimeout: Blobs จะถูกแคชในโฟลเดอร์ temp ภายในเครื่องเป็นเวลา 120 วินาทีตามค่าเริ่มต้น ในช่วงเวลานี้ blobfuse ไม่ได้ตรวจสอบว่าไฟล์นั้นเป็นไฟล์ล่าสุดหรือไม่ สามารถตั้งค่าพารามิเตอร์เพื่อเปลี่ยนการหมดเวลาเริ่มต้นได้ เมื่อไคลเอ็นต์หลายไคลเอ็นต์ปรับเปลี่ยนแฟ้มในเวลาเดียวกัน เพื่อหลีกเลี่ยงความไม่สอดคล้องกันระหว่างแฟ้มภายในเครื่องและระยะไกล เราขอแนะนําให้ย่อเวลาแคช หรือแม้กระทั่งเปลี่ยนเป็น 0 และรับไฟล์ล่าสุดจากเซิร์ฟเวอร์เสมอ
  • หมดเวลา: การหมดเวลาของการดําเนินการเมาท์คือ 120 วินาทีตามค่าเริ่มต้น สามารถตั้งค่าพารามิเตอร์เพื่อเปลี่ยนการหมดเวลาเริ่มต้นได้ เมื่อมีผู้ปฏิบัติการมากเกินไปหรือเมื่อหมดเวลาในการติดเราขอแนะนําให้เพิ่มค่า

คุณสามารถใช้พารามิเตอร์เหล่านี้ได้ดังนี้:

notebookutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

หมายเหตุ

เพื่อวัตถุประสงค์ด้านความปลอดภัย ขอแนะนําให้หลีกเลี่ยงการฝังข้อมูลประจําตัวโดยตรงในโค้ด เพื่อปกป้องข้อมูลประจําตัวของคุณเพิ่มเติม ข้อมูลลับใด ๆ ที่แสดงในผลลัพธ์สมุดบันทึกจะเปลี่ยนแปลง สําหรับข้อมูลเพิ่มเติม ให้ดู การปฏิกริยาลับ

วิธีติดตั้งเลคเฮ้าส์

รหัสตัวอย่างสําหรับการติดตั้งเลคเฮ้าส์ไปยัง /<mount_name>:

notebookutils.fs.mount( 
 "abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse", 
 "/<mount_name>"
)

เข้าถึงไฟล์ภายใต้จุดต่อเชื่อมโดยใช้ notebookutils fs API

วัตถุประสงค์หลักของการดําเนินการติดตั้งคือการอนุญาตให้ลูกค้าเข้าถึงข้อมูลที่จัดเก็บไว้ในบัญชีเก็บข้อมูลระยะไกลด้วย API ระบบไฟล์ภายในเครื่อง คุณยังสามารถเข้าถึงข้อมูลโดยใช้ notebookutils fs API ด้วยเส้นทางที่ติดอยู่เป็นพารามิเตอร์ได้ รูปแบบเส้นทางนี้จะแตกต่างกันเล็กน้อย

สมมติว่าคุณได้ติดตั้งคอนเทนเนอร์ Data Lake Storage รุ่น2 mycontainer to /test โดยใช้ api การต่อเชื่อม เมื่อคุณเข้าถึงข้อมูลด้วย API ระบบไฟล์ภายใน รูปแบบเส้นทางจะมีลักษณะดังนี้:

/synfs/notebook/{sessionId}/test/{filename}

เมื่อคุณต้องการเข้าถึงข้อมูลโดยใช้ notebookutils fs API เราขอแนะนําให้ใช้ getMountPath() เพื่อให้ได้เส้นทางที่ถูกต้อง:

path = notebookutils.fs.getMountPath("/test")
  • ไดเรกทอรีรายการ:

    notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
    
  • อ่านเนื้อหาไฟล์:

    notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
    
  • สร้างไดเรกทอรี:

    notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
    

เข้าถึงไฟล์ภายใต้จุดยึดผ่านเส้นทางท้องถิ่น

คุณสามารถอ่านและเขียนไฟล์ได้อย่างง่ายดายในจุดต่อเชื่อมโดยใช้ระบบไฟล์มาตรฐาน นี่คือตัวอย่าง Python:

#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

วิธีการตรวจสอบจุดยึดที่มีอยู่

คุณสามารถใช้ notebookutils.fs.mounts() API เพื่อตรวจสอบข้อมูลจุดต่อเชื่อมที่มีอยู่ทั้งหมด:

notebookutils.fs.mounts()

วิธีการยกเลิกการต่อเชื่อมจุดยึด

ใช้รหัสต่อไปนี้เพื่อยกเลิกการต่อเชื่อมจุด ยึดของคุณ (/ทดสอบ ในตัวอย่างนี้):

notebookutils.fs.unmount("/test")

ข้อจำกัดที่ทราบ

  • การติดปัจจุบันคือการกําหนดค่าระดับงาน เราขอแนะนําให้คุณใช้เ มาท์ API เพื่อตรวจสอบว่ามีจุดต่อเชื่อมอยู่หรือไม่พร้อมใช้งาน

  • กลไกยกเลิกการเมาท์จะไม่ถูกนําไปใช้โดยอัตโนมัติ เมื่อแอปพลิเคชันทํางานเสร็จสิ้นหากต้องการยกเลิกเมาท์จุดและปล่อยพื้นที่ดิสก์ คุณต้องเรียกใช้ API ที่ไม่ได้ลงทะเบียนในโค้ดของคุณอย่างชัดเจน มิฉะนั้นจุดยึดจะยังคงอยู่ในโหนดหลังจากการทํางานของแอปพลิเคชันเสร็จสิ้น

  • ไม่รองรับการต่อเชื่อมบัญชีที่เก็บข้อมูล ADLS Gen1

สาธารณูปโภคของเลคเฮ้าส์

notebookutils.lakehouse ให้บริการสาธารณูปโภคที่ออกแบบมาสําหรับการจัดการรายการในเลคเฮ้าส์ สาธารณูปโภคเหล่านี้ช่วยให้คุณสามารถสร้าง รับ อัปเดต และลบวัตถุของเลคเฮ้าส์ได้อย่างง่ายดาย

ภาพรวมของวิธีการ

ด้านล่างคือภาพรวมของวิธีการที่พร้อมใช้งานซึ่งจัดหาโดย notebookutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean 

# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]

# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table] 

# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table] 

ตัวอย่างการใช้

หากต้องการใช้วิธีการเหล่านี้อย่างมีประสิทธิภาพ ให้พิจารณาตัวอย่างการใช้งานต่อไปนี้:

การสร้างเลคเฮ้าส์

artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

การเข้าเลคเฮ้าส์

artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")

ปรับปรุงเลคเฮ้าส์

updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

การลบเลคเฮ้าส์

is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")

รายชื่อเลคเฮ้าส์ในพื้นที่ทํางาน

artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")

แสดงรายการตารางทั้งหมดในเลคเฮ้าส์

artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")

เริ่มต้นการดําเนินการโหลดตารางในเลคเฮ้าส์

notebookutils.lakehouse.loadTable(
    {
        "relativePath": "Files/myFile.csv",
        "pathType": "File",
        "mode": "Overwrite",
        "recursive": False,
        "formatOptions": {
            "format": "Csv",
            "header": True,
            "delimiter": ","
        }
    }, "table_name", "artifact_name", "optional_workspace_id")

ข้อมูลเพิ่มเติม

สําหรับข้อมูลรายละเอียดเพิ่มเติมเกี่ยวกับแต่ละวิธีและพารามิเตอร์ ให้ใช้ notebookutils.lakehouse.help("methodName") ฟังก์ชัน

โปรแกรมอรรถประโยชน์รันไทม์

แสดงข้อมูลบริบทเซสชัน

ด้วย notebookutils.runtime.context คุณสามารถรับข้อมูลบริบทของเซสชันสดปัจจุบัน รวมถึงชื่อสมุดบันทึก ค่าเริ่มต้นของ lakehouse ข้อมูลพื้นที่ทํางาน ถ้าเป็นไปป์ไลน์ที่เรียกใช้ ฯลฯ

notebookutils.runtime.context

การจัดการเซสชัน

หยุดเซสชันแบบโต้ตอบ

แทนที่จะคลิกปุ่มหยุดด้วยตนเอง บางครั้งการหยุดเซสชันแบบโต้ตอบจะสะดวกกว่าโดยการเรียก API ในโค้ด ในกรณีดังกล่าว เรามี API notebookutils.session.stop() เพื่อสนับสนุนการหยุดเซสชันแบบโต้ตอบผ่านโค้ด ซึ่งพร้อมใช้งานสําหรับ Scala และ PySpark

notebookutils.session.stop()

notebookutils.session.stop() API จะหยุดเซสชันการโต้ตอบปัจจุบันแบบอะซิงโครนัสในพื้นหลัง ซึ่งจะหยุดเซสชัน Spark และปล่อยทรัพยากรที่ครอบครองโดยเซสชันเพื่อให้เซสชันอื่น ๆ ในกลุ่มเดียวกันพร้อมใช้งาน

รีสตาร์ทตัวแปลภาษา Python

notebookutils.session utility มีวิธีการรีสตาร์ทตัวแปลภาษา Python

notebookutils.session.restartPython()

หมายเหตุ

  • ในกรณีการเรียกใช้การอ้างอิงสมุดบันทึก restartPython() จะรีสตาร์ตตัวแปล Python ของสมุดบันทึกปัจจุบันที่กําลังอ้างอิงอยู่เท่านั้น
  • ในบางกรณีคําสั่งอาจล้มเหลวเนื่องจากกลไกการสะท้อน Spark การเพิ่มลองใหม่สามารถบรรเทาปัญหาได้

ปัญหาที่ทราบแล้ว

  • เมื่อใช้รันไทม์เวอร์ชันที่สูงกว่า 1.2 และเรียกใช้ notebookutils.help(), fabricClient ที่ระบุไว้ PBIClient API จะไม่ได้รับการรองรับสําหรับตอนนี้จะพร้อมใช้งานเพิ่มเติม นอกจากนี้ API ข้อมูลประจําตัว ไม่ได้รับการรองรับใน Scala notebook สําหรับตอนนี้

  • สมุดบันทึก Python ไม่สนับสนุน หยุดrestartPython API เมื่อใช้ยูทิลิตี้ notebookutils.session สําหรับการจัดการเซสชัน