Created partitioned table with below schema
CREATE TABLE Landing.TargetTable
(
Id INT,
Name STRING,
SourceSystem INT,
SourceId INT,
ETLId INT
)
USING DELTA
PARTITIONED BY (SourceId);
Two source tables are there from where data are getting merged in TargetTable from Spark notebook.
Notebook 1 :
Source table schema from which data will be merged to target
%%sql
CREATE TABLE Landing.SourceTable
(
Id INT,
Name STRING,
SourceSystem INT,
SourceId INT,
ETLId INT
)
USING DELTA;
Using below code merging SourceTable to TargetTable
Sourcedf = spark.sql("select * from Landing.SourceTable")
Sourcedf.createOrReplaceTempView("Sourcedf")
spark.sql("MERGE INTO Landing.TargetTable AS target USING Sourcedf AS source ON source.Id = target.Id AND source.Name = target.Name AND source.SourceSystem = target.SourceSystem and source.SourceId = target.SourceId AND target.SourceId = 1 WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED BY target THEN INSERT * WHEN NOT MATCHED BY source THEN DELETE")
Notebook 2 :
%%sql
CREATE TABLE Landing.SourceTable1
(
Id INT,
Name STRING,
SourceSystem INT,
SourceId INT,
ETLId INT
)
USING DELTA;
Using below code merging SourceTable to TargetTable
Sourcedf = spark.sql("select * from Landing.SourceTable1")
Sourcedf.createOrReplaceTempView("Sourcedf")
spark.sql("MERGE INTO Landing.TargetTable AS target USING Sourcedf AS source ON source.Id = target.Id AND source.Name = target.Name AND source.SourceSystem = target.SourceSystem and source.SourceId = target.SourceId AND target.SourceId = 2 WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED BY target THEN INSERT * WHEN NOT MATCHED BY source THEN DELETE")
Both notebooks Notebook1 and Notebook2 will execute simultaneously and gives below error
Py4JJavaError: An error occurred while calling o289.sql. : io.delta.exceptions.ConcurrentAppendException: Files were added to partition [SourceId=2] by a concurrent update. Please try the operation again. Conflicting commit: {"timestamp":1733829194341,"operation":"MERGE","operationParameters":{"predicate":["((((Id#10232 = Id#10269) AND (Name#10233 = Name#10270)) AND (SourceSystem#10234 = SourceSystem#10271)) AND ((SourceId#10235 = SourceId#10272) AND (SourceId#10272 = 2)))"],"matchedPredicates":[{"actionType":"update"}],"notMatchedPredicates":[{"actionType":"insert"}],"notMatchedBySourcePredicates":[{"predicate":"(SourceId#10272 = 2)","actionType":"delete"}]},"readVersion":14,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numTargetRowsCopied":"0","numTargetRowsDeleted":"0","numTargetFilesAdded":"1","numTargetBytesAdded":"1235","numTargetBytesRemoved":"1235","numTargetRowsMatchedUpdated":"3","executionTimeMs":"2367","numTargetRowsInserted":"0","numTargetRowsMatchedDeleted":"0","unmodifiedRewriteTimeMs":"0","scanTimeMs":"1049","numTargetRowsUpdated":"3","numOutputRows":"3","numTargetRowsNotMatchedBySourceUpdated":"0","numTargetChangeFilesAdded":"0","numSourceRows":"3","numTargetFilesRemoved":"1","numTargetRowsNotMatchedBySourceDeleted":"0","rewriteTimeMs":"1056"},"engineInfo":"Apache-Spark/3.4.3.5.3.20241016.1 Delta-Lake/2.4.0.19","txnId":"fbb0a402-4b6c-46c1-ab60-35f9c0c91332"}