Microsoft Fabric Data Warehouse 用 Spark コネクタ
Spark 開発者とデータ科学者は、Fabric Data Warehouse 用 Spark コネクタを使用すると、ウェアハウスとレイクハウスの SQL 分析エンドポイントのデータにアクセスして操作することができます。 コネクタには次のような機能があります。
- ウェアハウスまたは SQL Analytics エンドポイントからのデータは、同じワークスペース内または複数のワークスペース間で操作できます。
- レイクハウスの SQL 分析エンドポイントは、ワークスペースのコンテキストに基づいて自動的に検出されます。
- コネクタは、簡略化された Spark API を提供し、基になる複雑さを抽象化し、1 つのコード行だけで動作します。
- テーブルまたはビューにアクセスしている間、コネクタは SQL エンジン レベルで定義されたセキュリティ モデルを維持します。 これらのモデルには、オブジェクト レベル セキュリティ (OLS)、行レベル セキュリティ (RLS)、列レベル セキュリティ (CLS) が含まれます。
- コネクタは Fabric ランタイム 内にプレインストールされるため、個別にインストールする必要がなくなります。
認証
Microsoft Entra 認証は、統合認証アプローチです。 ユーザーは Microsoft Fabric ワークスペースにサインインし、資格証明は認証と認可のために SQL エンジンに自動的に渡されます。 認証情報が自動マッピングされるため、ユーザーが特定の構成オプションを指定する必要はありません。
権限
SQL エンジンに接続するには、Warehouseまたは SQL 分析エンドポイント (項目レベル) に対する少なくとも読み取りアクセス許可 (SQL Server の 接続 アクセス許可と同様) が必要です。 また、ユーザーは、特定のテーブルまたはビューからデータを読み取るために、詳細なオブジェクト レベルのアクセス許可も必要です。 詳細については、「Microsoft Fabric のデータ ウェアハウスのセキュリティ」を参照してください。
Code テンプレートと例
メソッド シグネチャを使用する
次のコマンドは、読み取り要求の synapsesql
メソッド シグネチャを示しています。 3 部構成 の tableName
引数は、レイクハウスの Warehouse と SQL 分析エンドポイントからテーブルまたはビューにアクセスするために必要です。 シナリオに基づいて、引数を次の名前で更新します。
- パート 1: 倉庫またはレイクハウスの名前。
- パート 2: スキーマの名前。
- パート 3: テーブルまたはビューの名前。
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame
このコネクタでは、テーブルまたはビューから直接読み取るだけでなく、カスタム クエリまたはパススルー クエリを指定することもできます。このクエリは SQL エンジンに渡され、結果は Spark に返されます。
spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame
このコネクタは、指定されたウェアハウス/レイクハウスのエンドポイントを自動検出しますが、明示的に指定したい場合は、そのようにできます。
//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>")
同じワークスペース内のデータを読み取る
重要
ノートブックの先頭またはコネクタの使用を開始する前に、これらの インポート ステートメントを実行します。
import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants
次のコードは、Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
次のコードは、行数制限が 10 の Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)
次のコードは、フィルターを適用した後に Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")
次のコードは、選択した列についてのみ Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")
ワークスペース間でデータを読み取る
複数のワークスペースにまたがる 1 つのウェアハウスまたはレイクハウスのデータにアクセスして読み取るには、ウェアハウスまたはレイクハウスが存在するワークスペース ID を指定してから、レイクハウスまたはウェアハウスの項目 ID を指定します。 次の行は、ワークスペース ID とレイクハウスまたはウェアハウス ID を指定して、ウェアハウスまたはレイクハウスから Spark データフレームのテーブルまたはビューのデータを読み取る例を示しています。
# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
注
ノートブックを実行している場合、コネクタは既定で、ノートブックに接続されているレイクハウスのワークスペース内の指定された倉庫またはレイクハウスを検索します。 別のワークスペースのウェアハウスまたはレイクハウスを参照するには、上記のようにワークスペース ID とレイクハウスまたはウェアハウスの項目 ID を指定します。
Warehouse からのデータに基づいてレイクハウス テーブルを作成する
これらのコード行は、Spark データフレームのテーブルまたはビューからデータを読み取り、それを使用してレイクハウス テーブルを作成する例を示しています。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")
Spark データフレーム データをウェアハウス テーブルに書き込む
このコネクタでは、Fabric DW テーブルへの 2 フェーズ書き込みプロセスが採用されています。 最初は、Spark データフレーム データを中間ストレージにステージングし、次に COPY INTO
コマンドを使用して Fabric DW テーブルにデータを取り込みます。 この方法により、データ 量が増えるにつれてスケーラビリティが確保されます。
サポートされている DataFrame の保存モード
データフレームのソース データをウェアハウス内の宛先テーブルに書き込む場合、次の保存モードがサポートされます。
- ErrorIfExists (既定の保存モード): 宛先テーブルが存在する場合、書き込みは中止され、呼び出し先に例外が返されます。 それ以外の場合は、データを含む新しいテーブルが作成されます。
- 無視: 宛先テーブルが存在する場合、書き込みではエラーを返さずに書き込み要求が無視されます。 それ以外の場合は、データを含む新しいテーブルが作成されます。
- 上書き: コピー先テーブルが存在する場合、変換先の既存のデータはデータに置き換えられます。 それ以外の場合は、データを含む新しいテーブルが作成されます。
- 追加: 変換先テーブルが存在する場合は、新しいデータが追加されます。 それ以外の場合は、データを含む新しいテーブルが作成されます。
次のコードは、Spark データフレームのデータを Fabric DW テーブルに書き込む例を示しています。
df.write.synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>") # this uses default mode - errorifexists
df.write.mode("errorifexists").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("ignore").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("append").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("overwrite").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
Note
コネクタでは、Lakehouse の SQL 分析エンドポイントが読み取り専用であるため、Fabric DW テーブルへの書き込みのみがサポートされます。
トラブルシューティング
完了すると、読み取り応答スニペットがセルの出力に表示されます。 現在のセルで失敗が発生すると、後続のノートブックの CELL の実行も取り消されます。 詳細なエラー情報は、Spark アプリケーションのログで確認できます。
このコネクタの使用に関する考慮事項
現時点では、コネクタは次の通りです。
- Lakehouse アイテムの Fabric ウェアハウスと SQL 分析エンドポイントからのデータ取得または読み取りをサポートします。
- 異なる保存モードを使用してウェアハウス テーブルへのデータの書き込みをサポートします。これは、最新の GA ランタイム (ランタイム 1.3) でのみ使用できます。 また、
Private Link
が有効になっていて、Public Access
がブロックされている場合、現在の書き込み操作は機能しません。 - Fabric DW では
Time Travel
がサポートされるようになりましたが、このコネクタは時間移動構文を持つクエリでは機能しません。 - 一貫性を保つため、Azure Synapse Analytics の Synapse Spark に付属しているような使用署名を保持します。 ただし、Azure Synapse Analytics で Dedicated SQL プールに接続して操作することは後方互換性がありません。
- 特殊文字を含む列名は、3 部構成のテーブル/ビュー名に基づいてクエリが送信される前にエスケープ文字を追加することによって処理されます。 カスタムまたはパススルー クエリ ベースの読み取りの場合、ユーザーは特殊文字を含む列名をエスケープする必要があります。