/* Input streams. Each one is read from the named source and declared as
   ordered by its own timestamp column. */
stream person(
    person_id   int,
    name        char(20),
    email       char(20),
    credit_card int,
    city        char(20),
    state       char(2),
    reg_time    timestamp
) source 'person.so' order by reg_time;

stream bid(
    auction_id int,
    price      int,
    bidder_id  int,
    bid_time   timestamp
) source 'bid.so' order by bid_time;

stream auction(
    auction_id    int,
    item_name     char(10),
    seller_id     int,
    initial_price int,
    category_id   int,
    expire_date   timestamp,
    input_time    timestamp
) source 'auction.so' order by input_time;
Query 1 — Stream Mill implementation:
/* Query 1: multiply every bid's price by the constant 0.8239 (presumably a
   fixed currency exchange rate — confirm). The other columns pass through
   unchanged. */
insert into stdout
    select auction_id,
           0.8239 * price,
           bidder_id,
           bid_time
    from bid;
Query 2 — Stream Mill implementation:
/* Query 2: emit (auction_id, price) only for bids on auctions 1, 2 and 3. */
insert into stdout
    select auction_id, price
    from bid
    where auction_id = 1
       or auction_id = 2
       or auction_id = 3;
Query 3 — Stream Mill implementation: the query is divided into two parts. The first part unions the auction and person streams, so that the blocking join operation is avoided and no windowing is needed. The second part applies a user-defined aggregate to the unioned stream to filter out the matching tuples.
/* Query 3, part 1: merge auctions (tuple_type = 0) and persons (tuple_type = 1)
   into the single stream query3. Columns that a source does not supply are
   padded with -1 or ''. */
insert into query3
    select 0, auction_id, seller_id, initial_price, -1, '', '', '', input_time
    from auction
    union
    select 1, -1, -1, -1, person_id, name, city, state, reg_time
    from person;

/* Query 3, part 2: keep every auction tuple and only the persons located in
   KS, NY or SD. local_items then pairs each auction with its seller.
   The state tests are parenthesised as one group so that they apply only to
   person tuples: AND binds tighter than OR, and without the inner parentheses
   the NY/SD tests would escape the tuple_type = 1 guard. */
insert into stdout
    select local_items(tuple_type, auction_id, seller_id, person_id, name, city, state)
    from query3
    where tuple_type = 0
       or (tuple_type = 1 and (state = 'KS' or state = 'NY' or state = 'SD'));
Definition of the unioned stream "query3":
/* Unioned auction/person stream filled by query 3. tuple_type identifies the
   source (0 = auction, 1 = person); in_time carries the source row's timestamp. */
stream query3(
    tuple_type    int,
    auction_id    int,
    seller_id     int,
    initial_price int,
    person_id     int,
    name          char(20),
    city          char(20),
    state         char(2),
    in_time       timestamp
);
Definition of the user-defined aggregate "local_items":
/* local_items: pairs auctions with their sellers over the unioned query3
   stream without a join window.
   - A person row (tuple_type = 1) is remembered in the in-memory table persons.
   - An auction row (tuple_type = 0) emits (name, city, state, auction_id) for
     every remembered person whose id equals the auction's seller_id.
   The shared INITIALIZE/ITERATE block runs for every input row, the first
   one included. */
AGGREGATE local_items(tuple_type int, auction_id int, seller_id int, person_id int,
                      name char(20), city char(20), state char(2))
    : (name char(20), city char(20), state char(2), auction_id int)
{
    TABLE persons(pid int, pname char(20), pcity char(20), pstate char(2)) MEMORY;
    INITIALIZE : ITERATE : {
        /* Remember each person tuple. */
        insert into persons values(person_id, name, city, state)
            where tuple_type = 1;
        /* For an auction tuple, emit the seller's stored details. */
        insert into return
            select pname, pcity, pstate, auction_id
            from persons
            where seller_id = pid
              and tuple_type = 0;
    }
};
/* Query 4, part 1: merge bids (tuple_type = 1) and auctions (tuple_type = 0)
   into query4, padding columns a source does not supply with -1.
   For a bid row, bid_time fills both the expire_date slot and in_time. */
