Compartir vía


Protección de credenciales con servicios vinculados mediante mssparkutils

El acceso a los datos desde orígenes externos es un patrón común. Salvo que el origen de datos externo permita el acceso anónimo, lo más probable es que tenga que proteger la conexión con una credencial, un secreto o una cadena de conexión.

Azure Synapse Analytics usa Microsoft Entra passthrough de forma predeterminada para la autenticación entre recursos. Si necesita conectarse a un recurso con otras credenciales, use directamente mssparkutils. El paquete mssparkutils simplifica el proceso de recuperar tokens de SAS, tokens de Microsoft Entra, cadenas de conexión y secretos almacenados en un servicio vinculado o de una instancia de Azure Key Vault.

Microsoft Entra passthrough usa permisos asignados a usted como usuario en Microsoft Entra ID, en lugar de permisos asignados a Synapse o a una entidad de servicio independiente. Por ejemplo, si desea usar el acceso directo de Microsoft Entra para acceder a un blob en una cuenta de almacenamiento, debe ir a esa cuenta de almacenamiento y asignarle el rol de colaborador de blobs.

Al recuperar secretos de Azure Key Vault, se recomienda crear un servicio vinculado a Azure Key Vault. Asegúrese de que la identidad de servicio administrada (MSI) del área de trabajo de Synapse tenga privilegios para la obtención de secretos en Azure Key Vault. Synapse se autenticará en Azure Key Vault mediante la identidad de servicio administrada del área de trabajo de Synapse. Si te conectas directamente a Azure Key Vault sin un servicio vinculado, autentícate mediante la credencial de Microsoft Entra de usuario.

Para más información, consulte la información sobre los servicios vinculados.

Uso

Ayuda de mssparkutils para tokens y secretos

Esta función muestra la documentación de ayuda para la administración de secretos y tokens en Synapse.

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Console.WriteLine(TokenLibrary.help());

Resultado obtenido:

 getToken(audience: String, name: String): returns AAD token for a given audience, name (optional)
 isValidToken(token: String): returns true if token hasn't expired
 getConnectionStringOrCreds(linkedService: String): returns connection string or credentials for the linked service
 getFullConnectionString(linkedService: String): returns full connection string with credentials for the linked service
 getPropertiesAll(linkedService: String): returns all the properties of the linked service
 getSecret(akvName: String, secret: String, linkedService: String): returns AKV secret for a given AKV linked service, akvName, secret key using workspace MSI
 getSecret(akvName: String, secret: String): returns AKV secret for a given akvName, secret key using user credentials
 getSecretWithLS(linkedService: String, secret: String): returns AKV secret for a given linked service, secret key
 putSecret(akvName: String, secretName: String, secretValue: String): puts AKV secret for a given akvName, secretName
 putSecret(akvName: String, secretName: String, secretValue: String, linkedService: String): puts AKV secret for a given akvName, secretName
 putSecretWithLS(linkedService: String, secretName: String, secretValue: String): puts AKV secret for a given linked service, secretName

Acceso a Azure Data Lake Storage Gen2

Almacenamiento principal de ADLS Gen2

El acceso a los archivos desde el Azure Data Lake Storage primario usa Microsoft Entra passthrough para la autenticación de manera predeterminada y no requiere el uso explícito de las mssparkutils. La identidad usada en la autenticación transferida difiere en función de algunos factores. De forma predeterminada, los cuadernos interactivos se ejecutan con la identidad del usuario, pero se pueden cambiar a la identidad de servicio administrado (MSI) del área de trabajo. Los trabajos por lotes y las ejecuciones no interactivas del cuaderno usan la MSI del área de trabajo.

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")
display(df.limit(10))
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')
display(df.limit(10))

Almacenamiento de ADLS Gen2 con servicios vinculados

Azure Synapse Analytics ofrece una experiencia integrada de servicios vinculados al conectarse a Azure Data Lake Storage Gen2. Los servicios vinculados se pueden configurar para autenticarse con una clave de cuenta, una entidad de servicio, una identidad administrada o una credencial.

