Κοινή χρήση μέσω


NotebookUtils (πρώην MSSparkUtils) για Fabric

Τα Βοηθητικά προγράμματα σημειωματάριου (NotebookUtils) είναι ένα ενσωματωμένο πακέτο που σας βοηθά να εκτελείτε εύκολα συνήθεις εργασίες στο Fabric Notebook. Μπορείτε να χρησιμοποιήσετε το NotebookUtils για να εργαστείτε με συστήματα αρχείων, για να λάβετε μεταβλητές περιβάλλοντος, να αλληλουχήσετε σημειωματάρια και να εργαστείτε με μυστικά. Το πακέτο NotebookUtils είναι διαθέσιμο στη Scala PySpark (Python), στα σημειωματάρια SparkR και στις διοχετεύσεις Fabric.

Σημείωμα

  • Το MsSparkUtils μετονομάζεται επίσημα σε NotebookUtils. Ο υπάρχων κώδικας παραμένει συμβατός με προηγούμενες εκδόσεις και δεν θα προκαλέσει νέες αλλαγές. Συνιστάται ιδιαίτερα η αναβάθμιση σε notebookutils για να εξασφαλιστεί η συνεχής υποστήριξη και πρόσβαση σε νέες δυνατότητες. Ο χώρος ονομάτων mssparkutils θα αποσυρθεί στο μέλλον.
  • Το NotebookUtils έχει σχεδιαστεί για χρήση με το Spark 3.4(Runtime v1.2) και νεότερες εκδόσεις. Όλες οι νέες δυνατότητες και ενημερώσεις υποστηρίζονται αποκλειστικά με τον χώρο ονομάτων σημειωματάριων στο μέλλον.

Βοηθητικά προγράμματα συστήματος αρχείων

Το notebookutils.fs παρέχει βοηθητικά προγράμματα για εργασία με διάφορα συστήματα αρχείων, συμπεριλαμβανομένων των Azure Data Lake Storage (ADLS) Gen2 και Azure Blob Storage. Βεβαιωθείτε ότι έχετε ρυθμίσει κατάλληλα τις παραμέτρους πρόσβασης στα Azure Data Lake Storage Gen2 και Azure Blob Storage .

Εκτελέστε τις ακόλουθες εντολές για μια επισκόπηση των διαθέσιμων μεθόδων:

notebookutils.fs.help()

Έξοδος

notebookutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use notebookutils.fs.help("methodName") for more info about a method.

Το NotebookUtils λειτουργεί με το σύστημα αρχείων με τον ίδιο τρόπο όπως τα Spark API. Για παράδειγμα, πάρτε τη χρήση των notebookutils.fs.mkdirs() και Fabric lakehouse:

Χρήση Σχετική διαδρομή από ρίζα HDFS Απόλυτη διαδρομή για το σύστημα αρχείων ABFS Απόλυτη διαδρομή για το τοπικό σύστημα αρχείων στον κόμβο προγράμματος οδήγησης
Μη προεπιλεγμένη λίμνη Δεν υποστηρίζεται notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
Προεπιλεγμένη λίμνη Κατάλογος στην περιοχή "Αρχεία" ή "Πίνακες": notebookutils.fs.mkdirs("Files/<new_dir>") notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")

Λίστα αρχείων

Για να καταχωρήσετε το περιεχόμενο ενός καταλόγου, χρησιμοποιήστε notebookutils.fs.ls('Η διαδρομή του καταλόγου σας'). Για παράδειγμα:

notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp")  # The full path of the local file system of driver node

Το notebookutils.fs.ls() API συμπεριφέρεται διαφορετικά όταν χρησιμοποιείτε σχετική διαδρομή, ανάλογα με τον τύπο του σημειωματάριου.

  • Σε ένα σημειωματάριο Spark: Η σχετική διαδρομή είναι σχετική με την προεπιλεγμένη διαδρομή ABFSS του Lakehouse. Για παράδειγμα, notebookutils.fs.ls("Files") δείχνει στον κατάλογο Files στον προεπιλεγμένο κατάλογο Lakehouse.

    Για παράδειγμα:

    notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
    
  • Σε ένα σημειωματάριο Python: Η σχετική διαδρομή είναι σχετική με τον κατάλογο εργασίας του τοπικού συστήματος αρχείων, ο οποίος από προεπιλογή είναι /home/trusted-service-user/work. Επομένως, θα πρέπει να χρησιμοποιήσετε την πλήρη διαδρομή αντί για μια σχετική διαδρομή notebookutils.fs.ls("/lakehouse/default/Files") για πρόσβαση στον Files κατάλογο στο προεπιλεγμένο Lakehouse.

    Για παράδειγμα:

    notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
    

Προβολή ιδιοτήτων αρχείου

