Använda anpassade aktiviteter i en Azure Data Factory- eller Azure Synapse Analytics-pipeline
GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics
Dricks
Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!
Det finns två typer av aktiviteter som du kan använda i en Azure Data Factory- eller Synapse-pipeline.
- Dataförflyttningsaktiviteter för att flytta data mellan käll- och mottagardatalager som stöds.
- Datatransformeringsaktiviteter för att transformera data med hjälp av beräkningstjänster som Azure HDInsight och Azure Batch.
Om du vill flytta data till/från ett datalager som tjänsten inte stöder, eller transformera/bearbeta data på ett sätt som inte stöds av tjänsten, kan du skapa en anpassad aktivitet med din egen dataflytt eller transformeringslogik och använda aktiviteten i en pipeline. Den anpassade aktiviteten kör din anpassade kodlogik på en Azure Batch-pool med virtuella datorer.
Kommentar
Vi rekommenderar att du använder Azure Az PowerShell-modulen för att interagera med Azure. Se Installera Azure PowerShell för att komma igång. Information om hur du migrerar till Az PowerShell-modulen finns i artikeln om att migrera Azure PowerShell från AzureRM till Az.
Se följande artiklar om du är nybörjare på Azure Batch-tjänsten:
- Grunderna i Azure Batch för en översikt över Azure Batch-tjänsten.
- New-AzBatchAccount-cmdlet för att skapa ett Azure Batch-konto (eller) Azure Portal för att skapa Azure Batch-kontot med hjälp av Azure Portal. Mer information om hur du använder cmdleten finns i Använda PowerShell för att hantera Azure Batch-konto .
- New-AzBatchPool-cmdlet för att skapa en Azure Batch-pool.
Viktigt!
När du skapar en ny Azure Batch-pool måste "VirtualMachineConfiguration" användas och INTE "CloudServiceConfiguration". Mer information finns i migreringsvägledning för Azure Batch Pool.
Lägga till anpassade aktiviteter i en pipeline med användargränssnittet
Utför följande steg för att använda en anpassad aktivitet i en pipeline:
Sök efter Anpassad i fönstret Pipelineaktiviteter och dra en anpassad aktivitet till pipelinearbetsytan.
Välj den nya anpassade aktiviteten på arbetsytan om den inte redan är markerad.
Välj fliken Azure Batch för att välja eller skapa en ny länkad Azure Batch-tjänst som ska köra den anpassade aktiviteten.
Välj fliken Inställningar och ange ett kommando som ska köras på Azure Batch och valfri avancerad information.
Länkad Azure Batch-tjänst
Följande JSON definierar en Azure Batch-länkad exempeltjänst. Mer information finns i Beräkningsmiljöer som stöds
{
"name": "AzureBatchLinkedService",
"properties": {
"type": "AzureBatch",
"typeProperties": {
"accountName": "batchaccount",
"accessKey": {
"type": "SecureString",
"value": "access key"
},
"batchUri": "https://batchaccount.region.batch.azure.com",
"poolName": "poolname",
"linkedServiceName": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Mer information om länkad Azure Batch-tjänst finns i artikeln Compute linked services (Beräkningslänkade tjänster ).
Anpassad aktivitet
Följande JSON-kodfragment definierar en pipeline med en enkel anpassad aktivitet. Aktivitetsdefinitionen har en referens till den länkade Azure Batch-tjänsten.
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "helloworld.exe",
"folderPath": "customactv2/helloworld",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}]
}
}
I det här exemplet är helloworld.exe ett anpassat program som lagras i mappen customactv2/helloworld för Azure Storage-kontot som används i resourceLinkedService. Den anpassade aktiviteten skickar det här anpassade programmet som ska köras i Azure Batch. Du kan ersätta kommandot med valfritt föredraget program som kan köras på målåtgärdssystemet för Azure Batch-poolnoderna.
I följande tabell beskrivs namn och beskrivningar av egenskaper som är specifika för den här aktiviteten.
Property | Beskrivning | Obligatoriskt |
---|---|---|
name | Namnet på aktiviteten i pipelinen | Ja |
description | Text som beskriver vad aktiviteten gör. | Nej |
type | För Anpassad aktivitet är aktivitetstypen Anpassad. | Ja |
linkedServiceName | Länkad tjänst till Azure Batch. Mer information om den här länkade tjänsten finns i artikeln Compute linked services (Beräkningslänkade tjänster ). | Ja |
kommando | Kommando för det anpassade program som ska köras. Om programmet redan är tillgängligt på Azure Batch-poolnoden kan resourceLinkedService och folderPath hoppas över. Du kan till exempel ange kommandot som cmd /c dir , som stöds internt av noden Windows Batch-pool. |
Ja |
resourceLinkedService | Länkad Azure Storage-tjänst till lagringskontot där det anpassade programmet lagras | Nej* |
folderPath | Sökväg till mappen för det anpassade programmet och alla dess beroenden Om du har beroenden som lagras i undermappar, dvs. i en hierarkisk mappstruktur under folderPath , plattas mappstrukturen för närvarande ut när filerna kopieras till Azure Batch. Alla filer kopieras alltså till en enda mapp utan undermappar. Du kan kringgå det här beteendet genom att komprimera filerna, kopiera den komprimerade filen och sedan packa upp den med anpassad kod på önskad plats. |
Nej* |
referenceObjects | En matris med befintliga länkade tjänster och datauppsättningar. De länkade tjänsterna och datauppsättningarna skickas till det anpassade programmet i JSON-format så att din anpassade kod kan referera till tjänstens resurser | Nej |
extendedProperties | Användardefinierade egenskaper som kan skickas till det anpassade programmet i JSON-format så att din anpassade kod kan referera till ytterligare egenskaper | Nej |
retentionTimeInDays | Kvarhållningstiden för de filer som skickas för anpassad aktivitet. Standardvärdet är 30 dagar. | Nej |
* Egenskaperna resourceLinkedService
och folderPath
måste antingen båda anges eller båda utelämnas.
Kommentar
Om du skickar länkade tjänster som referenceObjects i anpassad aktivitet är det en bra säkerhetspraxis att skicka en Azure Key Vault-aktiverad länkad tjänst (eftersom den inte innehåller några säkra strängar) och hämta autentiseringsuppgifterna med hemligt namn direkt från Key Vault från koden. Du hittar ett exempel här som refererar till den AKV-aktiverade länkade tjänsten, hämtar autentiseringsuppgifterna från Key Vault och sedan kommer åt lagringen i koden.
Kommentar
För närvarande stöds endast Azure Blob Storage för resourceLinkedService i anpassad aktivitet, och det är den enda länkade tjänsten som skapas som standard och inget alternativ för att välja andra anslutningsappar som ADLS Gen2.
Behörigheter för anpassad aktivitet
Den anpassade aktiviteten anger det automatiska Azure Batch-användarkontot till Icke-administratörsåtkomst med uppgiftsomfång (standardspecifikationen för automatisk användare). Du kan inte ändra behörighetsnivån för det automatiska användarkontot. Mer information finns i Kör uppgifter under användarkonton i Batch | Automatiska användarkonton.
Köra kommandon
Du kan köra ett kommando direkt med anpassad aktivitet. I följande exempel körs kommandot "echo hello world" på målnoderna för Azure Batch-pooler och skriver ut utdata till stdout.
{
"name": "MyCustomActivity",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "cmd /c echo hello world"
}
}]
}
}
Skicka objekt och egenskaper
Det här exemplet visar hur du kan använda referenceObjects och extendedProperties för att skicka objekt och användardefinierade egenskaper från tjänsten till ditt anpassade program.
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "SampleApp.exe",
"folderPath": "customactv2/SampleApp",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
},
"referenceObjects": {
"linkedServices": [{
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
}]
},
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
},
"PropertyBagPropertyName1": "PropertyBagValue1",
"propertyBagPropertyName2": "PropertyBagValue2",
"dateTime1": "2015-04-12T12:13:14Z"
}
}
}]
}
}
När aktiviteten körs lagras referenceObjects och extendedProperties i följande filer som distribueras till samma körningsmapp i SampleApp.exe:
activity.json
Lagrar extendedProperties och egenskaper för den anpassade aktiviteten.
linkedServices.json
Lagrar en matris med länkade tjänster som definierats i egenskapen referenceObjects.
datasets.json
Lagrar en matris med datauppsättningar som definierats i egenskapen referenceObjects.
Följande exempelkod visar hur SampleApp.exe kan komma åt nödvändig information från JSON-filer:
using Newtonsoft.Json;
using System;
using System.IO;
namespace SampleApp
{
class Program
{
static void Main(string[] args)
{
//From Extend Properties
dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);
// From LinkedServices
dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
}
}
}
Hämta körningsutdata
Du kan starta en pipelinekörning med följande PowerShell-kommando:
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
När pipelinen körs kan du kontrollera körningsutdata med hjälp av följande kommandon:
while ($True) {
$result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
if(!$result) {
Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
}
elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
}
else {
Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
$result
break
}
($result | Format-List | Out-String)
Start-Sleep -Seconds 15
}
Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"
Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"
Stdout och stderr för ditt anpassade program sparas i adfjobs-containern i den länkade Azure Storage-tjänsten som du definierade när du skapade Azure Batch Linked Service med ett GUID för uppgiften. Du kan hämta den detaljerade sökvägen från aktivitetskörningens utdata enligt följande kodfragment:
Pipeline ' MyCustomActivity' run finished. Result:
ResourceGroupName : resourcegroupname
DataFactoryName : datafactoryname
ActivityName : MyCustomActivity
PipelineRunId : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName : MyCustomActivity
Input : {command}
Output : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart : 10/5/2017 3:33:06 PM
ActivityRunEnd : 10/5/2017 3:33:28 PM
DurationInMs : 21203
Status : Succeeded
Error : {errorCode, message, failureType, target}
Activity Output section:
"exitcode": 0
"outputs": [
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"
Om du vill använda innehållet i stdout.txt i underordnade aktiviteter kan du hämta sökvägen till stdout.txt-filen i uttrycket "@activity('MyCustomActivity').output.outputs[0]".
Viktigt!
- Activity.json, linkedServices.json och datasets.json lagras i körningsmappen för Batch-aktiviteten. I det här exemplet lagras activity.json, linkedServices.json och datasets.json i
https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/
sökvägen. Om det behövs måste du rensa dem separat. - För länkade tjänster som använder lokalt installerad integrationskörning krypteras känslig information som nycklar eller lösenord av integrationskörningen med egen värd för att säkerställa att autentiseringsuppgifterna finns kvar i den kunddefinierade privata nätverksmiljön. Vissa känsliga fält kan saknas när du refererar till din anpassade programkod på det här sättet. Använd SecureString i extendedProperties i stället för att använda länkad tjänstreferens om det behövs.
Skicka utdata till en annan aktivitet
Du kan skicka anpassade värden från koden i en anpassad aktivitet tillbaka till tjänsten. Du kan göra det genom att skriva in dem från outputs.json
ditt program. Tjänsten kopierar innehållet outputs.json
i och lägger till det i aktivitetsutdata som värdet för customOutput
egenskapen. (Storleksgränsen är 2 MB.) Om du vill använda innehållet outputs.json
i i underordnade aktiviteter kan du hämta värdet med uttrycket @activity('<MyCustomActivity>').output.customOutput
.
Hämta SecureString-utdata
Känsliga egenskapsvärden som är avsedda som typen SecureString, som visas i några av exemplen i den här artikeln, maskeras på fliken Övervakning i användargränssnittet. I själva pipelinekörningen serialiseras dock en SecureString-egenskap som JSON i activity.json
filen som oformaterad text. Till exempel:
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
}
}
Den här serialiseringen är inte säker och är inte avsedd att vara säker. Avsikten är ett tips till tjänsten om att maskera värdet på fliken Övervakning.
Om du vill komma åt egenskaper av typen SecureString från en anpassad aktivitet läser activity.json
du filen, som placeras i samma mapp som din .EXE, deserialiserar JSON och kommer sedan åt JSON-egenskapen (extendedProperties => [propertyName] => värde).
Automatisk skalning av Azure Batch
Du kan också skapa en Azure Batch-pool med funktionen autoskalning . Du kan till exempel skapa en Azure-batchpool med 0 dedikerade virtuella datorer och en autoskalningsformel baserat på antalet väntande aktiviteter.
Exempelformeln här uppnår följande beteende: När poolen skapas börjar den med en virtuell dator. $PendingTasks mått definierar antalet aktiviteter som körs + aktivt (i kö). Formeln hittar det genomsnittliga antalet väntande aktiviteter under de senaste 180 sekunderna och anger TargetDedicated i enlighet med detta. Det säkerställer att TargetDedicated aldrig går längre än 25 virtuella datorer. När nya uppgifter skickas växer poolen automatiskt och när aktiviteterna slutförs blir virtuella datorer kostnadsfria en i taget och den automatiska skalningen krymper de virtuella datorerna. startingNumberOfVMs och maxNumberofVMs kan justeras efter dina behov.
Autoskalningsformel:
startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);
Mer information finns i Skala beräkningsnoder automatiskt i en Azure Batch-pool .
Om poolen använder standardvärdet autoScaleEvaluationInterval kan Det ta 15–30 minuter för Batch-tjänsten att förbereda den virtuella datorn innan den anpassade aktiviteten körs. Om poolen använder en annan autoScaleEvaluationInterval kan Batch-tjänsten ta autoScaleEvaluationInterval + 10 minuter.
Relaterat innehåll
Se följande artiklar som förklarar hur du transformerar data på andra sätt: