Sdílet prostřednictvím


Kopírování dat z objektu blob Azure do služby Azure SQL Database pomocí služby Azure Data Factory

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

V tomto kurzu vytvoříte kanál datové továrny, který kopíruje data ze služby Azure Blob Storage do Azure SQL Database. Schéma konfigurace v tomto kurzu se vztahuje na kopírování z úložiště dat založeného na souborech do relačního úložiště dat. Seznam úložišť dat podporovaných jako zdroje a jímky najdete v podporovaných úložištích a formátech dat.

V tomto kurzu provedete následující kroky:

  • Vytvoření datové továrny
  • Vytvoření propojených služeb Azure Storage a Azure SQL Database
  • Vytvoření datových sad objektů blob Azure a Azure SQL Database
  • Vytvoření kanálu obsahujícího aktivitu kopírování
  • Zahajte spuštění kanálu.
  • Monitorování spuštění aktivit a kanálu

Tento kurz používá .NET SDK. K interakci se službou Azure Data Factory můžete použít jiné mechanismy; projděte si ukázky v části Rychlé starty.

Pokud ještě nemáte předplatné Azure, vytvořte si bezplatný účet Azure před tím, než začnete.

Požadavky

  • Účet služby Azure Storage. Úložiště objektů blob použijete jako zdrojové úložiště dat. Pokud nemáte účet úložiště Azure, přečtěte si téma Vytvoření účtu úložiště pro obecné účely.
  • Azure SQL Database Tuto databázi použijete jako úložiště dat jímky. Pokud nemáte databázi ve službě Azure SQL Database, přečtěte si téma Vytvoření databáze ve službě Azure SQL Database.
  • Visual Studio. Návod v tomto článku používá Visual Studio 2019.
  • Azure SDK pro .NET.
  • Aplikace Microsoft Entra. Pokud nemáte aplikaci Microsoft Entra, přečtěte si část Vytvoření aplikace Microsoft Entra v části Postupy: Použití portálu k vytvoření aplikace Microsoft Entra. Zkopírujte následující hodnoty pro pozdější použití: ID aplikace (klienta), ověřovací klíč a ID adresáře (tenanta). Podle pokynů ve stejném článku přiřaďte aplikaci k roli Přispěvatel .

Vytvoření objektu blob a tabulky SQL

Teď připravte objekty blob Azure a Azure SQL Database na kurz vytvořením zdrojového objektu blob a tabulky SQL jímky.

Vytvoření zdrojového objektu blob

Nejprve vytvořte zdrojový objekt blob tak, že vytvoříte kontejner a nahrajete do něj vstupní textový soubor:

  1. Otevřete Poznámkový blok. Zkopírujte následující text a uložte ho místně do souboru s názvem inputEmp.txt.

    John|Doe
    Jane|Doe
    
  2. Pomocí nástroje, jako je Průzkumník služby Azure Storage, vytvořte kontejner adfv2tutorial a nahrajte do kontejneru inputEmp.txt soubor.

Vytvoření tabulky SQL jímky

Dále vytvořte tabulku SQL jímky:

  1. Pomocí následujícího skriptu SQL vytvořte tabulku dbo.emp v Azure SQL Database.

    CREATE TABLE dbo.emp
    (
        ID int IDENTITY(1,1) NOT NULL,
        FirstName varchar(50),
        LastName varchar(50)
    )
    GO
    
    CREATE CLUSTERED INDEX IX_emp_ID ON dbo.emp (ID);
    
  2. Povolte službám Azure přístup ke službě SQL Database. Ujistěte se, že na serveru povolíte přístup ke službám Azure, aby služba Data Factory mohla zapisovat data do služby SQL Database. Pokud chcete toto nastavení ověřit a zapnout, proveďte následující kroky:

    1. Přejděte na web Azure Portal a spravujte sql server. Vyhledejte a vyberte SQL servery.

    2. Vyberte server.

    3. V části Zabezpečení nabídky SQL Serveru vyberte Brány firewall a virtuální sítě.

    4. Na stránce Brána firewall a virtuální sítě v části Povolit službám a prostředkům Azure přístup k tomuto serveru vyberte ZAPNUTO.

