My colleague Larry Clark contacted me today regarding my PDC presentation covering the 2008 Summer Olympics video monitoring project. Basically, he wanted more information on some of the lessons I learned while making my SSIS package scale. The package I built for this project ran every five minutes, and at peak loads needed to extract up to a million log rows, transform them, and load them into our data warehouse. I built about four versions of this package before I got it right. Here are some of the things I learned.
- Use snapshots to improve concurrency.
My source table was seeing a ton of insert activity. I needed to get in and read the new rows very quickly with minimal impact on concurrency. I did not want to block new inserts to the source table while my ETL process was running. Dirty reads were not an option because I relied on a sequence number in the source table as my high watermark.
I found that the fastest way to do this was to create a snapshot of the new rows by running a SELECT subquery inside an INSERT statement. The subquery pulls rows from the source, which are then inserted into a scratch table on the same instance. This approach moves the data in-process, minimizing the amount of time that locks need to be held on the source table. The tradeoff is the additional IO required to write out the copy. The scratch table is a simple heap with no indexes.
In my package, I implemented this using an Execute SQL Task that runs the [VideoMon].[ExtractLogRows] stored procedure, listed further below.
- Reduce or eliminate dependencies on MS DTC.
My initial attempts at building this package resulted in some very large, very long-running distributed transactions. For smaller batches of new rows, the package seemed to work fine, but as the row volumes increased, the package started to fail with errors like this:
Error: 0xC0202009 at Update Client Addresses, ClientAddresses Destination : SSIS Error Code DTS_E_OLEDBERROR. An OLE DB error has occurred. Error code: 0x80004005. An OLE DB record is available. Source: "Microsoft SQL Server Native Client 10.0" Hresult: 0x80004005 Description: "The Microsoft Distributed Transaction Coordinator (MS DTC) has cancelled the distributed transaction.".
Unfortunately, I didn't have time to properly investigate the problem I was having with MS DTC, so I changed the architecture of the package to minimize the scope and duration of the distributed transaction. This turned out to be a winning strategy. I was able to limit the scope of the distributed transaction to a single data flow task that copied data from the scratch table on my source to a scratch table on my destination. This was a raw insert operation; I avoided performing transformations during this move to make sure I wasn't slowing things down and unintentionally extending the duration of the distributed transaction.
Again, the tradeoff is a bit of additional IO, since I used scratch tables on both my source and destination, but this greatly improved the reliability and performance of the package as a whole.
- Design for parallel execution.
The SSIS team has done a great job of improving thread scheduling and management in the SQL Server 2008 release. This eliminates the need to manually configure the MaxConcurrentExecutables package level property to avoid starving the system. Many package designers make the mistake of executing tasks serially when they don't need to be. If you are executing tasks serially, make sure that you are doing so because you have a logical requirement related to data integrity. If such a requirement does not exist, run them in parallel.
In my case I was able to run all of the data flow tasks associated with dimension table loading in parallel except for one which I saved till the end because it required more CPU horsepower (more on that later). Many of these parallel data flows rarely updated tables because the dimensions they loaded were relatively static, so by running them in parallel I was making good use of my hardware and not holding up other data flow tasks that were actually doing real work.
Here's a picture of the control flow in the package; you can see the parallel dimension loading routines clustered in the middle:
- Don't extract more data than you need.
This one probably goes without saying, but I'm including it for newbies. The only data I could guarantee was new during a load was the fact data (log rows). I could not guarantee that all of the dimension data represented in those facts was new. Some of these dimensions could get quite large, so my package needed to be very selective about pulling incremental dimension data from the source system. Here's how I implemented it for my video URL dimension:
This data flow task first uses a SELECT DISTINCT to get a unique set of video URL keys from the destination scratch table containing my new fact data. Next I do a lookup against my destination dimension table. A match indicates that the dimension row already exists and no action is required. Note that all the processing up to this point happens at my destination, imposing no load whatsoever on my source. A non-match indicates that the dimension row doesn't exist, so I do a second lookup against the source system to retrieve the new dimension row. Note that I'm only doing this lookup for rows I absolutely know that I do not have. Since the video URL key already exists, I know it has to exist on my source, so the operation can be done with a dirty read, which I implemented by setting the IsolationLevel on the data flow task to ReadUncommitted. Being very selective about which dimension rows I retrieve, together with the less restrictive isolation level, ensures that I am not consuming unnecessary resources at the source.
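For readers who think in set-based terms, the first two steps (the SELECT DISTINCT and the no-match output of the destination lookup) are roughly equivalent to the anti-join below. This is only a sketch of the logic, not the package's implementation, which used Lookup transforms; the table names FactLogStage and DimVideoUrl and the column VideoUrlId are assumptions made for illustration.

```sql
-- Hypothetical names: FactLogStage is the destination scratch table holding
-- the new fact rows; DimVideoUrl is the destination dimension table.
SELECT DISTINCT s.[VideoUrlId]
FROM [VideoMonDW].[FactLogStage] AS s
WHERE NOT EXISTS (
    -- A match means the dimension row already exists, so the key is skipped.
    SELECT 1
    FROM [VideoMonDW].[DimVideoUrl] AS d
    WHERE d.[VideoUrlId] = s.[VideoUrlId]
);
```

The keys this returns are the only ones that would need to be fetched from the source system.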
This is of course a very simplistic dimension. If it were a slowly-changing dimension I would have used a more sophisticated approach such as the new SQL Server 2008 MERGE command.
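For a dimension whose attributes can change, a MERGE-based load might look something like the sketch below. It is not taken from the package: the staging table, the dimension table and the column names are all hypothetical, and it shows only a simple overwrite-in-place (Type 1) update rather than full history tracking.

```sql
-- Hypothetical names throughout: VideoUrlStage holds incoming dimension rows,
-- DimVideoUrl is the dimension being maintained.
MERGE [VideoMonDW].[DimVideoUrl] AS target
USING [VideoMonDW].[VideoUrlStage] AS source
    ON target.[VideoUrlId] = source.[VideoUrlId]
-- Existing row whose attribute changed: overwrite it in place.
WHEN MATCHED AND target.[Url] <> source.[Url] THEN
    UPDATE SET target.[Url] = source.[Url]
-- Row not yet in the dimension: insert it.
WHEN NOT MATCHED BY TARGET THEN
    INSERT ([VideoUrlId], [Url])
    VALUES (source.[VideoUrlId], source.[Url]);
```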
- Consider using a script transformation instead of an Integration Services expression for complex calculations.
SSIS includes a built-in C++ like expression syntax for calculating data values. The syntax is fast and easy to use for simple calculations, but in my case I had a relatively complex calculation that would have been very painful to implement using an Integration Services expression, not to mention hard to maintain. After a failed attempt to write the expression, I turned to a custom script transformation to solve the problem.
As rows flow through a custom script transformation, it has the power to modify them in memory and pass them along to the next element in the data flow. My transform needed to calculate an IP number from an IP address in order to geo-tag a log event. I was able to locate a C# routine on Justin Cook's blog post here that performs the IP Number calculation (thx Justin!). Using this code inside my data flow proved surprisingly easy. I used a derived column transform to add a blank column of the right size and type to the result set, then pasted Justin's code into my custom script transform. The resulting script appears further below.
- Consider using a conditional split transform to apply more CPU power to an expensive lookup operation.
I mentioned earlier that one of my transforms required more CPU power, specifically the transform that geo-tags client IP addresses. We licensed a data set of all registered IP address ranges and their associated geographies from Ip2Location. Associating an IP address with its correct range involved converting the IP address to an IP number. I found a fast way of doing this, described in the script transformation section above. Once I had the IP number, I had to write a query in Transact-SQL to associate it with the correct IP address range in the Ip2Location data set.
This turned out not to be a trivial task. The Ip2Location data set had over 6 million rows in it, and the search arguments were not a direct match, but needed to use aggregation and range searches to determine the correct range that an IP number belonged to. A range was defined as a starting IP Number (FROM address) and an ending IP Number (TO address), and I used an IDENTITY to uniquely identify each range when I initially loaded the data set. I could have created a unique index on the combination of the FROM and TO address, but I thought I would be able to make the query perform better if there was a single column primary key. This also simplified my dimensional data model by allowing foreign key references to be a single column, which had major performance advantages later when I built my cubes.
First I needed to get the IP range lookup query to work, then I had to figure out the right indexing strategy on the data set to make it perform well. I decided to implement the query as a scalar User Defined Function (UDF), [VideoMonDW].[GetIpId] (listed further below), so I could re-use it in a variety of operations.
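CREATE PROCEDURE [VideoMon].[ExtractLogRows]
    @MaxLogId bigint  -- high water mark from the previous load (data type assumed)
AS
    TRUNCATE TABLE [VideoMon].[RawLogExport];
    INSERT INTO [VideoMon].[RawLogExport]
    SELECT * FROM [VideoMon].[RawLog]  -- column list not shown in the original; SELECT * assumed
    WHERE [VideoMon].[RawLog].[LogId] > @MaxLogId;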
Note the search argument which allows me to extract only the new log rows since the last load. The @MaxLogId is the high water mark from the previous load.
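As a rough illustration of how the high water mark could be obtained and handed to the procedure, consider the sketch below. The fact table name FactLog and its LogId column are assumptions, and in the actual package the two statements would run on different connections (the first against the destination, the second against the source), with an SSIS variable carrying the value between them.

```sql
-- Hypothetical: [VideoMonDW].[FactLog] stands in for whichever destination
-- table records the log rows that have already been loaded.
DECLARE @MaxLogId bigint;

-- High water mark = largest sequence number already loaded (0 on the first run).
SELECT @MaxLogId = ISNULL(MAX([LogId]), 0)
FROM [VideoMonDW].[FactLog];

-- Snapshot only the rows newer than the watermark into the scratch table.
EXEC [VideoMon].[ExtractLogRows] @MaxLogId = @MaxLogId;
```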
Remember SSIS imposes some default locking behavior to handle rolling back changes if a package fails. You may need to override that behavior to avoid unintentionally holding locks on your source while the package is running. I did this by changing the default IsolationLevel at the package level from Serializable to Unspecified, then explicitly setting the IsolationLevel on my control flow containers as required.
This approach is markedly different from those advocated by people who think scratch table IO is evil, and that all transformation operations should occur in-flight. I was one of those people before I built this package. SSIS has an impressive array of capabilities that make it possible to transform data without having to write it back to disk multiple times. The volume of data I was dealing with made this approach impossible. In one early version of the package, I used multiple sort and merge transforms to manipulate all of the data in memory. This worked great with smaller batches of rows, but once volume picked up it put the whole system under memory pressure and my package started failing. The resulting package uses both approaches judiciously (scratch tables and in-flight transformation) for a winning approach.
One final point. I was asked by my colleague Tom Pizzato why I didn't use the new Change Data Capture (CDC) support in SQL Server 2008 to implement this part. The answer is that since I had complete control over the schema of the source system, I could build my own light-weight change data capture mechanism and reduce the complexity of the system a bit. If I didn't have that kind of control I would have definitely used CDC.
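public override void MyAddressInput_ProcessInputRow(MyAddressInputBuffer Row)
{
    // Runs once per row: store the computed IP number in the blank derived column.
    Row.IpNumber = IPAddressToNumber(Row.ClientAddress);
}
public static Int64 IPAddressToNumber(String IPaddress)
{
    Int64 num = 0;
    String[] arrDec = IPaddress.Split('.');
    // Each octet is weighted by a power of 256; the first octet is the most significant.
    for (int i = arrDec.Length - 1; i >= 0; i--)
        num += ((Int64.Parse(arrDec[i]) % 256) * (Int64)Math.Pow(256, (3 - i)));
    return num;
}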
The first method runs once per row and calls Justin's routine to calculate the IP Number and store it in my blank derived column. Note that the script gets compiled at run-time, so this is not executed as an interpreted operation as was the case in the DTS days. I didn't have the opportunity to compare the performance of the script transformation approach vs. the Integration Services expression approach because I gave up trying to write the expression; it was way too complex and would likely have been very fragile from a maintenance perspective. I found this to be a much better approach and the performance was great.
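To make the arithmetic concrete, here is the same calculation worked through in T-SQL for a sample address. This is a different technique from the script transformation above: it leans on PARSENAME, which splits a dot-separated string into up to four parts, and it is offered only as a way to check the numbers, not as what the package did. The variable name and sample address are made up.

```sql
-- Sample dotted-quad address (illustrative value only).
DECLARE @ClientAddress varchar(15) = '192.168.1.10';

-- PARSENAME numbers the parts from the right: part 4 is the first octet.
SELECT
      CAST(PARSENAME(@ClientAddress, 4) AS bigint) * 16777216  -- 256^3
    + CAST(PARSENAME(@ClientAddress, 3) AS bigint) * 65536     -- 256^2
    + CAST(PARSENAME(@ClientAddress, 2) AS bigint) * 256       -- 256^1
    + CAST(PARSENAME(@ClientAddress, 1) AS bigint)             -- 256^0
    AS IpNumber;
-- 192*16777216 + 168*65536 + 1*256 + 10 = 3232235786
```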
CREATE FUNCTION [VideoMonDW].[GetIpId] (@IpNumber bigint)
RETURNS bigint
AS
BEGIN
    -- Declare the return variable here
    DECLARE @IpId bigint
    -- Add the T-SQL statements to compute the return value here
    SELECT @IpId = IP_ID
    FROM VideoMonDW.Ip2Location IP
    WHERE
    (IP.IP_FROM = (SELECT MAX(IP_FROM) FROM VideoMonDW.Ip2Location IP2 WHERE IP2.IP_FROM <= @IpNumber)) AND
    (IP.IP_TO = (SELECT MIN(IP_TO) FROM VideoMonDW.Ip2Location IP3 WHERE IP3.IP_TO >= @IpNumber));
    -- Return the result of the function
    RETURN @IpId
END
The "IP ID" is the unique identifier I generated using an IDENTITY that uniquely identifies an IP address range. Note the search arguments in the WHERE clause. These are very expensive aggregation and range search operations. Running this query with no indexes took over 30 seconds per lookup on my dev box. That obviously was not going to scale. Next I created indexes on every combination of the primary key (IP ID), FROM and TO columns I could think of, including compound indexes. Finally, I re-ran the query using SHOWPLAN and figured out which indexes were actually getting used. The results showed me that the following indexes were getting used:
IP_ID (Unique clustered index)
FROM, TO (Non-clustered index)
TO (Non-clustered index)
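In T-SQL, indexes like the three listed above could be created along the following lines. The index names are invented for illustration, and the clustered index may in practice have been created implicitly by a primary key constraint on IP_ID rather than by an explicit statement like this one.

```sql
-- Index names are hypothetical; table and column names follow the GetIpId function.
CREATE UNIQUE CLUSTERED INDEX [IX_Ip2Location_IpId]
    ON [VideoMonDW].[Ip2Location] ([IP_ID]);

-- Compound index covering the FROM/TO range columns.
CREATE NONCLUSTERED INDEX [IX_Ip2Location_From_To]
    ON [VideoMonDW].[Ip2Location] ([IP_FROM], [IP_TO]);

-- Single-column index on the TO column.
CREATE NONCLUSTERED INDEX [IX_Ip2Location_To]
    ON [VideoMonDW].[Ip2Location] ([IP_TO]);
```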
These indexes got the query execution time down to 4 seconds on my dev box. That was still too expensive when you consider the volumes I was dealing with. I had tuned the lookup query as much as I could at this point, so I needed to find another way to make it scale. What I decided to do was split my data set into two streams using the Conditional Split transform like this:
The data set comes into the conditional split which has a default output for odd-numbered rows, and a second output for even-numbered rows using a simple condition:
ClientAddressId % 2 == 0
SSIS will automatically spin up multiple threads to process each side of the stream, giving me twice the CPU horsepower I would have had if I had stuck with a single stream. Note that my selection of two streams was completely arbitrary; it could have been four, or six. I just found that a simple even/odd calculation was easy to understand and most likely easier to maintain.
Note also that I used an OLE DB Command as the destination for each stream instead of a standard OLE DB Destination adapter. OLE DB Command transformations execute a SQL command once for each row in the data set. I used this approach for two reasons. First, since I had implemented my IP address range lookup as a T-SQL function (see the GetIpId function above), I wanted to call that function from the values clause of an insert statement rather than writing another lookup transformation, in order to save time and reduce complexity. Second, OLE DB Destination adapters default to an AccessMode of "OpenRowset Using FastLoad". This is the fastest way to bulk-insert rows into a table, but after some testing I found my two threads were blocking each other with table locks.
Here's what the syntax of the insert statement looks like in my OLE DB Command objects; note the use of the GetIpId function in the values clause:
OLE DB Command objects do not use bulk-loading APIs, so I no longer had a problem with my two insert threads blocking each other. This is a great example of how you can use simple design approaches in SSIS to help make your package scale better.
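A per-row statement of the kind described above might look like the following sketch. The destination table and its columns are hypothetical; only the GetIpId function comes from the listings above. The question marks are the positional parameter markers that an OLE DB Command maps to input columns, so the statement is meant to be pasted into the component rather than run on its own.

```sql
-- Hypothetical destination table and column names, for illustration only.
-- Each ? is bound to an input column of the data flow, in order:
-- client address id, client address, and the IP number passed to GetIpId.
INSERT INTO [VideoMonDW].[ClientAddresses] ([ClientAddressId], [ClientAddress], [IpId])
VALUES (?, ?, [VideoMonDW].[GetIpId](?));
```

Because the function call sits in the values clause, the range lookup runs once per inserted row, which is why spreading the rows across two streams helps.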