Partilhar via


Configurar schema inferência e evolução no Auto Loader

Você pode configurar o Auto Loader para detetar automaticamente o schema de dados carregados, permitindo que você inicialize tables sem declarar explicitamente os dados schema e evolua o tableschema à medida que novos columns são introduzidos. Isso elimina a necessidade de acompanhar e aplicar manualmente as alterações schema ao longo do tempo.

O Auto Loader também pode "resgatar" dados que foram inesperados (por exemplo, de diferentes tipos de dados) em um blob JSON column, que você pode optar por acessar mais tarde usando as APIs de acesso a dados semiestruturados .

Os seguintes formatos são suportados para inferência schema e evolução:

File format Versões suportadas
JSON Todas as versões
CSV Todas as versões
XML Databricks Runtime 14.3 LTS e superior
Avro Databricks Runtime 10.4 LTS e superior
Parquet Databricks Runtime 11.3 LTS e superior
ORC Não suportado
Text Não aplicável (fixo-schema)
Binaryfile Não aplicável (fixo-schema)

Sintaxe para inferência e evolução schema

Especificar um diretório de destino para a opção cloudFiles.schemaLocation permite inferência e evolução schema. Pode optar por utilizar o mesmo diretório especificado para o checkpointLocation. Se usar Delta Live Tables, o Azure Databricks gerirá automaticamente a localização schema e outras informações de ponto de verificação.

Nota

Se tiver mais do que uma localização de origem de dados a ser carregada no destino table, cada processo de ingestão do Auto Loader requer um ponto de verificação de streaming separado.

O exemplo a seguir usa parquet para o cloudFiles.format. Use csv, avroou json para outras fontes de arquivo. Todas as outras configurações de leitura e gravação permanecem as mesmas para os comportamentos padrão para cada formato.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Como funciona a inferência do Auto Loader schema?

Para inferir o schema ao ler os dados pela primeira vez, o Auto Loader obtém amostras dos primeiros arquivos de 50 GB ou 1000 que descobrir, o que limit for cruzado primeiro. O Auto Loader armazena as informações de schema num diretório _schemas na configuração cloudFiles.schemaLocation para monitorizar as alterações schema nos dados de entrada ao longo do tempo.

Nota

Para alterar o tamanho do exemplo usado, você pode set as configurações SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(byte string, por exemplo 10gb)

e

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(inteiro)

Por padrão, a inferência de schema do Auto Loader procura evitar problemas de evolução schema devido a incompatibilidades de tipo. Para formatos que não codificam tipos de dados (JSON, CSV e XML), o Auto Loader infere todos os columns como cadeias de caracteres (incluindo campos aninhados em arquivos JSON). Para formatos com tipo schema (Parquet e Avro), o Auto Loader captura amostras de um subconjunto de arquivos e mescla os esquemas desses arquivos. Esse comportamento é resumido no seguinte table:

File format Tipo de dados inferidos padrão
JSON String
CSV String
XML String
Avro Tipos codificados em Avro schema
Parquet Tipos codificados em Parquet schema

O Apache Spark DataFrameReader usa um comportamento diferente para inferência schema, selecionando tipos de dados para columns em fontes JSON, CSV e XML com base em dados de exemplo. Para habilitar esse comportamento com o Auto Loader, set a opção cloudFiles.inferColumnTypes para true.

Nota

Ao inferir o schema para dados CSV, o Auto Loader assume que os arquivos contêm cabeçalhos. Se os seus ficheiros CSV não contiverem cabeçalhos, forneça a opção .option("header", "false"). Além disso, o Auto Loader mescla os esquemas de todos os arquivos na amostra para criar um schemaglobal. Auto Loader pode então ler cada arquivo de acordo com seu cabeçalho e analisar o CSV corretamente.

Nota

Quando um column tem diferentes tipos de dados em dois ficheiros Parquet, o Auto Loader escolhe o tipo mais amplo. Você pode usar schemaHints para substituir essa escolha. Quando você especifica schema dicas, o Auto Loader não converte o column para o tipo especificado, mas diz ao leitor de Parquet para ler o column como o tipo especificado. No caso de uma incompatibilidade, o column é resgatado nos dados resgatados column.

Como funciona a evolução do Auto Loader schema?

O Auto Loader deteta a adição de novos columns à medida que processa os seus dados. Quando o Auto Loader deteta um novo column, o fluxo é interrompido por um UnknownFieldException. Antes que seu fluxo lance esse erro, o Auto Loader executa schema inferência no microlote de dados mais recente e atualiza o local do schema com o schema mais recente, mesclando novos columns até o final do schema. Os tipos de dados dos columns existentes permanecem inalterados.

A Databricks recomenda configurar os fluxos do Auto Loader com Databricks Jobs para reiniciar automaticamente após essas schema alterações.

Auto Loader suporta os seguintes modos para schema evolução, que você set na opção cloudFiles.schemaEvolutionMode:

Modo Comportamento na leitura de novas column
addNewColumns (padrão) O fluxo falha. Novos columns são adicionados ao schema. Os columns existentes não evoluem os tipos de dados.
rescue Schema nunca é alterado e o fluxo de dados não falha devido às alterações de schema. Todos os novos columns são gravados nos dados resgatados column.
failOnNewColumns O fluxo falha. O fluxo não é reiniciado a menos que o schema fornecido seja atualizado ou o arquivo de dados ofensivo seja removido.
none Não evolui o schema, novos columns são ignorados e os dados não são resgatados a menos que a opção rescuedDataColumn seja set. O fluxo não falha devido a alterações no schema.

