Partial Aggregation

In some of my past posts, I've discussed how SQL Server implements aggregation, including the stream aggregate and hash aggregate operators.  I also used hash aggregation as an initial example in my introductory post on parallel query execution.  In this post, I'll look at partial aggregation, a technique that SQL Server uses to optimize parallel aggregation.  Before I begin, I want to note that I also discuss partial aggregation in Inside Microsoft SQL Server 2005: Query Tuning and Optimization.  (See the bottom of page 187.)

Let's begin with a simple scalar aggregation example.  Recall that a scalar aggregate is an aggregate without a GROUP BY clause.  A scalar aggregate always produces a single output row.

CREATE TABLE T (A INT, B INT IDENTITY, C INT, D INT)
CREATE CLUSTERED INDEX TA ON T(A)

SELECT COUNT(*) FROM T

Not surprisingly, this query yields a trivial stream aggregate plan:

  |--Compute Scalar(DEFINE:([Expr1004]=CONVERT_IMPLICIT(int,[Expr1005],0)))
       |--Stream Aggregate(DEFINE:([Expr1005]=Count(*)))
            |--Clustered Index Scan(OBJECT:([T].[TA]))

Now, suppose that we want to parallelize this query.  Because this query outputs a single row, we cannot simply put an exchange (i.e., a parallelism operator) at the root of the plan and divide the work of counting among multiple threads.  Such a strategy would yield one output row per thread, which is clearly not the correct result.

Alternatively, we could put a gather streams exchange between the stream aggregate and clustered index scan operators.  This strategy would permit us to use a parallel scan while still counting in a single thread and outputting a single row.  However, we would end up moving every row from the scan through the exchange, which is a rather costly operation.  Thus, this option, while valid, would not perform nearly as well as we'd like.

Fortunately, there is a third option.  We can use a parallel scan, divide the work of counting among multiple threads (as in the first option), use an exchange to gather the per-thread counts into a single thread, and finally sum the per-thread counts to generate the grand total.  This strategy is much more efficient because we need to move only a single row per thread through the exchange.  To get the optimizer to generate this plan, we need to add lots of data to the table.  To save time, I'm going to use UPDATE STATISTICS to trick the optimizer into thinking that we've added rows to the table:

UPDATE STATISTICS T WITH ROWCOUNT = 1000000, PAGECOUNT = 100000
SELECT COUNT(*) FROM T OPTION (RECOMPILE)

We need the RECOMPILE query hint to force the optimizer to generate a new plan with the new statistics.  Here is the plan we get:

  |--Compute Scalar(DEFINE:([Expr1004]=CONVERT_IMPLICIT(int,[globalagg1006],0)))
       |--Stream Aggregate(DEFINE:([globalagg1006]=SUM([partialagg1005])))
            |--Parallelism(Gather Streams)
                 |--Stream Aggregate(DEFINE:([partialagg1005]=Count(*)))
                      |--Clustered Index Scan(OBJECT:([T].[TA]))

We call the bottommost aggregate operator a partial aggregate because it computes only part of the result.  We also sometimes refer to this operator as a local aggregate because it computes the portion of the result that is local to the thread where it executes.  We refer to the topmost aggregate as a global aggregate because it computes the full result.
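To make the data flow concrete, here is a minimal Python sketch of this plan shape for COUNT(*).  It is not SQL Server code: it assumes each thread can be modeled as a disjoint slice of a list, and the helper names (split_among_threads, partial_count, parallel_count) and the degree of parallelism of 4 are hypothetical.  It also counts the rows that cross the gather exchange, which is one per thread rather than one per scanned row.

```python
# Toy model of partial (local) and global aggregation for SELECT COUNT(*).
# "Threads" are list slices; names and DOP are hypothetical, for illustration.
from typing import List, Sequence, Tuple


def split_among_threads(rows: Sequence[int], dop: int) -> List[Sequence[int]]:
    """Model a parallel scan: give each of `dop` threads a disjoint slice."""
    return [rows[i::dop] for i in range(dop)]


def partial_count(thread_rows: Sequence[int]) -> int:
    """Partial (local) aggregate: Count(*) over only this thread's rows."""
    return sum(1 for _ in thread_rows)


def parallel_count(rows: Sequence[int], dop: int) -> Tuple[int, int]:
    """Return (grand total, rows that crossed the gather streams exchange)."""
    # Each thread computes its own partial count.
    partials = [partial_count(chunk) for chunk in split_among_threads(rows, dop)]
    # Gather Streams: only one row per thread moves through the exchange.
    rows_through_exchange = len(partials)
    # Global aggregate: SUM of the partial counts.
    return sum(partials), rows_through_exchange


if __name__ == "__main__":
    table = list(range(1_000_000))
    total, moved = parallel_count(table, dop=4)
    print(total, moved)  # 1000000 4
```

Compare this with the second option above, where all 1,000,000 rows would have crossed the exchange.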

SQL Server can use partial aggregation for most aggregate functions, including the standard built-ins: COUNT, SUM, AVG, MIN, and MAX.  While partial aggregation is necessary to parallelize scalar aggregates, it is also useful for aggregates with a GROUP BY clause.  Whether the optimizer chooses to use partial aggregation depends on the number of unique groups and the size of these groups.  If the optimizer anticipates that a query will generate a few large groups (as in the scalar aggregation case), it will use partial aggregation.  However, if the optimizer expects that a query will generate many small groups, it may choose a single-level aggregation instead.  With small groups, a partial aggregate cannot reduce the number of rows significantly and merely adds overhead to the query.  Moreover, with many groups it is easy to parallelize the aggregation by hashing on the GROUP BY keys and distributing different groups to different threads.
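To illustrate why these functions can be split into partial and global steps, here is a minimal Python sketch.  It assumes each partial aggregate emits a small state that the global aggregate merges; the PartialState class and function names are hypothetical, NULL handling is ignored, and AVG uses Python division rather than T-SQL's integer AVG over an INT column.  The key point is that AVG is carried as a sum and a count and divided only at the end.

```python
# Toy model: each partial aggregate produces a small state per thread; the
# global aggregate merges the states. Names are hypothetical; NULLs ignored.
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class PartialState:
    count: int
    total: int
    minimum: int
    maximum: int


def partial_aggregate(values: Iterable[int]) -> PartialState:
    """Local aggregate over one thread's (non-empty) slice of values."""
    vals = list(values)
    return PartialState(len(vals), sum(vals), min(vals), max(vals))


def global_aggregate(states: List[PartialState]) -> dict:
    """Merge partial states into the final COUNT, SUM, AVG, MIN, and MAX."""
    count = sum(s.count for s in states)  # global COUNT = SUM of partial counts
    total = sum(s.total for s in states)  # global SUM = SUM of partial sums
    return {
        "COUNT": count,
        "SUM": total,
        "AVG": total / count,  # SUM / COUNT, never an average of averages
        "MIN": min(s.minimum for s in states),
        "MAX": max(s.maximum for s in states),
    }


if __name__ == "__main__":
    per_thread = [[1, 2, 3], [10], [4, 5]]
    result = global_aggregate([partial_aggregate(v) for v in per_thread])
    print(result["COUNT"], result["SUM"], result["MIN"], result["MAX"])  # 6 25 1 10
```

With these values the per-thread averages are 2, 10, and 4.5, whose mean is 5.5, whereas the correct AVG is 25 / 6 (about 4.17).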

Let's see how the optimizer makes this choice.  Column B in our example has the IDENTITY property.  Although we have no real data, this property is sufficient to trick the optimizer into concluding that this column is mostly unique.  (Without a unique index, the optimizer cannot be certain that the column is indeed unique and must assume that it is not.)  Suppose we aggregate on this column:

SELECT COUNT(*) FROM T GROUP BY B

  |--Parallelism(Gather Streams)
       |--Compute Scalar(DEFINE:([Expr1004]=CONVERT_IMPLICIT(int,[Expr1007],0)))
            |--Hash Match(Aggregate, HASH:([T].[B]) DEFINE:([Expr1007]=COUNT(*)))
                 |--Parallelism(Repartition Streams, Hash Partitioning, PARTITION COLUMNS:([T].[B]))
                      |--Clustered Index Scan(OBJECT:([T].[TA]))

Notice that this query yields a normal single-level, albeit parallel, aggregate operator.  Now suppose we aggregate on column C, which does not have the IDENTITY property:

SELECT COUNT(*) FROM T GROUP BY C

  |--Compute Scalar(DEFINE:([Expr1004]=CONVERT_IMPLICIT(int,[globalagg1006],0)))
       |--Parallelism(Gather Streams)
            |--Stream Aggregate(GROUP BY:([T].[C]) DEFINE:([globalagg1006]=SUM([partialagg1005])))
                 |--Sort(ORDER BY:([T].[C] ASC))
                      |--Parallelism(Repartition Streams, Hash Partitioning, PARTITION COLUMNS:([T].[C]))
                           |--Hash Match(Partial Aggregate, HASH:([T].[C]), RESIDUAL:([T].[C] = [T].[C]) DEFINE:([partialagg1005]=COUNT(*)))
                                |--Clustered Index Scan(OBJECT:([T].[TA]))

This time we get a partial aggregate.  Also, observe that the partial aggregate is a hash aggregate while the global aggregate is a stream aggregate.  The optimizer is free to choose either physical aggregation operator (stream or hash) for the partial and global aggregates in a plan with partial aggregation.  The decision of which operator to use is cost based.
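The following Python sketch loosely mirrors the GROUP BY C plan above: a partial hash aggregate on each producer thread, a repartition exchange that routes rows by hash(key) % dop, and a global SUM of the partial counts on each consumer.  It is a toy model with hypothetical names, not SQL Server's implementation; for simplicity the global aggregate uses a dictionary instead of the sort plus stream aggregate shown in the plan.  It counts the rows crossing the repartition exchange with and without the partial aggregate, which shows why the number of groups drives the optimizer's choice.

```python
# Toy model of: scan -> partial hash aggregate -> repartition on key ->
# global SUM of partial counts. Hypothetical names; illustration only.
from collections import Counter, defaultdict
from typing import Dict, Hashable, List, Sequence, Tuple


def local_partial_counts(rows: Sequence[Hashable]) -> Counter:
    """Partial hash aggregate: COUNT(*) per key over one thread's rows."""
    return Counter(rows)


def grouped_count(
    rows: Sequence[Hashable], dop: int, use_partial: bool
) -> Tuple[Dict[Hashable, int], int]:
    """Return (final count per key, rows moved through the repartition exchange)."""
    # Parallel scan: each producer thread reads a disjoint slice of the rows.
    producers = [rows[i::dop] for i in range(dop)]

    # With partial aggregation a producer emits one (key, count) row per
    # distinct key it saw; without it, one (key, 1) row per input row.
    if use_partial:
        emitted = [list(local_partial_counts(chunk).items()) for chunk in producers]
    else:
        emitted = [[(key, 1) for key in chunk] for chunk in producers]

    # Repartition Streams: every row for a given key goes to the same consumer.
    consumers: List[List[Tuple[Hashable, int]]] = [[] for _ in range(dop)]
    moved = 0
    for stream in emitted:
        for key, cnt in stream:
            consumers[hash(key) % dop].append((key, cnt))
            moved += 1

    # Global aggregate on each consumer: SUM of the partial counts per key.
    # Consumers own disjoint keys, so gathering their results is a plain union.
    result: Dict[Hashable, int] = {}
    for part in consumers:
        totals: Dict[Hashable, int] = defaultdict(int)
        for key, cnt in part:
            totals[key] += cnt
        result.update(totals)
    return result, moved


if __name__ == "__main__":
    few_groups = [i % 10 for i in range(100_000)]  # 10 large groups
    many_groups = list(range(100_000))             # 100,000 one-row groups
    for name, data in [("few", few_groups), ("many", many_groups)]:
        _, with_partial = grouped_count(data, dop=4, use_partial=True)
        _, without = grouped_count(data, dop=4, use_partial=False)
        print(name, with_partial, without)
    # Prints "few 20 100000" then "many 100000 100000"
```

With few large groups, the partial aggregate shrinks 100,000 rows to 20 before the exchange; with one-row groups it moves just as many rows as the single-level plan and only adds work.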

Finally, it is worth noting that, while a stream aggregate behaves identically whether it is computing a partial or a global aggregate, a partial hash aggregate differs slightly from a normal hash aggregate.  First, a partial hash aggregate requests only a fixed minimal memory grant because it presumes that it will be computing a relatively small number of groups.  Second, a partial hash aggregate never spills rows to tempdb.  If a partial hash aggregate runs out of memory, it simply stops aggregating and begins returning non-aggregated rows.  This behavior is safe because the global aggregate always computes the correct final results regardless of what the partial aggregate does.  The partial aggregate is merely a performance optimization, and the goal is to prevent it from stealing resources from the global aggregate.
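Here is a minimal Python sketch of why this fallback is safe.  It models the fixed memory grant as a hypothetical maximum number of groups (max_groups); when a new key would exceed it, the sketch emits the groups built so far and then passes every remaining row through as a partial count of 1, with no spilling.  The exact point at which SQL Server flushes its hash table is an assumption here, and the function names are hypothetical.  The global aggregate sums whatever arrives and still produces the correct counts.

```python
# Toy model of a partial hash aggregate with a fixed memory budget, modeled
# as max_groups. On overflow it stops aggregating instead of spilling.
from collections import Counter
from typing import Dict, Hashable, Iterable, Iterator, Tuple


def partial_hash_count(
    rows: Iterable[Hashable], max_groups: int
) -> Iterator[Tuple[Hashable, int]]:
    table: Dict[Hashable, int] = {}
    it = iter(rows)
    for key in it:
        if key in table:
            table[key] += 1
        elif len(table) < max_groups:
            table[key] = 1
        else:
            # Out of "memory": emit the groups built so far, then stop aggregating.
            yield from table.items()
            yield (key, 1)
            for rest in it:
                yield (rest, 1)  # pass remaining rows through unaggregated
            return
    # All rows fit in the budget: emit the fully aggregated groups.
    yield from table.items()


def global_sum(partials: Iterable[Tuple[Hashable, int]]) -> Dict[Hashable, int]:
    """Global aggregate: SUM of partial counts per key, whatever their shape."""
    totals: Counter = Counter()
    for key, cnt in partials:
        totals[key] += cnt
    return dict(totals)


if __name__ == "__main__":
    rows = [1, 2, 1, 3, 1, 4, 2]
    print(global_sum(partial_hash_count(rows, max_groups=2)))  # {1: 3, 2: 2, 3: 1, 4: 1}
```

Shrinking max_groups changes only how many rows reach the global aggregate, never the final answer.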