Delen via


Azure Synapse toegewezen SQL-poolconnector voor Apache Spark

Introductie

De toegewezen SQL-pool van Azure Synapse Verbinding maken or voor Apache Spark in Azure Synapse Analytics maakt een efficiënte overdracht van grote gegevenssets mogelijk tussen de Apache Spark-runtime en de toegewezen SQL-pool. De connector wordt geleverd als een standaardbibliotheek met Azure Synapse-werkruimte. De connector wordt geïmplementeerd met behulp van Scala de taal. De connector ondersteunt Scala en Python. Als u de Verbinding maken or wilt gebruiken met andere taalopties voor notebooks, gebruikt u de Magic-opdracht van Spark - %%spark.

Op hoog niveau biedt de connector de volgende mogelijkheden:

  • Lezen uit de toegewezen SQL-pool van Azure Synapse:
    • Lees grote gegevenssets uit Synapse Dedicated SQL-pooltabellen (intern en extern) en weergaven.
    • Uitgebreide push-downondersteuning voor predicaat, waarbij filters op DataFrame worden toegewezen aan de bijbehorende SQL-predicaatpush omlaag.
    • Ondersteuning voor het snoeien van kolommen.
    • Ondersteuning voor het pushen van query's.
  • Schrijven naar een toegewezen SQL-pool van Azure Synapse:
    • Gegevens van grote volumes opnemen in interne en externe tabeltypen.
    • Ondersteunt de volgende voorkeuren voor de opslagmodus van DataFrame:
      • Append
      • ErrorIfExists
      • Ignore
      • Overwrite
    • Schrijven naar extern tabeltype ondersteunt parquet- en tekstbestandsindeling met scheidingstekens (bijvoorbeeld CSV).
    • Als u gegevens naar interne tabellen wilt schrijven, gebruikt de connector nu de COPY-instructie in plaats van de CETAS-/CTAS-benadering.
    • Verbeteringen voor het optimaliseren van end-to-end schrijfdoorvoerprestaties.
    • Introduceert een optionele call-back-handle (een scala-functieargument) die clients kunnen gebruiken om metrische gegevens na schrijven te ontvangen.
      • Enkele voorbeelden zijn: het aantal records, de duur voor het voltooien van een bepaalde actie en de reden van de fout.

Indelingsbenadering

Lezen

A high-level data flow diagram to describe the connector's orchestration of a read request.

Write

A high-level data flow diagram to describe the connector's orchestration of a write request.

Vereisten

Vereisten, zoals het instellen van vereiste Azure-resources en stappen voor het configureren ervan, worden in deze sectie besproken.

Azure-resources

Controleer en stel de volgende afhankelijke Azure-resources in:

De database voorbereiden

Verbinding maken naar de Synapse Dedicated SQL-pooldatabase en voer de volgende installatie-instructies uit:

  • Maak een databasegebruiker die is toegewezen aan de Microsoft Entra-gebruikersidentiteit die wordt gebruikt om u aan te melden bij de Azure Synapse-werkruimte.

    CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;      
    
  • Maak een schema waarin tabellen worden gedefinieerd, zodat de Verbinding maken or kan schrijven naar en lezen vanuit respectieve tabellen.

    CREATE SCHEMA [<schema_name>];
    

Verificatie

Verificatie op basis van Microsoft Entra-id

Verificatie op basis van Microsoft Entra ID is een geïntegreerde verificatiemethode. De gebruiker moet zich aanmelden bij de Azure Synapse Analytics-werkruimte.

Basisverificatie

Voor een basisverificatiebenadering moet de gebruiker de configuratie en password opties instellenusername. Raadpleeg de sectie - Configuratieopties voor meer informatie over relevante configuratieparameters voor het lezen van en schrijven naar tabellen in Azure Synapse Dedicated SQL-pool.

Autorisatie

Azure Data Lake Storage Gen2

Er zijn twee manieren om toegangsmachtigingen te verlenen aan Azure Data Lake Storage Gen2 - Opslagaccount:

  • Rol op basis van toegangsbeheer voor opslagblobgegevens
    • Als u de Storage Blob Data Contributor Role gebruiker toewijst, worden de gebruikersmachtigingen verleend voor lezen, schrijven en verwijderen uit de Azure Storage Blob-containers.
    • RBAC biedt een benadering voor grof beheer op containerniveau.
  • Toegangsbeheerlijsten (ACL)
    • Met ACL-benadering kunt u nauwkeurige besturingselementen toepassen op specifieke paden en/of bestanden onder een bepaalde map.
    • ACL-controles worden niet afgedwongen als de gebruiker al machtigingen krijgt met behulp van de RBAC-benadering.
    • Er zijn twee algemene typen ACL-machtigingen:
      • Toegangsmachtigingen (toegepast op een specifiek niveau of object).
      • Standaardmachtigingen (automatisch toegepast op alle onderliggende objecten op het moment dat ze zijn gemaakt).
    • Het type machtigingen zijn onder andere:
      • Execute maakt het mogelijk om door de mappenhiërarchieën te navigeren of te navigeren.
      • Read maakt het mogelijk om te lezen.
      • Write maakt het mogelijk om te schrijven.
    • Het is belangrijk om ACL's te configureren, zodat de Verbinding maken or kan schrijven en lezen vanaf de opslaglocaties.