Nota

addNewColumns modo é o padrão quando um schema não é fornecido, mas none é o padrão quando você fornece um schema. addNewColumns não é permitido quando a schema do fluxo é fornecida, mas funciona se você fornecer seu schema como uma dica schema.

Como funcionam as partições com o Carregador Automático?

O Auto Loader tenta inferir partitioncolumns a partir da estrutura de diretórios subjacente dos dados se os dados estiverem dispostos no particionamento no estilo Hive. Por exemplo, o caminho do arquivo base_path/event=click/date=2021-04-01/f0.json resulta na inferência de date e event como partitioncolumns. Se a estrutura de diretórios subjacente contiver partições Hive conflitantes ou não contiver particionamento no estilo Hive, partitioncolumns serão ignoradas.

Os formatos de arquivo binário (binaryFile) e text têm esquemas de dados fixos, mas suportam inferência partitioncolumn. O Databricks recomenda a configuração cloudFiles.schemaLocation para esses formatos de arquivo. Isso evita possíveis erros ou perda de informações e impede a inferência de partições columns cada vez que um carregador automático começa.

Partition columns não são considerados para a evolução de schema. Se tivesses uma estrutura de diretório inicial como base_path/event=click/date=2021-04-01/f0.jsone começasses a receber novos arquivos como base_path/event=click/date=2021-04-01/hour=01/f1.json, o Auto Loader ignora a hora column. Para capturar informações para as novas partitioncolumns, setcloudFiles.partitionColumns e event,date,hour.

Nota

A opção cloudFiles.partitionColumns usa uma list separada por vírgulas de column nomes. Apenas os columns que existem como pares key=value na sua estrutura de diretórios são analisados.

Quais são os dados resgatados column?

Quando o Auto Loader infere o schema, dados resgatados de column são automaticamente adicionados ao seu schema como _rescued_data. Você pode renomear o column ou incluí-lo em casos de where ao fornecer um schema definindo a opção rescuedDataColumn.

Os dados resgatados column garantem que columns que não correspondem ao schema sejam resgatados em vez de descartados. Os dados resgatados column contêm qualquer dado que não seja analisado pelos seguintes motivos:

  • O column está ausente do schema.
  • Incompatibilidades de tipo.
  • Incompatibilidades de casos.

O dado resgatado column inclui um JSON com o columns resgatado e o caminho do arquivo de origem do registo.

Nota

Os analisadores JSON e CSV suportam três modos ao analisar registros: PERMISSIVE, DROPMALFORMEDe FAILFAST. Quando usado em conjunto com rescuedDataColumno , as incompatibilidades de tipo de dados não fazem com que os registros sejam descartados no DROPMALFORMED modo ou gerem um erro no FAILFAST modo. Somente registros corrompidos são descartados ou geram erros, como JSON ou CSV incompletos ou malformados. Se você usar badRecordsPath ao analisar JSON ou CSV, as incompatibilidades de tipo de dados não serão consideradas registros incorretos ao usar o rescuedDataColumn. Somente registros JSON ou CSV incompletos e malformados são armazenados no badRecordsPath.

Alterar o comportamento que diferencia maiúsculas de minúscul

A menos que a diferenciação entre maiúsculas e minúsculas esteja ativada, os columnsabc, Abce ABC são considerados iguais ao column para efeitos de inferência schema. O caso escolhido é arbitrário e depende dos dados amostrados. Você pode usar schema dicas para impor qual caso deve ser usado. Uma vez que uma seleção tenha sido feita e o schema seja inferido, o Auto Loader não considera as variantes de carcaça que não foram selecionadas consistentes com o schema.

Quando column de dados resgatados está habilitada, os campos nomeados em um caso diferente do schema são carregados para o _rescued_datacolumn. Altere esse comportamento definindo a opção readerCaseSensitive como false, caso em que o Auto Loader lê dados de forma que não diferencia maiúsculas de minúsculas.

Substituir schema inferência por schema dicas

Você pode usar schema dicas para impor as informações schema que você sabe e espera em um schemainferido. Quando sabes que um column é de um tipo de dados específico, ou se quiseres escolher um tipo de dados mais geral (por exemplo, um double em vez de um integer), podes fornecer um número arbitrário de dicas para tipos de dados column como uma string usando a sintaxe de especificação SQL schema, tal como o seguinte:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Consulte a documentação sobre os tipos de dados para a list dos tipos de dados suportados.

Se um column não estiver presente no início do fluxo, pode também usar sugestões de schema para adicionar esse column ao schemainferido.

Aqui está um exemplo de uma schema inferida para observar o comportamento com as sugestões de schema.

Inferido schema:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Especificando os seguintes schema indicadores:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

Você get:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Nota

O suporte a dicas de schema de matriz e mapa está disponível no Databricks Runtime 9.1 LTS e superior.

Aqui está um exemplo de um schema inferido com tipos de dados complexos para ver o comportamento com as sugestões de schema.

Inferido schema:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Especificando as seguintes schema dicas:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

Você get:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Nota

Schema dicas são usadas apenas se você não fornecer schema para o Auto Loader. Você pode usar schema dicas se cloudFiles.inferColumnTypes está habilitado ou desativado.