Vytvoření projektu ve Visual Studiu

Pomocí sady Visual Studio vytvořte konzolovou aplikaci C# .NET.

  1. Otevřete sadu Visual Studio.
  2. V okně Start vyberte Vytvořit nový projekt.
  3. V okně Vytvořit nový projekt zvolte v seznamu typů projektů verzi konzolové aplikace v jazyce C# (.NET Framework). Pak vyberte Další.
  4. V okně Konfigurovat nový projekt zadejte název projektu ADFv2Tutorial. V části Umístění přejděte do adresáře nebo vytvořte adresář pro uložení projektu. Pak vyberte Vytvořit. Nový projekt se zobrazí v integrovaném vývojovém prostředí sady Visual Studio.

Instalace balíčků NuGet

Dále nainstalujte požadované balíčky knihovny pomocí správce balíčků NuGet.

  1. V řádku nabídek zvolte Nástroje>NuGet Správce balíčků> Správce balíčků Konzola.

  2. V podokně konzoly Správce balíčků spusťte následující příkazy pro instalaci balíčků. Informace o balíčku NuGet služby Azure Data Factory najdete v tématu Microsoft.Azure.Management.DataFactory.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -PreRelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Vytvoření klienta datové továrny

Pomocí těchto kroků vytvořte klienta datové továrny.

  1. Otevřete Program.cs a přepište existující using příkazy následujícím kódem a přidejte odkazy na obory názvů.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Rest.Serialization;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Do metody, která nastavuje proměnné, přidejte následující kód Main . Nahraďte zástupné symboly 14 vlastními hodnotami.

    Seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, najdete v tématu Produkty dostupné v jednotlivých oblastech. V rozevíracím seznamu Produkty zvolte Procházet>analytickou>datovou továrnu. Potom v rozevíracím seznamu Oblasti vyberte oblasti, které vás zajímají. Zobrazí se mřížka se stavem dostupnosti produktů Data Factory pro vybrané oblasti.

    Poznámka:

    Úložiště dat, jako je Azure Storage a Azure SQL Database, a výpočty, jako je HDInsight, můžou být data Factory v jiných oblastech než to, co zvolíte pro službu Data Factory.

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID to create the factory>";
    string resourceGroup = "<your resource group to create the factory>";
    
    string region = "<location to create the data factory in, such as East US>";
    string dataFactoryName = "<name of data factory to create (must be globally unique)>";
    
    // Specify the source Azure Blob information
    string storageAccount = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    string inputBlobPath = "adfv2tutorial/";
    string inputBlobName = "inputEmp.txt";
    
    // Specify the sink Azure SQL Database information
    string azureSqlConnString =
        "Server=tcp:<your server name>.database.windows.net,1433;" +
        "Database=<your database name>;" +
        "User ID=<your username>@<your server name>;" +
        "Password=<your password>;" +
        "Trusted_Connection=False;Encrypt=True;Connection Timeout=30";
    string azureSqlTableName = "dbo.emp";
    
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string sqlDbLinkedServiceName = "AzureSqlDbLinkedService";
    string blobDatasetName = "BlobDataset";
    string sqlDatasetName = "SqlDataset";
    string pipelineName = "Adfv2TutorialBlobToSqlCopy";
    
  3. Do metody, která vytvoří instanci DataFactoryManagementClient třídy, přidejte následující kódMain. Tento objekt použijete k vytvoření datové továrny, propojené služby, datových sad a kanálu. Použijete ho také k monitorování podrobných informací o spuštění kanálu.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync(
        "https://management.azure.com/", cc
    ).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Vytvoření datové továrny

