Stream Aggregate

There are two physical operators that SQL Server uses to compute general purpose aggregates where we have a GROUP BY clause.  One of these operators is stream aggregate which as we saw last week is used for scalar aggregates.  The other operator is hash aggregate.  In this post, I’ll take a closer look at how stream aggregate works.

The algorithm

Stream aggregate relies on data arriving sorted by the group by column(s).  If we are grouping on more than one column, we can choose any sort order that includes all of the columns.  For example, if we are grouping on columns a and b, we can sort on “(a, b)” or on “(b, a)”.  As with merge join, the sort order may be delivered by an index or by an explicit sort operator.  The sort order ensures that sets of rows with the same value for the group by columns will be adjacent to one another.

Here is pseudo-code for the stream aggregate algorithm:

clear the current aggregate results
clear the current group by columns
for each input row
begin
if the input row does not match the current group by columns
begin
output the aggregate results
clear the current aggregate results
set the current group by columns to the input row
end
update the aggregate results with the input row
end

For example, if we are computing a SUM, the stream aggregate considers each input row.  If the input row belongs to the current group (i.e., the group by columns of the input row match the group by columns of the previous row), we update the current SUM by adding the appropriate value from the input row to the running total.  If the input row belongs to a new group (i.e., the group by columns of the input row do not match the group by columns of the previous row), we output the current SUM, reset the SUM to zero, and start a new group.

Simple examples

create table t (a int, b int, c int)

select sum(c) from t group by a, b

  |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0) THEN NULL ELSE [Expr1011] END))
|--Stream Aggregate(GROUP BY:([t].[b], [t].[a]) DEFINE:([Expr1010]=COUNT_BIG([t].[c]), [Expr1011]=SUM([t].[c])))
|--Sort(ORDER BY:([t].[b] ASC, [t].[a] ASC))
|--Table Scan(OBJECT:([t]))

This is basically the same plan that we saw for the scalar aggregate SUM query except that we need to sort the data before we aggregate.  (You can think of the scalar aggregate as one big group containing all of the rows; thus, for a scalar aggregate there is no need to sort the rows into different groups.)

Stream aggregate preserves the input sort order so if we request order on the group by columns or on a subset of the group by columns, we do not need to sort again:

select sum(c) from t group by a, b order by a

  |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0) THEN NULL ELSE [Expr1011] END))
|--Stream Aggregate(GROUP BY:([t].[a], [t].[b]) DEFINE:([Expr1010]=COUNT_BIG([t].[c]), [Expr1011]=SUM([t].[c])))
|--Sort(ORDER BY:([t].[a] ASC, [t].[b] ASC))
|--Table Scan(OBJECT:([t]))

Note that the sort columns are reversed from the first example.  Previously, we didn’t care whether we sorted on “(a, b)” or “(b, a)”.  Now since the query includes an order by clause, we choose to sort on column a first.

If we have an appropriate index, we do not need to sort at all:

create clustered index tab on t(a,b)

select sum(c) from t group by a, b

  |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0) THEN NULL ELSE [Expr1011] END))
|--Stream Aggregate(GROUP BY:([t].[a], [t].[b]) DEFINE:([Expr1010]=COUNT_BIG([t].[c]), [Expr1011]=SUM([t].[c])))
|--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

Select distinct

If we have an index to provide order, stream aggregate can also be used to implement select distinct.  (If we need to sort to get the order, we can just let the sort distinct directly; there is no reason to use the stream aggregate.)  Select distinct is essentially the same as group by on all selected columns with no aggregate functions.  For example:

select distinct a, b from t

Can also be written as:

select a, b from t group by a, b

Both queries use the same plan:

  |--Stream Aggregate(GROUP BY:([t].[a], [t].[b]))
|--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

Notice that the stream aggregate has a group by clause, but no defined columns.

Distinct aggregates

Consider this query:

select sum(distinct b) from t group by a

