API Sample – Lookup Transform


This sample creates a data flow package with an OLEDB Source component feeding into a Lookup Transform. The Lookup transform is set to Full Cache mode, and uses [DimCustomer] as its reference table.

Items of interest:

  • CustomerKey and GeographyKey are used as the index (join) columns. This is configured by using the JoinToReferenceColumn property
  • The FirstName column is being overwritten by the value retrieved by the lookup transform
  • The LastName2 column is being added as a new output column
// Builds, in memory, a package with one Data Flow Task in which an OLE DB Source
// reading [DimCustomer] feeds a Lookup transform (Full Cache) whose reference query
// also selects from [DimCustomer]. The package is only constructed here; this method
// neither saves nor executes it.
//
// args: command-line arguments (not used).
static void Main(string[] args)
{
    Package package = new Package();

    // Add Data Flow Task
    Executable dataFlowTask = package.Executables.Add("STOCK:PipelineTask");

    // Set the name (otherwise it will be a random GUID value).
    // Direct casts are used so an unexpected type fails right here with an
    // InvalidCastException instead of a later NullReferenceException.
    TaskHost taskHost = (TaskHost)dataFlowTask;
    taskHost.Name = "Data Flow Task";

    // We need a reference to the InnerObject to add items to the data flow
    MainPipe pipeline = (MainPipe)taskHost.InnerObject;

    //
    // Add connection manager
    //

    ConnectionManager connection = package.Connections.Add("OLEDB");
    connection.Name = "localhost";
    connection.ConnectionString = "Data Source=localhost;Initial Catalog=AdventureWorksDW2008;Provider=SQLNCLI10.1;Integrated Security=SSPI;Auto Translate=False;";

    //
    // Add OLEDB Source
    //

    IDTSComponentMetaData100 srcComponent = pipeline.ComponentMetaDataCollection.New();
    srcComponent.ComponentClassID = "DTSAdapter.OleDbSource";
    srcComponent.ValidateExternalMetadata = true;
    IDTSDesigntimeComponent100 srcDesignTimeComponent = srcComponent.Instantiate();
    srcDesignTimeComponent.ProvideComponentProperties();
    srcComponent.Name = "OleDb Source";

    // Configure it to read from the given table
    srcDesignTimeComponent.SetComponentProperty("AccessMode", 0);
    srcDesignTimeComponent.SetComponentProperty("OpenRowset", "[DimCustomer]");

    // Set the connection manager
    srcComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
    srcComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

    // Retrieve the column metadata
    srcDesignTimeComponent.AcquireConnections(null);
    srcDesignTimeComponent.ReinitializeMetaData();
    srcDesignTimeComponent.ReleaseConnections();

    //
    // Add the Lookup transform
    //

    IDTSComponentMetaData100 lookupComponent = pipeline.ComponentMetaDataCollection.New();
    lookupComponent.ComponentClassID = "DTSTransform.Lookup";
    lookupComponent.Name = "Lookup";

    CManagedComponentWrapper lookupWrapper = lookupComponent.Instantiate();
    lookupWrapper.ProvideComponentProperties();

    // Connect the source and the transform
    IDTSPath100 path = pipeline.PathCollection.New();
    path.AttachPathAndPropagateNotifications(srcComponent.OutputCollection[0], lookupComponent.InputCollection[0]);

    //
    // Configure the transform
    //

    // Set the connection manager
    lookupComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
    lookupComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

    // Cache Type - Full = 0, Partial = 1, None = 2
    lookupWrapper.SetComponentProperty("CacheType", 0);
    lookupWrapper.SetComponentProperty("SqlCommand", "select * from [DimCustomer]");

    // initialize metadata
    lookupWrapper.AcquireConnections(null);
    lookupWrapper.ReinitializeMetaData();
    lookupWrapper.ReleaseConnections();

    // Grab the input and its upstream (virtual) columns so we can select columns by name
    IDTSInput100 lookupInput = lookupComponent.InputCollection[0];
    IDTSVirtualInput100 lookupVirtualInput = lookupInput.GetVirtualInput();
    IDTSVirtualInputColumnCollection100 lookupVirtualInputColumns = lookupVirtualInput.VirtualInputColumnCollection;

    // We are joining on CustomerKey and GeographyKey
    // Note: join columns should be marked as READONLY
    var joinColumns = new string[] { "CustomerKey", "GeographyKey" };
    foreach (string columnName in joinColumns)
    {
        IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];
        IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);

        // The property value is the name of the column in the reference query. It equals
        // the input column name here only because the source and the reference query both
        // read [DimCustomer]; pass the reference column's own name when they differ.
        lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);
    }

    // Overwrite the existing FirstName column value with the one returned by the Lookup.
    // To do this, we need to flag the column as READWRITE, and set the CopyFromReferenceColumn property on the input
    var overwriteColumns = new string[] { "FirstName" };
    foreach (string columnName in overwriteColumns)
    {
        IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];
        IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READWRITE);

        lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "CopyFromReferenceColumn", columnName);
    }

    // First output is the Match output
    IDTSOutput100 lookupMatchOutput = lookupComponent.OutputCollection[0];

    // Add a new LastName2 column from the "LastName" column returned by the lookup.
    // Key = reference column to copy from, Value = name of the new output column.
    var newColumns = new Dictionary<string, string>
    {
        { "LastName", "LastName2" }
    };

    foreach (KeyValuePair<string, string> mapping in newColumns)
    {
        string sourceColumn = mapping.Key;
        string newColumnName = mapping.Value;
        string description = string.Format("Copy of {0}", sourceColumn);

        // insert the new column
        IDTSOutputColumn100 outputColumn = lookupWrapper.InsertOutputColumnAt(lookupMatchOutput.ID, 0, newColumnName, description);
        lookupWrapper.SetOutputColumnProperty(lookupMatchOutput.ID, outputColumn.ID, "CopyFromReferenceColumn", sourceColumn);
    }
}

