次の方法で共有


Delta Live Tables の Python 言語リファレンス

この記事では、Delta Live Tables の Python プログラミング インターフェイスについて詳しく説明します。

SQL API の詳細については、「Delta Live Tables の SQL 言語リファレンス」を参照してください。

自動ローダーの構成に固有の詳細については、自動ローダーに関する記事を参照してください。

始める前に

Delta Live Tables の Python インターフェイスを使用してパイプラインを実装する場合の重要な考慮事項を次に示します。

  • Python の table() および view() 関数は、パイプライン更新の計画と実行中に複数回呼び出されるため、副作用が生じる可能性のあるコード (たとえば、データを変更するコードや電子メールを送信するコード) をこれらの関数のどれかに含めないでください。 予期しない動作を回避するために、データセットを定義する Python 関数には、テーブルまたはビューを定義するために必要なコードのみを含める必要があります。
  • 電子メールの送信や外部監視サービスとの統合などの操作を (特にデータセットを定義する関数内で) 実行するには、イベント フックを使用します。 データセットを定義する関数にこれらの操作を実装すると、予期しない動作が発生します。
  • Python tableview の関数は DataFrame を返す必要があります。 DataFrames で動作する一部の関数は DataFrame を返さないので、使用しないでください。 これらの操作には、collect()count()toPandas()save()saveAsTable() などの関数が含まれます。 DataFrame 変換は完全なデータフロー グラフが解決された "後" に実行されるため、このような操作を使用すると意図しない副作用が発生する可能性があります。

dlt Python モジュールをインポートする

Delta Live Tables の Python 関数は、dlt モジュールで定義されています。 Python API で実装されたパイプラインは、このモジュールをインポートする必要があります。

import dlt

Delta Live Tables の具体化されたビューまたはストリーミング テーブルを作成する

Python では、Delta Live Tables が定義クエリに基づいて、データセットを具体化されたビューとして更新するか、ストリーミング テーブルとして更新するかを決定します。 @table デコレーターは、具体化されたビューとストリーミング テーブルの両方を定義するのに使用できます。

具体化されたビューを Python で定義するには、データ ソースに対して静的読み取りを実行するクエリに @table を適用します。 ストリーミング テーブルを定義するには、データ ソースに対してストリーミング読み取りを実行するクエリに @table を適用するか、create_streaming_table() 関数を使用します。 次に示すように、どちらのデータセット型も構文仕様は同じです。

Note

cluster_by 引数を使用してリキッド クラスタリングを有効にするには、プレビュー チャンネルを使用するようにパイプラインを構成する必要があります。

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Delta Live Tables ビューを作成する

Python でビューを定義するには、@view デコレーターを適用します。 @table デコレーターと同様、Delta Live Tables では、ビューを静的データセットまたはストリーミング データセットのいずれかに使用できます。 Python でビューを定義するための構文を次に示します。

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

例: テーブルとビューを定義する

Python でビューまたはテーブルを定義するには、@dlt.view または @dlt.table のデコレーターを関数に適用します。 関数名または name パラメーターを使用して、テーブルまたはビューの名前を割り当てることができます。 次の例では、2 つの異なるデータセットを定義します。JSON ファイルを入力ソースとして受け取る taxi_raw というビューと、taxi_raw ビューを入力として受け取る filtered_data というテーブルです。

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

例: 同じパイプラインで定義されているデータセットにアクセスする

外部データ ソースからの読み取りに加えて、Delta Live Tables の read() 関数を使用して、同じパイプラインで定義されているデータセットにアクセスできます。 次の例は、read() 関数を使用して customers_filtered データセットを作成する方法を示しています。

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

spark.table() 関数を使用して、同じパイプラインで定義されているデータセットにアクセスすることもできます。 spark.table() 関数を使用して、パイプラインで定義されているデータセットにアクセスする場合は、関数の引数で、データセット名の前に LIVE キーワードを追加します。

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

例: メタストアに登録されているテーブルから読み取る

Hive メタストアに登録されているテーブルからデータを読み取るには、関数の引数で、LIVE キーワードを省略し、必要に応じてテーブル名をデータベース名で修飾します。

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Unity Catalog テーブルからの読み取りの例については、「Unity Catalog パイプラインにデータを取り込む」を参照してください。

例: spark.sql を使用してデータセットにアクセスする

クエリ関数で spark.sql 式を使用してデータセットを返すこともできます。 内部データセットから読み取るには、データセット名の前に LIVE. を追加します。

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

ストリーミング操作のターゲットとして使用するテーブルを作成する

