Condividi tramite


Esercitazione: Convalidare i dati usando SemPy e Great Expectations (GX)

In questa esercitazione si apprenderà a usare SemPy insieme a Great Expectations (GX) per eseguire la convalida dei dati nei modelli semantici di Power BI.

Questa esercitazione illustra come:

  • Convalidare i vincoli su un set di dati nell'area di lavoro Fabric con l'origine dati Fabric di Great Expectation (basata sul collegamento semantico).
    • Configurare un contesto dati GX, asset di dati e aspettative.
    • Visualizzare i risultati della convalida con un checkpoint GX.
  • Usare il collegamento semantico per analizzare i dati non elaborati.

Prerequisiti

  • Selezionare Aree di lavoro nel riquadro di spostamento sinistro per trovare e selezionare l'area di lavoro. Questa area di lavoro diventa l'area di lavoro corrente.
  • Scaricare il file Retail Analysis Sample PBIX.pbix.
  • Nell'area di lavoro usare il pulsante Carica per caricare il file Retail Analysis Sample PBIX.pbix nell'area di lavoro.

Seguire la procedura in Notebook

great_expectations_tutorial.ipynb è il notebook che accompagna questa esercitazione.

Per aprire il notebook di accompagnamento per questa esercitazione, seguire le istruzioni riportate in Preparare il sistema per le esercitazioni di data science per importare il notebook nell'area di lavoro.

Se si preferisce copiare e incollare il codice da questa pagina, è possibile creare un nuovo notebook.

Assicurarsi di collegare un lakehouse al notebook prima di iniziare a eseguire il codice.

Configurare il notebook

In questa sezione viene configurato un ambiente notebook con i moduli e i dati necessari.

  1. Installare SemPy e le librerie Great Expectations pertinenti da PyPI usando la funzionalità di installazione in linea %pipall'interno del notebook.
# install libraries
%pip install semantic-link 'great-expectations<1.0' great_expectations_experimental great_expectations_zipcode_expectations

# load %%dax cell magic
%load_ext sempy
  1. Eseguire le importazioni necessarie di moduli che serviranno in un secondo momento:
import great_expectations as gx
from great_expectations.expectations.expectation import ExpectationConfiguration
from great_expectations_zipcode_expectations.expectations import expect_column_values_to_be_valid_zip5

Configurare il contesto dati GX e l'origine dati

Per iniziare a usare Great Expectations, è prima necessario configurare un contesto dati GX. Il contesto funge da punto di ingresso per le operazioni GX e contiene tutte le configurazioni pertinenti.

context = gx.get_context()

È ora possibile aggiungere il set di dati di Fabric a questo contesto come origine dati per iniziare a interagire con i dati. Questa esercitazione usa un file Retail Analysis Sample .pbix come modello semantico di esempio standard di Power BI.

ds = context.sources.add_fabric_powerbi("Retail Analysis Data Source", dataset="Retail Analysis Sample PBIX")

Specificare gli asset di dati

Definire gli asset di dati per specificare il subset di dati con cui si vuole lavorare. L'asset può essere semplice come le tabelle complete o essere complesso come una query DAX (Data Analysis Expressions) personalizzata.

In questo caso si aggiungeranno più asset:

Tabella Power BI

Aggiungere una tabella di Power BI come asset di dati.

ds.add_powerbi_table_asset("Store Asset", table="Store")

Misura Power BI

Se il set di dati contiene misure preconfigurate, aggiungere le misure come asset seguendo un'API simile a evaluate_measure di SemPy.

ds.add_powerbi_measure_asset(
    "Total Units Asset",
    measure="TotalUnits",
    groupby_columns=["Time[FiscalYear]", "Time[FiscalMonth]"]
)

DAX

Per definire misure personalizzate o avere un maggiore controllo su righe specifiche, è possibile aggiungere un asset DAX con una query DAX personalizzata. In questo caso viene definita una misura Total Units Ratio dividendo due misure esistenti.

ds.add_powerbi_dax_asset(
    "Total Units YoY Asset",
    dax_string=
    """
    EVALUATE SUMMARIZECOLUMNS(
        'Time'[FiscalYear],
        'Time'[FiscalMonth],
        "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
    )    
    """
)

Query DMV

In alcuni casi, potrebbe essere utile usare calcoli Dynamic Management View (DMV) come parte del processo di convalida dei dati. Ad esempio, è possibile tenere traccia del numero di violazioni dell’integrità referenziale all'interno del set di dati. Per altre informazioni, vedere Pulizia dei dati = report più veloci.

ds.add_powerbi_dax_asset(
    "Referential Integrity Violation",
    dax_string=
    """
    SELECT
        [Database_name],
        [Dimension_Name],
        [RIVIOLATION_COUNT]
    FROM $SYSTEM.DISCOVER_STORAGE_TABLES
    """
)

Aspettative

Per aggiungere vincoli specifici agli asset, è prima necessario configurare gruppi di aspetattive. Dopo aver aggiunto singole aspettative a ogni gruppo, è possibile aggiornare la configurazione del contesto dati all'inizio del nuovo gruppo. Per un elenco completo delle aspettative disponibili, vedere Raccolta aspettative GX.

Per iniziare, aggiungere una "Retail Store Suite" con due aspettative:

  • un codice postale valido
  • una tabella con un numero di righe compreso tra 80 e 200
suite_store = context.add_expectation_suite("Retail Store Suite")

suite_store.add_expectation(ExpectationConfiguration("expect_column_values_to_be_valid_zip5", { "column": "PostalCode" }))
suite_store.add_expectation(ExpectationConfiguration("expect_table_row_count_to_be_between", { "min_value": 80, "max_value": 200 }))

