適用於 Apache Spark 的 Azure Synapse 專用 SQL 集區連接器
簡介
Azure Synapse Analytics 中適用於 Apache Spark 的 Azure Synapse 專用 SQL 集區連接器可讓您有效率地在 Apache Spark 運行時間和專用 SQL 集區之間傳輸大型數據集。 連接器會透過 Azure Synapse 工作區以預設程式庫的形式運送。 連接器是使用 Scala
語言實作。 連接器支援 Scala 和 Python。 若要搭配其他筆記本語言選擇使用連接器,請使用 Spark magic 命令 - %%spark
。
在高層級,連接器提供下列功能:
- 從 Azure Synapse 專用 SQL 集區讀取:
- 從 Synapse 專用 SQL 集區數據表 (內部和外部) 和檢視讀取大型數據集。
- 完整的述詞下推支援,其中 DataFrame 上的篩選會對應至對應的 SQL 述詞下推。
- 支援數據行剪除。
- 支援向下推入查詢。
- 寫入 Azure Synapse 專用 SQL 集區:
- 將大型磁碟區數據內嵌至內部和外部數據表類型。
- 支援下列 DataFrame 儲存模式喜好設定:
Append
ErrorIfExists
Ignore
Overwrite
- 寫入外部數據表類型支援 Parquet 和分隔文字檔格式 (範例 - CSV)。
- 若要將數據寫入內部數據表,連接器現在會使用 COPY 語句 ,而不是 CETAS/CTAS 方法。
- 優化端對端寫入輸送量效能的增強功能。
- 引進選擇性的回呼句柄(Scala 函式自變數),用戶端可用來接收寫入後計量。
- 少數範例包括 - 記錄數目、完成特定動作的持續時間,以及失敗原因。
協調流程方法
參閱
寫入
必要條件
本節將討論設定必要 Azure 資源和設定必要條件的步驟。
Azure 資源
檢閱並設定下列相依的 Azure 資源:
- Azure Data Lake Storage - 作為 Azure Synapse 工作區的主要記憶體帳戶。
- Azure Synapse 工作區 - 建立筆記本、建置及部署以數據框架為基礎的輸入輸出工作流程。
- 專用 SQL 集區 (先前稱為 SQL DW) - 提供企業 資料倉儲 功能。
- Azure Synapse 無伺服器 Spark 集 區 - Spark 運行時間,其中作業會以 Spark 應用程式的形式執行。
準備資料庫
線上到 Synapse 專用 SQL 集區資料庫,然後執行下列安裝語句:
建立對應至用來登入 Azure Synapse Workspace 之 Microsoft Entra 使用者身分識別的資料庫使用者。
CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;
建立將定義數據表的架構,讓連接器可以成功寫入和讀取個別數據表。
CREATE SCHEMA [<schema_name>];
驗證
Microsoft以專案標識碼為基礎的驗證
Microsoft Entra ID 型驗證是整合式驗證方法。 用戶必須成功登入 Azure Synapse Analytics 工作區。
基本驗證
基本身份驗證方法需要用戶設定 username
和 password
選項。 請參閱 - 組態選項 ,以瞭解相關組態參數,以讀取和寫入 Azure Synapse 專用 SQL 集區中的數據表。
授權
Azure Data Lake Storage Gen2 \(部分機器翻譯\)
有兩種方式可將訪問許可權授與 Azure Data Lake Storage Gen2 - 記憶體帳戶:
- 角色型 存取控制 角色 - 記憶體 Blob 數據參與者角色
- 指派 授
Storage Blob Data Contributor Role
與使用者讀取、寫入和刪除 Azure 儲存體 Blob 容器的許可權。 - RBAC 在容器層級提供粗略的控制方法。
- 指派 授
- 存取控制 清單 (ACL)
- ACL 方法允許對指定資料夾下的特定路徑和/或檔案進行更細緻的控制。
- 如果使用者已使用 RBAC 方法授與許可權,則不會強制執行 ACL 檢查。
- ACL 權限有兩種廣泛的類型:
- 訪問許可權(套用在特定層級或物件上)。
- 默認許可權(在建立時自動套用到所有子物件)。
- 權限類型包括:
Execute
可讓您周遊或瀏覽資料夾階層。Read
可讓您讀取。Write
可讓您撰寫。
- 請務必設定 ACL,讓連接器能夠成功從記憶體位置寫入和讀取。
注意
Azure Synapse 專用 SQL 集區
若要啟用與 Azure Synapse 專用 SQL 集區的成功互動,除非您是使用者也設定為 Active Directory Admin
專用 SQL 端點上的 ,否則需要下列授權:
讀取案例
使用系統預存程式
sp_addrolemember
授與使用者db_exporter
。EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
寫入案例
- 連接器會使用 COPY 命令將資料從預備環境寫入內部數據表的 Managed 位置。
設定這裡所述的必要許可權。
以下是相同專案的快速存取代碼段:
--Make sure your user has the permissions to CREATE tables in the [dbo] schema GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com]; GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com]; --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com]; --Make sure your user has INSERT permissions on the target table GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
- 連接器會使用 COPY 命令將資料從預備環境寫入內部數據表的 Managed 位置。
API 文件
適用於 Apache Spark 的 Azure Synapse 專用 SQL 集區連接器 - API 檔。
設定選項
若要成功啟動程式並協調讀取或寫入作業,連接器需要特定的組態參數。 物件定義 - com.microsoft.spark.sqlanalytics.utils.Constants
提供每個參數索引鍵的標準化常數清單。
以下是根據使用案例的組態選項清單:
- 使用 Microsoft Entra ID 型驗證進行讀取
- 認證會自動對應,而且使用者不需要提供特定的組態選項。
- 方法上的
synapsesql
三部分數據表名稱自變數必須從 Azure Synapse 專用 SQL 集區中的個別數據表讀取。
- 使用基本身份驗證讀取
- Azure Synapse 專用 SQL 端點
Constants.SERVER
- Synapse 專用 SQL 集區端點 (伺服器 FQDN)Constants.USER
- SQL 用戶名稱。Constants.PASSWORD
- SQL 用戶密碼。
- Azure Data Lake Storage (Gen 2) 端點 - 預備資料夾
Constants.DATA_SOURCE
- 資料來源位置參數上設定的記憶體路徑會用於數據暫存。
- Azure Synapse 專用 SQL 端點
- 使用 Microsoft Entra ID 型驗證撰寫
- Azure Synapse 專用 SQL 端點
- 根據預設,連接器會使用方法三部分數據表名稱參數上
synapsesql
設定的資料庫名稱來推斷 Synapse Dedicated SQL 端點。 - 或者,用戶可以使用
Constants.SERVER
選項來指定 sql 端點。 確定端點裝載具有個別架構的對應資料庫。
- 根據預設,連接器會使用方法三部分數據表名稱參數上
- Azure Data Lake Storage (Gen 2) 端點 - 預備資料夾
- 針對內部資料表型態:
Constants.TEMP_FOLDER
設定或Constants.DATA_SOURCE
選項。- 如果使用者選擇提供
Constants.DATA_SOURCE
選項,預備資料夾會使用location
DataSource的值來衍生。 - 如果兩者都提供,
Constants.TEMP_FOLDER
則會使用選項值。 - 如果沒有預備資料夾選項,連接器會根據執行時間群組態衍生一個 -
spark.sqlanalyticsconnector.stagingdir.prefix
。
- 針對外部資料表型態:
Constants.DATA_SOURCE
是必要的組態選項。- 連接器會使用數據源位置參數上設定的儲存路徑,結合
location
方法的自變數synapsesql
,並衍生絕對路徑來保存外部數據表數據。 - 如果未指定方法的
location
synapsesql
自變數,則連接器會衍生位置值做為<base_path>/dbName/schemaName/tableName
。
- 針對內部資料表型態:
- Azure Synapse 專用 SQL 端點
- 使用基本身份驗證撰寫
- Azure Synapse 專用 SQL 端點
Constants.SERVER
- Synapse 專用 SQL 集區端點(伺服器 FQDN)。Constants.USER
- SQL 用戶名稱。Constants.PASSWORD
- SQL 用戶密碼。Constants.STAGING_STORAGE_ACCOUNT_KEY
與載入Constants.TEMP_FOLDERS
的記憶體帳戶相關聯(僅限內部資料表類型)或Constants.DATA_SOURCE
。
- Azure Data Lake Storage (Gen 2) 端點 - 預備資料夾
- SQL 基本身份驗證認證不適用於存取記憶體端點。
- 因此,請確定指派相關的記憶體訪問許可權,如 Azure Data Lake Storage Gen2 一節所述。
- Azure Synapse 專用 SQL 端點
程式代碼範本
本節提供參考程式代碼範本,說明如何使用和叫用適用於Apache Spark的 Azure Synapse 專用 SQL 集區連接器。
注意
Python 中使用連接器 -
- 只有適用於Spark 3的 Python 支援連接器。 針對 Spark 2.4(不支援),我們可以使用 Scala 連接器 API,透過使用 DataFrame.createOrReplaceTempView 或 DataFrame.createOrReplaceGlobalTempView 與 PySpark 中 DataFrame 的內容互動。 請參閱章節 - 跨數據格使用具體化數據。
- 在 Python 中無法使用回呼句柄。
從 Azure Synapse 專用 SQL 集區讀取
讀取要求 - synapsesql
方法簽章
使用 Microsoft Entra ID 型驗證從數據表讀取
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
//Three-part table name from where data will be read.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Column-pruning i.e., query select column values.
select("<some_column_1>", "<some_column_5>", "<some_column_n>").
//Push-down filter criteria that gets translated to SQL Push-down Predicates.
filter(col("Title").startsWith("E")).
//Fetch a sample of 10 records
limit(10)
//Show contents of the dataframe
dfToReadFromTable.show()
使用 Microsoft Entra ID 型驗證從查詢讀取
注意
從查詢讀取時的限制:
- 無法同時指定資料表名稱和查詢。
- 只允許選取查詢。 不允許 DDL 和 DML SQL。
- 指定查詢時,數據框架上的選取和篩選選項不會向下推送至 SQL 專用集區。
- 從查詢讀取僅適用於Spark 3。
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
//query from which data will be read
.option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
synapsesql()
val dfToReadFromQueryAsArgument:DataFrame = spark.read.
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>")
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
//query from which data will be read
.synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
使用基本身份驗證從數據表讀取
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the table will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Three-part table name from where data will be read.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Column-pruning i.e., query select column values.
select("<some_column_1>", "<some_column_5>", "<some_column_n>").
//Push-down filter criteria that gets translated to SQL Push-down Predicates.
filter(col("Title").startsWith("E")).
//Fetch a sample of 10 records
limit(10)
//Show contents of the dataframe
dfToReadFromTable.show()
使用基本身份驗證從查詢讀取
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")
// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
//Name of the SQL Dedicated Pool or database where to run the query
//Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Query where data will be read.
option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
synapsesql()
val dfToReadFromQueryAsArgument:DataFrame = spark.read.
//Name of the SQL Dedicated Pool or database where to run the query
//Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Query where data will be read.
synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
寫入 Azure Synapse 專用 SQL 集區
寫入要求 - synapsesql
方法簽章
synapsesql(tableName:String,
tableType:String = Constants.INTERNAL,
location:Option[String] = None,
callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit
使用 Microsoft Entra ID 型驗證撰寫
以下是一個完整的程式代碼範本,描述如何使用連接器進行撰寫案例:
//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"
//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")
//Initialize DataFrame that reads CSV data from a given source
val readDF:DataFrame=spark.
read.
options(dfReadOptions).
csv(pathToInputSource).
limit(1000) //Reads first 1000 rows from the source CSV input.
//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
// 1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
// 2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab.
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")
//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
(feedback: Map[String, Any], errorState: Option[Throwable]) => {
println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
errorDuringWrite = errorState
}
//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
write.
//Configure required configurations.
options(writeOptionsWithAADAuth).
//Choose a save mode that is apt for your use case.
mode(SaveMode.Overwrite).
synapsesql(tableName = "<database_name>.<schema_name>.<table_name>",
//For external table type value is Constants.EXTERNAL
tableType = Constants.INTERNAL,
//Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
location = None,
//Optional parameter to receive a callback.
callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))
//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get
使用基本身份驗證撰寫
下列代碼段會取代使用 Microsoft Entra ID 型驗證一節中所述 的寫入定義,以使用 SQL 基本身份驗證 方法提交寫入要求:
//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
//Set database user name
Constants.USER -> "<user_name>",
//Set database user's password
Constants.PASSWORD -> "<user_password>",
//Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
//To be used only when writing to internal tables. Storage path will be used for data staging.
Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")
//Configure and submit the request to write to Synapse Dedicated SQL Pool.
readDF.
write.
options(writeOptionsWithBasicAuth).
//Choose a save mode that is apt for your use case.
mode(SaveMode.Overwrite).
synapsesql(tableName = "<database_name>.<schema_name>.<table_name>",
//For external table type value is Constants.EXTERNAL
tableType = Constants.INTERNAL,
//Not required for writing to an internal table
location = None,
//Optional parameter.
callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))
在基本身份驗證方法中,若要從來源記憶體路徑讀取數據,則需要其他組態選項。 下列代碼段提供使用服務主體認證從 Azure Data Lake Storage Gen2 數據源讀取的範例:
//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
"delimiter"->",",
"fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" ->
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id" -> s"$spnClientId",
"fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
"fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
"fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
"fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
read.
options(dfReadOptions).
csv(pathToInputSource).
limit(100)
支援的 DataFrame 儲存模式
將源數據寫入 Azure Synapse 專用 SQL 集區中的目的地數據表時,支援下列儲存模式:
- ErrorIfExists (預設儲存模式)
- 如果目的地數據表存在,則寫入會中止,並傳回給被呼叫端的例外狀況。 否則,系統會使用來自暫存資料夾的數據來建立新的資料表。
- 忽視
- 如果目的地數據表存在,則寫入會忽略寫入要求,而不會傳回錯誤。 否則,系統會使用來自暫存資料夾的數據來建立新的資料表。
- 覆寫
- 如果目的地數據表存在,則目的地中的現有數據會取代為預備資料夾中的數據。 否則,系統會使用來自暫存資料夾的數據來建立新的資料表。
- 附加
- 如果目的地數據表存在,則會將新數據附加至該數據表。 否則,系統會使用來自暫存資料夾的數據來建立新的資料表。
寫入要求回呼句柄
新的寫入路徑 API 變更引進了實驗性功能,以提供用戶端寫入後計量的索引鍵/>值對應。 計量的索引鍵是在新的物件定義中定義 - Constants.FeedbackConstants
。 您可以藉由傳入回呼句柄 (a Scala Function
) 來擷取計量作為 JSON 字串。 以下是函式簽章:
//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit
以下是一些值得注意的計量(在駱駝案例中顯示):
WriteFailureCause
DataStagingSparkJobDurationInMilliseconds
NumberOfRecordsStagedForSQLCommit
SQLStatementExecutionDurationInMilliseconds
rows_processed
以下是具有寫入後計量的範例 JSON 字串:
{
SparkApplicationId -> <spark_yarn_application_id>,
SQLStatementExecutionDurationInMilliseconds -> 10113,
WriteRequestReceivedAtEPOCH -> 1647523790633,
WriteRequestProcessedAtEPOCH -> 1647523808379,
StagingDataFileSystemCheckDurationInMilliseconds -> 60,
command -> "COPY INTO [schema_name].[table_name] ...",
NumberOfRecordsStagedForSQLCommit -> 100,
DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
DataStagingSparkJobDurationInMilliseconds -> 5252,
rows_processed -> 100,
SaveModeApplied -> TRUNCATE_COPY,
DurationInMillisecondsToValidateFileFormat -> 75,
status -> Completed,
SparkApplicationName -> <spark_application_name>,
ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
JDBCConfigurationsSetupAtEPOCH -> 193,
StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
SchemaInferenceCheckDurationInMilliseconds -> 91,
SaveModeRequested -> Overwrite,
DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}
更多程式碼範例
跨數據格使用具體化數據
Spark DataFrame createOrReplaceTempView
可以藉由註冊暫存檢視,來存取另一個數據格中擷取的數據。
- 擷取數據的數據格(例如,以筆記本語言喜好設定為
Scala
)
//Necessary imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Configure options and read from Synapse Dedicated SQL Pool.
val readDF = spark.read.
//Set Synapse Dedicated SQL End Point name.
option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
//Set database user name.
option(Constants.USER, "<user_name>").
//Set database user's password.
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
option(Constants.DATA_SOURCE,"<data_source_name>").
//Set the three-part table name from which the read must be performed.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Optional - specify number of records the DataFrame would read.
limit(10)
//Register the temporary view (scope - current active Spark Session)
readDF.createOrReplaceTempView("<temporary_view_name>")
- 現在,將 Notebook 上的語言喜好設定變更為
PySpark (Python)
,並從已註冊的檢視擷取數據<temporary_view_name>
spark.sql("select * from <temporary_view_name>").show()
回應處理
叫用 synapsesql
有兩個可能的結束狀態 - 成功或失敗狀態。 本節說明如何處理每個案例的要求回應。
讀取要求回應
完成時,讀取回應代碼段會顯示在儲存格的輸出中。 目前儲存格中的失敗也會取消後續的儲存格執行。 Spark 應用程式記錄中提供詳細的錯誤資訊。
寫入要求回應
根據預設,寫入回應會列印至數據格輸出。 失敗時,目前的儲存格標示為失敗,後續的儲存格執行將會中止。 另一種方法是將 回呼句柄 選項傳遞至 synapsesql
方法。 回呼句柄會以程序設計方式存取寫入回應。
其他考量
- 從 Azure Synapse 專用 SQL 集區資料表讀取時:
- 請考慮在 DataFrame 上套用必要的篩選,以利用連接器的數據行剪除功能。
- 讀取案例不支援
TOP(n-rows)
子句,在框架SELECT
查詢語句時。 限制數據的選擇是使用 DataFrame 的 limit(.) 子句。- 請參閱範例 - 跨儲存格 區段使用具體化數據。
- 寫入 Azure Synapse 專用 SQL 集區資料表時:
- 監視 Azure Data Lake Storage Gen2 使用率趨勢,以找出可能會影響讀取和寫入效能的節流行為。