create_streaming_table() 関数を使用して、apply_changes()apply_changes_from_snapshot()@append_flow 出力レコードなど、ストリーミング操作によって出力されるレコードのターゲット テーブルを作成します。

Note

create_target_table() 関数と create_streaming_live_table() 関数は非推奨です。 Databricks では、create_streaming_table() 関数を使用するように既存のコードを更新することをお勧めします。

Note

cluster_by 引数を使用してリキッド クラスタリングを有効にするには、プレビュー チャンネルを使用するようにパイプラインを構成する必要があります。

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
引数
name

型: str

テーブル名。

このパラメーターは必須です。
comment

型: str

テーブルの省略可能な説明。
spark_conf

型: dict

このクエリの実行の Spark 構成の省略可能なリスト。
table_properties

型: dict

テーブルのテーブル プロパティの省略可能なリスト。
partition_cols

型: array

テーブルのパーティション分割に使用する 1 つ以上の列の省略可能なリスト。
cluster_by

型: array

必要に応じて、テーブルでリキッド クラスタリングを有効にし、クラスタリング キーとして使用する列を定義してください。

「Delta テーブルに Liquid Clustering クラスタリングを使用する」を参照してください。
path

型: str

テーブル データの省略可能な保存場所。 設定されていない場合、システムは既定でパイプラインの保存場所に設定します。
schema

型: str または StructType

テーブルの省略可能なスキーマ定義。 スキーマは、SQL DDL 文字列としてまたは Python を使用して定義できます
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

型: dict

テーブルのオプションのデータ品質制約。 「複数の期待値」を参照してください。
row_filter (パブリック プレビュー)

型: str

省略可能なテーブルの行フィルター句。 「行フィルターと列マスクを使用してテーブルを公開する」を参照してください。

テーブルが具体化される方法を制御する

テーブルには、その具体化の追加のコントロールも用意されています。

Note

サイズが 1 TB 未満のテーブルの場合、Databricks では、Delta Live Tables でデータ編成を制御できるようにすることをお勧めします。 テーブルのサイズが 1 テラバイトを超えて拡大することが予想される場合を除き、パーティション列を指定しないでください。

例: スキーマ列とパーティション列を指定する

必要に応じて、Python StructType または SQL DDL 文字列を使用して、テーブル スキーマを指定できます。 DDL 文字列で指定する場合、定義には生成された列を含めることができます。

次の例では、Python StructType を使用して指定されたスキーマを使用して、 sales という名前のテーブルを作成します:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

次の例では、DDL 文字列を使用してテーブルのスキーマを指定し、生成された列を定義して、パーティション列を定義します。

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

既定では、スキーマが指定されなかった場合、Delta Live Tables は、table 定義からスキーマを推論します。

ソース ストリーミング テーブルの変更を無視するようにストリーミング テーブルを構成する

Note

  • skipChangeCommits フラグは、option() 関数を使用する spark.readStream でのみ動作します。 このフラグは、dlt.read_stream() 関数で使用することはできません。
  • ソース ストリーミング テーブルが apply_changes() 関数のターゲットとして定義されている場合、skipChangeCommits フラグを使用することはできません。

既定では、ストリーミング テーブルには追加のみのソースが必要です。 あるストリーミング テーブルで、別のストリーミング テーブルがソースとして使用されており、そのソース ストリーミング テーブルに更新または削除が必要な場合、たとえば、GDPR の "忘れられる権利" 処理では、それらの変更を無視するために、ソース ストリーミング テーブルの読み込み時に skipChangeCommits フラグを設定することができます。 このフラグの詳細については、「更新と削除を無視する」を参照してください。

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

例: テーブル制約を定義する

重要

テーブル制約はパブリック プレビュー段階です。

スキーマを指定する際には、主キーと外部キーを定義できます。 制約は情報提供のみであり、強制されるものではありません。 SQL 言語リファレンスの CONSTRAINT 句を参照してください。

次の例では、主キーと外部キーの制約を使用してテーブルを定義します。

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

例: 行フィルターと列マスクの定義

重要

行フィルターと列マスクは、 パブリック プレビュー段階にあります。

行フィルターと列マスクを持つ具体化されたビューまたはストリーミング テーブルを作成するには、ROW FILTER 句MASK 句を使用します。 次の例は、行フィルターと列マスクの両方を持つ具体化されたビューとストリーミング テーブルを定義する方法を示しています。

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

行フィルターと列マスクの詳細については、「行フィルターと列マスクを使用してテーブルを公開する」を参照してください。

Python Delta Live Tables のプロパティ

