Gegevensstroomscript (DFS)
VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics
Tip
Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .
Gegevensstromen zijn beschikbaar in Zowel Azure Data Factory als Azure Synapse Pipelines. Dit artikel is van toepassing op toewijzingsgegevensstromen. Als u geen ervaring hebt met transformaties, raadpleegt u het inleidende artikel Gegevens transformeren met behulp van een toewijzingsgegevensstroom.
Dfs (Data Flow Script) is de onderliggende metagegevens, vergelijkbaar met een coderingstaal, die wordt gebruikt om de transformaties uit te voeren die zijn opgenomen in een toewijzingsgegevensstroom. Elke transformatie wordt vertegenwoordigd door een reeks eigenschappen die de benodigde informatie bieden om de taak correct uit te voeren. Het script is zichtbaar en bewerkbaar vanuit ADF door te klikken op de knop Script op het bovenste lint van de browsergebruikersinterface.
In een brontransformatie geeft de service bijvoorbeeld allowSchemaDrift: true,
aan dat alle kolommen uit de brongegevensset in de gegevensstroom moeten worden opgenomen, zelfs als ze niet zijn opgenomen in de schemaprojectie.
Gebruiksgevallen
De DFS wordt automatisch geproduceerd door de gebruikersinterface. U kunt op de knop Script klikken om het script weer te geven en aan te passen. U kunt ook scripts genereren buiten de ADF-gebruikersinterface en deze vervolgens doorgeven aan de PowerShell-cmdlet. Bij het opsporen van fouten in complexe gegevensstromen is het mogelijk gemakkelijker om de scriptcode achter te scannen in plaats van de weergave van de UI-grafiek van uw stromen te scannen.
Hier volgen enkele voorbeelden van use cases:
- Programmatisch veel gegevensstromen produceren die redelijk vergelijkbaar zijn, d.w. 'uitstempelen'-gegevensstromen.
- Complexe expressies die moeilijk te beheren zijn in de gebruikersinterface of leiden tot validatieproblemen.
- Foutopsporing en meer inzicht krijgen in verschillende fouten die tijdens de uitvoering worden geretourneerd.
Wanneer u een gegevensstroomscript bouwt voor gebruik met PowerShell of een API, moet u de opgemaakte tekst samenvouwen tot één regel. U kunt tabs en nieuwe regels behouden als escapetekens. De tekst moet echter worden opgemaakt zodat deze in een JSON-eigenschap past. Onderaan ziet u een knop in de gebruikersinterface van de scripteditor waarmee het script wordt opgemaakt als één regel.
Transformaties toevoegen
Voor het toevoegen van transformaties zijn drie basisstappen vereist: het toevoegen van de kerntransformatiegegevens, het omleiden van de invoerstroom en het opnieuw omleiden van de uitvoerstroom. Dit is het eenvoudigst in een voorbeeld te zien. Stel dat we beginnen met een eenvoudige bron voor het sinken van gegevensstromen, zoals hieronder:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Als we besluiten om een afgeleide transformatie toe te voegen, moeten we eerst de kerntransformatietekst maken, die een eenvoudige expressie heeft om een nieuwe kolom met hoofdletters toe te voegen met de naam upperCaseTitle
:
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
Vervolgens nemen we de bestaande DFS en voegen we de transformatie toe:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
En nu wordt de binnenkomende stroom omgeleid door te bepalen welke transformatie we willen dat de nieuwe transformatie nakomt (in dit geval source1
) en de naam van de stream naar de nieuwe transformatie kopieert:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Ten slotte identificeren we de transformatie die we na deze nieuwe transformatie willen hebben en vervangen we de invoerstroom (in dit geval sink1
) door de naam van de uitvoerstroom van onze nieuwe transformatie:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Basisprincipes van DFS
Het DFS bestaat uit een reeks verbonden transformaties, waaronder bronnen, sinks en verschillende andere, waarmee nieuwe kolommen kunnen worden toegevoegd, gegevens kunnen worden gefilterd, gegevens kunnen worden samengevoegd en nog veel meer. Meestal begint het script met een of meer bronnen, gevolgd door veel transformaties en eindigend met een of meer sinks.
Bronnen hebben allemaal dezelfde basisconstructie:
source(
source properties
) ~> source_name
Een eenvoudige bron met drie kolommen (movieId, title, genres) is bijvoorbeeld:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
Alle transformaties behalve bronnen hebben dezelfde basisconstructie:
name_of_incoming_stream transformation_type(
properties
) ~> new_stream_name
Een eenvoudige afgeleide transformatie die bijvoorbeeld een kolom (titel) gebruikt en deze overschrijft met een hoofdletterversie, is als volgt:
source1 derive(
title = upper(title)
) ~> derive1
En een sink zonder schema zou het volgende zijn:
derive1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Scriptfragmenten
Scriptfragmenten zijn deelbare code van Gegevensstroom Script waarmee u gegevensstromen kunt delen. In deze video wordt beschreven hoe u scriptfragmenten gebruikt en Gegevensstroom Script gebruikt om gedeelten van het script achter uw gegevensstroomgrafieken te kopiëren en plakken:
Geaggregeerde samenvattingsstatistieken
Voeg een statistische transformatie toe aan uw gegevensstroom met de naam SummaryStats en plak deze code hieronder voor de statistische functie in uw script, waarbij u de bestaande SummaryStats vervangt. Dit biedt een algemeen patroon voor overzichtsstatistieken van gegevensprofielen.
aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats
U kunt ook het onderstaande voorbeeld gebruiken om het aantal unieke en het aantal afzonderlijke rijen in uw gegevens te tellen. Het onderstaande voorbeeld kan worden geplakt in een gegevensstroom met een statistische transformatie met de naam ValueDistAgg. In dit voorbeeld wordt een kolom met de naam 'title' gebruikt. Zorg ervoor dat u 'titel' vervangt door de tekenreekskolom in uw gegevens die u wilt gebruiken om waardeaantallen op te halen.
aggregate(groupBy(title),
countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
numofdistinct = countDistinct(title)) ~> UniqDist
Alle kolommen in een aggregaties opnemen
Dit is een algemeen aggregatiespatroon dat laat zien hoe u de resterende kolommen in uw uitvoermetagegevens kunt behouden wanneer u aggregaties bouwt. In dit geval gebruiken we de first()
functie om de eerste waarde in elke kolom te kiezen waarvan de naam niet 'film' is. Als u dit wilt gebruiken, maakt u een statistische transformatie met de naam DistinctRows en plakt u deze vervolgens in uw script boven op het bestaande statistische script distinctRows.
aggregate(groupBy(movie),
each(match(name!='movie'), $$ = first($$))) ~> DistinctRows
Vingerafdruk van rij-hash maken
Gebruik deze code in uw gegevensstroomscript om een nieuwe afgeleide kolom DWhash
te maken die een sha1
hash van drie kolommen produceert.
derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash
U kunt dit script hieronder ook gebruiken om een rij-hash te genereren met behulp van alle kolommen die aanwezig zijn in uw stream, zonder dat u elke kolom een naam hoeft te geven:
derive(DWhash = sha1(columns())) ~> DWHash
String_agg equivalent
Deze code fungeert als de T-SQL-functie string_agg()
en voegt tekenreekswaarden samen in een matrix. Vervolgens kunt u die matrix naar een tekenreeks casten die u met SQL-bestemmingen wilt gebruiken.
source1 aggregate(groupBy(year),
string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg
Aantal updates, upserts, invoegingen, verwijderingen
Wanneer u een transformatie van een alter row gebruikt, wilt u mogelijk het aantal updates, upserts, inserts en verwijderingen tellen dat het resultaat is van het beleid Voor het wijzigen van rijen. Voeg een aggregatietransformatie toe nadat u de rij hebt gewijzigd en plak dit Gegevensstroom Script in de statistische definitie voor deze aantallen.
aggregate(updates = countIf(isUpdate(), 1),
inserts = countIf(isInsert(), 1),
upserts = countIf(isUpsert(), 1),
deletes = countIf(isDelete(),1)) ~> RowCount
Afzonderlijke rij met alle kolommen
Met dit fragment wordt een nieuwe statistische transformatie toegevoegd aan uw gegevensstroom, die alle binnenkomende kolommen neemt, een hash genereert die wordt gebruikt voor groeperen om duplicaten te elimineren en vervolgens het eerste exemplaar van elke duplicaat als uitvoer op te geven. U hoeft de kolommen niet expliciet een naam te geven. Deze worden automatisch gegenereerd op basis van uw binnenkomende gegevensstroom.
aggregate(groupBy(mycols = sha2(256,columns())),
each(match(true()), $$ = first($$))) ~> DistinctRows
Controleren op NULL's in alle kolommen
Dit is een fragment dat u in uw gegevensstroom kunt plakken om alle kolommen algemeen te controleren op NULL-waarden. Deze techniek maakt gebruik van schemadrift om alle kolommen in alle rijen te bekijken en maakt gebruik van een voorwaardelijke splitsing om de rijen te scheiden met NULL's uit de rijen zonder NULL's.
split(contains(array(toString(columns())),isNull(#item)),
disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)
Schemadrift automapen met een selectie
Wanneer u een bestaand databaseschema wilt laden uit een onbekende of dynamische set binnenkomende kolommen, moet u de rechterkolommen in de Sink-transformatie toewijzen. Dit is alleen nodig wanneer u een bestaande tabel laadt. Voeg dit fragment toe voordat uw sink een select maakt waarmee uw kolommen automatisch worden toegewezen. Laat uw Sink-toewijzing staan om automatisch toe te wijzen.
select(mapColumn(
each(match(true()))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> automap
Kolomgegevenstypen behouden
Voeg dit script toe aan een definitie van een afgeleide kolom om de kolomnamen en gegevenstypen van uw gegevensstroom op te slaan in een permanente opslag met behulp van een sink.
derive(each(match(type=='string'), $$ = 'string'),
each(match(type=='integer'), $$ = 'integer'),
each(match(type=='short'), $$ = 'short'),
each(match(type=='complex'), $$ = 'complex'),
each(match(type=='array'), $$ = 'array'),
each(match(type=='float'), $$ = 'float'),
each(match(type=='date'), $$ = 'date'),
each(match(type=='timestamp'), $$ = 'timestamp'),
each(match(type=='boolean'), $$ = 'boolean'),
each(match(type=='long'), $$ = 'long'),
each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1
Omlaag doorvoeren
U kunt als volgt het veelvoorkomende probleem 'Omlaag doorvoeren' implementeren met gegevenssets wanneer u NULL-waarden wilt vervangen door de waarde van de vorige niet-NULL-waarde in de reeks. Houd er rekening mee dat deze bewerking negatieve gevolgen kan hebben voor de prestaties, omdat u een synthetisch venster moet maken voor uw hele gegevensset met een categoriewaarde dummy. Daarnaast moet u sorteren op een waarde om de juiste gegevensreeks te maken om de vorige niet-NULL-waarde te vinden. Met dit fragment hieronder wordt de synthetische categorie gemaakt als dummy en gesorteerd op een surrogaatsleutel. U kunt de surrogaatsleutel verwijderen en uw eigen gegevensspecifieke sorteersleutel gebruiken. In dit codefragment wordt ervan uitgegaan dat u al een brontransformatie hebt toegevoegd met de naam Brontransformatie source1
source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
asc(sk, true),
Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1
Zwevend gemiddelde
Zwevend gemiddelde kan heel eenvoudig worden geïmplementeerd in gegevensstromen met behulp van een Windows-transformatie. In dit voorbeeld hieronder wordt een zwevend gemiddelde van de aandelenkoersen voor Microsoft van 15 dagen gemaakt.
window(over(stocksymbol),
asc(Date, true),
startRowOffset: -7L,
endRowOffset: 7L,
FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1
Uniek aantal van alle kolomwaarden
U kunt dit script gebruiken om belangrijke kolommen te identificeren en de kardinaliteit van alle kolommen in uw stream te bekijken met één scriptfragment. Voeg dit script toe als een geaggregeerde transformatie aan uw gegevensstroom en biedt automatisch afzonderlijke aantallen van alle kolommen.
aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern
Waarden voor vorige of volgende rij vergelijken
Dit voorbeeldfragment laat zien hoe de venstertransformatie kan worden gebruikt om kolomwaarden uit de huidige rijcontext te vergelijken met kolomwaarden uit rijen vóór en na de huidige rij. In dit voorbeeld wordt een afgeleide kolom gebruikt om een dummywaarde te genereren om een vensterpartitie in te schakelen voor de hele gegevensset. Een surrogaatsleuteltransformatie wordt gebruikt om een unieke sleutelwaarde toe te wijzen voor elke rij. Wanneer u dit patroon toepast op uw gegevenstransformaties, kunt u de surrogaatsleutel verwijderen als u een kolom bent waarop u wilt ordenen en kunt u de afgeleide kolom verwijderen als u kolommen hebt om uw gegevens te partitioneren.
source1 keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
asc(sk, true),
prevAndCurr = lag(title,1)+'-'+last(title),
nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag
Hoeveel kolommen staan er in mijn gegevens?
size(array(columns()))
Gerelateerde inhoud
Verken Gegevensstroom s door te beginnen met het overzichtsartikel over gegevensstromen