Concurrency issue in a partitioned table when updating the same table from different Spark notebooks

heta desai 357 Reputation points
2024-12-10T11:23:16.31+00:00

I created a partitioned table with the schema below:

CREATE TABLE Landing.TargetTable
(
    Id INT,
    Name STRING,
    SourceSystem INT,
    SourceId INT,
    ETLId INT
)
USING DELTA
PARTITIONED BY (SourceId);

Data from two source tables is merged into TargetTable, each from its own Spark notebook.

Notebook 1:

Schema of the source table whose data will be merged into the target:

%%sql

CREATE TABLE Landing.SourceTable
(
    Id INT,
    Name STRING,
    SourceSystem INT,
    SourceId INT,
    ETLId INT
)
USING DELTA;

The code below merges SourceTable into TargetTable:

Sourcedf = spark.sql("select * from Landing.SourceTable")

Sourcedf.createOrReplaceTempView("Sourcedf")

spark.sql("MERGE INTO Landing.TargetTable AS target USING Sourcedf AS source ON source.Id = target.Id AND source.Name = target.Name AND source.SourceSystem = target.SourceSystem and source.SourceId = target.SourceId AND target.SourceId = 1 WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED BY target THEN INSERT * WHEN NOT MATCHED BY source THEN DELETE")

Notebook 2:

%%sql

CREATE TABLE Landing.SourceTable1
(
    Id INT,
    Name STRING,
    SourceSystem INT,
    SourceId INT,
    ETLId INT
)
USING DELTA;

The code below merges SourceTable1 into TargetTable:

Sourcedf = spark.sql("select * from Landing.SourceTable1")

Sourcedf.createOrReplaceTempView("Sourcedf")

spark.sql("MERGE INTO Landing.TargetTable AS target USING Sourcedf AS source ON source.Id = target.Id AND source.Name = target.Name AND source.SourceSystem = target.SourceSystem and source.SourceId = target.SourceId AND target.SourceId = 2 WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED BY target THEN INSERT * WHEN NOT MATCHED BY source THEN DELETE")

Notebook 1 and Notebook 2 run simultaneously, and the following error is raised:

Py4JJavaError: An error occurred while calling o289.sql. : io.delta.exceptions.ConcurrentAppendException: Files were added to partition [SourceId=2] by a concurrent update. Please try the operation again. Conflicting commit: {"timestamp":1733829194341,"operation":"MERGE","operationParameters":{"predicate":["((((Id#10232 = Id#10269) AND (Name#10233 = Name#10270)) AND (SourceSystem#10234 = SourceSystem#10271)) AND ((SourceId#10235 = SourceId#10272) AND (SourceId#10272 = 2)))"],"matchedPredicates":[{"actionType":"update"}],"notMatchedPredicates":[{"actionType":"insert"}],"notMatchedBySourcePredicates":[{"predicate":"(SourceId#10272 = 2)","actionType":"delete"}]},"readVersion":14,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numTargetRowsCopied":"0","numTargetRowsDeleted":"0","numTargetFilesAdded":"1","numTargetBytesAdded":"1235","numTargetBytesRemoved":"1235","numTargetRowsMatchedUpdated":"3","executionTimeMs":"2367","numTargetRowsInserted":"0","numTargetRowsMatchedDeleted":"0","unmodifiedRewriteTimeMs":"0","scanTimeMs":"1049","numTargetRowsUpdated":"3","numOutputRows":"3","numTargetRowsNotMatchedBySourceUpdated":"0","numTargetChangeFilesAdded":"0","numSourceRows":"3","numTargetFilesRemoved":"1","numTargetRowsNotMatchedBySourceDeleted":"0","rewriteTimeMs":"1056"},"engineInfo":"Apache-Spark/3.4.3.5.3.20241016.1 Delta-Lake/2.4.0.19","txnId":"fbb0a402-4b6c-46c1-ab60-35f9c0c91332"}
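The message itself says the operation can be tried again, so one common workaround is to wrap the MERGE in a retry loop with backoff. The sketch below is a minimal illustration and does not remove the underlying conflict. `run_with_retry` and `is_concurrent_append` are hypothetical helper names, and detecting the conflict by searching the error text for `ConcurrentAppendException` is an assumption made so the example does not depend on a particular Delta Lake Python package.

```python
import random
import time


def is_concurrent_append(exc: Exception) -> bool:
    # Assumption: the wrapped Java exception name appears in the error text,
    # as it does in the Py4JJavaError shown above.
    return "ConcurrentAppendException" in str(exc)


def run_with_retry(action, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call action(); retry with jittered exponential backoff on conflicts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except Exception as exc:
            # Re-raise anything that is not a concurrent-append conflict,
            # and give up once the attempts are exhausted.
            if not is_concurrent_append(exc) or attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))
```

In a notebook this could be called as `run_with_retry(lambda: spark.sql(merge_sql))`, where `merge_sql` is a variable holding the MERGE statement. Each retry re-runs the whole MERGE, so this only helps when conflicts are occasional.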

Azure Synapse Analytics
Azure Databricks

1 answer

  1. heta desai 357 Reputation points
    2024-12-12T13:11:29.28+00:00

    @Chandra Boorla

    I tried to resolve the issue by making the partition explicit: in the MERGE statement I added the condition target.SourceId = <specific SourceId, 1 or 2> to the ON clause. The MERGE still accesses the other partition, which leads to the concurrency error.

    Notebook 1 MERGE Statement

    spark.sql("MERGE INTO Landing.TargetTable AS target USING Sourcedf AS source ON source.Id = target.Id AND source.Name = target.Name AND source.SourceSystem = target.SourceSystem and source.SourceId = target.SourceId AND target.SourceId = 1 WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED BY target THEN INSERT * WHEN NOT MATCHED BY source THEN DELETE")

    Notebook 2 MERGE Statement

    spark.sql("MERGE INTO Landing.TargetTable AS target USING Sourcedf AS source ON source.Id = target.Id AND source.Name = target.Name AND source.SourceSystem = target.SourceSystem and source.SourceId = target.SourceId AND target.SourceId = 2 WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED BY target THEN INSERT * WHEN NOT MATCHED BY source THEN DELETE")
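One thing worth checking in both statements: a target row counts as "not matched by source" whenever no source row satisfies the full ON condition for it. Because `target.SourceId = 1` is part of that condition, every target row in another partition fails it, so an unconditional `WHEN NOT MATCHED BY source THEN DELETE` applies to those rows as well. The plain-Python sketch below imitates that classification on a few made-up rows (no Spark involved; `rows_deleted` and the sample data are hypothetical) and shows how putting the partition predicate on the clause itself, as in `WHEN NOT MATCHED BY source AND target.SourceId = 1 THEN DELETE`, narrows the delete.

```python
def rows_deleted(target, source, source_id, scope_delete):
    """Return target rows a NOT MATCHED BY SOURCE ... DELETE clause would remove.

    Mirrors the ON condition from the post: Id, Name, SourceSystem and SourceId
    must be equal, and target.SourceId must equal source_id.
    """
    keys = ("Id", "Name", "SourceSystem", "SourceId")
    source_keys = {tuple(s[k] for k in keys) for s in source}

    deleted = []
    for t in target:
        matched = (
            t["SourceId"] == source_id
            and tuple(t[k] for k in keys) in source_keys
        )
        if matched:
            continue  # handled by WHEN MATCHED, not deleted
        # scope_delete=True models "AND target.SourceId = <source_id>" on the clause.
        if scope_delete and t["SourceId"] != source_id:
            continue
        deleted.append(t)
    return deleted


# Made-up sample data: one target row per partition.
target = [
    {"Id": 1, "Name": "a", "SourceSystem": 10, "SourceId": 1},
    {"Id": 2, "Name": "b", "SourceSystem": 20, "SourceId": 2},
]
source = [{"Id": 1, "Name": "a", "SourceSystem": 10, "SourceId": 1}]

unscoped = rows_deleted(target, source, source_id=1, scope_delete=False)
scoped = rows_deleted(target, source, source_id=1, scope_delete=True)
print([r["SourceId"] for r in unscoped])  # [2]
print([r["SourceId"] for r in scoped])    # []
```

The commit recorded in the error message already shows a scoped delete predicate (`SourceId = 2`), so scoping the clause may not be enough on its own to remove the conflict. Whether this Delta Lake version can limit the target scan to one partition when a NOT MATCHED BY SOURCE clause is present is an assumption to verify, not something this sketch demonstrates.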

