Ejercicio: Diseño e implementación de una dimensión de variación lenta de tipo 1 con flujos de datos de asignación
En este ejercicio, creará un flujo de datos para una SCD de tipo 1 con un grupo de SQL dedicado de Azure Synapse como origen y destino. Este flujo de datos se podría agregar a una canalización de Synapse y ejecutarse como parte del proceso de extracción, transformación y carga (ETL).
Configuración del origen y la tabla de dimensiones
Para este ejercicio, quiere cargar una tabla de dimensiones en Azure Synapse desde datos de origen que pueden proceder de muchos tipos de sistema diferentes, como Azure SQL, almacenamiento de Azure, etc. Para que ejemplo sea sencillo, creará los datos de origen en la base de datos de Azure Synapse.
En Synapse Studio, vaya al centro Data (Datos).
Seleccione la pestaña Workspace (Área de trabajo) (1), expanda Databases (Bases de datos) y, después, haga clic con el botón derecho en SQLPool01 (2). Seleccione New SQL script (3) (Nuevo script SQL) y después Empty script (4) (Script vacío).
Pegue el script siguiente en la ventana del script vacío, después seleccione Run (Ejecutar) o presione
F5
para ejecutar la consulta:CREATE TABLE [dbo].[CustomerSource] ( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8), [FirstName] [nvarchar](50), [MiddleName] [nvarchar](50), [LastName] [nvarchar](50), [Suffix] [nvarchar](10), [CompanyName] [nvarchar](128), [SalesPerson] [nvarchar](256), [EmailAddress] [nvarchar](50), [Phone] [nvarchar](25) ) WITH ( HEAP ) COPY INTO [dbo].[CustomerSource] FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv' WITH ( FILE_TYPE='CSV', FIELDTERMINATOR='|', FIELDQUOTE='', ROWTERMINATOR='0x0a', ENCODING = 'UTF16' ) CREATE TABLE dbo.[DimCustomer]( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8) NULL, [FirstName] [nvarchar](50) NOT NULL, [MiddleName] [nvarchar](50) NULL, [LastName] [nvarchar](50) NOT NULL, [Suffix] [nvarchar](10) NULL, [CompanyName] [nvarchar](128) NULL, [SalesPerson] [nvarchar](256) NULL, [EmailAddress] [nvarchar](50) NULL, [Phone] [nvarchar](25) NULL, [InsertedDate] [datetime] NOT NULL, [ModifiedDate] [datetime] NOT NULL, [HashKey] [char](64) ) WITH ( DISTRIBUTION = REPLICATE, CLUSTERED COLUMNSTORE INDEX )
Creación de un flujo de datos de asignación
Los flujos de datos de asignación son actividades de canalización que proporcionan una forma visual de especificar cómo se deben transformar los datos a través de una experiencia sin código. A continuación, creará un flujo de datos de asignación para crear un SCD de tipo 1.
Vaya al centro Develop (Desarrollar).
Seleccione + y, después, Flujo de datos.
En el panel de propiedades del nuevo flujo de datos, escriba
UpdateCustomerDimension
en el campo Name (Nombre) (1) y, después, seleccione el botón Properties (Propiedades) (2) para ocultar el panel de propiedades.Seleccione Add Source (Agregar origen) en el lienzo.
En
Source settings
, configure las propiedades siguientes:- Nombre de la secuencia de salida: escriba
SourceDB
- Tipo de origen: seleccione
Dataset
- Opciones: active
Allow schema drift
y deje desactivadas las demás opciones. - Muestreo: seleccione
Disable
. - Conjunto de datos: seleccione + New (+ Nuevo) para crear un conjunto de datos.
- Nombre de la secuencia de salida: escriba
En el cuadro de diálogo del nuevo conjunto de datos de integración, seleccione Azure Synapse Analytics y después Continuar.
En las propiedades del conjunto de datos, configure lo siguiente:
- Nombre: escriba
CustomerSource
- Servicio vinculado: seleccione el servicio vinculado del área de trabajo de Synapse
- Nombre de tabla: seleccione el botón Actualizar junto a la lista desplegable
- Nombre: escriba
En el campo Valor, escriba el nombre del grupo de SQL y seleccione Aceptar.
Seleccione
dbo.CustomerSource
en Nombre de tabla, seleccioneFrom connection/store
en Importar esquema y, después Aceptar para crear el conjunto de datos.Seleccione Abrir junto al conjunto de datos
CustomerSource
que ha agregado.Escriba el nombre del grupo de SQL en el campo Valor situado junto a
DBName
.En el editor de flujo de datos, seleccione el cuadro Add Source (Agregar origen) debajo de la actividad SourceDB. Configure este origen como la tabla DimCustomer, siguiendo los mismos pasos que ha usado para CustomerSource.
- Nombre de la secuencia de salida: escriba
DimCustomer
- Tipo de origen: seleccione
Dataset
- Opciones: active
Allow schema drift
y deje desactivadas las demás opciones. - Muestreo: seleccione
Disable
. - Conjunto de datos: seleccione + New (+ Nuevo) para crear un conjunto de datos. Use el servicio vinculado Azure Synapse y elija la tabla DimCustomer. Asegúrese de establecer DBName en el nombre del grupo de SQL.
- Nombre de la secuencia de salida: escriba
Adición de transformaciones al flujo de datos
Seleccione + a la derecha del origen
SourceDB
en el lienzo y, después, seleccione Columna derivada.En
Derived column's settings
, configure las propiedades siguientes:- Nombre de la secuencia de salida: escriba
CreateCustomerHash
- Incoming stream (Secuencia de entrada): seleccione
SourceDB
- Columnas: escriba lo siguiente:
Columna Expression Descripción Tipo en HashKey
sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,''))
Crea un hash SHA256 de los valores de la tabla. Se usa para detectar los cambios de fila comparando el hash de los registros entrantes con el valor hash de los registros de destino, que coincide con el valor CustomerID
. La funcióniifNull
reemplaza los valores NULL por cadenas vacías. De lo contrario, los valores hash tienden a duplicarse cuando hay entradas NULL.- Nombre de la secuencia de salida: escriba
Seleccione + a la derecha de la columna derivada
CreateCustomerHash
en el lienzo y, después, seleccione Existe.En
Exists settings
, configure las propiedades siguientes:- Nombre de la secuencia de salida: escriba
Exists
- Left stream (Secuencia izquierda): seleccione
CreateCustomerHash
- Right stream (Secuencia derecha): seleccione
SynapseDimCustomer
- Exist type (Tipo de existencia): seleccione
Doesn't exist
- Exists conditions (Condiciones de existencia): establezca lo siguiente para Left (Izquierda) y Right (Derecha):
Left (Izquierda): columna de CreateCustomerHash Right (Derecha): columna de SynapseDimCustomer HashKey
HashKey
- Nombre de la secuencia de salida: escriba
Seleccione + a la derecha de
Exists
en el lienzo y después Búsqueda.En
Lookup settings
, configure las propiedades siguientes:- Nombre de la secuencia de salida: escriba
LookupCustomerID
- Primary stream (Secuencia principal): seleccione
Exists
- Lookup stream (Secuencia de búsqueda): seleccione
SynapseDimCustomer
- Match multiple rows (Coincidencia con varias filas): desactivada
- Match on (Coincidir en): seleccione
Any row
- Lookup conditions (Condiciones de búsqueda): establezca lo siguiente para Left (Izquierda) y Right (Derecha):
Left (Izquierda): la columna de Exists Right (Derecha): columna de SynapseDimCustomer CustomerID
CustomerID
- Nombre de la secuencia de salida: escriba
Seleccione + a la derecha de
LookupCustomerID
en el lienzo y, después, seleccione Columna derivada.En
Derived column's settings
, configure las propiedades siguientes:- Nombre de la secuencia de salida: escriba
SetDates
- Incoming stream (Secuencia de entrada): seleccione
LookupCustomerID
- Columnas: escriba lo siguiente:
Columna Expression Descripción Seleccione InsertedDate
iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate})
Si el valor InsertedDate
es NULL, inserte la marca de tiempo actual. De lo contrario, use el valorInsertedDate
.Seleccione ModifiedDate
currentTimestamp()
Actualice siempre el valor ModifiedDate
con la marca de tiempo actual.Nota:
Para insertar la segunda columna, seleccione + Agregar encima de la lista Columnas y, después, seleccione Agregar columna.
- Nombre de la secuencia de salida: escriba
Seleccione + a la derecha del paso de columna derivada
SetDates
en el lienzo y, después, seleccione Alter Row (Alteración de fila).En
Alter row settings
, configure las propiedades siguientes:- Nombre de la secuencia de salida: escriba
AllowUpserts
- Incoming stream (Secuencia de entrada): seleccione
SetDates
- Alter row conditions (Condiciones de modificar fila): escriba lo siguiente:
Condición Expression Descripción Seleccione Upsert if
true()
Establezca la condición en true()
en la condiciónUpsert if
para permitir upserts. Esto garantiza que todos los datos que pasan a través de los pasos del flujo de datos de asignación se insertarán o actualizarán en el receptor.- Nombre de la secuencia de salida: escriba
Seleccione + a la derecha del paso Alter row (Alteración de fila)
AllowUpserts
en el lienzo y, después, seleccione Receptor.En
Sink
, configure las propiedades siguientes:- Nombre de la secuencia de salida: escriba
Sink
- Incoming stream (Secuencia de entrada): seleccione
AllowUpserts
- Sink type (Tipo de receptor): seleccione
Dataset
- Conjunto de datos: seleccione
DimCustomer
- Opciones: active
Allow schema drift
y desactiveValidate schema
- Nombre de la secuencia de salida: escriba
Seleccione la pestaña Configuración y configure las propiedades siguientes:
- Update method (Método de actualización): active
Allow upsert
y desactive todas las demás opciones. - Key columns (Columnas de clave): seleccione
List of columns
y despuésCustomerID
en la lista - Table action (Acción de tabla): seleccione
None
- Enable staging (Habilitar almacenamiento provisional): desactivado
- Update method (Método de actualización): active
Seleccione la pestaña Mapping (Asignación) y desactive Auto mapping (Asignación automática). Configure la asignación de columnas de entrada como se describe a continuación:
Columnas de entrada Columnas de resultados SourceDB@CustomerID
CustomerID
SourceDB@Title
Title
SourceDB@FirstName
FirstName
SourceDB@MiddleName
MiddleName
SourceDB@LastName
LastName
SourceDB@Suffix
Suffix
SourceDB@CompanyName
CompanyName
SourceDB@SalesPerson
SalesPerson
SourceDB@EmailAddress
EmailAddress
SourceDB@Phone
Phone
InsertedDate
InsertedDate
ModifiedDate
ModifiedDate
CreateCustomerHash@HashKey
HashKey
El flujo de asignación completado debe tener el aspecto siguiente. Seleccione Publicar todo para guardar los cambios.
Seleccione Publicar.
Procedimiento para probar el flujo de datos
Ha completado un flujo de datos de SCD de tipo 1. Si decide probarlo, podría agregar este flujo de datos a una canalización de integración de Synapse. Después, podría ejecutar la canalización una vez para realizar la carga inicial de los datos de origen del cliente en el destino DimCustomer.
Cada ejecución adicional de la canalización comparará los datos de la tabla de origen con lo que ya se incluye en la tabla de dimensiones (mediante HashKey) y solo actualizará los registros que han cambiado. Para probar esto, podría actualizar un registro en la tabla de origen y, después, volver a ejecutar la canalización y comprobar las actualizaciones de registros en la tabla de dimensiones.
Como ejemplo de cliente se usará Janet Gates. La carga inicial muestra que LastName
es Gates y CustomerId
es 4.
Esta es una instrucción de ejemplo que actualizaría el apellido del cliente en la tabla de origen.
UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4
Después de actualizar el registro y volver a ejecutar la canalización, DimCustomer mostraría estos datos actualizados.
El registro de cliente ha actualizado correctamente el valor LastName
para que coincida con el registro de origen y ha actualizado ModifiedDate
, sin realizar el seguimiento del valor LastName
anterior. Este es el comportamiento esperado para una SCD de tipo 1. Si el historial fuera necesario para el campo LastName
, tendría que modificar la tabla y el flujo de datos para que sean uno de los otros tipos de SCD que ha aprendido.