Compartir vía


Lectura y escritura de datos de Avro en streaming

Apache Avro es un sistema de serialización de datos de uso frecuente en el mundo del streaming. Una solución típica consiste en colocar datos en formato Avro en Apache Kafka, metadatos en el registro de esquema de Confluent y, a continuación, ejecutar consultas con un marco de streaming que se conecte tanto a Kafka como al registro de esquema.

Azure Databricks admite las from_avro y to_avro funciones para crear canalizaciones de streaming con datos de Avro en Kafka y metadatos en el Registro de esquemas. La función to_avro codifica una columna como binaria en formato Avro y from_avro descodifica los datos binarios de Avro en una columna. Ambas funciones transforman una columna en otra, y el tipo de datos de SQL de entrada o salida puede ser de un tipo complejo o primitivo.

Nota

Las funciones from_avro y to_avro:

  • Están disponibles en Python, Scala y Java.
  • Se pueden pasar a las funciones SQL en consultas por lotes y de streaming.

Consulte también Origen de datos de archivos Avro.

Ejemplo de esquema especificado manualmente

De forma similar a from_json y to_json, puede usar from_avro y to_avro con cualquier columna binaria. Puede especificar manualmente el esquema de Avro, como en el ejemplo siguiente:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema example

También puede especificar un esquema como una cadena JSON. Por ejemplo, si /tmp/user.avsc es:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Puede crear una cadena JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

A continuación, use el esquema en from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Ejemplo con el registro de esquema

Si el clúster tiene un servicio de registro de esquemas, from_avro puede utilizarlo para que no sea necesario especificar manualmente el esquema de Avro.

En el ejemplo siguiente se muestra cómo leer un tema de Kafka "t", suponiendo que la clave y el valor ya están registrados en el registro de esquemas como asuntos "t-key" y "t-value" de los tipos STRING y INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Para to_avro, el esquema de salida predeterminado de Avro podría no coincidir con el esquema del sujeto de destino en el servicio de registro de esquemas por los siguientes motivos:

  • La asignación del tipo Spark SQL al esquema de Avro no es uno a uno. Consulte Tipos admitidos para la conversión de Spark SQL a> Avro.
  • Si el esquema de salida convertido de Avro es del tipo de registro, el nombre del registro es topLevelRecord y no hay ningún espacio de nombres de manera predeterminada.

Si el esquema de salida predeterminado de to_avro coincide con el esquema del sujeto de destino, puede hacer lo siguiente:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

De lo contrario, debe proporcionar el esquema del sujeto de destino en la función to_avro:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Autenticación en un registro de esquema de Confluent externo

En Databricks Runtime 12.2 LTS y versiones posteriores, puede autenticarse en un registro de esquemas de Confluent externo. En los ejemplos siguientes se muestra cómo configurar las opciones del registro de esquemas para incluir credenciales de autenticación y claves de API.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Uso del almacén de confianza y los archivos de almacén de claves en volúmenes de Unity Catalog

En Databricks Runtime 14.3 LTS y posteriores, puede usar archivos de almacén de confianza y almacén de claves en volúmenes de Unity Catalog para autenticarse en un registro de esquemas de Confluent. Actualice la configuración en el ejemplo anterior mediante la sintaxis siguiente:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Uso del modo de evolución del esquema con from_avro

En Databricks Runtime 14.2 y posteriores, puede usar el modo de evolución del esquema con from_avro. Al habilitar el modo de evolución del esquema, el trabajo inicia una UnknownFieldException después de detectar la evolución del esquema. Databricks recomienda configurar trabajos con el modo de evolución del esquema para reiniciarse automáticamente en caso de error de tarea. Consulte Consideraciones de producción para Structured Streaming.

La evolución del esquema es útil si espera que el esquema de los datos de origen evolucione con el tiempo e ingiera todos los campos del origen de datos. Si las consultas ya especifican explícitamente qué campos se van a consultar en el origen de datos, los campos agregados se omiten independientemente de la evolución del esquema.

Use la opción avroSchemaEvolutionMode para habilitar la evolución del esquema. En la tabla siguiente se describen las opciones para el modo de evolución del esquema:

Opción Comportamiento
none Default. Omite la evolución del esquema y el trabajo continúa.
restart Produce una UnknownFieldException al detectar la evolución del esquema. Requiere un reinicio del trabajo.

Nota:

Puede cambiar esta configuración entre los trabajos de streaming y reutilizar el mismo punto de control. Deshabilitar la evolución del esquema puede dar lugar a columnas quitadas.

Configuración del modo de análisis

Puede configurar el modo de análisis para determinar si desea producir un error o emitir registros null cuando el modo de evolución del esquema está deshabilitado y el esquema evoluciona de forma no compatible con versiones anteriores. Con la configuración predeterminada, from_avro produce un error cuando observa cambios de esquema incompatibles.

Use la opción mode para especificar el modo de análisis. La opción del modo de análisis se describe en la siguiente tabla:

Opción Comportamiento
FAILFAST Default. Un error de análisis produce una SparkException con una errorClass de MALFORMED_AVRO_MESSAGE.
PERMISSIVE Se ignora un error de análisis y se emite un registro null.

Nota:

Con la evolución del esquema habilitada, FAILFAST solo produce excepciones si un registro está dañado.

Ejemplo de uso de la evolución del esquema y la configuración del modo de análisis

En el ejemplo siguiente se muestra cómo habilitar la evolución del esquema y especificar el modo de análisis FAILFAST con un registro de esquema de Confluent:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)