Úvod do nástrojů Microsoft Sparku
Microsoft Spark Utilities (MSSparkUtils) je integrovaný balíček, který vám pomůže snadno provádět běžné úlohy. MsSparkUtils můžete použít k práci se systémy souborů, k získání proměnných prostředí, ke zřetězení poznámkových bloků a práci s tajnými kódy. MSSparkUtils jsou k dispozici v PySpark (Python)
kanálech , Scala
, .NET Spark (C#)
a R (Preview)
poznámkových bloků a Synapse.
Požadavky
Konfigurace přístupu k Azure Data Lake Storage Gen2
Poznámkové bloky Synapse používají průchozí průchod Microsoft Entra pro přístup k účtům ADLS Gen2. Abyste mohli získat přístup k účtu ADLS Gen2 (nebo složce), musíte být přispěvatelem dat objektů blob služby Storage.
Kanály Synapse používají identitu spravované služby (MSI) pracovního prostoru pro přístup k účtům úložiště. Pokud chcete v aktivitách kanálu používat MSSparkUtils, musí být identita pracovního prostoru přispěvatelem dat objektů blob úložiště, aby bylo možné získat přístup k účtu ADLS Gen2 (nebo složce).
Postupujte podle těchto kroků a ujistěte se, že vaše ID Microsoft Entra a MSI pracovního prostoru mají přístup k účtu ADLS Gen2:
Otevřete Azure Portal a účet úložiště, ke které chcete získat přístup. Můžete přejít na konkrétní kontejner, ke který chcete získat přístup.
Na levém panelu vyberte řízení přístupu (IAM ).
Kliknutím na Přidat>Přidat přiřazení role otevřete stránku Přidat přiřazení role.
Přiřaďte následující roli. Podrobný postup najdete v tématu Přiřazování rolí Azure s využitím webu Azure Portal.
Nastavení Hodnota Role Přispěvatel dat objektů blob úložiště Přiřadit přístup k USER and MANAGEDIDENTITY Členové účet Microsoft Entra a identita pracovního prostoru Poznámka:
Název spravované identity je také název pracovního prostoru.
Zvolte Uložit.
K datům v ADLS Gen2 můžete přistupovat pomocí Synapse Sparku pomocí následující adresy URL:
abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>
Konfigurace přístupu ke službě Azure Blob Storage
Synapse používá pro přístup ke službě Azure Blob Storage sdílený přístupový podpis (SAS). Abyste se vyhnuli zveřejnění klíčů SAS v kódu, doporučujeme vytvořit novou propojenou službu v pracovním prostoru Synapse pro účet služby Azure Blob Storage, ke které chcete získat přístup.
Pokud chcete přidat novou propojenou službu pro účet služby Azure Blob Storage, postupujte takto:
- Otevřete Azure Synapse Studio.
- Na levém panelu vyberte Spravovat a v části Externí připojení vyberte Propojené služby.
- Na panelu Nová propojená služba na pravé straně vyhledejte Azure Blob Storage.
- Zvolte Pokračovat.
- Vyberte účet služby Azure Blob Storage pro přístup a nakonfigurujte název propojené služby. Navrhněte použití klíče účtu pro metodu ověřování.
- Výběrem možnosti Test připojení ověřte správnost nastavení.
- Nejprve vyberte Vytvořit a kliknutím na Publikovat vše uložte provedené změny.
K datům ve službě Azure Blob Storage se službou Synapse Spark můžete přistupovat prostřednictvím následující adresy URL:
wasb[s]://<container_name>@<storage_account_name>.blob.core.windows.net/<path>
Tady je příklad kódu:
from pyspark.sql import SparkSession
# Azure storage access info
blob_account_name = 'Your account name' # replace with your blob name
blob_container_name = 'Your container name' # replace with your container name
blob_relative_path = 'Your path' # replace with your relative folder path
linked_service_name = 'Your linked service name' # replace with your linked service name
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
# Allow SPARK to access from Blob remotely
wasb_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasb_path)
val blob_account_name = "" // replace with your blob name
val blob_container_name = "" //replace with your container name
val blob_relative_path = "/" //replace with your relative folder path
val linked_service_name = "" //replace with your linked service name
val blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
val wasbs_path = f"wasbs://$blob_container_name@$blob_account_name.blob.core.windows.net/$blob_relative_path"
spark.conf.set(f"fs.azure.sas.$blob_container_name.$blob_account_name.blob.core.windows.net",blob_sas_token)
var blob_account_name = ""; // replace with your blob name
var blob_container_name = ""; // replace with your container name
var blob_relative_path = ""; // replace with your relative folder path
var linked_service_name = ""; // replace with your linked service name
var blob_sas_token = Credentials.GetConnectionStringOrCreds(linked_service_name);
spark.Conf().Set($"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net", blob_sas_token);
var wasbs_path = $"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}";
Console.WriteLine(wasbs_path);
# Azure storage access info
blob_account_name <- 'Your account name' # replace with your blob name
blob_container_name <- 'Your container name' # replace with your container name
blob_relative_path <- 'Your path' # replace with your relative folder path
linked_service_name <- 'Your linked service name' # replace with your linked service name
blob_sas_token <- mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
# Allow SPARK to access from Blob remotely
sparkR.session()
wasb_path <- sprintf('wasbs://%s@%s.blob.core.windows.net/%s',blob_container_name, blob_account_name, blob_relative_path)
sparkR.session(sprintf('fs.azure.sas.%s.%s.blob.core.windows.net',blob_container_name, blob_account_name), blob_sas_token)
print( paste('Remote blob path: ',wasb_path))
Konfigurace přístupu ke službě Azure Key Vault
Službu Azure Key Vault můžete přidat jako propojenou službu pro správu přihlašovacích údajů ve službě Synapse. Pokud chcete přidat službu Azure Key Vault jako propojenou službu Synapse, postupujte takto:
Na levém panelu vyberte Spravovat a v části Externí připojení vyberte Propojené služby.
Na panelu Nová propojená služba na pravé straně vyhledejte Azure Key Vault.
Vyberte účet služby Azure Key Vault pro přístup a nakonfigurujte název propojené služby.
Výběrem možnosti Test připojení ověřte správnost nastavení.
Nejprve vyberte Vytvořit a kliknutím na Publikovat vše uložte změnu.
Poznámkové bloky Synapse používají průchod Microsoft Entra pro přístup ke službě Azure Key Vault. Kanály Synapse používají pro přístup ke službě Azure Key Vault identitu pracovního prostoru (MSI). Abyste měli jistotu, že váš kód funguje v poznámkovém bloku i v kanálu Synapse, doporučujeme udělit oprávnění k tajnému přístupu pro váš účet Microsoft Entra i identitu pracovního prostoru.
Pomocí následujícího postupu udělte identitě pracovního prostoru tajný přístup:
- Otevřete Azure Portal a Azure Key Vault, ke které chcete získat přístup.
- Na levém panelu vyberte zásady přístupu.
- Vyberte Přidat zásadu přístupu:
- Jako šablonu konfigurace zvolte Klíč, Tajný klíč a Správa certifikátů.
- Vyberte svůj účet Microsoft Entra a identitu pracovního prostoru (stejnou jako název vašeho pracovního prostoru) ve vybraném objektu zabezpečení nebo se ujistěte, že už je přiřazená.
- Vyberte vybrat a přidat.
- Výběrem tlačítka Uložit potvrďte změny.
Nástroje systému souborů
mssparkutils.fs
poskytuje nástroje pro práci s různými systémy souborů, včetně Azure Data Lake Storage Gen2 (ADLS Gen2) a Azure Blob Storage. Ujistěte se, že správně nakonfigurujete přístup ke službě Azure Data Lake Storage Gen2 a Azure Blob Storage .
Pro přehled dostupných metod spusťte následující příkazy:
from notebookutils import mssparkutils
mssparkutils.fs.help()
mssparkutils.fs.help()
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Notebook.MSSparkUtils;
FS.Help()
library(notebookutils)
mssparkutils.fs.help()
Výsledky:
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(src: String, dest: String, create_path: Boolean = False, overwrite: Boolean = False): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
Use mssparkutils.fs.help("methodName") for more info about a method.
Zobrazení souborů
Vypíše obsah adresáře.
mssparkutils.fs.ls('Your directory path')
mssparkutils.fs.ls("Your directory path")
FS.Ls("Your directory path")
mssparkutils.fs.ls("Your directory path")
Zobrazení vlastností souboru
Vrátí vlastnosti souboru, včetně názvu souboru, cesty k souboru, velikosti souboru, času úpravy souboru a zda se jedná o adresář a soubor.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size, file.modifyTime)
val files = mssparkutils.fs.ls("/")
files.foreach{
file => println(file.name,file.isDir,file.isFile,file.size,file.modifyTime)
}
var Files = FS.Ls("/");
foreach(var File in Files) {
Console.WriteLine(File.Name+" "+File.IsDir+" "+File.IsFile+" "+File.Size);
}
files <- mssparkutils.fs.ls("/")
for (file in files) {
writeLines(paste(file$name, file$isDir, file$isFile, file$size, file$modifyTime))
}
Vytvoření nového adresáře
Vytvoří daný adresář, pokud neexistuje, a všechny nezbytné nadřazené adresáře.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs.mkdirs("new directory name")
FS.Mkdirs("new directory name")
mssparkutils.fs.mkdirs("new directory name")
Kopírování souboru
Zkopíruje soubor nebo adresář. Podporuje kopírování napříč systémy souborů.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
FS.Cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)
Výkonný soubor kopírování
Tato metoda poskytuje rychlejší způsob kopírování nebo přesouvání souborů, zejména velkých objemů dat.
mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively
Poznámka:
Tato metoda podporuje pouze modul runtime Azure Synapse pro Apache Spark 3.3 a Modul runtime Azure Synapse pro Apache Spark 3.4.
Náhled obsahu souboru
Vrátí až první bajty maxBytes daného souboru jako řetězec kódovaný v UTF-8.
mssparkutils.fs.head('file path', maxBytes to read)
mssparkutils.fs.head("file path", maxBytes to read)
FS.Head("file path", maxBytes to read)
mssparkutils.fs.head('file path', maxBytes to read)
Přesun souboru
Přesune soubor nebo adresář. Podporuje přesun mezi systémy souborů.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv("source file or directory", "destination directory", true) // Set the last parameter as True to firstly create the parent directory if it does not exist
FS.Mv("source file or directory", "destination directory", true)
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
Zápis souboru
Zapíše daný řetězec do souboru zakódovaného v kódování UTF-8.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
FS.Put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Připojení obsahu k souboru
Připojí daný řetězec k souboru zakódovanému v UTF-8.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path","content to append",true) // Set the last parameter as True to create the file if it does not exist
FS.Append("file path", "content to append", true) // Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Poznámka:
mssparkutils.fs.append()
amssparkutils.fs.put()
nepodporuje souběžné zápisy do stejného souboru kvůli nedostatku záruk atomicity.- Při použití
mssparkutils.fs.append
rozhraní API vefor
smyčce pro zápis do stejného souboru doporučujeme přidatsleep
příkaz kolem 0,5s~1s mezi opakující se zápisy. Důvodem je to, žemssparkutils.fs.append
interníflush
operace rozhraní API je asynchronní, takže krátká prodleva pomáhá zajistit integritu dat.
Odstranění souboru nebo adresáře
Odebere soubor nebo adresář.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
FS.Rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Nástroje poznámkového bloku
Nepodporováno
Pomocí nástrojů MSSparkUtils Notebook můžete spustit poznámkový blok nebo ukončit poznámkový blok s hodnotou. Spuštěním následujícího příkazu získejte přehled dostupných metod:
mssparkutils.notebook.help()
Získání výsledků:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Poznámka:
Nástroje poznámkového bloku se nevztahují na definice úloh Apache Sparku (SJD).
Odkaz na poznámkový blok
Odkaz na poznámkový blok a vrátí jeho výstupní hodnotu. Volání vnořené funkce můžete v poznámkovém bloku spouštět interaktivně nebo v kanálu. Odkazovaný poznámkový blok se spustí ve fondu Sparku, ve kterém tento poznámkový blok volá tuto funkci.
mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)
Příklad:
mssparkutils.notebook.run("folder/Sample1", 90, {"input": 20 })
Po dokončení běhu uvidíte odkaz na snímek s názvem Zobrazit spuštění poznámkového bloku: Název poznámkového bloku zobrazený ve výstupu buňky. Kliknutím na odkaz zobrazíte snímek pro toto konkrétní spuštění.
Paralelní spouštění více poznámkových bloků
Tato metoda mssparkutils.notebook.runMultiple()
umožňuje paralelně spouštět více poznámkových bloků nebo s předdefinovanou topologickou strukturou. Rozhraní API používá mechanismus implementace více vláken v rámci relace Sparku, což znamená, že výpočetní prostředky jsou sdíleny referenčním poznámkovým blokem.
Pomocí mssparkutils.notebook.runMultiple()
:
Spusťte několik poznámkových bloků současně, aniž byste museli čekat na dokončení každého z nich.
Pomocí jednoduchého formátu JSON určete závislosti a pořadí provádění poznámkových bloků.
Optimalizujte využití výpočetních prostředků Sparku a snižte náklady na vaše projekty Synapse.
Prohlédněte si snímky každého záznamu spuštění poznámkového bloku ve výstupu a pohodlně laďte/monitorujte úlohy poznámkového bloku.
Získejte výstupní hodnotu jednotlivých aktivit vedení a použijte je v podřízených úkolech.
Můžete se také pokusit spustit mssparkutils.notebook.help("runMultiple") a vyhledat příklad a podrobné využití.
Tady je jednoduchý příklad paralelního spuštění seznamu poznámkových bloků pomocí této metody:
mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
Výsledek spuštění z kořenového poznámkového bloku je následující:
Následuje příklad spouštění poznámkových bloků s topologickou strukturou pomocí mssparkutils.notebook.runMultiple()
. Pomocí této metody můžete snadno orchestrovat poznámkové bloky prostřednictvím prostředí kódu.
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
]
}
mssparkutils.notebook.runMultiple(DAG)
Poznámka:
- Tato metoda podporuje pouze modul runtime Azure Synapse pro Apache Spark 3.3 a Modul runtime Azure Synapse pro Apache Spark 3.4.
- Stupeň paralelismu spuštění více poznámkových bloků je omezený na celkový dostupný výpočetní prostředek relace Sparku.
Ukončení poznámkového bloku
Ukončí poznámkový blok s hodnotou. Volání vnořené funkce můžete v poznámkovém bloku spouštět interaktivně nebo v kanálu.
Když z poznámkového bloku interaktivně zavoláte funkci exit(), Azure Synapse vyvolá výjimku, přeskočí spuštěné buňky dílčích sekvencí a zachová relaci Sparku naživu.
Když orchestrujete poznámkový blok, který volá
exit()
funkci v kanálu Synapse, Azure Synapse vrátí výstupní hodnotu, dokončí spuštění kanálu a zastaví relaci Sparku.Když zavoláte
exit()
funkci v odkazovaném poznámkovém bloku, Azure Synapse zastaví další spuštění v poznámkovém bloku, na který serun()
odkazuje, a dál spustí další buňky v poznámkovém bloku, které funkci volají. Například: Notebook1 obsahuje tři buňky a voláexit()
funkci ve druhé buňce. Poznámkový blok 2 obsahuje pět buněk a volánírun(notebook1)
ve třetí buňce. Při spuštění poznámkového bloku 2 se poznámkový blok 1 zastaví na druhé buňce při stisknutíexit()
funkce. Poznámkový blok 2 bude dál spouštět svou čtvrtou buňku a pátou buňku.
mssparkutils.notebook.exit("value string")
Příklad:
Poznámkový blok Sample1 se nachází ve složce/ s následujícími dvěma buňkami:
- Buňka 1 definuje vstupní parametr s výchozí hodnotou nastavenou na 10.
- Buňka 2 ukončí poznámkový blok se vstupem jako výstupní hodnotou.
Ukázku 1 můžete spustit v jiném poznámkovém bloku s výchozími hodnotami:
exitVal = mssparkutils.notebook.run("folder/Sample1")
print (exitVal)
Výsledky:
Sample1 run success with input is 10
Ukázku 1 můžete spustit v jiném poznámkovém bloku a nastavit vstupní hodnotu na 20:
exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print (exitVal)
Výsledky:
Sample1 run success with input is 20
Pomocí nástrojů MSSparkUtils Notebook můžete spustit poznámkový blok nebo ukončit poznámkový blok s hodnotou. Spuštěním následujícího příkazu získejte přehled dostupných metod:
mssparkutils.notebook.help()
Získání výsledků:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Odkaz na poznámkový blok
Odkaz na poznámkový blok a vrátí jeho výstupní hodnotu. Volání vnořené funkce můžete v poznámkovém bloku spouštět interaktivně nebo v kanálu. Odkazovaný poznámkový blok se spustí ve fondu Sparku, ve kterém tento poznámkový blok volá tuto funkci.
mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)
Příklad:
mssparkutils.notebook.run("folder/Sample1", 90, Map("input" -> 20))
Po dokončení běhu uvidíte odkaz na snímek s názvem Zobrazit spuštění poznámkového bloku: Název poznámkového bloku zobrazený ve výstupu buňky. Kliknutím na odkaz zobrazíte snímek pro toto konkrétní spuštění.
Ukončení poznámkového bloku
Ukončí poznámkový blok s hodnotou. Volání vnořené funkce můžete v poznámkovém bloku spouštět interaktivně nebo v kanálu.
Když interaktivně zavoláte funkci poznámkového
exit()
bloku, Azure Synapse vyvolá výjimku, přeskočí spuštěné buňky dílčích sekvencí a zachová relaci Sparku naživu.Když orchestrujete poznámkový blok, který volá
exit()
funkci v kanálu Synapse, Azure Synapse vrátí výstupní hodnotu, dokončí spuštění kanálu a zastaví relaci Sparku.Když zavoláte
exit()
funkci v odkazovaném poznámkovém bloku, Azure Synapse zastaví další spuštění v poznámkovém bloku, na který serun()
odkazuje, a dál spustí další buňky v poznámkovém bloku, které funkci volají. Například: Notebook1 obsahuje tři buňky a voláexit()
funkci ve druhé buňce. Poznámkový blok 2 obsahuje pět buněk a volánírun(notebook1)
ve třetí buňce. Při spuštění poznámkového bloku 2 se poznámkový blok 1 zastaví na druhé buňce při stisknutíexit()
funkce. Poznámkový blok 2 bude dál spouštět svou čtvrtou buňku a pátou buňku.
mssparkutils.notebook.exit("value string")
Příklad:
Poznámkový blok Sample1 vyhledá pod mssparkutils/folder/ s následujícími dvěma buňkami:
- Buňka 1 definuje vstupní parametr s výchozí hodnotou nastavenou na 10.
- Buňka 2 ukončí poznámkový blok se vstupem jako výstupní hodnotou.
Ukázku 1 můžete spustit v jiném poznámkovém bloku s výchozími hodnotami:
val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1")
print(exitVal)
Výsledky:
exitVal: String = Sample1 run success with input is 10
Sample1 run success with input is 10
Ukázku 1 můžete spustit v jiném poznámkovém bloku a nastavit vstupní hodnotu na 20:
val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print(exitVal)
Výsledky:
exitVal: String = Sample1 run success with input is 20
Sample1 run success with input is 20
Pomocí nástrojů MSSparkUtils Notebook můžete spustit poznámkový blok nebo ukončit poznámkový blok s hodnotou. Spuštěním následujícího příkazu získejte přehled dostupných metod:
mssparkutils.notebook.help()
Získání výsledků:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Odkaz na poznámkový blok
Odkaz na poznámkový blok a vrátí jeho výstupní hodnotu. Volání vnořené funkce můžete v poznámkovém bloku spouštět interaktivně nebo v kanálu. Odkazovaný poznámkový blok se spustí ve fondu Sparku, ve kterém tento poznámkový blok volá tuto funkci.
mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)
Příklad:
mssparkutils.notebook.run("folder/Sample1", 90, list("input": 20))
Po dokončení běhu uvidíte odkaz na snímek s názvem Zobrazit spuštění poznámkového bloku: Název poznámkového bloku zobrazený ve výstupu buňky. Kliknutím na odkaz zobrazíte snímek pro toto konkrétní spuštění.
Ukončení poznámkového bloku
Ukončí poznámkový blok s hodnotou. Volání vnořené funkce můžete v poznámkovém bloku spouštět interaktivně nebo v kanálu.
Když interaktivně zavoláte funkci poznámkového
exit()
bloku, Azure Synapse vyvolá výjimku, přeskočí spuštěné buňky dílčích sekvencí a zachová relaci Sparku naživu.Když orchestrujete poznámkový blok, který volá
exit()
funkci v kanálu Synapse, Azure Synapse vrátí výstupní hodnotu, dokončí spuštění kanálu a zastaví relaci Sparku.Když zavoláte
exit()
funkci v odkazovaném poznámkovém bloku, Azure Synapse zastaví další spuštění v poznámkovém bloku, na který serun()
odkazuje, a dál spustí další buňky v poznámkovém bloku, které funkci volají. Například: Notebook1 obsahuje tři buňky a voláexit()
funkci ve druhé buňce. Poznámkový blok 2 obsahuje pět buněk a volánírun(notebook1)
ve třetí buňce. Při spuštění poznámkového bloku 2 se poznámkový blok 1 zastaví na druhé buňce při stisknutíexit()
funkce. Poznámkový blok 2 bude dál spouštět svou čtvrtou buňku a pátou buňku.
mssparkutils.notebook.exit("value string")
Příklad:
Poznámkový blok Sample1 se nachází ve složce/ s následujícími dvěma buňkami:
- Buňka 1 definuje vstupní parametr s výchozí hodnotou nastavenou na 10.
- Buňka 2 ukončí poznámkový blok se vstupem jako výstupní hodnotou.
Ukázku 1 můžete spustit v jiném poznámkovém bloku s výchozími hodnotami:
exitVal <- mssparkutils.notebook.run("folder/Sample1")
print (exitVal)
Výsledky:
Sample1 run success with input is 10
Ukázku 1 můžete spustit v jiném poznámkovém bloku a nastavit vstupní hodnotu na 20:
exitVal <- mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, list("input": 20))
print (exitVal)
Výsledky:
Sample1 run success with input is 20
Nástroje pro přihlašovací údaje
Pomocí nástrojů MSSparkUtils Credentials Utilities můžete získat přístupové tokeny propojených služeb a spravovat tajné kódy ve službě Azure Key Vault.
Spuštěním následujícího příkazu získejte přehled dostupných metod:
mssparkutils.credentials.help()
mssparkutils.credentials.help()
Not supported.
mssparkutils.credentials.help()
Získání výsledku:
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
Poznámka:
V současné době se v jazyce C# nepodporuje getSecretWithLS(linkedService, tajný klíč).
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
Získání tokenu
Vrátí token Microsoft Entra pro danou cílovou skupinu, název (volitelné). V následující tabulce jsou uvedeny všechny dostupné typy cílových skupin:
Typ cílové skupiny | Řetězcový literál, který se má použít při volání rozhraní API |
---|---|
Azure Storage | Storage |
Azure Key Vault | Vault |
Správa Azure | AzureManagement |
Azure SQL Data Warehouse (vyhrazené a bezserverové) | DW |
Azure Synapse | Synapse |
Azure Data Lake Store | DataLakeStore |
Azure Data Factory | ADF |
Průzkumník dat Azure | AzureDataExplorer |
Azure Database for MySQL | AzureOSSDB |
Azure Database for MariaDB | AzureOSSDB |
Azure Database for PostgreSQL | AzureOSSDB |
mssparkutils.credentials.getToken('audience Key')
mssparkutils.credentials.getToken("audience Key")
Credentials.GetToken("audience Key")
mssparkutils.credentials.getToken('audience Key')
Ověření tokenu
Vrátí hodnotu true, pokud nevypršela platnost tokenu.
mssparkutils.credentials.isValidToken('your token')
mssparkutils.credentials.isValidToken("your token")
Credentials.IsValidToken("your token")
mssparkutils.credentials.isValidToken('your token')
Získání připojovací řetězec nebo přihlašovacích údajů pro propojenou službu
Vrátí připojovací řetězec nebo přihlašovací údaje pro propojenou službu.
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
mssparkutils.credentials.getConnectionStringOrCreds("linked service name")
Credentials.GetConnectionStringOrCreds("linked service name")
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
Získání tajného kódu pomocí identity pracovního prostoru
Vrátí tajný klíč služby Azure Key Vault pro daný název služby Azure Key Vault, název tajného kódu a název propojené služby pomocí identity pracovního prostoru. Ujistěte se, že správně nakonfigurujete přístup ke službě Azure Key Vault .
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
mssparkutils.credentials.getSecret("azure key vault name","secret name","linked service name")
Credentials.GetSecret("azure key vault name","secret name","linked service name")
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
Získání tajného kódu pomocí přihlašovacích údajů uživatele
Vrátí tajný klíč služby Azure Key Vault pro daný název služby Azure Key Vault, název tajného kódu a název propojené služby pomocí přihlašovacích údajů uživatele.
mssparkutils.credentials.getSecret('azure key vault name','secret name')
mssparkutils.credentials.getSecret("azure key vault name","secret name")
Credentials.GetSecret("azure key vault name","secret name")
mssparkutils.credentials.getSecret('azure key vault name','secret name')
Vložení tajného kódu pomocí identity pracovního prostoru
Vloží tajný klíč služby Azure Key Vault pro daný název služby Azure Key Vault, název tajného kódu a název propojené služby pomocí identity pracovního prostoru. Ujistěte se, že správně nakonfigurujete přístup ke službě Azure Key Vault .
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')
Vložení tajného kódu pomocí identity pracovního prostoru
Vloží tajný klíč služby Azure Key Vault pro daný název služby Azure Key Vault, název tajného kódu a název propojené služby pomocí identity pracovního prostoru. Ujistěte se, že správně nakonfigurujete přístup ke službě Azure Key Vault .
mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value","linked service name")
Vložení tajného kódu pomocí identity pracovního prostoru
Vloží tajný klíč služby Azure Key Vault pro daný název služby Azure Key Vault, název tajného kódu a název propojené služby pomocí identity pracovního prostoru. Ujistěte se, že správně nakonfigurujete přístup ke službě Azure Key Vault .
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')
Vložení tajného kódu pomocí přihlašovacích údajů uživatele
Vloží tajný klíč služby Azure Key Vault pro daný název služby Azure Key Vault, název tajného kódu a název propojené služby pomocí přihlašovacích údajů uživatele.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')
Vložení tajného kódu pomocí přihlašovacích údajů uživatele
Vloží tajný klíč služby Azure Key Vault pro daný název služby Azure Key Vault, název tajného kódu a název propojené služby pomocí přihlašovacích údajů uživatele.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')
Vložení tajného kódu pomocí přihlašovacích údajů uživatele
Vloží tajný klíč služby Azure Key Vault pro daný název služby Azure Key Vault, název tajného kódu a název propojené služby pomocí přihlašovacích údajů uživatele.
mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value")
Nástroje pro prostředí
Spuštěním následujících příkazů získáte přehled dostupných metod:
mssparkutils.env.help()
mssparkutils.env.help()
mssparkutils.env.help()
Env.Help()
Získání výsledku:
getUserName(): returns user name
getUserId(): returns unique user id
getJobId(): returns job id
getWorkspaceName(): returns workspace name
getPoolName(): returns Spark pool name
getClusterId(): returns cluster id
Získání uživatelského jména
Vrátí aktuální uživatelské jméno.
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
Env.GetUserName()
Získání ID uživatele
Vrátí aktuální ID uživatele.
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
Env.GetUserId()
Získání ID úlohy
Vrátí ID úlohy.
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
Env.GetJobId()
Získání názvu pracovního prostoru
Vrátí název pracovního prostoru.
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
Env.GetWorkspaceName()
Získání názvu fondu
Vrátí název fondu Spark.
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
Env.GetPoolName()
Získání ID clusteru
Vrátí aktuální ID clusteru.
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
Env.GetClusterId()
Kontext modulu runtime
Nástroje modulu runtime Mssparkutils odhalily 3 vlastnosti modulu runtime. K získání vlastností uvedených níže můžete použít kontext modulu runtime mssparkutils:
- Název poznámkového bloku – název aktuálního poznámkového bloku vždy vrátí hodnotu pro interaktivní režim i režim kanálu.
- Pipelinejobid – ID spuštění kanálu vrátí hodnotu v režimu kanálu a vrátí prázdný řetězec v interaktivním režimu.
- Activityrunid – ID spuštění aktivity poznámkového bloku vrátí hodnotu v režimu kanálu a vrátí prázdný řetězec v interaktivním režimu.
Kontext modulu runtime v současné době podporuje Python i Scala.
mssparkutils.runtime.context
ctx <- mssparkutils.runtime.context()
for (key in ls(ctx)) {
writeLines(paste(key, ctx[[key]], sep = "\t"))
}
%%spark
mssparkutils.runtime.context
Správa relací
Zastavení interaktivní relace
Místo ručního kliknutí na tlačítko Zastavit je někdy vhodnější zastavit interaktivní relaci voláním rozhraní API v kódu. V takových případech poskytujeme rozhraní API mssparkutils.session.stop()
, které podporuje zastavení interaktivní relace prostřednictvím kódu, je k dispozici pro Scala a Python.
mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()
Rozhraní API zastaví aktuální interaktivní relaci asynchronně na pozadí, zastaví relaci Sparku a uvolní prostředky obsazené relací, aby byly dostupné pro jiné relace ve stejném fondu.
Poznámka:
Nedoporučujeme volat integrovaná rozhraní API jazyka, jako je sys.exit
Scala nebo sys.exit()
Python ve vašem kódu, protože taková rozhraní API pouze ukončují proces interpreta, takže relace Sparku zůstane aktivní a prostředky se nevolají.
Závislosti balíčků
Pokud chcete vyvíjet poznámkové bloky nebo úlohy místně a potřebujete odkazovat na relevantní balíčky pro rady pro kompilaci nebo integrované vývojové prostředí, můžete použít následující balíčky.
Další kroky
- Podívejte se na ukázkové poznámkové bloky Synapse
- Rychlý start: Vytvoření fondu Apache Spark ve službě Azure Synapse Analytics pomocí webových nástrojů
- Co je Apache Spark ve službě Azure Synapse Analytics
- Azure Synapse Analytics
- Použití rozhraní API pro připojení nebo odpojení souboru ve službě Synapse