Sdílet prostřednictvím


Čtení a zápis streamovaných dat Avro

Apache Avro je běžně používaný systém serializace dat ve světě streamování. Typickým řešením je umístit data ve formátu Avro do Apache Kafka, metadata v registru schémat Confluent a pak spouštět dotazy s architekturou streamování, která se připojuje k kafka i registru schémat.

Azure Databricks podporuje from_avro a to_avro funkce pro vytváření streamovacích kanálů s daty Avro v kafka a metadatech v registru schémat. Funkce to_avro kóduje sloupec jako binární ve formátu Avro a from_avro dekóduje binární data Avro do sloupce. Obě funkce transformují jeden sloupec do jiného sloupce a vstupní a výstupní datový typ SQL může být složitý nebo primitivní typ.

Poznámka:

Funkce from_avro a to_avro funkce:

  • Jsou dostupné v Pythonu, Scala a Javě.
  • Funkce SQL je možné předat v dávkových i streamovaných dotazech.

Viz také zdroj dat souboru Avro.

Příklad ručně zadaného schématu

Podobně jako from_json a to_json můžete použít a to_avro s from_avro libovolným binárním sloupcem. Schéma Avro můžete zadat ručně, jak je znázorněno v následujícím příkladu:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

příklad jsonFormatSchema

Schéma můžete také zadat jako řetězec JSON. Pokud je to například /tmp/user.avsc :

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Můžete vytvořit řetězec JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Pak použijte schéma v from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Příklad s registrem schématu

Pokud má váš cluster službu Registr schématu, můžete s ním pracovat, from_avro abyste nemuseli schéma Avro zadávat ručně.

Následující příklad ukazuje čtení tématu Kafka "t", za předpokladu, že klíč a hodnota jsou již registrovány v registru schémat jako předměty "t-key" a "t-value" typů STRING a INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Výchozí to_avrovýstupní schéma Avro nemusí odpovídat schématu cílového subjektu ve službě Registr schématu z následujících důvodů:

  • Mapování typu Spark SQL na schéma Avro není 1:1. Viz Podporované typy pro převod Spark SQL –> Avro.
  • Pokud je převedené výstupní schéma Avro typu záznamu, název záznamu je topLevelRecord a ve výchozím nastavení neexistuje žádný obor názvů.

Pokud výchozí výstupní schéma to_avro odpovídá schématu cílového předmětu, můžete udělat toto:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

V opačném případě je nutné zadat schéma cílového předmětu to_avro ve funkci:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Ověření v externím registru schématu Confluent

Ve službě Databricks Runtime 12.2 LTS a novějších můžete provést ověření v externím registru schématu Confluent. Následující příklady ukazují, jak nakonfigurovat možnosti registru schématu tak, aby zahrnovaly přihlašovací údaje ověřování a klíče rozhraní API.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Použití úložiště důvěryhodnosti a souborů úložiště klíčů ve svazcích katalogu Unity

Ve službě Databricks Runtime 14.3 LTS a vyšší můžete k ověření v registru schémat Confluent použít úložiště důvěryhodnosti a soubory úložiště klíčů ve svazcích katalogu Unity. Aktualizujte konfiguraci v předchozím příkladu pomocí následující syntaxe:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Použití režimu vývoje schématu s využitím from_avro

V Databricks Runtime 14.2 a novějších můžete použít režim vývoje schématu s from_avro. Povolení režimu vývoje schématu způsobí, že úloha vyvolá UnknownFieldException po zjištění vývoje schématu. Databricks doporučuje konfigurovat úlohy s režimem vývoje schématu, aby se automaticky restartoval při selhání úlohy. Viz aspekty produkce strukturovaného streamování.

Vývoj schématu je užitečný, pokud očekáváte, že se schéma zdrojových dat v průběhu času vyvíjí a ingestuje všechna pole z vašeho zdroje dat. Pokud už dotazy explicitně určují, která pole se mají dotazovat ve zdroji dat, přidaná pole se ignorují bez ohledu na vývoj schématu.

avroSchemaEvolutionMode Tuto možnost použijte k povolení vývoje schématu. Následující tabulka popisuje možnosti režimu vývoje schématu:

Možnost Chování
none Výchozí. Ignoruje vývoj schématu a úloha pokračuje.
restart UnknownFieldException Vyvolá při zjišťování vývoje schématu. Vyžaduje restartování úlohy.

Poznámka:

Tuto konfiguraci můžete změnit mezi úlohami streamování a znovu použít stejný kontrolní bod. Zakázání vývoje schématu může vést k vyřazení sloupců.

Konfigurace režimu analýzy

Režim analýzy můžete nakonfigurovat tak, aby určil, jestli chcete selhat nebo generovat záznamy null, pokud je režim vývoje schématu zakázaný a schéma se vyvíjí zpětně kompatibilním způsobem. Při výchozím nastavení dojde k selhání při from_avro sledování nekompatibilních změn schématu.

mode Pomocí možnosti můžete určit režim analýzy. Následující tabulka popisuje možnost parsování režimu:

Možnost Chování
FAILFAST Výchozí. Při analýze dojde k chybě s příponou SparkException errorClass MALFORMED_AVRO_MESSAGE.
PERMISSIVE Chyba analýzy se ignoruje a vygeneruje se záznam null.

Poznámka:

S povoleným FAILFAST vývojem schématu vyvolá výjimky pouze v případě poškození záznamu.

Příklad použití vývoje schématu a nastavení režimu analýzy

Následující příklad ukazuje povolení vývoje schématu a určení FAILFAST režimu parsování s registrem schémat Confluent:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)