insert into query4
    select 1, -1, -1, -1, -1, bid_time, auction_id, price, bid_time
    from bid
    union
    select 0, auction_id, initial_price, category_id, seller_id, expire_date, -1, -1, input_time
    from auction;

/* Query 4, part 2: compute per-category average prices with the cat_avg UDA. */
insert into stdout
    select cat_avg(tuple_type, auction_id, initial_price, category_id,
                   expire_date, b_auc_id, bid_price, in_time)
    from query4;
Definition of the unioned stream "query4":
/* Unioned bid/auction stream filled by query 4. tuple_type identifies the
   source (0 = auction, 1 = bid). b_auc_id and bid_price are set only for bids;
   in_time carries the source row's timestamp. */
stream query4(
    tuple_type    int,
    auction_id    int,
    initial_price int,
    category_id   int,
    seller_id     int,
    expire_date   timestamp,
    b_auc_id      int,
    bid_price     int,
    in_time       timestamp
);
Definition of the user-defined aggregate "cat_avg":
/* cat_avg: per-category average closing price of auctions, computed over the
   unioned query4 stream.
   - tuple_type = 0 rows are auctions (auction_id, initial_price, category_id,
     expire_date).
   - tuple_type = 1 rows are bids (b_auc_id, bid_price).
   - in_time is the row's timestamp and serves as "now": an auction whose
     expire < in_time is treated as closed.
   Emits (cat_id, avg_price) rows whenever at least one auction closes. */
AGGREGATE cat_avg(tuple_type int, auction_id int, initial_price int, category_id int,
                  expire_date timestamp, b_auc_id int, bid_price int, in_time timestamp)
    : (cat_id int, avg_price int)
{
    /* Auctions not yet closed. price starts at the initial price and is
       raised by higher bids (see the last statement). */
    TABLE open_auctions(aid int, price int, cid int, expire timestamp) MEMORY;
    /* Running sum and count of closing prices per category. */
    TABLE cat_avg_tmp(category int, cur_total int, cur_count int) MEMORY;
    INITIALIZE : ITERATE : {
        /* Register a newly arrived auction. */
        insert into open_auctions values(auction_id, initial_price, category_id, expire_date)
            where tuple_type = 0;
        /* Make sure the auction's category has an accumulator row. */
        insert into cat_avg_tmp values(category_id, 0, 0)
            where tuple_type = 0
              and not exists (select category from cat_avg_tmp where category = category_id);
        /* Fold the prices of auctions that have closed by in_time into their
           category's totals. Only expired auctions are summed and counted.
           Auctions still open are counted later, exactly once, when they close;
           including them here would double-count them. The EXISTS guard ensures
           the SUM subquery sees at least one row, so it is never NULL. */
        update cat_avg_tmp
            set cur_total = cur_total + (select sum(o.price)
                                         from open_auctions o
                                         where o.cid = cat_avg_tmp.category
                                           and o.expire < in_time),
                cur_count = cur_count + (select count(o2.price)
                                         from open_auctions o2
                                         where o2.cid = cat_avg_tmp.category
                                           and o2.expire < in_time)
            where exists (select * from open_auctions
                          where expire < in_time and cid = cat_avg_tmp.category);
        /* When any auction closed, emit the current average (integer division)
           of every category that has at least one closed auction. */
        insert into return
            select category, cur_total/cur_count
            from cat_avg_tmp
            where cur_count > 0
              and exists (select * from open_auctions where expire < in_time);
        /* Drop the auctions that were just accounted for. */
        delete from open_auctions where expire < in_time;
        /* A bid on a still-open auction raises its price when it is higher. */
        update open_auctions set price = bid_price
            where tuple_type = 1
              and expire >= in_time
              and price < bid_price
              and aid = b_auc_id;
    }
};