次の方法で共有


Google Pub/Sub にサブスクライブする

Azure Databricks には、Databricks Runtime 13.3 LTS 以降で Google Pub/Sub にサブスクライブするための組み込みコネクタが用意されています。 このコネクタは、サブスクライバーからのレコードに対して 1 回だけ処理セマンティクスを提供します。

Note

Pub/Sub は重複するレコードを発行する可能性があり、レコードがサブスクライバーに順不同で到着する可能性があります。 重複するレコードと順序が整ったレコードを処理するには、Azure Databricks コードを記述する必要があります。

構文の例

次のコード例は、Pub/Sub から読み取られた構造化ストリーミングを構成するための基本的な構文を示しています:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

その他の構成オプションについては、「Pub/Sub ストリーミング読み取りのオプションを構成する」を参照してください。

Pub/Sub へのアクセスを構成する

Databricks では、承認オプションを提供するときにシークレットを使用することをお勧めします。 接続を承認するには、次のオプションが必要です:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

次のテーブルでは、構成された資格情報に必要なロールについて説明します:

ロール 必須または省略可能 使用方法
roles/pubsub.viewer または roles/viewer 必須 サブスクリプションが存在するかどうかを確認し、サブスクリプションを取得する
roles/pubsub.subscriber 必須 サブスクリプションからデータをフェッチする
roles/pubsub.editor または roles/editor 省略可能 サブスクリプションが存在しない場合にサブスクリプションを作成し、deleteSubscriptionOnStreamStop を使用してストリーム終了時にサブスクリプションを削除できるようにします

Pub/Sub スキーマ

ストリームのスキーマは、次のテーブルに示すように、Pub/Sub からフェッチされたレコードと一致します:

フィールド Type
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Pub/Sub ストリーミング読み取りのオプションを構成する

次のテーブルで、Pub/Sub でサポートされているその他のオプションについて説明します。 すべてのオプションは、.option("<optionName>", "<optionValue>") 構文を使用して構造化ストリーミング読み取りの一部として構成されます。

Note

一部の Pub/Sub 構成オプションでは、マイクロバッチ ではなく、フェッチ 概念を使用します。 これは内部実装の詳細を反映し、オプションは他の構造化ストリーミング コネクタのコロールと同様に機能しますが、レコードがフェッチされて処理される点が異なります。

オプション 既定値 説明
numFetchPartitions ストリーム初期化時に存在する Executor の数の半分に設定します。 サブスクリプションからレコードをフェッチする並列 Spark タスクの数。
deleteSubscriptionOnStreamStop false true の場合、ストリームに渡されたサブスクリプションは、ストリーミング ジョブの終了時に削除されます。
maxBytesPerTrigger なし トリガーされる各マイクロバッチの間に処理されるバッチ サイズのソフト制限。
maxRecordsPerFetch 1000 レコードを処理する前にタスクごとにフェッチするレコードの数。
maxFetchPeriod 10 秒 レコードを処理する前に取得する各タスクの時間。 Databricks では、既定値を使用することが推奨されています。

Pub/Sub の増分バッチ処理セマンティクス

Trigger.AvailableNow を使用して、Pub/Sub ソースから増分バッチから使用可能なレコードを使用できます。

Azure Databricks では、Trigger.AvailableNow 設定で読み取りを開始したときにタイムスタンプが記録されます。 バッチによって処理されるレコードには、以前にフェッチされたすべてのデータと、記録されたストリーム開始タイムスタンプより小さいタイムスタンプを持つ新しく発行されたレコードが含まれます。

増分バッチ処理の構成」を参照してください。

ストリーミング メトリックの監視

構造化ストリーミングの進行状況メトリックは、フェッチされて処理できるレコードの数、フェッチされて処理できる状態のレコードのサイズ、およびストリームの開始後に表示された重複の数を報告します。 これらのメトリックの例を次に示します:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

制限事項

Pub/Sub では、投機的実行 (spark.speculation) はサポートされていません。