Aangepaste activiteiten gebruiken in een Azure Data Factory- of Azure Synapse Analytics-pijplijn
VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics
Tip
Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .
Er zijn twee soorten activiteiten die u kunt gebruiken in een Azure Data Factory- of Synapse-pijplijn.
- Activiteiten voor gegevensverplaatsing om gegevens te verplaatsen tussen ondersteunde bron- en sinkgegevensarchieven.
- Activiteiten voor gegevenstransformatie om gegevens te transformeren met behulp van rekenservices zoals Azure HDInsight en Azure Batch.
Als u gegevens wilt verplaatsen naar/van een gegevensarchief dat de service niet ondersteunt of gegevens wilt transformeren/verwerken op een manier die niet wordt ondersteund door de service, kunt u een aangepaste activiteit maken met uw eigen gegevensverplaatsings- of transformatielogica en de activiteit in een pijplijn gebruiken. Met de aangepaste activiteit wordt uw aangepaste codelogica uitgevoerd op een Azure Batch-pool met virtuele machines.
Notitie
Het wordt aanbevolen de Azure Az PowerShell-module te gebruiken om te communiceren met Azure. Zie Azure PowerShell installeren om aan de slag te gaan. Raadpleeg Azure PowerShell migreren van AzureRM naar Az om te leren hoe u naar de Azure PowerShell-module migreert.
Zie de volgende artikelen als u geen gebruik hebt gemaakt van de Azure Batch-service:
- Basisbeginselen van Azure Batch voor een overzicht van de Azure Batch-service.
- Cmdlet New-AzBatchAccount voor het maken van een Azure Batch-account (of) Azure Portal om het Azure Batch-account te maken met behulp van Azure Portal. Zie Het artikel PowerShell gebruiken om het Azure Batch-account te beheren voor gedetailleerde instructies over het gebruik van de cmdlet.
- Cmdlet New-AzBatchPool voor het maken van een Azure Batch-pool.
Belangrijk
Bij het maken van een nieuwe Azure Batch-pool moet VirtualMachineConfiguration worden gebruikt en NIET CloudServiceConfiguration. Raadpleeg de migratierichtlijnen voor Azure Batch-pool voor meer informatie.
Aangepaste activiteiten toevoegen aan een pijplijn met ui
Voer de volgende stappen uit om een aangepaste activiteit in een pijplijn te gebruiken:
Zoek naar Aangepast in het deelvenster Activiteiten van de pijplijn en sleep een aangepaste activiteit naar het pijplijncanvas.
Selecteer de nieuwe aangepaste activiteit op het canvas als deze nog niet is geselecteerd.
Selecteer het tabblad Azure Batch om een nieuwe gekoppelde Azure Batch-service te selecteren of te maken waarmee de aangepaste activiteit wordt uitgevoerd.
Selecteer het tabblad Instellingen en geef een opdracht op die moet worden uitgevoerd in Azure Batch en optionele geavanceerde details.
Gekoppelde Azure Batch-service
Met de volgende JSON wordt een voorbeeld van een gekoppelde Azure Batch-service gedefinieerd. Zie Ondersteunde rekenomgevingen voor meer informatie
{
"name": "AzureBatchLinkedService",
"properties": {
"type": "AzureBatch",
"typeProperties": {
"accountName": "batchaccount",
"accessKey": {
"type": "SecureString",
"value": "access key"
},
"batchUri": "https://batchaccount.region.batch.azure.com",
"poolName": "poolname",
"linkedServiceName": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Zie het artikel over gekoppelde Compute-services voor meer informatie over de gekoppelde Azure Batch-service.
Aangepaste activiteit
Het volgende JSON-fragment definieert een pijplijn met een eenvoudige aangepaste activiteit. De activiteitsdefinitie heeft een verwijzing naar de gekoppelde Azure Batch-service.
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "helloworld.exe",
"folderPath": "customactv2/helloworld",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}]
}
}
In dit voorbeeld is het helloworld.exe een aangepaste toepassing die is opgeslagen in de map customactv2/helloworld van het Azure Storage-account dat wordt gebruikt in de resourceLinkedService. De aangepaste activiteit verzendt deze aangepaste toepassing die moet worden uitgevoerd in Azure Batch. U kunt de opdracht vervangen door elke gewenste toepassing die kan worden uitgevoerd op het doelbewerkingssysteem van de Azure Batch-poolknooppunten.
In de volgende tabel worden namen en beschrijvingen beschreven van eigenschappen die specifiek zijn voor deze activiteit.
Eigenschappen | Beschrijving | Vereist |
---|---|---|
naam | Naam van de activiteit in de pijplijn | Ja |
beschrijving | Tekst die beschrijft wat de activiteit doet. | Nee |
type | Voor aangepaste activiteit is het activiteitstype Aangepast. | Ja |
linkedServiceName | Gekoppelde service aan Azure Batch. Zie het artikel Gekoppelde services berekenen voor meer informatie over deze gekoppelde service. | Ja |
opdracht | Opdracht van de aangepaste toepassing die moet worden uitgevoerd. Als de toepassing al beschikbaar is op het Azure Batch-poolknooppunt, kunnen de resourceLinkedService en folderPath worden overgeslagen. U kunt bijvoorbeeld de opdracht opgeven die moet worden gebruikt cmd /c dir , die systeemeigen wordt ondersteund door het knooppunt Windows Batch-pool. |
Ja |
resourceLinkedService | Gekoppelde Azure Storage-service aan het opslagaccount waarin de aangepaste toepassing wordt opgeslagen | Nee* |
folderPath | Pad naar de map van de aangepaste toepassing en alle bijbehorende afhankelijkheden Als u afhankelijkheden hebt opgeslagen in submappen, dat wil gezegd, in een hiërarchische mapstructuur onder folderPath , wordt de mapstructuur momenteel afgevlakt wanneer de bestanden worden gekopieerd naar Azure Batch. Dat wil gezegd, alle bestanden worden gekopieerd naar één map zonder submappen. U kunt dit gedrag omzeilen door de bestanden te comprimeren, het gecomprimeerde bestand te kopiëren en het vervolgens op te heffen met aangepaste code op de gewenste locatie. |
Nee* |
referenceObjects | Een matrix van bestaande gekoppelde services en gegevenssets. De gekoppelde services en gegevenssets waarnaar wordt verwezen, worden doorgegeven aan de aangepaste toepassing in JSON-indeling, zodat uw aangepaste code kan verwijzen naar resources van de service | Nee |
extendedProperties | Door de gebruiker gedefinieerde eigenschappen die kunnen worden doorgegeven aan de aangepaste toepassing in JSON-indeling, zodat uw aangepaste code kan verwijzen naar aanvullende eigenschappen | Nee |
retentionTimeInDays | De bewaartijd voor de bestanden die zijn verzonden voor aangepaste activiteit. De standaardwaarde is 30 dagen. | Nee |
* De eigenschappen resourceLinkedService
en folderPath
moeten beide worden opgegeven of beide worden weggelaten.
Notitie
Als u gekoppelde services doorgeeft als referenceObjects in Custom Activity, is het een goede beveiligingspraktijk om een gekoppelde Azure Key Vault-service door te geven (omdat deze geen beveiligde tekenreeksen bevat) en de referenties op te halen met behulp van een geheime naam rechtstreeks vanuit Key Vault uit de code. Hier vindt u een voorbeeld dat verwijst naar een gekoppelde SERVICE met AKV, de referenties ophaalt uit Key Vault en vervolgens toegang krijgt tot de opslag in de code.
Notitie
Op dit moment wordt alleen Azure Blob Storage ondersteund voor resourceLinkedService in aangepaste activiteit. Dit is de enige gekoppelde service die standaard wordt gemaakt en geen optie om andere connectors zoals ADLS Gen2 te kiezen.
Aangepaste activiteitsmachtigingen
Met de aangepaste activiteit wordt het azure Batch-account voor automatische gebruikers ingesteld op niet-beheerderstoegang met taakbereik (de standaardspecificatie voor automatische gebruikers). U kunt het machtigingsniveau van het automatische gebruikersaccount niet wijzigen. Zie Taken uitvoeren onder gebruikersaccounts in Batch | Automatische gebruikersaccounts.
Opdrachten uitvoeren
U kunt een opdracht rechtstreeks uitvoeren met behulp van Aangepaste activiteit. In het volgende voorbeeld wordt de opdracht 'echo hello world' uitgevoerd op de doelknooppunten van de Azure Batch-pool en wordt de uitvoer afgedrukt naar stdout.
{
"name": "MyCustomActivity",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "cmd /c echo hello world"
}
}]
}
}
Objecten en eigenschappen doorgeven
In dit voorbeeld ziet u hoe u de referenceObjects en extendedProperties kunt gebruiken om objecten en door de gebruiker gedefinieerde eigenschappen van de service door te geven aan uw aangepaste toepassing.
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "SampleApp.exe",
"folderPath": "customactv2/SampleApp",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
},
"referenceObjects": {
"linkedServices": [{
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
}]
},
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
},
"PropertyBagPropertyName1": "PropertyBagValue1",
"propertyBagPropertyName2": "PropertyBagValue2",
"dateTime1": "2015-04-12T12:13:14Z"
}
}
}]
}
}
Wanneer de activiteit wordt uitgevoerd, worden referenceObjects en extendedProperties opgeslagen in de volgende bestanden die zijn geïmplementeerd in dezelfde uitvoeringsmap van de SampleApp.exe:
activity.json
Slaat extendedProperties en eigenschappen van de aangepaste activiteit op.
linkedServices.json
Slaat een matrix van gekoppelde services op die zijn gedefinieerd in de eigenschap referenceObjects.
datasets.json
Slaat een matrix op van gegevenssets die zijn gedefinieerd in de eigenschap referenceObjects.
De volgende voorbeeldcode laat zien hoe de SampleApp.exe toegang heeft tot de vereiste gegevens uit JSON-bestanden:
using Newtonsoft.Json;
using System;
using System.IO;
namespace SampleApp
{
class Program
{
static void Main(string[] args)
{
//From Extend Properties
dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);
// From LinkedServices
dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
}
}
}
Uitvoer van uitvoering ophalen
U kunt een pijplijnuitvoering starten met behulp van de volgende PowerShell-opdracht:
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
Wanneer de pijplijn wordt uitgevoerd, kunt u de uitvoer van de uitvoering controleren met behulp van de volgende opdrachten:
while ($True) {
$result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
if(!$result) {
Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
}
elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
}
else {
Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
$result
break
}
($result | Format-List | Out-String)
Start-Sleep -Seconds 15
}
Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"
Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"
De stdout en stderr van uw aangepaste toepassing worden opgeslagen in de adfjobs-container in de gekoppelde Azure Storage-service die u hebt gedefinieerd bij het maken van een gekoppelde Azure Batch-service met een GUID van de taak. U kunt het gedetailleerde pad ophalen uit de uitvoer van activiteitsuitvoering, zoals wordt weergegeven in het volgende fragment:
Pipeline ' MyCustomActivity' run finished. Result:
ResourceGroupName : resourcegroupname
DataFactoryName : datafactoryname
ActivityName : MyCustomActivity
PipelineRunId : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName : MyCustomActivity
Input : {command}
Output : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart : 10/5/2017 3:33:06 PM
ActivityRunEnd : 10/5/2017 3:33:28 PM
DurationInMs : 21203
Status : Succeeded
Error : {errorCode, message, failureType, target}
Activity Output section:
"exitcode": 0
"outputs": [
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"
Als u de inhoud van stdout.txt in downstreamactiviteiten wilt gebruiken, kunt u het pad naar het stdout.txt bestand ophalen in de expressie '@activity('MyCustomActivity').output.outputs[0]'.
Belangrijk
- De activity.json, linkedServices.json en datasets.json worden opgeslagen in de runtimemap van de Batch-taak. In dit voorbeeld worden de activity.json, linkedServices.json en datasets.json opgeslagen in
https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/
het pad. Indien nodig moet u ze afzonderlijk opschonen. - Voor gekoppelde services die gebruikmaken van de zelf-hostende Integration Runtime, worden de gevoelige informatie, zoals sleutels of wachtwoorden, versleuteld door de zelf-hostende Integration Runtime om ervoor te zorgen dat de referenties in de door de klant gedefinieerde privénetwerkomgeving blijven. Sommige gevoelige velden kunnen ontbreken wanneer er op deze manier naar uw aangepaste toepassingscode wordt verwezen. Gebruik SecureString in extendedProperties in plaats van indien nodig verwijzingen naar gekoppelde services te gebruiken.
Uitvoer doorgeven aan een andere activiteit
U kunt aangepaste waarden vanuit uw code in een aangepaste activiteit terugsturen naar de service. U kunt dit doen door ze vanuit uw toepassing te outputs.json
schrijven. De service kopieert de inhoud van outputs.json
en voegt deze toe aan de activiteitsuitvoer als de waarde van de customOutput
eigenschap. (De groottelimiet is 2 MB.) Als u de inhoud van downstreamactiviteiten wilt gebruiken, kunt u de waarde ophalen met behulp van outputs.json
de expressie @activity('<MyCustomActivity>').output.customOutput
.
SecureString-uitvoer ophalen
Gevoelige eigenschapswaarden die zijn aangewezen als type SecureString, zoals wordt weergegeven in sommige voorbeelden in dit artikel, worden gemaskeerd op het tabblad Bewaking in de gebruikersinterface. Bij de uitvoering van de pijplijn wordt echter een SecureString-eigenschap geserialiseerd als JSON in het activity.json
bestand als tekst zonder opmaak. Voorbeeld:
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
}
}
Deze serialisatie is niet echt veilig en is niet bedoeld om veilig te zijn. De intentie is een hint voor de service om de waarde op het tabblad Bewaking te maskeren.
Als u toegang wilt krijgen tot eigenschappen van het type SecureString vanuit een aangepaste activiteit, leest u het activity.json
bestand, dat in dezelfde map wordt geplaatst als uw .EXE, deserialiseert u de JSON en opent u vervolgens de JSON-eigenschap (extendedProperties => [propertyName] => value).
Automatisch schalen van Azure Batch
U kunt ook een Azure Batch-pool maken met de functie voor automatisch schalen . U kunt bijvoorbeeld een Azure-batchgroep maken met 0 toegewezen VM's en een formule voor automatische schaalaanpassing op basis van het aantal taken dat in behandeling is.
De voorbeeldformule hier bereikt het volgende gedrag: Wanneer de pool in eerste instantie wordt gemaakt, begint deze met 1 VM. $PendingTasks metrische waarde definieert het aantal taken met de status Actief en Actief (in wachtrij). De formule zoekt het gemiddelde aantal taken in behandeling in de afgelopen 180 seconden en stelt TargetDedicated dienovereenkomstig in. Het zorgt ervoor dat TargetDedicated nooit meer dan 25 VM's overschrijdt. Naarmate er nieuwe taken worden verzonden, groeit de pool automatisch en naarmate taken worden voltooid, worden VM's één voor één vrij en worden deze VM's automatisch geschaald. startingNumberOfVMs en maxNumberofVMs kunnen worden aangepast aan uw behoeften.
Formule voor automatisch schalen:
startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);
Zie Rekenknooppunten automatisch schalen in een Azure Batch-pool voor meer informatie.
Als de pool gebruikmaakt van de standaard autoScaleEvaluationInterval, kan de Batch-service 15-30 minuten duren om de VIRTUELE machine voor te bereiden voordat de aangepaste activiteit wordt uitgevoerd. Als de pool een andere autoScaleEvaluationInterval gebruikt, kan de Batch-service autoScaleEvaluationInterval + 10 minuten duren.
Gerelateerde inhoud
Zie de volgende artikelen waarin wordt uitgelegd hoe u gegevens op andere manieren kunt transformeren: