Delen via


Lees- en schrijfbewerkingen voor het streamen van Avro-gegevens

Apache Avro is een veelgebruikt systeem voor gegevensserialisatie in de streamingwereld. Een typische oplossing is het plaatsen van gegevens in Avro-indeling in Apache Kafka, metagegevens in Confluent Schema Registryen vervolgens query's uitvoeren met een streamingframework dat verbinding maakt met zowel Kafka als Schema Registry.

Azure Databricks ondersteunt de functies from_avro en to_avro voor het bouwen van streamingpijplijnen met Avro-gegevens in Kafka en metagegevens in Schema Registry. De functie to_avro codeert een column als binair bestand in Avro-indeling en from_avro decodeert binaire Avro-gegevens in een column. Beide functies transformeren één column naar een andere column, en het sql-gegevenstype invoer/uitvoer kan een complex type of een primitief type zijn.

Notitie

De from_avro en to_avro functies:

  • Zijn beschikbaar in Python, Scala en Java.
  • Kan worden doorgegeven aan SQL-functies in zowel batch- als streamingquery's.

Zie ook de Avro-bestandsgegevensbron.

Handmatig opgegeven schema voorbeeld

Net als bij from_json en to_jsonkunt u from_avro en to_avro gebruiken met elk binair column. U kunt de Avro-schema handmatig opgeven, zoals in het volgende voorbeeld:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema-voorbeeld

U kunt ook een schema opgeven als een JSON-tekenreeks. Als dit bijvoorbeeld het volgende is:/tmp/user.avsc

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

U kunt een JSON-tekenreeks maken:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Gebruik vervolgens de schema in from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Voorbeeld met Schema-register

Als uw cluster een Schema Registry-service heeft, kunt from_avro ermee werken, zodat u de Avro-schema niet handmatig hoeft op te geven.

In het volgende voorbeeld ziet u hoe u een Kafka-onderwerp 't' leest, ervan uitgaande dat de sleutel en waarde al zijn geregistreerd in Schema Register als onderwerpen 't-sleutel' en 't-waarde' van typen STRING en INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Voor to_avrokomt de standaarduitvoer Avro-schema mogelijk niet overeen met de schema van het doelobject in de Schema Registry-service om de volgende redenen:

  • De toewijzing van het Spark SQL-type aan Avro-schema is niet een-op-een. Zie ondersteunde typen voor Spark SQL -> Avro-conversie.
  • Als de geconverteerde output Avro schema van recordtype is, is de recordnaam topLevelRecord en is er standaard geen namespace.

Als de standaarduitvoer schema van to_avro overeenkomt met de schema van het doelonderwerp, kunt u het volgende doen:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Anders moet u de schema van het doelonderwerp opgeven in de functie to_avro:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Authenticeren bij een extern Confluent Schema Register

In Databricks Runtime 12.2 LTS en hoger kunt u zich verifiëren bij een extern Confluent-Schema-register. In de volgende voorbeelden ziet u hoe u uw schema registeropties configureert om verificatie-credentials en API-sleutels op te nemen.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

TrustStore- en keystore-bestanden gebruiken in Unity Catalogvolumes

In Databricks Runtime 14.3 LTS en hoger kunt u truststore- en sleutelopslagbestanden in Unity Catalogvolumes gebruiken om te verifiëren bij een Confluent Schema Registry. Update de configuratie in het vorige voorbeeld met behulp van de volgende syntaxis:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

De evolutiemodus van schema gebruiken met from_avro

In Databricks Runtime 14.2 en hoger kunt u schema evolutiemodus gebruiken met from_avro. Het inschakelen van schema evolutiemodus zorgt ervoor dat de taak een UnknownFieldException genereert nadat schema evolutie is gedetecteerd. Databricks raadt aan taken te configureren met schema evolutiemodus om automatisch opnieuw op te starten bij taakfouten. Zie Overwegingen voor productie voor gestructureerd streamen.

Schema evolutie is nuttig als u verwacht dat de schema van uw brongegevens zich in de loop van de tijd ontwikkelen en alle velden uit uw gegevensbron opnemen. Als uw query's al expliciet aangeven welke velden moeten worden opgevraagd in uw gegevensbron, worden toegevoegde velden genegeerd, ongeacht schema evolutie.

Gebruik de optie avroSchemaEvolutionMode om schema evolutie mogelijk te maken. In de volgende table worden de opties voor schema evolutiemodus beschreven:

Optie Gedrag
none Standaard. Negeert de evolutie van schema en de taak gaat verder.
restart Genereert een UnknownFieldException bij het detecteren van een schema evolutie. Vereist opnieuw opstarten van een taak.

Notitie

U kunt deze configuratie tussen streamingtaken wijzigen en hetzelfde controlepunt opnieuw gebruiken. Als u schema evolutie uitschakelt, kan dit leiden tot verwijderde columns.

De parseringsmodus configureren

U kunt de parsemodus configureren om te bepalen of u wilt dat het proces mislukt of dat er null records worden uitgegeven wanneer de schema-evolutiemodus is uitgeschakeld en de schema zich op een niet achterwaarts compatibele manier ontwikkelt. Met standaardinstellingen mislukt from_avro wanneer er incompatibele schema wijzigingen worden waargenomen.

Gebruik de optie om de mode parsemodus op te geven. In de volgende table wordt de optie voor de parseringsmodus beschreven:

Optie Gedrag
FAILFAST Standaard. Een parseringsfout genereert een SparkException met een errorClass van MALFORMED_AVRO_MESSAGE.
PERMISSIVE Er wordt een parseringsfout genegeerd en er wordt een null-record verzonden.

Notitie

Als schema evolutie is ingeschakeld, genereert FAILFAST alleen uitzonderingen als een record beschadigd is.

Voorbeeld van het gebruik van schema evolutie en het instellen van de parseringsmodus

In het volgende voorbeeld wordt getoond hoe u schema evolutie inschakelt en de FAILFAST parse modus opgeeft met een Confluent Schema Registry:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)