Freigeben über


read_pubsub-Streamingtabellenwertfunktionen

Gilt für: Häkchen gesetzt ja Databricks SQL Häkchen gesetzt ja Databricks Runtime 13.3 LTS und höher

Gibt eine Tabelle mit Datensätzen zurück, die von Pub/Sub aus einem Thema gelesen wurden. Unterstützt nur Streamingabfragen.

Syntax

read_pubsub( { parameter => value } [, ...])

Argumente

read_pubsub erfordert einen benannten Parameteraufruf.

Die einzigen erforderlichen Argumente sind subscriptionId, projectId und topicId. Alle anderen Argumente sind optional.

Eine vollständige Beschreibung der Argumente finden Sie unter Konfigurieren von Optionen für Pub/Sub-Streaming-Lesevorgänge.

Databricks empfiehlt die Verwendung von Geheimnissen beim Bereitstellen von Autorisierungsoptionen. Siehe secret-Funktion.

Ausführliche Informationen zum Konfigurieren des Zugriffs auf Pub/Sub finden Sie unter Konfigurieren des Zugriffs auf Pub/Sub.

Parameter Typ Beschreibung
subscriptionId STRING Erforderlich, der eindeutige Bezeichner, der einem Pub/Sub-Abonnement zugewiesen ist.
projectId STRING Erforderlich, die Google Cloud-Projekt-ID, die dem Pub/Sub-Thema zugeordnet ist.
topicId STRING Erforderlich, die ID oder der Name des Pub/Sub-Themas, das abonniert werden soll.
clientEmail STRING Die E-Mail-Adresse, die einem Dienstkonto für die Authentifizierung zugeordnet ist.
clientId STRING Die Client-ID, die dem Dienstkonto für die Authentifizierung zugeordnet ist.
privateKeyId STRING Die ID des privaten Schlüssels, der dem Dienstkonto zugeordnet ist.
privateKey STRING Der private Schlüssel, der dem Dienstkonto für die Authentifizierung zugeordnet ist.

Diese Argumente werden für die weitere Optimierung beim Lesen von Pub/Sub verwendet:

Parameter Typ Beschreibung
numFetchPartitions STRING Optional mit der Standardanzahl von Executors. Die Anzahl der parallelen Spark-Aufgaben, die Datensätze aus einem Abonnement abrufen.
deleteSubscriptionOnStreamStop BOOLEAN Optional mit Standardwert false. Bei „true“ wird das an den Datenstrom übergebene Abonnement beim Beenden des Streamingauftrags gelöscht.
maxBytesPerTrigger STRING Eine weiche Grenze für die Batchgröße, die bei jedem ausgelösten Mikrobatch verarbeitet wird. Der Standardwert ist „none“.
maxRecordsPerFetch STRING Die Anzahl der Datensätze, die pro Aufgabe abgerufen werden sollen, bevor Datensätze verarbeitet werden. Standardwert: 1.000
maxFetchPeriod STRING Die Zeitdauer für jeden Vorgang, der vor der Verarbeitung von Datensätzen abgerufen werden soll. Standardwert: 10 s.

Gibt zurück

Eine Tabelle mit Pub/Sub-Datensätzen mit dem folgenden Schema. Die Attributspalte kann NULL sein, aber alle anderen Spalten sind nicht NULL.

Name Datentyp Nullable Standard BESCHREIBUNG
messageId STRING Nein Eindeutiger Bezeichner für die Pub/Sub-Nachricht.
payload BINARY Nein Der Inhalt der Pub/Sub-Nachricht.
attributes STRING Ja Schlüssel-Wert-Paare, die die Attribute der Pub/Sub-Nachricht darstellen. Dies ist eine JSON-codierte Zeichenfolge.
publishTimestampInMillis BIGINT Nein Der Zeitstempel, zu dem die Nachricht veröffentlicht wurde, in Millisekunden.
sequenceNumber BIGINT Nein Der eindeutige Bezeichner des Datensatzes innerhalb des Shards.

Beispiele

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Die Daten müssen nun zur weiteren Analyse aus testing.streaming_table abgefragt werden.

Fehlerhafte Abfragen:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);