Lakehouse-opetusohjelma: Tietojen valmisteleminen ja muuntaminen Lakehousessa
Tässä opetusohjelmassa käytetään muistikirjoja ja Spark runtime -funktiota raakatietojen muuntamiseen ja valmisteluun Lakehousessa.
Edellytykset
Jos sinulla ei ole lakehouse-tallennustila, joka sisältää tietoja, sinun täytyy:
Valmistele tiedot
Edellisissä opetusvaiheissa olemme saaneet raakadataa, joka on saatettu lähteestä Lakehousen Tiedostot-osioon . Nyt voit muuntaa tiedot ja valmistella ne Delta-taulukoiden luomista varten.
Lataa muistikirjat Lakehouse Tutorial Source Code - kansiosta.
Valitse näytön vasemmassa alakulmassa olevasta valitsimesta Tietotekniikka.
Valitse tuontimuistikirja aloitussivun yläreunan Uusi-osasta .
Valitse Lataa tuonnin tila -ruudusta, joka avautuu näytön oikeassa reunassa.
Valitse kaikki muistikirjat, jotka latasit tämän osion ensimmäisessä vaiheessa.
Valitse Avaa. Selainikkunan oikeassa yläkulmassa näkyy ilmoitus tuonnin tilasta.
Kun tuonti on onnistunut, siirry työtilan kohteiden näkymään ja näet uudet tuodut muistikirjat. Valitse wwilakehouse Lakehouse avataksesi sen.
Kun wwilakehouse-lakehouse on avattu, valitse Avoin muistikirja>Olemassa oleva muistikirja yläreunan siirtymisvalikosta.
Valitse olemassa olevien muistikirjojen luettelosta 01 - Luo Delta-taulukot - muistikirja ja valitse Avaa.
Lakehouse Explorerin avoimessa muistikirjassa näet, että muistikirja on jo linkitetty avattuun lakehouseesi.
Muistiinpano
Fabric tarjoaa V-tilausominaisuuden optimoitujen Delta Lake -tiedostojen kirjoittamiseen. Näennäisjärjestys parantaa usein pakkausta kolmesta neljään kertaan ja jopa 10 kertaa suorituskyvyn kiihtyvyyttä Delta Lake -tiedostoissa, joita ei ole optimoitu. Spark in Fabric optimoi osiot dynaamisesti samalla, kun luodaan tiedostoja, joiden oletusarvoinen koko on 128 Mt. Kohdetiedoston kokoa voidaan muuttaa kuormitusvaatimusten mukaan määritysten avulla.
Kirjoitusoptimoinnin avulla Apache Spark -moduuli vähentää kirjoitettujen tiedostojen määrää ja pyrkii lisäämään kirjoitettujen tietojen yksittäistä tiedostokokoa.
Ennen kuin kirjoitat tietoja Delta Lake -taulukoina Lakehousen Taulukot-osiossa, käytät kahta Fabric-ominaisuutta (V-järjestys ja Optimoi kirjoitus) tietojen kirjoittamisen optimointiin ja lukemisen suorituskykyyn. Jotta nämä ominaisuudet voidaan ottaa käyttöön istunnossa, määritä nämä määritykset muistikirjasi ensimmäisessä solussa.
Käynnistä muistikirja ja suorita kaikki järjestyksessä olevat solut valitsemalla Suorita kaikki yläreunan valintanauhassa (Aloitus-kohdassa). Tai jos haluat suorittaa koodin vain tietystä solusta, valitse solun vasemmalla puolella näkyvä Suorita-kuvake , kun viet hiiren osoittimen, tai paina VAIHTO + ENTER -näppäintä näppäimistöllä, kun ohjausobjekti on solussa.
Kun suoritat solua, sinun ei tarvinnut määrittää pohjana olevaa Spark-varannon tai -klusterin tietoja, koska Fabric tarjoaa ne reaaliaikaisen varannon kautta. Jokaiseen Fabric-työtilaan sisältyy oletusarvoinen Spark-varanto, jota kutsutaan reaaliaikaiseksi varannoksi. Tämä tarkoittaa sitä, että kun luot muistikirjoja, sinun ei tarvitse huolehtia Spark-määritysten tai klusterin tietojen määrittämisestä. Kun suoritat ensimmäisen muistikirjakomennon, reaaliaikainen uima-allas on toiminnassa muutamassa sekunnissa. Spark-istunto käynnistyy ja se alkaa suorittaa koodia. Tämän jälkeen koodin suorittaminen on lähes välitöntä tässä muistikirjassa, kun Spark-istunto on aktiivinen.
Seuraavaksi luet raakatiedot Lakehousen Tiedostot-osasta ja lisäät sarakkeita eri päivämääräosiin osana muunnoksen. Lopuksi käytät Spark-ohjelmointirajapinnan osiota tietojen jakamiseen ennen niiden kirjoittamista Delta-taulukkomuotoon, joka perustuu juuri luotuihin tieto-osan sarakkeisiin (Vuosi ja Vuosineljännes).
from pyspark.sql.functions import col, year, month, quarter table_name = 'fact_sale' df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/fact_sale_1y_full') df = df.withColumn('Year', year(col("InvoiceDateKey"))) df = df.withColumn('Quarter', quarter(col("InvoiceDateKey"))) df = df.withColumn('Month', month(col("InvoiceDateKey"))) df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
Kun faktataulukot on ladattu, voit siirtyä muiden dimensioiden tietojen lataamiseen. Seuraava solu luo funktion raakatietojen lukemiseen Lakehousen Tiedostot-osasta kullekin parametrina välitetylle taulukon nimelle. Seuraavaksi se luo dimensiotaulukoiden luettelon. Lopuksi se käy läpi taulukkoluettelon ja luo Delta-taulukon kullekin taulukon nimelle, joka luetaan syöteparametrista. Huomaa, että komentosarja jättää pois tässä esimerkissä nimetyn
Photo
sarakkeen, koska saraketta ei käytetä.from pyspark.sql.types import * def loadFullDataFromSource(table_name): df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/' + table_name) df = df.drop("Photo") df.write.mode("overwrite").format("delta").save("Tables/" + table_name) full_tables = [ 'dimension_city', 'dimension_customer' 'dimension_date', 'dimension_employee', 'dimension_stock_item' ] for table in full_tables: loadFullDataFromSource(table)
Voit vahvistaa luodut taulukot napsauttamalla hiiren kakkospainiketta ja valitsemalla päivitä wwilakehouse Lakehousessa. Taulukot tulevat näkyviin.
Avaa työtilan kohteet-näkymä uudelleen ja valitse wwilakehouse-järvitalo .
Avaa nyt toinen muistikirja. Valitse Lakehouse-näkymässä Avoin muistikirja>Olemassa oleva muistikirja valintanauhasta.
Valitse olemassa olevien muistikirjojen luettelosta 02 - Tietojen muuntaminen - Yritysmuistikirja avataksesi sen.
Lakehouse Explorerin avoimessa muistikirjassa näet, että muistikirja on jo linkitetty avattuun lakehouseesi.
Organisaatiolla voi olla tietoteknikoita, jotka työskentelevät Skalaa/Pythonin kanssa, ja muita SQL:n (Spark SQL:n tai T-SQL:n) kanssa työskenteleviä tietoteknikoita, jotka käyttävät samaa tietojen kopiota. Fabric mahdollistaa näiden eri ryhmien työskentelyn ja yhteistyön monipuolisen käyttökokemuksen ja suosimisen. Kaksi eri lähestymistapaa muuntavat ja luovat yrityskoosteita. Voit valita sinulle sopivan tai yhdistää ja sovittaa nämä menetelmät haluamasi mukaan suorituskyvyn vaarantamatta:
Lähestymistapa 1 : Yhdistä ja koosta tietoja yrityskoosteiden luomista varten PySparkin avulla. Tämä lähestymistapa on suositeltavissa jollekulle, jolla on ohjelmointitausta (Python tai PySpark).
Lähestymistapa 2 : Yhdistä ja koosta tietoja yrityskoosteiden luontia varten Spark SQL :n avulla. Tätä tapaa kannattaa käyttää SQL-taustaista henkilöä, joka on siirtymässä Sparkiin.
Lähestymistapa 1 (sale_by_date_city) – Yhdistä ja koosta tietoja yrityskoosteiden luomista varten PySparkin avulla. Seuraavalla koodilla luot kolme erilaista Spark-tietokehystä, joista kukin viittaa olemassa olevaan Delta-taulukkoon. Sitten yhdistät nämä taulukot käyttämällä tietokehyksiä, luot ryhmittelyperuste-toiminnon koostamista varten, nimeät uudelleen muutamia sarakkeita ja lopuksi kirjoitat sen Delta-taulukoksi Lakehousen Taulukot-osaan tietojen tallentamiseksi.
Tässä solussa luot kolme erilaista Spark-tietokehystä, joista kukin viittaa olemassa olevaan Delta-taulukkoon.
df_fact_sale = spark.read.table("wwilakehouse.fact_sale") df_dimension_date = spark.read.table("wwilakehouse.dimension_date") df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
Lisää seuraava koodi samaan soluun, jos haluat liittää nämä taulukot aiemmin luotujen tietokehysten avulla. Ryhmittelyperuste-asetuksella voit luoda koosteen, nimetä uudelleen muutamia sarakkeita ja kirjoittaa sen lopuksi Delta-taulukoksi Lakehousen Taulukot-osassa .
sale_by_date_city = df_fact_sale.alias("sale") \ .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \ .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \ .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\ .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\ .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\ .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\ .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\ .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\ .withColumnRenamed("sum(Profit)", "SumOfProfit")\ .orderBy("date.Date", "city.StateProvince", "city.City") sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
Lähestymistapa 2 (sale_by_date_employee) – Yhdistä ja koosta tietoja yrityskoosteiden luontia varten Spark SQL:n avulla. Seuraavan koodin avulla voit luoda väliaikaisen Spark-näkymän liittämällä kolme taulukkoa, tekemällä ryhmittelyperusteen koosteen luomiseksi ja nimeämällä muutamia sarakkeita uudelleen. Lopuksi luet tilapäisestä Spark-näkymästä ja kirjoitat sen lopuksi Delta-taulukkona Lakehousen Taulukot-osaan , jotta tiedot pysyvät käytössä.
Tässä solussa luot väliaikaisen Spark-näkymän yhdistämällä kolme taulukkoa, tekemällä ryhmittelyperusteen koosteen luomiseksi ja nimeämällä muutamia sarakkeita uudelleen.
%%sql CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee AS SELECT DD.Date, DD.CalendarMonthLabel , DD.Day, DD.ShortMonth Month, CalendarYear Year ,DE.PreferredName, DE.Employee ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax ,SUM(FS.TaxAmount) SumOfTaxAmount ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax ,SUM(Profit) SumOfProfit FROM wwilakehouse.fact_sale FS INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
Tässä solussa luet edellisessä solussa luodusta väliaikaisesta Spark-näkymästä ja lopuksi kirjoitat sen Delta-taulukkona Lakehousen Taulukot-osassa .
sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee") sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
Vahvista luodut taulukot napsauttamalla hiiren kakkospainikkeella ja valitsemalla Päivitä wwilakehouse Lakehousessa. Koostetaulukot tulevat näkyviin.
Näiden kahden lähestymistavan tulos on samankaltainen. Voit minimoida tarpeen oppia uutta teknologiaa tai tehdä kompromisseja suorituskyvyssä valitsemalla taustallesi ja haluamallesi mieltymykseen parhaiten sopivan lähestymistavan.
Saatat huomata, että kirjoitat tietoja Delta Lake -tiedostoina. Fabric-sovelluksen automaattinen taulukkojen etsiminen ja rekisteröintitoiminto poimii ja rekisteröi ne metasäilössä. Lausekkeita ei tarvitse kutsua CREATE TABLE
eksplisiittisesti SQL:n kanssa käytettävien taulukoiden luomiseksi.