共用方式為


查詢串流資料

您可以使用 Azure Databricks 來使用結構化串流來查詢串流數據源。 Azure Databricks 針對 Python 和 Scala 中的串流工作負載提供廣泛的支援,並支援大部分的結構化串流功能與 SQL。

下列範例示範如何在筆記本中的互動式開發期間,使用記憶體接收器手動檢查串流數據。 由於筆記本 UI 中的數據列輸出限制,您可能不會觀察串流查詢所讀取的所有數據。 在生產工作負載中,您應該只藉由將串流查詢寫入目標數據表或外部系統來觸發串流查詢。

注意

串流數據互動式查詢的 SQL 支援僅限於在所有用途計算上執行的筆記本。 您也可以在 Databricks SQL 或 Delta Live Tables 中宣告串流數據表時使用 SQL。 請參閱 使用 Databricks SQL 中的串流數據表載入數據,以及 什麼是 Delta 即時數據表?

從串流系統查詢數據

Azure Databricks 為下列串流系統提供串流數據讀取器:

  • Kafka
  • Kinesis
  • PubSub
  • Pulsar

當您針對這些系統初始化查詢時,您必須提供設定詳細數據,這視您設定的環境和您選擇要讀取的系統而有所不同。 請參閱 <設定串流資料來源>。

涉及串流系統的常見工作負載包括將數據擷取至 Lakehouse,以及串流處理以將數據接收至外部系統。 如需串流工作負載的詳細資訊,請參閱 在 Azure Databricks 上串流。

下列範例示範從 Kafka 讀取的互動式串流:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

以串流讀取的形式查詢數據表

根據預設,Azure Databricks 會使用 Delta Lake 建立所有數據表。 當您對 Delta 資料表執行串流查詢時,查詢會在認可數據表版本時自動挑選新的記錄。 根據預設,串流查詢會預期源數據表只包含附加的記錄。 如果您需要處理包含更新和刪除的串流資料,Databricks 建議使用 Delta Live Tables 和 APPLY CHANGES INTO。 請參閱套用變更 API:使用差異即時資料表簡化異動資料擷取

下列範例示範如何執行從資料表讀取的互動式串流:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

使用自動載入器查詢雲端物件記憶體中的數據

您可以使用 Azure Databricks 雲端數據連接器自動載入器,從雲端物件記憶體串流數據。 您可以使用連接器搭配儲存在 Unity 目錄磁碟區或其他雲端物件儲存位置中的檔案。 Databricks 建議使用磁碟區來管理雲端物件記憶體中數據的存取權。 請參閱 連線至數據源

Azure Databricks 會將此連接器優化,以串流擷取儲存在熱門結構化、半結構化和非結構化格式的雲端物件記憶體中的數據。 Databricks 建議以近乎原始的格式儲存擷取的數據,以最大化輸送量,並將因記錄或架構變更而可能遺失的數據降到最低。

如需從雲端物件記憶體擷取數據的詳細資訊,請參閱 將數據內嵌至 Databricks Lakehouse

下列範例示範從磁碟區中 JSON 檔案目錄讀取的互動式串流:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')