Udostępnij za pośrednictwem


Wykonywanie zapytań dotyczących danych przesyłanych strumieniowo

Usługi Azure Databricks można używać do wykonywania zapytań dotyczących źródeł danych przesyłanych strumieniowo przy użyciu przesyłania strumieniowego ze strukturą. Usługa Azure Databricks zapewnia szeroką obsługę obciążeń przesyłania strumieniowego w językach Python i Scala oraz obsługuje większość funkcji przesyłania strumieniowego ze strukturą za pomocą języka SQL.

W poniższych przykładach pokazano użycie ujścia pamięci do ręcznej inspekcji danych przesyłanych strumieniowo podczas interaktywnego programowania w notesach. Ze względu na limity danych wyjściowych wierszy w interfejsie użytkownika notesu możesz nie obserwować wszystkich danych odczytywanych przez zapytania przesyłane strumieniowo. W przypadku obciążeń produkcyjnych należy wyzwalać zapytania przesyłane strumieniowo tylko przez zapisanie ich w tabeli docelowej lub systemie zewnętrznym.

Uwaga

Obsługa języka SQL w przypadku interakcyjnych zapytań dotyczących danych przesyłanych strumieniowo jest ograniczona do notesów działających w obliczeniach wszystkich celów. Można również użyć języka SQL podczas deklarowania tabel przesyłania strumieniowego w usłudze Databricks SQL lub delta live tables. Zobacz Ładowanie danych przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL i Co to są tabele delta live?.

Wykonywanie zapytań dotyczących danych z systemów przesyłania strumieniowego

Usługa Azure Databricks udostępnia czytniki danych przesyłanych strumieniowo dla następujących systemów przesyłania strumieniowego:

  • Kafka
  • Kinesis
  • PubSub
  • Pulsar

Podczas inicjowania zapytań względem tych systemów należy podać szczegóły konfiguracji, które różnią się w zależności od skonfigurowanego środowiska i wybranego systemu. Zobacz Konfigurowanie źródeł danych przesyłanych strumieniowo.

Typowe obciążenia obejmujące systemy przesyłania strumieniowego obejmują pozyskiwanie danych do usługi Lakehouse i przetwarzanie strumieniowe w celu ujścia danych do systemów zewnętrznych. Aby uzyskać więcej informacji na temat obciążeń przesyłania strumieniowego, zobacz Przesyłanie strumieniowe w usłudze Azure Databricks.

W poniższych przykładach pokazano interakcyjne przesyłanie strumieniowe odczytane z platformy Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Wykonywanie zapytań względem tabeli jako odczytanego strumieniowo

Usługa Azure Databricks domyślnie tworzy wszystkie tabele przy użyciu usługi Delta Lake. Podczas wykonywania zapytania przesyłania strumieniowego względem tabeli delty zapytanie automatycznie pobiera nowe rekordy po zatwierdzeniu wersji tabeli. Domyślnie zapytania przesyłane strumieniowo oczekują, że tabele źródłowe będą zawierać tylko dołączone rekordy. Jeśli musisz pracować z danymi przesyłanymi strumieniowo zawierającymi aktualizacje i usunięcia, usługa Databricks zaleca korzystanie z funkcji Delta Live Tables i APPLY CHANGES INTO. Zobacz Interfejsy API ZASTOSUJ ZMIANY: upraszczanie przechwytywania danych zmian za pomocą tabel różnicowych na żywo.

W poniższych przykładach pokazano wykonywanie interakcyjnego przesyłania strumieniowego odczytanego z tabeli:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Wykonywanie zapytań dotyczących danych w magazynie obiektów w chmurze za pomocą modułu automatycznego ładowania

Możesz przesyłać strumieniowo dane z magazynu obiektów w chmurze przy użyciu modułu automatycznego ładowania, łącznika danych w chmurze usługi Azure Databricks. Łącznik można używać z plikami przechowywanymi w woluminach wykazu aparatu Unity lub w innych lokalizacjach przechowywania obiektów w chmurze. Usługa Databricks zaleca używanie woluminów do zarządzania dostępem do danych w magazynie obiektów w chmurze. Zobacz Nawiązywanie połączenia ze źródłami danych.

Usługa Azure Databricks optymalizuje ten łącznik pod kątem przesyłania strumieniowego danych w magazynie obiektów w chmurze przechowywanych w popularnych formatach ustrukturyzowanych, częściowo ustrukturyzowanych i bez struktury. Usługa Databricks zaleca przechowywanie pozyskanych danych w niemal nieprzetworzonym formacie, aby zmaksymalizować przepływność i zminimalizować potencjalną utratę danych z powodu uszkodzonych rekordów lub zmian schematu.

Aby uzyskać więcej zaleceń dotyczących pozyskiwania danych z magazynu obiektów w chmurze, zobacz Pozyskiwanie danych do magazynu lakehouse usługi Databricks.

W poniższych przykładach pokazano interakcyjne przesyłanie strumieniowe odczytane z katalogu plików JSON w woluminie:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')