Αυτή η μέθοδος επιστρέφει ιδιότητες αρχείου όπως το όνομα αρχείου, τη διαδρομή αρχείου, το μέγεθος αρχείου και εάν πρόκειται για κατάλογο και αρχείο.

files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Δημιουργία νέου καταλόγου

Αυτή η μέθοδος δημιουργεί τον δεδομένο κατάλογο εάν δεν υπάρχει και δημιουργεί τυχόν απαραίτητους γονικοί κατάλογοι.

notebookutils.fs.mkdirs('new directory name')  
notebookutils.fs.mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
notebookutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Αντιγραφή αρχείου

Αυτή η μέθοδος αντιγράφει ένα αρχείο ή έναν κατάλογο και υποστηρίζει αντιγραφή δραστηριότητας μεταξύ συστημάτων αρχείων.

notebookutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Σημείωμα

Λόγω των περιορισμών της συντόμευσης OneLake, όταν πρέπει να χρησιμοποιήσετε notebookutils.fs.cp() για να αντιγράψετε δεδομένα από τη συντόμευση τύπου S3/GCS, συνιστάται να χρησιμοποιήσετε μια έφιππη διαδρομή αντί για μια διαδρομή abfss.

Εκτέλεση αρχείου αντιγραφής

Αυτή η μέθοδος προσφέρει μια πιο αποτελεσματική προσέγγιση για την αντιγραφή ή τη μετακίνηση αρχείων, ιδιαίτερα όταν αντιμετωπίζετε μεγάλους όγκους δεδομένων. Για βελτιωμένες επιδόσεις στο Fabric, συνιστάται να χρησιμοποιήσετε fastcp ως υποκατάστατο της παραδοσιακής cp μεθόδου.

Σημείωμα

  • notebookutils.fs.fastcp() Η δεν υποστηρίζει την αντιγραφή αρχείων στο OneLake σε όλες τις περιοχές. Σε αυτήν την περίπτωση, μπορείτε να χρησιμοποιήσετε τη συνάρτηση notebookutils.fs.cp() αντί για αυτό.
  • Λόγω των περιορισμών της συντόμευσης OneLake, όταν πρέπει να χρησιμοποιήσετε notebookutils.fs.fastcp() για να αντιγράψετε δεδομένα από τη συντόμευση τύπου S3/GCS, συνιστάται να χρησιμοποιήσετε μια έφιππη διαδρομή αντί για μια διαδρομή abfss.
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Προεπισκόπηση περιεχομένου αρχείου

Αυτή η μέθοδος επιστρέφει έως τα πρώτα byte "maxBytes" του δεδομένου αρχείου ως Συμβολοσειρά κωδικοποιημένη σε UTF-8.

notebookutils.fs.head('file path', maxBytes to read)

Μετακίνηση αρχείων

Αυτή η μέθοδος μετακινεί ένα αρχείο ή κατάλογο και υποστηρίζει κινήσεις μεταξύ συστημάτων αρχείων.

notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Εγγραφή αρχείου

Αυτή η μέθοδος εγγράφει τη δεδομένη συμβολοσειρά σε ένα αρχείο, κωδικοποιημένη σε UTF-8.

notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Προσάρτηση περιεχομένου σε ένα αρχείο

Αυτή η μέθοδος προσαρτά τη δεδομένη συμβολοσειρά σε ένα αρχείο, κωδικοποιημένο σε UTF-8.

notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Σημείωμα

  • notebookutils.fs.append() και notebookutils.fs.put() δεν υποστηρίζουν ταυτόχρονη εγγραφή στο ίδιο αρχείο λόγω έλλειψης εγγυήσεων ατομικής λειτουργίας.
  • Όταν χρησιμοποιείτε το notebookutils.fs.append API σε έναν βρόχο for για να γράψετε στο ίδιο αρχείο, συνιστούμε να προσθέσετε μια πρόταση sleep περίπου 0,5s ~ 1s μεταξύ των επαναλαμβανόμενων εγγραφών. Αυτή η πρόταση οφείλεται στο γεγονός ότι η εσωτερική λειτουργία notebookutils.fs.append του flush API είναι ασύγχρονη, επομένως, μια μικρή καθυστέρηση συμβάλλει στη διασφάλιση της ακεραιότητας των δεδομένων.

Διαγραφή αρχείου ή καταλόγου

Αυτή η μέθοδος καταργεί ένα αρχείο ή κατάλογο.

notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Κατάλογος μονταρίσματος/κατάργησης μονταρίσματος

Βρείτε περισσότερες πληροφορίες σχετικά με τη λεπτομερή χρήση στο θέμα Μοντάρετε αρχείο και κατάργησηmount.

Βοηθητικά προγράμματα σημειωματάριου

