Freigeben über


Microsoft Spark-Hilfsprogramme (MSSparkUtils) für Fabric

Microsoft Spark Utilities (MSSparkUtils) ist ein integriertes Paket, mit dem sich gängige Aufgaben leichter erledigen lassen. Sie können MSSparkUtils verwenden, um mit Dateisystemen zu arbeiten, Umgebungsvariablen zu erhalten, Notebooks miteinander zu verketten und mit Geheimnissen zu arbeiten. Das MSSparkUtils-Paket ist in PySpark (Python) Scala, SparkR-Notebooks und Fabric-Pipelines verfügbar.

Hinweis

  • MsSparkUtils wurde offiziell in NotebookUtils umbenannt. Der vorhandene Code bleibt abwärtskompatibel und verursacht keine Breaking Changes. Es wird dringend empfohlen, ein Upgrade auf NotebookUtils durchzuführen, um weiterhin Unterstützung für und Zugriff auf neue Features zu gewährleisten. Der Namespace „mssparkutils“ wird in Zukunft ausgemustert.
  • NotebookUtils ist für die Zusammenarbeit mit Spark 3.4 (Runtime v1.2) und höher ausgelegt. Alle neuen Features und Updates werden künftig ausschließlich mit dem Namespace „notebookutils“ unterstützt.

Dateisystem-Hilfsprogramme

mssparkutils.fs stellt Hilfsprogramme für die Arbeit mit verschiedenen Dateisystemen zur Verfügung, einschließlich Azure Data Lake Storage (ADLS) Gen2 und Azure Blob Storage. Stellen Sie sicher, dass Sie den Zugriff auf Azure Data Lake Storage Gen2 und Azure Blob Storage entsprechend konfigurieren.

Führen Sie die folgenden Befehle aus, um eine Übersicht über die verfügbaren Methoden zu erhalten:

from notebookutils import mssparkutils
mssparkutils.fs.help()

Ausgabe

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils funktioniert mit dem Dateisystem auf die gleiche Weise wie Spark-APIs. Beispiel: Syntax von mssparkuitls.fs.mkdirs() und Fabric Lakehouse:

Verwendung Relativer Pfad vom HDFS-Stamm Absoluter Pfad für das ABFS-Dateisystem Absoluter Pfad für das lokale Dateisystem auf dem Treiberknoten
Nicht-Standard-Lakehouse Nicht unterstützt mssparkutils.fs.mkdirs("abfss://< container_name>@<speicherkonto_name.dfs.core.windows.net/>< neues_verz>") mssparkutils.fs.mkdirs("datei:/<neues_verz>")
Standard-Lakehouse Verzeichnis unter „Files“ oder „Tables“: mssparkutils.fs.mkdirs("Files/<neues_verz>") mssparkutils.fs.mkdirs("abfss://< container_name>@<speicherkonto_name.dfs.core.windows.net/>< neues_verz>") mssparkutils.fs.mkdirs("datei:/<neues_verz>")

Auflisten von Dateien

Zum Auflisten des Inhalts eines Verzeichnisses verwenden Sie mssparkutils.fs.ls('Ihr Verzeichnispfad'). Beispiel:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

Anzeigen von Dateieigenschaften

Diese Methode gibt Dateieigenschaften zurück, einschließlich Dateiname, Dateipfad, Dateigröße und ob es sich um ein Verzeichnis und eine Datei handelt.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Neues Verzeichnis erstellen

Diese Methode erstellt das angegebene Verzeichnis, wenn es nicht vorhanden ist, und alle erforderlichen übergeordneten Verzeichnisse.

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Datei kopieren

Diese Methode kopiert eine Datei oder ein Verzeichnis und unterstützt dateisystemübergreifende Kopieraktivitäten.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Leistungsfähiges Kopieren von Dateien

Diese Methode bietet eine schnellere Möglichkeit zum Kopieren oder Verschieben von Dateien, insbesondere bei großen Datenmengen.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Vorschau von Dateiinhalt anzeigen