次の表では、Delta Live Tables を使用してテーブルとビューを定義するときに指定できるオプションとプロパティについて説明します。

Note

cluster_by 引数を使用してリキッド クラスタリングを有効にするには、プレビュー チャンネルを使用するようにパイプラインを構成する必要があります。

@table または @view
name

型: str

テーブルまたはビューの省略可能な名前。 定義されていない場合は、関数名がテーブルまたはビューの名前として使用されます。
comment

型: str

テーブルの省略可能な説明。
spark_conf

型: dict

このクエリの実行の Spark 構成の省略可能なリスト。
table_properties

型: dict

テーブルのテーブル プロパティの省略可能なリスト。
path

型: str

テーブル データの省略可能な保存場所。 設定されていない場合、システムは既定でパイプラインの保存場所に設定します。
partition_cols

型: a collection of str

テーブルのパーティション分割に使用する 1 つ以上の列の省略可能なコレクション (list など)。
cluster_by

型: array

必要に応じて、テーブルでリキッド クラスタリングを有効にし、クラスタリング キーとして使用する列を定義してください。

「Delta テーブルに Liquid Clustering クラスタリングを使用する」を参照してください。
schema

型: str または StructType

テーブルの省略可能なスキーマ定義。 スキーマは、SQL DDL 文字列としてまたは Python StructType を使用して定義できます。
temporary

型: bool

テーブルは作成しますが、テーブルのメタデータは公開しません。 temporary キーワードは、パイプラインでは使用できるが、パイプラインの外部ではアクセスできないテーブルを作成するように Delta Live Tables に指示します。 処理時間を短縮するために、一時テーブルは、単一の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。

既定値は "False" です。
row_filter (パブリック プレビュー)

型: str

省略可能なテーブルの行フィルター句。 「行フィルターと列マスクを使用してテーブルを公開する」を参照してください。
テーブルまたはビューの定義
def <function-name>()

データセットを定義する Python 関数。 name パラメーターが設定されていない場合は、<function-name> がターゲット データセット名として使用されます。
query

Spark Dataset または Koalas DataFrame を返す Spark SQL ステートメント。

dlt.read() または spark.table() を使用して、同じパイプラインで定義されているデータセットからの完全な読み取りを実行します。 spark.table() 関数を使用して、同じパイプラインで定義されているデータセットから読み取る場合は、関数の引数でデータセット名の前に LIVE キーワードを追加します。 たとえば、customers という名前のデータセットから読み取るには、次のようにします。

spark.table("LIVE.customers")

また、LIVE キーワードを省略し、必要に応じてテーブル名をデータベース名で修飾することにより、spark.table() 関数を使用して、メタストアに登録されているテーブルから読み取ることもできます。

spark.table("sales.customers")

dlt.read_stream() を使用して、同じパイプラインで定義されているデータセットからのストリーミング読み取りを実行します。

spark.sql 関数を使用して、返されるデータセットを作成する SQL クエリを定義します。

PySpark 構文を使用して、Python で Delta Live Tables クエリを定義します。
期待される回答
@expect("description", "constraint")

次によって識別されるデータ品質の制約を宣言します。
description. 行が期待値に違反する場合は、ターゲット データセットに行を含めます。
@expect_or_drop("description", "constraint")

次によって識別されるデータ品質の制約を宣言します。
description. 行が期待値に違反する場合は、ターゲット データセットから行を削除します。
@expect_or_fail("description", "constraint")

次によって識別されるデータ品質の制約を宣言します。
description. 行が期待値に違反する場合は、すぐに実行を停止します。
@expect_all(expectations)

1 つ以上のデータ品質の制約を宣言します。
expectations は Python ディクショナリです。キーは期待値の説明であり、値は期待値の制約です。 行がいずれかの期待値に違反する場合は、ターゲット データセットに行を含めます。
@expect_all_or_drop(expectations)

1 つ以上のデータ品質の制約を宣言します。
expectations は Python ディクショナリです。キーは期待値の説明であり、値は期待値の制約です。 行がいずれかの期待値に違反する場合は、ターゲット データセットから行を削除します。
@expect_all_or_fail(expectations)

1 つ以上のデータ品質の制約を宣言します。
expectations は Python ディクショナリです。キーは期待値の説明であり、値は期待値の制約です。 行がいずれかの期待値に違反する場合は、すぐに実行を停止します。

Delta Live Tables での Python を使用した変更フィードからの変更データ キャプチャ

変更データ フィード (CDF) からソース データを処理するには、Python API で apply_changes() 関数を使用して、Delta Live Tables の変更データ キャプチャ (CDC) 機能を使用します。

重要

