Βοηθητικά προγράμματα Microsoft Spark (MSSparkUtils) για Fabric
Το Microsoft Spark Utilities (MSSparkUtils) είναι ένα ενσωματωμένο πακέτο που σας βοηθά να εκτελείτε εύκολα συνήθεις εργασίες. Μπορείτε να χρησιμοποιήσετε το MSSparkUtils για να εργαστείτε με συστήματα αρχείων, να λάβετε μεταβλητές περιβάλλοντος, να αλληλουχήσετε σημειωματάρια και να εργαστείτε με μυστικά. Το πακέτο MSSparkUtils είναι διαθέσιμο στη Σκάλα PySpark (Python), στα σημειωματάρια SparkR και στις διοχετεύσεις Fabric.
Σημείωμα
- Το MsSparkUtils έχει μετονομαστεί επίσημα σε NotebookUtils. Ο υπάρχων κώδικας θα παραμείνει συμβατός με προηγούμενες εκδόσεις και δεν θα προκαλέσει νέες αλλαγές. Συνιστάται ιδιαίτερα η αναβάθμιση σε notebookutils για να εξασφαλιστεί η συνεχής υποστήριξη και πρόσβαση σε νέες δυνατότητες. Ο χώρος ονομάτων mssparkutils θα αποσυρθεί στο μέλλον.
- Το NotebookUtils έχει σχεδιαστεί για χρήση με το Spark 3.4(Runtime v1.2) και νεότερες εκδόσεις. Όλες οι νέες δυνατότητες και ενημερώσεις θα υποστηρίζονται αποκλειστικά με τον χώρο ονομάτων notebookutils στο μέλλον.
Βοηθητικά προγράμματα συστήματος αρχείων
Το mssparkutils.fs παρέχει βοηθητικά προγράμματα για εργασία με διάφορα συστήματα αρχείων, συμπεριλαμβανομένων των Azure Data Lake Storage (ADLS) Gen2 και Azure Blob Storage. Βεβαιωθείτε ότι έχετε ρυθμίσει κατάλληλα τις παραμέτρους πρόσβασης στα Azure Data Lake Storage Gen2 και Azure Blob Storage .
Εκτελέστε τις ακόλουθες εντολές για μια επισκόπηση των διαθέσιμων μεθόδων:
from notebookutils import mssparkutils
mssparkutils.fs.help()
Έξοδος
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use mssparkutils.fs.help("methodName") for more info about a method.
Το MSSparkUtils λειτουργεί με το σύστημα αρχείων με τον ίδιο τρόπο όπως τα API Spark. Για παράδειγμα, πάρτε τα mssparkuitls.fs.mkdirs() και Fabric lakehouse:
Χρήση | Σχετική διαδρομή από ρίζα HDFS | Απόλυτη διαδρομή για το σύστημα αρχείων ABFS | Απόλυτη διαδρομή για το τοπικό σύστημα αρχείων στον κόμβο προγράμματος οδήγησης |
---|---|---|---|
Μη προετοιχές λιμνοθάσπιτο | Δεν υποστηρίζεται | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Προεπιλεγμένη λίμνη | Κατάλογος στην περιοχή "Αρχεία" ή "Πίνακες": mssparkutils.fs.mkdirs("Files/<new_dir>") | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Λίστα αρχείων
Για να καταχωρήσετε το περιεχόμενο ενός καταλόγου, χρησιμοποιήστε mssparkutils.fs.ls('Η διαδρομή του καταλόγου σας'). Για παράδειγμα:
mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
mssparkutils.fs.ls("file:/tmp") # based on local file system of driver node
Προβολή ιδιοτήτων αρχείου
Αυτή η μέθοδος επιστρέφει ιδιότητες αρχείου όπως το όνομα αρχείου, τη διαδρομή αρχείου, το μέγεθος αρχείου και εάν πρόκειται για κατάλογο και αρχείο.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
Δημιουργία νέου καταλόγου
Αυτή η μέθοδος δημιουργεί τον δεδομένο κατάλογο εάν δεν υπάρχει και δημιουργεί τυχόν απαραίτητους γονικοί κατάλογοι.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs. mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
mssparkutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
Αντιγραφή αρχείου
Αυτή η μέθοδος αντιγράφει ένα αρχείο ή έναν κατάλογο και υποστηρίζει αντιγραφή δραστηριότητας μεταξύ συστημάτων αρχείων.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Εκτέλεση αρχείου αντιγραφής
Αυτή η μέθοδος παρέχει έναν ταχύτερο τρόπο αντιγραφής ή μετακίνησης αρχείων, ιδιαίτερα μεγάλων όγκων δεδομένων.
mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Προεπισκόπηση περιεχομένου αρχείου
Αυτή η μέθοδος επιστρέφει έως τα πρώτα byte "maxBytes" του δεδομένου αρχείου ως Συμβολοσειρά κωδικοποιημένη σε UTF-8.
mssparkutils.fs.head('file path', maxBytes to read)
Μετακίνηση αρχείων
Αυτή η μέθοδος μετακινεί ένα αρχείο ή κατάλογο και υποστηρίζει κινήσεις μεταξύ συστημάτων αρχείων.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.
Εγγραφή αρχείου
Αυτή η μέθοδος εγγράφει τη δεδομένη συμβολοσειρά σε ένα αρχείο, κωδικοποιημένη σε UTF-8.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Προσάρτηση περιεχομένου σε ένα αρχείο
Αυτή η μέθοδος προσαρτά τη δεδομένη συμβολοσειρά σε ένα αρχείο, κωδικοποιημένο σε UTF-8.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Σημείωμα
Όταν χρησιμοποιείτε το mssparkutils.fs.append
API σε for
βρόχο για εγγραφή στο ίδιο αρχείο, συνιστούμε να προσθέσετε μια sleep
πρόταση περίπου 0,5s~ 1s μεταξύ των επαναλαμβανόμενων εγγραφών. Αυτό συμβαίνει επειδή η mssparkutils.fs.append
εσωτερική flush
λειτουργία του API είναι ασύγχρονη, επομένως μια μικρή καθυστέρηση συμβάλλει στη διασφάλιση της ακεραιότητας των δεδομένων.
Διαγραφή αρχείου ή καταλόγου
Αυτή η μέθοδος καταργεί ένα αρχείο ή κατάλογο.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Κατάλογος μονταρίσματος/κατάργησης μονταρίσματος
Βρείτε περισσότερες πληροφορίες σχετικά με τη λεπτομερή χρήση στο θέμα Μοντάρετε αρχείο και κατάργησηmount.
Βοηθητικά προγράμματα σημειωματάριου
Χρησιμοποιήστε τα βοηθητικά προγράμματα σημειωματάριου MSSparkUtils για να εκτελέσετε ένα σημειωματάριο ή να κλείσετε ένα σημειωματάριο με μια τιμή. Εκτελέστε την ακόλουθη εντολή για να δείτε μια επισκόπηση των διαθέσιμων μεθόδων:
mssparkutils.notebook.help()
Απόδοση:
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Σημείωμα
Τα βοηθητικά προγράμματα σημειωματάριου δεν ισχύουν για τους ορισμούς εργασίας Apache Spark (SJD).
Αναφορά σημειωματάριου
Αυτή η μέθοδος αναφέρεται σε ένα σημειωματάριο και επιστρέφει την τιμή εξόδου του. Μπορείτε να εκτελέσετε κλήσεις συνάρτησης ένθεσης σε ένα σημειωματάριο με αλληλεπιδραστικό τρόπο ή σε μια διοχέτευση. Το σημειωματάριο στο οποίο γίνεται αναφορά εκτελείται στον χώρο συγκέντρωσης Spark του σημειωματάριου που καλεί αυτήν τη συνάρτηση.
mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
Για παράδειγμα:
mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
Το σημειωματάριο Fabric υποστηρίζει επίσης την αναφορά σημειωματάριων σε πολλούς χώρους εργασίας καθορίζοντας το αναγνωριστικό χώρου εργασίας.
mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
Μπορείτε να ανοίξετε τη σύνδεση στιγμιότυπου της εκτέλεσης αναφοράς στην έξοδο κελιού. Το στιγμιότυπο καταγράφει τα αποτελέσματα εκτέλεσης κώδικα και σας επιτρέπει να εντοπίσετε εύκολα μια εκτέλεση αναφοράς.
Σημείωμα
- Το σημειωματάριο αναφοράς μεταξύ χώρων εργασίας υποστηρίζεται από την έκδοση 1.2 του περιβάλλοντος εκτέλεσης και νεότερες εκδόσεις.
- Εάν χρησιμοποιείτε τα αρχεία στην περιοχή Πόρος σημειωματάριου, χρησιμοποιήστε
mssparkutils.nbResPath
το στο σημειωματάριο στο οποίο γίνεται αναφορά για να βεβαιωθείτε ότι δείχνουν στον ίδιο φάκελο με την εκτέλεση αλληλεπίδρασης.
Αναφορά εκτέλεσης πολλών σημειωματάριων παράλληλα
Σημαντικό
Αυτή η δυνατότητα είναι σε προεπισκόπηση.
Η μέθοδος mssparkutils.notebook.runMultiple()
σάς επιτρέπει να εκτελείτε πολλά σημειωματάρια παράλληλα ή με μια προκαθορισμένη τοπολογική δομή. Το API χρησιμοποιεί έναν μηχανισμό υλοποίησης πολλών νημάτων εντός μιας περιόδου λειτουργίας spark, το οποίο σημαίνει ότι οι πόροι υπολογιστικής λειτουργίας χρησιμοποιούνται από κοινού από τις εκτελέσεις του σημειωματάριου αναφοράς.
Με mssparkutils.notebook.runMultiple()
το , μπορείτε να κάνετε τα εξής:
Εκτελέστε πολλά σημειωματάρια ταυτόχρονα, χωρίς να πρέπει να ολοκληρωθεί το καθένα.
Καθορίστε τις εξαρτήσεις και τη σειρά εκτέλεσης για τα σημειωματάριά σας, χρησιμοποιώντας μια απλή μορφή JSON.
Βελτιστοποιήστε τη χρήση των υπολογιστικών πόρων Spark και μειώστε το κόστος των έργων σας Fabric.
Προβάλετε τα Στιγμιότυπα κάθε εγγραφής εκτέλεσης σημειωματάριου στην έξοδο και εντοπίστε εύκολα τις εργασίες του σημειωματάριου/παρακολουθήστε τις εργασίες σας.
Λάβετε την αξία εξόδου κάθε εκτελεστικής δραστηριότητας και χρησιμοποιήστε τις σε εργασίες κατάντη.
Μπορείτε επίσης να δοκιμάσετε να εκτελέσετε το mssparkutils.notebook.help("runMultiple") για να βρείτε το παράδειγμα και τη λεπτομερή χρήση.
Ακολουθεί ένα απλό παράδειγμα εκτέλεσης μιας λίστας σημειωματάριων παράλληλα με χρήση αυτής της μεθόδου:
mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
Το αποτέλεσμα εκτέλεσης από το ριζικό σημειωματάριο είναι το εξής:
Ακολουθεί ένα παράδειγμα εκτέλεσης σημειωματάριων με τοπολογική δομή που χρησιμοποιεί mssparkutils.notebook.runMultiple()
. Χρησιμοποιήστε αυτήν τη μέθοδο για να οργανώσετε εύκολα σημειωματάρια μέσω μιας εμπειρίας κώδικα.
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
Το αποτέλεσμα εκτέλεσης από το ριζικό σημειωματάριο είναι το εξής:
Σημείωμα
- Ο βαθμός παραλληλισμού της εκτέλεσης πολλαπλού σημειωματάριου περιορίζεται στον συνολικό διαθέσιμο υπολογιστικό πόρο μιας περιόδου λειτουργίας Spark.
- Το ανώτατο όριο για δραστηριότητες σημειωματάριου ή ταυτόχρονα σημειωματάρια είναι 50. Η υπέρβαση αυτού του ορίου μπορεί να οδηγήσει σε προβλήματα σταθερότητας και επιδόσεων λόγω υψηλής χρήσης υπολογιστικών πόρων. Εάν προκύψουν προβλήματα, εξετάστε το ενδεχόμενο να διαχωρίσετε τα σημειωματάρια σε πολλαπλές
runMultiple
κλήσεις ή να μειώσετε την ταυτόχρονη εκτέλεση ρυθμίζοντας το πεδίο ταυτόχρονης εκτέλεσης στην παράμετρο DAG. - Το προεπιλεγμένο χρονικό όριο για ολόκληρο το DAG είναι 12 ώρες και το προεπιλεγμένο χρονικό όριο για κάθε κελί στο θυγατρικό σημειωματάριο είναι 90 δευτερόλεπτα. Μπορείτε να αλλάξετε το χρονικό όριο ρυθμίζοντας τα πεδία timeoutInSeconds και timeoutPerCellInSeconds στην παράμετρο DAG.
Έξοδος από ένα σημειωματάριο
Αυτή η μέθοδος εξέρχεται από ένα σημειωματάριο με μια τιμή. Μπορείτε να εκτελέσετε κλήσεις συνάρτησης ένθεσης σε ένα σημειωματάριο με αλληλεπιδραστικό τρόπο ή σε μια διοχέτευση.
Όταν καλείτε μια συνάρτηση exit() από ένα σημειωματάριο με αλληλεπιδραστικό τρόπο, το σημειωματάριο Fabric δημιουργεί μια εξαίρεση, παραλείπει την εκτέλεση επόμενων κελιών και διατηρεί την περίοδο λειτουργίας Spark ζωντανή.
Όταν ενορχηστρώνει ένα σημειωματάριο σε μια διοχέτευση που καλεί μια συνάρτηση exit( ), η δραστηριότητα σημειωματάριου επιστρέφει με μια τιμή εξόδου, ολοκληρώνει την εκτέλεση της διοχέτευσης και διακόπτει την περίοδο λειτουργίας Spark.
Όταν καλείτε μια συνάρτηση exit() σε ένα σημειωματάριο στο οποίο γίνεται αναφορά, το Fabric Spark θα διακόψει την περαιτέρω εκτέλεση του σημειωματάριου στο οποίο γίνεται αναφορά και θα συνεχίσει να εκτελεί τα επόμενα κελιά στο κύριο σημειωματάριο που καλεί τη συνάρτηση run(). Για παράδειγμα: Το Notebook1 έχει τρία κελιά και καλεί μια συνάρτηση exit() στο δεύτερο κελί. Το Notebook2 έχει πέντε κελιά και καλεί την εκτέλεση (notebook1) στο τρίτο κελί. Όταν εκτελείτε το Notebook2, το Notebook1 σταματά στο δεύτερο κελί κατά το πάτημα της συνάρτησης exit(). Το Notebook2 συνεχίζει να εκτελεί το τέταρτο κελί και το πέμπτο κελί του.
mssparkutils.notebook.exit("value string")
Για παράδειγμα:
Σημειωματάριο Sample1 με τα ακόλουθα δύο κελιά:
Το κελί 1 ορίζει μια παράμετρο εισόδου με προεπιλεγμένη τιμή που έχει οριστεί σε 10.
Το κελί 2 εξέρχεται από το σημειωματάριο με είσοδο ως τιμή εξόδου.
Μπορείτε να εκτελέσετε το Sample1 σε ένα άλλο σημειωματάριο με προεπιλεγμένες τιμές:
exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)
Απόδοση:
Notebook executed successfully with exit value 10
Μπορείτε να εκτελέσετε το Sample1 σε ένα άλλο σημειωματάριο και να ορίσετε την τιμή εισόδου ως 20:
exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Απόδοση:
Notebook executed successfully with exit value 20
Βοηθητικά προγράμματα διαπιστευτηρίων
Μπορείτε να χρησιμοποιήσετε τα βοηθητικά προγράμματα διαπιστευτηρίων MSSparkUtils για να λάβετε διακριτικά πρόσβασης και να διαχειριστείτε μυστικά σε ένα Azure Key Vault.
Εκτελέστε την ακόλουθη εντολή για να δείτε μια επισκόπηση των διαθέσιμων μεθόδων:
mssparkutils.credentials.help()
Απόδοση:
getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name
Λήψη διακριτικού
Η συνάρτηση getToken επιστρέφει ένα διακριτικό Microsoft Entra για ένα δεδομένο κοινό και όνομα (προαιρετικό). Η παρακάτω λίστα εμφανίζει τα τρέχουσα διαθέσιμα κλειδιά ακροατηρίου:
- Πόρος ακροατηρίου χώρου αποθήκευσης: "χώρος αποθήκευσης"
- Πόρος Power BI: "pbi"
- Πόρος Azure Key Vault: "keyvault"
- Πόρος Synapse RTA KQL DB: "kusto"
Εκτελέστε την ακόλουθη εντολή για να λάβετε το διακριτικό:
mssparkutils.credentials.getToken('audience Key')
Λήψη μυστικού κωδικού με χρήση διαπιστευτηρίων χρήστη
Η συνάρτηση getSecret επιστρέφει έναν μυστικό κωδικό Azure Key Vault για ένα δεδομένο τελικό σημείο Azure Key Vault και ένα μυστικό όνομα, χρησιμοποιώντας διαπιστευτήρια χρήστη.
mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
Μοντάρετε αρχείο και κατάργησηmount
Το Fabric υποστηρίζει τα παρακάτω σενάρια μονταρίσματος στο πακέτο Microsoft Spark Utilities. Μπορείτε να χρησιμοποιήσετε τα API μονταρίσματος, unmount, getMountPath() και mounts() για να συνδέσετε απομακρυσμένο χώρο αποθήκευσης (ADLS Gen2) σε όλους τους κόμβους εργασίας (κόμβοι οδηγού και κόμβους εργαζομένων). Αφού το σημείο μονταρίσματος χώρου αποθήκευσης είναι στη θέση του, χρησιμοποιήστε το τοπικό αρχείο API για να αποκτήσετε πρόσβαση στα δεδομένα σαν να είναι αποθηκευμένα στο τοπικό σύστημα αρχείων.
Πώς μπορείτε να μοντάρετε έναν λογαριασμό ADLS Gen2
Το παρακάτω παράδειγμα δείχνει πώς μπορείτε να μοντάρετε το Azure Data Lake Storage Gen2. Η τοποθέτηση χώρου αποθήκευσης αντικειμένων blob λειτουργεί με παρόμοιο τρόπο.
Αυτό το παράδειγμα προϋποθέτει ότι έχετε έναν λογαριασμό Data Lake Storage Gen2 με το όνομα storegen2 και ο λογαριασμός έχει ένα κοντέινερ με το όνομα mycontainer που θέλετε να μοντάρετε στο /test στην περίοδο λειτουργίας Spark του σημειωματάριού σας.
Για να μοντάρετε το κοντέινερ που ονομάζεται mycontainer, το mssparkutils πρέπει πρώτα να ελέγξει αν έχετε το δικαίωμα πρόσβασης στο κοντέινερ. Προς το παρόν, το Fabric υποστηρίζει δύο μεθόδους ελέγχου ταυτότητας για τη λειτουργία μονταρίσματος εναύσματος: accountKey και sastoken.
Μοντάρετε μέσω διακριτικού υπογραφής κοινόχρηστής πρόσβασης ή κλειδιού λογαριασμού
Το MSSparkUtils υποστηρίζει τη ρητή διαβίβαση ενός κλειδιού λογαριασμού ή ενός διακριτικού υπογραφής κοινόχρηστης πρόσβασης (SAS) ως παράμετρο για τη μονταροποίηση του προορισμού.
Για λόγους ασφαλείας, συνιστούμε να αποθηκεύετε κλειδιά λογαριασμού ή διακριτικά SAS στο Azure Key Vault (όπως δείχνει το παρακάτω στιγμιότυπο οθόνης). Στη συνέχεια, μπορείτε να τις ανακτήσετε χρησιμοποιώντας το mssparkutils.credentials.getSecret API. Για περισσότερες πληροφορίες σχετικά με το Azure Key Vault, ανατρέξτε στο θέμα Πληροφορίες για τα κλειδιά λογαριασμού διαχειριζόμενου χώρου αποθήκευσης Azure Key Vault.
Δείγμα κώδικα για τη μέθοδο accountKey :
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
Δείγμα κώδικα για διακριτικό:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
Σημείωμα
Ίσως χρειαστεί να κάνετε εισαγωγή mssparkutils
εάν δεν είναι διαθέσιμη:
from notebookutils import mssparkutils
Μοντάρετε παραμέτρους:
- fileCacheTimeout: Τα blob θα αποθηκευτούν στο cache στον τοπικό φάκελο temp για 120 δευτερόλεπτα από προεπιλογή. Σε αυτό το διάστημα, blobfuse δεν θα ελέγξει αν το αρχείο είναι ενημερωμένο ή όχι. Η παράμετρος μπορεί να οριστεί για αλλαγή του προεπιλεγμένου χρόνου λήξης χρονικού ορίου. Όταν πολλά προγράμματα-πελάτες τροποποιούν αρχεία ταυτόχρονα, προκειμένου να αποφεύγονται ασυνέπειες μεταξύ τοπικών και απομακρυσμένων αρχείων, συνιστούμε να μικρύνετε τον χρόνο cache ή ακόμα και να τον αλλάξετε σε 0 και να λαμβάνετε πάντα τα πιο πρόσφατα αρχεία από τον διακομιστή.
- χρονικό όριο: Το χρονικό όριο της λειτουργίας μονταρίσματος είναι 120 δευτερόλεπτα από προεπιλογή. Η παράμετρος μπορεί να οριστεί για αλλαγή του προεπιλεγμένου χρόνου λήξης χρονικού ορίου. Όταν υπάρχουν πάρα πολλοί εκτελεστές ή όταν λήγουν οι ώρες μονταρίσματος, συνιστούμε να αυξάνετε την τιμή.
Μπορείτε να χρησιμοποιήσετε αυτές τις παραμέτρους ως εξής:
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
Σημείωμα
Για λόγους ασφαλείας, συνιστούμε να μην αποθηκεύετε διαπιστευτήρια στον κώδικα. Για την περαιτέρω προστασία των διαπιστευτηρίων σας, θα διατυπώνουμε τον μυστικό κωδικό σας στην έξοδο σημειωματάριου. Για περισσότερες πληροφορίες, ανατρέξτε στο θέμα Μυστικός διαχωρισμός.
Πώς να τοποθετήσετε ένα lakehouse
Δείγμα κώδικα για την τοποθέτηση μιας λίμνης σε /test:
from notebookutils import mssparkutils
mssparkutils.fs.mount(
"abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>",
"/test"
)
Σημείωμα
Δεν υποστηρίζεται η τοποθέτηση ενός τοπικού τελικού σημείου. Το Fabric υποστηρίζει μόνο την τοποθέτηση του καθολικού τελικού σημείου, onelake.dfs.fabric.microsoft.com
.
Αρχεία πρόσβασης κάτω από το σημείο μονταρίσματος χρησιμοποιώντας το API mssparktuils fs
Ο κύριος σκοπός της λειτουργίας μονταρίσματος είναι να επιτρέψει στους πελάτες να έχουν πρόσβαση στα δεδομένα που είναι αποθηκευμένα σε έναν λογαριασμό απομακρυσμένου χώρου αποθήκευσης με ένα API τοπικού συστήματος αρχείων. Μπορείτε επίσης να αποκτήσετε πρόσβαση στα δεδομένα χρησιμοποιώντας το API mssparkutils fs με μια μονταρισμένη διαδρομή ως παράμετρο. Αυτή η μορφή διαδρομής είναι λίγο διαφορετική.
Ας υποθέσουμε ότι έχετε τοποθετήσει το κοντέινερ Data Lake Storage Gen2 στο /test χρησιμοποιώντας το API μονταρίσματος. Όταν αποκτάτε πρόσβαση στα δεδομένα με ένα API τοπικού συστήματος αρχείων, η μορφή διαδρομής είναι ως εξής:
/synfs/notebook/{sessionId}/test/{filename}
Όταν θέλετε να αποκτήσετε πρόσβαση στα δεδομένα χρησιμοποιώντας το API mssparkutils fs, συνιστάται να χρησιμοποιήσετε το getMountPath() για να λάβετε την ακριβή διαδρομή:
path = mssparkutils.fs.getMountPath("/test")
Κατάλογοι λίστας:
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
Ανάγνωση περιεχομένου αρχείου:
mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
Δημιουργία καταλόγου:
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
Αρχεία πρόσβασης κάτω από το σημείο μονταρίσματος μέσω τοπικής διαδρομής
Μπορείτε εύκολα να διαβάσετε και γράψετε τα αρχεία στο σημείο μονταρίσματος χρησιμοποιώντας το τυπικό σύστημα αρχείων. Ακολουθεί ένα παράδειγμα Python:
#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
Έλεγχος υπαρχόντων σημείων μονταρίσματος
Μπορείτε να χρησιμοποιήσετε το API mssparkutils.fs.mounts() για να ελέγξετε όλες τις υπάρχουσες πληροφορίες σημείου μονταρίσματος:
mssparkutils.fs.mounts()
Κατάργηση του μονταρίσματος του σημείου μονταρίσματος
Χρησιμοποιήστε τον παρακάτω κώδικα για να καταργήσετε το σημείο μονταρίσματος (/δοκιμή σε αυτό το παράδειγμα):
mssparkutils.fs.unmount("/test")
Γνωστοί περιορισμοί
Η τρέχουσα μονταρίσματος είναι μια ρύθμιση παραμέτρων επιπέδου εργασίας. Συνιστούμε να χρησιμοποιήσετε το API μονταρίσματος για να ελέγξετε εάν υπάρχει ή δεν υπάρχει σημείο μονταρίσματος.
Ο μηχανισμός unmount δεν είναι αυτόματος. Όταν ολοκληρωθεί η εκτέλεση της εφαρμογής, για να καταργήσετε το σημείο μονταρίσματος και να αποδεσμεύσετε τον χώρο στον δίσκο, πρέπει να καλέσετε ρητά ένα API unmount στον κώδικά σας. Διαφορετικά, το σημείο μονταρίσματος θα εξακολουθεί να υπάρχει στον κόμβο μετά την ολοκλήρωση της εκτέλεσης της εφαρμογής.
Η τοποθέτηση ενός λογαριασμού χώρου αποθήκευσης ADLS Gen1 δεν υποστηρίζεται.
Βοηθητικά προγράμματα Lakehouse
mssparkutils.lakehouse
Η παρέχει βοηθητικά προγράμματα ειδικά προσαρμοσμένα για τη διαχείριση αντικειμένων σχεδίασης Lakehouse. Αυτά τα βοηθητικά προγράμματα δίνουν τη δυνατότητα στους χρήστες να δημιουργούν, να ανακτούν, να ενημερώνουν και να διαγράφουν αντικείμενα σχεδίασης Lakehouse χωρίς προσπάθεια.
Σημείωμα
Τα API Lakehouse υποστηρίζονται μόνο στην έκδοση χρόνου εκτέλεσης 1.2+.
Επισκόπηση μεθόδων
Ακολουθεί μια επισκόπηση των διαθέσιμων μεθόδων που παρέχονται από mssparkutils.lakehouse
το :
# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]
Παραδείγματα χρήσης
Για να χρησιμοποιήσετε αποτελεσματικά αυτές τις μεθόδους, εξετάστε τα παρακάτω παραδείγματα χρήσης:
Δημιουργία ενός αντικειμένου σχεδίασης Lakehouse
artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
Ανάκτηση ενός αντικειμένου σχεδίασης Lakehouse
artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")
Ενημέρωση ενός αντικειμένου σχεδίασης Lakehouse
updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
Διαγραφή αντικειμένου σχεδίασης Lakehouse
is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")
Παράθεση τεχνουργημάτων Lakehouse
artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")
Πρόσθετες πληροφορίες
Για πιο λεπτομερείς πληροφορίες σχετικά με κάθε μέθοδο και τις παραμέτρους της, χρησιμοποιήστε τη mssparkutils.lakehouse.help("methodName")
συνάρτηση.
Με τα βοηθητικά προγράμματα Lakehouse του MSSparkUtils, η διαχείριση των αντικειμένων σχεδίασης Lakehouse γίνεται πιο αποδοτική και ενσωματωμένη στις διοχετεύσεις fabric σας, βελτιώνοντας τη συνολική εμπειρία διαχείρισης δεδομένων σας.
Μην διστάσετε να εξερευνήσετε αυτά τα βοηθητικά προγράμματα και να τα ενσωματώσετε στις ροές εργασιών fabric σας για απρόσκοπτη διαχείριση αντικειμένων σχεδίασης Lakehouse.
Βοηθητικά προγράμματα χρόνου εκτέλεσης
Εμφάνιση των πληροφοριών περιβάλλοντος περιόδου λειτουργίας
Με mssparkutils.runtime.context
μπορείτε να λάβετε τις πληροφορίες περιβάλλοντος της τρέχουσας ζωντανής περιόδου λειτουργίας, συμπεριλαμβανομένου του ονόματος σημειωματάριου, της προεπιλεγμένης λίμνης, των πληροφοριών χώρου εργασίας, εάν πρόκειται για εκτέλεση διοχέτευσης κ.λπ.
mssparkutils.runtime.context
Γνωστό ζήτημα
Όταν χρησιμοποιείτε την έκδοση χρόνου εκτέλεσης πάνω από 1.2 και εκτελείτε mssparkutils.help()
το , τα API fabricClient, warehouse και workspace δεν υποστηρίζονται προς το παρόν, θα είναι διαθέσιμα στο περαιτέρω.