Notitie

  • Als u notebooks wilt uitvoeren met behulp van Synapse Workspace-pijplijnen, moet u ook hierboven vermelde toegangsmachtigingen verlenen voor de standaard beheerde identiteit van de Synapse-werkruimte. De standaardnaam van de beheerde identiteit van de werkruimte is hetzelfde als de naam van de werkruimte.

  • Als u de Synapse-werkruimte wilt gebruiken met beveiligde opslagaccounts, moet een beheerd privé-eindpunt worden geconfigureerd vanuit het notebook. Het beheerde privé-eindpunt moet worden goedgekeurd vanuit de sectie van Private endpoint connections het ADLS Gen2-opslagaccount in het Networking deelvenster.

Toegewezen SQL-pool van Azure Synapse

Als u een succesvolle interactie met een toegewezen SQL-pool van Azure Synapse wilt inschakelen, is de volgende autorisatie nodig, tenzij u een gebruiker bent die ook is geconfigureerd als een Active Directory Admin toegewezen SQL-eindpunt:

  • Scenario lezen

    • Verdeel de gebruiker db_exporter met behulp van de door het systeem opgeslagen procedure sp_addrolemember.

      EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
      
  • Scenario schrijven

    • Verbinding maken or de opdracht COPY gebruikt om gegevens te schrijven van fasering naar de beheerde locatie van de interne tabel.
      • Configureer de vereiste machtigingen die hier worden beschreven.

      • Hier volgt een snel toegangsfragment van hetzelfde:

        --Make sure your user has the permissions to CREATE tables in the [dbo] schema
        GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com];
        GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions
        GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has INSERT permissions on the target table
        GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
        

API-documentatie

Azure Synapse Dedicated SQL-pool Verbinding maken or voor Documentatie voor Apache Spark - API.

Configuratieopties

Als u de lees- of schrijfbewerking wilt opstarten en organiseren, verwacht de Verbinding maken or bepaalde configuratieparameters. De objectdefinitie: com.microsoft.spark.sqlanalytics.utils.Constants biedt een lijst met gestandaardiseerde constanten voor elke parametersleutel.

