Compartir vía


Patrones de Structured Streaming en Azure Databricks

Este artículo contiene cuadernos y ejemplos de código para los patrones comunes que se dan a la hora de usar Structured Streaming en Azure Databricks.

Introducción a Structured Streaming

Si no está familiarizado con Structured Streaming, consulte Ejecución de la primera carga de trabajo de Structured Streaming.

Escritura en Cassandra como receptor para Structured Streaming en Python

Apache Cassandra es una base de datos OLTP distribuida, de baja latencia, escalable y de alta disponibilidad.

Structured Streaming funciona con Cassandra a través del Conector de Spark Cassandra. Este conector admite las API de RDD y DataFrame, y tiene compatibilidad nativa para escribir datos de streaming. Importante Debe usar la versión correspondiente del spark-cassandra-connector-assembly.

En el ejemplo siguiente se conecta a uno o varios hosts de un clúster de base de datos de Cassandra. También especifica configuraciones de conexión, como la ubicación del punto de control y los nombres de tabla y espacio de claves específicos:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Escritura en Azure Synapse Analytics mediante foreachBatch() en Python

streamingDF.writeStream.foreachBatch() permite reutilizar los escritores de datos por lotes existentes para escribir la salida de una consulta de streaming en Azure Synapse Analytics. Para obtener más detalles, consulte la documentación sobre foreachBatch.

Para ejecutar este ejemplo, necesita el conector de Azure Synapse Analytics. Para más información sobre el conector de Azure Synapse Analytics, consulte Consultar datos en Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Combinaciones de streaming con streaming

En estos dos cuadernos se muestra cómo usar combinaciones de transmisión con transmisión en Python y Scala.

Cuaderno de Python de combinaciones de transmisión con transmisión

Obtener el cuaderno

Cuaderno de Scala de combinaciones de transmisión con transmisión

Obtener el cuaderno