Čtení a zápis streamovaných dat Avro
Apache Avro je běžně používaný systém serializace dat ve světě streamování. Typickým řešením je umístit data ve formátu Avro do Apache Kafka, metadata v registru schémat Confluent a pak spouštět dotazy s architekturou streamování, která se připojuje k kafka i registru schémat.
Azure Databricks podporuje from_avro
a to_avro
funkce pro vytváření streamovacích kanálů s daty Avro v kafka a metadatech v registru schémat. Funkce to_avro
kóduje sloupec jako binární ve formátu Avro a from_avro
dekóduje binární data Avro do sloupce. Obě funkce transformují jeden sloupec do jiného sloupce a vstupní a výstupní datový typ SQL může být složitý nebo primitivní typ.
Poznámka:
Funkce from_avro
a to_avro
funkce:
- Jsou dostupné v Pythonu, Scala a Javě.
- Funkce SQL je možné předat v dávkových i streamovaných dotazech.
Viz také zdroj dat souboru Avro.
Příklad ručně zadaného schématu
Podobně jako from_json a to_json můžete použít a to_avro
s from_avro
libovolným binárním sloupcem. Schéma Avro můžete zadat ručně, jak je znázorněno v následujícím příkladu:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
příklad jsonFormatSchema
Schéma můžete také zadat jako řetězec JSON. Pokud je to například /tmp/user.avsc
:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Můžete vytvořit řetězec JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Pak použijte schéma v from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Příklad s registrem schématu
Pokud má váš cluster službu Registr schématu, můžete s ním pracovat, from_avro
abyste nemuseli schéma Avro zadávat ručně.
Následující příklad ukazuje čtení tématu Kafka "t", za předpokladu, že klíč a hodnota jsou již registrovány v registru schémat jako předměty "t-key" a "t-value" typů STRING
a INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Výchozí to_avro
výstupní schéma Avro nemusí odpovídat schématu cílového subjektu ve službě Registr schématu z následujících důvodů:
- Mapování typu Spark SQL na schéma Avro není 1:1. Viz Podporované typy pro převod Spark SQL –> Avro.
- Pokud je převedené výstupní schéma Avro typu záznamu, název záznamu je
topLevelRecord
a ve výchozím nastavení neexistuje žádný obor názvů.
Pokud výchozí výstupní schéma to_avro
odpovídá schématu cílového předmětu, můžete udělat toto:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
V opačném případě je nutné zadat schéma cílového předmětu to_avro
ve funkci:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Ověření v externím registru schématu Confluent
Ve službě Databricks Runtime 12.2 LTS a novějších můžete provést ověření v externím registru schématu Confluent. Následující příklady ukazují, jak nakonfigurovat možnosti registru schématu tak, aby zahrnovaly přihlašovací údaje ověřování a klíče rozhraní API.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Použití úložiště důvěryhodnosti a souborů úložiště klíčů ve svazcích katalogu Unity
Ve službě Databricks Runtime 14.3 LTS a vyšší můžete k ověření v registru schémat Confluent použít úložiště důvěryhodnosti a soubory úložiště klíčů ve svazcích katalogu Unity. Aktualizujte konfiguraci v předchozím příkladu pomocí následující syntaxe:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Použití režimu vývoje schématu s využitím from_avro
V Databricks Runtime 14.2 a novějších můžete použít režim vývoje schématu s from_avro
. Povolení režimu vývoje schématu způsobí, že úloha vyvolá UnknownFieldException
po zjištění vývoje schématu. Databricks doporučuje konfigurovat úlohy s režimem vývoje schématu, aby se automaticky restartoval při selhání úlohy. Viz aspekty produkce strukturovaného streamování.
Vývoj schématu je užitečný, pokud očekáváte, že se schéma zdrojových dat v průběhu času vyvíjí a ingestuje všechna pole z vašeho zdroje dat. Pokud už dotazy explicitně určují, která pole se mají dotazovat ve zdroji dat, přidaná pole se ignorují bez ohledu na vývoj schématu.
avroSchemaEvolutionMode
Tuto možnost použijte k povolení vývoje schématu. Následující tabulka popisuje možnosti režimu vývoje schématu:
Možnost | Chování |
---|---|
none |
Výchozí. Ignoruje vývoj schématu a úloha pokračuje. |
restart |
UnknownFieldException Vyvolá při zjišťování vývoje schématu. Vyžaduje restartování úlohy. |
Poznámka:
Tuto konfiguraci můžete změnit mezi úlohami streamování a znovu použít stejný kontrolní bod. Zakázání vývoje schématu může vést k vyřazení sloupců.
Konfigurace režimu analýzy
Režim analýzy můžete nakonfigurovat tak, aby určil, jestli chcete selhat nebo generovat záznamy null, pokud je režim vývoje schématu zakázaný a schéma se vyvíjí zpětně kompatibilním způsobem. Při výchozím nastavení dojde k selhání při from_avro
sledování nekompatibilních změn schématu.
mode
Pomocí možnosti můžete určit režim analýzy. Následující tabulka popisuje možnost parsování režimu:
Možnost | Chování |
---|---|
FAILFAST |
Výchozí. Při analýze dojde k chybě s příponou SparkException errorClass MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Chyba analýzy se ignoruje a vygeneruje se záznam null. |
Poznámka:
S povoleným FAILFAST
vývojem schématu vyvolá výjimky pouze v případě poškození záznamu.
Příklad použití vývoje schématu a nastavení režimu analýzy
Následující příklad ukazuje povolení vývoje schématu a určení FAILFAST
režimu parsování s registrem schémat Confluent:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)