SSIS - Transfer data from Multiple tables using a single DFT

Gold Award Winner


Introduction

A simple SSIS data flow built from an OLE DB Source and an OLE DB Destination requires the columns and data types to be fixed at design time, because SSIS cannot change a package's metadata at run time. Extracting and transforming more than one table, from one or several databases, is often unavoidable, so a package eventually accumulates one data flow task per table.

Whenever a table changes, the package must be modified and the data flow task mappings redone. For example, adding a new table, adding a column to an existing table, or changing a column's data type all mean modifying and redeploying the package.

Now imagine having to extract and transform 20 or 200 tables. This becomes a daunting task that hurts maintainability and flexibility. Consider this workaround before opting for a third-party tool or generating SSIS packages programmatically.

​  

Environment

 ​​

Database: SQL Server 2012

Tools: SQL Server Data Tools for Visual Studio 2012

 ​

Test Data and Tables

 ​

To demonstrate this solution, I have three tables in the source database whose records need to be transferred to three similar tables in the destination database. For this test run, new records are inserted into the destination tables, but the design can be extended to do a full refresh or to load only the changes.

The three tables in the source database use different data types, and each table has 5 records.

In the destination there are three similar tables with the same structure. These tables can either contain data or be completely empty. For the test run, they are empty.

In addition, there are two supporting tables:

  • ImportTableList
  • TableTransform_SSIS - a staging table

"ImportTableList" holds the list of tables (SourceTableName, DestinationTableName) that the package uses as a reference during the transfer. Three tables are listed in it (refer to the screenshot below). "ImportTableList" can reside in either the source database or the destination database.

"TableTransform_SSIS" is a staging table that serves as a medium for transferring any table, whatever its number of columns and data types. Each of its columns is of data type NVARCHAR(MAX) and allows NULL.

Currently this table has 200 columns (200 is just the limit chosen for this example and can be extended), named with the pattern col<columnnumber>. It can therefore hold records from any table that has between 1 and 200 columns. The table contains records only while a transfer is in progress; the rest of the time it is an empty physical table.
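As a rough sketch, the two supporting tables could be created as follows. The column names SourceTableName and DestinationTableName and the col1 to col200 pattern come from the description above. The data types of ImportTableList, the surrogate key ImportTableListID, and the choice of identical source and destination table names are assumptions made only for illustration.

```sql
-- Reference table read by the package.
-- Assumption: SYSNAME columns and a hypothetical identity key.
CREATE TABLE dbo.ImportTableList
(
    ImportTableListID    INT IDENTITY(1,1) NOT NULL PRIMARY KEY, -- hypothetical
    SourceTableName      SYSNAME NOT NULL,
    DestinationTableName SYSNAME NOT NULL
);

-- Assumption: the destination tables carry the same names as the source tables.
INSERT INTO dbo.ImportTableList (SourceTableName, DestinationTableName)
VALUES (N'Employee',        N'Employee'),
       (N'EmployeeDetails', N'EmployeeDetails'),
       (N'Department',      N'Department');

-- Staging table: generate col1..col200 instead of typing them by hand.
DECLARE @ColumnCount INT = 200,          -- limit used in this example
        @i           INT = 1,
        @ddl         NVARCHAR(MAX) = N'';

WHILE @i <= @ColumnCount
BEGIN
    SET @ddl += CASE WHEN @i > 1 THEN N',' + CHAR(10) ELSE N'' END
              + N'    col' + CAST(@i AS NVARCHAR(10)) + N' NVARCHAR(MAX) NULL';
    SET @i += 1;
END;

SET @ddl = N'CREATE TABLE dbo.TableTransform_SSIS' + CHAR(10)
         + N'(' + CHAR(10) + @ddl + CHAR(10) + N');';

EXECUTE sp_executesql @ddl;
```

Raising @ColumnCount here would also require the same change in the Script Task and in the default value of "User::SourceQuery", since all three must agree on the number of columns.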

 

SSIS Package Design

Connection managers and variables

Control-Flow Design

Each step below refers to the package control flow diagram:

1. Get_TableList (1) - This Execute SQL Task runs a SQL query that fetches the list of tables to be imported from the source database to the destination database. The entire result set is returned to the object variable "User::ImportList".
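The article does not show the query used by Get_TableList. A minimal sketch, assuming the table lives in the dbo schema and has only the two columns named earlier, might look like this:

```sql
-- Sketch of the Get_TableList query (assumed, not taken from the package).
-- The column order matters: the Foreach loop maps result columns to
-- variables by position.
SELECT SourceTableName,
       DestinationTableName
FROM dbo.ImportTableList;
```

For the result to land in "User::ImportList", the task's ResultSet property would be set to "Full result set" and result name 0 mapped to the object variable.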

2. The Foreach loop ((2) Foreach - Table) fetches one record at a time (one table transfer per iteration) from the object variable "User::ImportList".

3. The Execute SQL Task ((3) Fetch columns for Table(s)) fetches the columns of the current source table. "User::SourceTableName" is passed through the parameter mapping to the stored procedure "dbo.usp_buildSelectQuery_Get", which returns a SQL query (stored in "User::SourceQuery") listing every column of "User::SourceTableName" in ordinal order.

Code snippet for dbo.usp_buildSelectQuery_Get:

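CREATE PROCEDURE [dbo].[usp_buildSelectQuery_Get]
    @SourceTableName VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;

    -- NVARCHAR(MAX) so a wide table (up to 200 columns) does not truncate the list
    DECLARE @colList NVARCHAR(MAX);

    -- Build a comma-separated column list in ordinal order.
    -- datetime columns are tagged with '|datetime' so the Script Task can use CONVERT for them.
    SELECT @colList = COALESCE(@colList + ',', '') +
                      CASE WHEN A.DATA_TYPE = 'datetime'
                           THEN (A.COLUMN_NAME + '|' + A.DATA_TYPE)
                           ELSE A.COLUMN_NAME
                      END
    FROM INFORMATION_SCHEMA.COLUMNS AS A
    WHERE A.TABLE_SCHEMA = 'dbo'   -- the generated query reads from dbo, so list only dbo columns
      AND A.TABLE_NAME = @SourceTableName
    ORDER BY A.ORDINAL_POSITION;

    SELECT 'SELECT' + CHAR(32) + @colList + CHAR(32) + 'FROM' + CHAR(32) + 'dbo.' + @SourceTableName;
END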

4. Next, the Script Task ((4) - Build SQL Query) reads 'User::SourceQuery' and rewrites the query so that each column is cast to NVARCHAR(MAX) and given an alias made of 'Col' followed by a sequence number based on the column order. For example, for the Employee table this task rewrites the query as:

"Select CAST(EmpID as nvarchar(max)) as Col1, Cast(EmpName as nvarchar(max)) as Col2,……,Cast(NULL as nvarchar(max)) as Col199,Cast(NULL as nvarchar(max)) as Col200".

As the Employee table has only 8 columns, the remaining 192 columns are filled with NULLs. The number of NULL columns changes with the column count of each source table. Within the Script Task, CONVERT is used instead of CAST for datetime columns to prevent the loss of milliseconds.

Code snippet from the Script Task:

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.SqlServer.Dts.Runtime;

// Both methods belong inside the ScriptMain class that SSIS generates for the Script Task.
public void Main()
{
    try
    {
        string sourceQuery = Dts.Variables["User::SourceQuery"].Value.ToString();
        Dts.Variables["User::SourceQuery"].Value = BuildSrcQuery(sourceQuery);
        Dts.TaskResult = (int)ScriptResults.Success;
    }
    catch (Exception e)
    {
        // Any parsing error thrown by BuildSrcQuery ends up here and fails the task
        Dts.Log(e.Message, 0, null);
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
}

private string BuildSrcQuery(string sql)
{
    const int stagingColumnCount = 200;   // number of columns in dbo.TableTransform_SSIS
    const string datetimeTag = "|datetime";

    // Locate the column list between SELECT and the final " FROM "
    int selectStart = sql.IndexOf("SELECT", StringComparison.OrdinalIgnoreCase);
    int fromStart = sql.LastIndexOf(" FROM ", StringComparison.OrdinalIgnoreCase);
    if (selectStart < 0 || fromStart < selectStart + 6)
        throw new ArgumentException("Expected a query of the form SELECT <columns> FROM <table>.");

    int listStart = selectStart + 6;   // 6 = "SELECT".Length
    List<string> columns = sql.Substring(listStart, fromStart - listStart)
                              .Split(',')
                              .Select(c => c.Trim())   // drop the spaces around the first and last names
                              .ToList();

    if (columns.Count > stagingColumnCount)
        throw new InvalidOperationException("The source table has more columns than the staging table.");

    // Pad with NULL so the query always returns exactly 200 columns
    while (columns.Count < stagingColumnCount)
        columns.Add("NULL");

    for (int i = 0; i < columns.Count; i++)
    {
        if (columns[i].EndsWith(datetimeTag, StringComparison.OrdinalIgnoreCase))
        {
            // Style 121 (yyyy-mm-dd hh:mi:ss.mmm) keeps the milliseconds
            string name = columns[i].Substring(0, columns[i].Length - datetimeTag.Length);
            columns[i] = "CONVERT(NVARCHAR(MAX)," + name + ",121) AS col" + (i + 1);
        }
        else
        {
            columns[i] = "CAST(" + columns[i] + " AS NVARCHAR(MAX)) AS col" + (i + 1);
        }
    }

    // Rebuild the query, reusing the original FROM clause
    return "SELECT " + string.Join(",", columns) + sql.Substring(fromStart);
}

5. In the Data Flow Task ((5) Transfer records from source to staging), set the OLE DB Source data access mode to 'SQL command from variable' and select the variable "User::SourceQuery".

The default value of the variable "User::SourceQuery" is:

"SELECT  TOP 1 CAST(1 AS NVARCHAR(MAX)) AS col1,CAST(2 AS NVARCHAR(MAX)) AS col2,..,CAST(199 AS NVARCHAR(MAX)) AS col199,CAST(200 AS NVARCHAR(MAX)) AS col200 FROM dbo.ImportTableList"

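The table "ImportTableList" in this default query can be replaced by any table in the source database. The query only gives the OLE DB Source its design-time metadata of 200 NVARCHAR(MAX) columns, and step 3 overwrites the variable at run time.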
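Typing the 200-column default query by hand is tedious, so one option is to generate the text once and paste it into the variable. The following is only a sketch; the variable names and the fixed count of 200 are assumptions that mirror this example.

```sql
-- Generate the design-time default value for User::SourceQuery.
DECLARE @i    INT = 1,
        @cols NVARCHAR(MAX) = N'';

WHILE @i <= 200   -- must match the number of columns in dbo.TableTransform_SSIS
BEGIN
    SET @cols += CASE WHEN @i > 1 THEN N',' ELSE N'' END
               + N'CAST(' + CAST(@i AS NVARCHAR(10)) + N' AS NVARCHAR(MAX)) AS col'
               + CAST(@i AS NVARCHAR(10));
    SET @i += 1;
END;

-- Copy the single value returned here into the variable's default value.
SELECT N'SELECT TOP 1 ' + @cols + N' FROM dbo.ImportTableList' AS DefaultSourceQuery;
```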


In the OLE DB Destination, choose the staging table TableTransform_SSIS.


Mappings between source and destination:

The mappings never change at run time. For each table, the column data varies according to the source columns, and all unused columns contain NULLs. Observing the data viewer during execution makes this clear:

a) The Employee table has its data in the first 8 columns, and the rest of the columns contain NULLs.

b) Likewise, the EmployeeDetails table has ten columns, and the other columns contain NULLs.

c) Similarly, the Department table has seven columns, and the rest contain NULLs.

  

6. Execute SQL Task ((6) Inserting records from Staging table to actual table): Once the data is in the staging table, an Execute SQL Task performs operations such as insert, update, or delete against the actual table. Here, the stored procedure (usp_RefreshTables_Insert) performs only an insert: using the primary key, it inserts only the records that are not already in the destination table. It also reads only the staging columns that correspond to the destination table's columns and ignores the remaining NULL columns.

Within the stored procedure, INFORMATION_SCHEMA.COLUMNS is used to identify the data type of each destination column, and during the insert each value is converted from NVARCHAR(MAX) to the appropriate data type.

Code snippet for dbo.usp_RefreshTables_Insert:

CREATE PROCEDURE [dbo].[usp_RefreshTables_Insert]
    @TBLNAME VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;

    BEGIN TRY

        -- NVARCHAR(MAX) so wide tables do not truncate the generated statement
        DECLARE
            @InsertColumnList NVARCHAR(MAX)
           ,@SelColumnList    NVARCHAR(MAX)
           ,@INSERTSQLSTRING  NVARCHAR(MAX)
           ,@BuildCondition   NVARCHAR(MAX);

        -- Match staging rows to existing destination rows on the primary key column(s)
        SELECT @BuildCondition = COALESCE(@BuildCondition + ' AND ', '') +
                                 ('A.' + B.COLUMN_NAME + '=' + 'dbo.TableTransform_SSIS.Col'
                                  + CAST(B.ORDINAL_POSITION AS VARCHAR(12)))
        FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS A
        INNER JOIN INFORMATION_SCHEMA.COLUMNS AS B
            ON  A.TABLE_SCHEMA = B.TABLE_SCHEMA
            AND A.TABLE_NAME   = B.TABLE_NAME
            AND A.COLUMN_NAME  = B.COLUMN_NAME
        WHERE OBJECTPROPERTY(OBJECT_ID(QUOTENAME(A.CONSTRAINT_SCHEMA) + '.' + QUOTENAME(A.CONSTRAINT_NAME)),
                             'IsPrimaryKey') = 1
          AND B.TABLE_SCHEMA = 'dbo'
          AND B.TABLE_NAME = @TBLNAME;

        -- Without a primary key the statement below would be NULL and silently do nothing
        IF @BuildCondition IS NULL
            RAISERROR('Table %s has no primary key; cannot detect already-inserted rows.', 16, 1, @TBLNAME);

        -- Build both lists in one pass so the column names and their casts stay aligned
        SELECT @InsertColumnList = COALESCE(@InsertColumnList + ', ', '') + COLUMN_NAME
              ,@SelColumnList = COALESCE(@SelColumnList + ',', '')
                 + 'cast(col' + CAST(ORDINAL_POSITION AS VARCHAR(12)) + ' AS '
                 + CASE
                     WHEN DATA_TYPE IN ('numeric', 'decimal') THEN DATA_TYPE +
                          '(' + CAST(NUMERIC_PRECISION AS VARCHAR(12)) + ',' + CAST(NUMERIC_SCALE AS VARCHAR(12)) + ')'
                     WHEN CHARACTER_MAXIMUM_LENGTH = -1 THEN DATA_TYPE + '(MAX)'
                     WHEN CHARACTER_MAXIMUM_LENGTH IS NULL THEN DATA_TYPE
                     ELSE DATA_TYPE + '(' + CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(12)) + ')'
                   END + ')'
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = 'dbo'
          AND TABLE_NAME = @TBLNAME
        ORDER BY ORDINAL_POSITION;

        -- Insert only the staging rows whose primary key is not yet in the destination
        SET @INSERTSQLSTRING = 'Insert into dbo.' + @TBLNAME + '(' + @InsertColumnList + ')'
            + CHAR(10) + ' Select ' + @SelColumnList
            + CHAR(10) + 'from dbo.TableTransform_SSIS'
            + CHAR(10) + 'WHERE NOT EXISTS (SELECT 1 FROM dbo.' + @TBLNAME + ' A WHERE ' + @BuildCondition + ')'
            + CHAR(10);

        EXECUTE sp_executesql @INSERTSQLSTRING;
    END TRY
    BEGIN CATCH
        DECLARE @ErrMsg NVARCHAR(4000), @ErrSeverity INT;
        SELECT @ErrMsg = ERROR_MESSAGE(), @ErrSeverity = ERROR_SEVERITY();
        RAISERROR (@ErrMsg, @ErrSeverity, 1);
        RETURN;
    END CATCH
END
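To make the dynamic SQL easier to follow, here is the kind of statement the procedure would build. The table dbo.Region and its three columns are hypothetical and are not among the article's test tables; they were chosen only to keep the illustration short.

```sql
-- Hypothetical destination table, used only for this illustration:
--   dbo.Region (RegionID INT PRIMARY KEY, RegionName VARCHAR(50), CreatedOn DATETIME)
-- For @TBLNAME = 'Region' the procedure would build a statement along these lines:
Insert into dbo.Region(RegionID, RegionName, CreatedOn)
 Select cast(col1 AS int),cast(col2 AS varchar(50)),cast(col3 AS datetime)
from dbo.TableTransform_SSIS
WHERE NOT EXISTS (SELECT 1 FROM dbo.Region A WHERE A.RegionID=dbo.TableTransform_SSIS.Col1)
```

Only col1 to col3 are read, because the column list is driven by the destination table's metadata; the remaining staging columns, which hold NULLs, never appear in the statement.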

7. Execute SQL Task ((7) Truncate Staging Table): For every table within the loop, the staging table is truncated so that it can be reused for the next table. At the end of the run the table is empty; it holds data only during this process.
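The statement for this last task is not shown in the article; presumably it is a single line such as the following, assuming the staging table lives in the dbo schema:

```sql
-- Empty the staging table so the next loop iteration starts clean.
TRUNCATE TABLE dbo.TableTransform_SSIS;
```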

Results

 ​

In this example, the three destination tables had no records before the run. After the run, all records had been transferred to their respective destination tables without any data loss or errors.

With this design, a single Data Flow Task (DFT) can transfer anywhere from hundreds to millions of records across multiple tables. Hope this helps.