Conexión de Azure Databricks y Azure Synapse con PolyBase (heredado)
Importante
Esta documentación se ha retirado y es posible que no se actualice. Los productos, servicios o tecnologías mencionados en este contenido ya no se admiten. Vea Consulta de datos en Azure Synapse Analytics.
Databricks recomienda usar la funcionalidad predeterminada COPY
con Azure Data Lake Storage Gen2 para las conexiones a Azure Synapse. En este artículo se incluye documentación heredada sobre PolyBase y Blob Storage.
Azure Synapse Analytics (anteriormente, SQL Data Warehouse) es un almacenamiento de datos empresarial basado en la nube que aprovecha el procesamiento paralelo masivo (MPP) para ejecutar rápidamente consultas complejas en petabytes de datos. Use Azure como componente clave de una solución de macrodatos. Importe macrodatos en Azure con consultas T-SQL de PolyBase simples o la instrucción COPY y, luego, use la potencia de MPP para realizar análisis de alto rendimiento. Al realizar la integración y el análisis, el almacenamiento de datos pasará a ser la versión única de certeza con la que puede contar su empresa para obtener información.
Para acceder a Azure Synapse desde Azure Databricks, puede utilizar el conector de Azure Synapse, una implementación de origen de datos para Apache Spark que utiliza Azure Blob Storage, y PolyBase o la instrucción COPY
en Azure Synapse para transferir eficazmente grandes volúmenes de datos entre un clúster de Azure Databricks y una instancia de Azure Synapse.
Tanto el clúster de Azure Databricks como la instancia de Azure Synapse acceden a un contenedor común de Blob Storage para intercambiar datos entre estos dos sistemas. En Azure Databricks, el conector de Azure Synapse desencadena trabajos de Apache Spark para leer y escribir datos en el contenedor de Blob Storage. En Azure Synapse, las operaciones de carga y descarga de datos que realiza PolyBase las desencadena el conector de Azure Synapse a través de JDBC. En Databricks Runtime 7.0 y versiones superiores, se usa COPY
de manera predeterminada para cargar datos en Azure Synapse mediante el conector de Azure Synapse a través de JDBC.
Nota:
COPY
solo está disponible en instancias de Azure Synapse Gen2, las que proporcionan un mejor rendimiento. Si la base de datos todavía usa instancias de Gen1, se recomienda que migre la base de datos a Gen2.
El conector de Azure Synapse es más adecuado para ETL que para las consultas interactivas, porque cada ejecución de consulta puede extraer grandes cantidades de datos en Blob Storage. Si tiene pensado hacer varias consultas en la misma tabla de Azure Synapse, le recomendamos que guarde los datos extraídos en un formato como Parquet.
Requisitos
Una clave maestra de base de datos de Azure Synapse.
Autenticación
El conector de Azure Synapse usa tres tipos de conexiones de red:
- Controlador de Spark a Azure Synapse
- Controlador de Spark y ejecutores a la cuenta de Azure Storage
- Azure Synapse a la cuenta de Azure Storage
┌─────────┐
┌─────────────────────────>│ STORAGE │<────────────────────────┐
│ Storage acc key / │ ACCOUNT │ Storage acc key / │
│ Managed Service ID / └─────────┘ OAuth 2.0 / │
│ │ │
│ │ │
│ │ Storage acc key / │
│ │ OAuth 2.0 / │
│ │ │
v v ┌──────v────┐
┌──────────┐ ┌──────────┐ │┌──────────┴┐
│ Synapse │ │ Spark │ ││ Spark │
│ Analytics│<────────────────────>│ Driver │<───────────────>│ Executors │
└──────────┘ JDBC with └──────────┘ Configured └───────────┘
username & password / in Spark
En las secciones siguientes, se describen las opciones de configuración de autenticación de cada conexión.
Controlador de Spark a Azure Synapse
El controlador de Spark se puede conectar a Azure Synapse mediante JDBC con un nombre de usuario y contraseña, o bien OAuth 2.0 con una entidad de servicio para la autenticación.
Nombre de usuario y contraseña
Se recomienda usar las cadenas de conexión que proporciona Azure Portal para ambos tipos de autenticación, que habilitan el cifrado Capa de sockets seguros (SSL) para todos los datos que se envían entre el controlador de Spark y la instancia de Azure Synapse a través de la conexión JDBC. Si quiere comprobar que el cifrado SSL está habilitado, busque encrypt=true
en la cadena de conexión.
Para permitir que el controlador de Spark se conecte a Azure Synapse, le recomendamos que establezca Permitir que los servicios y recursos de Azure accedan a este área de trabajo en ON en el panel Redes, en Seguridad del área de trabajo de Azure Synapse a través de Azure Portal. Esta configuración permite las comunicaciones desde todas las direcciones IP de Azure y todas las subredes de Azure, con lo que los controladores de Spark pueden conectarse a la instancia de Azure Synapse.
OAuth 2.0 con una entidad de servicio
Puede autenticarse en Azure Synapse Analytics con una entidad de servicio con acceso a la cuenta de almacenamiento subyacente. Para obtener más información sobre el uso de las credenciales de entidad de servicio para acceder a una cuenta de almacenamiento de Azure, consulte el documento sobre Conexión a Azure Data Lake Storage Gen2 y Blob Storage. Debe establecer la opción enableServicePrincipalAuth
en true
en los parámetros de configuración de la conexión para permitir que el conector se autentique con una entidad de servicio.
También puede utilizar otra entidad de servicio para la conexión de Azure Synapse Analytics. A continuación, puede ver un ejemplo en el que se configuran credenciales de entidad de servicio para la cuenta de almacenamiento y las credenciales de entidad de servicio opcionales para Synapse:
ini
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token
; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
Scala
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Python
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
R
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")
# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Controlador de Spark y ejecutores a la cuenta de Azure Storage
El contenedor de Azure Storage sirve como intermediario para almacenar datos masivos al leer o escribir en Azure Synapse. Spark se conectará a ADLS Gen2 o Blob Storage mediante el uso del controlador abfss
.
Entre las opciones de autenticación disponibles se encuentran las siguientes:
- Clave de acceso y secreto de la cuenta de almacenamiento
- Autenticación de OAuth 2.0. Para obtener más información sobre OAuth 2.0 y las entidades de servicio, consulte Acceso al almacenamiento mediante una entidad de servicio y Microsoft Entra ID (Azure Active Directory).
En los ejemplos siguientes, se muestran estas dos maneras con el enfoque de clave de acceso de la cuenta de almacenamiento. Lo mismo se aplica a la configuración de OAuth 2.0.
Configuración de sesión de cuaderno (opción preferida)
Con este enfoque, la clave de acceso de la cuenta se establece en la configuración de la sesión asociada con el cuaderno que ejecuta el comando. Esta configuración no afecta a otros cuadernos asociados al mismo clúster. spark
es el objeto SparkSession
que se proporciona en el cuaderno.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Configuración global de Hadoop
Este enfoque actualiza la configuración global de Hadoop que está asociada con el objeto SparkContext
que comparten todos los cuadernos.
Scala
sc.hadoopConfiguration.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Python
hadoopConfiguration
no se expone en todas las versiones de PySpark. Aunque el comando siguiente se basa en algunos elementos internos de Spark, debería funcionar con todas las versiones de PySpark y es poco probable que se interrumpa o se modifique en el futuro:
sc._jsc.hadoopConfiguration().set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Azure Synapse a la cuenta de Azure Storage
Azure Synapse también se conecta a una cuenta de almacenamiento durante la carga y descarga de datos temporales.
En caso de que haya configurado una clave de cuenta y un secreto para la cuenta de almacenamiento, puede establecerforwardSparkAzureStorageCredentials
en true
. De este modo, el conector de Azure Synapse detecta automáticamente la clave de acceso de la cuenta que se estableció en la configuración de la sesión del cuaderno o la configuración global de Hadoop y reenvía dicha clave de acceso de la cuenta de almacenamiento a la instancia conectada de Azure Synapse mediante la creación de una credencial con ámbito de base de datos temporal de Azure.
De manera alternativa, si usa ADLS Gen2 con la autenticación de OAuth 2.0 o si la instancia de Azure Synapse está configurada para tener una Identidad de servicio administrada (por lo general, junto con una Configuración de VNet + Puntos de conexión de servicio), debe establecer useAzureMSI
en true
. En este caso, el conector especificará IDENTITY = 'Managed Service Identity'
para la credencial con ámbito de base de datos y ningún SECRET
.
Compatibilidad con streaming
El conector de Azure Synapse ofrece una compatibilidad de escritura con Structured Streaming eficaz y escalable para Azure Synapse que ofrece una experiencia de usuario coherente con escrituras por lote, y utiliza PolyBase o COPY
para hacer transferencias de datos de gran tamaño entre un clúster de Azure Databricks y una instancia de Azure Synapse. De manera similar a lo que ocurre con las escrituras por lotes, el streaming está diseñado en gran medida para ETL, lo que proporciona una latencia mayor que puede no resultar adecuada para el procesamiento de datos en tiempo real en algunos casos.
Semántica de tolerancia a errores
De manera predeterminada, el streaming de Azure Synapse ofrece una garantía de un extremo a otro por única vez para escribir datos en una tabla de Azure Synapse mediante el seguimiento confiable del progreso de la consulta. Esto se realiza a través de una combinación de la ubicación del punto de control en DBFS, la tabla de punto de control en Azure Synapse y un mecanismo de bloqueo a fin de garantizar que el streaming puede controlar cualquier tipo de error, reintento y reinicio de consulta.
También puede seleccionar una semántica menos restrictiva que se utiliza al menos una vez para el streaming de Azure Synapse. Para ello, establezca la opción spark.databricks.sqldw.streaming.exactlyOnce.enabled
en false
; de este modo, la duplicación de datos puede ocurrir en caso de que una conexión con Azure Synapse presente errores intermitentes o si se produce la terminación inesperada de la consulta.
Utilización (lote)
Puede utilizar este conector a través de la API de origen de datos en cuadernos de Scala, Python, SQL y R.
Scala
// Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()
// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()
// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.save()
Python
# Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.load()
# Load data from an Azure Synapse query.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", "select x, count(*) as cnt from table group by x") \
.load()
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.save()
SQL
-- Otherwise, set up the Blob storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;
R
# Load SparkR
library(SparkR)
# Otherwise, set up the Blob storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Load data from an Azure Synapse query.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
query = "select x, count(*) as cnt from table group by x",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
df,
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
Utilización (streaming)
Puede escribir datos con Structured Streaming en cuadernos de Scala y Python.
Scala
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()
// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()
Python
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", "100000") \
.option("numPartitions", "16") \
.load()
# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("checkpointLocation", "/tmp_checkpoint_location") \
.start()
Configuración
En esta sección, se describe cómo configurar la semántica de escritura para el conector, los permisos requeridos y parámetros de configuración varios.
En esta sección:
- Modos de guardado admitidos para escrituras por lotes
- Modos de salida admitidos para escrituras en streaming
- Semántica de escritura
- Permisos de Azure Synapse requeridos para PolyBase
- Permisos de Azure Synapse requeridos para la instrucción
COPY
- Parámetros
- Inserción de consultas en Azure Synapse
- Administración de datos temporales
- Administración de objetos temporales
- Administración de tablas de punto de control de streaming
Modos de guardado admitidos para escrituras por lotes
El conector de Azure Synapse admite los modos de guardado ErrorIfExists
, Ignore
, Append
y Overwrite
, con ErrorIfExists
como modo predeterminado.
Para más información sobre los modos de guardado que se admiten en Apache Spark, consulte la documentación de Spark SQL sobre los modos de guardado.
Modos de salida admitidos para escrituras en streaming
El conector de Azure Synapse admite los modos de salida Append
y Complete
para agregaciones y anexos de registro.
Para más información sobre los modos de salida y la matriz de compatibilidad, consulte la guía de Structured Streaming.
Semántica de escritura
Nota:
COPY
está disponible en Databricks Runtime 7.0 y versiones posteriores.
Además de PolyBase, el conector de Azure Synapse admite la instrucción COPY
. La instrucción COPY
ofrece una manera más cómoda para cargar datos en Azure Synapse sin necesidad de crear una tabla externa, requiere menos permisos para cargar datos y mejora el rendimiento de la ingesta de datos en Azure Synapse.
De manera predeterminada, el conector detecta automáticamente la mejor semántica de escritura (COPY
cuando el destino es una instancia de Azure Synapse Gen2; de lo contrario, PolyBase). También puede especificar la semántica de escritura con la configuración siguiente:
Scala
// Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
Python
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
SQL
-- Configure the write semantics for Azure Synapse connector in the notebook session conf.
SET spark.databricks.sqldw.writeSemantics=<write-semantics>;
R
# Load SparkR
library(SparkR)
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.writeSemantics", "<write-semantics>")
donde <write-semantics>
es polybase
para usar PolyBase, o copy
para usar la instrucción COPY
.
Permisos de Azure Synapse requeridos para PolyBase
Cuando se usa PolyBase, el conector de Azure Synapse requiere que el usuario de la conexión JDBC tenga permiso para ejecutar los comandos siguientes en la instancia de Azure Synapse conectada:
- CREATE DATABASE SCOPED CREDENTIAL
- CREATE EXTERNAL DATA SOURCE
- CREATE EXTERNAL FILE FORMAT
- CREATE EXTERNAL TABLE
Como requisito previo para el primer comando, el conector espera que ya exista una clave maestra de base de datos para la instancia de Azure Synapse especificada. Si no es así, puede crear una clave con el comando CREATE MASTER KEY.
Además, para leer la tabla de Azure Synapse establecida a través de dbTable
o a las tablas a las que se hace referencia en query
, el usuario de JDBC debe tener permiso para acceder a las tablas de Azure Synapse necesarias. Para volver a escribir datos en una tabla de Azure Synapse establecida a través de dbTable
, el usuario de JDBC debe tener permiso para escribir en esta tabla de Azure Synapse.
En la tabla siguiente, se resumen los permisos necesarios para todas las operaciones con PolyBase:
Operación | Permisos | Permisos al utilizar un origen de datos externo |
---|---|---|
Escritura por lotes | CONTROL | Consulte Escritura por lotes |
Escritura en streaming | CONTROL | Consulte Escritura en streaming |
Lectura | CONTROL | Consulte Lectura |
Permisos de Azure Synapse requeridos para PolyBase con la opción de origen de datos externo
Puede usar PolyBase con un origen de datos externo aprovisionado previamente. Para más información, consulte el parámetro externalDataSource
en Parámetros.
Para PolyBase con un origen de datos externo aprovisionado previamente, el conector de Azure Synapse requiere que el usuario de la conexión JDBC tenga permiso para ejecutar los comandos siguientes en la instancia de Azure Synapse conectada:
Para crear un origen de datos externo, primero debe crear una credencial con ámbito de base de datos. En los vínculos siguientes, se describe cómo crear una credencial con ámbito para entidades de servicio y un origen de datos externo para una ubicación de ABFS:
Nota:
La ubicación del origen de datos externo debe apuntar a un contenedor. El conector no funcionará si la ubicación es un directorio de un contenedor.
En la tabla siguiente, se resumen los permisos para las operaciones de escritura de PolyBase con la opción de origen de datos externo:
Operación | Permisos (insertar en una tabla existente) | Permisos (insertar en una tabla nueva) |
---|---|---|
Escritura por lotes | ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
Escritura en streaming | ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
En la tabla siguiente, se resumen los permisos para las operaciones de lectura de PolyBase con la opción de origen de datos externo:
Operación | Permisos |
---|---|
Lectura | CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
Puede utilizar este conector para leer a través de la API de origen de datos en cuadernos de Scala, Python, SQL y R.
Scala
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("externalDataSource", "<your-pre-provisioned-data-source>")
.option("dbTable", "<your-table-name>")
.load()
Python
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("externalDataSource", "<your-pre-provisioned-data-source>") \
.option("dbTable", "<your-table-name>") \
.load()
SQL
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>',
externalDataSource '<your-pre-provisioned-data-source>'
);
R
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>"
externalDataSource = "<your-pre-provisioned-data-source>")
Permisos de Azure Synapse requeridos para la instrucción COPY
Nota:
Disponible en Databricks Runtime 7.0 y versiones posteriores.
Cuando se usa la instrucción COPY
, el conector de Azure Synapse requiere que el usuario de la conexión JDBC tenga permiso para ejecutar los comandos siguientes en la instancia de Azure Synapse conectada:
Si la tabla de destino no existe en Azure Synapse, se requiere permiso para ejecutar el comando siguiente además del comando anterior:
En la tabla siguiente, se resumen los permisos para las escrituras por lotes y en streaming con COPY
:
Operación | Permisos (insertar en una tabla existente) | Permisos (insertar en una tabla nueva) |
---|---|---|
Escritura por lotes | ADMINISTER DATABASE BULK OPERATIONS INSERT |
ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
Escritura en streaming | ADMINISTER DATABASE BULK OPERATIONS INSERT |
ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
Parámetros
El mapa de parámetros o OPTIONS
que se proporcionan en Spark SQL admiten la configuración siguiente:
Parámetro | Obligatorio | Valor predeterminado | Notas |
---|---|---|---|
dbTable |
Sí, a menos que se especifique query |
Sin valor predeterminado | Tabla a partir de la que se creará o leerá en Azure Synapse. Se requiere este parámetro al guardar los datos de vuelta en Azure Synapse. También puede usar {SCHEMA NAME}.{TABLE NAME} para acceder a una tabla en un esquema determinado. Si no se proporciona el nombre del esquema, se usa el esquema predeterminado asociado al usuario de JDBC.La variante dbtable admitida previamente está en desuso y se omitirá en las versiones futuras. En su lugar, use el nombre en una mezcla de mayúsculas y minúsculas. |
query |
Sí, a menos que se especifique dbTable |
Sin valor predeterminado | Consulta a partir de la que se leerá en Azure Synapse. En el caso de las tablas a las que se hace referencia en la consulta, también puede usar {SCHEMA NAME}.{TABLE NAME} para acceder a una tabla en un esquema determinado. Si no se proporciona el nombre del esquema, se usa el esquema predeterminado asociado al usuario de JDBC. |
user |
No | Sin valor predeterminado | Nombre de usuario de Azure Synapse. Se debe usar en conjunto con la opción password . Solo se puede usar si el usuario y la contraseña no se pasan en la dirección URL. Si se pasan ambos, se producirá un error. |
password |
No | Sin valor predeterminado | Contraseña de Azure Synapse. Se debe usar en conjunto con la opción user . Solo se puede usar si el usuario y la contraseña no se pasan en la dirección URL. Si se pasan ambos, se producirá un error. |
url |
Sí | Sin valor predeterminado | Dirección URL de JDBC con sqlserver establecido como subprotocolo. Se recomienda usar la cadena de conexión proporcionada por Azure Portal. Se recomienda establecer encrypt=true porque habilita el cifrado SSL de la conexión JDBC. Si user y password se establecen por separado, no es necesario que los incluya en la dirección URL. |
jdbcDriver |
No | Determinado por el subprotocolo de la dirección URL de JDBC | Nombre de clase del controlador JDBC que se usará. Esta clase debe estar en la ruta de clases. En la mayoría de los casos, no debería ser necesario especificar esta opción, ya que el subprotocolo de la dirección URL de JDBC debe determinar automáticamente el nombre de clase del controlador adecuado. La variante jdbc_driver admitida previamente está en desuso y se omitirá en las versiones futuras. En su lugar, use el nombre en una mezcla de mayúsculas y minúsculas. |
tempDir |
Sí | Sin valor predeterminado | Identificador URI abfss . Se recomienda usar un contenedor de Blob Storage dedicado para la instancia de Azure Synapse.La variante tempdir admitida previamente está en desuso y se omitirá en las versiones futuras. En su lugar, use el nombre en una mezcla de mayúsculas y minúsculas. |
tempFormat |
No | PARQUET |
Formato en el que se guardarán los archivos temporales en el almacén de blobs al escribir en Azure Synapse. El valor predeterminado es PARQUET ; no se permiten otros valores en este momento. |
tempCompression |
No | SNAPPY |
Algoritmo de compresión que se va a usar para codificar o descodificar temporalmente tanto mediante Spark como Azure Synapse. Actualmente, los valores admitidos son UNCOMPRESSED , SNAPPY y GZIP . |
forwardSparkAzureStorageCredentials |
No | false | Si es true , la biblioteca detecta automáticamente las credenciales que Spark utiliza para conectarse al contenedor de Blob Storage y reenvía esas credenciales a Azure Synapse a través de JDBC. Estas credenciales se envían como parte de la consulta de JDBC. Por lo tanto, se recomienda encarecidamente habilitar el cifrado SSL de la conexión JDBC cuando use esta opción.La versión actual del conector de Azure Synapse requiere (exactamente) que uno de los valores forwardSparkAzureStorageCredentials , enableServicePrincipalAuth o useAzureMSI esté establecido de manera explícita en true .La variante forward_spark_azure_storage_credentials admitida previamente está en desuso y se omitirá en las versiones futuras. En su lugar, use el nombre en una mezcla de mayúsculas y minúsculas. |
useAzureMSI |
No | false | Si es true , la biblioteca especificará IDENTITY = 'Managed Service Identity' y ningún SECRET para las credenciales con ámbito de base de datos que crea.La versión actual del conector de Azure Synapse requiere (exactamente) que uno de los valores forwardSparkAzureStorageCredentials , enableServicePrincipalAuth o useAzureMSI esté establecido de manera explícita en true . |
enableServicePrincipalAuth |
No | false | Si es true , la biblioteca usará las credenciales de entidad de servicio proporcionadas para conectarse a la cuenta de Azure Storage y a Azure Synapse Analytics a través de JDBC.La versión actual del conector de Azure Synapse requiere (exactamente) que uno de los valores forwardSparkAzureStorageCredentials , enableServicePrincipalAuth o useAzureMSI esté establecido de manera explícita en true . |
tableOptions |
No | CLUSTERED COLUMNSTORE INDEX , DISTRIBUTION = ROUND_ROBIN |
Cadena que se usa para especificar las opciones de tabla al crear la tabla de Azure Synapse establecida a través de dbTable . Esta cadena se pasa literalmente a la cláusula WITH de la instrucción SQL CREATE TABLE que se emite en Azure Synapse.La variante table_options admitida previamente está en desuso y se omitirá en las versiones futuras. En su lugar, use el nombre en una mezcla de mayúsculas y minúsculas. |
preActions |
No | Sin valor predeterminado (cadena vacía) | Lista separada por ; de los comandos SQL que se ejecutarán en Azure Synapse antes de escribir datos en la instancia de Azure Synapse. Estos comandos SQL deben ser comandos válidos aceptados por Azure Synapse.Si se produce un error con cualquiera de estos comandos, se tratará como un error y no se ejecutará la operación de escritura. |
postActions |
No | Sin valor predeterminado (cadena vacía) | Lista separada por ; de los comandos SQL que se ejecutarán en Azure Synapse después de que el conector escribe correctamente datos en la instancia de Azure Synapse. Estos comandos SQL deben ser comandos válidos aceptados por Azure Synapse.Si se produce un error con cualquiera de estos comandos, se tratará como un error y recibirá una excepción después de que los datos se escriban correctamente en la instancia de Azure Synapse. |
maxStrLength |
No | 256 | StringType en Spark se asigna al tipo NVARCHAR(maxStrLength) en Azure Synapse. Puede usar maxStrLength para establecer la longitud de cadena para todas las columnas de tipo NVARCHAR(maxStrLength) que se encuentran en la tabla con el nombre dbTable en Azure Synapse.La variante maxstrlength admitida previamente está en desuso y se omitirá en las versiones futuras. En su lugar, use el nombre en una mezcla de mayúsculas y minúsculas. |
checkpointLocation |
Sí | Sin valor predeterminado | Ubicación en DBFS que Structured Streaming usará para escribir metadatos e información del punto de control. Consulte la sección sobre la recuperación ante errores con punto de control en la guía de programación de Structured Streaming. |
numStreamingTempDirsToKeep |
No | 0 | Indica cuántos directorios temporales (los más recientes) se deben conservar para la limpieza periódica de microlotes en streaming. Cuando se establece en 0 , la eliminación de directorios se desencadena inmediatamente después de confirmar el microlote; de lo contrario, se mantiene el número de microlotes más recientes y se quita el resto de los directorios. Utilice -1 para deshabilitar la limpieza periódica. |
applicationName |
No | Databricks-User-Query |
Etiqueta de la conexión para cada consulta. Si el valor no se especifica o es una cadena vacía, se agrega el valor predeterminado de la etiqueta a la dirección URL de JDBC. El valor predeterminado impide que la herramienta de supervisión de bases de datos de Azure genere alertas falsas de inyección de código SQL con relación a consultas. |
maxbinlength |
No | Sin valor predeterminado | Controla la longitud de las columnas BinaryType . Este parámetro se traduce como VARBINARY(maxbinlength) . |
identityInsert |
No | false | Establecer este valor en true habilita el modo IDENTITY_INSERT , que inserta un valor proporcionado por DataFrame en la columna de identidad de la tabla de Azure Synapse.Consulte Inserción explícita de valores en una columna IDENTITY. |
externalDataSource |
No | Sin valor predeterminado | Origen de datos externo aprovisionado previamente para leer datos de Azure Synapse. Un origen de datos externo solo se puede usar con PolyBase y quita el requisito del permiso CONTROL, ya que el conector no necesita crear una credencial con ámbito y un origen de datos externo para cargar datos. Para ver un ejemplo de uso y la lista de permisos necesarios al usar un origen de datos externo, consulte Permisos de Azure Synapse requeridos para PolyBase con la opción de origen de datos externo. |
maxErrors |
No | 0 | Número máximo de filas que se pueden rechazar durante las lecturas y escrituras antes de que se cancele la operación de carga (PolyBase o COPY). Se omitirán las filas rechazadas. Por ejemplo, si dos de cada diez registros tienen errores, solo se procesarán ocho registros. Consulte REJECT_VALUE documentación de CREATE EXTERNAL TABLE y MAXERRORS en COPY. |
Nota:
tableOptions
,preActions
,postActions
ymaxStrLength
solo son pertinentes al escribir datos de Azure Databricks en una tabla nueva de Azure Synapse.externalDataSource
solo es pertinente al leer datos de Azure Synapse y escribir datos de Azure Databricks en una tabla nueva de Azure Synapse con semántica de PolyBase. No debe especificar otros tipos de autenticación de almacenamiento mientras utilizaexternalDataSource
comoforwardSparkAzureStorageCredentials
ouseAzureMSI
.checkpointLocation
ynumStreamingTempDirsToKeep
solo son pertinentes para escrituras en streaming de Azure Databricks a una tabla nueva de Azure Synapse.- Si bien los nombres de las opciones del origen de datos no distinguen mayúsculas de minúsculas, se recomienda que los especifique en una mezcla de mayúsculas y minúsculas.
Inserción de consultas en Azure Synapse
El conector de Azure Synapse implementa un conjunto de reglas de optimización para insertar los operaciones siguientes en Azure Synapse:
Filter
Project
Limit
Los operadores Project
y Filter
admiten las expresiones siguientes:
- La mayoría de operadores lógicos booleanos
- Comparaciones
- Operaciones aritméticas básicas
- Conversiones numéricas y de cadena
En el caso del operador Limit
, la inserción solo se admite cuando no hay ninguna ordenación especificada. Por ejemplo:
SELECT TOP(10) * FROM table
, pero no SELECT TOP(10) * FROM table ORDER BY col
.
Nota:
El conector de Azure Synapse no insertar expresiones que operan en cadenas, fechas o marcas de tiempo.
La inserción de consultas integrada con el conector de Azure Synapse está habilidad de manera predeterminada.
Si desea deshabilitarla, establezca spark.databricks.sqldw.pushdown
en false
.
Administración de datos temporales
El conector de Azure Synapse no elimina los archivos temporales que crea en el contenedor de Blob Storage.
Por lo tanto, se recomienda eliminar periódicamente estos archivos temporales que se encuentran en la ubicación tempDir
suministrada por el usuario.
Para facilitar la limpieza de los datos, el conector de Azure Synapse no almacena los archivos de datos directamente en tempDir
, sino que crea un subdirectorio con el formato <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/
.
Puede configurar trabajos periódicos (con la característica trabajos de Azure Databricks o de otro modo) a fin de eliminar recursivamente los subdirectorios anteriores a un umbral determinado (por ejemplo, 2 días), con el supuesto de que no puede haber trabajos de Spark en ejecución por más tiempo que lo indicado en dicho umbral.
Una alternativa más sencilla es quitar periódicamente todo el contenedor y crear uno con el mismo nombre. Esto requiere que utilice un contenedor dedicado para los datos temporales que genera el conector de Azure Synapse, además de que pueda encontrar una ventana de tiempo en la que pueda garantizar que no hay ninguna consulta en ejecución que implique al conector.
Administración de objetos temporales
El conector de Azure Synapse automatiza la transferencia de datos entre un clúster de Azure Databricks y una instancia de Azure Synapse.
Para leer datos de una consulta o una tabla de Azure Synapse o para escribir datos en una tabla de Azure Synapse, el conector de Azure Synapse crea objetos temporales, incluidos DATABASE SCOPED CREDENTIAL
, EXTERNAL DATA SOURCE
, EXTERNAL FILE FORMAT
y EXTERNAL TABLE
en segundo plano. Estos objetos solo existen durante el trabajo de Spark correspondiente y se quitarán automáticamente después de eso.
Cuando un clúster ejecuta una consulta con el conector de Azure Synapse, si el proceso del controlador de Spark se bloquea o se reinicia por la fuerza o si el clúster se termina o reinicia de manera forzosa, es posible que no se quiten los objetos temporales.
A fin de facilitar la identificación y eliminación manual de estos objetos, el conector de Azure Synapse antepone un prefijo a los nombres de todos los objetos temporales intermedios creados en la instancia de Azure Synapse con una etiqueta con el formato tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>
.
Se recomienda buscar periódicamente objetos filtrados mediante consultas como las siguientes:
SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'
Administración de tablas de punto de control de streaming
El conector de Azure Synapse no elimina la tabla de punto de control de streaming que se crea al iniciar una consulta de streaming nueva.
Este comportamiento es coherente con checkpointLocation
en DBFS. Por lo tanto, se recomienda eliminar periódicamente las tablas de punto de control al mismo tiempo que quita las ubicaciones de punto de control en DBFS para las consultas que no se van a ejecutar en el futuro o cuya ubicación de punto de control ya se quitó.
De manera predeterminada, todas las tablas de punto de control tienen el nombre <prefix>_<query-id>
, donde <prefix>
es un prefijo que se puede configurar con el valor predeterminado databricks_streaming_checkpoint
y query_id
es un id. de consulta de streaming al que se le quitaron los caracteres _
. Para buscar consultas de streaming obsoletas o eliminadas en todas las tablas de punto de control, ejecute esta consulta:
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
Puede configurar el prefijo con la opción de configuración de Spark SQL spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
.
Preguntas más frecuentes
Recibí un error mientras usaba el conector de Azure Synapse. ¿Cómo puedo saber si se trata de un error de Azure Synapse o de Azure Databricks?
Para ayudarlo a depurar los errores, cualquier excepción generada por código específico del conector de Azure Synapse se encapsula en una excepción que extiende el rasgo SqlDWException
. Las excepciones también hacen la distinción siguiente:
SqlDWConnectorException
representa un error generado por el conector de Azure SynapseSqlDWSideException
representa un error generado por la instancia de Azure Synapse conectada
¿Qué tengo que hacer si se produce un error en la consulta que indica que no se encontró ninguna clave de acceso en la configuración de la sesión o en la configuración global de Hadoop?
Este error significa que el conector de Azure Synapse no pudo encontrar la clave de acceso de la cuenta de almacenamiento en la configuración de la sesión del cuaderno o la configuración global de Hadoop para la cuenta de almacenamiento especificada en tempDir
.
Consulte Utilización (lote) para ver ejemplos de cómo configurar correctamente el acceso a una cuenta de almacenamiento. Si se crea una tabla de Spark con el conector de Azure Synapse, de todos modos debe proporciona las credenciales de acceso a la cuenta de almacenamiento a fin de leer o escribir en la tabla de Spark.
¿Puedo usar una firma de acceso compartido (SAS) para acceder al contenedor de Blob Storage especificado por tempDir
?
Azure Synapse no admite el uso de SAS para acceder a Blob Storage. Por lo tanto, el conector de Azure Synapse no admite SAS para acceder al contenedor de Blob Storage especificado por tempDir
.
Creé una tabla de Spark mediante el conector de Azure Synapse con la opción dbTable
, escribí algunos datos en esta tabla de Spark y, luego, quité esta tabla de Spark. ¿Se quitará la tabla creada en Azure Synapse?
No. Azure Synapse se considera un origen de datos externo. La tabla de Azure Synapse con el nombre establecido a través de dbTable
no se quita cuando se quita la tabla de Spark.
Al escribir un DataFrame en Azure Synapse, ¿por qué tengo que usar .option("dbTable", tableName).save()
en lugar de usar solo .saveAsTable(tableName)
?
Esto se debe a que queremos dejar clara esta distinción: .option("dbTable", tableName)
hace referencia a la tabla base de datos (es decir, Azure Synapse), mientras que .saveAsTable(tableName)
hace referencia a la tabla de Spark. De hecho, incluso podría combinar ambas: df.write. ... .option("dbTable", tableNameDW).saveAsTable(tableNameSpark)
, que crea una tabla en Azure Synapse denominada tableNameDW
y una tabla externa en Spark denominada tableNameSpark
, respaldada por la tabla de Azure Synapse.
Advertencia
Tenga en cuenta esta diferencia entre .save()
y .saveAsTable()
:
- Para
df.write. ... .option("dbTable", tableNameDW).mode(writeMode).save()
,writeMode
actúa en la tabla de Azure Synapse, según lo previsto. - Para
df.write. ... .option("dbTable", tableNameDW).mode(writeMode).saveAsTable(tableNameSpark)
,writeMode
actúa en la tabla spark, mientras quetableNameDW
se sobrescribe silenciosamente si ya existe en Azure Synapse.
Este comportamiento no es diferente de escribir en cualquier otro origen de datos. Es simplemente una advertencia de la API DataFrameWriter de Spark.