Columnstore Loading on Eight Socket Servers

I had a brief opportunity to do SQL Server workload testing on an eight socket server. It didn’t go well.

Sockets

I’ll give an extremely brief introduction to NUMA and sockets because I have a bit more free time these days. You’re probably reading this blog post on a machine with a single socket. All that means is that there’s a single CPU chip plugged into the motherboard. All of the RAM for the machine is local to that CPU chip. The recent king of single socket performance for SQL Server is the Intel 8280. 28 cores at 2.7 GHz with the latest instruction sets is quite a lot of processing power. There’s a newer chip out there now but it’s not clear to me if anyone can buy it yet.

What can you do if a workload can’t be scaled out and it needs more than the CPU power available with a single socket solution? A two socket solution could be the answer. A two socket server has two CPU chips plugged into different slots on a motherboard and each CPU has a local bank of RAM. Any part of memory can be accessed by either CPU but there is a performance penalty for foreign memory access, which is just accessing memory that is local to a different CPU. See the simple diagram below:

As mentioned earlier, it is more expensive for CPU0 to access memory local to CPU1 compared to its own local memory. The performance penalty of NUMA will depend on the workload as well as the hardware. It can range from not noticeable at all to a serious issue that needs to be addressed in some way. In some situations this can be as easy as convincing your VM admin to change how the VM is set up. In others, your best bet may be to figure out a way to keep the workload on a single socket. Some folks advocate for single socket solutions compared to two socket solutions for OLTP workloads due to the NUMA penalty.
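To get a rough picture of how SQL Server sees the NUMA layout of a machine, the documented DMVs `sys.dm_os_nodes` and `sys.dm_os_memory_nodes` can be queried. The following is only a sketch. It assumes SQL Server 2012 or later and VIEW SERVER STATE permission, and it wasn't part of the testing described in this post.

```sql
-- One row per SQLOS node: how many schedulers each NUMA node exposes.
SELECT n.node_id,
       n.memory_node_id,
       n.node_state_desc,
       n.online_scheduler_count
FROM sys.dm_os_nodes AS n
WHERE n.node_state_desc NOT LIKE N'%DAC%'  -- skip the dedicated admin connection node
ORDER BY n.node_id;

-- One row per memory node: foreign_committed_kb is memory committed from other nodes.
SELECT mn.memory_node_id,
       mn.pages_kb,
       mn.foreign_committed_kb
FROM sys.dm_os_memory_nodes AS mn
WHERE mn.memory_node_id <> 64  -- 64 is the DAC memory node
ORDER BY mn.memory_node_id;
```

A large foreign_committed_kb value on a node is one hint that the NUMA penalty described above may be in play, though how much it matters still depends on the workload.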

Perhaps you are wondering what happens if a workload needs more power than is available from a two socket server? Enter the four socket server. Four socket servers have four slots for CPU chips. An Intel 8280 processor in a four socket configuration gives you a total of 112 physical cores. That’s a lot. Four socket servers are more expensive than two socket servers and are significantly less common to find in data centers. Often, they are a custom order which takes months for the hardware vendor to build and deliver. In addition, the connections between CPUs are more complicated. Below are two common configurations:

Under configuration A, it takes two hops for CPU0 to access memory local to CPU3. That will increase latency more than going to CPU1 or CPU2. However, memory latency between CPUs won’t be the same for all pairs even in configuration B. In addition, it’s more complicated for SQL Server to try to manage memory on a four socket box. Even in the best case scenario for memory management, by design not everything stored in SQL Server owned memory is NUMA aware. The columnstore object pool is one example. Nonetheless, if you need more CPU than a two socket server or VM can provide then you’re stuck living with the complexities of a four socket solution.

What if you need even more power? Hardware manufacturers once again have you covered with eight socket servers. These are even more expensive and uncommon than four socket solutions. Different vendors handle NUMA differently. There are simply more possible permutations with eight CPUs compared to four. I’m told that the following is an example layout for an eight socket machine:

I’m not a NUMA expert by any means. However, I think it’s fair to say that it looks a lot more complicated. There are many more instances where accessing foreign memory will take two hops instead of one. I also don’t see a way to carve out a four socket VM with all CPUs just one hop away. All in all, that diagram makes me nervous. As a final note, eight socket machines are not the limit of what’s possible, but I’ll stop here because I’ve never run code on anything larger than eight sockets.

Test Setup

I elected to use a high concurrency CCI insert workload to compare performance between a four socket VM and an eight socket VM. Quite conveniently, I already had a test columnstore workload that I knew pushed the SQL Server scalability limits in terms of memory management. To perform the threading I used the SQL Server Multi Thread open source framework. I wanted all sessions to go to their own schedulers. That could have been tough to manage with tests up to 200 threads but the threading framework handles that automatically.

For those following along at home, testing was done with SQL Server 2019 with LPIM and TF 876 enabled. Guest VMs were built with VMware with Windows Server 2019 installed. The four and eight socket VMs were created on the same physical host with about 5.5 TB of RAM available to the guest OS in both configurations.
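For anyone reproducing the setup, the two settings mentioned above can be checked from a query window. This is a minimal sketch and makes two assumptions: that the `sql_memory_model_desc` column is available (it was added to `sys.dm_os_sys_info` in service packs for older versions and is present in SQL Server 2019), and that TF 876 was enabled globally, for example as a startup parameter.

```sql
-- A value other than CONVENTIONAL suggests that locked pages (LPIM) are in effect.
SELECT sql_memory_model_desc
FROM sys.dm_os_sys_info;

-- Reports whether trace flag 876 is currently enabled globally (-1 = global scope).
DBCC TRACESTATUS (876, -1);
```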

Test Results

In case you didn’t click the earlier link, each job inserts about 10.4 million rows into a table with a clustered columnstore index. The smallest test ran just a single thread. The largest test on the four socket VM ran 100 concurrent jobs for a total insert volume of a billion rows. The largest test on the eight socket VM ran 200 concurrent jobs for a total insert volume of two billion rows. In all test scenarios, the four socket VM performed more work per second than the eight socket VM:

Quite a disappointing result. Double your SQL Server license fees for a slower overall system! There are the expected memory related wait stats for the 200 thread result:

I grabbed a sample of callstacks with PerfView:

Drilling into the top stack shows that decommitting memory is a scalability issue:

The stacks for the 100 thread test on the four socket VM results are a bit surprising by comparison:

Overall CPU reported by PerfView is the same. Perhaps this is a limitation of the tool. However, it really does seem like all of those extra cores simply aren’t helping under the eight socket configuration. Throughput goes down instead of up. I had very limited time with this machine and don’t want to repeat much of a previous blog post, so I’ll stop the technical analysis here. It’s likely that nothing can really be done to improve the workload except large configuration or hardware changes.

If I had to make this workload perform better, I would directly engage with the vendors. If that didn’t work, I would try a bare metal configuration, SQL Server on Linux, or running on an eight socket machine from a different vendor. I think that it’s fair to say that the test results would be different, but I can’t speculate as to whether they would be better or worse or what the difference would be.
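For anyone repeating this kind of test, the wait stats mentioned above can be captured as a before and after difference around the workload. The following is a sketch only. The temp table name `#wait_snapshot` is made up for illustration, and the result includes waits from everything running on the instance, not just the test sessions.

```sql
DROP TABLE IF EXISTS #wait_snapshot;

-- Snapshot the cumulative instance-level wait stats before the test.
SELECT wait_type, waiting_tasks_count, wait_time_ms
INTO #wait_snapshot
FROM sys.dm_os_wait_stats;

-- ... run the workload here ...

-- Show the waits that accumulated the most time during the test.
SELECT TOP (10)
       w.wait_type,
       w.wait_time_ms - s.wait_time_ms AS wait_time_ms_delta,
       w.waiting_tasks_count - s.waiting_tasks_count AS waiting_tasks_delta
FROM sys.dm_os_wait_stats AS w
INNER JOIN #wait_snapshot AS s ON s.wait_type = w.wait_type
WHERE w.wait_time_ms - s.wait_time_ms > 0
ORDER BY wait_time_ms_delta DESC;
```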

Final Thoughts

We live in a wonderful era of hardware prosperity. With a big enough checkbook, you can run your SQL Server workloads on servers with hundreds of CPU cores. However, proceed with extreme caution if you’re considering moving a SQL Server workload to a server or VM with more than four sockets. It might not scale as well as you’re hoping. Thanks for reading!

The Trillion Row Operator

SQL Server friend and excellent webmaster Mr. Erik C. Darling showed me a query plan gone wrong:

48 billion rows for a single operator is certainly a large number for most workloads. I ended up completely missing the point and started wondering how quickly a query could process a trillion rows through a single operator. Eventually that led to a SQL performance challenge: what is the fastest that you can get an actual plan with at least one operator processing a trillion rows? The following rules are in play:

  1. Start with no user databases
  2. Any query can run up to MAXDOP 8
  3. Temp tables may be created and populated but all such work needs to finish in 30 seconds or less
  4. A RECOMPILE query hint must be present in the final query
  5. Undocumented features and behavior are fair game

Unless stated otherwise here, all testing was done on SQL Server 2017 CU20. The processor of my local machine is an Intel i7-9700K. If you’re following along at home you may see different results depending on your hardware. The queries are extremely CPU intensive. If you’d like to skip ahead to the winning query then scroll down to the bottom.
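If you haven’t timed an actual plan request before, one way to do it from a query window is sketched below. The small cross join of master..spt_values is just a stand-in so the example finishes quickly. It is not one of the challenge queries.

```sql
SET STATISTICS TIME ON;  -- prints CPU time and elapsed time to the messages tab
SET STATISTICS XML ON;   -- returns the actual plan as an extra result set

SELECT COUNT_BIG(*)
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
OPTION (RECOMPILE, MAXDOP 8);

SET STATISTICS XML OFF;
SET STATISTICS TIME OFF;
```

Dividing the reported CPU time by the elapsed time gives the CPU to elapsed ratio that is used to compare approaches throughout this post.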

Loop Join

A natural way to generate a lot of rows quickly is to cross join together two equally sized tables. Throwing a million rows into a temporary table takes very little time. Finding the count of the result set should meet the challenge requirements as long as the right optimizer transforms are not available. An initial attempt for data prep:

DROP TABLE IF EXISTS #loop_test;

CREATE TABLE #loop_test (
ID TINYINT NOT NULL
);

INSERT INTO #loop_test WITH (TABLOCK)
SELECT TOP (1000000) CAST(0 AS TINYINT) ID
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
OPTION (MAXDOP 1);

CREATE STATISTICS S1 ON #loop_test (ID) WITH FULLSCAN, NORECOMPUTE;

Along with the query to run:

SELECT COUNT_BIG(*)
FROM #loop_test t1 WITH (TABLOCK)
CROSS JOIN #loop_test t2 WITH (TABLOCK)
OPTION (RECOMPILE, MAXDOP 8);

The query returns the expected result of 1 trillion, but there is no operator that processes one trillion rows:

The query optimizer uses the LocalAggBelowJoin rule to perform an aggregate before the join. This brings down the total query estimated cost from 1295100 units to 6.23274 units for quite a savings. Trying again after disabling the transform:

SELECT COUNT_BIG(*)
FROM #loop_test t1 WITH (TABLOCK)
CROSS JOIN #loop_test t2 WITH (TABLOCK)
OPTION (RECOMPILE, MAXDOP 8, QueryRuleOff LocalAggBelowJoin);

Results in a new query plan:

This query meets the requirements of the challenge. The stream aggregate, nested loops, and row count spool operators will process 1 trillion rows each. However, it is unsatisfactory from a performance point of view. The row mode stream aggregate has a significant profiling penalty compared to a batch mode hash aggregate alternative. It is simple enough to create a dummy table to add batch mode eligibility for the query plan:

DROP TABLE IF EXISTS #cci;
CREATE TABLE #cci (ID INT, INDEX CCI CLUSTERED COLUMNSTORE)

SELECT COUNT_BIG(*)
FROM #loop_test t1 WITH (TABLOCK)
CROSS JOIN #loop_test t2 WITH (TABLOCK)
LEFT OUTER JOIN #cci ON 1 = 0
OPTION (RECOMPILE, MAXDOP 8, QueryRuleOff LocalAggBelowJoin);

The new plan has a nice batch mode operator to do the counting:

With an actual plan request, the new query processes rows at roughly twice the rate of the row mode query. Perhaps query profiling does its work once per batch for batch mode operators and once per row for row mode operators. Each batch has around 900 rows so profiling might be much cheaper per row for the hash aggregate compared to the stream aggregate. Strictly speaking, the goal of this challenge is to optimize the amount of time it takes to get an actual plan. This might lead to different tuning results than optimizing for query completion time without an actual plan request.

The final time for the batch mode query is 2 hours and 12 minutes. I uploaded an actual plan to pastetheplan.com. The query has a CPU to elapsed time ratio of only 5.37. That means that on average during query execution, more than 2.5 of the 8 CPU cores were not doing useful work. Getting all eight CPU cores to perform work as much as possible during query execution should allow the query to finish faster. The CPU imbalance in this case is caused by how rows are distributed to threads on the outer side of the nested loop operator:

The query engine has different algorithm choices to perform a parallel scan of a table. In this case, it doesn’t pick an algorithm that leads to optimal performance. The nested loop operator in total drives 42717 seconds of CPU work. It’s important to have that work as balanced between threads as possible. In this situation it is helpful to reduce the number of rows per page. To understand why, I recommend watching this PASS Summit presentation by Adam Machanic. Fewer rows per page results in better row distribution between threads and theoretically better performance. The following code only allows for a single row per data page:

DROP TABLE IF EXISTS #wide_1_million_rows;

CREATE TABLE #wide_1_million_rows (
ID TINYINT NOT NULL,
FILLER VARCHAR(4200) NOT NULL
);

INSERT INTO #wide_1_million_rows WITH (TABLOCK)
SELECT TOP (1000000) CAST(0 AS TINYINT) ID, REPLICATE('Z', 4200) FILLER
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
OPTION (MAXDOP 1);

CREATE STATISTICS S1 ON #wide_1_million_rows (ID) WITH FULLSCAN, NORECOMPUTE;
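A quick way to confirm the one row per page layout is to compare the row count to the data page count for the temp table. This is a sketch that assumes it runs in the same session that created #wide_1_million_rows. The ratio should come out very close to 1.

```sql
SELECT ps.row_count,
       ps.in_row_data_page_count,
       CAST(ps.row_count AS DECIMAL(18, 2))
           / NULLIF(ps.in_row_data_page_count, 0) AS rows_per_page
FROM tempdb.sys.dm_db_partition_stats AS ps
WHERE ps.object_id = OBJECT_ID(N'tempdb..#wide_1_million_rows')
  AND ps.index_id IN (0, 1);  -- heap or clustered index only
```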

The new query plan generates in 1:29:40. The CPU ratio of the query is 7.977, which is near perfect. Rows are quite balanced between threads:

I don’t know of a method to further optimize the nested loop join query. Most of the query’s execution time is spent on profiling. Sampled stacks from 15 seconds of query execution:

ETW tracing tools such as PerfView along with internals knowledge can be used to more finely break down how CPU is used by this query. Almost 75% of the query’s execution time is spent on sqlmin!CQScanProfileNew:GetRowImp and on methods called by it. The query only takes 23 minutes to complete without an actual plan which is quite close to 25% of the runtime with an actual plan. If you’re interested in more examples of using PerfView to draw conclusions about SQL Server consider checking out the list of links in this Stack Exchange answer.
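The two runtimes quoted above can be cross-checked against the PerfView percentage with a bit of arithmetic. The variable names below are made up for illustration and the inputs are just the figures from the text.

```sql
DECLARE @with_plan_seconds DECIMAL(18, 2) = 1 * 3600 + 29 * 60 + 40;  -- 1:29:40 with an actual plan
DECLARE @without_plan_seconds DECIMAL(18, 2) = 23 * 60;               -- 23 minutes without one

SELECT @without_plan_seconds / @with_plan_seconds AS fraction_without_plan,
       1 - @without_plan_seconds / @with_plan_seconds AS fraction_attributed_to_plan;
```

That works out to roughly 26% and 74%, which lines up reasonably well with the sampled stacks.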

Merge Join

A cross join can only be implemented as a nested loop join but the same end result can be emulated with a merge join. Let’s start with testing a MAXDOP 1 query. Setup:

DROP TABLE IF EXISTS #cci;
CREATE TABLE #cci (ID INT, INDEX CCI CLUSTERED COLUMNSTORE)

DROP TABLE IF EXISTS #merge_serial;

CREATE TABLE #merge_serial (
ID TINYINT NOT NULL
);

INSERT INTO #merge_serial WITH (TABLOCK)
SELECT TOP (1000000) 0 ID
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
OPTION (MAXDOP 1);

CREATE CLUSTERED INDEX CI ON #merge_serial (ID);

The query to run:

SELECT COUNT_BIG(*)
FROM #merge_serial t1 WITH (TABLOCK)
INNER JOIN #merge_serial t2 WITH (TABLOCK) ON t1.ID = t2.ID
LEFT OUTER JOIN #cci ON 1 = 0
OPTION (RECOMPILE, MERGE JOIN, MAXDOP 1, QueryRuleOff LocalAggBelowJoin);

I did not allow the query to run to completion because it would have taken around 25 hours to complete. On an 8 core machine with merge join, there’s no reason to expect that a parallel query could do any better than 25/8 = 3.125 hours. This makes merge join an inferior option compared to loop join so there isn’t a good reason to pursue further approaches that use a merge join.

For completeness, the best merge plan time I was able to get was 5 hours and 20 minutes. The plan avoids the scalability issues found with parallel merge and order preserving repartition streams. The end result is still poor. The merge join simply consumes a lot of CPU to do its work. The additional overhead added by the loop join is not helpful.

Row Mode Hash Join

Perhaps we’ll have more luck with a hash join compared to a merge join. After all, the query optimizer assigns a lower estimated cost for a row mode hash join compared to a merge join for the trillion row scenario. What could go wrong? The first thing that goes wrong is that the addition of the empty CCI will lead to a batch mode hash join for this query. Avoiding that requires changing the join to a type that doesn’t allow for batch mode execution. Among other methods, this can be accomplished by putting the hash join on the inner side of a nested loop or changing the join to also match on NULL. The second technique will be used at first because it leads to significantly less overhead compared to the loop join approach.

Starting again with a MAXDOP 1 test:

DROP TABLE IF EXISTS #cci;
CREATE TABLE #cci (ID INT, INDEX CCI CLUSTERED COLUMNSTORE)

DROP TABLE IF EXISTS #hash_maxdop_1;
CREATE TABLE #hash_maxdop_1 (
	ID TINYINT NULL
);

INSERT INTO #hash_maxdop_1 WITH (TABLOCK)
SELECT TOP (1000000) 0 
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
OPTION (MAXDOP 1);

CREATE STATISTICS S1 ON #hash_maxdop_1 (ID) WITH FULLSCAN, NORECOMPUTE;

The test query has a more complex join condition:

SELECT COUNT_BIG(*)
FROM #hash_maxdop_1 t1 WITH (TABLOCK)
INNER HASH JOIN #hash_maxdop_1 t2 WITH (TABLOCK) ON t1.ID = t2.ID OR (t1.ID IS NULL AND t2.ID IS NULL)
LEFT OUTER JOIN #cci ON 1 = 0
OPTION (MAXDOP 1, RECOMPILE)

Note how the join now includes rows where both sides are NULL. There are no such rows in the table but the query optimizer does not know that so a row mode join is used. The query completed in 5.5 hours. This means that a parallel hash join approach could be competitive with the nested loop join. An optimistic time estimate for a parallel query is 41 minutes.
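The optimistic estimate is simply the serial runtime divided evenly across all threads, assuming perfect scaling and no parallelism overhead. As a sketch, with made up variable names:

```sql
DECLARE @serial_hours DECIMAL(9, 3) = 5.5;  -- MAXDOP 1 runtime from the text
DECLARE @dop INT = 8;

-- Best case: the work divides perfectly across all threads.
SELECT @serial_hours * 60 / @dop AS best_case_minutes;  -- 41.25
```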

Getting a parallel query isn’t as simple as changing MAXDOP from 1 to 8. That change alone will not have the desired effect. Take a look at the following query plan born from such an attempt:

The repartition stream operators have a partitioning type of hash. That means that a hashing algorithm is used to assign rows to different threads. All of the rows from the table have the same column value, so they will all hash to the same value and will end up on the same thread. Seven CPUs will have no work to do even though the query technically executes at DOP 8. This will not lead to performance that’s better than executing at DOP 1.

Through testing I found that the following values for a nullable TINYINT all go to different threads: 0, 1, 4, 5, 7, 9, 14, and 17. Note that I’m relying on undocumented details here that might change between releases. Inserting 250k rows of each value into one table and 500k rows of each value into another table should result in 8 * 250000 * 500000 = 1 trillion rows total. Code to do this is below:

DROP TABLE IF EXISTS #cci;
CREATE TABLE #cci (ID INT, INDEX CCI CLUSTERED COLUMNSTORE)

DROP TABLE IF EXISTS #row_hash_join_repartition_8_build;
CREATE TABLE #row_hash_join_repartition_8_build (
	ID TINYINT NULL
);

INSERT INTO #row_hash_join_repartition_8_build
SELECT TOP (2000000) CASE ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) % 8 
WHEN 0 THEN 0
WHEN 1 THEN 1
WHEN 2 THEN 4
WHEN 3 THEN 5
WHEN 4 THEN 7
WHEN 5 THEN 9
WHEN 6 THEN 14
ELSE 17
END
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
OPTION (MAXDOP 1);

CREATE STATISTICS S1 ON #row_hash_join_repartition_8_build (ID) WITH FULLSCAN, NORECOMPUTE;


DROP TABLE IF EXISTS #row_hash_join_repartition_8_probe;
CREATE TABLE #row_hash_join_repartition_8_probe (
	ID TINYINT NULL
);

INSERT INTO #row_hash_join_repartition_8_probe
SELECT TOP (4000000) CASE ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) % 8 
WHEN 0 THEN 0
WHEN 1 THEN 1
WHEN 2 THEN 4
WHEN 3 THEN 5
WHEN 4 THEN 7
WHEN 5 THEN 9
WHEN 6 THEN 14
ELSE 17
END
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
CROSS JOIN master..spt_values t3
OPTION (MAXDOP 1);

CREATE STATISTICS S1 ON #row_hash_join_repartition_8_probe (ID) WITH FULLSCAN, NORECOMPUTE;

Along with the query to run:

SELECT COUNT_BIG(*)
FROM #row_hash_join_repartition_8_build t1 WITH (TABLOCK)
INNER JOIN #row_hash_join_repartition_8_probe t2 WITH (TABLOCK) ON t1.ID = t2.ID OR (t1.ID IS NULL AND t2.ID IS NULL)
LEFT OUTER JOIN #cci ON 1 = 0
OPTION (RECOMPILE, HASH JOIN, MAXDOP 8, QueryRuleOff LocalAggBelowJoin);
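Before committing to a long run, it’s cheap to sanity check that the two tables really will produce a trillion joined rows. The sketch below aggregates each table by ID first and multiplies the per-value counts. It assumes both temp tables were loaded by the code above. The alias names are arbitrary.

```sql
SELECT SUM(b.row_cnt * p.row_cnt) AS expected_join_rows
FROM (
    SELECT ID, COUNT_BIG(*) AS row_cnt
    FROM #row_hash_join_repartition_8_build
    GROUP BY ID
) AS b
INNER JOIN (
    SELECT ID, COUNT_BIG(*) AS row_cnt
    FROM #row_hash_join_repartition_8_probe
    GROUP BY ID
) AS p ON p.ID = b.ID;
```

With 250k rows per value on the build side and 500k rows per value on the probe side, the result should be 1,000,000,000,000.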

The query plan appeared after 42 minutes with a CPU ratio of 7.7. This was a better result than I was expecting. The repartition stream operators don’t seem to have much of a negative effect on overall runtime.

For completeness, I also tested a parallel hash join plan without the repartition stream operators. All of the hash join work is on the inner side of a nested loop join. The loop join adds significant overhead so this ends up being a losing method. If you’d like to see for yourself, here is one way to set up the needed tables:

DROP TABLE IF EXISTS #cci;
CREATE TABLE #cci (ID INT, INDEX CCI CLUSTERED COLUMNSTORE)

DROP TABLE IF EXISTS #hash_join_demand_62500_rows;
CREATE TABLE #hash_join_demand_62500_rows (
	ID TINYINT NOT NULL
);

INSERT INTO #hash_join_demand_62500_rows WITH (TABLOCK)
SELECT TOP (62500) ISNULL(CAST(0 AS TINYINT), 0) ID
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
OPTION (MAXDOP 1);

CREATE STATISTICS S1 ON #hash_join_demand_62500_rows (ID);

As well as one way to generate such a query:

SELECT COUNT_BIG(*)
FROM
(VALUES
	  (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
	, (0),(0),(0),(0),(0),(0),(0),(0)
) v(v)
CROSS JOIN (
	SELECT 1 c
	FROM #hash_join_demand_62500_rows t1 WITH (TABLOCK)
	INNER HASH JOIN #hash_join_demand_62500_rows t2 WITH (TABLOCK) ON t1.ID = t2.ID
) ca
LEFT OUTER JOIN #cci ON 1 = 0
OPTION (RECOMPILE, FORCE ORDER, MAXDOP 8, MIN_GRANT_PERCENT = 99, NO_PERFORMANCE_SPOOL);

It might be difficult to understand what’s going on here, so here’s a picture of the query plan:

Work is split up into 256 buckets. Each row fed into the outer side of the nested loop join results in 62500 rows hash joined with 62500 rows.
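The bucket math can be checked directly. The VALUES clause above has 32 lines of 8 values each, and each of those 256 rows drives a 62500 by 62500 hash join:

```sql
-- Cast first so the multiplication is done as BIGINT and does not overflow INT.
SELECT CAST(32 * 8 AS BIGINT) * 62500 * 62500 AS total_joined_rows;  -- 1,000,000,000,000
```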

The disappointing final result takes about 2 hours of runtime. This query might perform better on a busy system compared to the simple parallel hash join because work is assigned to threads somewhat on demand. In contrast, the repartition streams query requires equal work to be completed on all threads. However, the testing environment only has a single query running at a time.

The query pattern here is similar to a parallel apply. Perhaps you’ve heard good things about that pattern and are wondering why it didn’t work out here. In my experience, that query pattern does best when an order preserving repartition streams operator can be avoided, there’s a supporting index on the inner side, and the row count coming out of the inner side is relatively low. The above query is 0 for 3.

Batch Mode Hash Join

I’ll be honest. I expected batch mode hash join to be the winner. I was wrong. Batch mode parallelism works in a fundamentally different way than row mode parallelism. There is no repartition streams operator which assigns rows to different threads based on a hash or other algorithm. Rather, each operator takes care of parallelism by itself. In my experience this is generally quite advantageous for both performance and scalability, but there can be drawbacks.

We intend to get a batch mode hash join, so it seems logical enough to join together a CCI with one million rows. Sample code to do that:

DROP TABLE IF EXISTS #batch_mode_hash_equal_sized;

CREATE TABLE #batch_mode_hash_equal_sized (
ID TINYINT NOT NULL,
INDEX CCI CLUSTERED COLUMNSTORE
);

INSERT INTO #batch_mode_hash_equal_sized WITH (TABLOCK)
SELECT TOP (1000000) CAST(0 AS TINYINT) ID
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
OPTION (MAXDOP 1);

CREATE STATISTICS S1 ON #batch_mode_hash_equal_sized (ID) WITH FULLSCAN, NORECOMPUTE;

The query to be run no longer needs the join to the empty CCI table:

SELECT COUNT_BIG(*) 
FROM #batch_mode_hash_equal_sized c1
INNER JOIN #batch_mode_hash_equal_sized c2 ON c1.ID = c2.ID
OPTION (RECOMPILE, MAXDOP 8, QueryRuleOff LocalAggBelowJoin);

An actual plan is generated on my machine in about 27.5 minutes. While clearly better than all other attempts so far, the CPU to elapsed ratio suggests that significant improvement is possible: it is only 4.6 out of a possible 8.0. Batch mode processing is generous enough to provide a helpful wait stat. There are about 5607 seconds of the HTDELETE wait. Put simply, some of the threads of the hash join ran out of work to do and waited for the other threads to finish their work. As I understand it with batch mode joins, the probe side is what matters in terms of work imbalance. Row count per thread of the probe side:

The million row CCI table is split up into 9 chunks of rows which are divided between 8 threads which leads to many threads having no work to do for a while. Quite unfortunate. Oddly enough, switching to MAXDOP 9 could lead to much better performance for this query. That isn’t an available option on my local machine so I switched to testing on a VM with Intel 8280 sockets. The theory works out: the MAXDOP 8 query takes 50 minutes to complete and the MAXDOP 9 query takes 25 minutes to complete. At MAXDOP 9 the row distribution is quite nice on the probe side:

The rules for the different algorithms available to do parallel scans on CCIs are not documented. I previously investigated it here. Interestingly enough, this is an example where an exchange operator could make a positive difference. However, the batch mode hash join doesn’t have that as an option. While there are various tricks to achieve even row distribution from a CCI, it seems simplest to just replace the tables with row store. Converting 2 million rows from row mode to batch mode won’t add any measurable overhead. Using the same table that was used for the best loop join time:

SELECT COUNT_BIG(*) 
FROM #wide_1_million_rows c1
INNER JOIN #wide_1_million_rows c2 ON c1.ID = c2.ID
LEFT OUTER JOIN #cci ON 1 = 0
OPTION (RECOMPILE, MAXDOP 8, QueryRuleOff LocalAggBelowJoin)
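One way to see whether a run like this still accumulates the HTDELETE wait is to check the session-level wait stats afterwards. This is a sketch that assumes SQL Server 2016 or later, where `sys.dm_exec_session_wait_stats` is available. The numbers are cumulative for the session, so compare the values before and after the query or start from a fresh connection.

```sql
SELECT ws.wait_type,
       ws.waiting_tasks_count,
       ws.wait_time_ms
FROM sys.dm_exec_session_wait_stats AS ws
WHERE ws.session_id = @@SPID
  AND ws.wait_type = N'HTDELETE';
```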

The new query plan finishes in just under 16 minutes. Row distribution per thread on the probe side is much more even:

You might be thinking that 16 minutes is a pretty good result. I too initially thought that.

The Winner

All of the previous approaches spent the vast majority of their time performing a join to generate rows from smaller tables. The scans of the underlying tables themselves were quite fast. Perhaps a better approach would be to avoid the join entirely. For example, consider inserting a billion rows into a table and scanning that table 1000 times with UNION ALL. That would send one trillion rows to a single operator without the expense of a join. Of course, the downside is that SQL Server would need to read one trillion rows total from a table. It would be important to make the reading of rows as efficient as possible. The most efficient method I know is to create a columnstore table with extreme compression.

The title of this section gives it away, but let’s start with a small scale test to see if the approach has merit. Code to load 100 million rows into a CCI:

DROP TABLE IF EXISTS #count_test;

CREATE TABLE #count_test (
ID TINYINT NOT NULL,
INDEX CCI CLUSTERED COLUMNSTORE
);

INSERT INTO #count_test WITH (TABLOCK)
SELECT TOP (100000000) 0 ID
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
CROSS JOIN master..spt_values t3
OPTION (MAXDOP 1);

CREATE STATISTICS S1 ON #count_test (ID);

SQL Server is able to compress this data quite well because there’s just a single unique value. The table is only 1280 KB in size. The data prep code does exceed the 30 second time limit on my machine due to the statistics creation, but I’ll ignore that for now because it’s just a trial run. I can avoid all types of early aggregation with the following syntax:

SELECT COUNT_BIG(*)
FROM
(
	SELECT ID
	FROM #count_test
	UNION ALL
	SELECT ID
	FROM #count_test
) q
OPTION (RECOMPILE, MAXDOP 8, FORCE ORDER)

Picture of the query plan:

On my machine, that query takes about 41 ms to process 200 million rows. Based on that, it seems reasonable to expect that a query that processes a trillion rows could finish in under four minutes. There is a balancing act to perform here. Putting more rows into the table makes it harder to complete data prep within 30 seconds, but cuts down on the number of UNION ALLs performed which reduces query compilation time.
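The projection is a straight linear extrapolation from the small scale test. It ignores query compile time and assumes that scan throughput holds steady. As a sketch, with made up variable names:

```sql
DECLARE @target_rows BIGINT = 1000000000000;  -- one trillion
DECLARE @sample_rows BIGINT = 200000000;      -- rows processed by the trial query
DECLARE @sample_ms DECIMAL(18, 2) = 41;       -- trial query runtime from the text

SELECT @target_rows / @sample_rows AS scale_factor,                                 -- 5000
       @sample_ms * (@target_rows / @sample_rows) / 1000 AS projected_seconds,      -- 205
       @sample_ms * (@target_rows / @sample_rows) / 1000 / 60 AS projected_minutes; -- about 3.4
```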

The approach that I settled on was to load 312.5 million rows into a table. Reading that table 3200 times results in a trillion total rows. 3200 is a convenient number to achieve with nested CTEs and I’m able to load 312.5 million rows quite reliably under 30 seconds even with MAXDOP 4. I can’t say that this is the best possible approach but it seems to work fairly well.

The last detail in terms of data prep is to deal with statistics creation. SQL Server forces a sample rate of 100% for very small tables. Unfortunately, extremely compressed columnstore indexes can fall into the 100% range even if they have a high row count. Creating a statistic object with a sample of 312.5 million rows will take longer than 30 seconds. I don’t need good statistics for query performance so I work around the issue by creating a statistic with NORECOMPUTE after an initial load of 500k rows. Code to create and populate a CCI with 312.5 million rows:

DROP TABLE IF EXISTS #CCI_1M_ONE_COLUMN;

CREATE TABLE #CCI_1M_ONE_COLUMN (ID TINYINT NOT NULL, INDEX I CLUSTERED COLUMNSTORE);

DECLARE @insert_count TINYINT = 0;
WHILE @insert_count < 8
BEGIN
	INSERT INTO #CCI_1M_ONE_COLUMN
	SELECT TOP (125000) 0
	FROM master..spt_values t1
	CROSS JOIN master..spt_values t2
	OPTION (MAXDOP 1);

	SET @insert_count = @insert_count + 1;
END;

DROP TABLE IF EXISTS #C;

CREATE TABLE #C (I TINYINT NOT NULL, INDEX I CLUSTERED COLUMNSTORE);

INSERT INTO #C WITH (TABLOCK)
SELECT TOP (500000) 0
FROM #CCI_1M_ONE_COLUMN
OPTION (MAXDOP 1);

CREATE STATISTICS S ON #C (I) WITH NORECOMPUTE, FULLSCAN;

WITH C13 AS (
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
	UNION ALL
	SELECT ID FROM #CCI_1M_ONE_COLUMN
)
, C312 AS
 (
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
	UNION ALL
	SELECT ID FROM C13
)
INSERT INTO #C WITH (TABLOCK)
SELECT 0
FROM C312
OPTION (MAXDOP 4, FORCE ORDER, QueryRuleOff GenLGAgg, QueryRuleOff EnforceBatch, QueryRuleOff GbAggToStrm);


DBCC TRACEON(10204);

ALTER INDEX I ON #C REORGANIZE WITH (COMPRESS_ALL_ROW_GROUPS = ON);
ALTER INDEX I ON #C REORGANIZE WITH (COMPRESS_ALL_ROW_GROUPS = ON);

DBCC TRACEOFF(10204);

The above code isn’t as efficient as possible but it gets the job done. It also provides a preview of the syntax that is used to query the table 3200 times. A nested CTE approach is used instead of writing out UNION ALL 3199 times. Trace flag 10204 is used because there’s no need to act on rows outside of the delta rowgroups.
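To confirm that the load and the reorganize did what was intended, the rowgroups of #C can be summarized by state. This is a sketch that assumes it runs in the same session as the load. The expected total comes from the load code: 13 * 24 * 1 million rows plus the initial 500k rows is 312.5 million.

```sql
SELECT rg.state_desc,
       COUNT_BIG(*) AS row_group_count,
       SUM(rg.total_rows) AS total_rows,
       SUM(rg.size_in_bytes) / 1024 AS size_kb
FROM tempdb.sys.dm_db_column_store_row_group_physical_stats AS rg
WHERE rg.object_id = OBJECT_ID(N'tempdb..#C')
GROUP BY rg.state_desc;
```

Ideally all of the rows show up under the COMPRESSED state. Other states, such as TOMBSTONE, may appear for a short while after the reorganize.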

Query compile time can be quite significant for this query. On an older PC while running these queries, I could determine when the query compile ended by a change in fan noise. The new additions to the query hints are there to minimize query compile time as much as possible. This is important when a table is referenced 3200 times in a query. I’m told that there are undocumented trace flags which can reduce query compile time even more but I’ll leave that as an exercise for the reader. Here’s the query text of the UNION ALL approach:

WITH C8 AS (
	SELECT I FROM #C WITH (TABLOCK)
	UNION ALL
	SELECT I FROM #C WITH (TABLOCK)
	UNION ALL
	SELECT I FROM #C WITH (TABLOCK)
	UNION ALL
	SELECT I FROM #C WITH (TABLOCK)
	UNION ALL
	SELECT I FROM #C WITH (TABLOCK)
	UNION ALL
	SELECT I FROM #C WITH (TABLOCK)
	UNION ALL
	SELECT I FROM #C WITH (TABLOCK)
	UNION ALL
	SELECT I FROM #C WITH (TABLOCK)
)
, C160 AS
 (
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
	UNION ALL
	SELECT I FROM C8
)
, C3200 AS
(
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
	UNION ALL
	SELECT I FROM C160
)
SELECT COUNT_BIG(*)
FROM C3200
OPTION (RECOMPILE, MAXDOP 8, FORCE ORDER, QueryRuleOff GenLGAgg, QueryRuleOff EnforceBatch, QueryRuleOff GbAggToStrm);

The good news is that generating an actual plan takes about 3 minutes on my machine. The bad news is that uploading a 27 MB query plan is a lot more difficult than I thought. I ended up uploading it to my personal website. You can view all of the XML in its glory here.

For a method more friendly to my personal bandwidth, you can look at the screenshot below:

I don’t know of a faster way to complete the challenge. Without an actual plan, the query takes about 63 seconds to complete. However, the query takes about 3 minutes to complete on SQL Server 2019, even without requesting an actual plan. The difference is caused by lightweight query profiling in SQL Server 2019, which is turned on by default. I find query profiling to be very useful in general for evaluating performance issues in real time and for estimating how long a query will take to complete. However, it can still add significant overhead for certain types of queries. The difference is quite apparent in the call stacks:

This is why I did all of my testing for this blog post on SQL Server 2017, in case you were wondering.

Final Thoughts

Summary of all attempts:

The results are quite specific to the test scenario: they should not be used to draw conclusions about general join performance. As mentioned earlier, in a few cases the bottleneck is the query profiling infrastructure instead of typical aspects of query performance. If anyone out there knows of a faster technique than those tested here, I would be delighted to learn about it. Thanks for reading!

Using Exchange Demand Partitioning to Improve Parallel Query Scalability

One of our SQL Server workloads runs many concurrent parallel queries on a possibly already busy server. Other work occurring on the server can have dramatic effects on parallel query runtimes. One way to improve scalability of parallel query runtimes is to achieve query plans with operators that allow a dynamic amount of work to be completed by each parallel worker thread. For example, exchange operators with a partitioning type of hash or round robin force a typically even amount of work to be completed by each worker thread. If a worker thread happens to be on a busy scheduler then query runtime may increase due to the longer runtime of the worker thread that is competing for CPU resources. Serial zones in parallel query plans can of course have the same problem. Batch mode operators, exchange operators with a partitioning type of broadcast or demand, and the parallel page supplier are all examples of operators which can do a dynamic amount of work per thread. Those are the operators that I prefer to see in query plans for this workload.
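
If you want a rough idea of which cached plans contain the less flexible exchange types, something like the following sketch may help. It assumes the showplan XML exposes the partitioning type as the PartitioningType attribute on the Parallelism element, and it only sees plans that are currently in the plan cache. Shredding plan XML like this can be expensive, so treat it as a starting point for a test server and not as a monitoring query:

```sql
-- Sketch: find cached plans with a hash or round robin exchange.
WITH XMLNAMESPACES (DEFAULT 'http://schemas.microsoft.com/sqlserver/2004/07/showplan')
SELECT TOP (20)
	st.text AS query_text,
	qp.query_plan
FROM sys.dm_exec_cached_plans AS cp
CROSS APPLY sys.dm_exec_query_plan(cp.plan_handle) AS qp
CROSS APPLY sys.dm_exec_sql_text(cp.plan_handle) AS st
WHERE qp.query_plan.exist('//Parallelism[@PartitioningType = "Hash" or @PartitioningType = "RoundRobin"]') = 1;
```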

Very little has been written about exchange operators with a partitioning type of demand, so I forgive you for not having heard of it before today. There is a brief explanation available here, an example of using demand partitioning to improve some query plans involving partitioned tables, and a Stack Exchange answer for someone comparing round robin and demand partitioning. You have the honor of reading perhaps the fourth blog post about the subject.

Start


The demos are somewhat dependent on hardware so you may not see identical results if you are following along. I’m testing on a machine with 8 CPUs and with max server memory set to 6000 MB. Start with a table with a multi-column primary key and insert about 34 million rows into it:

DROP TABLE IF EXISTS #;
CREATE TABLE # (
ID BIGINT NOT NULL,
ID2 BIGINT NOT NULL,
STRING_TO_AGG VARCHAR(MAX),
PRIMARY KEY (ID, ID2)
);

INSERT INTO # WITH (TABLOCK)
SELECT RN, v.v, REPLICATE('REPLICATE', 77)
FROM (
SELECT TOP (4800000) ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) RN
FROM master..spt_values
CROSS JOIN master..spt_values t2
) q
CROSS JOIN (VALUES (1), (2), (3), (4), (5), (6), (7)) v(v);

The query tuning exercise is to insert the ID column and a checksum of the concatenation of all the STRING_TO_AGG values for each ID value ordered by ID2. This may seem like an odd thing to do but it is based upon a production example with an adjustment to not write as much data as the real query. Not all of us have SANs in our basements, or even have a basement. Use the following for the target table:

DROP TABLE IF EXISTS ##;

CREATE TABLE ## (
ID BIGINT NOT NULL,
ALL_STRINGS_CHECKSUM INT
);

Naturally we use SQL Server 2019 CU4 so the STRING_AGG function is available to us. Here is one obvious way to write the query:

INSERT INTO ## WITH (TABLOCK)
SELECT ID, CHECKSUM(STRING_AGG(STRING_TO_AGG , ',') WITHIN GROUP (ORDER BY ID2))
FROM #
GROUP BY ID
OPTION (MAXDOP 8);

The above query takes about 63 seconds on my machine with a cost of 2320.57 optimizer units. The query optimizer decided that a serial plan was the best choice:

This is a rather lame result for a query tuning exercise so I will assume that I know better and force a parallel query plan with undocumented trace flag 8649. SQL Server warns us that the estimated cost is 4816.68 optimizer units but surely doesn’t expect a detail like that to stop me. The adjusted query executes in 75 seconds:

My arrogance is checked. The parallel query is slower than the serial version that the query optimizer wanted us to have. The problem is the hash exchange operator. It is an order preserving exchange with a high MAXDOP and wide rows. This is the worst possible situation for exchange performance. How else can we write the query? Batch mode is not available for STRING_AGG so that’s out. Does anyone remember anything about tuning row mode queries?

The Dark Side


Query hints along with carefully constructed T-SQL are the pathway to many abilities, some considered to be unnatural. We can give the classic Parallel Apply query pattern made famous by Adam Machanic a shot to solve this problem. Perhaps you are thinking that there is no driving table for the outer side of a nested loop join, but we can create one by sampling the clustered index of the base table. I’ll skip that part here and just use what I know about the data to divide it into 96 equal ranges:

DROP TABLE IF EXISTS ###;
CREATE TABLE ### (s BIGINT NOT NULL, e BIGINT NOT NULL);

INSERT INTO ###
VALUES
(1, 50000),
(50001, 100000),
(100001, 150000),
(150001, 200000),
(200001, 250000),
(250001, 300000),
(300001, 350000),
(350001, 400000),
(400001, 450000),
(450001, 500000),
(500001, 550000),
(550001, 600000),
(600001, 650000),
(650001, 700000),
(700001, 750000),
(750001, 800000),
(800001, 850000),
(850001, 900000),
(900001, 950000),
(950001, 1000000),
(1000001, 1050000),
(1050001, 1100000),
(1100001, 1150000),
(1150001, 1200000),
(1200001, 1250000),
(1250001, 1300000),
(1300001, 1350000),
(1350001, 1400000),
(1400001, 1450000),
(1450001, 1500000),
(1500001, 1550000),
(1550001, 1600000),
(1600001, 1650000),
(1650001, 1700000),
(1700001, 1750000),
(1750001, 1800000),
(1800001, 1850000),
(1850001, 1900000),
(1900001, 1950000),
(1950001, 2000000),
(2000001, 2050000),
(2050001, 2100000),
(2100001, 2150000),
(2150001, 2200000),
(2200001, 2250000),
(2250001, 2300000),
(2300001, 2350000),
(2350001, 2400000),
(2400001, 2450000),
(2450001, 2500000),
(2500001, 2550000),
(2550001, 2600000),
(2600001, 2650000),
(2650001, 2700000),
(2700001, 2750000),
(2750001, 2800000),
(2800001, 2850000),
(2850001, 2900000),
(2900001, 2950000),
(2950001, 3000000),
(3000001, 3050000),
(3050001, 3100000),
(3100001, 3150000),
(3150001, 3200000),
(3200001, 3250000),
(3250001, 3300000),
(3300001, 3350000),
(3350001, 3400000),
(3400001, 3450000),
(3450001, 3500000),
(3500001, 3550000),
(3550001, 3600000),
(3600001, 3650000),
(3650001, 3700000),
(3700001, 3750000),
(3750001, 3800000),
(3800001, 3850000),
(3850001, 3900000),
(3900001, 3950000),
(3950001, 4000000),
(4000001, 4050000),
(4050001, 4100000),
(4100001, 4150000),
(4150001, 4200000),
(4200001, 4250000),
(4250001, 4300000),
(4300001, 4350000),
(4350001, 4400000),
(4400001, 4450000),
(4450001, 4500000),
(4500001, 4550000),
(4550001, 4600000),
(4600001, 4650000),
(4650001, 4700000),
(4700001, 4750000),
(4750001, 4800000);
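
The 96 ranges do not have to be typed out by hand. As a sketch, and assuming the ID values run from 1 to 4800000 without gaps as they do in the demo data, the same boundaries can be generated with the row numbering trick used to build the base table. The alias names n, rn, s, and e are only for illustration:

```sql
-- Sketch: generate 96 contiguous ranges of 50000 IDs each.
SELECT
	(n.rn - 1) * 50000 + 1 AS s,
	n.rn * 50000 AS e
FROM (
	SELECT TOP (96) ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS rn
	FROM master..spt_values
) n
ORDER BY s;
```

The first row comes out as (1, 50000) and the last as (4750001, 4800000), matching the hand-written list. Putting an INSERT INTO ### in front of the SELECT would load the driver table directly. For real data with gaps or skew, sampling the clustered index as mentioned above is likely a better fit than fixed-width ranges.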

I can now construct a query that gets a parallel apply type of plan:

INSERT INTO ## WITH (TABLOCK)
SELECT ca.*
FROM ### driver
CROSS APPLY (
	SELECT TOP (987654321987654321) ID, CHECKSUM(STRING_AGG(STRING_TO_AGG , ',') WITHIN GROUP (ORDER BY ID2)) ALL_STRINGS_CHECKSUM
	FROM # WITH (FORCESEEK)
	WHERE ID BETWEEN driver.s AND driver.e
	GROUP BY ID
) ca
OPTION (MAXDOP 8, NO_PERFORMANCE_SPOOL, FORCE ORDER);

This is an unnatural query plan. The query optimizer assigned it a cost of 36248.7 units. I had to add the TOP to get a valid query plan with an index seek. Removing the TOP operator results in error 8622. Naturally such things won’t stop us and running the query results in an execution time between 15 and 19 seconds on my machine, which is the best result yet.

This query plan has an exchange partitioning type of round robin. Recall such exchange types can lead to trouble if there’s other work executing on one of the schedulers used by a parallel worker thread. So far I’ve been testing these MAXDOP 8 queries with nothing else running on my 8 core machine. I can make a scheduler busy by running a MAXDOP 1 query that has no real reason to yield before exhausting its 4 ms quantum. Here is one way to accomplish that:

SELECT TOP (1) t1.high + t2.high + t3.high + t4.high
FROM master..spt_values t1
CROSS JOIN master..spt_values t2
CROSS JOIN master..spt_values t3
CROSS JOIN master..spt_values t4
ORDER BY t1.high + t2.high + t3.high + t4.high
OPTION (MAXDOP 1, NO_PERFORMANCE_SPOOL);

Wait stats for this query if you don’t believe me:

Running this query at the same time as the parallel query can apply a large performance penalty to the parallel query. The parallel query can take up to 48 seconds to execute if even a single worker thread has to share time on a scheduler with another. That is, the query ran 3 times slower when I added a single MAXDOP 1 query to the workload. Looking at thread details for the parallel query:

As you can see, one of the worker threads took a significantly longer amount of time to complete its work compared to the other threads. There is no logged wait statistic for this kind of performance problem in which the other parallel worker threads complete their work much earlier than when the query finishes. If there’s no worker thread then there is no wait associated with the query. The only way to catch this is to look at actual row distribution or the CPU time to elapsed time ratio.
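
One way to look at the CPU time to elapsed time ratio after the fact is the plan cache. The following is only a sketch: it assumes the statement is still cached, and it relies on the last_dop column that was added to sys.dm_exec_query_stats in SQL Server 2016. A ratio far below the degree of parallelism hints that some workers sat idle while others were still running, although it cannot say why:

```sql
-- Sketch: recent parallel statements with a low CPU to elapsed time ratio.
SELECT TOP (20)
	qs.last_dop,
	qs.last_worker_time / 1000 AS last_cpu_ms,
	qs.last_elapsed_time / 1000 AS last_elapsed_ms,
	CAST(qs.last_worker_time AS DECIMAL(19, 2)) / NULLIF(qs.last_elapsed_time, 0) AS cpu_to_elapsed_ratio,
	st.text AS query_text
FROM sys.dm_exec_query_stats AS qs
CROSS APPLY sys.dm_exec_sql_text(qs.sql_handle) AS st
WHERE qs.last_dop > 1
ORDER BY qs.last_elapsed_time DESC;
```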

You may be wondering why the query is worse than twice as slow as before. After all, if all workers do an equal amount of work but one now gets access to half as much CPU as before it seems reasonable to expect the runtime to double instead of triple. The workers of the parallel query have many reasons they might yield before exhausting their full 4 ms quantum – an I/O wait for example. The MAXDOP 1 SELECT query is designed to not yield early. What is very likely happening is that the MAXDOP 1 query gets a larger share of the scheduler’s resources than 50%. SQL Server 2016 made adjustments to try to limit this type of situation but by its very nature I don’t see how it could ever lead to a perfect sharing of a scheduler’s resources.

Demanding Demand Partitioning


We can get an exchange operator with demand based partitioning by replacing the driving temp table with a derived table. Full query text below:

INSERT INTO ## WITH (TABLOCK)
SELECT ca.*
FROM (
VALUES
(1, 50000),
(50001, 100000),
(100001, 150000),
(150001, 200000),
(200001, 250000),
(250001, 300000),
(300001, 350000),
(350001, 400000),
(400001, 450000),
(450001, 500000),
(500001, 550000),
(550001, 600000),
(600001, 650000),
(650001, 700000),
(700001, 750000),
(750001, 800000),
(800001, 850000),
(850001, 900000),
(900001, 950000),
(950001, 1000000),
(1000001, 1050000),
(1050001, 1100000),
(1100001, 1150000),
(1150001, 1200000),
(1200001, 1250000),
(1250001, 1300000),
(1300001, 1350000),
(1350001, 1400000),
(1400001, 1450000),
(1450001, 1500000),
(1500001, 1550000),
(1550001, 1600000),
(1600001, 1650000),
(1650001, 1700000),
(1700001, 1750000),
(1750001, 1800000),
(1800001, 1850000),
(1850001, 1900000),
(1900001, 1950000),
(1950001, 2000000),
(2000001, 2050000),
(2050001, 2100000),
(2100001, 2150000),
(2150001, 2200000),
(2200001, 2250000),
(2250001, 2300000),
(2300001, 2350000),
(2350001, 2400000),
(2400001, 2450000),
(2450001, 2500000),
(2500001, 2550000),
(2550001, 2600000),
(2600001, 2650000),
(2650001, 2700000),
(2700001, 2750000),
(2750001, 2800000),
(2800001, 2850000),
(2850001, 2900000),
(2900001, 2950000),
(2950001, 3000000),
(3000001, 3050000),
(3050001, 3100000),
(3100001, 3150000),
(3150001, 3200000),
(3200001, 3250000),
(3250001, 3300000),
(3300001, 3350000),
(3350001, 3400000),
(3400001, 3450000),
(3450001, 3500000),
(3500001, 3550000),
(3550001, 3600000),
(3600001, 3650000),
(3650001, 3700000),
(3700001, 3750000),
(3750001, 3800000),
(3800001, 3850000),
(3850001, 3900000),
(3900001, 3950000),
(3950001, 4000000),
(4000001, 4050000),
(4050001, 4100000),
(4100001, 4150000),
(4150001, 4200000),
(4200001, 4250000),
(4250001, 4300000),
(4300001, 4350000),
(4350001, 4400000),
(4400001, 4450000),
(4450001, 4500000),
(4500001, 4550000),
(4550001, 4600000),
(4600001, 4650000),
(4650001, 4700000),
(4700001, 4750000),
(4750001, 4800000)
) driver( s, e)
CROSS APPLY (
	SELECT TOP (987654321987654321) ID, CHECKSUM(STRING_AGG(STRING_TO_AGG , ',') WITHIN GROUP (ORDER BY ID2)) ALL_STRINGS_CHECKSUM
	FROM # WITH (FORCESEEK)
	WHERE ID BETWEEN CAST(driver.s AS BIGINT) AND CAST(driver.e AS BIGINT)
	GROUP BY ID
) ca
OPTION (MAXDOP 8, NO_PERFORMANCE_SPOOL, FORCE ORDER);

Query performance is effectively random. The query was observed to execute as quickly as 15 seconds and as slowly as 45 seconds. In some situations there was an incredible amount of skew in row distributions between threads:

This is an unexpected situation if there are no other queries running on the server. Query performance is most disappointing.

Is there a trace flag?


Yes! The problem here is that the nested loop join uses the prefetch optimization. Paul White writes:

One of the available SQL Server optimizations is nested loops prefetching. The general idea is to issue asynchronous I/O for index pages that will be needed by the inner side — and not just for the current correlated join parameter value, but for future values too.

That sounds like it might be wildly incompatible with a demand exchange operator. Querying sys.dm_exec_query_profiles during query execution proves that the demand exchange isn’t working as expected: worker threads no longer fully process the results associated with their current row before requesting the next one. That is what can lead to wild skew between the worker threads and as a result query performance is effectively random.
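
For anyone who wants to watch this happen, a query along the following lines can be run from a second session while the insert is executing. It is a sketch and not the exact query I used: the session id of 55 is a placeholder, and the DMV only returns rows if query profiling is active for the target session, which is the default on SQL Server 2019:

```sql
-- Sketch: per-thread progress for each operator of a running query.
DECLARE @session_id INT = 55; -- hypothetical: replace with the session running the parallel query

SELECT
	qp.node_id,
	qp.physical_operator_name,
	qp.thread_id,
	qp.row_count,
	qp.cpu_time_ms,
	qp.elapsed_time_ms
FROM sys.dm_exec_query_profiles AS qp
WHERE qp.session_id = @session_id
ORDER BY qp.node_id, qp.thread_id;
```

Running it a few times in a row and comparing row_count per thread_id on the exchange and on the operators under the nested loop join shows which workers are pulling more than their share of driver rows.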

Documented trace flag 8744 disables this optimization. Adding it to the query using QUERYTRACEON results in much more stable performance. The query typically finishes in about 15 seconds. Here is an example thread distribution:
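
For reference, this is roughly where the hint goes. The sketch below lists only the first two ranges to keep it short, so it processes just a slice of the table and the optimizer may well choose a different plan shape for it. The real query keeps all 96 ranges and only the OPTION clause changes. Keep in mind that QUERYTRACEON requires sysadmin permissions unless it is applied through a plan guide:

```sql
-- Abbreviated sketch: only two of the 96 ranges are listed.
INSERT INTO ## WITH (TABLOCK)
SELECT ca.*
FROM (
VALUES
(1, 50000),
(50001, 100000)
) driver( s, e)
CROSS APPLY (
	SELECT TOP (987654321987654321) ID, CHECKSUM(STRING_AGG(STRING_TO_AGG , ',') WITHIN GROUP (ORDER BY ID2)) ALL_STRINGS_CHECKSUM
	FROM # WITH (FORCESEEK)
	WHERE ID BETWEEN CAST(driver.s AS BIGINT) AND CAST(driver.e AS BIGINT)
	GROUP BY ID
) ca
OPTION (MAXDOP 8, NO_PERFORMANCE_SPOOL, FORCE ORDER, QUERYTRACEON 8744);
```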

If you fight the ISV fight like I do, you may not be able to enable trace flags for individual queries. If you’re desperate you could try artificially lowering the cardinality estimate from the derived table. An OPTIMIZE FOR query hint with a direct filter is my preferred way to accomplish this. I like to set the cardinality estimate equal to MAXDOP but I have no real basis for doing this. Here is the full query text:

DECLARE @filter BIGINT = 987654321987654321;
INSERT INTO ## WITH (TABLOCK)
SELECT ca.*
FROM (
VALUES
(1, 50000),
(50001, 100000),
(100001, 150000),
(150001, 200000),
(200001, 250000),
(250001, 300000),
(300001, 350000),
(350001, 400000),
(400001, 450000),
(450001, 500000),
(500001, 550000),
(550001, 600000),
(600001, 650000),
(650001, 700000),
(700001, 750000),
(750001, 800000),
(800001, 850000),
(850001, 900000),
(900001, 950000),
(950001, 1000000),
(1000001, 1050000),
(1050001, 1100000),
(1100001, 1150000),
(1150001, 1200000),
(1200001, 1250000),
(1250001, 1300000),
(1300001, 1350000),
(1350001, 1400000),
(1400001, 1450000),
(1450001, 1500000),
(1500001, 1550000),
(1550001, 1600000),
(1600001, 1650000),
(1650001, 1700000),
(1700001, 1750000),
(1750001, 1800000),
(1800001, 1850000),
(1850001, 1900000),
(1900001, 1950000),
(1950001, 2000000),
(2000001, 2050000),
(2050001, 2100000),
(2100001, 2150000),
(2150001, 2200000),
(2200001, 2250000),
(2250001, 2300000),
(2300001, 2350000),
(2350001, 2400000),
(2400001, 2450000),
(2450001, 2500000),
(2500001, 2550000),
(2550001, 2600000),
(2600001, 2650000),
(2650001, 2700000),
(2700001, 2750000),
(2750001, 2800000),
(2800001, 2850000),
(2850001, 2900000),
(2900001, 2950000),
(2950001, 3000000),
(3000001, 3050000),
(3050001, 3100000),
(3100001, 3150000),
(3150001, 3200000),
(3200001, 3250000),
(3250001, 3300000),
(3300001, 3350000),
(3350001, 3400000),
(3400001, 3450000),
(3450001, 3500000),
(3500001, 3550000),
(3550001, 3600000),
(3600001, 3650000),
(3650001, 3700000),
(3700001, 3750000),
(3750001, 3800000),
(3800001, 3850000),
(3850001, 3900000),
(3900001, 3950000),
(3950001, 4000000),
(4000001, 4050000),
(4050001, 4100000),
(4100001, 4150000),
(4150001, 4200000),
(4200001, 4250000),
(4250001, 4300000),
(4300001, 4350000),
(4350001, 4400000),
(4400001, 4450000),
(4450001, 4500000),
(4500001, 4550000),
(4550001, 4600000),
(4600001, 4650000),
(4650001, 4700000),
(4700001, 4750000),
(4750001, 4800000)
) driver( s, e)
CROSS APPLY (
	SELECT TOP (987654321987654321) ID, CHECKSUM(STRING_AGG(STRING_TO_AGG , ',') WITHIN GROUP (ORDER BY ID2)) ALL_STRINGS_CHECKSUM
	FROM # WITH (FORCESEEK)
	WHERE ID BETWEEN CAST(driver.s AS BIGINT) AND CAST(driver.e AS BIGINT)
	GROUP BY ID
) ca
WHERE driver.s <= @filter
OPTION (OPTIMIZE FOR (@filter = 350001), MAXDOP 8, NO_PERFORMANCE_SPOOL, FORCE ORDER);
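
The value 350001 in the OPTIMIZE FOR hint is not arbitrary: it is the start of the eighth range, so a filter of s <= 350001 lets exactly eight driver rows through, which matches MAXDOP 8. A quick sanity check, written as a sketch that regenerates the range starts on the fly (the alias names are illustrative) instead of depending on any of the tables above:

```sql
-- Sketch: count how many of the 96 range starts satisfy s <= 350001.
SELECT COUNT(*) AS rows_passing_filter -- 8
FROM (
	SELECT (n.rn - 1) * 50000 + 1 AS s
	FROM (
		SELECT TOP (96) ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS rn
		FROM master..spt_values
	) n
) driver
WHERE driver.s <= 350001;
```

At runtime the variable holds 987654321987654321, so every row still qualifies. Only the estimate is lowered.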

Query performance is the same as with TF 8744:

 

Does this query do better than round robin partitioning when there is a busy MAXDOP 1 query running at the same time? I ran it a few times and it completed in about 15-16 seconds every time. One of the worker threads does less work and the others cover for it:

In this example the nested loop join only gets the prefetch optimization if the cardinality estimate is more than 25 rows. I do not know if that number is a fixed part of the algorithm for prefetch eligibility but it certainly feels unwise to rely on this behavior never changing in a future version of SQL Server. Note that prefetching is a trade-off. For some workloads you may be better off with round robin partitioning and prefetching compared to demand without prefetching. It’s hard to imagine a workload that would benefit from demand with prefetching but perhaps I’m not being creative enough in my thinking.

Final Thoughts


In summary, the example parallel apply query that uses demand partitioning performs 2-3 times better than the query that uses round robin partitioning when another serial query is running on the server. The nested loop prefetch optimization does not work well with exchange operator demand partitioning and should be avoided via a trace flag or other tricks if demand partitioning is in use.

There are a few things that Microsoft could do to improve the situation. A USE HINT that disables nested loop prefetching would be most welcome. I think that there’s also a fairly strong argument to make that a query pattern of a nested loop join with prefetching with a first child of a demand exchange operator is a bad pattern that the query optimizer should avoid if possible. Finally, it would be nice if there was a type of wait statistic triggered when some of a parallel query’s workers finish their work earlier than others. The problematic queries here have very few CXPACKET waits and no EXECSYNC waits. Imagine that I put a link to UserVoice requests here and imagine yourself voting for them.

Thanks for reading!

Join Joe Obbish In NYC On October 4th To Learn Everything About Columnstore

Second To None


Prior to this year’s SQL Saturday in NYC, we’re running one very special precon, with Joe Obbish:

Joe Obbish – Clustered Columnstore For Performance

Buy Tickets Here!

Clustered columnstore indexes can be a great solution for data warehouse workloads, but there’s not a lot of advanced training or detailed documentation out there. It’s easy to feel all alone when you want a second opinion or run into a problem with query performance or data loading that you don’t know how to solve.

In this full day session, I’ll teach you the most important things I know about clustered columnstore indexes. Specifically, I’ll teach you how to make the right choices with your schema, data loads, query tuning, and columnstore maintenance. All of these lessons have been learned the hard way with 4 TB of production data on large, 96+ core servers. Material is applicable from SQL Server 2016 through 2019.

Here’s what I’ll be talking about:

– How columnstore compression works and tips for picking the right data types
– Loading columnstore data quickly, especially on large servers
– Improving query performance on columnstore tables
– Maintaining your columnstore tables

This is an advanced level session. To get the most out of the material, attendees should have some practical experience with columnstore and query tuning, and a solid understanding of internals such as wait stats analysis. You don’t need to bring a laptop to follow along.

Buy Tickets Here!

Wanna save 25%? Use coupon code “actionjoe” at checkout — it’s good for the first 10 seats, so hurry up and get yours today.

Thanks for reading!

The Hardest Part Of Query Tuning: Logical Equivalence

Grey Matter


Often when query tuning, I’ll try a change that I think makes sense, only to have it backfire.

It’s not that the query got slower, it’s that the results that came back were wrong, er, different.

Now, this can totally happen because of a bug in previously used logic, but that’s somewhat rare.

And wrong, er, different results make testers nervous. Especially in production.

Here’s a Very Cheeky™ example.

Spread’em


This is my starting query. If I run it enough times, I’ll get a billion missing index requests.

WITH topusers AS
(
    SELECT   TOP (1)
             u.Id, u.DisplayName
    FROM     dbo.Users AS u
    ORDER BY u.Reputation DESC
)
SELECT   u.Id,
         u.DisplayName,
         SUM(p.Score * 1.0) AS PostScore,
         SUM(c.Score * 1.0) AS CommentScore,
         COUNT_BIG(*) AS CountForSomeReason
FROM     topusers AS u
JOIN     dbo.Posts AS p
    ON p.OwnerUserId = u.Id
JOIN     dbo.Comments AS c
    ON c.UserId = u.Id
WHERE    p.Score >= 5
AND      c.Score >= 1
GROUP BY u.Id, u.DisplayName;

For the sake of argument, I’ll add them all. Here they are:

CREATE INDEX ix_tabs
    ON dbo.Users ( Reputation DESC, Id )
    INCLUDE ( DisplayName );

CREATE INDEX ix_spaces
    ON dbo.Users ( Id, Reputation DESC )
    INCLUDE ( DisplayName );

CREATE INDEX ix_coke 
    ON dbo.Comments ( Score) INCLUDE( UserId );

CREATE INDEX ix_pepsi
    ON dbo.Posts ( Score ) INCLUDE( OwnerUserId );

CREATE NONCLUSTERED INDEX ix_tastes_great
    ON dbo.Posts ( OwnerUserId, Score );

CREATE NONCLUSTERED INDEX ix_less_filling
    ON dbo.Comments ( UserId, Score );

With all those indexes, the query is still dog slow.

Maybe It’s Me


I’ll take my own advice. Let’s break the query up a little bit.

DROP TABLE IF EXISTS #topusers;

WITH topusers AS
(
    SELECT   TOP (1)
             u.Id, u.DisplayName
    FROM     dbo.Users AS u
    ORDER BY u.Reputation DESC
)
SELECT *
INTO #topusers
FROM topusers;

CREATE UNIQUE CLUSTERED INDEX ix_whatever 
    ON #topusers(Id);

SELECT   u.Id,
         u.DisplayName,
         SUM(p.Score * 1.0) AS PostScore,
         SUM(c.Score * 1.0) AS CommentScore,
         COUNT_BIG(*) AS CountForSomeReason
FROM     #topusers AS u
JOIN     dbo.Posts AS p
    ON p.OwnerUserId = u.Id
JOIN     dbo.Comments AS c
    ON c.UserId = u.Id
WHERE    p.Score >= 5
AND      c.Score >= 1
GROUP BY u.Id, u.DisplayName;

Still dog slow.

Variability


Alright, I’m desperate now. Let’s try this.

DECLARE @Id INT, 
        @DisplayName NVARCHAR(40);

SELECT   TOP (1)
         @Id = u.Id,
         @DisplayName = u.DisplayName
FROM     dbo.Users AS u
ORDER BY u.Reputation DESC;


SELECT   @Id AS Id,
         @DisplayName AS DisplayName,
         SUM(p.Score * 1.0) AS PostScore,
         SUM(c.Score * 1.0) AS CommentScore,
         COUNT_BIG(*) AS CountForSomeReason
FROM dbo.Posts AS p 
JOIN  dbo.Comments AS c 
    ON c.UserId = p.OwnerUserId
WHERE    p.Score >= 5
AND      c.Score >= 1
AND      (c.UserId = @Id OR @Id IS NULL)
AND      (p.OwnerUserId = @Id OR @Id IS NULL);

Let’s get some worst practices involved. That always goes well.

Except here.

Getting the right results seemed like it was destined to be slow.

Differently Resulted


At this point, I tried several rewrites that were fast, but wrong.

What I had missed, and what Joe Obbish pointed out to me, is that I needed a cross join and some math to make it all work out.

WITH topusers AS
(
    SELECT   TOP (1)
             u.Id, u.DisplayName
    FROM     dbo.Users AS u
    ORDER BY u.Reputation DESC
)
SELECT     t.Id AS Id,
           t.DisplayName AS DisplayName,
           p_u.PostScoreSub * c_u.CountCSub AS PostScore,
           c_u.CommentScoreSub * p_u.CountPSub AS CommentScore,
           c_u.CountCSub * p_u.CountPSub AS CountForSomeReason
FROM       topusers AS t
JOIN       (   SELECT   p.OwnerUserId,
                        SUM(p.Score * 1.0) AS PostScoreSub,
                        COUNT_BIG(*) AS CountPSub
               FROM     dbo.Posts AS p
               WHERE    p.Score >= 5
               GROUP BY p.OwnerUserId ) AS p_u
    ON p_u.OwnerUserId = t.Id
CROSS JOIN (   SELECT   c.UserId,
                        SUM(c.Score * 1.0) AS CommentScoreSub,
                        COUNT_BIG(*) AS CountCSub
               FROM     dbo.Comments AS c
               WHERE    c.Score >= 1
               GROUP BY c.UserId ) AS c_u
WHERE      c_u.UserId = t.Id;

This finishes instantly, with the correct results.
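
If the math in that rewrite looks like a magic trick, a tiny example may help. This is a sketch with made-up data in table variables (@p and @c are hypothetical stand-ins for Posts and Comments, already filtered down to one user). Joining two posts to three comments produces six rows, so every post score is counted three times and every comment score twice. Aggregating each side first and multiplying by the other side's row count gets to the same numbers without building the six rows:

```sql
DECLARE @p TABLE (UserId INT NOT NULL, Score INT NOT NULL);
DECLARE @c TABLE (UserId INT NOT NULL, Score INT NOT NULL);

INSERT INTO @p (UserId, Score) VALUES (1, 5), (1, 7);         -- 2 posts, scores total 12
INSERT INTO @c (UserId, Score) VALUES (1, 1), (1, 2), (1, 3); -- 3 comments, scores total 6

-- Original shape: join first, aggregate afterwards (2 * 3 = 6 joined rows).
-- Returns 36.0, 12.0, 6.
SELECT SUM(p.Score * 1.0) AS PostScore,
       SUM(c.Score * 1.0) AS CommentScore,
       COUNT_BIG(*) AS CountForSomeReason
FROM   @p AS p
JOIN   @c AS c
    ON c.UserId = p.UserId;

-- Rewritten shape: aggregate each side, then scale by the other side's row count.
-- Returns the same 36.0, 12.0, 6.
SELECT p_u.PostScoreSub * c_u.CountCSub AS PostScore,
       c_u.CommentScoreSub * p_u.CountPSub AS CommentScore,
       c_u.CountCSub * p_u.CountPSub AS CountForSomeReason
FROM   (   SELECT   UserId, SUM(Score * 1.0) AS PostScoreSub, COUNT_BIG(*) AS CountPSub
           FROM     @p
           GROUP BY UserId ) AS p_u
JOIN   (   SELECT   UserId, SUM(Score * 1.0) AS CommentScoreSub, COUNT_BIG(*) AS CountCSub
           FROM     @c
           GROUP BY UserId ) AS c_u
    ON c_u.UserId = p_u.UserId;
```

The equivalence only holds because every post for the user joins to every comment for the same user. A join that matched posts to comments on something narrower would not multiply out this neatly.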

The value of a college education!

Realizations and Slowness


After thinking about Joe’s rewrite, I had a terrible thought.

All the rewrites that were correct but slow had gone parallel.

“Parallel”

Allow me to illustrate.

In a row?

Repartition Streams usually does the opposite.

But here, it puts all the rows on a single thread.

“For correctness”

Which ends up in a 236 million row parallel-but-single-threaded-cross-hash-join.

Summary Gates Are On My Side


Which, of course, is nicely summarized by P. White.

SQL Server uses the correct join (inner or outer) and adds projections where necessary to honour all the semantics of the original query when performing internal translations between apply and join.

The differences in the plans can all be explained by the different semantics of aggregates with and without a group by clause in SQL Server.

What’s amazing and frustrating about the optimizer is that it considers all sorts of different ways to rewrite your query.

In milliseconds.

It may have even thought about a plan that would have been very fast.

But we ended up with this one, because it looked cheap.

Untuneable


The plan for Joe’s version of the query is amazingly simple.

Bruddah.

Sometimes giving the optimizer a different query to work with helps, and sometimes it doesn’t.

Rewriting queries is tough business. When you change things and still get the same plan, it can be really frustrating.

Just know that behind the scenes the optimizer is working hard to rewrite your queries, too.

If you really want to change the execution plan you end up with, you need to present the logic to the optimizer in different ways, and often with different indexes to use.

Other times, you just gotta ask Joe.

Thanks for reading!

An Extra Special Guest Blogger: Joe Obbish

Joe, colorized c.2019

Friend Zone


Today I’m thrilled to announce that I’ll be hosting all of Joe Obbish’s past and future blogging material on my site.

Not because Joe was all like “hey man, I really like what you’re doing there” or anything.

He just didn’t wanna deal with hosting his own content anymore.

Well, fine.

Joe is brilliant beyond words, and I’m happy to have him here. Hopefully it’ll raise the website IQ a little bit.

Thanks for reading!