Hi,
Thanks for reaching out to Microsoft Q&A.
To filter the source data by the list of quote IDs in your parquet file and copy only the updated quotes from the `quotes_source` table to the `quotes_target` table in Synapse, you can try the steps below:
Step 1: Load the Parquet File to a Staging Table
- Create a staging table in your database to temporarily hold the quote IDs from the parquet file. For example: `CREATE TABLE quotes_id_staging (quote_id INT);`
- Use a Copy activity in your Synapse pipeline to load the parquet file into this `quotes_id_staging` table. Configure the source dataset as the parquet file and the sink dataset as the staging table.
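A parquet extract can contain the same quote ID more than once, so it can help to de-duplicate the IDs before (or after) loading them into the staging table. A minimal Python sketch of that clean-up step (the sample IDs are made up for illustration):

```python
# Hypothetical quote IDs as they might arrive from the parquet file,
# possibly containing duplicates.
quote_ids_from_parquet = [101, 102, 101, 103, 102]

# Keep one copy of each ID; sorting just makes the result deterministic.
unique_ids = sorted(set(quote_ids_from_parquet))
print(unique_ids)  # → [101, 102, 103]
```

In the pipeline itself you could achieve the same effect with a `SELECT DISTINCT quote_id` when reading from the staging table.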
Step 2: Filter Data from the Source Table
Once the `quotes_id_staging` table is populated with the list of `quote_id`s, use a Mapping Data Flow or a stored procedure to filter the `quotes_source` table based on that list.
Option 1: Use a Mapping Data Flow
- Create a Data Flow in your Synapse pipeline.
- Add a source transformation to read data from the `quotes_source` table.
- Add another source transformation to read data from the `quotes_id_staging` table.
- Use a Join transformation:
  - Join the `quotes_source` table and the `quotes_id_staging` table on the `quote_id` column.
  - Select the Inner Join type so that only matching `quote_id`s pass through.
- Add a sink transformation to write the filtered data to the `quotes_target` table.
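The inner join keeps only the source rows whose `quote_id` appears in the staging table. A rough Python sketch of that filter, using made-up sample rows:

```python
# Made-up rows standing in for quotes_source.
quotes_source = [
    {"quote_id": 1, "amount": 100.0},
    {"quote_id": 2, "amount": 250.0},
    {"quote_id": 3, "amount": 75.0},
]

# Quote IDs loaded into quotes_id_staging from the parquet file.
staged_ids = {1, 3}

# Inner-join semantics: keep only rows with a matching quote_id.
filtered = [row for row in quotes_source if row["quote_id"] in staged_ids]
print([row["quote_id"] for row in filtered])  # → [1, 3]
```

Row 2 is dropped because its `quote_id` has no match in the staging set, which is exactly what the Inner Join type does in the Data Flow.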
Option 2: Use a stored procedure
You can write a SQL query that performs the filtering and upserting in the database with a `MERGE` statement, then add a Stored Procedure activity to the pipeline to execute it. Here's an example:
```sql
MERGE INTO quotes_target AS target
USING (
    SELECT qs.*
    FROM quotes_source qs
    INNER JOIN quotes_id_staging qids ON qs.quote_id = qids.quote_id
) AS source
ON target.quote_id = source.quote_id
WHEN MATCHED THEN
    UPDATE SET
        target.column1 = source.column1,
        target.column2 = source.column2,
        -- Add all columns you want to update
        target.updated_at = GETDATE()
WHEN NOT MATCHED THEN
    INSERT (quote_id, column1, column2, created_at)
    VALUES (source.quote_id, source.column1, source.column2, GETDATE());
```
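The `MERGE` performs an upsert: matched `quote_id`s are updated in `quotes_target`, unmatched ones are inserted. A small Python sketch of the same semantics (the column names are illustrative, not from your schema):

```python
# quotes_target keyed by quote_id; one pre-existing row.
target = {10: {"quote_id": 10, "column1": "old value"}}

# Filtered source rows: one matches the target, one is new.
source_rows = [
    {"quote_id": 10, "column1": "updated value"},
    {"quote_id": 11, "column1": "new value"},
]

for row in source_rows:
    # WHEN MATCHED → overwrite the existing row;
    # WHEN NOT MATCHED → insert a new one.
    target[row["quote_id"]] = row

print(sorted(target))  # → [10, 11]
```

After the loop, quote 10 carries the updated values and quote 11 has been inserted, mirroring the two branches of the `MERGE`.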
Optionally, truncate the `quotes_id_staging` table after the pipeline run to keep it clean for future runs. This approach scales well and leverages Synapse's capabilities for incremental updates.
Please feel free to click the 'Upvote' (Thumbs-up) button and 'Accept as Answer'. This helps the community by allowing others with similar queries to easily find the solution.