Diese Methode gibt bis zu den ersten „maxBytes”-Bytes der angegebenen Datei als in UTF-8 codierte Zeichenfolge zurück.

mssparkutils.fs.head('file path', maxBytes to read)

Datei verschieben

Diese Methode verschiebt eine Datei oder ein Verzeichnis und unterstützt dateisystemübergreifende Verschiebungen.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Datei schreiben

Diese Methode schreibt die angegebene Zeichenfolge in eine Datei, die in UTF-8 codiert ist.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Inhalt an eine Datei anfügen

Diese Methode fügt die angegebene Zeichenfolge an eine Datei an, die in UTF-8 codiert ist.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Hinweis

Wenn Sie die mssparkutils.fs.append-API in einer for-Schleife zum Schreiben in dieselbe Datei verwenden, empfehlen wir, eine sleep-Anweisung um 0,5s~1s zwischen den wiederkehrenden Schreibvorgängen hinzuzufügen. Dies liegt daran, dass der interne mssparkutils.fs.append-Vorgang der flush-API asynchron ist, sodass eine kurze Verzögerung die Datenintegrität gewährleistet.

Datei oder Verzeichnis löschen

Die Methode entfernt eine Datei oder ein Verzeichnis.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Verzeichnis einbinden/Einbindung aufheben

Weitere Informationen zur detaillierten Verwendung finden Sie unter Ein- und Aushängen von Dateien.

Notebook-Utilities

Verwenden Sie die MSSparkUtils Notebook Utilities, um ein Notebook auszuführen oder ein Notebook mit einem Wert zu beenden. Führen Sie den folgenden Befehl aus, um eine Übersicht über die verfügbaren Methoden zu erhalten:

mssparkutils.notebook.help()

Ausgabe:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Hinweis

Notebook-Dienstprogramme gelten nicht für Apache Spark-Auftragsdefinitionen (SJD).

Verweis auf ein Notizbuch

Diese Methode verweist auf ein Notebook und gibt dessen Exit-Wert zurück. Sie können Verschachtelungsfunktionsaufrufe in einem Notebook interaktiv oder in einer Pipeline ausführen. Das Notebook, auf das verwiesen wird, wird in dem Spark-Pool ausgeführt, in dem das Notebook diese Funktion aufruft.

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Beispiel:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Das Fabric-Notebook unterstützt auch das Verweisen auf Notebooks über mehrere Arbeitsbereiche hinweg, indem die Arbeitsbereichs-ID angegeben wird.

mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Sie können den Momentaufnahmelink der Verweisausführung in der Zellenausgabe öffnen. Die Momentaufnahme erfasst die Ergebnisse der Codeausführung und ermöglicht das einfache Debuggen einer Verweisausführung.

Screenshot mit dem Ergebnis der Verweisausführung.

Screenshot einer Momentaufnahme mit Codeausführungsergebnissen.

Hinweis

  • Das arbeitsbereichübergreifende Referenznotebook wird von Laufzeitversion 1.2 und höher unterstützt.
  • Wenn Sie die Dateien unter Notebook-Ressource verwenden, verwenden Sie mssparkutils.nbResPath im referenzierten Notebook, um sicherzustellen, dass es auf denselben Ordner wie die interaktive Ausführung verweist.

Parallele Referenzausführung mehrerer Notizbücher

Wichtig

Dieses Feature befindet sich in Vorschau.

Mit der Methode mssparkutils.notebook.runMultiple() können Sie mehrere Notebooks parallel oder mit einer vordefinierten topologischen Struktur ausführen. Die API verwendet einen Multithread-Implementierungsmechanismus innerhalb einer Spark-Sitzung, was bedeutet, dass die Computeressourcen vom Referenznotebook gemeinsam genutzt werden.