Comments (6)

  1. This is an index post for the series of posts with examples on how to create packages programmatically.

  2. The SQL Server Integration Services team added valuable new caching options (and scalability) to the

  3. 这篇文章中我们将继续和大家分享一些使用SSIS API构建SSIS包的示例 1. 构建一个Row Count转换 下面的代码可以构建一个包含OLEDB数据源和Row Count转换(包含在一个数据流任务中)的SSIS包。其中Row

  4. Dave says:

    This looks to me like you're joining the column to itself.

    foreach (string columnName in joinColumns)

       {

           IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];

           IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);

           lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);

       }

    I'm getting it to work by simply looking up the column that's coming from the source and then linking to the column name in my lookup component.

    IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns["col1"];

               IDTSInputColumn100 inputColumn = patlookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);

               patlookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", "col2");

  5. Binod Mahto says:

    Hi Matt,

    First, I must say this is a very nice post but unfortunately my code is not working (even though I followed the steps mentioned here).

    My Problem Statement: I want to move data from one table to another and, at the same time, avoid inserting duplicate records into the destination table. To avoid inserting duplicate records I am using the Lookup component, but it's not working.

    Line 'IDTSOutput100 lookupMatchOutput = lookupComponent.OutputCollection[0];' doesn't return anything.

    Below is the code I wrote for the Lookup transform, which I am using to keep duplicate rows from being inserted into the DB again.

               IDTSComponentMetaData100 lookupComponent = pipeline.ComponentMetaDataCollection.New();

               …

               lookUpPath.AttachPathAndPropagateNotifications(srcComponent.OutputCollection[0], lookupComponent.InputCollection[0]);

               …

               // Cache Type – Full = 0, Partial = 1, None = 2

               lookupWrapper.SetComponentProperty("CacheType", 0);

               lookupWrapper.SetComponentProperty("NoMatchBehavior", 1);// 1= Redirect rows to No Match output

               lookupWrapper.SetComponentProperty("SqlCommand", "select Name, ProductNumber from [dbo].[TestTableDest]");

               // initialize metadata

               …

               // Mark the columns we are joining on

              …

               // Note: join columns should be marked as READONLY

               var joinColumns = new string[] { "Name", "ProductNumber" };

               foreach (string columnName in joinColumns)

               {

                   IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];

                   IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);

                   lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);

               }

               // Second output is the Un-Match output

               IDTSOutput100 lookupNoMatchOutput = lookupComponent.OutputCollection[1];

    But here lookupNoMatchOutput  doesn't return any column. Any help on this would be appreciated.

    Note: '…' means the same code as explained in your post. I cannot post the entire code because of the content length restriction.

  6. Binod Mahto says:

    Found the problem:

    " IDTSOutput100 lookupNoMatchOutput = lookupComponent.OutputCollection[1];" will not list columns to map with Oledb/ado.net Destination. To do this mapping we have to use source output collection only.

    Below is the final Oledb destination column mapping code I wrote:

    IDTSPath100 path = pipeline.PathCollection.New();

               path.AttachPathAndPropagateNotifications(lookupNoMatchOutput, destComponent.InputCollection[0]);

               // Configure the destination

               IDTSInput100 destInput = destComponent.InputCollection[0];

               IDTSVirtualInput100 destVirInput = destInput.GetVirtualInput();

               IDTSInputColumnCollection100 destInputCols = destInput.InputColumnCollection;

               IDTSExternalMetadataColumnCollection100 destExtCols = destInput.ExternalMetadataColumnCollection;

               IDTSOutputColumnCollection100 sourceColumns = srcComponent.OutputCollection[0].OutputColumnCollection;

               // The OLEDB destination requires you to hook up the external columns

               foreach (IDTSOutputColumn100 outputCol in sourceColumns)

               {

                   // Get the external column id

                   IDTSExternalMetadataColumn100 extCol = (IDTSExternalMetadataColumn100)destExtCols[outputCol.Name];

                   if (extCol != null)

                   {

                       // Create an input column from an output col of previous component.

                       destVirInput.SetUsageType(outputCol.ID, DTSUsageType.UT_READONLY);

                       IDTSInputColumn100 inputCol = destInputCols.GetInputColumnByLineageID(outputCol.ID);

                       if (inputCol != null)

                       {

                           // map the input column with an external metadata column

                           destDesignTimeComponent.MapInputColumn(destInput.ID, inputCol.ID, extCol.ID);

                       }

                   }

               }

    Note: Please excuse any typos in my earlier reply or in this post.