Hash Aggregate

In my prior two posts, I wrote about the stream aggregate operator.  Stream aggregate is great for scalar aggregates and for aggregations where we have an index to provide a sort order on the group by column(s) or where we need to sort anyhow (e.g., due to an order by clause).

The other aggregation operator, hash aggregate, is similar to hash join.  It does not require (or preserve) sort order, requires memory, and is blocking (i.e., it does not produce any results until it has consumed its entire input).  Hash aggregate excels at efficiently aggregating very large data sets.

Here is pseudo-code for the hash aggregate algorithm:

for each input row
  begin
    calculate hash value on group by column(s)
    check for a matching row in the hash table
    if we do not find a match
      insert a new row into the hash table
    else
      update the matching row with the input row
  end
output all rows in the hash table

While stream aggregate computes just one group at a time, hash aggregate computes all of the groups simultaneously.  We use a hash table to store these groups.  With each new input row, we check the hash table to see whether the new row belongs to an existing group.  If it does, we simply update the existing group.  If not, we create a new group.  Since the input data is unsorted, any row can belong to any group.  Thus, we cannot output any results until we’ve finished processing every input row.
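To make the algorithm concrete, here is a minimal Python sketch of it for a single sum.  It is only an illustration and not how SQL Server implements the operator: a Python dict stands in for the hash table, the function name and its key/value parameters are made up for this example, and NULL handling is ignored.

```python
def hash_aggregate_sum(rows, key, value):
    """Group rows by key(row) and sum value(row); returns a list of (group, sum)."""
    table = {}  # stands in for the hash table: one entry per group
    for row in rows:
        k = key(row)
        if k not in table:
            table[k] = value(row)    # no match: insert a new group
        else:
            table[k] += value(row)   # match: update the existing group
    # Blocking: nothing is returned until every input row has been consumed.
    return list(table.items())


# 100 rows of the form (a, b, c) with 10 distinct values of a.
rows = [(i % 10, i, i * 3) for i in range(100)]
result = hash_aggregate_sum(rows, key=lambda r: r[0], value=lambda r: r[1])
# dict(result)[0] == 450, i.e. 0 + 10 + 20 + ... + 90
```

The rows may arrive in any order and the result is the same, which is why the operator neither needs nor preserves a sort order.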

Memory and spilling

As with hash join, the hash aggregate requires memory.  Before executing a query with a hash aggregate, SQL Server uses cardinality estimates to estimate how much memory we need to execute the query.  With a hash join, we store each build row, so the total memory requirement is proportional to the number and size of the build rows.  The number of rows that join and the output cardinality of the join have no effect on the memory requirement of the join.  With a hash aggregate, we store one row for each group, so the total memory requirement is actually proportional to the number and size of the output groups or rows.  If we have fewer unique values of the group by column(s) and fewer groups, we need less memory.  If we have more unique values of the group by column(s) and more groups, we need more memory.
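The difference between the two memory requirements can be illustrated with a small Python sketch.  It counts hash table entries rather than bytes, which is a simplification, and both function names are hypothetical helpers written for this example.

```python
from collections import defaultdict


def hash_join_build_entries(build_rows, key):
    """A hash join build keeps every build row, so entries == number of build rows."""
    table = defaultdict(list)
    for row in build_rows:
        table[key(row)].append(row)
    return sum(len(bucket) for bucket in table.values())


def hash_aggregate_entries(rows, key, value):
    """A hash aggregate keeps one running row per group, so entries == number of groups."""
    table = {}
    for row in rows:
        k = key(row)
        table[k] = table.get(k, 0) + value(row)
    return len(table)


# 10,000 rows with only 100 distinct values in the first column.
rows = [(i % 100, i, i * 3) for i in range(10000)]
join_entries = hash_join_build_entries(rows, key=lambda r: r[0])                     # 10000
agg_entries = hash_aggregate_entries(rows, key=lambda r: r[0], value=lambda r: r[1])  # 100
```

The same input costs the join build 10,000 entries but the aggregate only 100, because every duplicate of a group by value collapses into an existing entry.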

So, what happens if we run out of memory?  Again, like hash join, if we run out of memory, we must begin spilling rows to tempdb.  We spill one or more buckets or partitions including any partially aggregated results along with any additional new rows that hash to the spilled buckets or partitions.  Although we do not attempt to aggregate the spilled new rows, we do hash them and divide them up into several buckets or partitions.  Once we’ve finished processing all input rows, we output the completed in-memory groups and repeat the algorithm by reading back and aggregating one spilled partition at a time.  By dividing the spilled rows into multiple partitions, we reduce the size of each partition and, thus, reduce the risk that the algorithm will need to repeat many times.
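The spilling scheme can be sketched in Python as follows.  This is a toy model under several assumptions that are mine and not SQL Server's: memory is measured as a maximum number of in-memory groups (max_groups, at least 1), Python lists stand in for tempdb, the partition we spill is simply the one the overflowing row hashes to, and each recursion level partitions on a different base-num_partitions "digit" of the hash value so that a spilled partition is split further when it is re-read.

```python
def hash_aggregate_with_spill(pairs, max_groups, num_partitions=4, level=0):
    """Sum values per key while holding at most max_groups groups in memory."""
    def partition_of(k):
        # Use a different digit of the hash at each level so re-reads repartition.
        return (hash(k) // num_partitions ** level) % num_partitions

    table = {}     # in-memory groups: key -> running sum
    spilled = {}   # partition number -> list of (key, value) pairs "on disk"
    for k, v in pairs:
        p = partition_of(k)
        if p in spilled:
            spilled[p].append((k, v))      # partitioned, but not aggregated
        elif k in table:
            table[k] += v                  # update an existing in-memory group
        elif len(table) < max_groups:
            table[k] = v                   # room for a new group
        else:
            # Out of memory: spill partition p, including its partial aggregates.
            victims = [g for g in table if partition_of(g) == p]
            spilled[p] = [(g, table.pop(g)) for g in victims]
            spilled[p].append((k, v))

    results = list(table.items())          # completed in-memory groups
    for spilled_pairs in spilled.values(): # then one spilled partition at a time
        results.extend(
            hash_aggregate_with_spill(spilled_pairs, max_groups, num_partitions, level + 1)
        )
    return results


pairs = [(i % 100, i) for i in range(10000)]
totals = dict(hash_aggregate_with_spill(pairs, max_groups=10))
```

Because a partial sum can be fed back in as just another value to add, the spilled partial aggregates and the spilled raw rows share one list.  That shortcut works for sum but would need more care for aggregates such as average.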

Note that while duplicate rows are a big problem for hash join as they lead to skew in the size of the different hash buckets and make it difficult to divide the work into small uniform portions, duplicates are actually quite helpful for hash aggregate since they collapse into a single group.

Examples

The optimizer tends to favor hash aggregation for tables with more rows and groups.  For example, with only 100 rows and 10 groups, we get a sort and a stream aggregate:

create table t (a int, b int, c int)

set nocount on
declare @i int
set @i = 0
while @i < 100
  begin
    insert t values (@i % 10, @i, @i * 3)
    set @i = @i + 1
  end

select sum(b) from t group by a

  |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0) THEN NULL ELSE [Expr1011] END))
       |--Stream Aggregate(GROUP BY:([t].[a]) DEFINE:([Expr1010]=COUNT_BIG([t].[b]), [Expr1011]=SUM([t].[b])))
            |--Sort(ORDER BY:([t].[a] ASC))
                 |--Table Scan(OBJECT:([t]))

But, with 1000 rows and 100 groups, we get a hash aggregate:

truncate table t

declare @i int
set @i = 0
while @i < 1000
  begin
    insert t values (@i % 100, @i, @i * 3)
    set @i = @i + 1
  end

select sum(b) from t group by a

  |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0) THEN NULL ELSE [Expr1011] END))
       |--Hash Match(Aggregate, HASH:([t].[a]), RESIDUAL:([t].[a] = [t].[a]) DEFINE:([Expr1010]=COUNT_BIG([t].[b]), [Expr1011]=SUM([t].[b])))
            |--Table Scan(OBJECT:([t]))

Notice that we hash on the group by column.  The residual predicate from the hash aggregate is used to compare rows in the hash table to input rows in case we have a hash value collision.
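To show why the residual is needed, here is a small Python sketch that uses explicit buckets instead of a dict.  The bucket count is deliberately tiny so that different values of a share a bucket.  The function name and layout are assumptions made for illustration, and a shared bucket here stands in for a hash value collision.

```python
def hash_aggregate_with_residual(rows, num_buckets=8):
    """Sum b per a using explicit hash buckets; rows are (a, b) pairs."""
    buckets = [[] for _ in range(num_buckets)]   # each bucket holds [a, running_sum] entries
    for a, b in rows:
        bucket = buckets[hash(a) % num_buckets]  # corresponds to HASH:([t].[a])
        for group in bucket:
            if group[0] == a:                    # corresponds to RESIDUAL:([t].[a] = [t].[a])
                group[1] += b
                break
        else:
            # No entry in this bucket passed the residual check: start a new group.
            bucket.append([a, b])
    return [tuple(group) for bucket in buckets for group in bucket]


# 1000 rows and 100 groups squeezed into 8 buckets, so every bucket holds many groups.
rows = [(i % 100, i) for i in range(1000)]
result = dict(hash_aggregate_with_residual(rows))
```

Without the comparison on the actual column value, every row landing in a bucket would be folded into whichever group got there first, and the sums would be wrong.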

Notice also that with the hash aggregate we do not need a sort.  A sort requires more memory than the hash aggregate since we must sort 1000 rows but only need memory in the hash aggregate for 100 groups.  However, if we explicitly request a sort using an order by clause in the query, we get the stream aggregate again:

select sum(b) from t group by a order by a

  |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0) THEN NULL ELSE [Expr1011] END))
       |--Stream Aggregate(GROUP BY:([t].[a]) DEFINE:([Expr1010]=COUNT_BIG([t].[b]), [Expr1011]=SUM([t].[b])))
            |--Sort(ORDER BY:([t].[a] ASC))
                 |--Table Scan(OBJECT:([t]))

If the table gets big enough and the number of groups remains small enough, eventually the optimizer will decide that it is cheaper to use the hash aggregate and sort after we aggregate.  For example, with 10,000 rows and only 100 groups, the optimizer decides that it is better to hash and sort only 100 groups than to sort 10,000 rows:

truncate table t

set nocount on
declare @i int
set @i = 0
while @i < 10000
  begin
    insert t values (@i % 100, @i, @i * 3)
    set @i = @i + 1
  end

select sum(b) from t group by a order by a

  |--Sort(ORDER BY:([t].[a] ASC))
       |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0) THEN NULL ELSE [Expr1011] END))
            |--Hash Match(Aggregate, HASH:([t].[a]), RESIDUAL:([t].[a] = [t].[a]) DEFINE:([Expr1010]=COUNT_BIG([t].[b]), [Expr1011]=SUM([t].[b])))
                 |--Table Scan(OBJECT:([t]))

Distinct

Just like stream aggregate, hash aggregate can be used to implement distinct operations:

select distinct a from t

  |--Hash Match(Aggregate, HASH:([t].[a]), RESIDUAL:([t].[a] = [t].[a]))
       |--Table Scan(OBJECT:([t]))

Or for something a little more interesting:

select sum(distinct b), sum(distinct c) from t group by a

  |--Hash Match(Inner Join, HASH:([t].[a])=([t].[a]), RESIDUAL:([t].[a] = [t].[a]))
       |--Compute Scalar(DEFINE:([t].[a]=[t].[a]))
       |    |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1018]=(0) THEN NULL ELSE [Expr1019] END))
       |         |--Hash Match(Aggregate, HASH:([t].[a]), RESIDUAL:([t].[a] = [t].[a]) DEFINE:([Expr1018]=COUNT_BIG([t].[b]), [Expr1019]=SUM([t].[b])))
       |              |--Hash Match(Aggregate, HASH:([t].[a], [t].[b]), RESIDUAL:([t].[a] = [t].[a] AND [t].[b] = [t].[b]))
       |                   |--Table Scan(OBJECT:([t]))
       |--Compute Scalar(DEFINE:([t].[a]=[t].[a]))
            |--Compute Scalar(DEFINE:([Expr1005]=CASE WHEN [Expr1020]=(0) THEN NULL ELSE [Expr1021] END))
                 |--Hash Match(Aggregate, HASH:([t].[a]), RESIDUAL:([t].[a] = [t].[a]) DEFINE:([Expr1020]=COUNT_BIG([t].[c]), [Expr1021]=SUM([t].[c])))
                      |--Hash Match(Aggregate, HASH:([t].[a], [t].[c]), RESIDUAL:([t].[a] = [t].[a] AND [t].[c] = [t].[c]))
                           |--Table Scan(OBJECT:([t]))

This plan is logically equivalent to the last plan from my stream aggregate post, but it uses hash aggregate and hash join instead of sorts, stream aggregates, and merge join.  We have two hash aggregates to eliminate duplicates (one for "distinct b" and one for "distinct c") and two more hash aggregates to compute the two sums.  The hash join "glues" the resulting rows together to form the final result.
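The shape of this plan can be mirrored in a few lines of Python.  This is only a sketch of the data flow: the helper names are hypothetical, rows are (a, b, c) tuples, sets and dicts stand in for the hash tables, and NULLs are ignored.

```python
def distinct_pairs(rows, col):
    """Lower hash aggregate: eliminate duplicate (a, value) pairs for one column."""
    return {(row[0], row[col]) for row in rows}


def sum_by_a(pairs):
    """Upper hash aggregate: sum the de-duplicated values for each a."""
    sums = {}
    for a, v in pairs:
        sums[a] = sums.get(a, 0) + v
    return sums


def sum_distinct_b_and_c(rows):
    sum_b = sum_by_a(distinct_pairs(rows, 1))   # branch for sum(distinct b)
    sum_c = sum_by_a(distinct_pairs(rows, 2))   # branch for sum(distinct c)
    # Hash join on a: build on one branch, probe with the other.
    return [(a, sum_b[a], total_c) for a, total_c in sum_c.items() if a in sum_b]


rows = [(0, 1, 10), (0, 1, 20), (0, 2, 20), (1, 5, 7), (1, 5, 7)]
result = sorted(sum_distinct_b_and_c(rows))
# result == [(0, 3, 30), (1, 5, 7)]
```

Each branch scans the input separately, just as the plan contains two table scans, and the join is needed only because the two sums are computed independently.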

Hints

You can use the “order group” and “hash group” query hints to force stream aggregate and hash aggregate respectively.  These hints affect all aggregation operations in the entire query.  For example:

select sum(b) from t group by a option(order group)

select sum(b) from t group by a option(hash group)

SQL Profiler

You can use the SQL Profiler “Hash Warning” event class (in the “Errors and Warnings” event category) to detect when a query with a hash join or hash aggregate spills.  Spilling generates I/O and can adversely affect performance.  See Books Online for more information about this event.
