Azure Synapse pipeline

Lotus88 46 Reputation points
2025-01-17T09:16:28.69+00:00

Hi,

I have a question regarding Synapse. I want to create a pipeline that delta-updates the target table “quotes_target” from the source table “quotes_source” with filtered quotes data. I have a list of quote IDs for the updated quotes, which I extracted from CDC tables and wrote to a Parquet file. I want to use a Copy activity to copy the updated quotes from the “quotes_source” table to the “quotes_target” table. However, I am stuck on how to filter the source data by the list of quote IDs in the Parquet file.

Can anyone help? Thank you!

Azure Synapse Analytics
An Azure analytics service that brings together data integration, enterprise data warehousing, and big data analytics. Previously known as Azure SQL Data Warehouse.

1 answer

  1. Vinodh247 27,201 Reputation points MVP
    2025-01-17T17:02:03.7766667+00:00

    Hi,

    Thanks for reaching out to Microsoft Q&A.

    To filter the source data by the list of quote IDs in your Parquet file and copy only the updated quotes from the quotes_source table to the quotes_target table in Synapse, you can try the steps below:

    Step 1: Load the Parquet File to a Staging Table

    1. Create a staging table in your database to temporarily hold the quote IDs from the Parquet file. For example: CREATE TABLE quotes_id_staging (quote_id INT);
    2. Use a Copy activity in your Synapse pipeline to load the Parquet file into this quotes_id_staging table. Configure the source dataset as the Parquet file and the sink dataset as the staging table.
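
    If the staging table lives in a dedicated SQL pool, the Parquet load can alternatively be done in T-SQL with COPY INTO instead of a Copy activity. This is only a sketch: the storage URL, file name, and credential below are placeholders you would replace with your own.

    ```sql
    -- Alternative to the Copy activity: load the Parquet file directly
    -- into the staging table (dedicated SQL pool). The URL and identity
    -- are placeholders; substitute your storage account and auth method.
    COPY INTO quotes_id_staging (quote_id)
    FROM 'https://<storageaccount>.dfs.core.windows.net/<container>/quote_ids.parquet'
    WITH (
        FILE_TYPE = 'PARQUET',
        CREDENTIAL = (IDENTITY = 'Managed Identity')
    );
    ```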

    Step 2: Filter Data from the Source Table

    Once the quotes_id_staging table is populated with the list of quote IDs, use a Mapping Data Flow or a stored procedure to filter the quotes_source table based on that list.

    Option 1: Use a Mapping Data Flow

    1. Create a Data Flow in your Synapse pipeline.
    2. Add a source transformation to read data from the quotes_source table.
    3. Add another source transformation to read data from the quotes_id_staging table.
    4. Use a Join transformation:
      • Join the quotes_source table and the quotes_id_staging table on the quote_id column.
      • Select the Inner Join type to keep only matching quote_ids.
    5. Add a sink transformation to write the filtered data to the quotes_target table.
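
    The data flow above is logically equivalent to the following filter query, which can be handy for validating the expected row set before wiring up the sink:

    ```sql
    -- The inner join keeps only the rows from quotes_source whose
    -- quote_id appears in the staging table (i.e. the updated quotes).
    SELECT qs.*
    FROM quotes_source AS qs
    INNER JOIN quotes_id_staging AS qids
        ON qs.quote_id = qids.quote_id;
    ```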

    Option 2: Use a stored procedure

    You can write a SQL query to perform the filtering and updating in the database using a MERGE statement. Add a Stored Procedure activity to the pipeline to execute this SQL. Here's an example:

    MERGE INTO quotes_target AS target
    USING (
        SELECT qs.*
        FROM quotes_source qs
        INNER JOIN quotes_id_staging qids ON qs.quote_id = qids.quote_id
    ) AS source
    ON target.quote_id = source.quote_id
    WHEN MATCHED THEN
        UPDATE SET 
            target.column1 = source.column1,
            target.column2 = source.column2,
            -- Add all columns you want to update
            target.updated_at = GETDATE()
    WHEN NOT MATCHED THEN
        INSERT (quote_id, column1, column2, created_at)
        VALUES (source.quote_id, source.column1, source.column2, GETDATE());
    
    
    

    Optionally, truncate the quotes_id_staging table after the pipeline run to keep it clean for future updates. This approach scales well and stays flexible while leveraging Synapse's capabilities for incremental updates.
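
    As a sketch, the MERGE and the staging-table cleanup can be wrapped in one stored procedure that the pipeline's Stored Procedure activity calls. The procedure name and the column1/column2 columns are illustrative; replace them with your actual schema.

    ```sql
    -- Hypothetical wrapper: upsert the filtered rows, then reset the
    -- staging table so the next pipeline run starts clean.
    CREATE PROCEDURE dbo.usp_merge_updated_quotes
    AS
    BEGIN
        MERGE INTO quotes_target AS target
        USING (
            SELECT qs.*
            FROM quotes_source AS qs
            INNER JOIN quotes_id_staging AS qids
                ON qs.quote_id = qids.quote_id
        ) AS source
        ON target.quote_id = source.quote_id
        WHEN MATCHED THEN
            UPDATE SET
                target.column1 = source.column1,  -- illustrative columns
                target.column2 = source.column2,
                target.updated_at = GETDATE()
        WHEN NOT MATCHED THEN
            INSERT (quote_id, column1, column2, created_at)
            VALUES (source.quote_id, source.column1, source.column2, GETDATE());

        TRUNCATE TABLE quotes_id_staging;
    END;
    ```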

    Please feel free to click the 'Upvote' (Thumbs-up) button and 'Accept as Answer'. This helps the community by allowing others with similar queries to easily find the solution.

