SQL is one of the major tools of data analysis. It provides filtering, transformation and aggregation, and with the help of Hive and Hadoop we can use it to process large volumes of data. However, legacy SQL does not support operations like grouped ranking and moving averages, because the GROUP BY clause can only produce one aggregation result per group, not one per row. Fortunately, newer SQL standards add the WINDOW clause, which lets us compute aggregations over a set of rows and return a result for each row.
For instance, if we want to calculate the two-day moving average for each stock, we can write the following query:
WINDOW, OVER, PARTITION BY and ROWS BETWEEN ... AND ... are all SQL keywords added to support windowing operations. In this query, PARTITION BY and ORDER BY work like GROUP BY and ORDER BY following a WHERE clause, except that they don’t collapse the rows; they only divide them into non-overlapping partitions to work on. ROWS BETWEEN ... AND ... here constructs a window frame; in this case, each frame contains the previous row and the current row. We’ll discuss frames in more detail later. Finally, AVG is a window function that computes a result on each frame. Note that the window specification can also be appended directly to the window function with OVER, instead of being declared in a separate WINDOW clause:
SELECT AVG(`close`) OVER (PARTITION BY `stock`) AS `mavg` FROM `t_stock`;
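To make the frame semantics concrete, here is a minimal pure-Python model of the two-day moving average, i.e. AVG(close) over each stock’s partition ordered by date with ROWS BETWEEN 1 PRECEDING AND CURRENT ROW. It is a sketch of the semantics, not of how Hive executes the query; the (stock, date, close) tuple layout, the moving_avg helper and the sample prices are all hypothetical.

```python
from collections import defaultdict

def moving_avg(rows, preceding=1):
    """Model of AVG(close) OVER (PARTITION BY stock ORDER BY date
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW).

    rows: iterable of (stock, date, close) tuples (hypothetical layout).
    Returns (stock, date, close, mavg) tuples, partition by partition.
    """
    partitions = defaultdict(list)
    for stock, date, close in rows:  # PARTITION BY stock
        partitions[stock].append((date, close))
    result = []
    for stock in sorted(partitions):
        ordered = sorted(partitions[stock], key=lambda r: r[0])  # ORDER BY date
        for i, (date, close) in enumerate(ordered):
            # Frame: up to `preceding` earlier rows plus the current row.
            frame = ordered[max(0, i - preceding): i + 1]
            mavg = sum(c for _, c in frame) / len(frame)
            result.append((stock, date, close, mavg))
    return result

prices = [
    ("AAA", "2024-01-02", 12.0), ("AAA", "2024-01-01", 10.0),
    ("AAA", "2024-01-03", 16.0), ("BBB", "2024-01-01", 5.0),
    ("BBB", "2024-01-02", 7.0),
]
for row in moving_avg(prices):
    print(row)
```

The first row of each partition has no previous row, so its frame holds only itself and its average equals its own close.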
A SQL window query introduces three concepts, namely window partition, window frame and window function.
The PARTITION BY clause divides the result set into window partitions by one or more columns, and the rows within each partition can optionally be sorted by one or more columns with ORDER BY. If there is no PARTITION BY, the entire result set is treated as a single partition; if there is no ORDER BY, window frames cannot be defined, and all rows within the partition constitute a single frame.
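The contrast with GROUP BY fits in a few lines of Python. This sketch (hypothetical helper names, made-up data) models a window with PARTITION BY but no ORDER BY, such as AVG(`close`) OVER (PARTITION BY `stock`): the whole partition is one frame, so every input row survives and carries its partition’s aggregate, whereas GROUP BY yields one result per group.

```python
from collections import defaultdict

def group_by(rows, key, agg):
    """GROUP BY: one result per group."""
    groups = defaultdict(list)
    for row in rows:
        groups[key(row)].append(row)
    return {k: agg(v) for k, v in groups.items()}

def window_no_order(rows, key, agg):
    """PARTITION BY without ORDER BY: the whole partition is a single frame,
    so each input row is kept and paired with its partition's aggregate."""
    per_partition = group_by(rows, key, agg)
    return [(row, per_partition[key(row)]) for row in rows]

def avg_close(part):
    return sum(close for _, close in part) / len(part)

stock_rows = [("AAA", 10.0), ("AAA", 20.0), ("BBB", 6.0)]
print(group_by(stock_rows, lambda r: r[0], avg_close))         # one value per stock
print(window_no_order(stock_rows, lambda r: r[0], avg_close))  # one value per input row
```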
A window frame selects rows from the partition for the window function to work on. There are two ways of defining a frame in Hive: ROWS and RANGE. For both types, we define a lower bound and an upper bound. For instance, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW selects rows from the beginning of the partition to the current row; SUM(close) RANGE BETWEEN 100 PRECEDING AND 200 FOLLOWING selects rows by their distance from the current row’s value. Say the current close is 200: this frame will include rows whose close values range from 100 to 400 within the partition (assuming the partition is ordered by close, since RANGE distances are measured on the ORDER BY column). All possible combinations of frame definitions are listed as follows, and the default definition (when ORDER BY is present) is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
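(ROWS | RANGE) BETWEEN (UNBOUNDED | [num]) PRECEDING AND ([num] PRECEDING | CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN CURRENT ROW AND (CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN [num] FOLLOWING AND (UNBOUNDED | [num]) FOLLOWING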
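The difference between ROWS and RANGE frames can be sketched by computing which positions of a sorted partition fall into a frame. In this sketch (hypothetical helpers, not Hive code) bounds are signed offsets: -k means k PRECEDING, 0 means CURRENT ROW, +k means k FOLLOWING, and None stands for UNBOUNDED. ROWS counts physical rows, while RANGE compares values of the ORDER BY column. That is why a RANGE frame ending at CURRENT ROW also includes the current row’s peers, meaning rows with equal values.

```python
UNBOUNDED = None  # stands for UNBOUNDED PRECEDING / FOLLOWING

def rows_frame(n, i, start, end):
    """Indices of a ROWS frame for row i in a partition of n rows."""
    lo = 0 if start is UNBOUNDED else max(0, i + start)
    hi = n - 1 if end is UNBOUNDED else min(n - 1, i + end)
    return list(range(lo, hi + 1))

def range_frame(keys, i, start, end):
    """Indices of a RANGE frame: rows whose ORDER BY value lies within
    [keys[i] + start, keys[i] + end]. keys must be sorted ascending."""
    cur = keys[i]
    return [j for j, k in enumerate(keys)
            if (start is UNBOUNDED or k >= cur + start)
            and (end is UNBOUNDED or k <= cur + end)]

closes = [50, 100, 150, 200, 300, 400, 450]
# SUM(close) RANGE BETWEEN 100 PRECEDING AND 200 FOLLOWING, current close = 200
frame = range_frame(closes, 3, -100, 200)
print([closes[j] for j in frame], sum(closes[j] for j in frame))  # [100, 150, 200, 300, 400] 1150

ties = [10, 20, 20, 30]
print(rows_frame(len(ties), 1, UNBOUNDED, 0))  # [0, 1]
print(range_frame(ties, 1, UNBOUNDED, 0))      # [0, 1, 2]: the peer at index 2 is included
```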
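Most window functions compute their results on the current frame; the ranking functions and LEAD / LAG instead look at the whole ordered partition. Hive supports the following functions:
FIRST_VALUE(col), LAST_VALUE(col) return the column value of the first / last row within the frame;
LEAD(col, n), LAG(col, n) return the column value of the n-th row after / before the current row;
RANK(), ROW_NUMBER() assign a sequence number to the current row within the partition. The difference is that RANK() gives identical values the same number, so its sequence can contain duplicates;
Aggregate functions such as COUNT(), SUM(col), MAX(col) and MIN(col) work as usual.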
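The offset and value functions can be modelled on a single ordered partition. This is a hedged sketch with hypothetical helper names; None plays the role of SQL NULL, and the FIRST_VALUE / LAST_VALUE frame is given as ROWS offsets (negative = PRECEDING, None = UNBOUNDED). It also shows a common surprise: when the frame ends at CURRENT ROW, LAST_VALUE simply returns the current row’s value.

```python
def lag(values, n=1, default=None):
    """LAG(col, n): value n rows before the current row, else default."""
    return [values[i - n] if i - n >= 0 else default
            for i in range(len(values))]

def lead(values, n=1, default=None):
    """LEAD(col, n): value n rows after the current row, else default."""
    return [values[i + n] if i + n < len(values) else default
            for i in range(len(values))]

def first_last_value(values, i, start, end):
    """(FIRST_VALUE, LAST_VALUE) over a ROWS frame for row i.
    start/end: signed offsets, None = UNBOUNDED; empty frame -> (None, None)."""
    lo = 0 if start is None else max(0, i + start)
    hi = len(values) - 1 if end is None else min(len(values) - 1, i + end)
    if lo > hi:
        return None, None
    return values[lo], values[hi]

vals = [3, 5, 8]
print(lag(vals), lead(vals))                  # [None, 3, 5] [5, 8, None]
print(first_last_value(vals, 1, None, 0))     # (3, 5): frame ends at current row
print(first_last_value(vals, 1, None, None))  # (3, 8): whole partition
```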
First, let’s create some test data of employee incomes in Hive:
CREATE TABLE t_employee (id INT, emp_name VARCHAR(20), dep_name VARCHAR(20),
We can use the RANK() function to find out who earns the most within each department:
SELECT dep_name, emp_name, salary
When there are duplicates, RANK() returns the same value for each tied row and skips the following sequence numbers. Use DENSE_RANK() if you want consecutive ranks.
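The three ranking functions differ only in how they treat ties. The sketch below (plain Python, hypothetical names and salaries) computes ROW_NUMBER(), RANK() and DENSE_RANK() for values already sorted by the ORDER BY key. It then models the "top earner per department" goal by keeping rows whose RANK() within each dep_name partition, ordered by salary descending, equals 1. Because it filters on RANK(), both tied employees are returned.

```python
from collections import defaultdict

def rank_functions(sorted_values):
    """(row_number, rank, dense_rank) lists for values sorted by ORDER BY."""
    row_number, rank, dense_rank = [], [], []
    for i, v in enumerate(sorted_values):
        row_number.append(i + 1)
        if i > 0 and v == sorted_values[i - 1]:  # tie: reuse previous rank
            rank.append(rank[-1])
            dense_rank.append(dense_rank[-1])
        else:
            rank.append(i + 1)  # RANK skips numbers after ties
            dense_rank.append(dense_rank[-1] + 1 if dense_rank else 1)
    return row_number, rank, dense_rank

def top_earners(employees):
    """employees: (dep_name, emp_name, salary). Keeps rows where
    RANK() OVER (PARTITION BY dep_name ORDER BY salary DESC) = 1."""
    by_dep = defaultdict(list)
    for dep, name, salary in employees:
        by_dep[dep].append((name, salary))
    result = []
    for dep in sorted(by_dep):
        rows = sorted(by_dep[dep], key=lambda r: r[1], reverse=True)
        _, ranks, _ = rank_functions([s for _, s in rows])
        result += [(dep, name, s) for (name, s), r in zip(rows, ranks) if r == 1]
    return result

print(rank_functions([5000, 4500, 4500, 4000]))
# ([1, 2, 3, 4], [1, 2, 2, 4], [1, 2, 2, 3])
staff = [("Sales", "Ann", 4000), ("Sales", "Bob", 4800), ("Sales", "Cid", 4800),
         ("IT", "Dee", 5200), ("IT", "Eve", 3900)]
print(top_earners(staff))
```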
We can calculate the cumulative distribution of salaries across all departments. For example, the cumulative distribution of salary 4000 is 0.55, which means 55% of employees earn 4000 or less. To calculate this, we first count the frequency of each salary, then compute a cumulative sum:
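The counting approach can be sketched in Python: count how often each salary occurs, accumulate those counts in ascending salary order, and divide by the total number of employees. The salary list and the helper name are hypothetical, so the numbers differ from the 0.55 quoted above.

```python
from collections import Counter
from itertools import accumulate

def cume_dist_by_counting(salaries):
    """Frequency per salary, running sum in ascending order, divided by n."""
    freq = Counter(salaries)
    keys = sorted(freq)
    running = accumulate(freq[k] for k in keys)  # cumulative count
    total = len(salaries)
    return {k: c / total for k, c in zip(keys, running)}

salaries = [3000, 3500, 3500, 4000, 4000, 4500, 5000, 5000]
print(cume_dist_by_counting(salaries))
# {3000: 0.125, 3500: 0.375, 4000: 0.625, 4500: 0.75, 5000: 1.0}
```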
This can also be done with Hive’s CUME_DIST() window function. There is also a PERCENT_RANK() function, which expresses the salary’s rank as a percentage, computed as (rank - 1) / (number of rows - 1).
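For comparison, here is a hedged per-row model of the two built-ins on a salary list sorted in ascending order (hypothetical data and helper names). CUME_DIST() is the fraction of rows whose value is less than or equal to the current one. PERCENT_RANK() is (RANK() - 1) / (number of rows - 1), which is 0 for the lowest value and for a one-row partition.

```python
def cume_dist(sorted_values):
    """CUME_DIST(): share of rows with a value <= the current value."""
    n = len(sorted_values)
    return [sum(1 for w in sorted_values if w <= v) / n for v in sorted_values]

def percent_rank(sorted_values):
    """PERCENT_RANK(): (RANK() - 1) / (n - 1); 0.0 for a one-row partition."""
    n = len(sorted_values)
    if n == 1:
        return [0.0]
    # RANK() is 1 + the position of the first row with the same value.
    ranks = [sorted_values.index(v) + 1 for v in sorted_values]
    return [(r - 1) / (n - 1) for r in ranks]

salaries = [3000, 3500, 3500, 4000, 4000, 4500, 5000, 5000]
print(cume_dist(salaries))  # [0.125, 0.375, 0.375, 0.625, 0.625, 0.75, 1.0, 1.0]
print(percent_rank(salaries))
```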
We can divide click events into different sessions by setting a timeout, in this case 30 minutes, and assign an id to each session:
First, in subquery b, we use the LAG(col) function to calculate the time difference between the current row and the previous row; if it is more than 30 minutes, the row is marked as the start of a new session. Then we take a cumulative sum of the new_session field so that each session gets an incrementing sequence number.
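The same two steps can be modelled in plain Python. This sketch assumes the events are partitioned by a hypothetical user column and ordered by click time; the sample timestamps and helper names are made up. Treating each user’s first click as a new session is also a choice made for the sketch: LAG returns NULL for the first row, and a query has to decide how to handle that case.

```python
from datetime import datetime, timedelta

TIMEOUT = timedelta(minutes=30)

def assign_sessions(events):
    """events: (user, click_time) tuples. Returns
    (user, click_time, new_session, session_id) per event, where
    new_session mirrors the LAG-based flag and session_id its running SUM."""
    by_user = {}
    for user, ts in sorted(events):  # PARTITION BY user ORDER BY time
        by_user.setdefault(user, []).append(ts)
    result = []
    for user, times in by_user.items():
        prev, session_id = None, 0
        for ts in times:
            # LAG(ts) is None for the first click; treat it as a new session.
            new_session = 1 if prev is None or ts - prev > TIMEOUT else 0
            session_id += new_session  # cumulative SUM(new_session)
            result.append((user, ts, new_session, session_id))
            prev = ts
    return result

def at(hh, mm):
    return datetime(2024, 1, 1, hh, mm)

clicks = [("u1", at(10, 0)), ("u1", at(10, 10)), ("u1", at(10, 50)),
          ("u1", at(11, 5)), ("u2", at(9, 0)), ("u2", at(9, 45))]
for row in assign_sessions(clicks):
    print(row[0], row[1].strftime("%H:%M"), row[3])
```

A gap of exactly 30 minutes stays in the same session, matching "more than 30 minutes" in the text.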
Briefly, a window query consists of two steps: dividing records into partitions, and evaluating window functions on each partition. Partitioning maps naturally onto the MapReduce paradigm, since Hadoop takes care of shuffling and sorting. However, an ordinary UDAF can only return one row per group, whereas a window query needs a table-in, table-out contract. So the community introduced Partitioned Table Functions (PTF) into Hive.
As the name suggests, a PTF works on partitions, taking a set of table rows as input and producing a set of table rows as output. The following sequence diagram lists the major classes of the PTF mechanism.
PTFOperator reads data from the sorted source and creates input partitions;
WindowTableFunction manages window frames, invokes window functions (UDAF), and writes the results to output partitions.
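To illustrate the table-in, table-out contract, here is a toy Python model of the flow described above. It sorts the rows (the work the shuffle and sort would do), cuts them into partitions, and passes each whole partition to a table function that can emit one output row per input row. The function names are hypothetical; the model only loosely mirrors the roles of PTFOperator and WindowTableFunction and implies nothing about their actual Java interfaces.

```python
from itertools import groupby
from operator import itemgetter

def ptf(rows, partition_key, order_key, table_function):
    """Table in, table out: sort, split into partitions, and let the
    table function turn each input partition into an output partition."""
    ordered = sorted(rows, key=lambda r: (partition_key(r), order_key(r)))
    output = []
    for _, partition in groupby(ordered, key=partition_key):
        output.extend(table_function(list(partition)))
    return output

def running_sum(partition):
    """A window-style function: appends SUM(col 2) over
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW to every row."""
    total, out = 0, []
    for row in partition:
        total += row[2]
        out.append(row + (total,))
    return out

rows = [("b", 2, 5), ("a", 1, 10), ("a", 2, 20), ("b", 1, 1)]
print(ptf(rows, itemgetter(0), itemgetter(1), running_sum))
```

Unlike a UDAF, which would return one row per partition, the output here has as many rows as the input.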
The HIVE-896 ticket (link) contains discussions on introducing analytical window functions into Hive, and this slide deck (link), authored by one of the committers, explains how PTF was implemented and merged into Hive.