Data Loading performance considerations with Clustered Columnstore indexes

This article describes data loading strategies specific to tables with a Clustered Columnstore index on SQL Server 2014. For fundamental data loading strategies, the whitepaper Data Loading Performance Guide is an excellent read and is highly recommended. Although that whitepaper doesn’t cover Columnstore indexes, many of the concepts presented there still hold true for any data loading into SQL Server.

 

Overview of Loading Data into Columnstore Indexes

When you insert or load data into a table that has a Clustered Columnstore index, the insert lands in one of 2 places:

  • The delta store
  • Compressed Columnstore

If you insert a small number of rows (< 102,400 rows) into a table with a Clustered Columnstore index, the data ends up in the “delta store”. The delta store is a row store. When a delta store rowgroup reaches 1,048,576 rows, it is marked as CLOSED and is compressed into columnar format by a background process called the Tuple Mover, which encodes and compresses the data.

If you are bulk loading data, whether this data lands in the delta store (row store) or directly in a compressed Columnstore depends on the batch size specified during the bulk insert process. If the batch size for the insert is at least 102,400 rows, the data no longer ends up in the delta store but is inserted directly into a compressed rowgroup in columnar format. This is true for BULK INSERT, BCP, or anything else using the Bulk API, and even for insert/select operations, as long as a single batch inserts at least 102,400 rows.

The size of a rowgroup is capped at 1,048,576 rows. Ideally we want each compressed rowgroup to be as close to that upper limit as possible, because fuller rowgroups offer the potential for better compression. The added benefit of higher-density rowgroups is that fewer row groups have to be scanned when querying the data.
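The placement rule above can be sketched as a small Python model. This is only an illustration, not SQL Server code: the function name `place_batch` is hypothetical, the model treats a batch of exactly 102,400 rows as large enough for direct compression (consistent with the 102,400 batch-size row in the results table later in this article), and it assumes that a batch larger than the rowgroup cap is cut into full rowgroups, with the leftover rows judged against the same threshold.

```python
MIN_DIRECT_COMPRESS_ROWS = 102_400   # threshold quoted in the article
MAX_ROWGROUP_ROWS = 1_048_576        # rowgroup cap quoted in the article


def place_batch(batch_rows):
    """Return (compressed_rowgroup_sizes, delta_store_rows) for one batch.

    Simplified model: ignores dictionary-full and memory-pressure trimming.
    """
    if batch_rows < 0:
        raise ValueError("batch_rows must be non-negative")
    # Assumption: a large batch is first cut into full-size rowgroups.
    full_groups, remainder = divmod(batch_rows, MAX_ROWGROUP_ROWS)
    compressed = [MAX_ROWGROUP_ROWS] * full_groups
    delta_rows = 0
    if remainder >= MIN_DIRECT_COMPRESS_ROWS:
        compressed.append(remainder)   # big enough to compress directly
    else:
        delta_rows = remainder         # too small: lands in the delta store
    return compressed, delta_rows


print(place_batch(50_000))      # ([], 50000)
print(place_batch(102_400))     # ([102400], 0)
print(place_batch(1_048_577))   # ([1048576], 1)
```

In this model, a 50,000-row batch goes entirely to the delta store, while a batch one row over the cap produces one full rowgroup plus a single row in the delta store.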

The rowgroup size is determined not just by the maximum limit but also by the following factors:

  • The dictionary size limit which is 16MB
  • Insert batch size specified
  • The Partition scheme of the table since a rowgroup doesn’t span partitions
  • Memory Grants or memory pressure causing row groups to be trimmed
  • Index REORG being forced or Index Rebuild being run

 

Transaction Logging implications

In a data warehouse scenario, assuming that the recovery model is SIMPLE or BULK_LOGGED, trace flag 610 can have implications in certain cases even with Columnstore indexes. There is a detailed explanation of trace flag 610 in the Data Loading Guide under the section I/O Impact of Minimal Logging under Trace Flag 610.

The delta store is a page-compressed B-tree, so the minimal logging considerations still apply. Below is an example of a bulk load of 50,000 rows into a table. Because the number of rows is < 102,400, the data lands in the delta store. Logging of any inserts into the delta store is affected by trace flag 610.

Looking at the Log Records:

select count(*) as CountLogRecords from fn_dblog(NULL,NULL)

[Image: output of the log record count query]

 

Ordering Data at Initial Load Time

When loading data into a clustered Columnstore index, consideration should be given not only to data load performance but also to query performance. Unlike a btree clustered index, where rows are ordered, loading into a clustered Columnstore index doesn’t order the data. Row groups are compressed in the order that the data is loaded and no specific ordering is done post rowgroup compression.

Say, for example, we have a table with a MarketID column and a daily load that happens for all markets. If the input data is not ordered by MarketID, then queries that use MarketID as a GROUP BY column or predicate may have to scan all the segments. Because of how the data was inserted, rowgroups may not be able to be eliminated.

In a case such as this, for the initial data load into a clustered Columnstore table, it may be beneficial to sacrifice some load speed in favor of query speed by implementing the approach below:

  • Load Data into a Heap ( Can be loaded concurrently with TABLOCK)
  • Create a Clustered index on the required column (MarketID in this example)
  • Create a Clustered Columnstore index with DROP_EXISTING = ON clause.

This will increase your load time but the trade-off is better rowgroup elimination and query execution time. This strategy can be used only on the initial load. Incremental data loads would benefit from the loaded data being ordered or partitioned by MarketID if the application or ETL process has the ability to sort the data prior to inserting or produce sorted data when reading from the data source.
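To see why ordering helps, here is a toy Python model of rowgroup elimination. It assumes that each rowgroup keeps the minimum and maximum MarketID it contains and that a rowgroup can be skipped when the searched value falls outside that range. The function name, the 50-market data set, and the tiny rowgroup size of 10,000 rows are all hypothetical choices made so the example runs instantly.

```python
def rowgroups_to_scan(values, rowgroup_rows, target):
    """Count rowgroups whose [min, max] range could contain target."""
    scanned = 0
    for start in range(0, len(values), rowgroup_rows):
        chunk = values[start:start + rowgroup_rows]
        if min(chunk) <= target <= max(chunk):
            scanned += 1   # range overlaps the predicate: cannot be skipped
    return scanned


# Toy data: 50 markets with 2,000 rows each, arriving interleaved (unsorted).
unsorted_ids = [i % 50 for i in range(100_000)]
sorted_ids = sorted(unsorted_ids)

ROWGROUP_ROWS = 10_000  # deliberately tiny; real rowgroups hold up to 1,048,576 rows

print(rowgroups_to_scan(unsorted_ids, ROWGROUP_ROWS, target=7))  # 10
print(rowgroups_to_scan(sorted_ids, ROWGROUP_ROWS, target=7))    # 1
```

With the interleaved input, every one of the 10 rowgroups contains every market, so none can be skipped. With the sorted input, only the single rowgroup holding markets 5 through 9 has to be read.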

 

Data Loading Scenarios: Non-Partitioned table

In this scenario we will look at loading data into a non-partitioned table, and specifically at the impact of loading data streams in parallel and the effect of batch size on the load.

 

Concurrent Loading

Loading in concurrent streams can increase the load throughput into a single table. When we load concurrently, multiple row groups are created, whether delta rowgroups or compressed rowgroups. Concurrent loading enables us to utilize multiple cores on the system, and in an ideal scenario with no other bottleneck you can spawn one bulk load per core. In reality, how many parallel streams to spawn depends on both the size of the data and the resources on the server. At some point you will hit either a physical resource bottleneck (CPU, memory, or disk IO) or a logical resource bottleneck, such as contention on a ROWGROUP_FLUSH lock, which can be induced by slow disk IO on the log drive. If you stop getting close to linear scale as you increase the number of streams, you can use the Waits and Queues methodology alongside Performance Monitor to find your primary bottleneck. A good tool for analyzing the waits is SQL Nexus.

The data below is from some tests run on a particular table on a particular server (see appendix). The objective of this is to demonstrate that parallel streams do improve throughput.

Data Input: 32 files for a total of 340 GB, 3 billion rows inserted into a single non-partitioned table. All 32 files are processed for each run; the number of concurrent streams indicates how many files are processed in parallel.

 

Table Type         Concurrent streams  Duration (hh:mm:ss)  DBSize_GB  Throughput (GB/Hour)  AvgCPU  MaxCPU
-----------------  ------------------  -------------------  ---------  --------------------  ------  ------
CCI                4                   2:05:01              164        135                   6%      9%
CCI                8                   1:05:39              164        341                   14%     15%
CCI                16                  0:32:41              164        637                   27%     28.5%
CCI                32                  0:24:00              164        816                   50%     56%
Heap with TABLOCK  16                  0:15:29              360        1360                  18%     22%
Heap with TABLOCK  32                  0:09:15              360        2267                  36%     38%

 

Note: When loading into a heap, no encoding or compression has to be done on the data, which makes the load itself faster, but significantly more IO is performed. With a Columnstore index, the data is both encoded and compressed, so the final size of the data is significantly smaller, and part of the load time is spent compressing and encoding it.

If we consider just the Clustered Columnstore tables, we see that concurrent streams scale in a rather linear fashion until you start hitting resource bottlenecks. This test had multiple streams being inserted into the same table.

 

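[Image: chart of load throughput against the number of concurrent streams for the Clustered Columnstore table]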

 

Size or space considerations and Segment Density

The space or compression that you get on a table using CCI depends on several factors, including the schema of the table, the types and number of columns, the redundancy of data within a row group, and the average size of a row group. When you look at the size of a table with a clustered Columnstore index, the size includes both the rowgroups themselves and the dictionaries.

Loading with different batch sizes can result in different table sizes. As mentioned earlier in the article, batch size isn’t the only factor that controls the size of the rowgroup, but it is one that is easily controlled during the load. Another implication of filling a rowgroup as close to the limit as possible is that at query time the unit of IO is a segment, which means that the fuller the segments, the fewer segments have to be read.

In the table below, we consider not only the batch size but also the average rowgroup size, since, as described earlier, multiple factors can cause a rowgroup to be trimmed, which in turn can result in lower compression. When bulk loading in batches of fewer than 102,400 rows, we ought to consider using trace flag 610, because performance benefits if the delta store operations are minimally logged, which is a best practice in a data warehouse either way. As you can see from the table below, as the batch size decreases, the database size potentially increases. One more observation is that although we specify a batch size of 1048576 rows, the rowgroup is trimmed because the dictionary is full. A good practice is to start out with a batch size of 1048576 and then adjust from there based on the average rows per rowgroup.

Data Input: 16 files totaling 170 GB concurrently loaded by 16 processes. Total of 1.5 Billion rows inserted into a single non-partitioned table.

 

Batch Size  DB Size (GB)          Duration  Avg Rows/rowgroup       Trim Reason
----------  --------------------  --------  ----------------------  -----------
50,000      138.5 (1)  82.5 (2)   0:24:22   1048576 (1) 349813 (2)  Inserts into row store until it is filled, compressed later by the Tuple Mover. Data not immediately available in columnar format. Rowgroup trimmed due to dictionary full.
102,400     87.4                  0:20:01   102,400                 Batch Size
1048576     82.5                  0:17:05   349813                  Dictionary Full

 

Note: The batch size parameter depends on the bulk load method. BCP uses the -b (batch size) option, BULK INSERT can use BATCHSIZE, and SSIS uses Maximum Insert Commit Size. Further details on the reasons for rowgroups being trimmed are here

(1) When you insert into a row store (batch size < 102,400), data is initially inserted into the delta store, which is a page-compressed B-tree. At a later point, compression into columnar format is done by the Tuple Mover background process. When you look at the size initially, it can be larger because the background process hasn’t yet moved all the rows into compressed columnar storage.

(2) This is the point at which the Tuple Mover has moved all the rows into compressed row groups. This can take a while, as the Tuple Mover is single threaded.

 

Data Loading Scenarios: Partitioned table

Most often, very large data warehouse tables are partitioned because of the benefits of partitioning, which include:

  • Enabling Data archival with a sliding window
  • Efficient loading of data
  • Manageability improvements including index rebuilds at the partition level
  • Partition elimination during queries

Loading data into a partitioned table with a Columnstore index depends largely on several factors:

  • The data source from which Data is being Loaded
  • Whether the data being loaded is sorted or not.
  • Whether the data is being loaded into an empty partition, that is, whether the granularity of the data load matches the partition granularity

 

Data Loads aligned to Partition Boundary:

This is the case where we have a partitioned table and data is loaded into an empty partition. Here we follow the methodology below, just as we would for a single-partition table, and then swap that table into the empty partition. This is covered in detail in the Data Loading Guide under the Bulk Loading into a partitioned table section. Here are the brief steps:

  • Create a Staging table with same definition as the destination table including the Clustered Columnstore index.
  • Load the data into the staging table (concurrently if possible).
  • Create the constraints that match the partition boundaries
  • Swap that staging table with the corresponding empty partition

Optional: If you require the data ordered for better rowgroup elimination based on query patterns, see the steps detailed out in “Ordering Data at Initial Load Time” section.

Data Loads that span multiple partitions

In this scenario, we have a partitioned table where the data being loaded from a file or single stream may span multiple partitions. In this case, SQL Server has to sort the data prior to inserting it into the CCI table. Depending on the batch size used and on the resources required, such as memory, the sort will either reside in memory or spill to disk, as seen in the plan below. The sort is more expensive if the input data is not sorted on the partition key. If the data load is coming from an existing relational table, it would be beneficial to sort it with an ORDER BY clause on the partition key at the source where the data resides. If flat files are your input, we recommend that, if possible, the application or ETL process produce files that are sorted on the partition key and aligned to a partition boundary.

 

[Image: execution plan for the bulk insert showing the Sort operator]

 

Choosing a batch size here has several ramifications:

  • The sorting is done for a particular batch, so it affects the memory grant size and resources to sort the number of rows that correspond to the batch size
  • The batch size is applicable to the Bulk insert statement as a whole and not limited to the number of rows inserted within a specific partition. Even though you specify a batch size of ~ 1 Million, you could end up with several smaller compressed row groups than you are used to seeing for a non-partitioned table.
  • If at least 102,400 rows are inserted into a single partition within a single batch, they are compressed immediately; otherwise they are inserted into a delta store (row store).
  • If the bulk load distributes rows across all partitions in small batches, you may end up with open delta rowgroups in all partitions and may have to force compression using an index REORG.
  • Using a higher batch size than ~1 Million rows may be beneficial if you know that the incoming data being inserted spans multiple partitions.
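The ramifications listed above can be made concrete with a small Python model. It is a sketch under stated assumptions: the function names are hypothetical, the 16-partition layout and the perfectly even spread of rows are invented for illustration, a share of exactly 102,400 rows is treated as large enough for direct compression, and the 1,048,576-row rowgroup cap is ignored because no share here reaches it.

```python
MIN_DIRECT_COMPRESS_ROWS = 102_400  # per-partition threshold within one batch


def split_batch_by_partition(rows_per_partition):
    """Classify each partition's share of a single batch.

    Returns two dicts keyed by partition number: rows compressed directly
    and rows that land in that partition's delta store.
    """
    compressed, delta = {}, {}
    for partition, rows in rows_per_partition.items():
        if rows >= MIN_DIRECT_COMPRESS_ROWS:
            compressed[partition] = rows
        elif rows > 0:
            delta[partition] = rows
    return compressed, delta


def even_spread(batch_rows, partitions):
    """Hypothetical worst case: one batch spread evenly over all partitions."""
    share = batch_rows // partitions
    return {p: share for p in range(1, partitions + 1)}


# A ~1 million row batch over 16 partitions: 65,536 rows each, all delta store.
compressed, delta = split_batch_by_partition(even_spread(1_048_576, 16))
print(len(compressed), len(delta))   # 0 16

# A ~3 million row batch: 196,608 rows each, all compressed directly.
compressed, delta = split_batch_by_partition(even_spread(3_145_728, 16))
print(len(compressed), len(delta))   # 16 0
```

In this model the same data lands entirely in delta stores at the smaller batch size and entirely in compressed rowgroups at the larger one, which is why a batch size above ~1 million rows can help when the input spans many partitions.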

If we look at sys.column_store_row_groups, the general expectation is that with a batch size of at least 102,400 rows, data is inserted directly into compressed row groups. However, row groups exist within a partition boundary, so we may see inserts into a delta store (row store) within a partition even though the batch size is greater than 102,400. At the end of your data load, you could potentially see a cross section of row groups: some in the OPEN state, some CLOSED, and some COMPRESSED.

The example below shows the row groups in partition 2, which has a delta store rowgroup that is OPEN and currently accepting rows, one that is CLOSED and waiting for the Tuple Mover to move it to compressed columnar format, and one that is already in compressed format.

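[Image: sys.column_store_row_groups output for partition 2 showing OPEN, CLOSED, and COMPRESSED row groups]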

 

Looking at the table below we observe that

  • When the Input Data is sorted on the Partition key, the throughput is higher than when it is not
  • Import into a Partitioned table can insert into delta store as shown above if the rows that land in a specific partition are < 102,400 per batch
  • The DB Size specified below is the final DB size once the Tuple Mover has run. Depending on how spread out the data is across partitions, the initial size can be much larger before the Tuple Mover compresses the row groups
  • If the data is not sorted on the partition key, knowing a good batch size to use at load time is trickier and we may end up with smaller row groups which means the compression will be less than ideal as seen in the last 2 rows of the table. Increasing batch size will reduce table size at the expense of resources such as memory for the sort.

Data Input: 16 files totaling 170 GB concurrently loaded by 16 processes. Total of 1.5 Billion rows inserted.

 

Table Type       Sorted on Partition Key  Concurrent streams  Batch Size  Duration  DB Size (GB)  Throughput (GB/Hour)  Average CPU
---------------  -----------------------  ------------------  ----------  --------  ------------  --------------------  -----------
Non-Partitioned  N/A                      16                  1048576     0:16:05   82.5          637                   27%
Partitioned      Y                        8                   1048576     0:34:17   84            300                   13%
Partitioned      Y                        16                  1048576     0:18:53   84            567                   26%
Partitioned      N                        8                   1048576     0:48:50   93            212                   13%
Partitioned      N                        16                  1048576     0:26:21   93            392                   26%
Partitioned      N                        16                  3145728     0:21:53   88            485                   26%

 

Note: The database sizes shown are the final sizes after the Tuple Mover has processed any rows that landed in the delta store, which may be long after the bulk load has completed. The initial size right after the bulk load completed was larger, particularly in the cases where the data was not sorted.

There are a few other aspects such as MERGE/UPSERT and Locking related concurrency which we will cover in a separate post.

 

SSIS Data Flow Task Max Insert Commit Size and implications on Row group sizes

Integration Services has a few parameters that can directly or indirectly affect your batch size in the Data Flow Task

  • Maximum Insert Commit Size: Indicates the batch size that will be committed.
  • DefaultBufferMaxRows: a configurable setting of the SSIS Data Flow task, set to 10,000 rows by default.
  • DefaultBufferSize: another setting of the SSIS Data Flow task, set to 10 MB by default. Its maximum value is 100 MB, and that upper limit is not configurable.

If the Maximum Insert Commit Size is set to 0, it is effectively not setting any batch size and considers the insert one single batch. If the Maximum Insert Commit Size is anything but 0, that is when the other two properties come into play in determining what batch size is used. For additional details see the article: SSIS Buffer Sizing

The table below is copied from the Integration Services Batch Sizes section of the Data Loading Guide whitepaper:

Max Insert Commit Size  Effect
----------------------  ------
MICS > buffer size      MICS setting is ignored, one commit for every buffer
MICS = 0                Entire batch is committed in one transaction (BatchSize = 0)
MICS < buffer size      Commit is issued every time MICS rows are encountered and at the end of each buffer

The effective impact of this is that we can end up with rowgroups that have fewer rows than we ideally want. Take, for example, a table whose rows are 300 bytes in size (the sum of the max length of all columns). Given the maximum buffer size of 100 MB, at most 104,857,600 / 300 ≈ 349,500 rows can fit in one buffer. In this case, even if we specify a Maximum Insert Commit Size (batch size) of 1 million rows, the buffer size in SSIS means the data is effectively sent in batches of roughly 349,500 rows.
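The arithmetic in the preceding paragraph can be sketched in Python. This is a simplified model of the behavior described in the table above, not an SSIS API: the function and parameter names are hypothetical, and the example assumes that the buffer row limit has been raised high enough that the buffer size in bytes is the binding constraint, as the paragraph implies.

```python
def rows_per_buffer(row_bytes, buffer_bytes, buffer_max_rows):
    """Rows that fit in one data flow buffer under both buffer limits."""
    return min(buffer_max_rows, buffer_bytes // row_bytes)


def effective_batch_rows(mics, row_bytes, buffer_bytes, buffer_max_rows):
    """Approximate rows per commit for a given Maximum Insert Commit Size.

    Returns None when mics is 0, meaning the whole load is a single batch.
    """
    if mics == 0:
        return None
    # A commit happens every MICS rows and at the end of each buffer,
    # so a batch can never be larger than one buffer.
    return min(mics, rows_per_buffer(row_bytes, buffer_bytes, buffer_max_rows))


MB = 1024 * 1024

# 300-byte rows, 100 MB buffer, row limit assumed raised to 1,000,000.
print(effective_batch_rows(1_000_000, 300, 100 * MB, 1_000_000))  # 349525

# Same row size with the default 10 MB buffer and 10,000-row limit.
print(effective_batch_rows(1_000_000, 300, 10 * MB, 10_000))      # 10000
```

Under the defaults, the model gives batches of only 10,000 rows, well below the 102,400 rows needed for direct compression, which is why these settings deserve attention when loading a CCI through a Data Flow Task.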

For fact tables with very large row sizes where a large amount of data is being loaded via a data flow task, multiple parallel Data flow tasks can be considered that load only a subset of the rows setting the Maximum Insert Commit Size to 0.

 

Summary Data Loading best practices

  • Do not use TABLOCK for concurrent loading on a CCI, as it will serialize concurrent loads, unlike on a heap.
  • Enable trace flag 610 if doing a large amount of trickle inserts or small-batch loads (fewer than 102,400 rows), so that delta store inserts can be minimally logged.
  • Split input data into multiple files if possible to take advantage of parallel loading.
    • A single bulk load from a single file will be serial
    • A single insert select is serial from the insert side
  • Batch size of 1048576 is generally a good start in case of inserts into a single partition or a non-partitioned table.
  • If there is a known predominant query pattern, with a predicate or aggregates mostly on a specific column, focusing only on data load performance may be the wrong approach. Ordering the data during the load can result in significant performance benefits during querying.
    • Load the Data into a Heap
    • Create a Clustered index on the required columns
    • Create a Clustered Columnstore index with DROP_EXISTING.
  • If loading data into a partitioned table where the input data spans multiple partitions
    • If the ETL process or application is able to produce data aligned to the partition boundary and load in concurrent streams, that is the preferred approach.
      • Order the data on the partition column or in case of a known specific column that most aggregates are done on, order on that specific column to enhance chances of rowgroup elimination.
    • If you cannot produce data aligned to the partition boundary, consider sorting on the partition key.
    • If you cannot order the data, consider a larger batch size than 1 million.
  • If using an SSIS Data Flow Task for ETL, be aware of the Maximum Insert Commit Size and DefaultBufferSize defaults.
  • Normalize strings out of the fact tables if possible. String columns can cause a segment to hit the dictionary size limit and trim the rowgroup to a smaller size, resulting in less than ideal compression.

 

Tested Hardware and configuration:

HP ProLiant DL580 Gen 8, Intel® Xeon® CPU E7-4870 V2 @ 2.30 GHz, 4 sockets, 60 cores, 1 TB RAM

Data was loaded into a single filegroup spread across 3 data drives, with the log on an SSD drive

SQL Server best practices followed per: Recommended updates and configuration options for SQL Server 2012 and SQL Server 2014. This article has considerations of the circumstances under which each of the trace flags specified below ought to be used.

Trace flags / Startup Options enabled: -T610, -T1117, -T1118, -T8048, -T9024, -E

 

Denzil Ribeiro

Program Manager SQL/Azure CAT