Mit mssparkutils.notebook.runMultiple() haben Sie folgende Möglichkeiten:

  • Führen Sie mehrere Notebooks gleichzeitig aus, ohne darauf zu warten, dass die einzelnen Notebooks abgeschlossen sind.

  • Geben Sie die Abhängigkeiten und die Reihenfolge der Ausführung für Ihre Notebooks mithilfe eines einfachen JSON-Formats an.

  • Optimieren Sie die Verwendung von Spark Computeressourcen, und reduzieren Sie die Kosten Ihrer Fabric-Projekte.

  • Zeigen Sie die Momentaufnahmen der einzelnen Notebook-Ausführungsdatensätze in der Ausgabe an, und debuggen/überwachen Sie Ihre Notebookaufgaben bequem.

  • Ermitteln Sie den Beendenwert jeder ausführenden Aktivität und verwenden Sie diesen in nachgelagerten Aufgaben.

Sie können auch versuchen, das Mssparkutils.notebook.help("runMultiple") auszuführen, um das Beispiel und die detaillierte Verwendung zu finden.

Hier ist ein einfaches Beispiel für die parallele Ausführung einer Liste von Notebooks mit dieser Methode:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Das Ausführungsergebnis aus dem Stammnotebook lautet wie folgt:

Screenshot des Verweises auf eine Liste von Notebooks.

Im Folgenden sehen Sie ein Beispiel für das Ausführen von Notebooks mit topologischer Struktur mithilfe von mssparkutils.notebook.runMultiple(). Verwenden Sie diese Methode, um Notebooks einfach über eine Codeumgebung zu koordinieren.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Das Ausführungsergebnis aus dem Stammnotebook lautet wie folgt:

Screenshot des Verweises auf eine Liste von Notebooks mit Parametern.

Hinweis

  • Der Parallelitätsgrad der Ausführung mehrerer Notebooks ist auf die gesamte verfügbare Computeressource einer Spark-Sitzung beschränkt.
  • Die Obergrenze für Notebook-Aktivitäten oder gleichzeitige Notebooks beträgt 50. Eine Überschreitung dieses Grenzwerts kann zu Stabilitäts- und Leistungsproblemen führen, da eine hohe Auslastung der Computeressourcen auftritt. Wenn Probleme auftreten, sollten Sie in Erwägung ziehen, die Notebooks in mehrere runMultiple Aufrufe aufzuteilen oder die Gleichzeitigkeit zu reduzieren, indem Sie das Feld Gleichzeitigkeit im DAG-Parameter anpassen.
  • Die Standard-Zeitüberschreitung für die gesamte DAG beträgt 12 Stunden, und die Standard-Zeitüberschreitung für jede Zelle im untergeordneten Notebook beträgt 90 Sekunden. Sie können die Zeitüberschreitung ändern, indem Sie die Felder timeoutInSeconds und timeoutPerCellInSeconds im DAG-Parameter festlegen.

Beenden eines Notebooks

Diese Methode beendet ein Notebook mit einem Wert. Sie können Verschachtelungsfunktionsaufrufe in einem Notebook interaktiv oder in einer Pipeline ausführen.

  • Wenn Sie eine exit()-Funktion aus einem Notebook interaktiv aufrufen, löst das Fabric-Notebook eine Ausnahme aus, überspringt die Ausführung von nachfolgenden Zellen und behält die Spark-Sitzung bei.

  • Wenn Sie ein Notebook in einer Pipeline orchestrieren, die eine exit()-Funktion aufruft, gibt die Notebook-Aktivität einen Exit-Wert zurück, schließt die Pipelineausführung ab und beendet die Spark-Sitzung.

  • Wenn Sie eine exit()-Funktion in einem Notebook aufrufen, auf das verwiesen wird, beendet Fabric Spark die weitere Ausführung des Notebooks, auf das verwiesen wird, und fährt mit der Ausführung der nächsten Zellen im Hauptnotebook fort, das die run()-Funktion aufruft. Ein Beispiel: Notebook1 hat drei Zellen und ruft in der zweiten Zelle eine exit()-Funktion auf. Notebook2 weist fünf Zellen auf und ruft run(notebook1) in der dritten Zelle auf. Wenn Sie Notebook2 ausführen, wird Notebook1 in der zweiten Zelle beendet, sobald die exit()-Funktion erreicht wird. Notebook2 setzt die Ausführung seiner vierten Zelle und fünften Zelle fort.

