Description
Pipelining is the other side of the coin of streaming. When an operation is pipelined, it requires only a single scan of the data. Thus, a pipelined operation can potentially be used to handle data-stream processing.
This assignment is to write a user-defined aggregate to handle a running-average aggregation in a one-pass manner (“streaming”). We shall use the database system PostgreSQL (often called just Postgres, for short).
The Stream
In this case, the “stream” will be a table. Your query will go through the table effectively in a single scan, computing the running aggregate of average for each group. A database engine, of course, can do more complicated things: sorting the table various times, making multiple passes, etc. And we could set things up to stream data into a query for the database engine to handle, rather than have the query operate over a table. We aim to write our query and user-defined aggregate in such a way that the query is executed in a purely pipelined way, needing to scan the table only a single time.
Let us define the table for the assignment as
create table Stream (
id int,
grp int,
measure int,
constraint streamPK
primary key (id),
constraint idNotNeg
check (id >= 0),
constraint grpNotNeg
check (grp >= 0)
);
The table Stream will be populated with tuples with id’s ascending, starting with 0. These are like timestamps for the arrival of the tuples. Adjacent tuples may be in the same group (grp). E.g.,
insert into Stream (id, grp, measure)
values
( 0, 0, 2),
( 1, 0, 3),
( 2, 1, 5),
( 3, 1, 7),
( 4, 1, 11),
( 5, 0, 13),
( 6, 0, 17),
( 7, 0, 19),
( 8, 0, 23),
( 9, 2, 29),
(10, 2, 31),
(11, 5, 37),
(12, 3, 41),
(13, 3, 43);
Of course, Stream could be very large. A “group” of tuples is a maximal run of tuples adjacent by id with the same grp value. Thus, there are six groups above. (Group value 0 is used twice, but for two different groups; group value 1 breaks the adjacency of the 0’s.) The grp values are just labels, so the order of the grp labels is immaterial (unlike the id’s). And, of course, a single group’s sequence can be arbitrarily long.
We want to report for each tuple in the stream the running average of measure in its group. For the first tuple of a group, the average is just that tuple’s measure value itself. For the second tuple of the group, it is the average of the first and second tuples’ measure values. And so forth.
Run on Stream above, our SQL “stream-processing program” should produce the answer table
id | grp | measure | average
----+-----+---------+------------------
0 | 0 | 2 | 2
1 | 0 | 3 | 2.5
2 | 1 | 5 | 5
3 | 1 | 7 | 6
4 | 1 | 11 | 7.66666666666667
5 | 0 | 13 | 13
6 | 0 | 17 | 15
7 | 0 | 19 | 16.3333333333333
8 | 0 | 23 | 18
9 | 2 | 29 | 29
10 | 2 | 31 | 30
11 | 5 | 37 | 37
12 | 3 | 41 | 41
13 | 3 | 43 | 42
(14 rows)
For this assignment, we shall use PostgreSQL, a truly excellent leading object-relational database system that is free and open-source.
- PostgreSQL
- The SQL Language
- PostgreSQL Documentation V11: online manuals for PostgreSQL users
Postgres is ubiquitous, and you will find it in use everywhere. Feel free to install it on your own box, if you want. It is reasonably straightforward. We use it for various purposes in the EECS department, too.
For ease, we shall use Docker again for this assignment, as we did for Project #2, with a Postgres container. Or you can use the Postgres server on Prism (the EECS computing services).
PostgreSQL by Container
Installation: Docker & the Postgres Image
Install Docker on your box (your computer), the community edition, if you have not already. (Refer to the FAQ in the previous assignment.) The Postgres image we shall use is simply called postgres. This is the officially endorsed PostgreSQL image from the Docker library. Install the image for Docker by
mybox% docker pull postgres
You can also find the postgres image at GitHub, GitHub – docker-library/postgres: Docker Official Image packaging for Postgres. Documentation for working with the postgres image is at the Docker library site: postgres | Docker Documentation.
Postgres Container
Create a Postgres container in which you will run your job.
mybox% docker run --name Postgres -e POSTGRES_PASSWORD=Bleah -d postgres
The --name flag above gives a name to your container, to make it easier to access. You can give it whatever name you want; “Postgres” is a reasonable name. The -e flag sets an environment variable within the container. Set the postgres password to what you want; you will need it if you start a psql shell within the container. The -d flag means the container will run in the background as a “daemon” / service.
To stop your container running,
mybox% docker stop Postgres
And to start it up again,
mybox% docker start Postgres
The Postgres system follows a client-server model. Running in the container is a Postgres database server. The utility psql is a command-line client — a shell — for Postgres. The container has psql installed.
Unlike the Hadoop container we used in Project #2, this Postgres container does not contain a skeleton Linux with a bash shell that we can enter. It is meant to provide an operational Postgres server that applications (clients) from the “outside” can access. In fact, if you installed psql natively on your box, or, say, a popular GUI client for Postgres such as pgAdmin — and you configured the container correctly — then you could run applications natively on your host machine that would interact with the Postgres server “in the bottle” (that is, within the running container). (Note that such configuration is more involved than I am willing to document for this assignment.)
We can also use the psql shell available within the container. That is what we will do here. To invoke an interactive psql shell,
mybox% docker run -it --rm --link Postgres:postgres postgres psql -h postgres -U postgres
(Note it is “\q” to quit out of the shell.)
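An alternative that should also work, since psql is installed in the container: exec a psql shell directly inside the running container. (This assumes the container is named Postgres, as above, and is running.)
mybox% docker exec -it Postgres psql -U postgres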
We can also put what we want into a file and “run” that file using psql. This will be our general approach. This takes two steps. First, we have to put the file into the container’s file system where the container’s psql can find it. Second, we invoke the container’s psql to run the file.
Say you have our SQL in myfile.sql on your (host) machine. We use docker to copy it to within the container.
mybox% docker cp ./myfile.sql Postgres:/docker-entrypoint-initdb.d/myfile.sql
Then to execute it via psql:
mybox% docker exec -u postgres Postgres psql postgres postgres -f docker-entrypoint-initdb.d/myfile.sql
(Lots of occurrences of “postgres” in that command! Can you parse out what each is doing?)
PostgreSQL on Prism
We run a Postgres server named db on Prism. You are welcome to use it for this assignment.
Each student in the class has had an account created on db. A student’s db account name is the same as their Prism account name. The password has been set to the student’s student ID.
Resources:
- PostgreSQL on Prism
- psql: PostgreSQL’s shell client: a guide to using psql with Prism’s db
- Tutorial Video for using PostgreSQL on PRISM
Setting up your own PostgreSQL Server
You are welcome to install Postgres on your own machine — or may have already — and use that for the assignment.
Just ensure that your solution works either in the containerized version as above or on db, the PostgreSQL server on Prism, before submitting it.
Strategy
Make an SQL script that
1. creates the stream, which is
   a. a table Stream with columns id, grp, and measure (like in the example above in §Pipeline ¶The Stream),
   b. populated with a few tuples for testing purposes,
2. defines a user-defined aggregate function to handle the running average (call it, say, runningAvg), and
3. issues an SQL query that exercises your aggregate runningAvg, and that does it in a way that is guaranteed to be one pass.
An example might help: see §Example: Running Sum below.
Requirements
We want to ensure that our approach is pipelined; that is, that Stream is processed in a single pass. And, second, we have to compute the running average in a numerically safe way.
Thus, there are two things to avoid:
- anything in the query that would force multiple passes (e.g., group by or partition by);
- potential overflow computing the running average.
There is a 20% penalty each for violating these!
For the first, it is easy to be safe. Just stick closely to the “template” query in the example.
For the second, we want to compute the running average in such a way that we would not run into overflow problems or bad error accumulation for long running averages.
Computing mean incrementally
The “standard” way to accumulate the running mean (average) step by step would be to know the running sum — the sum of the numbers (x_1, x_2, …) that we have seen so far — and the running count — how many numbers we have seen so far — at a given step i. For the running sum:
s_i = s_{i-1} + x_i
We set s_0 = 0 to start things off. Our count is just i. For our state at step i, we need to keep s_i and i. Then, the mean at step i is s_i / i.
However, imagine that our sequence is very long. The sum could grow to be very large, so large that we would overflow our int register.
Instead, we can calculate the mean, m_i, directly (for i > 0) at each step. (Let m_0 = 0.)
m_i = ((i - 1) · m_{i-1} + x_i) / i
Note that, in this case, for our state at step i, we need to keep m_i and i. This looks better; it is a clear formulation for iteratively computing the average. But it really does nothing to address our overflow issue: the product (i - 1) · m_{i-1} just reconstructs the running sum. Let us reformulate then.
m_i = m_{i-1} + (x_i - m_{i-1}) / i
Now, it is not a problem. Each intermediate value stays on the order of the mean itself, not of the sum. (E.g., for group 1 above: m_1 = 5, m_2 = 5 + (7 - 5)/2 = 6, m_3 = 6 + (11 - 6)/3 = 7.66….)
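To make the update step concrete, here is a minimal sketch of it as a SQL function. (This is illustrative only; meanStep is a name introduced here, not something the assignment asks for.)
create function meanStep(m float, x float, i int)
returns float
language sql
as $f$
    -- the overflow-safe recurrence: nudge the old mean toward the new value
    select m + (x - m) / i;
$f$;
-- e.g., select meanStep(6, 11, 3);  yields 7.666…
This is the recurrence your runningAvg state function can apply, carrying the count i along in the aggregate’s state.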
User-defined aggregate functions
A user-defined aggregate function needs two functions defined.
- a state function
  This is called iteratively for each tuple that is part of the aggregation. It takes two arguments:
  a. state
  b. value
  The value is from the tuple, and is what we are aggregating with respect to. The state is an internal value, or record of values (accomplished via a composite type), used by the aggregate to hold the information it needs.
  The return value of the state function is a new state.
- a final function
  This extracts from the state the aggregate value representing the aggregation for return.
  It takes one argument: state.
Note that the return value (the aggregate) must be the same type as what the aggregate function is aggregating over.
This last bit about the return type needing to be the same as the type that is being aggregated over is not always so convenient. Note that, for this assignment, the running average is over integer, but it returns a float for the aggregation value. To accommodate this, feel free to cast the integer measure to float, and write your aggregate function in terms of float.
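As a warm-up, here is a minimal sketch of this plumbing: a toy aggregate that multiplies float values together. (The names product_state, product_final, and product are introduced here purely for illustration; this is not part of the assignment.)
create function product_state(float, float)
returns float
language sql
as $f$
    -- new state = old state times the incoming value
    -- (nulls are not handled here; a null value would null out the product)
    select $1 * $2;
$f$;

create function product_final(float)
returns float
language sql
as $f$
    -- the state is already the answer; pass it through
    select $1;
$f$;

create aggregate product(float) (
    sfunc     = product_state,
    stype     = float,
    finalfunc = product_final,
    initcond  = '1'
);
-- e.g., select product(cast(measure as float)) from Stream;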
OLAP SQL
The SQL standards include what are often called the “OLAP” extensions. These include window aggregation. What window aggregation allows one to do is to compute an aggregate per tuple with respect to a neighbourhood of that tuple. These are incredibly powerful extensions to SQL. For those interested to see more, look over Graeme Birchall’s SQL Cookbook, which is superb. His manual is for IBM DB2, but the SQL is standard, and so applies to Postgres too.
This is convenient in that one does not need to do an aggregation with respect to a group by, and then join the results back with the initial “table”. In fact, a join could be fatal for our application; it could break the pipeline.
We can use window aggregation to “look” at the tuple before (or after) the “current” tuple to check its value (with respect to some ordering of the tuples, order by id here). For instance, we can use this to see whether the current tuple is in the same group (grp) as the previous tuple. The example for running sum below uses this technique.
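For a feel for the syntax, here is a small query (a sketch, not part of the solution) that computes a running total of measure over the whole stream and fetches the previous tuple’s grp via the standard window function lag. (For our look-left purpose, lag is equivalent to the one-row max window used in the template below.)
select id,
       grp,
       measure,
       sum(measure) over (order by id) as runningTotal,
       lag(grp) over (order by id) as prevGrp
from Stream;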
Example: Running Sum
Let us look at an SQL script that follows the three steps enumerated in §Strategy, but for computing a running sum. The script file: runningSum.sql. (You are welcome to use this as a starting template for your assignment.)
Create the stream
We mimic a stream with a table named Stream.
create table Stream (
id int,
grp int,
measure int,
constraint streamPK
primary key (id),
constraint idNotNeg
check (id >= 0),
constraint grpNotNeg
check (grp >= 0)
);
And populate it with a simple example stream.
insert into Stream (id, grp, measure)
values
( 0, 0, 2),
( 1, 0, 3),
( 2, 1, 5),
( 3, 1, 7),
( 4, 1, 11),
( 5, 0, 13),
( 6, 0, 17),
( 7, 0, 19),
( 8, 0, 23),
( 9, 2, 29),
(10, 2, 31),
(11, 5, 37),
(12, 3, 41),
(13, 3, 43);
(Yes, this is the very same as above in §Pipeline ¶The Stream.)
You can, of course, test your code with a much longer stream. But you need not get carried away. A small sample run like this suffices.
The user-defined aggregate function
Running-sum’s aggregate function uses a composite type as the type being aggregated over.
create type intRec as (
number int,
restart boolean
);
The field number is the measure we are summing. The field restart indicates whether this tuple is the start of a tuple group.
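(As a quick aside, composite values are built with a row constructor and a cast; trying this in psql is a good sanity check:
select cast((3, true) as intRec);
which displays as (3,t).)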
The state function then is
create function runningSum_state(int, intRec)
returns int
language plpgsql
as $f$
declare
    i alias for $1;  -- the state coming in: the running sum so far
    a alias for $2;  -- the incoming (measure, restart) record
    j int;           -- the state going out
begin
    if a.restart or i is null then
        -- start of a group (or the very first tuple): reset the sum
        j := a.number;
    elsif a.number is null then
        -- a null measure leaves the sum unchanged
        j := i;
    else
        j := a.number + i;
    end if;
    return j;
end
$f$;
The aggregate function’s internal state in this case is simply an integer (int). It holds the sum up through the present tuple.
The final function converts the state into an intRec. Recall that the output type of an aggregator has to be the same as the type being aggregated. For this, the value of the restart flag is immaterial.
create function runningSum_final(int)
returns intRec
language sql
as $f$
    -- wrap the sum back up as an intRec; the restart flag is a dummy here
    select cast(($1, false) as intRec);
$f$;
Finally, the aggregate function is declared.
create aggregate runningSum(intRec) (
sfunc = runningSum_state,
stype = int,
finalfunc = runningSum_final
);
The field stype declares the type of the internal state.
The SQL query that pipelines Stream to compute the running sum is as follows.
with
-- look at the neighbour tuple to the left to fetch its grp value
CellLeft (id, grp, measure, lft) as (
select id,
grp,
measure,
coalesce(
max(grp) over (
order by id
rows between
1 preceding
and
1 preceding ),
-1 )
from Stream
),
-- determine whether current tuple is start of a group
CellStart(id, grp, measure, start) as (
select id,
grp,
measure,
cast(
case
when grp = lft then 0
else 1
end
as boolean)
from CellLeft
),
-- bundle the measure and start-flag into an intRec
CellFlag(id, grp, intRC) as (
select id,
grp,
cast((measure, start) as intRec)
from CellStart
),
-- call our runningSum aggregator
CellRun(id, grp, measure, runningRC) as (
select id,
grp,
(intRC).number,
runningSum(intRC)
over (order by id)
from CellFlag
),
-- extract the running sum from the composite
CellAggr(id, grp, measure, running) as (
select id,
grp,
measure,
(runningRC).number
from CellRun
)
-- report
select id, grp, measure, running
from CellAggr
order by id;
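For reference, here is the answer this template should produce on the sample stream (worked out by hand from the logic above; do verify against your own run):
 id | grp | measure | running
----+-----+---------+---------
  0 |   0 |       2 |       2
  1 |   0 |       3 |       5
  2 |   1 |       5 |       5
  3 |   1 |       7 |      12
  4 |   1 |      11 |      23
  5 |   0 |      13 |      13
  6 |   0 |      17 |      30
  7 |   0 |      19 |      49
  8 |   0 |      23 |      72
  9 |   2 |      29 |      29
 10 |   2 |      31 |      60
 11 |   5 |      37 |      37
 12 |   3 |      41 |      41
 13 |   3 |      43 |      84
(14 rows)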
Create a file named runningAvg.sql, similar to the example of runningSum.sql, that implements the three steps enumerated in §Strategy, but for computing a running average.
Submit
Use the “submit” command on a PRISM machine to turn in your program.
% submit 4415 scan runningAvg.sql
Due: before midnight Monday 1 March.
FAQ
My host OS is Windows. Will I have the headache of the “curse of DOS” again, with the \r\n EOL’s?!
Fortunately, no. Postgres and psql seem to handle DOS format fine.
If measure were to be null for a record, how should it be handled?
For each tuple in the stream, you output a tuple with the group’s running average. If there have been only nulls for the measures so far in the group, you would output a null. But once there is a non-null, you are effectively reporting the average of the non-null measures seen in that group so far. (E.g., if a group’s measures arrive as null, 4, null, 8, the reported running averages would be null, 4, 4, 6.)