変更を適用する対象のターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。 apply_changes() ターゲット テーブルのスキーマを指定する場合は、sequence_by フィールドと同じデータ型で __START_AT および __END_AT 列を含める必要があります。

必要なターゲット テーブルを作成するには、Delta Live Tables Python インターフェイスで create_streaming_table() 関数を使用できます。

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Note

APPLY CHANGES を処理する場合、INSERT イベントと UPDATE イベントの既定の動作では、ソースから CDC イベントを "アップサート" します。つまり、指定したキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。 DELETE イベントの処理は、APPLY AS DELETE WHEN 条件で指定できます。

変更フィードを使用した CDC 処理の詳細については、「APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化」を参照してください。 apply_changes() 関数の使用例については、「例: CDF ソース データを使用した SCD タイプ 1 と SCD タイプ 2 の処理」を参照してください。

重要

変更を適用する対象のターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。 apply_changes ターゲット テーブルのスキーマを指定する場合は、sequence_by フィールドと同じデータ型で __START_AT および __END_AT 列を含める必要があります。

APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化」を参照してください。

引数
target

型: str

更新するテーブルの名前。 create_streaming_table() 関数を使用して、apply_changes() 関数を実行する前にターゲット テーブルを作成できます。

このパラメーターは必須です。
source

型: str

CDC レコードを含むデータ ソース。

このパラメーターは必須です。
keys

型: list

ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。

次のいずれかを指定できます。