mssparkutils.notebook.exit("value string")

Beispiel:

Sample1-Notebook mit den beiden folgenden Zellen:

  • Zelle 1 definiert einen Eingabe-Parameter, dessen Standardwert auf 10 festgelegt ist.

  • Zelle 2 beendet das Notebook mit Eingabe als Exit-Wert.

Screenshot: Beispielnotebook einer exit-Funktion.

Sie können Muster1 in einem anderen Notebook mit Standardwerten ausführen:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

Ausgabe:

Notebook executed successfully with exit value 10

Sie können Muster1 in einem anderen Notebook ausführen und den Eingabewert als 20 festlegen:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Ausgabe:

Notebook executed successfully with exit value 20

Hilfsprogramme für Anmeldeinformationen

Sie können die MSSparkUtils-Hilfsprogramme für Anmeldeinformationen verwenden, um die Zugriffstoken abzurufen und Geheimnisse in einem Azure Key Vault zu verwalten.

Führen Sie den folgenden Befehl aus, um eine Übersicht über die verfügbaren Methoden zu erhalten:

mssparkutils.credentials.help()

Ausgabe:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

Abrufen von Token

getToken gibt ein Microsoft Entra-Token für eine bestimmte Zielgruppe und einen bestimmten Name (optional) zurück. Die folgende Liste zeigt die derzeit verfügbaren Zielgruppenschlüssel:

  • Speicher-Zielgruppenressource: „storage“
  • Power BI-Ressource: „pbi“
  • Azure Key Vault-Ressource: „keyvault“
  • Synapse RTA KQL DB-Ressource: „kusto“

Führen Sie den folgenden Befehl aus, um das Token abzurufen:

mssparkutils.credentials.getToken('audience Key')

Geheimnis mithilfe der Benutzeranmeldeinformationen abrufen

getSecret gibt das Azure Key Vault-Geheimnis für einen angegebenen Azure Key Vault-Endpunkt und Geheimnisnamen mithilfe der Benutzeranmeldeinformationen zurück.

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Ein- und Aushängen von Dateien

Fabric unterstützt die folgenden Einbindungsszenarien im Paket der Microsoft Spark-Hilfsprogramme. Sie können die APIs mount, unmount, getMountPath() und mounts() verwenden, um Remotespeicher (ADLS Gen2) an alle funktionierenden Knoten (Treiberknoten und Workerknoten) anzufügen. Wenn der Bereitstellungspunkt für den Speicher vorhanden ist, können Sie die lokale Datei-API verwenden, um auf Daten zuzugreifen, als ob sie im lokalen Dateisystem gespeichert wären.

Einbinden eines ADLS Gen2-Kontos

Das folgende Beispiel veranschaulicht, wie Sie Azure Data Lake Storage Gen2 einbinden. Das Einbinden von Blob Storage funktioniert ähnlich.

In diesem Beispiel wird davon ausgegangen, dass Sie über ein Data Lake Storage Gen2-Konto mit dem Namen storegen2 verfügen und das Konto über einen Container mit dem Namen mycontainer verfügt, den Sie in /test in Ihrer Spark-Notebooksitzung einbinden möchten.

Screenshot: Position eines einzubindenden Containers

Zum Einbinden des Containers namens mycontainer muss mssparkutils zuerst überprüfen, ob Sie über die Berechtigung zum Zugreifen auf den Container verfügen. Derzeit unterstützt Fabric zwei Authentifizierungsmethoden für den Triggereinbindungsvorgang: accountKey und sastoken.

Einbinden über SAS-Token (Shared Access Signature) oder Kontoschlüssel

MSSparkUtils unterstützt die explizite Übergabe eines Kontoschlüssels oder eines SAS-Token (Shared Access Signature) als Parameter zum Einbinden des Ziels.

