Delta Lake テーブルを作成する

完了

Delta Lake はテーブル上に構築され、データ レイク内のファイルに対するリレーショナル ストレージの抽象化を提供します。

データフレームからの Delta Lake テーブルの作成

Delta Lake テーブルを作成する最も簡単な方法の 1 つは、データフレームを "デルタ" 形式で保存し、テーブルのデータ ファイルと関連するメタデータ情報を格納するパスを指定することです。

たとえば、次の PySpark コードは、既存のファイルからデータを含むデータフレームを読み込み、そのデータフレームを "デルタ" 形式で新しいフォルダーの場所に保存します。

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

デルタ テーブルを保存した後、指定したパスの場所には、データの Parquet ファイルと (データフレームに読み込んだソース ファイルの形式に関係なく)、テーブルのトランザクション ログを含む _delta_log フォルダーが含まれます。

Note

トランザクション ログには、テーブルに対するすべてのデータ変更が記録されます。 各変更をログに記録することで、トランザクションの一貫性を適用し、テーブルのバージョン管理情報を保持できます。

次に示すように、上書きモードを使うことで、既存の Delta Lake テーブルをデータフレームの内容で置き換えることができます。

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

追加モードを使うことで、データフレームから既存のテーブルに行を追加することもできます。

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

条件付き更新の実行

データフレームでデータを変更してから、それを上書きすることで Delta Lake テーブルを置き換えることができますが、データベースでのさらに一般的なパターンは、個別のトランザクション操作として既存のテーブルの行を挿入、更新、または削除することです。 Delta Lake テーブルに対してそのような変更を行うには、Delta Lake API の DeltaTable オブジェクトを使用できます。これは、"更新"、"削除"、"マージ" 操作をサポートしています。 たとえば、次のコードを使うと、category 列の値が "Accessories" であるすべての行の price 列を更新できます。

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

データの変更はトランザクション ログに記録され、必要に応じてテーブル フォルダーに新しい Parquet ファイルが作成されます。

ヒント

Delta Lake API の使用について詳しくは、Delta Lake API のドキュメントを参照してください。

前のバージョンのファイルのクエリ

Delta Lake テーブルでは、トランザクション ログを使ったバージョン管理がサポートされます。 トランザクション ログには、テーブルに対して行われた変更と、各トランザクションのタイムスタンプとバージョン番号が記録されます。 このログに記録されたバージョン データを使って、以前のバージョンのテーブルを表示できます。この機能は "タイム トラベル" と呼ばれます。

デルタ テーブルの場所からデータフレームにデータを読み取り、versionAsOf オプションとして必要なバージョンを指定することにより、Delta Lake テーブルの特定のバージョンからデータを取得できます。

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

または、timestampAsOf オプションを使ってタイムスタンプを指定することもできます。

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)