Conector do Conjunto de SQL Dedicado do Azure Synapse para Apache Spark
Introdução
O Azure Synapse Dedicated SQL Pool Connector for Apache Spark no Azure Synapse Analytics permite a transferência eficiente de grandes conjuntos de dados entre o tempo de execução do Apache Spark e o pool SQL dedicado. O conector é enviado como uma biblioteca predefinida com a Área de Trabalho do Azure Synapse. O conector é implementado usando Scala
linguagem. O conector suporta Scala e Python. Para usar o Conector com outras opções de idioma do bloco de anotações, use o comando mágico do Spark - %%spark
.
Em um alto nível, o conector fornece os seguintes recursos:
- Leia a partir do Azure Synapse Dedicated SQL Pool:
- Leia grandes conjuntos de dados de Synapse Dedicated SQL Pool Tables (Internal and External) e exibições.
- Suporte abrangente a push down de predicados, onde os filtros no DataFrame são mapeados para o push down do predicado SQL correspondente.
- Suporte para poda de colunas.
- Suporte para push down de consulta.
- Escreva no Azure Synapse Dedicated SQL Pool:
- Ingerir dados de grande volume para tipos de tabelas internas e externas.
- Suporta as seguintes preferências do modo de salvamento DataFrame:
Append
ErrorIfExists
Ignore
Overwrite
- O tipo Write to External Table suporta o formato de arquivo Parquet e Delimited Text (exemplo - CSV).
- Para gravar dados em tabelas internas, o conector agora usa a instrução COPY em vez da abordagem CETAS/CTAS.
- Aprimoramentos para otimizar o desempenho completo da taxa de transferência de gravação.
- Introduz um identificador de retorno de chamada opcional (um argumento de função Scala) que os clientes podem usar para receber métricas pós-gravação.
- Alguns exemplos incluem - número de registros, duração para concluir determinada ação e motivo da falha.
Abordagem de orquestração
Leitura
Escrita
Pré-requisitos
Os pré-requisitos, como a configuração dos recursos necessários do Azure e as etapas para configurá-los, são discutidos nesta seção.
Recursos do Azure
Analise e configure os seguintes Recursos do Azure dependentes:
- Azure Data Lake Storage - usado como a conta de armazenamento principal para o Azure Synapse Workspace.
- Azure Synapse Workspace - crie blocos de anotações, crie e implante fluxos de trabalho de entrada e saída baseados em DataFrame.
- Pool SQL dedicado (anteriormente SQL DW) - fornece recursos de armazenamento de dados corporativos.
- Azure Synapse Serverless Spark Pool - Tempo de execução do Spark onde os trabalhos são executados como Aplicativos Spark.
Preparar a base de dados
Conecte-se ao banco de dados Synapse Dedicated SQL Pool e execute as seguintes instruções de instalação:
Crie um usuário de banco de dados mapeado para a Identidade de usuário do Microsoft Entra usada para entrar no Espaço de Trabalho Sinapse do Azure.
CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;
Crie um esquema no qual as tabelas serão definidas, de modo que o Connector possa gravar e ler com êxito as respetivas tabelas.
CREATE SCHEMA [<schema_name>];
Autenticação
Autenticação baseada no Microsoft Entra ID
A autenticação baseada no Microsoft Entra ID é uma abordagem de autenticação integrada. O usuário deve entrar com êxito no Espaço de Trabalho do Azure Synapse Analytics.
Autenticação básica
Uma abordagem de autenticação básica requer que o usuário configure username
e password
opções. Consulte a seção - Opções de configuração para saber mais sobre parâmetros de configuração relevantes para leitura e gravação em tabelas no Azure Synapse Dedicated SQL Pool.
Autorização
Azure Data Lake Storage Gen2 (Armazenamento do Azure Data Lake Gen2)
Há duas maneiras de conceder permissões de acesso ao Azure Data Lake Storage Gen2 - Conta de Armazenamento:
- Função de Controle de Acesso com base em função - função de Colaborador de Dados de Blob de Armazenamento
- A atribuição do concede ao
Storage Blob Data Contributor Role
Usuário permissões para ler, gravar e excluir dos Contêineres de Blob de Armazenamento do Azure. - O RBAC oferece uma abordagem de controle grosseiro no nível do contêiner.
- A atribuição do concede ao
- Listas de controle de acesso (ACL)
- A abordagem ACL permite controles refinados sobre caminhos e/ou arquivos específicos em uma determinada pasta.
- As verificações de ACL não serão aplicadas se o Usuário já tiver recebido permissões usando a abordagem RBAC.
- Existem dois tipos amplos de permissões de ACL:
- Permissões de acesso (aplicadas em um nível ou objeto específico).
- Permissões padrão (aplicadas automaticamente para todos os objetos filho no momento de sua criação).
- Os tipos de permissões incluem:
Execute
Permite a capacidade de percorrer ou navegar nas hierarquias de pastas.Read
permite a capacidade de leitura.Write
permite a capacidade de escrever.
- É importante configurar ACLs para que o conector possa gravar e ler com êxito nos locais de armazenamento.
Nota
Se quiser executar blocos de anotações usando pipelines do Synapse Workspace, você também deve conceder permissões de acesso listadas acima para a identidade gerenciada padrão do Synapse Workspace. O nome de identidade gerenciado padrão do espaço de trabalho é igual ao nome do espaço de trabalho.
Para usar o espaço de trabalho Synapse com contas de armazenamento seguras, um ponto de extremidade privado gerenciado deve ser configurado a partir do bloco de anotações. O ponto de extremidade privado gerenciado deve ser aprovado na seção da conta
Private endpoint connections
de armazenamento ADLS Gen2 noNetworking
painel.
Azure Synapse Dedicated SQL Pool
Para habilitar a interação bem-sucedida com o SQL Pool Dedicado do Azure Synapse, a seguinte autorização é necessária, a menos que você seja um usuário também configurado como um Active Directory Admin
no Ponto de Extremidade SQL Dedicado:
Ler cenário
Conceda ao usuário
db_exporter
usando o procedimentosp_addrolemember
armazenado do sistema .EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
Cenário de escrita
- O Connector usa o comando COPY para gravar dados do preparo no local gerenciado da tabela interna.
Configure as permissões necessárias descritas aqui.
A seguir está um trecho de acesso rápido do mesmo:
--Make sure your user has the permissions to CREATE tables in the [dbo] schema GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com]; GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com]; --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com]; --Make sure your user has INSERT permissions on the target table GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
- O Connector usa o comando COPY para gravar dados do preparo no local gerenciado da tabela interna.
Documentação da API
Azure Synapse Dedicated SQL Pool Connector for Apache Spark - Documentação da API.
Opções de configuração
Para inicializar e orquestrar com êxito a operação de leitura ou gravação, o Conector espera determinados parâmetros de configuração. A definição de objeto - com.microsoft.spark.sqlanalytics.utils.Constants
fornece uma lista de constantes padronizadas para cada chave de parâmetro.
A seguir está a lista de opções de configuração com base no cenário de uso:
- Ler usando a autenticação baseada no Microsoft Entra ID
- As credenciais são mapeadas automaticamente e o usuário não precisa fornecer opções de configuração específicas.
- O argumento de nome de tabela de três partes no método é necessário para ler a respetiva tabela no
synapsesql
Pool SQL Dedicado do Azure Synapse.
- Ler usando autenticação básica
- Ponto de extremidade SQL dedicado do Azure Synapse
Constants.SERVER
- Synapse Dedicated SQL Pool End Point (FQDN do servidor)Constants.USER
- Nome de usuário SQL.Constants.PASSWORD
- Senha de usuário SQL.
- Ponto Final do Armazenamento do Azure Data Lake (Gen 2) - Pastas de Preparo
Constants.DATA_SOURCE
- O caminho de armazenamento definido no parâmetro de localização da fonte de dados é usado para o preparo de dados.
- Ponto de extremidade SQL dedicado do Azure Synapse
- Escrever usando a autenticação baseada no Microsoft Entra ID
- Ponto de extremidade SQL dedicado do Azure Synapse
- Por padrão, o Connector infere o ponto de extremidade Synapse Dedicated SQL usando o nome do banco de dados definido no
synapsesql
parâmetro de nome de tabela de três partes do método. - Como alternativa, os usuários podem usar a
Constants.SERVER
opção para especificar o ponto de extremidade sql. Verifique se o ponto final hospeda o banco de dados correspondente com o respetivo esquema.
- Por padrão, o Connector infere o ponto de extremidade Synapse Dedicated SQL usando o nome do banco de dados definido no
- Ponto Final do Armazenamento do Azure Data Lake (Gen 2) - Pastas de Preparo
- Para o tipo de tabela interna:
- Configure uma ou
Constants.TEMP_FOLDER
Constants.DATA_SOURCE
opção. - Se o usuário optar por fornecer
Constants.DATA_SOURCE
a opção, a pasta de preparo será derivada usando olocation
valor do DataSource. - Se ambos forem fornecidos, o valor da
Constants.TEMP_FOLDER
opção será usado. - Na ausência de uma opção de pasta de preparo, o Conector derivará uma com base na configuração de tempo de execução -
spark.sqlanalyticsconnector.stagingdir.prefix
.
- Configure uma ou
- Para o tipo de tabela externa:
Constants.DATA_SOURCE
é uma opção de configuração necessária.- O conector usa o caminho de armazenamento definido no parâmetro location da fonte de dados em combinação com o argumento para o método e deriva o
location
synapsesql
caminho absoluto para persistir os dados da tabela externa. - Se o argumento para
synapsesql
o método não for especificado, o conector derivará olocation
valor do local como<base_path>/dbName/schemaName/tableName
.
- Para o tipo de tabela interna:
- Ponto de extremidade SQL dedicado do Azure Synapse
- Escrever usando autenticação básica
- Ponto de extremidade SQL dedicado do Azure Synapse
Constants.SERVER
- - Synapse Dedicated SQL Pool End Point (FQDN do servidor).Constants.USER
- Nome de usuário SQL.Constants.PASSWORD
- Senha de usuário SQL.Constants.STAGING_STORAGE_ACCOUNT_KEY
associada à Conta de Armazenamento que hospedaConstants.TEMP_FOLDERS
(somente tipos de tabela interna) ouConstants.DATA_SOURCE
.
- Ponto Final do Armazenamento do Azure Data Lake (Gen 2) - Pastas de Preparo
- As credenciais de autenticação básica do SQL não se aplicam aos pontos finais de armazenamento de acesso.
- Portanto, certifique-se de atribuir permissões de acesso ao armazenamento relevantes, conforme descrito na seção Azure Data Lake Storage Gen2.
- Ponto de extremidade SQL dedicado do Azure Synapse
Modelos de código
Esta seção apresenta modelos de código de referência para descrever como usar e invocar o Azure Synapse Dedicated SQL Pool Connector for Apache Spark.
Nota
Usando o conector em Python-
- O conector é suportado em Python apenas para Spark 3. Para o Spark 2.4 (sem suporte), podemos usar a API do conector Scala para interagir com o conteúdo de um DataFrame no PySpark usando DataFrame.createOrReplaceTempView ou DataFrame.createOrReplaceGlobalTempView. Consulte a Secção - Utilização de dados materializados entre células.
- O identificador de retorno de chamada não está disponível em Python.
Leia a partir do Azure Synapse Dedicated SQL Pool
Solicitação de leitura - synapsesql
assinatura do método
Ler a partir de uma tabela usando a autenticação baseada no Microsoft Entra ID
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
//Three-part table name from where data will be read.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Column-pruning i.e., query select column values.
select("<some_column_1>", "<some_column_5>", "<some_column_n>").
//Push-down filter criteria that gets translated to SQL Push-down Predicates.
filter(col("Title").startsWith("E")).
//Fetch a sample of 10 records
limit(10)
//Show contents of the dataframe
dfToReadFromTable.show()
Ler a partir de uma consulta usando a autenticação baseada no Microsoft Entra ID
Nota
Restrições durante a leitura da consulta:
- O nome da tabela e a consulta não podem ser especificados ao mesmo tempo.
- Só são permitidas consultas selecionadas. DDL e DML SQLs não são permitidos.
- As opções de seleção e filtro no dataframe não são enviadas para o pool dedicado SQL quando uma consulta é especificada.
- A leitura de uma consulta só está disponível no Spark 3.1 e 3.2.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
//query from which data will be read
.option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
synapsesql()
val dfToReadFromQueryAsArgument:DataFrame = spark.read.
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>")
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
//query from which data will be read
.synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
Ler a partir de uma tabela usando autenticação básica
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the table will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Three-part table name from where data will be read.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Column-pruning i.e., query select column values.
select("<some_column_1>", "<some_column_5>", "<some_column_n>").
//Push-down filter criteria that gets translated to SQL Push-down Predicates.
filter(col("Title").startsWith("E")).
//Fetch a sample of 10 records
limit(10)
//Show contents of the dataframe
dfToReadFromTable.show()
Ler a partir de uma consulta usando autenticação básica
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")
// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
//Name of the SQL Dedicated Pool or database where to run the query
//Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Query where data will be read.
option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
synapsesql()
val dfToReadFromQueryAsArgument:DataFrame = spark.read.
//Name of the SQL Dedicated Pool or database where to run the query
//Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Query where data will be read.
synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
Escrever no Azure Synapse Dedicated SQL Pool
Write Request - synapsesql
assinatura do método
A assinatura do método para a versão do conector criada para o Spark 2.4.8 tem um argumento a menos do que o aplicado à versão do Spark 3.1.2. A seguir estão as duas assinaturas de método:
- Spark Pool Versão 2.4.8
synapsesql(tableName:String,
tableType:String = Constants.INTERNAL,
location:Option[String] = None):Unit
- Spark Pool Versão 3.1.2
synapsesql(tableName:String,
tableType:String = Constants.INTERNAL,
location:Option[String] = None,
callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit
Escrever usando a autenticação baseada no Microsoft Entra ID
A seguir está um modelo de código abrangente que descreve como usar o conector para cenários de gravação:
//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"
//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")
//Initialize DataFrame that reads CSV data from a given source
val readDF:DataFrame=spark.
read.
options(dfReadOptions).
csv(pathToInputSource).
limit(1000) //Reads first 1000 rows from the source CSV input.
//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
// 1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
// 2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab.
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")
//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
(feedback: Map[String, Any], errorState: Option[Throwable]) => {
println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
errorDuringWrite = errorState
}
//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
write.
//Configure required configurations.
options(writeOptionsWithAADAuth).
//Choose a save mode that is apt for your use case.
mode(SaveMode.Overwrite).
synapsesql(tableName = "<database_name>.<schema_name>.<table_name>",
//For external table type value is Constants.EXTERNAL
tableType = Constants.INTERNAL,
//Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
location = None,
//Optional parameter to receive a callback.
callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))
//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get
Escrever usando autenticação básica
O trecho de código a seguir substitui a definição de gravação descrita na seção Gravar usando autenticação baseada em ID do Microsoft Entra, para enviar solicitação de gravação usando a abordagem de autenticação básica do SQL:
//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
//Set database user name
Constants.USER -> "<user_name>",
//Set database user's password
Constants.PASSWORD -> "<user_password>",
//Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
//To be used only when writing to internal tables. Storage path will be used for data staging.
Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")
//Configure and submit the request to write to Synapse Dedicated SQL Pool.
readDF.
write.
options(writeOptionsWithBasicAuth).
//Choose a save mode that is apt for your use case.
mode(SaveMode.Overwrite).
synapsesql(tableName = "<database_name>.<schema_name>.<table_name>",
//For external table type value is Constants.EXTERNAL
tableType = Constants.INTERNAL,
//Not required for writing to an internal table
location = None,
//Optional parameter.
callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))
Em uma abordagem de autenticação básica, para ler dados de um caminho de armazenamento de origem, outras opções de configuração são necessárias. O trecho de código a seguir fornece um exemplo para ler de uma fonte de dados do Azure Data Lake Storage Gen2 usando credenciais da Entidade de Serviço:
//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
"delimiter"->",",
"fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" ->
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id" -> s"$spnClientId",
"fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
"fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
"fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
"fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
read.
options(dfReadOptions).
csv(pathToInputSource).
limit(100)
Modos de salvamento DataFrame suportados
Os seguintes modos de salvamento são suportados ao gravar dados de origem em uma tabela de destino no Pool SQL Dedicado do Azure Synapse:
- ErrorIfExists (modo de gravação padrão)
- Se a tabela de destino existir, a gravação será anulada com uma exceção retornada ao destinatário. Caso contrário, uma nova tabela é criada com dados das pastas de preparo.
- Ignorar
- Se a tabela de destino existir, a gravação ignorará a solicitação de gravação sem retornar um erro. Caso contrário, uma nova tabela é criada com dados das pastas de preparo.
- Substituir
- Se a tabela de destino existir, os dados existentes no destino serão substituídos por dados das pastas de preparo. Caso contrário, uma nova tabela é criada com dados das pastas de preparo.
- Anexar
- Se a tabela de destino existir, os novos dados serão anexados a ela. Caso contrário, uma nova tabela é criada com dados das pastas de preparo.
Identificador de retorno de chamada de solicitação de gravação
As novas alterações na API de caminho de gravação introduziram um recurso experimental para fornecer ao cliente um mapa chave-valor de métricas pós-gravação>. As chaves para as métricas são definidas na nova definição de objeto - Constants.FeedbackConstants
. As métricas podem ser recuperadas como uma cadeia de caracteres JSON passando o identificador de retorno de chamada (a Scala Function
). Segue-se a assinatura da função:
//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit
A seguir estão algumas métricas notáveis (apresentadas no caso do camelo):
WriteFailureCause
DataStagingSparkJobDurationInMilliseconds
NumberOfRecordsStagedForSQLCommit
SQLStatementExecutionDurationInMilliseconds
rows_processed
A seguir está um exemplo de cadeia de caracteres JSON com métricas pós-gravação:
{
SparkApplicationId -> <spark_yarn_application_id>,
SQLStatementExecutionDurationInMilliseconds -> 10113,
WriteRequestReceivedAtEPOCH -> 1647523790633,
WriteRequestProcessedAtEPOCH -> 1647523808379,
StagingDataFileSystemCheckDurationInMilliseconds -> 60,
command -> "COPY INTO [schema_name].[table_name] ...",
NumberOfRecordsStagedForSQLCommit -> 100,
DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
DataStagingSparkJobDurationInMilliseconds -> 5252,
rows_processed -> 100,
SaveModeApplied -> TRUNCATE_COPY,
DurationInMillisecondsToValidateFileFormat -> 75,
status -> Completed,
SparkApplicationName -> <spark_application_name>,
ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
JDBCConfigurationsSetupAtEPOCH -> 193,
StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
SchemaInferenceCheckDurationInMilliseconds -> 91,
SaveModeRequested -> Overwrite,
DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}
Mais exemplos de código
Usando dados materializados entre células
O Spark DataFrame pode createOrReplaceTempView
ser usado para acessar dados obtidos em outra célula, registrando uma exibição temporária.
- Célula onde os dados são obtidos (digamos com a preferência de idioma do Bloco de Anotações como
Scala
)
//Necessary imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Configure options and read from Synapse Dedicated SQL Pool.
val readDF = spark.read.
//Set Synapse Dedicated SQL End Point name.
option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
//Set database user name.
option(Constants.USER, "<user_name>").
//Set database user's password.
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
option(Constants.DATA_SOURCE,"<data_source_name>").
//Set the three-part table name from which the read must be performed.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Optional - specify number of records the DataFrame would read.
limit(10)
//Register the temporary view (scope - current active Spark Session)
readDF.createOrReplaceTempView("<temporary_view_name>")
- Agora, altere a preferência de idioma no Bloco de Anotações
PySpark (Python)
e busque dados do modo de exibição registrado<temporary_view_name>
spark.sql("select * from <temporary_view_name>").show()
Tratamento das respostas
Invocar synapsesql
tem dois estados finais possíveis - Sucesso ou um Estado Falhado. Esta seção descreve como lidar com a resposta de solicitação para cada cenário.
Ler resposta de pedido
Após a conclusão, o trecho de resposta de leitura é exibido na saída da célula. Uma falha na célula atual também cancelará as execuções subsequentes da célula. Informações detalhadas sobre erros estão disponíveis nos logs do aplicativo Spark.
Escrever resposta de solicitação
Por padrão, uma resposta de gravação é impressa na saída da célula. Em caso de falha, a célula atual é marcada como falha e as execuções de célula subsequentes serão anuladas. A outra abordagem é passar a opção de identificador de retorno de chamada para o synapsesql
método. O identificador de retorno de chamada fornecerá acesso programático à resposta de gravação.
Outras considerações
- Ao ler as tabelas do Pool SQL Dedicado do Azure Synapse:
- Considere a aplicação dos filtros necessários no DataFrame para aproveitar o recurso de remoção de colunas do Connector.
- O cenário de leitura não suporta a
TOP(n-rows)
cláusula, ao enquadrar asSELECT
instruções de consulta. A opção para limitar dados é usar a cláusula limit(.) do DataFrame.- Consulte o exemplo - Usando dados materializados entre células seção.
- Ao escrever nas tabelas do Pool SQL Dedicado do Azure Synapse:
- Para tipos de tabelas internas:
- As tabelas são criadas com ROUND_ROBIN distribuição de dados.
- Os tipos de coluna são inferidos a partir do DataFrame que leria dados da origem. As colunas de cadeia de caracteres são mapeadas para
NVARCHAR(4000)
.
- Para tipos de tabelas externas:
- O paralelismo inicial do DataFrame orienta a organização de dados para a tabela externa.
- Os tipos de coluna são inferidos a partir do DataFrame que leria dados da origem.
- Uma melhor distribuição de dados entre executores pode ser alcançada ajustando o parâmetro e o
spark.sql.files.maxPartitionBytes
DataFramerepartition
. - Ao escrever grandes conjuntos de dados, é importante levar em consideração o impacto da configuração de Nível de Desempenho DWU que limita o tamanho da transação.
- Para tipos de tabelas internas:
- Monitore as tendências de utilização do Azure Data Lake Storage Gen2 para identificar comportamentos de limitação que podem afetar o desempenho de leitura e gravação.