Udostępnij za pośrednictwem


Odczytywanie i zapisywanie przesyłanych strumieniowo danych systemu Avro

Apache Avro to powszechnie używany system serializacji danych w świecie przesyłania strumieniowego. Typowym rozwiązaniem jest umieszczenie danych w formacie Avro na platformie Apache Kafka, metadanych w rejestrze schematów platformy Confluent, a następnie uruchamianie zapytań za pomocą struktury przesyłania strumieniowego, która łączy się zarówno z platformą Kafka, jak i rejestrem schematów.

Usługa Azure Databricks obsługuje from_avro funkcje i to_avro do tworzenia potoków przesyłania strumieniowego przy użyciu danych Avro na platformie Kafka i metadanych w rejestrze schematów. Funkcja to_avro koduje kolumnę jako binarną w formacie Avro i from_avro dekoduje dane binarne Avro do kolumny. Obie funkcje przekształcają jedną kolumnę w inną kolumnę, a typ danych wejściowych/wyjściowych SQL może być typem złożonym lub typem pierwotnym.

Uwaga

Funkcje from_avro i to_avro :

  • Są dostępne w językach Python, Scala i Java.
  • Można przekazać do funkcji SQL zarówno w zapytaniach wsadowych, jak i przesyłanych strumieniowo.

Zobacz również źródło danych pliku Avro.

Przykład ręcznie określonego schematu

Podobnie jak from_json i to_json, można użyć elementów from_avro i to_avro z dowolną kolumną binarną. Schemat Avro można określić ręcznie, jak w poniższym przykładzie:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

Przykład jsonFormatSchema

Możesz również określić schemat jako ciąg JSON. Jeśli na przykład /tmp/user.avsc :

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Możesz utworzyć ciąg JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Następnie użyj schematu w pliku from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Przykład z rejestrem schematów

Jeśli klaster ma usługę rejestru schematów, może z nim pracować, from_avro aby nie trzeba było ręcznie określać schematu Avro.

W poniższym przykładzie pokazano czytanie tematu platformy Kafka "t", przy założeniu, że klucz i wartość są już zarejestrowane w rejestrze schematów jako podmioty "t-key" i "t-value" typów STRING i INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

W przypadku to_avroprogramu domyślny schemat wyjściowy Avro może nie być zgodny ze schematem podmiotu docelowego w usłudze Rejestru schematów z następujących powodów:

  • Mapowanie z typu Spark SQL na schemat Avro nie jest jeden do jednego. Zobacz Obsługiwane typy konwersji Spark SQL —> Avro.
  • Jeśli przekonwertowany schemat danych wyjściowych Avro jest typu rekordu, nazwa rekordu to topLevelRecord i domyślnie nie ma przestrzeni nazw.

Jeśli domyślny schemat to_avro wyjściowy elementu odpowiada schematowi podmiotu docelowego, możesz wykonać następujące czynności:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

W przeciwnym razie należy podać schemat podmiotu docelowego w to_avro funkcji :

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Uwierzytelnianie w zewnętrznym rejestrze schematów confluent

W środowisku Databricks Runtime 12.2 LTS i nowszym można uwierzytelnić się w zewnętrznym rejestrze schematów confluent. W poniższych przykładach pokazano, jak skonfigurować opcje rejestru schematów w celu uwzględnienia poświadczeń uwierzytelniania i kluczy interfejsu API.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Używanie plików magazynu zaufania i magazynu kluczy w woluminach wykazu aparatu Unity

W środowisku Databricks Runtime 14.3 LTS i nowszym można użyć plików magazynu zaufania i magazynu kluczy w woluminach wykazu aparatu Unity, aby uwierzytelnić się w rejestrze schematów platformy Confluent. Zaktualizuj konfigurację w poprzednim przykładzie przy użyciu następującej składni:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Używanie trybu ewolucji schematu z from_avro

W środowisku Databricks Runtime 14.2 lub nowszym można użyć trybu ewolucji schematu z from_avroprogramem . Włączenie trybu ewolucji schematu powoduje, że zadanie zgłasza błąd UnknownFieldException po wykryciu ewolucji schematu. Usługa Databricks zaleca skonfigurowanie zadań w trybie ewolucji schematu w celu automatycznego ponownego uruchomienia w przypadku niepowodzenia zadania. Zobacz Zagadnienia dotyczące produkcji przesyłania strumieniowego ze strukturą.

Ewolucja schematu jest przydatna, jeśli oczekujesz, że schemat danych źródłowych będzie ewoluować wraz z upływem czasu i pozyskać wszystkie pola ze źródła danych. Jeśli zapytania już jawnie określą pola do wykonywania zapytań w źródle danych, dodane pola są ignorowane niezależnie od ewolucji schematu.

avroSchemaEvolutionMode Użyj opcji , aby włączyć ewolucję schematu. W poniższej tabeli opisano opcje trybu ewolucji schematu:

Opcja Zachowanie
none Domyślne. Ignoruje ewolucję schematu, a zadanie jest kontynuowane.
restart Zgłasza błąd UnknownFieldException podczas wykrywania ewolucji schematu. Wymaga ponownego uruchomienia zadania.

Uwaga

Tę konfigurację można zmienić między zadaniami przesyłania strumieniowego i ponownie użyć tego samego punktu kontrolnego. Wyłączenie ewolucji schematu może spowodować usunięcie kolumn.

Konfigurowanie trybu analizy

Możesz skonfigurować tryb analizy, aby określić, czy chcesz uruchomić rekordy o wartości fail lub emitować rekordy o wartości null, gdy tryb ewolucji schematu jest wyłączony, a schemat ewoluuje w sposób niezgodny z poprzednimi wersjami. W przypadku ustawień domyślnych kończy się niepowodzeniem, from_avro gdy obserwuje niezgodne zmiany schematu.

mode Użyj opcji , aby określić tryb analizy. W poniższej tabeli opisano opcję trybu analizy:

Opcja Zachowanie
FAILFAST Domyślne. Błąd analizowania zgłasza SparkException błąd z wartością errorClass MALFORMED_AVRO_MESSAGE.
PERMISSIVE Błąd analizowania jest ignorowany i emitowany jest rekord o wartości null.

Uwaga

Po włączeniu ewolucji schematu zgłasza wyjątki tylko wtedy, FAILFAST gdy rekord jest uszkodzony.

Przykład użycia ewolucji schematu i trybu analizowania ustawień

W poniższym przykładzie pokazano włączanie ewolucji schematu i określanie FAILFAST trybu analizowania przy użyciu rejestru schematów confluent:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)