Azure Synapse pipeline

Lotus88 46 Reputation points
2025-01-17T09:16:28.69+00:00

Hi,

I have a question regarding Synapse. I want to create a pipeline that performs a delta update of the target table “quotes_target” from the source table “quotes_source” using filtered quotes data. After extracting from the CDC tables, I have a list of quote IDs for the updated quotes, which I have written to a Parquet file. I want to use a Copy activity to copy the updated quotes from the “quotes_source” table to the “quotes_target” table. However, I am stuck on how to filter the source data by the list of quote IDs in the Parquet file.

Can anyone help? Thank you!

Azure Synapse Analytics

2 answers

  1. Vinodh247 27,871 Reputation points MVP
    2025-01-17T17:02:03.7766667+00:00

    Hi,

    Thanks for reaching out to Microsoft Q&A.

    To filter the source data by the list of quote IDs in your Parquet file and copy only the updated quotes from the quotes_source table to the quotes_target table in Synapse, you can follow the steps below:

    Step 1: Load the Parquet File to a Staging Table

    1. Create a staging table in your database to temporarily hold the quote IDs from the Parquet file, for example: CREATE TABLE quotes_id_staging (quote_id INT);
    2. Use a Copy activity in your Synapse pipeline to load the Parquet file into this quotes_id_staging table. Configure the source dataset as the Parquet file and the sink dataset as the staging table.
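
    Alternatively, if the Parquet file already sits in an ADLS Gen2 container, the dedicated SQL pool's COPY statement can load it directly, without a separate Copy activity. A minimal sketch, assuming a hypothetical storage path and that the pool's managed identity has read access to the container:

    -- Load the quote IDs straight from the Parquet file into the staging table.
    -- The URL is a placeholder; substitute your own account, container, and path.
    COPY INTO quotes_id_staging
    FROM 'https://<storage-account>.dfs.core.windows.net/<container>/updated_quote_ids.parquet'
    WITH (
        FILE_TYPE = 'PARQUET',
        CREDENTIAL = (IDENTITY = 'Managed Identity')
    );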

    Step 2: Filter Data from the Source Table

    Once the quotes_id_staging table is populated with the list of quote_ids, use a Mapping Data Flow or Stored Procedure to filter the quotes_source table based on the list of quote_ids.

    Option 1: Use a Mapping Data Flow

    1. Create a Data Flow in your Synapse pipeline.
    2. Add a source transformation to read data from the quotes_source table.
    3. Add another source transformation to read data from the quotes_id_staging table.
    4. Use a Join transformation:
      • Join the quotes_source table and the quotes_id_staging table on the quote_id column.
      • Select the Inner Join type to filter only matching quote_ids.
    5. Add a sink transformation to write the filtered data to the quotes_target table.
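
    As a sanity check before running the flow, the inner join it performs is equivalent to the following query, so the number of rows it returns is the number of rows the data flow should write (a sketch, assuming quote_id is the key column in both tables):

    SELECT qs.*
    FROM quotes_source AS qs
    INNER JOIN quotes_id_staging AS qids
        ON qs.quote_id = qids.quote_id;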

    Option 2: Use a stored procedure

    You can write a SQL query to perform the filtering and updating in the database using a MERGE statement. Add a Stored Procedure activity to the pipeline to execute this SQL. Here's an example:

    MERGE INTO quotes_target AS target
    USING (
        SELECT qs.*
        FROM quotes_source qs
        INNER JOIN quotes_id_staging qids ON qs.quote_id = qids.quote_id
    ) AS source
    ON target.quote_id = source.quote_id
    WHEN MATCHED THEN
        UPDATE SET 
            target.column1 = source.column1,
            target.column2 = source.column2,
            -- Add all columns you want to update
            target.updated_at = GETDATE()
    WHEN NOT MATCHED THEN
        INSERT (quote_id, column1, column2, created_at)
        VALUES (source.quote_id, source.column1, source.column2, GETDATE());
    
    
    

    Optionally, truncate the quotes_id_staging table after the pipeline run to keep it clean for future updates; a Script activity at the end of the pipeline is enough, as sketched below. This approach ensures scalability and flexibility while leveraging the capabilities of Synapse for incremental updates.
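    For example, as the only statement in that Script activity:

    -- Reset the staging table so the next CDC batch starts from an empty list
    TRUNCATE TABLE quotes_id_staging;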

    Please feel free to click the 'Upvote' (Thumbs-up) button and 'Accept as Answer'. This helps the community by allowing others with similar queries to easily find the solution.


  2. Chandra Boorla 8,230 Reputation points Microsoft Vendor
    2025-02-12T10:46:27.5+00:00

    @Lotus88

    It looks like you're passing the p_extract_end_dt parameter from the pipeline to the data flow, but the value might not be properly applied to the sys_update_dt field during the upsert process in the sink.

    Understanding the Upsert Behavior in Data Flow

    When using upsert in an Azure Data Factory (ADF) Data Flow, the update behavior depends on whether the column is explicitly mapped and modified during the transformation.

    Here are a few possible causes and solutions:

    Ensure Parameter Mapping in Data Flow - Inside the data flow, you need to explicitly set sys_update_dt to the value of p_extract_end_dt in a Derived Column transformation before the sink. If you didn’t do this, sys_update_dt might retain its original value from the source instead of being updated.

    Check Sink Mapping - Open the Sink transformation and verify that sys_update_dt is mapped correctly. If it’s missing, manually map it to the correct column.

    Upsert Behavior - In an upsert operation, only columns that are mapped and modified in the flow will be updated in the target table. If sys_update_dt is not explicitly modified, it may retain its existing value.

    Check for "Alter Row" Transformation (if any) - If you're using an Alter Row transformation to control updates, ensure it allows the sys_update_dt column to be modified.

    Expected Behavior

    If mapped correctly, sys_update_dt should be updated to 2025-02-12 16:09:09.000. If not mapped, sys_update_dt may retain its previous value or not be updated.
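
    To confirm which behavior you are getting, you can spot-check the sink after a debug run. A minimal sketch, assuming the sink is the quotes_target table keyed by quote_id (both names are illustrative) and using the run timestamp mentioned above:

    -- Rows touched by the run should carry the parameter value;
    -- rows still showing an older sys_update_dt were not updated by the sink.
    SELECT quote_id, sys_update_dt
    FROM quotes_target
    WHERE sys_update_dt = '2025-02-12 16:09:09.000';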

    I hope this information helps.

    Thank you.

