Wykonywanie zapytań dotyczących danych przesyłanych strumieniowo
Usługi Azure Databricks można używać do wykonywania zapytań dotyczących źródeł danych przesyłanych strumieniowo przy użyciu przesyłania strumieniowego ze strukturą. Usługa Azure Databricks zapewnia szeroką obsługę obciążeń przesyłania strumieniowego w językach Python i Scala oraz obsługuje większość funkcji przesyłania strumieniowego ze strukturą za pomocą języka SQL.
W poniższych przykładach pokazano użycie ujścia pamięci do ręcznej inspekcji danych przesyłanych strumieniowo podczas interaktywnego programowania w notesach. Ze względu na limity liczby wierszy wyjściowych w interfejsie notesu, możesz nie widzieć wszystkich danych odczytywanych przez zapytania przesyłane strumieniowo. W przypadku obciążeń produkcyjnych należy wyzwalać zapytania przesyłane strumieniowo tylko przez zapisanie ich w tabeli docelowej lub systemie zewnętrznym.
Uwaga
Obsługa języka SQL dla interakcyjnych zapytań na danych przesyłanych strumieniowo jest ograniczona do notatników działających na uniwersalnych zasobach obliczeniowych. Możesz również użyć języka SQL podczas deklarowania tabel przesyłania strumieniowego w usłudze Databricks SQL lub DLT. Zobacz artykuły Ładowanie danych przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL i Co to jest DLT?.
Wykonywanie zapytań dotyczących danych z systemów przesyłania strumieniowego
Usługa Azure Databricks udostępnia czytniki danych przesyłanych strumieniowo dla następujących systemów przesyłania strumieniowego:
- Kafka
- Kinesis
- PubSub
- Pulsar
Podczas inicjowania zapytań względem tych systemów należy podać szczegóły konfiguracji, które różnią się w zależności od skonfigurowanego środowiska i wybranego systemu. Zobacz Konfigurowanie źródeł danych przesyłanych strumieniowo.
Typowe obciążenia związane z systemami przesyłania strumieniowego obejmują przesył danych do Lakehouse i przetwarzanie strumieniowe w celu przesyłania danych do systemów zewnętrznych. Aby uzyskać więcej informacji na temat obciążeń przesyłania strumieniowego, zobacz Przesyłanie strumieniowe w usłudze Azure Databricks.
W poniższych przykładach pokazano interakcyjne przesyłanie strumieniowe odczytane z platformy Kafka:
Python
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
Wysyłanie zapytania do tabeli jako do strumienia danych.
Usługa Azure Databricks domyślnie tworzy wszystkie tabele przy użyciu usługi Delta Lake. Podczas wykonywania zapytania transmisji strumieniowej względem tabeli Delta zapytanie automatycznie pobiera nowe rekordy po zatwierdzeniu wersji tabeli. Domyślnie zapytania przesyłane strumieniowo oczekują, że tabele źródłowe będą zawierać tylko dołączone rekordy. Jeśli musisz pracować z danymi strumieniowymi zawierającymi aktualizacje i usunięcia, Databricks zaleca użycie DLT i APPLY CHANGES INTO
. Zobacz APLIKACJE DO ZASTOSOWANIA ZMIAN: Upraszczanie przechwytywania danych o zmianach z DLT.
W poniższych przykładach pokazano wykonywanie interakcyjnego przesyłania strumieniowego odczytanego z tabeli:
Python
display(spark.readStream.table("table_name"))
SQL
SELECT * FROM STREAM table_name
Wykonywanie zapytań dotyczących danych w magazynie obiektów w chmurze za pomocą modułu automatycznego ładowania
Możesz przesyłać strumieniowo dane z magazynu obiektów w chmurze przy użyciu modułu automatycznego ładowania, łącznika danych w chmurze usługi Azure Databricks. Można używać łącznika z plikami przechowywanymi w woluminach Unity Catalog lub w innych lokalizacjach magazynowania obiektów w chmurze. Usługa Databricks zaleca używanie woluminów do zarządzania dostępem do danych w magazynie obiektów w chmurze. Zobacz Połącz ze źródłami danych.
Azure Databricks optymalizuje ten łącznik do strumieniowego pobierania danych z magazynu obiektów w chmurze, przechowywanych w popularnych formatach: ustrukturyzowane, częściowo ustrukturyzowane i niestrukturyzowane. Usługa Databricks zaleca przechowywanie pozyskanych danych w niemal nieprzetworzonym formacie, aby zmaksymalizować przepływność i zminimalizować potencjalną utratę danych z powodu uszkodzonych rekordów lub zmian schematu.
Aby uzyskać więcej zaleceń dotyczących pozyskiwania danych z magazynu obiektów w chmurze, zobacz Pozyskiwanie danych do usługi Azure Databricks lakehouse.
W poniższych przykładach pokazano interakcyjne przesyłanie strumieniowe odczytane z katalogu plików JSON w woluminie:
Python
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SQL
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')