- 文字列のリスト: ["userId", "orderId"]
- Spark SQL col() 関数の一覧: [col("userId"), col("orderId"]

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

このパラメーターは必須です。
sequence_by

型: str または col()

ソース データ内の CDC イベントの論理順序を指定する列名。 Delta Live Tables では、このシーケンス処理を使用して、順不同で到着する変更イベントを処理します。

次のいずれかを指定できます。

- 文字列: "sequenceNum"
- Spark SQL col() 関数: col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

指定された列は、並べ替え可能なデータ型である必要があります。

このパラメーターは必須です。
ignore_null_updates

型: bool

ターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、ignore_null_updatesTrue の場合、null を持つ列はターゲット内の既存の値を保持します。 これは、null の値を持つ入れ子になった列にも適用されます。 ignore_null_updatesFalse の場合、既存の値は null 値で上書きされます。

このパラメーターは省略可能です。

既定値は、False です。
apply_as_deletes

型: str または expr()

CDC イベントをアップサートでなく DELETE として扱う必要がある場合に指定します。 順序の誤ったデータを処理するために、削除された行は基になる Delta テーブルの廃棄標識として一時的に保持され、これらの廃棄標識をフィルターで除外するビューがメタストアに作成されます。 データ保持間隔は次を使用して構成できます。
pipelines.cdc.tombstoneGCThresholdInSeconds table プロパティ。

次のいずれかを指定できます。

- 文字列: "Operation = 'DELETE'"
- Spark SQL expr() 関数: expr("Operation = 'DELETE'")

このパラメーターは省略可能です。
apply_as_truncates

型: str または expr()

CDC イベントを完全なテーブル TRUNCATE として扱う必要がある場合に指定します。 この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースでのみ使用する必要があります。

この apply_as_truncates パラメーターは、SCD タイプ 1 でのみサポートされます。 SCD タイプ 2 では、切り捨て操作はサポートされていません。

次のいずれかを指定できます。

- 文字列: "Operation = 'TRUNCATE'"
- Spark SQL expr() 関数: expr("Operation = 'TRUNCATE'")

このパラメーターは省略可能です。
column_list

except_column_list

型: list

ターゲット テーブルに含める列のサブセット。 column_list を使用して、含める列の完全な一覧を指定します。 except_column_list を使用して、除外する列を指定します。 値は、文字列の一覧として、または Spark SQL col() 関数として宣言できます。

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

このパラメーターは省略可能です。

既定では、column_list 引数または except_column_list 引数が関数に渡されない場合、ターゲット テーブル内のすべての列が含まれます。
stored_as_scd_type

型: str または int

レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。

SCD タイプ 1 の場合は 1 に、SCD タイプ 2 の場合は 2 に設定します。

この句は省略可能です。

既定値は SCD タイプ 1 です。
track_history_column_list

track_history_except_column_list

型: list

ターゲット テーブル内の履歴の追跡対象となる出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。 用途
track_history_except_column_list を使用して、追跡から除外する列を指定します。 値は、文字列の一覧として、または Spark SQL col() 関数として宣言できます。
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

このパラメーターは省略可能です。

既定では、track_history_column_listがないか以下の場合、ターゲット テーブルのすべての列が含まれます
track_history_except_column_list引数が関数に渡されます。

Delta Live Tables での Python を使用した変更フィードからの変更データ キャプチャ

重要

APPLY CHANGES FROM SNAPSHOT API はパブリック プレビュー段階です。

データベース スナップショットからソース データを処理するには、Python API で apply_changes_from_snapshot() 関数を使用して、Delta Live Tables の変更データ キャプチャ (CDC) 機能を使用します。

重要

変更を適用する対象のターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。 apply_changes_from_snapshot() ターゲット テーブルのスキーマを指定する場合は、sequence_by フィールドと同じデータ型で __START_AT および __END_AT 列も含める必要があります。

必要なターゲット テーブルを作成するには、Delta Live Tables Python インターフェイスで create_streaming_table() 関数を使用できます。

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Note

APPLY CHANGES FROM SNAPSHOT を処理する場合、既定の動作では、ターゲットに同じキーを持つ一致するレコードが存在しないときに、新しい行を挿入します。 一致するレコードが存在する場合は、行の値のいずれかが変更された場合にのみ更新されます。 ターゲットに存在するが、ソースには存在しなくなったキーを持つ行は削除されます。

スナップショットを使用した CDC 処理の詳細については、「APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化」を参照してください。 apply_changes_from_snapshot() 関数の使用例については、定期的なスナップショット インジェスト履歴スナップショット インジェストの例を参照してください。

引数
target

型: str

更新するテーブルの名前。 create_streaming_table() 関数を使用して、apply_changes() 関数を実行する前にターゲット テーブルを作成できます。

このパラメーターは必須です。
source

型: str または lambda function

定期的にスナップショットを作成するテーブルまたはビューの名前、または処理されるスナップショット DataFrame とスナップショット バージョンを返す Python ラムダ関数。 「source 引数を実装する」を参照してください。

このパラメーターは必須です。
keys

型: list

ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。

次のいずれかを指定できます。

- 文字列のリスト: ["userId", "orderId"]
- Spark SQL col() 関数の一覧: [col("userId"), col("orderId"]

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

このパラメーターは必須です。
stored_as_scd_type

型: str または int

レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。

SCD タイプ 1 の場合は 1 に、SCD タイプ 2 の場合は 2 に設定します。

この句は省略可能です。

既定値は SCD タイプ 1 です。
track_history_column_list

track_history_except_column_list

型: list

ターゲット テーブル内の履歴の追跡対象となる出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。 用途
track_history_except_column_list を使用して、追跡から除外する列を指定します。 値は、文字列の一覧として、または Spark SQL col() 関数として宣言できます。
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

このパラメーターは省略可能です。

既定では、track_history_column_listがないか以下の場合、ターゲット テーブルのすべての列が含まれます
track_history_except_column_list引数が関数に渡されます。

source 引数を実装する

apply_changes_from_snapshot() 関数には source 引数が含まれます。 履歴スナップショットを処理する場合、source 引数は、処理されるスナップショット データが含まれる Python DataFrame と、スナップショット バージョンの 2 つの値を apply_changes_from_snapshot() 関数に返す、Python ラムダ関数であることが期待されます。

次に、ラムダ関数のシグネチャを示します。

lambda Any => Optional[(DataFrame, Any)]
  • ラムダ関数に対する引数は、直近で処理されたスナップショット バージョンです。
  • ラムダ関数の戻り値は、None または 2 つの値のタプルです。タプルの最初の値は、処理されるスナップショットが含まれる DataFrame です。 タプルの 2 番目の値は、スナップショットの論理順序を表すスナップショット バージョンです。

ラムダ関数を実装して呼び出す例:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Delta Live Tables ランタイムでは、apply_changes_from_snapshot() 関数が含まれるパイプラインがトリガーされるたびに、次のステップを実行します。

  1. next_snapshot_and_version 関数を実行して、次のスナップショット DataFrame と対応するスナップショット バージョンを読み込みます。
  2. DataFrame が返されない場合、その実行は終了し、パイプラインの更新は完了としてマークされます。
  3. 新しいスナップショットの変更を検出し、それらをターゲット テーブルに増分的に適用します。
  4. ステップ #1に戻り、次のスナップショットとそのバージョンを読み込みます。

制限事項

Delta Live Tables の Python インターフェイスには、次の制限があります。

pivot() 関数はサポートされません。 Spark での pivot 操作では、出力のスキーマを計算するために、入力データを積極的に読み込む必要があります。 この機能は、Delta Live Tables ではサポートされていません。