教學課程:使用 Spark 作業將資料內嵌至 SQL Server 資料集區
適用於:SQL Server 2019 (15.x)
重要
Microsoft SQL Server 2019 巨量資料叢集附加元件將會淘汰。 SQL Server 2019 巨量資料叢集的支援將於 2025 年 2 月 28 日結束。 平台上將完全支援含軟體保證 SQL Server 2019 的所有現有使用者,而且軟體將會持續透過 SQL Server 累積更新來維護,直到該時間為止。 如需詳細資訊,請參閱公告部落格文章與 Microsoft SQL Server 平台上的巨量資料選項。
本教學課程示範如何使用 Spark 作業,將資料載入 SQL Server 2019 巨量資料叢集的資料集區。
在本教學課程中,您會了解如何:
- 在資料集區中建立外部資料表。
- 建立 Spark 作業,以便從 HDFS 載入資料。
- 查詢外部資料表中的結果。
提示
如果您想要的話,也可以下載並執行用於本教學課程中命令的指令碼。 如需指示,請參閱 GitHub 上的資料集區範例。
必要條件
- 巨量資料工具
- kubectl
- Azure Data Studio
- SQL Server 2019 延伸模組
- 將範例資料載入巨量資料叢集
在資料集區中建立外部資料表
下列步驟會在資料集區中建立名為 web_clickstreams_spark_results 的外部資料表。 然後,此資料表可作為資料內嵌至巨量資料叢集的位置。
在 Azure Data Studio 中,連線到巨量資料叢集的 SQL Server 主要執行個體。 如需詳細資訊,請參閱連線到 SQL Server 主要執行個體。
按兩下 [伺服器] 視窗中的連線,顯示 SQL Server 主要執行個體的伺服器儀表板。 選取 [新增查詢]。
建立 MSSQL-Spark 連接器的權限。
USE Sales CREATE LOGIN sample_user WITH PASSWORD ='<password>' CREATE USER sample_user FROM LOGIN sample_user -- To create external tables in data pools GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user; -- To create external tables GRANT CREATE TABLE TO sample_user; GRANT ALTER ANY SCHEMA TO sample_user; -- To view database state for Sales GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user ALTER ROLE [db_datawriter] ADD MEMBER sample_user
如果資料集區沒有外部資料來源,請加以建立。
USE Sales GO IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool') CREATE EXTERNAL DATA SOURCE SqlDataPool WITH (LOCATION = 'sqldatapool://controller-svc/default');
在資料集區中建立名為 web_clickstreams_spark_results 的外部資料表。
USE Sales GO IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results') CREATE EXTERNAL TABLE [web_clickstreams_spark_results] ("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT) WITH ( DATA_SOURCE = SqlDataPool, DISTRIBUTION = ROUND_ROBIN );
建立資料集區的登入,並提供權限給使用者。
EXECUTE( ' Use Sales; CREATE LOGIN sample_user WITH PASSWORD = ''<password>;'') AT DATA_SOURCE SqlDataPool; EXECUTE('Use Sales; CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user; ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
資料集區外部資料表的建立是封鎖作業。 當指定的資料表已經在所有後端資料集區節點上建立之後,就會恢復控制權。 如果建立作業期間發生失敗,錯誤訊息會傳回給呼叫者。
啟動 Spark 串流作業
下一個步驟是建立 Spark 串流作業,將來自存放集區 (HDFS) 的 Web 點選流資料載入您在資料集區中建立的外部資料表。 這項資料已在將範例資料載入您的巨量資料叢集中新增至 /clickstream_data。
在 Azure Data Studio 中,連線到巨量資料叢集的主要執行個體。 如需詳細資訊,請參閱連線到巨量資料叢集。
建立新的筆記本並選取 Spark | Scala 作為您的核心。
執行 Spark 擷取作業
- 設定 Spark-SQL 連接器參數
注意
如果使用 Active Directory 整合來部署巨量資料叢集,請取代下列 hostname 值,以包含附加至服務名稱的 FQDN。 例如,hostname=master-p-svc.<domainName>。
import org.apache.spark.sql.types._ import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame} // Change per your installation val user= "username" val password= "<password>" val database = "MyTestDatabase" val sourceDir = "/clickstream_data" val datapool_table = "web_clickstreams_spark_results" val datasource_name = "SqlDataPool" val schema = StructType(Seq( StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true), StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true), StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true) )) val hostname = "master-p-svc" val port = 1433 val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
- 定義並執行 Spark 作業
- 每個作業都有兩個部分:readStream 和 writeStream。 以下我們會使用以上定義的結構描述來建立資料框架,然後寫入資料集區中的外部資料表。
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame} val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir) val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => batchDF.write .format("com.microsoft.sqlserver.jdbc.spark") .mode("append") .option("url", url) .option("dbtable", datapool_table) .option("user", user) .option("password", password) .option("dataPoolDataSource",datasource_name).save() }.start() query.awaitTermination(40000) query.stop()
查詢資料
下列步驟顯示 Spark 串流作業已將來自 HDFS 的資料載入資料集區。
在查詢內嵌資料前,請查看 Spark 執行狀態 (包括 Yarn 應用程式識別碼、Spark UI 和驅動程式記錄)。 當您第一次啟動 Spark 應用程式時,此資訊會顯示在筆記本中。
返回您在本教學課程開頭開啟的 SQL Server 主要執行個體查詢視窗。
執行下列查詢,檢查內嵌資料。
USE Sales GO SELECT count(*) FROM [web_clickstreams_spark_results]; SELECT TOP 10 * FROM [web_clickstreams_spark_results];
您也可以在 Spark 中查詢資料。 例如,以下程式碼會印出資料表中的記錄數:
def df_read(dbtable: String, url: String, dataPoolDataSource: String=""): DataFrame = { spark.read .format("com.microsoft.sqlserver.jdbc.spark") .option("url", url) .option("dbtable", dbtable) .option("user", user) .option("password", password) .option("dataPoolDataSource", dataPoolDataSource) .load() } val new_df = df_read(datapool_table, url, dataPoolDataSource=datasource_name) println("Number of rows is " + new_df.count)
清除
使用下列命令,移除本教學課程所建立的資料庫物件。
DROP EXTERNAL TABLE [dbo].[web_clickstreams_spark_results];
後續步驟
了解如何在 Azure Data Studio 中執行範例筆記本: