Partilhar via


Transformação com o Azure Databricks

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Neste tutorial, você cria um pipeline de ponta a ponta que contém as atividades de Validação, Cópia de dados e Bloco de Anotações no Azure Data Factory.

  • A validação garante que seu conjunto de dados de origem esteja pronto para consumo downstream antes de acionar o trabalho de cópia e análise.

  • Copiar dados duplica o conjunto de dados de origem para o armazenamento do coletor, que é montado como DBFS no bloco de anotações do Azure Databricks. Desta forma, o conjunto de dados pode ser consumido diretamente pelo Spark.

  • O bloco de anotações aciona o bloco de anotações Databricks que transforma o conjunto de dados. Ele também adiciona o conjunto de dados a uma pasta processada ou ao Azure Synapse Analytics.

Para simplificar, o modelo neste tutorial não cria um gatilho agendado. Você pode adicionar um, se necessário.

Diagrama do pipeline

Pré-requisitos

  • Uma conta de armazenamento de Blob do Azure com um contêiner chamado sinkdata para uso como coletor.

    Anote o nome da conta de armazenamento, o nome do contêiner e a chave de acesso. Você precisará desses valores posteriormente no modelo.

  • Um espaço de trabalho do Azure Databricks.

Importar um bloco de notas para transformação

Para importar um bloco de anotações de transformação para seu espaço de trabalho Databricks:

  1. Entre no seu espaço de trabalho do Azure Databricks e selecione Importar. Comando de menu para importar um espaço de trabalho O caminho do espaço de trabalho pode ser diferente do mostrado, mas lembre-se dele para mais tarde.

  2. Selecione Importar de: URL. Na caixa de texto, digite https://adflabstaging1.blob.core.windows.net/share/Transformations.html.

    Seleções para importar um bloco de notas

  3. Agora vamos atualizar o bloco de anotações de transformação com suas informações de conexão de armazenamento.

    No bloco de anotações importado, vá para o comando 5 , conforme mostrado no trecho de código a seguir.

    • Substitua <storage name>e <access key> por suas próprias informações de conexão de armazenamento.
    • Use a conta de armazenamento com o sinkdata contêiner.
    # Supply storageName and accessKey values  
    storageName = "<storage name>"  
    accessKey = "<access key>"  
    
    try:  
      dbutils.fs.mount(  
        source = "wasbs://sinkdata\@"+storageName+".blob.core.windows.net/",  
        mount_point = "/mnt/Data Factorydata",  
        extra_configs = {"fs.azure.account.key."+storageName+".blob.core.windows.net": accessKey})  
    
    except Exception as e:  
      # The error message has a long stack track. This code tries to print just the relevant line indicating what failed.
    
    import re
    result = re.findall(r"\^\s\*Caused by:\s*\S+:\s\*(.*)\$", e.message, flags=re.MULTILINE)
    if result:
      print result[-1] \# Print only the relevant error message
    else:  
      print e \# Otherwise print the whole stack trace.  
    
  4. Gere um token de acesso Databricks para o Data Factory acessar o Databricks.

    1. No espaço de trabalho Databricks, selecione o ícone do seu perfil de usuário no canto superior direito.
    2. Selecione Configurações do usuário. Comando de menu para configurações do usuário
    3. Selecione Gerar novo token na guia Tokens de acesso .
    4. Selecione Gerar.

    Salve o token de acesso para uso posterior na criação de um serviço vinculado Databricks. O token de acesso se parece com dapi32db32cbb4w6eee18b7d87e45exxxxxx.

