/*
 * Definitions.
 *
 *   S         - counter grid addressed by (index1, index2); hcount adds to and
 *               subtracts from cnt, getWinFreqs reads it.
 *   itemsIds  - the item ids that getWinFreqs reports on.
 *   tupleCnt  - single-row counter (getWinFreqs declares its own local table
 *               with the same name).
 *   hs        - one row per hash function: id h with coefficients ah and bh,
 *               used as ((ah * item + bh) % P) % m.
 *   mPMValues - single row holding the parameters m, P and M.
 *   items     - input stream of item ids read from 'port4435'.
 */
table S(index1 int, index2 int, cnt int) memory;
table itemsIds(ids int) memory;
table tupleCnt(cnt int) memory;
table hs(h int, ah int, bh int) memory;
table mPMValues(m int, P int, M int) memory;

stream items(item int) source 'port4435';

/* Initialization of tables, at system reboot. */

/* S: one zeroed counter for every index1 in 0..3 and index2 in 1..5. */
insert into S values(1, 1, 0);
insert into S values(1, 2, 0);
insert into S values(1, 3, 0);
insert into S values(1, 4, 0);
insert into S values(1, 5, 0);

insert into S values(2, 1, 0);
insert into S values(2, 2, 0);
insert into S values(2, 3, 0);
insert into S values(2, 4, 0);
insert into S values(2, 5, 0);

insert into S values(3, 1, 0);
insert into S values(3, 2, 0);
insert into S values(3, 3, 0);
insert into S values(3, 4, 0);
insert into S values(3, 5, 0);

insert into S values(0, 1, 0);
insert into S values(0, 2, 0);
insert into S values(0, 3, 0);
insert into S values(0, 4, 0);
insert into S values(0, 5, 0);

/* itemsIds: the ids 0..15. */
insert into itemsIds values (0);
insert into itemsIds values (1);
insert into itemsIds values (2);
insert into itemsIds values (3);
insert into itemsIds values (4);
insert into itemsIds values (5);
insert into itemsIds values (6);
insert into itemsIds values (7);
insert into itemsIds values (8);
insert into itemsIds values (9);
insert into itemsIds values (10);
insert into itemsIds values (11);
insert into itemsIds values (12);
insert into itemsIds values (13);
insert into itemsIds values (14);
insert into itemsIds values (15);

/* hs: five hash functions as (h, ah, bh). */
insert into hs values(1, 7, 13);
insert into hs values(2, 22, 6);
insert into hs values(3, 24, 11);
insert into hs values(4, 14, 27);
insert into hs values(5, 17, 31);

/* mPMValues: m = 4, P = 31, M = 16. */
insert into mPMValues values(4, 31, 16);

/* tupleCnt: starts at zero. */
insert into tupleCnt values(0);
/*
 * hcount(k): delta computation with an expire state.
 *
 * For each tuple entering the window (initialize / iterate) the nested
 * aggregate updateCnt is run once per row of hs with val = 1; for each tuple
 * leaving the window (expire) it is run once per row of hs with val = -1.
 *
 * updateCnt(k, h, ah, bh, val) adds val to the cnt of the S row whose
 * index1 equals ((ah * k + bh) % P) % m and whose index2 equals h, where
 * P and m are read from mPMValues.
 *
 * Neither aggregate inserts into return; all work is done through the
 * updates on S.
 */
window aggregate hcount(k int):int {
    aggregate updateCnt(k int, h int, ah int, bh int, val int):int {
        initialize:iterate: {
            update S
               set cnt = cnt + val
             where index1 = ((ah * k + bh) % (select P from mPMValues)) % (select m from mPMValues)
               and index2 = h;
        }
    };

    initialize:iterate: {
        select updateCnt(k, h, ah, bh, 1) from hs;
    }
    expire: {
        select updateCnt(k, h, ah, bh, -1) from hs;
    }
};
/*
 * getWinFreqs(item, win): every win-th input tuple, report for each id in
 * itemsIds the smallest counter that S currently holds for that id.
 *
 * Returns rows (index, item, minC):
 *   index - report number, read from the local table tIndex; starts at 1 and
 *           is incremented after each report.
 *   item  - an id taken from itemsIds.
 *   minC  - minimum, over the rows of hs, of S.cnt at
 *           index1 = ((ah * item + bh) % P) % m and index2 = h.
 *
 * State:
 *   tupleCnt - local tuple counter; set to 1 on the first tuple and incremented
 *              on every later one. It has the same name as the memory table
 *              declared at the top of the file (presumably the local table is
 *              the one resolved here - confirm against the engine's scoping).
 *   tIndex   - current report number.
 *
 * The initialize state emits nothing, so the first tuple never produces a
 * report; reports are produced in iterate whenever tupleCnt.cnt % win = 0.
 *
 * Nested aggregates:
 *   getItemFreqs(item, M, index) - for one id, computes minC through getMinC
 *       and emits (index, item, minC). Its declared return columns are listed
 *       in that same order so that they match both the values it inserts and
 *       the return signature of getWinFreqs. The parameter M is accepted but
 *       not referenced in the body.
 *   getMinC(item1, h, ah, bh) - collects the matching S.cnt for each hs row it
 *       is fed and returns their minimum on terminate. The first and the later
 *       rows are handled identically, hence the shared initialize:iterate
 *       state.
 */
aggregate getWinFreqs(item int, win int):(index int, item int, minC int) {
    table tupleCnt(cnt int);
    table tIndex(index int);

    aggregate getItemFreqs(item int, M int, index int):(index int, item int, minC int) {
        table minCTable(minC int);

        aggregate getMinC(item1 int, h int, ah int, bh int):int {
            table minTable(val int);

            initialize:iterate: {
                insert into minTable
                    select cnt
                      from S
                     where index1 = ((ah*item1+bh)%(select P from mPMValues))%(select m from mPMValues)
                       and index2 = h;
            }
            terminate: {
                insert into return select min(val) from minTable;
            }
        };

        initialize:iterate: {
            delete from minCTable where 1=1;
            insert into minCTable select getMinC(item, h, ah, bh) from hs;
            insert into return select index, item, minC from minCTable;
        }
    };

    initialize: {
        insert into tupleCnt values(1);
        insert into tIndex values(1);
    }
    iterate: {
        update tupleCnt set cnt = cnt+1;

        insert into return
            select getItemFreqs(ids, M, index)
              from tIndex, itemsIds,mPMValues
             where (select cnt from tupleCnt)%win = 0;

        update tIndex
           set index = index+1
         where (select cnt from tupleCnt)%win = 0;
    }
};
/*
 * Continuous queries over the items stream.
 *
 * 1. Feed every item to hcount over a window declared as
 *    "rows 999 preceding", which keeps the counters in S in step with the
 *    tuples entering and leaving that window.
 * 2. Feed every item to getWinFreqs with win = 100, which reports the
 *    per-id minimum counters whenever its tuple counter is a multiple of 100.
 */
select hcount(item)
         over (rows 999 preceding)
  from items;

select getWinFreqs(item, 100)
  from items;
/* Reference: "Dynamically Maintaining Frequent Items over a Data Stream" by Cheqing Jin et al. */