Χρησιμοποιήστε τα Βοηθητικά προγράμματα σημειωματάριου για να εκτελέσετε ένα σημειωματάριο ή κλείστε ένα σημειωματάριο με μια τιμή. Εκτελέστε την ακόλουθη εντολή για να δείτε μια επισκόπηση των διαθέσιμων μεθόδων:

notebookutils.notebook.help()

Απόδοση:


The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.

[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.

Use notebookutils.notebook.help("methodName") for more info about a method.

Σημείωμα

Τα βοηθητικά προγράμματα σημειωματάριου δεν ισχύουν για τους ορισμούς εργασίας Apache Spark (SJD).

Αναφορά σημειωματάριου

Αυτή η μέθοδος αναφέρεται σε ένα σημειωματάριο και επιστρέφει την τιμή εξόδου του. Μπορείτε να εκτελέσετε κλήσεις συνάρτησης ένθεσης σε ένα σημειωματάριο με αλληλεπιδραστικό τρόπο ή σε μια διοχέτευση. Το σημειωματάριο στο οποίο γίνεται αναφορά εκτελείται στον χώρο συγκέντρωσης Spark του σημειωματάριου που καλεί αυτήν τη συνάρτηση.

notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Για παράδειγμα:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Το σημειωματάριο Fabric υποστηρίζει επίσης την αναφορά σημειωματάριων σε πολλούς χώρους εργασίας καθορίζοντας το αναγνωριστικό χώρου εργασίας.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Μπορείτε να ανοίξετε τη σύνδεση στιγμιότυπου της εκτέλεσης αναφοράς στην έξοδο κελιού. Το στιγμιότυπο καταγράφει τα αποτελέσματα εκτέλεσης κώδικα και σας επιτρέπει να εντοπίσετε εύκολα μια εκτέλεση αναφοράς.

Στιγμιότυπο οθόνης του αποτελέσματος εκτέλεσης αναφοράς.

Στιγμιότυπο οθόνης ενός παραδείγματος στιγμιότυπου αναφοράς.

Σημείωμα

  • Το σημειωματάριο αναφοράς μεταξύ χώρων εργασίας υποστηρίζεται από την έκδοση 1.2 του περιβάλλοντος εκτέλεσης και νεότερες εκδόσεις.
  • Εάν χρησιμοποιείτε τα αρχεία στην περιοχή Πόρος σημειωματάριου, χρησιμοποιήστε notebookutils.nbResPath το στο σημειωματάριο στο οποίο γίνεται αναφορά για να βεβαιωθείτε ότι δείχνουν στον ίδιο φάκελο με την εκτέλεση αλληλεπίδρασης.

Αναφορά εκτέλεσης πολλών σημειωματάριων παράλληλα

Σημαντικό

Αυτή η δυνατότητα είναι σε προεπισκόπηση.

Η μέθοδος notebookutils.notebook.runMultiple() σάς επιτρέπει να εκτελείτε πολλά σημειωματάρια παράλληλα ή με μια προκαθορισμένη τοπολογική δομή. Το API χρησιμοποιεί έναν μηχανισμό υλοποίησης πολλών νημάτων εντός μιας περιόδου λειτουργίας spark, το οποίο σημαίνει ότι οι εκτελέσεις σημειωματάριου αναφοράς μοιράζονται τους υπολογιστικούς πόρους.

Με notebookutils.notebook.runMultiple()το , μπορείτε να κάνετε τα εξής:

  • Εκτελέστε πολλά σημειωματάρια ταυτόχρονα, χωρίς να πρέπει να ολοκληρωθεί το καθένα.

  • Καθορίστε τις εξαρτήσεις και τη σειρά εκτέλεσης για τα σημειωματάριά σας, χρησιμοποιώντας μια απλή μορφή JSON.

  • Βελτιστοποιήστε τη χρήση των υπολογιστικών πόρων Spark και μειώστε το κόστος των έργων σας Fabric.

  • Προβάλετε τα Στιγμιότυπα κάθε εγγραφής εκτέλεσης σημειωματάριου στην έξοδο και εντοπίστε εύκολα τις εργασίες του σημειωματάριου/παρακολουθήστε τις εργασίες σας.

  • Λάβετε την αξία εξόδου κάθε εκτελεστικής δραστηριότητας και χρησιμοποιήστε τις σε εργασίες κατάντη.

Μπορείτε επίσης να δοκιμάσετε να εκτελέσετε το notebookutils.notebook.help("runMultiple") για να βρείτε το παράδειγμα και τη λεπτομερή χρήση.

Ακολουθεί ένα απλό παράδειγμα εκτέλεσης μιας λίστας σημειωματάριων παράλληλα με χρήση αυτής της μεθόδου:


notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Το αποτέλεσμα εκτέλεσης από το ριζικό σημειωματάριο είναι το εξής:

Στιγμιότυπο οθόνης αναφοράς μιας λίστας σημειωματάριων.

Ακολουθεί ένα παράδειγμα εκτέλεσης σημειωματάριων με τοπολογική δομή που χρησιμοποιεί notebookutils.notebook.runMultiple(). Χρησιμοποιήστε αυτήν τη μέθοδο για να οργανώσετε εύκολα σημειωματάρια μέσω μιας εμπειρίας κώδικα.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Το αποτέλεσμα εκτέλεσης από το ριζικό σημειωματάριο είναι το εξής:

Στιγμιότυπο οθόνης αναφοράς μιας λίστας σημειωματάριων με παραμέτρους.

Παρέχουμε επίσης μια μέθοδο για να ελέγξετε εάν το DAG έχει οριστεί σωστά.

notebookutils.notebook.validateDAG(DAG)

Σημείωμα

  • Ο βαθμός παραλληλισμού της εκτέλεσης πολλαπλού σημειωματάριου περιορίζεται στον συνολικό διαθέσιμο υπολογιστικό πόρο μιας περιόδου λειτουργίας Spark.
  • Το ανώτατο όριο για δραστηριότητες σημειωματάριου ή ταυτόχρονα σημειωματάρια είναι 50. Η υπέρβαση αυτού του ορίου μπορεί να οδηγήσει σε προβλήματα σταθερότητας και επιδόσεων λόγω υψηλής χρήσης υπολογιστικών πόρων. Εάν προκύψουν προβλήματα, εξετάστε το ενδεχόμενο να διαχωρίσετε τα σημειωματάρια σε πολλαπλές runMultiple κλήσεις ή να μειώσετε την ταυτόχρονη εκτέλεση ρυθμίζοντας το πεδίο ταυτόχρονης εκτέλεσης στην παράμετρο DAG.
  • Το προεπιλεγμένο χρονικό όριο για ολόκληρο το DAG είναι 12 ώρες και το προεπιλεγμένο χρονικό όριο για κάθε κελί στο θυγατρικό σημειωματάριο είναι 90 δευτερόλεπτα. Μπορείτε να αλλάξετε το χρονικό όριο ορίζοντας τα πεδία timeoutInSeconds και χρονικό όριοPerCellInSeconds πεδία στην παράμετρο DAG.

Έξοδος από ένα σημειωματάριο

Αυτή η μέθοδος εξέρχεται από ένα σημειωματάριο με μια τιμή. Μπορείτε να εκτελέσετε κλήσεις συνάρτησης ένθεσης σε ένα σημειωματάριο με αλληλεπιδραστικό τρόπο ή σε μια διοχέτευση.

  • Όταν καλείτε μια συνάρτηση exit() από ένα σημειωματάριο με αλληλεπιδραστικό τρόπο, το σημειωματάριο Fabric δημιουργεί μια εξαίρεση, παραλείπει την εκτέλεση επόμενων κελιών και διατηρεί την περίοδο λειτουργίας Spark ζωντανή.

  • Όταν ενορχηστρώνει ένα σημειωματάριο σε μια διοχέτευση που καλεί μια συνάρτηση exit(), η δραστηριότητα σημειωματάριου επιστρέφει με μια τιμή εξόδου. Αυτό ολοκληρώνει την εκτέλεση της διοχέτευσης και διακόπτει την περίοδο λειτουργίας Spark.

  • Όταν καλείτε μια συνάρτηση exit() σε ένα σημειωματάριο στο οποίο γίνεται αναφορά, το Fabric Spark θα διακόψει την περαιτέρω εκτέλεση του σημειωματάριου στο οποίο γίνεται αναφορά και θα συνεχίσει να εκτελεί τα επόμενα κελιά στο κύριο σημειωματάριο που καλεί τη συνάρτηση run(). Για παράδειγμα: Το Notebook1 έχει τρία κελιά και καλεί μια συνάρτηση exit() στο δεύτερο κελί. Το Notebook2 έχει πέντε κελιά και καλεί την εκτέλεση (notebook1) στο τρίτο κελί. Όταν εκτελείτε το Notebook2, το Notebook1 σταματά στο δεύτερο κελί κατά το πάτημα της συνάρτησης exit(). Το Notebook2 συνεχίζει να εκτελεί το τέταρτο κελί και το πέμπτο κελί του.

notebookutils.notebook.exit("value string")

Σημείωμα

Η συνάρτηση exit() αντικαθιστά την τρέχουσα έξοδο κελιού. Για να αποφύγετε την απώλεια της εξόδου άλλων δηλώσεων κώδικα, καλέστε notebookutils.notebook.exit() σε ξεχωριστό κελί.

Για παράδειγμα:

Σημειωματάριο Sample1 με τα ακόλουθα δύο κελιά:

  • Το κελί 1 ορίζει μια παράμετρο εισόδου με προεπιλεγμένη τιμή που έχει οριστεί σε 10.

  • Το κελί 2 εξέρχεται από το σημειωματάριο με είσοδο ως τιμή εξόδου.

Στιγμιότυπο οθόνης που εμφανίζει ένα δείγμα σημειωματάριου της συνάρτησης εξόδου.

Μπορείτε να εκτελέσετε το Sample1 σε ένα άλλο σημειωματάριο με προεπιλεγμένες τιμές:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Απόδοση:

Notebook is executed successfully with exit value 10

Μπορείτε να εκτελέσετε το Sample1 σε ένα άλλο σημειωματάριο και να ορίσετε την τιμή εισόδου ως 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Απόδοση:

Notebook is executed successfully with exit value 20

Διαχείριση τεχνουργημάτων σημειωματάριου

notebookutils.notebook Το παρέχει εξειδικευμένα βοηθητικά προγράμματα για τη διαχείριση στοιχείων Σημειωματάριου μέσω προγραμματισμού. Αυτά τα API μπορούν να σας βοηθήσουν να δημιουργείτε, να λαμβάνετε, να ενημερώνετε και να διαγράφετε εύκολα στοιχεία σημειωματάριου.

Για να χρησιμοποιήσετε αποτελεσματικά αυτές τις μεθόδους, εξετάστε τα παρακάτω παραδείγματα χρήσης:

Δημιουργία σημειωματάριου

with open("/path/to/notebook.ipynb", "r") as f:
    content = f.read()

artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")

Λήψη περιεχομένου σημειωματάριου

artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")

Ενημέρωση σημειωματάριου

updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name",  "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")

Διαγραφή σημειωματάριου

is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")

Καταχώρηση σημειωματάριων σε έναν χώρο εργασίας

artifacts_list = notebookutils.notebook.list("optional_workspace_id")

Βοηθητικά προγράμματα διαπιστευτηρίων

Μπορείτε να χρησιμοποιήσετε τα Βοηθητικά προγράμματα διαπιστευτηρίων για να λάβετε διακριτικά πρόσβασης και να διαχειριστείτε μυστικά σε ένα Azure Key Vault.

Εκτελέστε την ακόλουθη εντολή για να δείτε μια επισκόπηση των διαθέσιμων μεθόδων:

notebookutils.credentials.help()

Απόδοση:

Help on module notebookutils.credentials in notebookutils:

NAME
    notebookutils.credentials - Utility for credentials operations in Fabric

FUNCTIONS
    getSecret(akvName, secret) -> str
        Gets a secret from the given Azure Key Vault.
        :param akvName: The name of the Azure Key Vault.
        :param secret: The name of the secret.
        :return: The secret value.
    
    getToken(audience) -> str
        Gets a token for the given audience.
        :param audience: The audience for the token.
        :return: The token.
    
    help(method_name=None)
        Provides help for the notebookutils.credentials module or the specified method.
        
        Examples:
        notebookutils.credentials.help()
        notebookutils.credentials.help("getToken")
        :param method_name: The name of the method to get help with.

DATA
    creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...

FILE
    /home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py

Λήψη διακριτικού

Η συνάρτηση getToken επιστρέφει ένα διακριτικό Microsoft Entra για ένα δεδομένο κοινό και όνομα (προαιρετικό). Η παρακάτω λίστα εμφανίζει τα τρέχουσα διαθέσιμα κλειδιά ακροατηρίου:

  • Πόρος ακροατηρίου χώρου αποθήκευσης: "χώρος αποθήκευσης"
  • Πόρος Power BI: "pbi"
  • Πόρος Azure Key Vault: "keyvault"
  • Πόρος Synapse RTA KQL DB: "kusto"

Εκτελέστε την ακόλουθη εντολή για να λάβετε το διακριτικό:

notebookutils.credentials.getToken('audience Key')

Λήψη μυστικού κωδικού με χρήση διαπιστευτηρίων χρήστη

Η συνάρτηση getSecret επιστρέφει έναν μυστικό κωδικό Azure Key Vault για ένα δεδομένο τελικό σημείο Azure Key Vault και ένα μυστικό όνομα, χρησιμοποιώντας διαπιστευτήρια χρήστη.

notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Μοντάρετε αρχείο και κατάργησηmount

Το Fabric υποστηρίζει τα παρακάτω σενάρια μονταρίσματος στο πακέτο Microsoft Spark Utilities. Μπορείτε να χρησιμοποιήσετε τα API μονταρίσματος, unmount, getMountPath() και mounts() για να συνδέσετε απομακρυσμένο χώρο αποθήκευσης (ADLS Gen2) σε όλους τους κόμβους εργασίας (κόμβοι οδηγού και κόμβους εργαζομένων). Αφού το σημείο μονταρίσματος χώρου αποθήκευσης είναι στη θέση του, χρησιμοποιήστε το τοπικό αρχείο API για να αποκτήσετε πρόσβαση στα δεδομένα σαν να είναι αποθηκευμένα στο τοπικό σύστημα αρχείων.

Πώς μπορείτε να μοντάρετε έναν λογαριασμό ADLS Gen2

Το παρακάτω παράδειγμα δείχνει πώς μπορείτε να μοντάρετε το Azure Data Lake Storage Gen2. Η τοποθέτηση χώρου αποθήκευσης αντικειμένων blob λειτουργεί με παρόμοιο τρόπο.

Αυτό το παράδειγμα προϋποθέτει ότι έχετε έναν λογαριασμό Data Lake Storage Gen2 με το όνομα storegen2 και ο λογαριασμός έχει ένα κοντέινερ με το όνομα mycontainer που θέλετε να μοντάρετε στο /test στην περίοδο λειτουργίας Spark του σημειωματάριού σας.

Στιγμιότυπο οθόνης που εμφανίζει πού μπορείτε να επιλέξετε ένα κοντέινερ για μοντάρετε.

Για να μοντάρετε το κοντέινερ που ονομάζεται mycontainer, τα notebookutils πρέπει πρώτα να ελέγξουν αν έχετε το δικαίωμα πρόσβασης στο κοντέινερ. Προς το παρόν, το Fabric υποστηρίζει δύο μεθόδους ελέγχου ταυτότητας για τη λειτουργία μονταρίσματος εναύσματος: accountKey και sastoken.

Μοντάρετε μέσω διακριτικού υπογραφής κοινόχρηστής πρόσβασης ή κλειδιού λογαριασμού

Το NotebookUtils υποστηρίζει τη ρητή διαβίβαση ενός κλειδιού λογαριασμού ή ενός διακριτικού υπογραφής κοινόχρηστης πρόσβασης (SAS) ως παραμέτρου για τη μονταροποίηση του προορισμού.

Για λόγους ασφαλείας, συνιστούμε να αποθηκεύετε κλειδιά λογαριασμού ή διακριτικά SAS στο Azure Key Vault (όπως δείχνει το παρακάτω στιγμιότυπο οθόνης). Στη συνέχεια, μπορείτε να τις ανακτήσετε χρησιμοποιώντας το notebookutils.credentials.getSecret API. Για περισσότερες πληροφορίες σχετικά με το Azure Key Vault, ανατρέξτε στο θέμα Πληροφορίες για τα κλειδιά λογαριασμού διαχειριζόμενου χώρου αποθήκευσης Azure Key Vault.

Στιγμιότυπο οθόνης που δείχνει πού αποθηκεύονται μυστικά σε ένα Azure Key Vault.

Δείγμα κώδικα για τη μέθοδο accountKey :

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Δείγμα κώδικα για διακριτικό:

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Μοντάρετε παραμέτρους:

  • fileCacheTimeout: Τα αντικειμένα blob αποθηκεύονται στο cache στον τοπικό φάκελο temp για 120 δευτερόλεπτα από προεπιλογή. Σε αυτό το διάστημα, το blobfuse δεν ελέγχει αν το αρχείο είναι ενημερωμένο ή όχι. Η παράμετρος μπορεί να οριστεί για αλλαγή του προεπιλεγμένου χρόνου χρονικού συνόλου. Όταν πολλά προγράμματα-πελάτες τροποποιούν αρχεία ταυτόχρονα, για να αποφεύγονται ασυνέπειες μεταξύ τοπικών και απομακρυσμένων αρχείων, συνιστούμε να μικρύνετε τον χρόνο cache ή ακόμα και να τον αλλάξετε σε 0 και να λαμβάνετε πάντα τα πιο πρόσφατα αρχεία από τον διακομιστή.
  • χρονικό όριο: Το χρονικό όριο της λειτουργίας μονταρίσματος είναι 120 δευτερόλεπτα από προεπιλογή. Η παράμετρος μπορεί να οριστεί για αλλαγή του προεπιλεγμένου χρόνου χρονικού συνόλου. Όταν υπάρχουν πάρα πολλοί εκτελεστές ή όταν λήγουν οι ώρες μονταρίσματος, συνιστούμε να αυξάνετε την τιμή.

Μπορείτε να χρησιμοποιήσετε αυτές τις παραμέτρους ως εξής:

notebookutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Σημείωμα

Για λόγους ασφάλειας, συνιστάται να αποφεύγετε την ενσωμάτωση διαπιστευτηρίων απευθείας στον κώδικα. Για την περαιτέρω προστασία των διαπιστευτηρίων σας, τυχόν μυστικά που εμφανίζονται στις εξόδους σημειωματάριου αποθηκεύονται. Για περισσότερες πληροφορίες, ανατρέξτε στο θέμα Μυστικός διαχωρισμός.

Πώς να τοποθετήσετε ένα lakehouse

Δείγμα κώδικα για την τοποθέτηση μιας λίμνης σε /<mount_name>:

notebookutils.fs.mount( 
 "abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse", 
 "/<mount_name>"
)

Πρόσβαση σε αρχεία κάτω από το σημείο μονταρίσματος χρησιμοποιώντας το API notebookutils fs

Ο κύριος σκοπός της λειτουργίας μονταρίσματος είναι να επιτρέψει στους πελάτες να έχουν πρόσβαση στα δεδομένα που είναι αποθηκευμένα σε έναν λογαριασμό απομακρυσμένου χώρου αποθήκευσης με ένα API τοπικού συστήματος αρχείων. Μπορείτε επίσης να αποκτήσετε πρόσβαση στα δεδομένα χρησιμοποιώντας το API notebookutils fs με μια μονταρισμένη διαδρομή ως παράμετρο. Αυτή η μορφή διαδρομής είναι λίγο διαφορετική.

Ας υποθέσουμε ότι έχετε τοποθετήσει το κοντέινερ Data Lake Storage Gen2 στο/test χρησιμοποιώντας το API μονταρίσματος. Όταν αποκτάτε πρόσβαση στα δεδομένα με ένα API τοπικού συστήματος αρχείων, η μορφή διαδρομής είναι ως εξής:

/synfs/notebook/{sessionId}/test/{filename}

Όταν θέλετε να αποκτήσετε πρόσβαση στα δεδομένα χρησιμοποιώντας το API notebookutils fs, προτείνουμε τη χρήση του getMountPath() για να λάβετε την ακριβή διαδρομή:

path = notebookutils.fs.getMountPath("/test")
  • Κατάλογοι λίστας:

    notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
    
  • Ανάγνωση περιεχομένου αρχείου:

    notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Δημιουργία καταλόγου:

    notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
    

Αρχεία πρόσβασης κάτω από το σημείο μονταρίσματος μέσω τοπικής διαδρομής

Μπορείτε εύκολα να διαβάσετε και γράψετε τα αρχεία στο σημείο μονταρίσματος χρησιμοποιώντας το τυπικό σύστημα αρχείων. Ακολουθεί ένα παράδειγμα Python:

#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Έλεγχος υπαρχόντων σημείων μονταρίσματος

Μπορείτε να χρησιμοποιήσετε το API notebookutils.fs.mounts() για να ελέγξετε όλες τις υπάρχουσες πληροφορίες σημείου μονταρίσματος:

notebookutils.fs.mounts()

Κατάργηση του μονταρίσματος του σημείου μονταρίσματος

Χρησιμοποιήστε τον παρακάτω κώδικα για να καταργήσετε το σημείο μονταρίσματος (/δοκιμή σε αυτό το παράδειγμα):

notebookutils.fs.unmount("/test")

Γνωστοί περιορισμοί

  • Η τρέχουσα μονταρίσματος είναι μια ρύθμιση παραμέτρων επιπέδου εργασίας. Συνιστούμε να χρησιμοποιήσετε το API μονταρίσματος για να ελέγξετε εάν υπάρχει ή όχι σημείο μονταρίσματος.

  • Ο μηχανισμός unmount δεν εφαρμόζεται αυτόματα. Όταν ολοκληρωθεί η εκτέλεση της εφαρμογής, για να καταργήσετε το σημείο μονταρίσματος και να αποδεσμεύσετε τον χώρο στον δίσκο, πρέπει να καλέσετε ρητά ένα API unmount στον κώδικά σας. Διαφορετικά, το σημείο μονταρίσματος θα εξακολουθεί να υπάρχει στον κόμβο μετά την ολοκλήρωση της εκτέλεσης της εφαρμογής.

  • Η μοντάρισμα ενός λογαριασμού χώρου αποθήκευσης ADLS Gen1 δεν υποστηρίζεται.

Βοηθητικά προγράμματα Lakehouse

notebookutils.lakehouse παρέχει βοηθητικά προγράμματα προσαρμοσμένα για τη διαχείριση αντικειμένων Lakehouse. Αυτά τα βοηθητικά προγράμματα σάς δίνουν τη δυνατότητα να δημιουργείτε, να λαμβάνετε, να ενημερώνετε και να διαγράφετε αντικείμενα σχεδίασης Lakehouse χωρίς προσπάθεια.

Επισκόπηση μεθόδων

Ακολουθεί μια επισκόπηση των διαθέσιμων μεθόδων που παρέχονται από notebookutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean 

# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]

# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table] 

# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table] 

Παραδείγματα χρήσης

Για να χρησιμοποιήσετε αποτελεσματικά αυτές τις μεθόδους, εξετάστε τα παρακάτω παραδείγματα χρήσης:

Δημιουργία ενός Lakehouse

artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Λήψη ενός Lakehouse

artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")

Ενημέρωση lakehouse

updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Διαγραφή lakehouse

is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Παράθεση lakehouses σε έναν χώρο εργασίας

artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")

Παράθεση όλων των πινάκων σε ένα Lakehouse

artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")

Εκκίνηση μιας λειτουργίας πίνακα φόρτωσης σε ένα Lakehouse

notebookutils.lakehouse.loadTable(
    {
        "relativePath": "Files/myFile.csv",
        "pathType": "File",
        "mode": "Overwrite",
        "recursive": False,
        "formatOptions": {
            "format": "Csv",
            "header": True,
            "delimiter": ","
        }
    }, "table_name", "artifact_name", "optional_workspace_id")

Πρόσθετες πληροφορίες

Για πιο λεπτομερείς πληροφορίες σχετικά με κάθε μέθοδο και τις παραμέτρους της, χρησιμοποιήστε τη notebookutils.lakehouse.help("methodName") συνάρτηση.

Βοηθητικά προγράμματα χρόνου εκτέλεσης

Εμφάνιση των πληροφοριών περιβάλλοντος περιόδου λειτουργίας

Με notebookutils.runtime.context μπορείτε να λάβετε τις πληροφορίες περιβάλλοντος της τρέχουσας ζωντανής περιόδου λειτουργίας, συμπεριλαμβανομένου του ονόματος σημειωματάριου, της προεπιλεγμένης λίμνης, των πληροφοριών χώρου εργασίας, εάν πρόκειται για εκτέλεση διοχέτευσης κ.λπ.

notebookutils.runtime.context

Διαχείριση περιόδων λειτουργίας

Διακοπή αλληλεπιδραστικής περιόδου λειτουργίας

Αντί να κάνετε μη αυτόματο κλικ στο κουμπί διακοπής, ορισμένες φορές είναι πιο εύκολο να διακόψετε μια αλληλεπιδραστική περίοδο λειτουργίας καλώντας ένα API στον κώδικα. Σε αυτές τις περιπτώσεις, παρέχουμε μια notebookutils.session.stop() API για την υποστήριξη διακοπής της αλληλεπιδραστικής περιόδου λειτουργίας μέσω κώδικα, η οποία είναι διαθέσιμη για scala και PySpark.

notebookutils.session.stop()

notebookutils.session.stop() API διακόπτει ασύγχρονα την τρέχουσα αλληλεπιδραστική περίοδο λειτουργίας στο παρασκήνιο. Διακόπτει επίσης τη συνεδρία Spark και δημοσιεύει πόρους που καταλαμβάνονται από την περίοδο λειτουργίας, ώστε να είναι διαθέσιμοι σε άλλες περιόδους λειτουργίας στην ίδια ομάδα.

Επανεκκίνηση του διερμηνέα Python

Το βοηθητικό πρόγραμμα notebookutils.session παρέχει έναν τρόπο επανεκκίνησης του διερμηνέα Python.

notebookutils.session.restartPython()

Σημείωμα

  • Στην περίπτωση εκτέλεσης αναφοράς σημειωματάριου, restartPython() επανεκκινεί μόνο τον διερμηνέα Python του τρέχοντος σημειωματάριου στο οποίο γίνεται αναφορά.
  • Σε σπάνιες περιπτώσεις, η εντολή ενδέχεται να αποτύχει λόγω του μηχανισμού αντανάκλασης Spark, προσθέτοντας επανάληψη μπορεί να μετριάσει το πρόβλημα.

Γνωστό ζήτημα

  • Όταν χρησιμοποιείτε την έκδοση περιβάλλοντος εκτέλεσης πάνω από 1.2 και εκτελείτε notebookutils.help()το , τα API fabricClient, PBIClient δεν υποστηρίζονται προς το παρόν, θα είναι διαθέσιμα στο περαιτέρω. Επιπλέον, το API διαπιστευτηρίων δεν υποστηρίζεται προς το παρόν στα σημειωματάρια Scala.

  • Το σημειωματάριο Python δεν υποστηρίζει τη διακοπή , επανεκκινήστε τοPython API κατά τη χρήση του βοηθητικού προγράμματος notebookutils.session για διαχείριση περιόδου λειτουργίας.