Delta Live Tables を使用してデータを読み込む
Delta Live Tablesを使用して、Azure Databricks 上の Apache Spark でサポートされている任意のデータ ソースからデータを読み込むことができます。 Spark DataFrame を返すクエリ (ストリーミング DataFrames や Spark DataFrames 用 Pandas など) に対して、Delta Live Tables でデータセット (tables と views) を定義できます。 データ インジェスト タスクの場合、Databricks では、ほとんどのユース ケースでストリーミング tables を使用することをお勧めします。 ストリーミング tables は、自動ローダーを使用してクラウド オブジェクト ストレージから、または Kafka などのメッセージ バスからデータを取り込む場合に適しています。 次の例は、いくつかの一般的なパターンを示しています。
重要
すべてのデータ ソースで SQL がサポートされているわけではありません。 Delta Live Tables パイプラインで SQL ノートブックと Python ノートブックを混在させ、インジェスト以外のすべての操作に SQL を使用できます。
既定で Delta Live Tables にパッケージ化されていないライブラリの操作の詳細については、「Delta Live Tables パイプラインの Python 依存関係を管理する」を参照してください。
クラウド オブジェクト ストレージからファイルを読み込む
Databricks では、クラウド オブジェクト ストレージからのほとんどのデータ インジェスト タスクに対して、Delta Live Tables で自動ローダーを使用することをお勧めします。 自動ローダーと Delta Live Tables は、クラウド ストレージに到着した増え続けるデータを、段階的かつ一貫してシステムに読み込むよう設計されています。 次の例では、自動ローダーを使用して CSV と JSON ファイルからデータセットを作成します。
Note
Unity
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
「自動ローダーとは」および「自動ローダー SQL 構文」を参照してください。
警告
ファイル通知とともに自動ローダーを使用してパイプラインまたはストリーミング table用の完全な refresh を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブックで CloudFilesResourceManager を使用してクリーンアップを実行できます。
メッセージ バスからデータを読み込む
ストリーミング tablesを使用してメッセージ バスからデータを取り込む Delta Live Tables パイプラインを構成できます。 Databricks では、ストリーミング tables を継続的な実行と拡張された自動スケールと組み合わせて、メッセージ バスからの待機時間の短い読み込みに最も効率的なインジェストを提供することをお勧めします。 自動スケールが強化された Delta Live
たとえば、次のコードは、Kafka からデータを取り込むためのストリーミング table を構成します。
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
次の例のように、純粋 SQL でダウンストリーム操作を記述して、このデータに対してストリーミング変換を実行できます:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(LIVE.kafka_raw)
WHERE ...
Event Hubs の使用例については、「Azure Event Hubs を Delta Live Tables データ ソースとして使用する」を参照してください。
「ストリーミング データ ソースを構成する」を参照してください。
外部システムからデータを読み込む
Delta Live Tables では、Azure Databricks でサポートされている任意のデータ ソースからのデータの読み込みがサポートされています。 「データ ソースに接続する」を参照してください。 サポートされているデータ ソースに対して Lakehouse フェデレーションを使用して外部データを読み込むこともできます。 Lakehouse フェデレーションには Databricks Runtime 13.3 LTS 以降が必要であるため、Lakehouse フェデレーションを使用するには、プレビュー チャネルを使用するようにパイプラインを構成する必要があります。
一部のデータ ソースでは、SQL で同等のサポートが提供されていません。 これらのデータ ソースのいずれかで Lakehouse Federation を使用できない場合は、Python ノートブックを使用してソースからデータを取り込むことができます。 Python と SQL のソース コードを同じ Delta Live Tables パイプラインに追加できます。 次の例では、リモート PostgreSQL table内のデータの現在の状態にアクセスするための具体化されたビューを宣言します。
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
クラウド オブジェクト ストレージから小さいデータセットまたは静的データセットを読み込む
Apache Spark の読み込み構文を使用して、小規模または静的なデータセットを読み込むことができます。 Delta Live Tables では、Azure Databricks 上の Apache Spark でサポートされているすべてのファイル形式がサポートされます。 完全な listについては、「データ形式のオプション を参照してください。
次の例では、JSON を読み込んで Delta Live Tablestablesを作成する方法を示します。
Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
Note
SELECT * FROM format.`path`;
SQL コンストラクトは、Azure Databricks 上のすべての SQL 環境に共通です。 これは、Delta Live Tablesで SQL を使用して直接ファイル にアクセスするための推奨パターンです。
パイプライン内のシークレットを使用してストレージ credentials に安全にアクセスする
Azure Databricks シークレット を使用して、アクセス キーやパスワードなどの credentials を格納できます。 パイプラインでシークレットを構成するには、パイプライン設定クラスター構成で Spark プロパティを使用します。 「Delta Live Tables パイプラインのコンピューティングを構成する」を参照してください。
次の例では、自動ローダーを使用して Azure Data Lake Storage Gen2 (ADLS Gen2) ストレージ アカウントから入力データを読み取るために必要なアクセス キーを格納するために、シークレットを使用します。 この同じ方法を使用して、S3 にアクセスするための AWS キーや Apache Hive メタストアへのパスワードなど、パイプラインで必要などのシークレットでも構成できます。
Azure Data Lake Storage Gen2 の操作の詳細については、「Azure Data Lake Storage Gen2 と Blob Storage に接続する」を参照してください。
Note
シークレット値を設定する spark.hadoop.
構成キーに、spark_conf
プレフィックスを追加する必要があります。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
元
<storage-account-name>
は、ADLS Gen2 ストレージ アカウントの名前に置き換えます。<scope-name>
は、Azure Databricks シークレット スコープの名前に置き換えます。<secret-name>
は、Azure ストレージ アカウントのアクセス キーが含まれるキーの名前に置き換えます。
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
元
<container-name>
は、入力データを格納する Azure ストレージ アカウント コンテナーの名前に置き換えます。<storage-account-name>
は、ADLS Gen2 ストレージ アカウントの名前に置き換えます。<path-to-input-dataset>
は、入力データセットへのパスに置き換えます。
Azure Event Hubs からデータを読み込む
Azure Event Hubs は、Apache Kafka 互換インターフェイスを提供するデータ ストリーミング サービスです。 Delta Live Tables ランタイムに含まれている Structured Streaming Kafka コネクタを使用して、Azure Event Hubs からメッセージを読み込むことができます。 Azure Event Hubs からのメッセージの読み込みと処理の詳細については、「Delta Live Tables データ ソースとして Azure Event Hubsを使用する」を参照してください。