We need to eliminate duplicate values of column b from each group.  In my last post, we saw one way to do this using a sort distinct.  However, if we have an appropriate index, we can also use the stream aggregate to eliminate duplicates:

  |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0) THEN NULL ELSE [Expr1011] END))
|--Stream Aggregate(GROUP BY:([t].[a]) DEFINE:([Expr1010]=COUNT_BIG([t].[b]), [Expr1011]=SUM([t].[b])))
|--Stream Aggregate(GROUP BY:([t].[a], [t].[b]))
|--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

The bottommost stream aggregate eliminates duplicates while the topmost one performs the aggregation.

Multiple distincts

Finally, consider this query:

select sum(distinct b), sum(distinct c) from t group by a

  |--Merge Join(Inner Join, MANY-TO-MANY MERGE:([t].[a])=([t].[a]), RESIDUAL:([t].[a] = [t].[a]))
|--Compute Scalar(DEFINE:([t].[a]=[t].[a]))
| |--Compute Scalar(DEFINE:([Expr1005]=CASE WHEN [Expr1018]=(0) THEN NULL ELSE [Expr1019] END))
| |--Stream Aggregate(GROUP BY:([t].[a]) DEFINE:([Expr1018]=COUNT_BIG([t].[c]), [Expr1019]=SUM([t].[c])))
| |--Sort(DISTINCT ORDER BY:([t].[a] ASC, [t].[c] ASC))
| |--Clustered Index Scan(OBJECT:([t].[tab]))
|--Compute Scalar(DEFINE:([t].[a]=[t].[a]))
|--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1020]=(0) THEN NULL ELSE [Expr1021] END))
|--Stream Aggregate(GROUP BY:([t].[a]) DEFINE:([Expr1020]=COUNT_BIG([t].[b]), [Expr1021]=SUM([t].[b])))
|--Stream Aggregate(GROUP BY:([t].[a], [t].[b]))
|--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

As in the multiple scalar distinct example from my prior post, we need to break this query into two parts – one for each distinct set.  Note that the computation of “sum(distinct c)” requires a sort to distinct on column c while the computation of “sum(distinct b)” uses an ordered scan of the clustered index and a stream aggregate as shown above.  We join these two sets of sums on the group by column (in this case column a) to generate the final result.  Since the two input sets are already sorted on the group by column, we can use a merge join.  (The compute scalar operators that appear to define “[t].[a] = [t].[a]” are needed for internal purposes and can be disregarded.)

The merge join ought to be one-to-many not many-to-many since the aggregates ensure uniqueness on the group by column (and join column).  This is a minor performance issue not a correctness issue.  If we rewrite the original query as an explicit join, we do get a one-to-many join:

select sum_b, sum_c

from

  (select a, sum(distinct b) as sum_b from t group by a) r

  join

  (select a, sum(distinct c) as sum_c from t group by a) s

  on r.a = s.a

  |--Merge Join(Inner Join, MERGE:([t].[a])=([t].[a]), RESIDUAL:([t].[a]=[t].[a]))
|--Compute Scalar(DEFINE:([Expr1009]=CASE WHEN [Expr1020]=(0) THEN NULL ELSE [Expr1021] END))
| |--Stream Aggregate(GROUP BY:([t].[a]) DEFINE:([Expr1020]=COUNT_BIG([t].[c]), [Expr1021]=SUM([t].[c])))
| |--Sort(DISTINCT ORDER BY:([t].[a] ASC, [t].[c] ASC))
| |--Clustered Index Scan(OBJECT:([t].[tab]))
|--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1022]=(0) THEN NULL ELSE [Expr1023] END))
|--Stream Aggregate(GROUP BY:([t].[a]) DEFINE:([Expr1022]=COUNT_BIG([t].[b]), [Expr1023]=SUM([t].[b])))
|--Stream Aggregate(GROUP BY:([t].[a], [t].[b]))
|--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

Next …

In my next post, I will write about the other aggregation operator: hash aggregate.