Función con valores de tabla de transmisión por secuencias read_pulsar
Se aplica a: Databricks SQL Databricks Runtime 14.1 y versiones posteriores
Importante
Esta característica está en versión preliminar pública.
Devuelve una tabla con los registros leídos desde Pulsar.
Esta función con valores de tabla solo admite la transmisión por secuencias y no la consulta por lotes.
Sintaxis
read_pulsar ( { option_key => option_value } [, ...] )
Argumentos
Esta función requiere la invocación de parámetros con nombre para las claves de opción.
Las opciones serviceUrl
y topic
son obligatorias.
Las descripciones de los argumentos son breves aquí. Consulte la documentación de Pulsar de flujo estructurado para obtener descripciones ampliadas.
Opción | Tipo | Valor predeterminado | Descripción |
---|---|---|---|
serviceUrl | STRING | Mandatory | Identificador URI del servicio Pulsar. |
topic | STRING | Mandatory | Tema del que se va a leer. |
predefinedSubscription | STRING | None | Nombre de suscripción predefinido usado por el conector para realizar un seguimiento del progreso de la aplicación de spark. |
subscriptionPrefix | STRING | None | Prefijo usado por el conector para generar una suscripción aleatoria para realizar un seguimiento del progreso de la aplicación de spark. |
pollTimeoutMs | LONG | 120000 | Tiempo de espera para leer mensajes de Pulsar en milisegundos. |
failOnDataLoss | BOOLEAN | true | Controla si se produce un error en una consulta cuando se pierden los datos (por ejemplo, los temas se eliminan o los mensajes se eliminan debido a la directiva de retención). |
startingOffsets | STRING | latest | El punto de inicio cuando se inicia una consulta, ya sea el más antiguo, el más reciente o una cadena JSON que especifica un desplazamiento concreto. Si es el más reciente, el lector lee los registros más nuevos después de empezar a funcionar. Si es el más antiguo, el lector lee a partir del desplazamiento más antiguo. El usuario también puede especificar una cadena JSON que especifique un desplazamiento específico. |
startingTime | STRING | None | Cuando se especifica, el origen de Pulsar leerá los mensajes a partir de la posición del valor de startingTime especificado. |
Los siguientes argumentos se usan para la autentificación del cliente de Pulsar:
Opción | Tipo | Valor predeterminado | Descripción |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | None | Nombre del complemento de autenticación. |
pulsarClientAuthParams | STRING | None | Parámetros para el complemento de autenticación. |
pulsarClientUseKeyStoreTls | STRING | None | Si se va a usar KeyStore para la autenticación tls. |
pulsarClientTlsTrustStoreType | STRING | None | Tipo de archivo TrustStore para la autenticación tls. |
pulsarClientTlsTrustStorePath | STRING | None | Ruta de acceso del archivo TrustStore para la autenticación tls. |
pulsarClientTlsTrustStorePassword | STRING | None | Contraseña de TrustStore para la autenticación tls. |
Estos argumentos se usan para la configuración y autentificación del control de admisión de Pulsar, la configuración del administrador de Pulsar solo es necesaria cuando el control de admisión está habilitado (cuando maxBytesPerTrigger está establecido).
Opción | Tipo | Valor predeterminado | Descripción |
---|---|---|---|
maxBytesPerTrigger | BIGINT | None | Límite temporal del número máximo de bytes que queremos procesar por microlote. Si se especifica esto, también es necesario especificar admin.url. |
adminUrl | STRING | None | Configuración del Pulsar serviceHttpUrl. Solo es necesario cuando se especifica maxBytesPerTrigger. |
pulsarAdminAuthPlugin | STRING | None | Nombre del complemento de autenticación. |
pulsarAdminAuthParams | STRING | None | Parámetros para el complemento de autenticación. |
pulsarClientUseKeyStoreTls | STRING | None | Si se va a usar KeyStore para la autenticación tls. |
pulsarAdminTlsTrustStoreType | STRING | None | Tipo de archivo TrustStore para la autenticación tls. |
pulsarAdminTlsTrustStorePath | STRING | None | Ruta de acceso del archivo TrustStore para la autenticación tls. |
pulsarAdminTlsTrustStorePassword | STRING | None | Contraseña de TrustStore para la autenticación tls. |
Devoluciones
Tabla de registros de Pulsar con el esquema siguiente.
__key STRING NOT NULL
: clave de mensaje de Pulsar.value BINARY NOT NULL
: valor de mensaje de Pulsar.Nota: En el caso de los temas con el esquema Avro o JSON, en lugar de cargar contenido en un campo de valor binario, el contenido se expandirá para conservar los nombres de campo y los tipos de campo del tema Pulsar.
__topic STRING NOT NULL
: nombre del tema Pulsar.__messageId BINARY NOT NULL
: id. de mensaje de Pulsar.__publishTime TIMESTAMP NOT NULL
: tiempo de publicación del mensaje de Pulsar.__eventTime TIMESTAMP NOT NULL
: tiempo de evento de mensaje de Pulsar.__messageProperties MAP<STRING, STRING>
: propiedades del mensaje de Pulsar.
Ejemplos
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.