Azure Stream Analytics: Delta Lake テーブルへの書き込み
Delta Lake は、データ レイクに信頼性、品質とパフォーマンスをもたらすオープンな形式です。 Azure Stream Analytics を使用すると、コードを 1 行も記述することなく、ストリーミング データを Delta Lake テーブルに直接書き込むことができます。
Stream Analytics ジョブは、ネイティブの Delta Lake 出力コネクタを介して、Azure Data Lake Storage Gen2 アカウント内の Delta テーブル (新規または事前に作成されたもの) に書き込むよう構成できます。 このコネクタは、追加モードの Delta テーブルへの高速インジェスト用に最適化されています。 また、データが失われたり重複したりしないことを保証する、厳密に 1 回のセマンティクスも提供されます。 Azure Event Hubs から Delta テーブルにリアルタイム データ ストリームを取り込むことで、アドホックの対話型分析またはバッチ分析を実行できます。
Delta Lake の構成
Delta Lake にデータを書き込むには、Data Lake Storage Gen2 アカウントに接続する必要があります。 次の表に、Delta Lake の構成に関連するプロパティを示します。
プロパティ名 | 説明 |
---|---|
イベントのシリアル化の形式 | 出力データのシリアル化形式。 JSON、CSV、Avro、Parquet がサポートされています。 Delta Lake は、ここにオプションとして一覧表示されます。 Delta Lake が選択されている場合、データは Parquet 形式になります。 |
Delta Path の名前 | 指定されたコンテナー内の Delta Lake テーブルを書き込むために使用されるパス。 これにはテーブル名が含まれます。 詳細については、次のセクションを参照してください。 |
[パーティション列] | 省略可能。 出力データからパーティションへの {field} 名。 サポートされているパーティション列は 1 つだけです。 列の値は、string 型である必要があります。 |
Data Lake Storage Gen2 の構成の完全な一覧については、Azure Data Lake Storage Gen2 の概要に関するページを参照してください。
Delta Path の名前
Delta パス名は、Data Lake Storage Gen2 に格納されている Delta Lake テーブルの場所と名前を指定するために使用されます。
1 つ以上のパス セグメントを使用して、Delta テーブルへのパスと Delta テーブル名を定義できます。 パスのセグメントは、仮想ディレクトリの名前に対応した連続する区切り記号文字 (スラッシュ "/
" など) の間の文字列です。
このセグメント名は英数字であり、スペース、ハイフン、およびアンダースコアを含めることができます。 最後のパス セグメントがテーブル名として使用されます。
Delta パス名の制限事項は次のとおりです。
- フィールド名は大文字小文字が区別されません。 たとえば、このサービスでは列
ID
とid
を区別できません。 - 動的
{field}
名は使用できません。 たとえば、{ID}
はテキスト {ID} として扱われます。 - 名前を構成するパス セグメントの数が 254 個を超えないようにしてください。
例
Delta パス名の例:
- 例 1:
WestUS/CA/factory1/device-table
- 例 2:
Test/demo
- 例 3:
mytable
出力ファイル例:
- 選択したコンテナーの下のディレクトリ パスは
WestEurope/CA/factory1
で、Delta テーブル フォルダー名は device-table です。 - 選択したコンテナーの下のディレクトリ パスは
Test
で、Delta テーブル フォルダー名は demo です。 - 選択したコンテナーの下の Delta テーブル フォルダー名は mytable です。
新しいテーブルの作成
Delta パス名で指定された場所に同じ名前の Delta Lake テーブルがまだ存在していない場合、既定では、Stream Analytics によって新しい Delta テーブルが作成されます。 この新しいテーブルは以下の構成で作成されます。
- ライター バージョン 2 。
- リーダー バージョン 1。
- テーブルはアペンド専用。
- テーブル スキーマは、最初に検出されたレコードのスキーマを使用して作成されます。
テーブルに書き込む
Delta パス名で指定された場所に同じ名前の Delta Lake テーブルが既に存在する場合、既定では、Stream Analytics によって新しいレコードが既存のテーブルに書き込まれます。
厳密に 1 回の配信
トランザクション ログにより、Delta Lake は厳密に 1 回の処理を保証できます。 また、Stream Analytics では、1 回のジョブ実行中にデータを Azure Data Lake Storage Gen2 に出力するときに、厳密に 1 回の配信が行われます。
スキーマの適用
スキーマの適用とは、データ品質を確保するために、テーブルへの新しい書き込みはすべて、書き込み時にターゲット テーブルのスキーマと互換性があるように強制されることを意味します。
出力データのすべてのレコードは、既存のテーブルのスキーマに投影されます。 出力が新しい Delta テーブルに書き込まれる場合、テーブル スキーマは最初のレコードで作成されます。 受信データに既存のテーブル スキーマと比較して余分な列が 1 つある場合は、余分の列を含めずにテーブルに書き込まれます。 受信データに既存のテーブル スキーマと比較して列が 1 つない場合は、その列を null にしてテーブルに書き込まれます。
Delta テーブルのスキーマとストリーミング ジョブのレコードのスキーマの間に交差がない場合、それはスキーマ変換エラーの一例とみなされます。 スキーマ変換エラーとみなされるケースはこれだけではありません。
スキーマ変換が失敗する場合、ジョブの動作は、ジョブ レベルで構成された出力データ エラーの処理ポリシーに従います。
Delta ログ チェックポイント
Stream Analytics ジョブは、Delta ログ チェックポイントを V1 形式で定期的に作成します。 Delta ログ チェックポイントは Delta テーブルのスナップショットであり、通常は Stream Analytics ジョブによって生成されたデータ ファイルの名前が含まれます。 データ ファイルの数が多い場合は、チェックポイントが大きくなり、Stream Analytics ジョブでメモリの問題が発生する可能性があります。
制限事項
- 動的パーティション キー (Delta パスでのレコード スキーマの列名の指定) はサポートされていません。
- 複数のパーティション列はサポートされていません。 複数のパーティション列が必要な場合は、クエリで複合キーを使用し、それをパーティション列として指定することをお勧めします。
- 複合キーはクエリで作成できます。 たとえば
"SELECT concat (col1, col2) AS compositeColumn INTO [blobOutput] FROM [input]"
です。
- 複合キーはクエリで作成できます。 たとえば
- Delta Lake への書き込みは追加のみです。
- クエリ テストのスキーマ チェックは使用できません。
- Stream Analytics では、小さなファイルの圧縮は行われません。
- すべてのデータ ファイルは圧縮されずに作成されます。
- Date 型と Decimal 型はサポートされていません。
- ライター バージョン 7 以上のライター機能を持つ既存のテーブルへの書き込みは失敗します。
- 例: [削除ベクトル] を有効にした既存のテーブルへの書き込みは失敗します。
- ここでの例外は、changeDataFeed と appendOnly ライター エラーです。
- Stream Analytics ジョブでデータのバッチを Delta Lake に書き込むときに、複数のファイルの追加アクションを生成できます。 1 つのバッチに対して生成されるファイルの追加アクションが多すぎると、Stream Analytics ジョブが停止する可能性があります。
- 生成されるファイルの追加アクションの数は、多くの要因によって決まります。
- バッチのサイズ。 これは、データの量とバッチ処理パラメーターの [最小行数] と [最大時間] によって決まります。
- バッチの [パーティション列] の値のカーディナリティ。
- バッチに対して生成されるファイルの追加アクションの数を減らすには:
- バッチ処理構成の [最小行数] と [最大時間] を減らす。
- 入力データを調整するか、別のパーティション列を選択して、[パーティション列] の値のカーディナリティを下げる。
- 生成されるファイルの追加アクションの数は、多くの要因によって決まります。
- Stream Analytics ジョブでは、シングル パート V1 チェックポイントの読み取りと書き込みのみが可能です。 マルチパート チェックポイントとチェックポイント V2 形式はサポートされていません。