Delta Lake テーブルを作成する
Delta Lake はテーブル上に構築され、データ レイク内のファイルに対するリレーショナル ストレージの抽象化を提供します。
データフレームからの Delta Lake テーブルの作成
Delta Lake テーブルを作成する最も簡単な方法の 1 つは、データフレームを "デルタ" 形式で保存し、テーブルのデータ ファイルと関連するメタデータ情報を格納するパスを指定することです。
たとえば、次の PySpark コードは、既存のファイルからデータを含むデータフレームを読み込み、そのデータフレームを "デルタ" 形式で新しいフォルダーの場所に保存します。
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)
# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
デルタ テーブルを保存した後、指定したパスの場所には、データの Parquet ファイルと (データフレームに読み込んだソース ファイルの形式に関係なく)、テーブルのトランザクション ログを含む _delta_log フォルダーが含まれます。
Note
トランザクション ログには、テーブルに対するすべてのデータ変更が記録されます。 各変更をログに記録することで、トランザクションの一貫性を適用し、テーブルのバージョン管理情報を保持できます。
次に示すように、上書きモードを使うことで、既存の Delta Lake テーブルをデータフレームの内容で置き換えることができます。
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
追加モードを使うことで、データフレームから既存のテーブルに行を追加することもできます。
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
条件付き更新の実行
データフレームでデータを変更してから、それを上書きすることで Delta Lake テーブルを置き換えることができますが、データベースでのさらに一般的なパターンは、個別のトランザクション操作として既存のテーブルの行を挿入、更新、または削除することです。 Delta Lake テーブルに対してそのような変更を行うには、Delta Lake API の DeltaTable オブジェクトを使用できます。これは、"更新"、"削除"、"マージ" 操作をサポートしています。 たとえば、次のコードを使うと、category 列の値が "Accessories" であるすべての行の price 列を更新できます。
from delta.tables import *
from pyspark.sql.functions import *
# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
データの変更はトランザクション ログに記録され、必要に応じてテーブル フォルダーに新しい Parquet ファイルが作成されます。
ヒント
Delta Lake API の使用について詳しくは、Delta Lake API のドキュメントを参照してください。
前のバージョンのファイルのクエリ
Delta Lake テーブルでは、トランザクション ログを使ったバージョン管理がサポートされます。 トランザクション ログには、テーブルに対して行われた変更と、各トランザクションのタイムスタンプとバージョン番号が記録されます。 このログに記録されたバージョン データを使って、以前のバージョンのテーブルを表示できます。この機能は "タイム トラベル" と呼ばれます。
デルタ テーブルの場所からデータフレームにデータを読み取り、versionAsOf
オプションとして必要なバージョンを指定することにより、Delta Lake テーブルの特定のバージョンからデータを取得できます。
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
または、timestampAsOf
オプションを使ってタイムスタンプを指定することもできます。
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)