Hieronder volgt de lijst met configuratieopties op basis van gebruiksscenario:

  • Lezen met verificatie op basis van Microsoft Entra ID
    • Referenties zijn automatisch toegewezen en de gebruiker hoeft geen specifieke configuratieopties te bieden.
    • Argument voor tabelnaam in drie delen voor synapsesql de methode is vereist om te lezen uit de respectieve tabel in de Toegewezen SQL-pool van Azure Synapse.
  • Lezen met basisverificatie
    • Toegewezen SQL-eindpunt van Azure Synapse
      • Constants.SERVER - Eindpunt van toegewezen SQL-pool synapse (server-FQDN)
      • Constants.USER - SQL-gebruikersnaam.
      • Constants.PASSWORD - SQL-gebruikerswachtwoord.
    • Eindpunt van Azure Data Lake Storage (Gen 2) - Faseringsmappen
      • Constants.DATA_SOURCE - Het opslagpad dat is ingesteld op de locatieparameter van de gegevensbron wordt gebruikt voor het faseren van gegevens.
  • Schrijven met verificatie op basis van Microsoft Entra ID
    • Toegewezen SQL-eindpunt van Azure Synapse
      • Standaard wordt met de Verbinding maken or het Synapse Dedicated SQL-eindpunt afgeleid met behulp van de databasenaam die is ingesteld op de parameter voor de driedelige tabelnaam van de synapsesql methode.
      • Gebruikers kunnen ook de Constants.SERVER optie gebruiken om het SQL-eindpunt op te geven. Zorg ervoor dat het eindpunt als host fungeert voor de bijbehorende database met het respectieve schema.
    • Eindpunt van Azure Data Lake Storage (Gen 2) - Faseringsmappen
      • Voor intern tabeltype:
        • Constants.TEMP_FOLDER Configureer een van beide of Constants.DATA_SOURCE opties.
        • Als de gebruiker ervoor heeft gekozen om een optie op te geven Constants.DATA_SOURCE , wordt de faseringsmap afgeleid met behulp van de location waarde uit de DataSource.
        • Als beide zijn opgegeven, wordt de Constants.TEMP_FOLDER optiewaarde gebruikt.
        • Als er geen optie voor een faseringsmap is, leidt de Verbinding maken or er een af op basis van de runtimeconfiguratie - spark.sqlanalyticsconnector.stagingdir.prefix.
      • Voor extern tabeltype:
        • Constants.DATA_SOURCE is een vereiste configuratieoptie.
        • De connector maakt gebruik van het opslagpad dat is ingesteld op de locatieparameter van de gegevensbron in combinatie met het location argument voor de synapsesql methode en leidt het absolute pad af om externe tabelgegevens op te slaan.
        • Als het argument voor synapsesql de location methode niet is opgegeven, wordt de locatiewaarde door de connector afgeleid als <base_path>/dbName/schemaName/tableName.
  • Schrijven met basisverificatie
    • Toegewezen SQL-eindpunt van Azure Synapse
      • Constants.SERVER - Synapse Toegewezen SQL-pooleindpunt (server-FQDN).
      • Constants.USER - SQL-gebruikersnaam.
      • Constants.PASSWORD - SQL-gebruikerswachtwoord.
      • Constants.STAGING_STORAGE_ACCOUNT_KEY gekoppeld aan opslagaccount dat als host fungeert Constants.TEMP_FOLDERS (alleen interne tabeltypen) of Constants.DATA_SOURCE.
    • Eindpunt van Azure Data Lake Storage (Gen 2) - Faseringsmappen
      • Sql-basisverificatiereferenties zijn niet van toepassing op toegang tot opslageindpunten.
      • Zorg er daarom voor dat u relevante machtigingen voor opslagtoegang toewijst, zoals beschreven in de sectie Azure Data Lake Storage Gen2.

Codesjablonen

In deze sectie vindt u referentiecodesjablonen om te beschrijven hoe u de toegewezen SQL-pool van Azure Synapse gebruikt en aanroept Verbinding maken or voor Apache Spark.

Notitie

De Verbinding maken or gebruiken in Python-

  • De connector wordt alleen ondersteund in Python voor Spark 3. Voor Spark 2.4 (niet-ondersteund) kunnen we de Scala-connector-API gebruiken om te communiceren met inhoud van een DataFrame in PySpark met behulp van DataFrame.createOrReplaceTempView of DataFrame.createOrReplaceGlobalTempView. Zie Sectie: gerealiseerde gegevens in cellen gebruiken.
  • De terugbelgreep is niet beschikbaar in Python.

Lezen uit de toegewezen SQL-pool van Azure Synapse

Aanvraag lezen - synapsesql methodehandtekening

synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame

Lezen uit een tabel met verificatie op basis van Microsoft Entra-id

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()

Lezen uit een query met behulp van verificatie op basis van Microsoft Entra ID

Notitie

Beperkingen tijdens het lezen van query's:

  • Tabelnaam en query kunnen niet tegelijkertijd worden opgegeven.
  • Alleen selectiequery's zijn toegestaan. DDL- en DML-MCL's zijn niet toegestaan.
  • De opties voor selecteren en filteren in het dataframe worden niet naar de toegewezen SQL-pool gepusht wanneer een query wordt opgegeven.
  • Lezen uit een query is alleen beschikbaar in Spark 3.1 en 3.2.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    // Name of the SQL Dedicated Pool or database where to run the query
    // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     // Name of the SQL Dedicated Pool or database where to run the query
     // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>")
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")


//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Lezen uit een tabel met basisverificatie

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)
    

//Show contents of the dataframe
dfToReadFromTable.show()

Lezen uit een query met basisverificatie

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    

//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Schrijven naar toegewezen SQL-pool van Azure Synapse

Schrijfaanvraag - synapsesql methodehandtekening

De methodehandtekening voor de Verbinding maken orversie die is gebouwd voor Spark 2.4.8 heeft één minder argument dan die is toegepast op de Versie van Spark 3.1.2. Hieronder ziet u de twee methodehandtekeningen:

  • Spark-pool versie 2.4.8
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None):Unit
  • Spark-pool versie 3.1.2
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None,
           callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit

Schrijven met verificatie op basis van Microsoft Entra ID

Hieronder vindt u een uitgebreide codesjabloon waarin wordt beschreven hoe u de Verbinding maken or gebruikt voor schrijfscenario's:

//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"

//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")

