Delen via


Gestructureerde streamingpatronen in Azure Databricks

Dit bevat notebooks en codevoorbeelden voor veelvoorkomende patronen voor het werken met Structured Streaming in Azure Databricks.

Aan de slag met Gestructureerd streamen

Als u nieuw bent voor Structured Streaming, raadpleegt u Uw eerste workload voor gestructureerd streamen uitvoeren.

Schrijven naar Cassandra als sink voor Gestructureerd streamen in Python

Apache Cassandra is een gedistribueerde, lage latentie, schaalbare, maximaal beschikbare OLTP-database.

Structured Streaming werkt met Cassandra via de Spark Cassandra-connector. Deze connector ondersteunt zowel RDD- als DataFrame-API's en biedt systeemeigen ondersteuning voor het schrijven van streaminggegevens. Belangrijk : u moet de bijbehorende versie van de spark-cassandra-connector-assembly gebruiken.

In het volgende voorbeeld wordt verbinding gemaakt met een of meer hosts in een Cassandra-databasecluster. Er worden ook verbindingsconfiguraties opgegeven, zoals de locatie van het controlepunt en de specifieke keyspace en table namen:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Schrijven naar Azure Synapse Analytics met behulp van foreachBatch() Python

streamingDF.writeStream.foreachBatch() hiermee kunt u bestaande schrijvers van batchgegevens opnieuw gebruiken om de uitvoer van een streamingquery naar Azure Synapse Analytics te schrijven. Zie de foreachBatch-documentatie voor meer informatie.

Als u dit voorbeeld wilt uitvoeren, hebt u de Azure Synapse Analytics-connector nodig. Zie Querygegevens in Azure Synapse Analytics voor meer informatie over de Azure Synapse Analytics-connector.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Stream-stream-joins

Deze twee notebooks laten zien hoe u stream-stream joins gebruikt in Python en Scala.

Stream-Stream voegt python-notebook toe

Get notebook

Scala-notebook koppelen aan Stream-Stream

Get notebook