Formato Delta no Azure Data Factory
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Dica
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!
Este artigo destaca como copiar dados de e para um data lake armazenado no Azure Data Lake Store Gen2 ou no Armazenamento de Blobs do Azure usando o formato delta. Esse conector está disponível como um conjunto de dados embutido no fluxo de dados de mapeamento como uma fonte e um coletor.
Propriedades do fluxo de dados de mapeamento
Esse conector está disponível como um conjunto de dados embutido no fluxo de dados de mapeamento como uma fonte e um coletor.
Propriedades da Origem
A tabela abaixo lista as propriedades com suporte por uma fonte delta. Você pode editar essas propriedades na guia Opções de Origem.
Nome | Descrição | Obrigatório | Valores permitidos | Propriedade do script do Fluxo de Dados |
---|---|---|---|---|
Formatar | O formato deve ser delta |
sim | delta |
format |
Sistema de arquivos | O sistema de contêiner/arquivos do delta lake | sim | String | fileSystem |
Caminho da pasta | O diretório do delta lake | sim | String | folderPath |
Tipo de compactação | O tipo de compactação da tabela delta | não | bzip2 gzip deflate ZipDeflate snappy lz4 |
compressionType |
Nível de Compactação | Escolha se a compactação será concluída o mais rapidamente possível ou se o arquivo resultante deve ter sua compactação otimizada. | obrigatório se compressedType for especificado. |
Optimal ou Fastest |
compressionLevel |
Viagem no tempo | Escolha se deseja consultar um instantâneo mais antigo de uma tabela delta | não | Consultar por carimbo de data/hora: Timestamp Consultar por versão: Inteiro |
timestampAsOf versionAsOf |
Permitir nenhum arquivo encontrado | Se for true, um erro não será gerado caso nenhum arquivo for encontrado | não | true ou false |
ignoreNoFilesFound |
Importar esquema
O Delta só está disponível como um conjunto de dados embutido e, por padrão, não tem um esquema associado. Para obter metadados de coluna, clique no botão Importar esquema na guia Projeção. Isso permite que você referencie os nomes de colunas e os tipos de dados especificados pelo corpus. Para importar o esquema, uma sessão de depuração de fluxo de dados deve estar ativa e você deve ter um arquivo de definição de entidade CDM existente para o qual apontar.
Exemplo de script de origem delta
source(output(movieId as integer,
title as string,
releaseDate as date,
rated as boolean,
screenedOn as timestamp,
ticketPrice as decimal(10,2)
),
store: 'local',
format: 'delta',
versionAsOf: 0,
allowSchemaDrift: false,
folderPath: $tempPath + '/delta'
) ~> movies
Propriedades do coletor
A tabela abaixo lista as propriedades compatíveis com uma fonte delta. Você pode editar essas propriedades na guia Configurações.
Nome | Descrição | Obrigatório | Valores permitidos | Propriedade do script do Fluxo de Dados |
---|---|---|---|---|
Formatar | O formato deve ser delta |
sim | delta |
format |
Sistema de arquivos | O sistema de contêiner/arquivos do delta lake | sim | String | fileSystem |
Caminho da pasta | O diretório do delta lake | sim | String | folderPath |
Tipo de compactação | O tipo de compactação da tabela delta | não | bzip2 gzip deflate ZipDeflate snappy lz4 TarGZip tar |
compressionType |
Nível de Compactação | Escolha se a compactação será concluída o mais rapidamente possível ou se o arquivo resultante deve ter sua compactação otimizada. | obrigatório se compressedType for especificado. |
Optimal ou Fastest |
compressionLevel |
Vacuum | Exclui arquivos mais antigos do que a duração especificada que não é mais relevante para a versão atual da tabela. Quando um valor de 0 ou menos é especificado, a operação de vácuo não é executada. | sim | Integer | vácuo |
Ação tabela | Informa ao ADF o que fazer com a tabela Delta de destino no coletor. Você pode deixá-lo como está e acrescentar novas linhas, substituir a definição de tabela e os dados existentes com novos metadados e dados, ou manter a estrutura de tabela existente, mas primeiro truncar todas as linhas e, em seguida, inserir as novas linhas. | não | Nenhum, truncado, substituir | deltaTruncate, substituir |
Método Update | Ao selecionar "Permitir inserção" sozinho ou gravar em uma nova tabela delta, o destino recebe todas as linhas de entrada, independentemente das políticas de linha definidas. Se seus dados contiverem linhas de outras políticas de linha, elas precisarão ser excluídas usando uma transformação de filtro anterior. Quando todos os métodos de atualização são selecionados, uma mesclagem é executada, na qual as linhas são inseridas/excluídas/upserted/atualizadas de acordo com o conjunto de políticas de linha usando uma transformação de Alterar Linha. |
sim | true ou false |
insertable deletable upsertable Pode ser atualizado |
Gravação otimizada | Obtenha uma taxa de transferência mais alta para a operação de gravação por meio da otimização do modo aleatório interno em executores do Spark. Como resultado, você pode notar menos partições e arquivos de tamanho maior | não | true ou false |
optimizedWrite: true |
Compactar automaticamente | Após a conclusão de qualquer operação de gravação, o Spark executará automaticamente o OPTIMIZE comando para reorganizar os dados, resultando em mais partições, se necessário, para melhor leitura do desempenho no futuro |
não | true ou false |
autoCompact: true |
Exemplo de script do coletor Delta
O script de fluxo de dados associado é:
moviesAltered sink(
input(movieId as integer,
title as string
),
mapColumn(
movieId,
title
),
insertable: true,
updateable: true,
deletable: true,
upsertable: false,
keys: ['movieId'],
store: 'local',
format: 'delta',
vacuum: 180,
folderPath: $tempPath + '/delta'
) ~> movieDB
Coletor delta com remoção de partições
Com essa opção no método de atualização acima (ou seja, atualizar/executar upsert/excluir), você pode limitar o número de partições inspecionadas. Somente as partições que satisfazem essa condição são obtidas do armazenamento de destino. Você pode especificar um conjunto fixo de valores que uma coluna de partição pode assumir.
Exemplo de script do coletor delta com remoção de partições
Um script de exemplo é fornecido a seguir.
DerivedColumn1 sink(
input(movieId as integer,
title as string
),
allowSchemaDrift: true,
validateSchema: false,
format: 'delta',
container: 'deltaContainer',
folderPath: 'deltaPath',
mergeSchema: false,
autoCompact: false,
optimizedWrite: false,
vacuum: 0,
deletable:false,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
pruneCondition:['part_col' -> ([5, 8])],
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink2
O Delta lerá apenas 2 partições em que part_col == 5 e 8 do repositório delta de destino, em vez de todas as partições. part_col é uma coluna pela qual os dados delta de destino são particionados. Ela não precisa estar presente nos dados de origem.
Opções de otimização do coletor delta
Na guia Configurações, você encontra mais três opções para otimizar a transformação do coletor delta.
Quando a opção Mesclar esquema está habilitada, ela permite a evolução do esquema, ou seja, todas as colunas presentes no fluxo de entrada atual, mas não na tabela Delta de destino, são automaticamente adicionadas ao seu esquema. Essa opção tem suporte em todos os métodos de atualização.
Quando a Compactação automática está habilitada, após uma gravação individual, a transformação verifica se os arquivos podem ser compactados e executa um trabalho de otimização rápida (com tamanhos de arquivo de 128 MB em vez de 1 GB) para compactar ainda mais os arquivos para partições que têm o maior número de arquivos pequenos. A compactação automática ajuda a unir um grande número de arquivos pequenos em um número menor de arquivos grandes. A compactação automática só entra em vigor quando há pelo menos 50 arquivos. Depois que uma operação de compactação é executada, ela cria uma versão da tabela e grava um novo arquivo contendo os dados de vários arquivos anteriores em um formulário compactado.
Quando a gravação Otimizar está habilitada, a transformação do coletor otimiza dinamicamente os tamanhos de partição com base nos dados reais, tentando gravar arquivos de 128 MB para cada partição de tabela. Esse é um tamanho aproximado, e pode variar dependendo das características do conjunto de dados. As gravações otimizadas aprimoram a eficiência geral das gravações e leituras posteriores. Ela organiza as partições de forma que o desempenho das leituras subsequentes melhore.
Dica
O processo de gravação otimizado reduzirá o trabalho de ETL geral porque o Coletor emitirá o comando Spark Delta Lake Optimize depois que os dados forem processados. É recomendável usar a Gravação Otimizada com moderação. Por exemplo, se você tiver um pipeline de dados por hora, execute um fluxo de dados com Gravação Otimizada diariamente.
Limitações conhecidas
Ao gravar em um coletor delta, há uma limitação conhecida em que o número de linhas gravadas não aparecerá na saída de monitoramento.
Conteúdo relacionado
- Crie uma transformação de origem no fluxo de dados de mapeamento.
- Crie uma transformação de coletor no fluxo de dados de mapeamento.
- Crie uma alteração prévia de linha para marcar as linhas como inserir, atualizar, upsert ou excluir.