Compartilhar via


Padrões de Fluxo Estruturado no Azure Databricks

Isso contém notebooks e códigos de exemplo para padrões comuns para trabalhar com Fluxo Estruturado no Azure Databricks.

Introdução ao Fluxo Estruturado

Se você for novato no Fluxo Estruturado, confira Executar sua primeira carga de trabalho de Fluxo Estruturado.

Gravar no Cassandra como coletor para Fluxo Estruturado no Python

O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalonável e altamente disponível.

O Fluxo Estruturado funciona com o Cassandra por meio do Conector do Cassandra do Spark. Esse conector dá suporte a APIs RDD e DataFrame e tem suporte nativo para gravar dados de streaming. Importante Você deve usar a versão correspondente do spark-cassandra-connector-assembly.

O exemplo a seguir está conectado a um ou mais hosts em um cluster de banco de dados do Cassandra. Ele também especifica configurações de conexão, como o local do ponto de verificação e os nomes de keyspace e tabela específicos:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Gravação no Azure Synapse Analytics usando foreachBatch() no Python

streamingDF.writeStream.foreachBatch() permite reutilizar os gravadores de dados em lotes existentes para gravar a saída de uma consulta de streaming no Azure Synapse Analytics. Confira a documentação do foreachBatch para obter detalhes.

Para executar este exemplo, você precisa do conector do Azure Synapse Analytics. Para obter detalhes sobre o Azure Synapse Analytics, confira Dados de consulta no Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Junções entre fluxos

Esses dois notebooks mostram como usar junções de fluxo de fluxo em Python e Scala.

Junções de fluxo a fluxo no notebook Python

Obter notebook

Junções de fluxo a fluxo no notebook Scala

Obter notebook