Aus Sicherheitsgründen empfehlen wir, Kontoschlüssel oder SAS-Token in Azure Key Vault zu speichern (wie im folgenden Beispiel gezeigt). Sie können sie mithilfe der mssparkutils.credentials.getSecret-API abrufen. Weitere Informationen zu Azure Key Vault finden Sie unter Informationen zu mit Azure Key Vault verwalteten Speicherkontoschlüsseln.

Screenshot des Speicherorts von Geheimnissen in einem Azure Key Vault.

Beispielcode für die accountKey-Methode:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Beispielcode für sastoken:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Hinweis

Möglicherweise müssen Sie mssparkutils importieren, wenn es noch nicht verfügbar ist:

from notebookutils import mssparkutils

Einbindungsparameter:

  • fileCacheTimeout: Blobs werden standardmäßig 120 Sekunden lang im lokalen temporären Ordner zwischengespeichert. Während dieser Zeit prüft BlobFuse nicht, ob die Datei aktuell ist oder nicht. Der Parameter kann so festgelegt werden, dass die Standardtimeoutzeit geändert wird. Wenn mehrere Clients gleichzeitig Dateien ändern, empfiehlt es sich, die Cachezeit zu verkürzen oder sogar auf 0 zu ändern und immer die neuesten Dateien vom Server abzurufen, um Inkonsistenzen zwischen lokalen Dateien und Remotedateien zu vermeiden.
  • Timeout: Das Timeout des Einbindungsvorgangs beträgt standardmäßig 120 Sekunden. Der Parameter kann so festgelegt werden, dass die Standardtimeoutzeit geändert wird. Wenn zu viele Executors vorhanden sind oder bei der Einbindung ein Timeout auftritt, empfiehlt es sich, den Wert zu erhöhen.

Sie können diese Parameter wie folgt verwenden:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Hinweis

Aus Sicherheitsgründen ist davon abzuraten, Anmeldeinformationen in Code zu speichern. Zum weiteren Schutz Ihrer Anmeldeinformationen redigieren wir Ihr Geheimnis in der Notebookausgabe. Weitere Informationen finden Sie unter Geheimnisbearbeitung.

Einbinden eines Lakehouse

Beispielcode zum Einbinden eines Lakehouse in /test:

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

Hinweis

Das Einbinden eines regionalen Endpunkts wird nicht unterstützt. Fabric unterstützt nur das Einbinden des globalen Endpunkts: onelake.dfs.fabric.microsoft.com.

Zugriff auf Dateien unter dem Bereitstellungspunkt mithilfe der mssparktuils fs-API

Der Hauptzweck des Einbindungsvorgangs besteht darin, Kunden den Zugriff auf Daten zu ermöglichen, die in einem Remotespeicherkonto gespeichert sind, indem sie eine lokale Dateisystem-API verwenden. Sie können auch auf die Daten zugreifen, indem Sie die mssparkutils fs-API mit einem bereitgestellten Pfad als Parameter verwenden. Dieses Pfadformat weicht davon etwas ab.

Angenommen, Sie haben den Data Lake Storage Gen2-Container mycontainer mithilfe der Bereitstellungs-API unter /test eingebunden. Wenn Sie mithilfe der lokalen Dateisystem-API auf die Daten zugreifen, sieht das Pfadformat wie folgt aus:

/synfs/notebook/{sessionId}/test/{filename}

Wenn Sie mithilfe der mssparkutils fs-API auf die Daten zugreifen möchten, empfiehlt es sich, getMountPath() zu verwenden, um den genauen Pfad abzurufen:

path = mssparkutils.fs.getMountPath("/test")
  • Auflisten von Verzeichnissen:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • Lesen von Dateiinhalten:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Erstellen eines Verzeichnisses:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

Zugreifen auf Dateien unter dem Bereitstellungspunkt über einen lokalen Pfad