//Initialize DataFrame that reads CSV data from a given source 
val readDF:DataFrame=spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(1000) //Reads first 1000 rows from the source CSV input.

//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
//    1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
//    2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab. 
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                            Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
    (feedback: Map[String, Any], errorState: Option[Throwable]) => {
    println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
    errorDuringWrite = errorState
}

//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
    write.
    //Configure required configurations.
    options(writeOptionsWithAADAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite).
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL, 
                //Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
                location = None, 
                //Optional parameter to receive a callback.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get

Schrijven met basisverificatie

Met het volgende codefragment wordt de schrijfdefinitie vervangen die wordt beschreven in de sectie Schrijven met behulp van verificatie op basis van Microsoft Entra ID, om schrijfaanvragen in te dienen met behulp van de basisverificatiemethode van SQL:

//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                           //Set database user name
                                           Constants.USER -> "<user_name>",
                                           //Set database user's password
                                           Constants.PASSWORD -> "<user_password>",
                                           //Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
                                           Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
                                           //To be used only when writing to internal tables. Storage path will be used for data staging.
                                           Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Configure and submit the request to write to Synapse Dedicated SQL Pool. 
readDF.
    write.
    options(writeOptionsWithBasicAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite). 
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL,
                //Not required for writing to an internal table 
                location = None,
                //Optional parameter.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

Voor een basisverificatiemethode zijn andere configuratieopties vereist om gegevens uit een bronopslagpad te lezen. Het volgende codefragment biedt een voorbeeld om te lezen uit een Azure Data Lake Storage Gen2-gegevensbron met behulp van service-principalreferenties:

//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
                                "delimiter"->",", 
                                "fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
                                s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
                                s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" -> 
                                    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                                "fs.azure.account.oauth2.client.id" -> s"$spnClientId",
                                "fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
                                "fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
                                "fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
                                "fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(100)

Ondersteunde dataframe-opslagmodi

De volgende opslagmodi worden ondersteund bij het schrijven van brongegevens naar een doeltabel in een toegewezen SQL-pool van Azure Synapse:

  • ErrorIfExists (standaard opslaanmodus)
    • Als de doeltabel bestaat, wordt de schrijfbewerking afgebroken met een uitzondering die wordt geretourneerd naar de aangeroepene. Anders wordt er een nieuwe tabel gemaakt met gegevens uit de faseringsmappen.
  • Negeren
    • Als de doeltabel bestaat, negeert de schrijfbewerking de schrijfaanvraag zonder een fout te retourneren. Anders wordt er een nieuwe tabel gemaakt met gegevens uit de faseringsmappen.
  • Overschrijven
    • Als de doeltabel bestaat, worden bestaande gegevens in de bestemming vervangen door gegevens uit de faseringsmappen. Anders wordt er een nieuwe tabel gemaakt met gegevens uit de faseringsmappen.
  • Toevoegen
    • Als de doeltabel bestaat, worden de nieuwe gegevens eraan toegevoegd. Anders wordt er een nieuwe tabel gemaakt met gegevens uit de faseringsmappen.

Callback-handle voor write-aanvragen

Met de nieuwe api voor schrijfpadwijzigingen is een experimentele functie geïntroduceerd om de client een sleutelwaardetoewijzing> van metrische gegevens na schrijven te bieden. Sleutels voor de metrische gegevens worden gedefinieerd in de nieuwe objectdefinitie - Constants.FeedbackConstants. Metrische gegevens kunnen worden opgehaald als een JSON-tekenreeks door te geven in de callback-handle (a Scala Function). Hier volgt de functiehandtekening:

//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit

Hieronder volgen enkele belangrijke metrische gegevens (gepresenteerd in kameel geval):

  • WriteFailureCause
  • DataStagingSparkJobDurationInMilliseconds
  • NumberOfRecordsStagedForSQLCommit
  • SQLStatementExecutionDurationInMilliseconds
  • rows_processed

Hier volgt een voorbeeld van een JSON-tekenreeks met metrische gegevens na schrijven:

{
 SparkApplicationId -> <spark_yarn_application_id>,
 SQLStatementExecutionDurationInMilliseconds -> 10113,
 WriteRequestReceivedAtEPOCH -> 1647523790633,
 WriteRequestProcessedAtEPOCH -> 1647523808379,
 StagingDataFileSystemCheckDurationInMilliseconds -> 60,
 command -> "COPY INTO [schema_name].[table_name] ...",
 NumberOfRecordsStagedForSQLCommit -> 100,
 DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
 SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
 DataStagingSparkJobDurationInMilliseconds -> 5252,
 rows_processed -> 100,
 SaveModeApplied -> TRUNCATE_COPY,
 DurationInMillisecondsToValidateFileFormat -> 75,
 status -> Completed,
 SparkApplicationName -> <spark_application_name>,
 ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
 request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
 StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
 JDBCConfigurationsSetupAtEPOCH -> 193,
 StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
 FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
 SchemaInferenceCheckDurationInMilliseconds -> 91,
 SaveModeRequested -> Overwrite,
 DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
 DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}