context.add_or_update_expectation_suite(expectation_suite=suite_store)

Misura TotalUnits

Aggiungere una "Retail Measure Suite" con una sola aspettativa:

  • I valori di colonna devono essere maggiori di 50.000
suite_measure = context.add_expectation_suite("Retail Measure Suite")
suite_measure.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "TotalUnits",
        "min_value": 50000
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_measure)

DAX Total Units Ratio

Aggiungere una "Retail DAX Suite" con una sola aspettativa:

  • I valori di colonna per il rapporto unità totali devono essere compresi tra 0,8 e 1,5
suite_dax = context.add_expectation_suite("Retail DAX Suite")
suite_dax.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "[Total Units Ratio]",
        "min_value": 0.8,
        "max_value": 1.5
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_dax)

Violazioni dell'integrità referenziale (DMV)

Aggiungere una "Retail DMV Suite" con una sola aspettativa:

  • il RIVIOLATION_COUNT deve essere 0
suite_dmv = context.add_expectation_suite("Retail DMV Suite")
# There should be no RI violations
suite_dmv.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_in_set", 
    {
        "column": "RIVIOLATION_COUNT",
        "value_set": [0]
    }
))
context.add_or_update_expectation_suite(expectation_suite=suite_dmv)

Convalida

Per eseguire effettivamente le aspettative specificate sui dati, creare prima un checkpoint e aggiungerlo al contesto. Per altre informazioni sulla configurazione del checkpoint, vedere Flusso di lavoro di convalida dei dati.

checkpoint_config = {
    "name": f"Retail Analysis Checkpoint",
    "validations": [
        {
            "expectation_suite_name": "Retail Store Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Store Asset",
            },
        },
        {
            "expectation_suite_name": "Retail Measure Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DAX Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units YoY Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DMV Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Referential Integrity Violation",
            },
        },
    ],
}
checkpoint = context.add_checkpoint(
    **checkpoint_config
)

Eseguire ora il checkpoint ed estrarre i risultati come DataFrame pandas per una formattazione semplice.

result = checkpoint.run()

Elaborare e stampare i risultati.

import pandas as pd

data = []

for run_result in result.run_results:
    for validation_result in result.run_results[run_result]["validation_result"]["results"]:
        row = {
            "Batch ID": run_result.batch_identifier,
            "type": validation_result.expectation_config.expectation_type,
            "success": validation_result.success
        }

        row.update(dict(validation_result.result))
        
        data.append(row)

result_df = pd.DataFrame.from_records(data)    

result_df[["Batch ID", "type", "success", "element_count", "unexpected_count", "partial_unexpected_list"]]

La tabella mostra i risultati della convalida.

Da questi risultati è possibile notare che tutte le aspettative hanno superato la convalida, ad eccezione del "Total Units YoY Asset" definito tramite una query DAX personalizzata.

Diagnostica

Usando il collegamento semantico, è possibile recuperare i dati di origine per comprendere quali anni esatti sono al di fuori dell'intervallo. Il collegamento semantico fornisce un magic inline per l'esecuzione di query DAX. Usare il collegamento semantico per eseguire la stessa query passata nell'asset di dati GX e visualizzare i valori risultanti.

%%dax "Retail Analysis Sample PBIX"

EVALUATE SUMMARIZECOLUMNS(
    'Time'[FiscalYear],
    'Time'[FiscalMonth],
    "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
)

La tabella mostra i risultati del riepilogo delle query DAX.

Salvare questi risultati in un DataFrame.

df = _

Tracciare i risultati.

import matplotlib.pyplot as plt

df["Total Units % Change YoY"] = (df["[Total Units Ratio]"] - 1)

df.set_index(["Time[FiscalYear]", "Time[FiscalMonth]"]).plot.bar(y="Total Units % Change YoY")

plt.axhline(0)

plt.axhline(-0.2, color="red", linestyle="dotted")
plt.axhline( 0.5, color="red", linestyle="dotted")

None

Il tracciato mostra i risultati del riepilogo delle query DAX.

Dal tracciato, si può vedere che aprile e luglio erano leggermente fuori dall’intervallo e quindi si possono eseguire ulteriori operazioni per indagare.

Archiviazione delle configurazioni GX

Man mano che i dati nel set di dati cambiano nel tempo, è possibile eseguire nuovamente le convalide GX appena eseguite. Attualmente, il contesto dati (contenente gli asset di dati connessi, le suite di aspettative e il checkpoint) vive in modo temporaneo, ma può essere convertito in un contesto di file per un uso futuro. In alternativa, è possibile creare un'istanza di un contesto di file (vedere Creare un'istanza di un contesto dati).

context = context.convert_to_file_context()

Dopo aver salvato il contesto, copiare la directory gx nel lakehouse.

Importante

Questa cella presuppone che sia stato aggiunto un lakehouse al notebook. Se non è presente alcun lakehouse collegato, non verrà visualizzato un errore, ma non sarà possibile ottenere il contesto in un secondo momento. Se si aggiunge ora un lakehouse, il kernel verrà riavviato, quindi sarà necessario eseguire nuovamente l'intero notebook per tornare a questo punto.

# copy GX directory to attached lakehouse
!cp -r gx/ /lakehouse/default/Files/gx

È ora possibile creare contesti futuri con context = gx.get_context(project_root_dir="<your path here>") per usare tutte le configurazioni di questa esercitazione.

Ad esempio, in un nuovo notebook collegare lo stesso lakehouse e usare context = gx.get_context(project_root_dir="/lakehouse/default/Files/gx") per recuperare il contesto.

Vedere altre esercitazioni per il collegamento semantico/SemPy: