Transmisión desde Apache Pulsar
Importante
Esta característica está en versión preliminar pública.
En Databricks Runtime 14.1 y versiones posteriores, puede usar Structured Streaming para transmitir datos de Apache Pulsar en Azure Databricks.
Structured Streaming proporciona una semántica de procesamiento exactamente una vez para los datos leídos de orígenes de Pulsar.
Ejemplo de sintaxis
A continuación se muestra un ejemplo básico del uso de Structured Streaming para leer desde Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Siempre debe proporcionar un service.url
y una de las siguientes opciones para especificar temas:
topic
topics
topicsPattern
Para obtener una lista completa de las opciones, consulte Configurar opciones para la lectura de streaming de Pulsar.
Autenticación en Pulsar
Azure Databricks admite la autenticación de almacén de confianza y almacén de claves en Pulsar. Databricks recomienda usar secretos al almacenar los detalles de configuración.
Puede establecer las siguientes opciones durante la configuración de la transmisión:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Si la transmisión usa un PulsarAdmin
, establezca también lo siguiente:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
En el ejemplo siguiente se muestra cómo configurar las opciones de autenticación:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Esquema de Pulsar
El esquema de registros leídos de Pulsar depende de cómo los temas tengan codificados sus esquemas.
- En los temas con el esquema Avro o JSON, los nombres de campo y los tipos de campo se conservan en el dataframe de Spark resultante.
- Para temas sin esquema o con un tipo de datos simple en Pulsar, la carga se carga en una columna
value
. - Si el lector está configurado para leer varios temas con esquemas diferentes, establezca
allowDifferentTopicSchemas
para cargar el contenido sin procesar en una columnavalue
.
Los registros de Pulsar tienen los siguientes campos de metadatos:
Column | Tipo |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Configuración de las opciones de lectura de streaming de Pulsar
Todas las opciones se configuran como parte de una lectura de Structured Streaming mediante la sintaxis .option("<optionName>", "<optionValue>")
. También puede configurar la autenticación mediante opciones. Consulte Autenticación en Pulsar.
En la tabla siguiente se describen las configuraciones necesarias para Pulsar. Debe especificar solo una de las opciones topic
, topics
o topicsPattern
.
Opción | Valor predeterminado | Descripción |
---|---|---|
service.url |
None | Configuración de Pulsar serviceUrl para el servicio Pulsar. |
topic |
None | Cadena de nombre de tema para el tema que se va a consumir. |
topics |
None | Lista separada por comas de los temas que se van a consumir. |
topicsPattern |
None | Cadena regex de Java para que coincida con los temas que se van a consumir. |
En la tabla siguiente se describen otras opciones admitidas para Pulsar:
Opción | Valor predeterminado | Descripción |
---|---|---|
predefinedSubscription |
None | Nombre de suscripción predefinido usado por el conector para realizar un seguimiento del progreso de la aplicación de spark. |
subscriptionPrefix |
None | Prefijo usado por el conector para generar una suscripción aleatoria para realizar un seguimiento del progreso de la aplicación de spark. |
pollTimeoutMs |
120000 | Tiempo de espera para leer mensajes de Pulsar en milisegundos. |
waitingForNonExistedTopic |
false |
Si el conector debe esperar hasta que se creen los temas deseados. |
failOnDataLoss |
true |
Controla si se produce un error en una consulta cuando se pierden los datos (por ejemplo, los temas se eliminan o los mensajes se eliminan debido a la directiva de retención). |
allowDifferentTopicSchemas |
false |
Si se leen varios temas con esquemas diferentes, use este parámetro para desactivar la deserialización automática del valor del tema basado en esquemas. Solo se devuelven los valores sin procesar cuando es true . |
startingOffsets |
latest |
Si es latest , el lector lee los registros más recientes después de empezar a ejecutarse. Si es earliest , el lector lee desde el desplazamiento más antiguo. El usuario también puede especificar una cadena JSON que especifique un desplazamiento específico. |
maxBytesPerTrigger |
None | Límite temporal del número máximo de bytes que queremos procesar por microlote. Si se especifica esto, admin.url también debe especificarse. |
admin.url |
None | Configuración de Pulsar serviceHttpUrl . Solo es necesario cuando maxBytesPerTrigger se especifica. |
También puede especificar cualquier configuración de lector, administrador y cliente de Pulsar mediante los siguientes patrones:
Patrón | Vínculo a las opciones de configuración |
---|---|
pulsar.client.* |
Configuración de cliente de Pulsar |
pulsar.admin.* |
Configuración del administrador de Pulsar |
pulsar.reader.* |
Configuración del lector de Pulsar |
Construcción de desplazamientos iniciales JSON
Puede construir manualmente un identificador de mensaje para especificar un desplazamiento específico y pasarlo como JSON a la opción startingOffsets
. En el ejemplo de código siguiente se muestra esta sintaxis:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()