Delen via


Lees- en schrijfbewerkingen voor het streamen van Avro-gegevens

Apache Avro is een veelgebruikt systeem voor gegevensserialisatie in de streamingwereld. Een typische oplossing is het plaatsen van gegevens in Avro-indeling in Apache Kafka, metagegevens in Confluent Schema Registryen vervolgens query's uitvoeren met een streamingframework dat verbinding maakt met zowel Kafka als Schema Registry.

Azure Databricks ondersteunt de from_avro- en to_avro-functies voor het bouwen van streamingpijplijnen met Avro-gegevens in Kafka en metagegevens in schemaregister. De functie to_avro codeert een kolom als binair in Avro-indeling en de functie from_avro decodeert binaire Avro-gegevens in een kolom. Beide functies transformeren de ene kolom naar een andere kolom en het SQL-gegevenstype voor invoer/uitvoer kan een complex type of een primitief type zijn.

Notitie

De from_avro en to_avro functies:

  • Zijn beschikbaar in Python, Scala en Java.
  • Kan worden doorgegeven aan SQL-functies in zowel batch- als streamingquery's.

Zie ook de Avro-bestandsgegevensbron.

Handmatig opgegeven schemavoorbeeld

Net als bij from_json en to_jsonkunt u from_avro en to_avro gebruiken met elke binaire kolom. U kunt het Avro-schema handmatig opgeven, zoals in het volgende voorbeeld:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema-voorbeeld

U kunt ook een schema opgeven als een JSON-tekenreeks. Als dit bijvoorbeeld het volgende is:/tmp/user.avsc

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

U kunt een JSON-tekenreeks maken:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Gebruik vervolgens het schema in from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Voorbeeld met schemaregister

Als uw cluster een schemaregisterservice heeft, kunt from_avro ermee werken, zodat u het Avro-schema niet handmatig hoeft op te geven.

In het volgende voorbeeld ziet u hoe u een Kafka-onderwerp 't' leest, ervan uitgaande dat de sleutel en waarde al zijn geregistreerd in het schemaregister als onderwerp 't-sleutel' en 't-waarde' van typen STRING en INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
    from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))

Voor to_avrokomt het standaarduitvoer-Avro-schema mogelijk niet overeen met het schema van het doelonderwerp in de Schema Registry-service om de volgende redenen:

  • De toewijzing van een Spark SQL-type aan een Avro-schema is niet een-op-een. Zie ondersteunde typen voor Spark SQL -> Avro-conversie.
  • Als het Avro-uitvoerschema na conversie van recordtype is, wordt de recordnaam topLevelRecord en is er standaard geen naamruimte.

Als het standaarduitvoerschema van to_avro overeenkomt met het schema van het doelonderwerp, kunt u het volgende doen:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Anders moet u het schema van het doelonderwerp opgeven in de to_avro functie:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

verifiëren bij een extern Confluent-schemaregister

In Databricks Runtime 12.2 LTS en hoger kunt u zich verifiëren bij een extern Confluent-schemaregister. In de volgende voorbeelden ziet u hoe u de schemaregisteropties configureert om verificatiereferenties en API-sleutels op te nemen.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
    from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      jsonFormatSchema = None,
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      jsonFormatSchema = None,
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

TrustStore- en keystore-bestanden gebruiken in Unity Catalog-volumes

In Databricks Runtime 14.3 LTS en hoger kunt u truststore- en sleutelopslagbestanden in Unity Catalog-volumes gebruiken om te verifiëren bij een Confluent-schemaregister. Werk de configuratie in het vorige voorbeeld bij met behulp van de volgende syntaxis:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

De modus schemaontwikkeling gebruiken met from_avro

In Databricks Runtime 14.2 en hoger kunt u de evolutiemodus van het schema gebruiken met from_avro. Het inschakelen van de modus schemaontwikkeling zorgt ervoor dat de taak een UnknownFieldException genereert nadat de ontwikkeling van het schema is gedetecteerd. Databricks raadt u aan taken te configureren met de modus schemaontwikkeling om automatisch opnieuw op te starten bij taakfouten. Zie Overwegingen voor productie voor gestructureerd streamen.

Schemaontwikkeling is handig als u verwacht dat het schema van uw brongegevens zich in de loop van de tijd ontwikkelt en alle velden uit uw gegevensbron opneemt. Als uw query's al expliciet aangeven welke velden moeten worden opgevraagd in uw gegevensbron, worden toegevoegde velden genegeerd, ongeacht de ontwikkeling van het schema.

Gebruik de optie avroSchemaEvolutionMode om schemaontwikkeling in te schakelen. In de volgende tabel worden de opties voor de evolutiemodus van het schema beschreven:

Optie Gedrag
none Standaard. Negeert de ontwikkeling van het schema en de taak wordt voortgezet.
restart Genereert een UnknownFieldException bij het detecteren van de ontwikkeling van schema's. Vereist opnieuw opstarten van een taak.

Notitie

U kunt deze configuratie tussen streamingtaken wijzigen en hetzelfde controlepunt opnieuw gebruiken. Het uitschakelen van schemaontwikkeling kan leiden tot verwijderde kolommen.

De parseringsmodus configureren

U kunt de parseringsmodus configureren om te bepalen of u wilt falen of null-records wilt versturen wanneer de modus voor schemaontwikkeling is uitgeschakeld en het schema zich op een niet achterwaarts compatibele manier ontwikkelt. Met standaardinstellingen mislukt from_avro wanneer er incompatibele schemawijzigingen worden waargenomen.

Gebruik de optie om de mode parsemodus op te geven. In de volgende tabel wordt de optie voor de parseringsmodus beschreven:

Optie Gedrag
FAILFAST Standaard. Een parseringsfout genereert een SparkException met een errorClass van MALFORMED_AVRO_MESSAGE.
PERMISSIVE Er wordt een parseringsfout genegeerd en er wordt een null-record verzonden.

Notitie

Als schemaontwikkeling is ingeschakeld, genereert FAILFAST alleen uitzonderingen als een record beschadigd is.

Voorbeeld van het ontwikkelen van schema's en het instellen van de parseringsmodus

In het volgende voorbeeld wordt getoond hoe u de schema-evolutie inschakelt en de FAILFAST-parseringsmodus opgeeft met behulp van een Confluent Schema Registry.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            data = $"key",
            subject = "t-key",
            schemaRegistryAddress = schemaRegistryAddr,
            options = schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      jsonFormatSchema = None,
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)