read_pubsub
-Streamingtabellenwertfunktionen
Gilt für: Databricks SQL Databricks Runtime 13.3 LTS und höher
Gibt eine Tabelle mit Datensätzen zurück, die von Pub/Sub aus einem Thema gelesen wurden. Unterstützt nur Streamingabfragen.
Syntax
read_pubsub( { parameter => value } [, ...])
Argumente
read_pubsub
erfordert einen benannten Parameteraufruf.
Die einzigen erforderlichen Argumente sind subscriptionId
, projectId
und topicId
. Alle anderen Argumente sind optional.
Eine vollständige Beschreibung der Argumente finden Sie unter Konfigurieren von Optionen für Pub/Sub-Streaming-Lesevorgänge.
Databricks empfiehlt die Verwendung von Geheimnissen beim Bereitstellen von Autorisierungsoptionen. Siehe secret-Funktion.
Ausführliche Informationen zum Konfigurieren des Zugriffs auf Pub/Sub finden Sie unter Konfigurieren des Zugriffs auf Pub/Sub.
Parameter | Typ | Beschreibung |
---|---|---|
subscriptionId |
STRING |
Erforderlich, der eindeutige Bezeichner, der einem Pub/Sub-Abonnement zugewiesen ist. |
projectId |
STRING |
Erforderlich, die Google Cloud-Projekt-ID, die dem Pub/Sub-Thema zugeordnet ist. |
topicId |
STRING |
Erforderlich, die ID oder der Name des Pub/Sub-Themas, das abonniert werden soll. |
clientEmail |
STRING |
Die E-Mail-Adresse, die einem Dienstkonto für die Authentifizierung zugeordnet ist. |
clientId |
STRING |
Die Client-ID, die dem Dienstkonto für die Authentifizierung zugeordnet ist. |
privateKeyId |
STRING |
Die ID des privaten Schlüssels, der dem Dienstkonto zugeordnet ist. |
privateKey |
STRING |
Der private Schlüssel, der dem Dienstkonto für die Authentifizierung zugeordnet ist. |
Diese Argumente werden für die weitere Optimierung beim Lesen von Pub/Sub verwendet:
Parameter | Typ | Beschreibung |
---|---|---|
numFetchPartitions |
STRING |
Optional mit der Standardanzahl von Executors. Die Anzahl der parallelen Spark-Aufgaben, die Datensätze aus einem Abonnement abrufen. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Optional mit Standardwert false . Bei „true“ wird das an den Datenstrom übergebene Abonnement beim Beenden des Streamingauftrags gelöscht. |
maxBytesPerTrigger |
STRING |
Eine weiche Grenze für die Batchgröße, die bei jedem ausgelösten Mikrobatch verarbeitet wird. Der Standardwert ist „none“. |
maxRecordsPerFetch |
STRING |
Die Anzahl der Datensätze, die pro Aufgabe abgerufen werden sollen, bevor Datensätze verarbeitet werden. Standardwert: 1.000 |
maxFetchPeriod |
STRING |
Die Zeitdauer für jeden Vorgang, der vor der Verarbeitung von Datensätzen abgerufen werden soll. Standardwert: 10 s. |
Gibt zurück
Eine Tabelle mit Pub/Sub-Datensätzen mit dem folgenden Schema. Die Attributspalte kann NULL sein, aber alle anderen Spalten sind nicht NULL.
Name | Datentyp | Nullable | Standard | BESCHREIBUNG |
---|---|---|---|---|
messageId |
STRING |
Nein | Eindeutiger Bezeichner für die Pub/Sub-Nachricht. | |
payload |
BINARY |
Nein | Der Inhalt der Pub/Sub-Nachricht. | |
attributes |
STRING |
Ja | Schlüssel-Wert-Paare, die die Attribute der Pub/Sub-Nachricht darstellen. Dies ist eine JSON-codierte Zeichenfolge. | |
publishTimestampInMillis |
BIGINT |
Nein | Der Zeitstempel, zu dem die Nachricht veröffentlicht wurde, in Millisekunden. | |
sequenceNumber |
BIGINT |
Nein | Der eindeutige Bezeichner des Datensatzes innerhalb des Shards. |
Beispiele
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Die Daten müssen nun zur weiteren Analyse aus testing.streaming_table
abgefragt werden.
Fehlerhafte Abfragen:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);