Dela via


Dataflödesskript (DFS)

GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics

Dricks

Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!

Dataflöden är tillgängliga både i Azure Data Factory och Azure Synapse Pipelines. Den här artikeln gäller för mappning av dataflöden. Om du är nybörjare på transformeringar kan du läsa den inledande artikeln Transformera data med hjälp av ett mappningsdataflöde.

Dataflödesskript (DFS) är underliggande metadata, ungefär som ett kodningsspråk, som används för att köra transformeringar som ingår i ett mappningsdataflöde. Varje transformering representeras av en serie egenskaper som ger nödvändig information för att köra jobbet korrekt. Skriptet är synligt och redigerbart från ADF genom att klicka på "skript"-knappen i det övre menyfliksområdet i webbläsargränssnittet.

Knappen Skript

I en källtransformering uppmanas till exempel allowSchemaDrift: true, tjänsten att inkludera alla kolumner från källdatauppsättningen i dataflödet även om de inte ingår i schemaprojektionen.

Användningsfall

DFS skapas automatiskt av användargränssnittet. Du kan klicka på knappen Skript för att visa och anpassa skriptet. Du kan också generera skript utanför ADF-användargränssnittet och sedan skicka dem till PowerShell-cmdleten. När du felsöker komplexa dataflöden kan det vara enklare att skanna skriptkoden bakom i stället för att skanna UI-grafens representation av dina flöden.

Här är några exempel på användningsfall:

  • Programatiskt producerar många dataflöden som är ganska lika, d.v.s. "stamping-out"-dataflöden.
  • Komplexa uttryck som är svåra att hantera i användargränssnittet eller som resulterar i valideringsproblem.
  • Felsökning och bättre förståelse av olika fel som returneras under körningen.

När du skapar ett dataflödesskript som ska användas med PowerShell eller ett API måste du komprimera den formaterade texten till en enda rad. Du kan behålla flikar och nya radlinjer som escape-tecken. Men texten måste vara formaterad för att få plats i en JSON-egenskap. Det finns en knapp i skriptredigerarens användargränssnitt längst ned som formaterar skriptet som en enda rad åt dig.

Knappen Kopiera

Så här lägger du till transformeringar

Att lägga till transformeringar kräver tre grundläggande steg: att lägga till kärntransformeringsdata, omdirigera indataströmmen och sedan omdirigera utdataströmmen. Detta kan ses enklast i ett exempel. Anta att vi börjar med en enkel källa för att sänka dataflödet så här:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Om vi bestämmer oss för att lägga till en härledd transformering måste vi först skapa kärntransformeringens text, som har ett enkelt uttryck för att lägga till en ny versalkolumn med namnet upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Sedan tar vi den befintliga DFS:en och lägger till transformeringen:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Och nu omdirigerar vi den inkommande dataströmmen igen genom att identifiera vilken transformering vi vill att den nya omvandlingen ska komma efter (i det här fallet source1) och kopiera namnet på strömmen till den nya omvandlingen:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Slutligen identifierar vi den omvandling som vi vill komma efter den här nya omvandlingen och ersätter dess indataström (i det här fallet sink1) med namnet på utdataströmmen för vår nya transformering:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Grunderna i DFS

DFS består av en serie anslutna transformeringar, inklusive källor, mottagare och olika andra som kan lägga till nya kolumner, filtrera data, koppla data och mycket mer. Vanligtvis börjar skriptet med en eller flera källor följt av många transformeringar och slutar med en eller flera mottagare.

Källor har alla samma grundläggande konstruktion:

source(
  source properties
) ~> source_name

Till exempel skulle en enkel källa med tre kolumner (movieId, titel, genrer) vara:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

Alla transformeringar förutom källor har samma grundläggande konstruktion:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Till exempel skulle en enkel härledd transformering som tar en kolumn (rubrik) och skriver över den med en versalversion vara följande:

source1 derive(
  title = upper(title)
) ~> derive1

Och en mottagare utan schema skulle vara:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Skriptfragment

Skriptfragment är delningsbar kod för Dataflöde skript som du kan använda för att dela mellan dataflöden. Den här videon nedan beskriver hur du använder skriptfragment och använder Dataflöde Script för att kopiera och klistra in delar av skriptet bakom dina dataflödesdiagram:

Sammanställd sammanfattningsstatistik

Lägg till en aggregeringstransformering i dataflödet med namnet "SummaryStats" och klistra sedan in den här koden nedan för aggregeringsfunktionen i skriptet och ersätt befintliga SummaryStats. Detta ger ett allmänt mönster för sammanfattningsstatistik för dataprofiler.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

Du kan också använda exemplet nedan för att räkna antalet unika och antalet distinkta rader i dina data. Exemplet nedan kan klistras in i ett dataflöde med aggregerad transformering med namnet ValueDistAgg. I det här exemplet används en kolumn med namnet "title". Ersätt "rubrik" med strängkolumnen i dina data som du vill använda för att hämta värdeantal.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Ta med alla kolumner i en aggregering