Sie können die Dateien im Bereitstellungspunkt einfach mithilfe des Standarddateisystems lesen und schreiben. Hier ist ein Python-Beispiel:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Überprüfen vorhandener Bereitstellungspunkte

Sie können die mssparkutils.fs.mounts()-API verwenden, um alle vorhandenen Bereitstellungspunktinformationen zu überprüfen:

mssparkutils.fs.mounts()

Aufheben der Bereitstellung eines Bereitstellungspunkts

Verwenden Sie den folgenden Code, um Ihren Bereitstellungspunkt (/test in diesem Beispiel) aufzuheben:

mssparkutils.fs.unmount("/test")

Bekannte Einschränkungen

  • Die aktuelle Einbindung ist eine Konfiguration auf Auftragsebene. Wir empfehlen Ihnen, die mounts-API zu verwenden, um zu überprüfen, ob der Bereitstellungspunkt vorhanden ist oder nicht verfügbar ist.

  • Der Mechanismus für die Aufhebung der Einbindung ist nicht automatisch. Wenn die Ausführung der Anwendung beendet ist, müssen Sie explizit eine unmount-API in Ihrem Code aufrufen, um den Bereitstellungspunkt aufzuheben und den Speicherplatz freizugeben. Andernfalls ist der Bereitstellungspunkt weiterhin im Knoten vorhanden, nachdem die Anwendungsausführung beendet wurde.

  • Das Einbinden eines ADLS Gen1-Speicherkontos wird nicht unterstützt.

Lakehouse-Hilfsprogramm

mssparkutils.lakehouse stellt Hilfsprogramme bereit, die speziell auf die Verwaltung von Lakehouse-Artefakten zugeschnitten sind. Mit diesen Hilfsprogrammen können Benutzer mühelos Lakehouse-Artefakte erstellen, abrufen, aktualisieren und löschen.

Hinweis

Lakehouse-APIs werden nur in der Laufzeitversion 1.2+ unterstützt.

Übersicht über die Methoden

Im Folgenden finden Sie eine Übersicht über die verfügbaren Methoden, die von mssparkutils.lakehouse bereitgestellt werden:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

Anwendungsbeispiele

Berücksichtigen Sie die folgenden Anwendungsbeispiele, um diese Methoden effektiv zu nutzen:

Erstellen eines Lakehouse-Artefakts

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Abrufen eines Lakehouse-Artefakts

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

Aktualisieren eines Lakehouse-Artefakts

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Löschen eines Lakehouse-Artefakts

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Auflisten von Lakehouse-Artefakten

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

Weitere Informationen

Verwenden Sie die mssparkutils.lakehouse.help("methodName")-Funktion, um detailliertere Informationen zu den einzelnen Methoden und ihren Parametern zu finden.

Mit den Lakehouse-Dienstprogrammen von MSSparkUtils wird die Verwaltung Ihrer Lakehouse-Artefakte effizienter und in Ihre Fabric-Pipelines integriert, wodurch Ihre gesamte Datenverwaltungserfahrung verbessert wird.

Sie können diese Dienstprogramme erkunden und sie in Ihre Fabric-Workflows integrieren, um eine nahtlose Verwaltung von Lakehouse-Artefakten zu ermöglichen.

Runtime-Dienstprogramme

Anzeigen der Sitzungskontextinformationen

Mit mssparkutils.runtime.context kannst du die Kontextinformationen der aktuellen Echtzeitsitzung abrufen, einschließlich des Notebooknamens, des Standardlakehouse, der Workspaceinformation, wenn es sich um eine Pipelineausführung handelt usw.

mssparkutils.runtime.context

Hinweis

mssparkutils.env wird auf Fabric nicht offiziell unterstützt, verwenden Sie bitte notebookutils.runtime.context als Alternative.

Bekanntes Problem

Wenn Sie eine Laufzeitversion über 1.2 verwenden und mssparkutils.help() ausführen, werden die aufgelisteten fabricClient-, Warehouse- und Arbeitsbereich-APIs derzeit nicht unterstützt, werden aber in Zukunft verfügbar sein.