Wzorce przesyłania strumieniowego ze strukturą w usłudze Azure Databricks
Zawiera to notesy i przykłady kodu dla typowych wzorców pracy ze strukturą przesyłania strumieniowego w usłudze Azure Databricks.
Wprowadzenie do przesyłania strumieniowego ze strukturą
Jeśli dopiero zaczynasz korzystać z przesyłania strumieniowego ze strukturą, zobacz Uruchamianie pierwszego obciążenia przesyłania strumieniowego ze strukturą.
Zapisywanie w systemie Cassandra jako ujście dla przesyłania strumieniowego ze strukturą w języku Python
Apache Cassandra to rozproszona, mała opóźnienia, skalowalna, wysoce dostępna baza danych OLTP.
Przesyłanie strumieniowe ze strukturą współpracuje z rozwiązaniem Cassandra za pośrednictwem łącznika Spark Cassandra. Ten łącznik obsługuje zarówno interfejsy API RDD, jak i DataFrame oraz natywną obsługę zapisywania danych przesyłanych strumieniowo. Ważne Należy użyć odpowiedniej wersji zestawu spark-cassandra-connector-assembly.
Poniższy przykład łączy się z co najmniej jednym hostem w klastrze bazy danych Cassandra. Określa również konfiguracje połączeń, takie jak lokalizacja punktu kontrolnego i określone nazwy przestrzeni kluczy i tabel:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Zapisywanie w usłudze Azure Synapse Analytics przy użyciu języka foreachBatch()
Python
streamingDF.writeStream.foreachBatch()
umożliwia ponowne użycie istniejących składników zapisywania danych wsadowych w celu zapisania danych wyjściowych zapytania przesyłania strumieniowego do usługi Azure Synapse Analytics. Aby uzyskać szczegółowe informacje, zobacz dokumentację foreachBatch.
Aby uruchomić ten przykład, potrzebny jest łącznik usługi Azure Synapse Analytics. Aby uzyskać szczegółowe informacje na temat łącznika usługi Azure Synapse Analytics, zobacz Query data in Azure Synapse Analytics (Wykonywanie zapytań o dane w usłudze Azure Synapse Analytics).
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Sprzężenia strumień-strumień
Te dwa notesy pokazują, jak używać sprzężeń strumienia w językach Python i Scala.