Det här är ett allmänt aggregeringsmönster som visar hur du kan behålla de återstående kolumnerna i utdatametadata när du skapar aggregeringar. I det här fallet använder first() vi funktionen för att välja det första värdet i varje kolumn vars namn inte är "film". Om du vill använda detta skapar du en aggregeringstransformering med namnet DistinctRows och klistrar sedan in den i skriptet ovanpå det befintliga DistinctRows-aggregeringsskriptet.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Skapa ett hash-fingeravtryck för rad

Använd den här koden i dataflödesskriptet för att skapa en ny härledd kolumn med namnet DWhash som ger en sha1 hash med tre kolumner.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

Du kan också använda det här skriptet nedan för att generera en radhash med alla kolumner som finns i dataströmmen, utan att behöva namnge varje kolumn:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg motsvarande

Den här koden fungerar som T-SQL-funktionen string_agg() och aggregerar strängvärden i en matris. Du kan sedan omvandla matrisen till en sträng som ska användas med SQL-mål.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Antal uppdateringar, upserts, infogningar, borttagningar

När du använder en Alter Row-transformering kanske du vill räkna antalet uppdateringar, upserts, infogningar, borttagningar som är resultatet av dina Alter Row-principer. Lägg till en aggregeringstransformering efter alter-raden och klistra in det här Dataflöde Script i aggregeringsdefinitionen för dessa antal.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Distinkt rad med alla kolumner

Det här kodfragmentet lägger till en ny aggregerad transformering i dataflödet, som tar alla inkommande kolumner, genererar en hash som används för gruppering för att eliminera dubbletter och anger sedan den första förekomsten av varje dubblett som utdata. Du behöver inte uttryckligen namnge kolumnerna. De genereras automatiskt från din inkommande dataström.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Sök efter NULL:er i alla kolumner

Det här är ett kodfragment som du kan klistra in i dataflödet för att allmänt kontrollera alla dina kolumner efter NULL-värden. Den här tekniken använder schemaavvikelse för att titta igenom alla kolumner i alla rader och använder en villkorsstyrd delning för att separera raderna med NULLs från raderna utan NULLL:er.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

AutoMappa schemaavvikelse med ett val

När du behöver läsa in ett befintligt databasschema från en okänd eller dynamisk uppsättning inkommande kolumner måste du mappa kolumnerna på höger sida i sinktransformeringen. Detta behövs bara när du läser in en befintlig tabell. Lägg till det här kodfragmentet före mottagare för att skapa en Välj som automatiskt mappar dina kolumner. Låt mappningen för mottagare vara automatisk mappning.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Spara kolumndatatyper

Lägg till det här skriptet i en definition av härledd kolumn för att lagra kolumnnamnen och datatyperna från ditt dataflöde till ett beständigt lager med hjälp av en mottagare.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Fyll ned

Så här implementerar du det vanliga problemet "Fyll ned" med datauppsättningar när du vill ersätta NULL-värden med värdet från det tidigare icke-NULL-värdet i sekvensen. Observera att den här åtgärden kan ha negativa prestandakonsekvenser eftersom du måste skapa ett syntetiskt fönster i hela datauppsättningen med kategorivärdet "dummy". Dessutom måste du sortera efter ett värde för att skapa rätt datasekvens för att hitta det tidigare värdet som inte är NULL. Det här kodfragmentet nedan skapar den syntetiska kategorin som "dummy" och sorterar efter en surrogatnyckel. Du kan ta bort surrogatnyckeln och använda din egen dataspecifika sorteringsnyckel. Det här kodfragmentet förutsätter att du redan har lagt till en källtransformering med namnet source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Glidande medelvärde

Glidande medelvärde kan implementeras mycket enkelt i dataflöden med hjälp av en Windows-transformering. I det här exemplet nedan skapas ett glidande medelvärde på 15 dagar för Microsofts aktiekurser.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Distinkt antal av alla kolumnvärden

Du kan använda det här skriptet för att identifiera nyckelkolumner och visa kardinaliteten för alla kolumner i dataströmmen med ett enda skriptfragment. Lägg till det här skriptet som en aggregerad transformering i dataflödet så ger det automatiskt distinkta antal av alla kolumner.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Jämför värden för föregående eller nästa rad

Det här exempelfragmentet visar hur windowstransformeringen kan användas för att jämföra kolumnvärden från den aktuella radkontexten med kolumnvärden från rader före och efter den aktuella raden. I det här exemplet används en härledd kolumn för att generera ett dummy-värde för att aktivera en fönsterpartition i hela datauppsättningen. En surrogatnyckeltransformering används för att tilldela ett unikt nyckelvärde för varje rad. När du använder det här mönstret för dina datatransformeringar kan du ta bort surrogatnyckeln om du är en kolumn som du vill beställa efter och du kan ta bort den härledda kolumnen om du har kolumner att använda för att partitionera dina data med.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Hur många kolumner finns det i mina data?

size(array(columns()))

Utforska Dataflöde genom att börja med översiktsartikeln för dataflöden