Como usar este modelo

  1. Vá para o modelo Transformação com o Azure Databricks e crie novos serviços vinculados para as seguintes conexões.

    Configuração de conexões

    • Conexão de Blob de Origem - para acessar os dados de origem.

      Para este exercício, você pode usar o armazenamento de blob público que contém os arquivos de origem. Consulte a captura de tela a seguir para a configuração. Use a seguinte URL SAS para se conectar ao armazenamento de origem (acesso somente leitura):

      https://storagewithdata.blob.core.windows.net/data?sv=2018-03-28&si=read%20and%20list&sr=c&sig=PuyyS6%2FKdB2JxcZN0kPlmHSBlD8uIKyzhBWmWzznkBw%3D

      Seleções para método de autenticação e URL SAS

    • Conexão de Blob de Destino - para armazenar os dados copiados.

      Na janela Novo serviço vinculado, selecione o blob de armazenamento do coletor.

      Blob de armazenamento do coletor como um novo serviço vinculado

    • Azure Databricks - para se conectar ao cluster Databricks.

      Crie um serviço vinculado ao Databricks usando a chave de acesso gerada anteriormente. Você pode optar por selecionar um cluster interativo, se tiver um. Este exemplo usa a opção Novo cluster de trabalho.

      Seleções para conexão com o cluster

  2. Selecione Utilizar este modelo. Você verá um pipeline criado.

    Criar um pipeline

Introdução e configuração de pipeline

No novo pipeline, a maioria das configurações é configurada automaticamente com valores padrão. Revise as configurações do seu pipeline e faça as alterações necessárias.

  1. No sinalizador Disponibilidade da atividade de validação, verifique se o valor do Conjunto de Dados de origem está definido como SourceAvailabilityDataset o que você criou anteriormente.

    Valor do conjunto de dados de origem

  2. Em Copiar arquivo de atividade de dados para blob, marque as guias Origem e Coletor. Altere as configurações, se necessário.

    • Guia FonteSeparador Origem

    • Aba PiaSeparador Sink

  3. Na atividade Transformação do Bloco de Notas, reveja e atualize os caminhos e as definições conforme necessário.

    O serviço vinculado Databricks deve ser pré-preenchido com o valor de uma etapa anterior, conforme mostrado: Valor preenchido para o serviço vinculado Databricks

    Para verificar as configurações do Bloco de Anotações :

    1. Selecione a guia Configurações . Para Caminho do bloco de anotações, verifique se o caminho padrão está correto. Talvez seja necessário navegar e escolher o caminho correto do bloco de anotações.

      Caminho do bloco de notas

    2. Expanda o seletor Parâmetros básicos e verifique se os parâmetros correspondem ao que é mostrado na captura de tela a seguir. Esses parâmetros são passados para o notebook Databricks do Data Factory.

      Parâmetros de base

  4. Verifique se os parâmetros do pipeline correspondem ao que é mostrado na captura de tela a seguir:Parâmetros do pipeline

  5. Conecte-se aos seus conjuntos de dados.

    Nota

    Nos conjuntos de dados abaixo, o caminho do arquivo foi especificado automaticamente no modelo. Se alguma alteração for necessária, certifique-se de especificar o caminho para o contêiner e o diretório em caso de erro de conexão.

    • SourceAvailabilityDataset - para verificar se os dados de origem estão disponíveis.

      Seleções para serviço vinculado e caminho de arquivo para SourceAvailabilityDataset

    • SourceFilesDataset - para acessar os dados de origem.

      Seleções para serviço vinculado e caminho de arquivo para SourceFilesDataset

    • DestinationFilesDataset - para copiar os dados para o local de destino do coletor. Utilize os seguintes valores:

      • Serviço - sinkBlob_LS vinculado, criado em uma etapa anterior.

      • Caminho do arquivo - sinkdata/staged_sink.

        Seleções para serviço vinculado e caminho de arquivo para DestinationFilesDataset

  6. Selecione Depurar para executar o pipeline. Você pode encontrar o link para os logs do Databricks para obter logs do Spark mais detalhados.

    Link para logs do Databricks da saída

    Você também pode verificar o arquivo de dados usando o Gerenciador de Armazenamento do Azure.

    Nota

    Para correlacionar com execuções de pipeline do Data Factory, este exemplo acrescenta a ID de execução do pipeline do data factory à pasta de saída. Isso ajuda a manter o controle dos arquivos gerados por cada execução. ID de execução do pipeline anexado