Cuando el método de autenticación del servicio vinculado se establece en Clave de cuenta, el servicio vinculado se autenticará con la clave de la cuenta de almacenamiento proporcionada, solicitará una clave SAS y la aplicará automáticamente a la solicitud de almacenamiento mediante LinkedServiceBasedSASProvider.

Synapse permite a los usuarios establecer el servicio vinculado para una cuenta de almacenamiento determinada. Esto permite leer y escribir datos de varias cuentas de almacenamiento en una sola aplicación o consulta de Spark. Una vez que establezcamos spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName para cada cuenta de almacenamiento que se usará, Synapse determina qué servicio vinculado se usará para una operación de lectura y escritura determinada. Pero si nuestro trabajo de Spark solo se ocupa de una única cuenta de almacenamiento, podemos omitir el nombre de la cuenta de almacenamiento y usar spark.storage.synapse.linkedServiceName.

Nota:

No se puede cambiar el método de autenticación del contenedor de almacenamiento ABFS predeterminado.

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.auth.type.$source_full_storage_account_name", "SAS")
sc.hadoopConfiguration.set(s"fs.azure.sas.token.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<lINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

Cuando el método de autenticación del servicio vinculado está establecido en Identidad administrada o Entidad de servicio, el servicio vinculado usará el token de la identidad administrada o la entidad de servicio con el proveedor LinkedServiceBasedTokenProvider.

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.oauth.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") 
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Python code
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

Establecimiento de la configuración de autenticación mediante la configuración de Spark

La configuración de autenticación también se puede especificar mediante configuraciones de Spark, en lugar de ejecutar instrucciones de Spark. Todas las configuraciones de Spark deben tener el prefijo spark., así como todas las configuraciones de Hadoop deben tener el prefijo spark.hadoop..

Nombre de la configuración de Spark Valor de configuración
spark.storage.synapse.teststorage.dfs.core.windows.net.linkedServiceName NOMBRE DEL SERVICIO VINCULADO
spark.hadoop.fs.azure.account.oauth.provider.type.teststorage.dfs.core.windows.net microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider

Almacenamiento de ADLS Gen2 sin servicios vinculados

Conéctese directamente al almacenamiento de ADLS Gen2 mediante una clave SAS. Use el valor ConfBasedSASProvider y proporcione la clave SAS para la configuración de spark.storage.synapse.sas. Los tokens de SAS pueden establecerse a nivel de contenedor, de cuenta o global. No se recomienda establecer claves de SAS a nivel global, ya que el trabajo no podrá leer o escribir desde más de una cuenta de almacenamiento.

Configuración de SAS por contenedor de almacenamiento

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

Configuración de SAS por cuenta de almacenamiento

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

Configuración de SAS de todas las cuentas de almacenamiento

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

Uso de MSAL para adquirir tokens (mediante credenciales de aplicación personalizadas)

Cuando el controlador de almacenamiento de ABFS esté configurado a fin de usar MSAL directamente para las autenticaciones, el proveedor no almacena en caché los tokens. Esto puede generar problemas de fiabilidad. Se recomienda usar ClientCredsTokenProvider como parte de Synapse Spark.

%%spark
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.$source_full_storage_account_name", "<Entra AppId>")
spark.conf.set("fs.azure.account.oauth2.client.secret.$source_full_storage_account_name", "<Entra app secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.$source_full_storage_account_name", "https://login.microsoftonline.com/<tenantid>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{source_full_storage_account_name}.linkedServiceName", "<Entra AppId>")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{source_full_storage_account_name}.linkedServiceName", "<Entra app secret>")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{source_full_storage_account_name}.linkedServiceName", "https://login.microsoftonline.com/<tenantid>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')
display(df.limit(10))

Almacenamiento de ADLS Gen2 con token de SAS (de Azure Key Vault)

Conéctese al almacenamiento de ADLS Gen2 mediante un token de SAS almacenado en un secreto de Azure Key Vault.

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

TokenLibrary para otros servicios vinculados

Para conectarse a otros servicios vinculados, puede realizar una llamada directa a TokenLibrary.

getConnectionString()

Para recuperar la cadena de conexión, usa la función getConnectionString y pasa el nombre del servicio vinculado.

%%spark
// retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%pyspark
# retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%csharp
// retrieve connectionstring from TokenLibrary

using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetConnectionString(<LINKED SERVICE NAME>);
Console.WriteLine(connectionString);

getPropertiesAll()

getPropertiesAll es una función auxiliar disponible en Scala y Python para obtener todas las propiedades de un servicio vinculado.

%%pyspark
import json
# retrieve connectionstring from mssparkutils

json.loads(mssparkutils.credentials.getPropertiesAll("<LINKED SERVICE NAME>"))

La salida tendrá el siguiente aspecto:

{
    'AuthType': 'Key',
    'AuthKey': '[REDACTED]',
    'Id': None,
    'Type': 'AzureBlobStorage',
    'Endpoint': 'https://storageaccount.blob.core.windows.net/',
    'Database': None
}

GetSecret()

Para recuperar un secreto almacenado de Azure Key Vault, se recomienda crear un servicio vinculado a Azure Key Vault en el área de trabajo de Synapse. La identidad de servicio administrada del área de trabajo de Synapse deberá tener concedidos permisos para obtener secretos en Azure Key Vault. El servicio vinculado usará la identidad de servicio administrada para conectarse a un servicio de Azure Key Vault y recuperar el secreto. De lo contrario, la conexión directa a Azure Key Vault usará la credencial de Microsoft Entra del usuario. En este caso, será necesario conceder al usuario los permisos para obtener secretos en Azure Key Vault.

En nubes de la Administración Pública, proporciona el nombre de dominio completo del almacén de claves.

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>" [, <LINKED SERVICE NAME>])

Para recuperar un secreto de Azure Key Vault, usa la función mssparkutils.credentials.getSecret().


mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>");
Console.WriteLine(connectionString);

Conexiones de servicio vinculadas admitidas desde el entorno de ejecución de Spark

Aunque Azure Synapse Analytics admite varias conexiones de servicio vinculadas (desde canalizaciones y otros productos de Azure), no todas se admiten desde el runtime de Spark. Esta es la lista de servicios vinculados admitidos:

  • Azure Blob Storage
  • Servicios de Azure AI
  • Azure Cosmos DB
  • Explorador de datos de Azure
  • Azure Database for MySQL
  • Azure Database for PostgreSQL
  • Azure Data Lake Store (Gen1)
  • Azure Key Vault
  • Azure Machine Learning
  • Azure Purview
  • Azure SQL Database
  • Azure SQL Data Warehouse (dedicado y sin servidor)
  • Azure Storage

mssparkutils.credentials.getToken()

Cuando necesite un token de portador de OAuth para acceder directamente a los servicios, puede usar el método getToken. Se admiten los siguientes recursos:

Nombre de servicio Literal de cadena que se va a usar en la llamada API
Azure Storage Storage
Azure Key Vault Vault
Azure Management AzureManagement
Azure SQL Data Warehouse (Dedicated and Serverless) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Data Explorer AzureDataExplorer
Azure Database for MySQL AzureOSSDB
Azure Database for MariaDB AzureOSSDB
Azure Database for PostgreSQL AzureOSSDB

Acceso al servicio vinculado no admitido desde el entorno de ejecución de Spark

No se admiten los siguientes métodos de acceso a los servicios vinculados desde el entorno de ejecución de Spark:

  • Pasar argumentos al servicio vinculado con parámetros
  • Conexiones con identidades administradas asignadas por el usuario (UAMI)
  • Obtención del token de portador al recurso de Keyvault cuando el Cuaderno o SparkJobDefinition se ejecuta como identidad administrada
    • Como alternativa, en lugar de obtener un token de acceso, puedes crear un servicio vinculado a Keyvault y obtener el secreto del Cuaderno o trabajo por lotes.
  • En el caso de las conexiones de Azure Cosmos DB, solo se admite el acceso basado en claves. No se admite el acceso basado en tokens.

Al ejecutar un cuaderno o un trabajo de Spark, las solicitudes para obtener un token o secreto mediante un servicio vinculado pueden producir el error "BadRequest". Esto suele deberse a un problema de configuración con el servicio vinculado. Si ve este mensaje de error, compruebe la configuración del servicio vinculado. Si tienes alguna pregunta, ponte en contacto con el Soporte técnico de Microsoft Azure en Azure Portal.