次の方法で共有


Azure Databricks での構造化ストリーミング パターン

これには、Azure Databricks で構造化ストリーミングを操作するための一般的なパターンのノートブックとコード サンプルが含まれています。

構造化ストリーミングの概要

構造化ストリーミングを初めて使用する場合は、「最初の構造化ストリーミング ワークロードを実行する」を参照してください。

Python での構造化ストリーミングのシンクとして Cassandra に書き込む

Apache Cassandra は、分散型で低遅延の、高可用性 OLTP データベースです。

構造化ストリーミングは、Spark Cassandra コネクタを介して Cassandra と連携します。 このコネクタは、RDD API と DataFrame API の両方をサポートしているとともに、ストリーミング データの書き込みに対してネイティブにサポートしています。 "重要" spark-cassandra-connector-assembly の対応するバージョンを使用する必要があります。

次の例では、Cassandra データベース クラスター内にある 1 つ以上のホストに接続します。 また、チェックポイントの場所や特定のキースペースやテーブル名などの接続構成も指定します。

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Python で foreachBatch() を使用して Azure Synapse Analytics に書き込む

streamingDF.writeStream.foreachBatch() を使用すると、既存のバッチ データ ライターを再利用して、ストリーミング クエリの出力を Azure Synapse Analytics に書き込むことができます。 詳細については、foreachBatch のドキュメントを参照してください。

この例を実行するには、Azure Synapse Analytics コネクタが必要です。 Azure Synapse Analytics コネクタの詳細については、Azure Synapse Analytics のクエリ データに関する記事を参照してください。

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

ストリーム同士の結合

これらの 2 つのノートブックでは、Python および Scala でストリーム同士の結合を使用する方法を示しています。

ストリーム同士の結合 Python ノートブック

ノートブックを入手

ストリーム同士の結合 Scala ノートブック

ノートブックを入手