Do metody, která vytvoří datovou továrnu, přidejte následující kódMain.

// Create a data factory
Console.WriteLine("Creating a data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};

client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings)
);

while (
    client.Factories.Get(
        resourceGroup, dataFactoryName
    ).ProvisioningState == "PendingCreation"
)
{
    System.Threading.Thread.Sleep(1000);
}

Vytvoření propojených služeb

V tomto kurzu vytvoříte dvě propojené služby pro zdroj a jímku v uvedeném pořadí.

Vytvoření propojené služby Azure Storage

Do metody, která vytvoří propojenou službu Azure Storage, přidejte následující kódMain. Informace o podporovanýchvlastnostech

// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString(
            "DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
            ";AccountKey=" + storageKey
        )
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)
);

Vytvoření propojené služby Azure SQL Database

Do metody, která vytvoří propojenou službu Azure SQL Database, přidejte následující kódMain. Informace o podporovaných vlastnostech a podrobnostech najdete v tématu Vlastnosti propojené služby Azure SQL Database.

// Create an Azure SQL Database linked service
Console.WriteLine("Creating linked service " + sqlDbLinkedServiceName + "...");

LinkedServiceResource sqlDbLinkedService = new LinkedServiceResource(
    new AzureSqlDatabaseLinkedService
    {
        ConnectionString = new SecureString(azureSqlConnString)
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDbLinkedServiceName, sqlDbLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDbLinkedService, client.SerializationSettings)
);

Vytvoření datových sad

V této části vytvoříte dvě datové sady: jednu pro zdroj, druhou pro jímku.

Vytvoření datové sady pro zdrojový objekt blob Azure

Do metody, která vytvoří datovou sadu objektů blob Azure, přidejte následující kódMain. Informace o podporovanýchvlastnostech

Nadefinujete datovou sadu, která představuje zdrojová data v objektu blob Azure. Tato datová sada Blob odkazuje na propojenou službu Azure Storage, kterou jste vytvořili v předchozím kroku, a popisuje:

  • Umístění objektu blob, ze které se má kopírovat: FolderPathFileName
  • Formát objektu blob označující, jak analyzovat obsah: TextFormat a jeho nastavení, například oddělovač sloupců
  • Datová struktura, včetně názvů sloupců a datových typů, které se v tomto příkladu mapuje na tabulku SQL jímky
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference {
            ReferenceName = storageLinkedServiceName
        },
        FolderPath = inputBlobPath,
        FileName = inputBlobName,
        Format = new TextFormat { ColumnDelimiter = "|" },
        Structure = new List<DatasetDataElement>
        {
            new DatasetDataElement { Name = "FirstName", Type = "String" },
            new DatasetDataElement { Name = "LastName", Type = "String" }
        }
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, blobDatasetName, blobDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)
);

Vytvoření datové sady pro Azure SQL Database jímky

Do metody, která vytvoří datovou sadu Azure SQL Database, přidejte následující kódMain. Informace o podporovanýchvlastnostech

Definujete datovou sadu, která představuje data jímky ve službě Azure SQL Database. Tato datová sada odkazuje na propojenou službu Azure SQL Database, kterou jste vytvořili v předchozím kroku. Určuje také tabulku SQL, který obsahuje zkopírovaná data.

// Create an Azure SQL Database dataset
Console.WriteLine("Creating dataset " + sqlDatasetName + "...");
DatasetResource sqlDataset = new DatasetResource(
    new AzureSqlTableDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = sqlDbLinkedServiceName
        },
        TableName = azureSqlTableName
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDatasetName, sqlDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings)
);

Vytvořit kanál

Do metody, která vytvoří kanál s aktivitou kopírování, přidejte následující kódMain. V tomto kurzu tento kanál obsahuje jednu aktivitu: CopyActivity, která přebírá datovou sadu objektů blob jako zdroj a datovou sadu SQL jako jímku. Informace o aktivitě kopírování najdete v tématu aktivita Copy ve službě Azure Data Factory.

// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToSQL",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference() { ReferenceName = blobDatasetName }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference { ReferenceName = sqlDatasetName }
            },
            Source = new BlobSource { },
            Sink = new SqlSink { }
        }
    }
};

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings)
);

Vytvoření spuštění kanálu

Do metody, která aktivuje spuštění kanálu, přidejte následující kódMain.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
    resourceGroup, dataFactoryName, pipelineName
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Monitorování spuštění kanálu

Teď vložte kód pro kontrolu stavů spuštění kanálu a získání podrobností o spuštění aktivity kopírování.

  1. Do metody přidejte následující kód Main , který bude průběžně kontrolovat stav spuštění kanálu, dokud nedokončí kopírování dat.

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(
            resourceGroup, dataFactoryName, runResponse.RunId
        );
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. Do metody, která načte podrobnosti o spuštění aktivity kopírování, například velikost načtených nebo zapsaných dat, přidejte následující Main kód.

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)
    );
    
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams
    );
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(queryResponse.Value.First().Output);
    }
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Spuštění kódu

Sestavte aplikaci tak, že zvolíte sestavit řešení sestavení>. Potom spusťte aplikaci tak, že zvolíte >Spustit ladění ladění a ověříte spuštění kanálu.

Konzola vytiskne průběh vytváření datové továrny, propojené služby, datové sady, kanál a spuštění kanálu. Potom zkontroluje stav spuštění kanálu. Počkejte, až se zobrazí podrobnosti o spuštění aktivity kopírování s velikostí čtení a zápisu dat. Potom pomocí nástrojů, jako je SQL Server Management Studio (SSMS) nebo Visual Studio, se můžete připojit k cílové službě Azure SQL Database a zkontrolovat, jestli zadaná cílová tabulka obsahuje zkopírovaná data.

Ukázkový výstup

Creating a data factory AdfV2Tutorial...
{
  "identity": {
    "type": "SystemAssigned"
  },
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
      }
    }
  }
}
Creating linked service AzureSqlDbLinkedService...
{
  "properties": {
    "type": "AzureSqlDatabase",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
      }
    }
  }
}
Creating dataset BlobDataset...
{
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": "adfv2tutorial/",
      "fileName": "inputEmp.txt",
      "format": {
        "type": "TextFormat",
        "columnDelimiter": "|"
      }
    },
    "structure": [
      {
        "name": "FirstName",
        "type": "String"
      },
      {
        "name": "LastName",
        "type": "String"
      }
    ],
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureStorageLinkedService"
    }
  }
}
Creating dataset SqlDataset...
{
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": "dbo.emp"
    },
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureSqlDbLinkedService"
    }
  }
}
Creating pipeline Adfv2TutorialBlobToSqlCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "BlobDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SqlDataset"
          }
        ],
        "name": "CopyFromBlobToSQL"
      }
    ]
  }
}
Creating pipeline run...
Pipeline run ID: 1cd03653-88a0-4c90-aabc-ae12d843e252
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 18,
  "dataWritten": 28,
  "rowsCopied": 2,
  "copyDuration": 2,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
  "usedDataIntegrationUnits": 2,
  "billedDuration": 2
}

Press any key to exit...

Kanál v této ukázce kopíruje data z jednoho umístění do jiného umístění v úložišti objektů blob Azure. Naučili jste se:

  • Vytvoření datové továrny
  • Vytvoření propojených služeb Azure Storage a Azure SQL Database
  • Vytvoření datových sad objektů blob Azure a Azure SQL Database
  • Vytvořte kanál obsahující aktivitu kopírování.
  • Zahajte spuštění kanálu.
  • Monitorování spuštění aktivit a kanálu

Pokud se chcete dozvědět víc o kopírování dat z místního prostředí do cloudu, přejděte k následujícímu kurzu: