Sdílet prostřednictvím


Vzory strukturovaného streamování v Azure Databricks

Obsahuje poznámkové bloky a ukázky kódu pro běžné vzory pro práci se strukturovaným streamováním v Azure Databricks.

Začínáme se strukturovaným streamováním

Pokud s strukturovaným streamováním teprve začínáte, přečtěte si téma Spuštění první úlohy strukturovaného streamování.

Zápis do Cassandra jako jímky pro strukturované streamování v Pythonu

Apache Cassandra je distribuovaná databáze OLTP s nízkou latencí, škálovatelnou a vysoce dostupnou.

Strukturované streamování funguje s Cassandrou prostřednictvím konektoru Spark Cassandra. Tento konektor podporuje rozhraní API rdD i datového rámce a má nativní podporu pro zápis streamovaných dat. Důležité: Musíte použít odpovídající verzi sestavení spark-cassandra-connector-assembly.

Následující příklad se připojí k jednomu nebo více hostitelům v databázovém clusteru Cassandra. Určuje také konfigurace připojení, jako je umístění kontrolního bodu a konkrétní prostor klíčů a názvy tabulek:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Zápis do Azure Synapse Analytics pomocí foreachBatch() Pythonu

streamingDF.writeStream.foreachBatch() umožňuje opakovaně používat existující dávkové zapisovače dat k zápisu výstupu streamovacího dotazu do Azure Synapse Analytics. Podrobnosti najdete v dokumentaci foreachBatch.

Ke spuštění tohoto příkladu potřebujete konektor Azure Synapse Analytics. Podrobnosti o konektoru Azure Synapse Analytics najdete v tématu Dotazování dat ve službě Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Propojení streamů

Tyto dva poznámkové bloky ukazují, jak používat spojení stream-stream v Pythonu a Scala.

Stream-Stream připojí poznámkový blok Pythonu.

Získat poznámkový blok

Poznámkový blok Scala spojuje stream-Stream

Získat poznámkový blok