Odczytywanie i zapisywanie przesyłanych strumieniowo danych systemu Avro
Apache Avro to powszechnie używany system serializacji danych w świecie przesyłania strumieniowego. Typowym rozwiązaniem jest umieszczenie danych w formacie Avro na platformie Apache Kafka, metadanych w rejestrze schematów Confluent, a następnie uruchomienie zapytań za pomocą struktury przesyłania strumieniowego, która łączy się zarówno z platformą Kafka, jak i rejestrem schematów.
Usługa Azure Databricks obsługuje funkcje from_avro
i to_avro
w celu tworzenia potoków przesyłania strumieniowego przy użyciu danych Avro na platformie Kafka i metadanych w Rejestrze Schematów. Funkcja to_avro
koduje kolumnę jako binarną w formacie Avro i from_avro
dekoduje dane binarne Avro do kolumny. Obie funkcje przekształcają jedną kolumnę w inną kolumnę, a typ danych wejściowych/wyjściowych SQL może być typem złożonym lub typem pierwotnym.
Uwaga
Funkcje from_avro
i to_avro
:
- Są dostępne w językach Python, Scala i Java.
- Można przekazać do funkcji SQL zarówno w zapytaniach wsadowych, jak i przesyłanych strumieniowo.
Zobacz również źródło danych pliku Avro.
Przykład ręcznie określonego schematu
Podobnie jak from_json i to_json, można użyć from_avro
i to_avro
z dowolną kolumną binarną. Schemat Avro można określić ręcznie, jak w poniższym przykładzie:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Przykład jsonFormatSchema
Możesz również określić schemat jako ciąg JSON. Jeśli na przykład /tmp/user.avsc
:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Możesz utworzyć ciąg JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Następnie użyj schematu w from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Przykład z rejestrem schematów
Jeśli klaster ma usługę rejestru schematów, from_avro
może z nim pracować, aby nie trzeba było ręcznie określać schematu Avro.
W poniższym przykładzie pokazano odczytywanie topicu Kafka "t", przy założeniu, że klucz i wartość są już zarejestrowane w Rejestrze Schematów jako tematy "t-key" i "t-value" typów STRING
i INT
.
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))
W przypadku to_avro
domyślny schemat wyjściowy Avro może nie być zgodny ze schematem podmiotu docelowego w usłudze Rejestru schematów z następujących powodów:
- Mapowanie z typu Spark SQL na schemat Avro nie jest jednoznaczne. Zobacz > Spark SQL — Avro.
- Jeśli przekonwertowany schemat danych wyjściowych Avro jest typu rekordu, nazwa rekordu jest
topLevelRecord
i domyślnie nie ma przestrzeni nazw.
Jeśli domyślny schemat wyjściowy to_avro
jest zgodny ze schematem podmiotu docelowego, możesz wykonać następujące czynności:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
W przeciwnym razie należy podać schemat podmiotu docelowego w funkcji to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Uwierzytelnij w zewnętrznym rejestrze schematów Confluent
W środowisku Databricks Runtime 12.2 LTS i nowszym można uwierzytelnić się w zewnętrznym rejestrze schematów Confluent. W poniższych przykładach pokazano, jak skonfigurować opcje rejestru schematów w celu uwzględnienia poświadczeń uwierzytelniania i kluczy interfejsu API.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Używanie plików magazynu zaufania i magazynu kluczy w woluminach Katalogu Unity
W Databricks Runtime 14.3 LTS i nowszych można używać plików truststore i keystore w woluminach Unity Catalog do uwierzytelniania w Rejestrze Schematów Confluent. Zaktualizuj konfigurację w poprzednim przykładzie przy użyciu następującej składni:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Korzystaj z trybu ewolucji schematu z from_avro
W środowisku Databricks Runtime 14.2 lub nowszym można użyć trybu ewolucji schematu z from_avro
. Włączenie trybu ewolucji schematu powoduje, że zadanie rzuca UnknownFieldException
po wykryciu ewolucji schematu. Usługa Databricks zaleca skonfigurowanie zadań w trybie ewolucji schematu w celu automatycznego ponownego uruchomienia w przypadku niepowodzenia zadania. Zobacz Zagadnienia dotyczące produkcji przesyłania strumieniowego ze strukturą.
Ewolucja schematu jest przydatna, jeśli oczekujesz, że schemat danych źródłowych będzie ewoluować wraz z upływem czasu i pozyskać wszystkie pola ze źródła danych. Jeśli zapytania już jawnie określą pola do wykonywania zapytań w źródle danych, dodane pola są ignorowane niezależnie od ewolucji schematu.
Użyj opcji avroSchemaEvolutionMode
, aby włączyć ewolucję schematu. W poniższej tabeli opisano opcje trybu ewolucji schematu:
Opcja | Zachowanie |
---|---|
none |
Domyślne. Ignoruje ewolucję schematu, a zadanie jest kontynuowane. |
restart |
Wyrzuca błąd UnknownFieldException podczas wykrywania ewolucji schematu. Wymaga ponownego uruchomienia zadania. |
Uwaga
Tę konfigurację można zmienić między zadaniami przesyłania strumieniowego i ponownie użyć tego samego punktu kontrolnego. Wyłączenie ewolucji schematu może spowodować usunięcie kolumn.
Konfigurowanie trybu analizy
Możesz skonfigurować sposób parsowania, aby określić, czy chcesz zgłaszać błąd, czy emitować rekordy o wartości null, gdy tryb ewolucji schematu jest wyłączony, a schemat ewoluuje w sposób niekompatybilny wstecz. W przypadku ustawień domyślnych from_avro
kończy się niepowodzeniem, gdy napotyka niezgodne zmiany schematu.
mode
Użyj opcji , aby określić tryb analizy. W poniższej tabeli opisano opcję trybu analizy:
Opcja | Zachowanie |
---|---|
FAILFAST |
Domyślne. Błąd analizowania zgłasza SparkException błąd z wartością errorClass MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Błąd analizowania jest ignorowany i emitowany jest rekord o wartości null. |
Uwaga
Po włączeniu ewolucji schematu FAILFAST
zgłasza wyjątki tylko wtedy, gdy rekord jest uszkodzony.
Przykład użycia ewolucji schematu i ustawiania trybu analizy
W poniższym przykładzie pokazano włączanie ewolucji schematu i określanie trybu parsowania FAILFAST
z Rejestrem Schematów Confluent.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)