Meer codevoorbeelden

Gerealiseerde gegevens in cellen gebruiken

Spark DataFrame's createOrReplaceTempView kunnen worden gebruikt voor toegang tot gegevens die zijn opgehaald in een andere cel door een tijdelijke weergave te registreren.

  • Cel waarin gegevens worden opgehaald (bijvoorbeeld met voorkeur voor notebooktaal)Scala
    //Necessary imports
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.SaveMode
    import com.microsoft.spark.sqlanalytics.utils.Constants
    import org.apache.spark.sql.SqlAnalyticsConnector._
    
    //Configure options and read from Synapse Dedicated SQL Pool.
    val readDF = spark.read.
        //Set Synapse Dedicated SQL End Point name.
        option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
        //Set database user name.
        option(Constants.USER, "<user_name>").
        //Set database user's password. 
        option(Constants.PASSWORD, "<user_password>").
        //Set name of the data source definition that is defined with database scoped credentials.
        option(Constants.DATA_SOURCE,"<data_source_name>").
        //Set the three-part table name from which the read must be performed.
        synapsesql("<database_name>.<schema_name>.<table_name>").
        //Optional - specify number of records the DataFrame would read.
        limit(10)
    //Register the temporary view (scope - current active Spark Session)
    readDF.createOrReplaceTempView("<temporary_view_name>")
  • Wijzig nu de taalvoorkeur in het notitieblok in PySpark (Python) en haal gegevens op uit de geregistreerde weergave <temporary_view_name>
        spark.sql("select * from <temporary_view_name>").show()

Verwerking van antwoorden

Aanroepen synapsesql heeft twee mogelijke eindstatussen: geslaagd of mislukt. In deze sectie wordt beschreven hoe u het aanvraagantwoord voor elk scenario kunt verwerken.

Antwoord op aanvraag lezen

Na voltooiing wordt het leesantwoordfragment weergegeven in de uitvoer van de cel. Als de huidige cel niet werkt, worden ook volgende celuitvoeringen geannuleerd. Gedetailleerde foutinformatie is beschikbaar in de Spark-toepassingslogboeken.

Antwoord van schrijfaanvraag

Standaard wordt een schrijfantwoord afgedrukt naar de uitvoer van de cel. Bij een fout wordt de huidige cel gemarkeerd als mislukt en worden de volgende celuitvoeringen afgebroken. De andere methode is het doorgeven van de callback-handleoptie aan de synapsesql methode. De callback-ingang biedt programmatische toegang tot het schrijfantwoord.

Overige overwegingen

  • Wanneer u leest vanuit de tabellen met toegewezen SQL-pool van Azure Synapse:
    • Overweeg om de benodigde filters toe te passen op het DataFrame om te profiteren van de functie voor kolomsnoeien van de Verbinding maken or.
    • Leesscenario biedt geen ondersteuning voor de TOP(n-rows) component bij het inlijsten van de SELECT query-instructies. De keuze om gegevens te beperken, is door de component limit(.) van het DataFrame te gebruiken.
  • Wanneer u naar de toegewezen SQL-pooltabellen van Azure Synapse schrijft:
    • Voor interne tabeltypen:
      • Tabellen worden gemaakt met ROUND_ROBIN gegevensdistributie.
      • Kolomtypen worden afgeleid uit het DataFrame dat gegevens uit de bron zou lezen. Tekenreekskolommen worden toegewezen aan NVARCHAR(4000).
    • Voor externe tabeltypen:
      • De initiële parallelle uitvoering van DataFrame bepaalt de gegevensorganisatie voor de externe tabel.
      • Kolomtypen worden afgeleid uit het DataFrame dat gegevens uit de bron zou lezen.
    • Een betere gegevensdistributie tussen uitvoerders kan worden bereikt door de parameter van het spark.sql.files.maxPartitionBytes DataFrame en de parameter van repartition het DataFrame af te stemmen.
    • Bij het schrijven van grote gegevenssets is het belangrijk om rekening te houden met de impact van dwU-prestatieniveau dat de transactiegrootte beperkt.
  • Bewaak azure Data Lake Storage Gen2-gebruikstrends om beperkingsgedrag te herkennen dat invloed kan hebben op de lees- en schrijfprestaties.

Verwijzingen