Stream Mill Example -- Approximate Frequent Items


Problem Statement

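This is a windowed approximate frequent-items algorithm suitable for delta computation: its state is updated incrementally as items enter and leave the window instead of being recomputed from scratch. It approximates the frequent items over a sliding window using bounded memory.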

The algorithm maintains k hash tables over the current window, each with its own hash function. Every entry in a hash table is an integer counter. When an item enters the window, we apply the k hash functions to obtain k keys, one per table, and increment the counter at each key in the corresponding hash table. Similarly, when an item expires from the window, we decrement the same k counters. The approximate frequency of an item is the minimum of its k counters.
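The steps above can be sketched outside ESL as follows. This is a minimal Python illustration, not the Stream Mill implementation: the class name `WindowedSketch`, its method names, and the count-based window are assumptions made for the example, while the hash form ((a*x + b) % P) % m and the sample parameters are taken from the ESL code on this page.

```python
from collections import deque


class WindowedSketch:
    """k hash tables of m integer counters over a count-based sliding window."""

    def __init__(self, hash_params, m, p, window):
        self.hash_params = hash_params  # one (a, b) pair per hash table
        self.m = m                      # counters per hash table
        self.p = p                      # prime modulus of the hash functions
        self.window = window            # number of most recent items kept
        self.tables = [[0] * m for _ in hash_params]
        self.items = deque()            # items currently inside the window

    def _bucket(self, a, b, item):
        return ((a * item + b) % self.p) % self.m

    def _update(self, item, delta):
        # Touch exactly one counter in each of the k tables.
        for table, (a, b) in zip(self.tables, self.hash_params):
            table[self._bucket(a, b, item)] += delta

    def add(self, item):
        """Item enters the window; the oldest item expires if the window is full."""
        self.items.append(item)
        self._update(item, +1)
        if len(self.items) > self.window:
            self._update(self.items.popleft(), -1)

    def estimate(self, item):
        """Approximate frequency: the minimum of the item's k counters."""
        return min(table[self._bucket(a, b, item)]
                   for table, (a, b) in zip(self.tables, self.hash_params))


sketch = WindowedSketch([(7, 13), (22, 6), (24, 11)], m=4, p=31, window=3)
for x in [5, 5, 9, 5]:
    sketch.add(x)           # the first 5 expires when the fourth item arrives
print(sketch.estimate(5), sketch.estimate(9))
```

Keeping the window contents in a deque is only one way to know which item expires; in ESL the system delivers expiring tuples to the aggregate's expire block.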

Note that this minimum can overestimate the frequency of an item: this happens when every one of its k counters is also shared by at least one other item in the window. It never underestimates, since each occurrence of the item increments all k of its counters. The algorithm can also be viewed as a Bloom filter with two differences: 1) there are k different hash tables instead of just one, and 2) each entry is an integer counter rather than a bit. In addition to supporting delta maintenance, the algorithm provides bounded error estimates, so the expected error can be estimated from the amount of available memory.

Below we provide ESL code that implements this algorithm efficiently. The hcount UDA maintains the hash tables, and the getWinFreqs UDA returns the frequent items at regular intervals.
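To make the overestimation concrete, the following Python sketch uses the five hash functions, P = 31 and m = 4 from the ESL tables hs and mPMValues on this page. The helper names (`buckets`, `build`, `estimate`) are hypothetical and exist only for this illustration. Items 3 and 10 are inserted once each; item 0 is never inserted, yet each of its five counters is shared with 3 or 10.

```python
# (ah, bh) pairs as in table hs; P and the table width as in mPMValues.
HS = [(7, 13), (22, 6), (24, 11), (14, 27), (17, 31)]
P, WIDTH = 31, 4


def buckets(item):
    """Bucket of `item` in each of the five hash tables."""
    return [((a * item + b) % P) % WIDTH for a, b in HS]


def build(items):
    """Counters after inserting every item once (no expiry in this demo)."""
    tables = [[0] * WIDTH for _ in HS]
    for item in items:
        for table, idx in zip(tables, buckets(item)):
            table[idx] += 1
    return tables


def estimate(tables, item):
    return min(table[idx] for table, idx in zip(tables, buckets(item)))


tables = build([3, 10])
print(buckets(0), buckets(3), buckets(10))
print(estimate(tables, 0))  # true frequency is 0, the estimate is not
print(estimate(tables, 1))  # item 1 has at least one untouched counter
```

With only four counters per table and sixteen possible item ids, such collisions are frequent; giving each table more counters reduces them at the cost of memory.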

Query:

  Definitions and Initializations:
    /* definitions */
    table S(index1 int, index2 int, cnt int) memory;
    table itemsIds(ids int) memory;
    table tupleCnt(cnt int) memory;
    table hs(h int, ah int, bh int) memory;
    table mPMValues(m int, P int, M int) memory;
    
    stream items(item int) source 'port4435';
    
    /* initialization of tables, at system reboot */
    insert into S values(1, 1, 0);
    insert into S values(1, 2, 0);
    insert into S values(1, 3, 0);
    insert into S values(1, 4, 0);
    insert into S values(1, 5, 0);
    insert into S values(2, 1, 0);
    insert into S values(2, 2, 0);
    insert into S values(2, 3, 0);
    insert into S values(2, 4, 0);
    insert into S values(2, 5, 0);
    insert into S values(3, 1, 0);
    insert into S values(3, 2, 0);
    insert into S values(3, 3, 0);
    insert into S values(3, 4, 0);
    insert into S values(3, 5, 0);
    insert into S values(0, 1, 0);
    insert into S values(0, 2, 0);
    insert into S values(0, 3, 0);
    insert into S values(0, 4, 0);
    insert into S values(0, 5, 0);
    
    insert into itemsIds values (0);
    insert into itemsIds values (1);
    insert into itemsIds values (2);
    insert into itemsIds values (3);
    insert into itemsIds values (4);
    insert into itemsIds values (5);
    insert into itemsIds values (6);
    insert into itemsIds values (7);
    insert into itemsIds values (8);
    insert into itemsIds values (9);
    insert into itemsIds values (10);
    insert into itemsIds values (11);
    insert into itemsIds values (12);
    insert into itemsIds values (13);
    insert into itemsIds values (14);
    insert into itemsIds values (15);
    
    insert into hs values(1, 7, 13);
    insert into hs values(2, 22, 6);
    insert into hs values(3, 24, 11);
    insert into hs values(4, 14, 27);
    insert into hs values(5, 17, 31);
    
    insert into mPMValues values(4, 31, 16);
    insert into tupleCnt values(0);
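As a cross-check of the initialization above, the short Python sketch below computes which rows of S an item touches, using the same expression as the UDAs, ((ah*item + bh) % P) % m for index1 and h for index2. The function name `cells` is hypothetical. It also confirms that, for the sixteen ids in itemsIds, every reachable (index1, index2) pair is among the twenty rows inserted into S.

```python
HS = {1: (7, 13), 2: (22, 6), 3: (24, 11), 4: (14, 27), 5: (17, 31)}  # h -> (ah, bh)
P, m = 31, 4  # from mPMValues


def cells(item):
    """(index1, index2) pairs of the S rows that `item` updates."""
    return [(((ah * item + bh) % P) % m, h) for h, (ah, bh) in HS.items()]


# The 20 rows inserted into S: index1 in 0..3, index2 in 1..5.
initialized = {(i1, i2) for i1 in range(m) for i2 in HS}

print(cells(3))
print(all(c in initialized for item in range(16) for c in cells(item)))
```

Because index1 is always reduced modulo m, any change to m in mPMValues needs a matching change to the rows pre-inserted into S; otherwise some updates would match no row.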
    
  UDA for Maintaining Hashtable:
    /* delta comp with expire aggregate */
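    window aggregate hcount(k int):int {
      /* adds val (+1 or -1) to the counter of item k in hash table h */
      aggregate updateCnt(k int, h int, ah int, bh int, val int):int{
        initialize:iterate: {
          update S set cnt = cnt+val
          where index1 = ((ah*k+bh)%(select P from mPMValues))%(select m from mPMValues)
                and index2 = h;
        }
      };
      /* a tuple enters the window: increment one counter per hash function */
      initialize:iterate: {
        select updateCnt(k, h, ah, bh, 1)
        from hs;
      }
    
      /* a tuple leaves the window: decrement the same counters */
      expire: {
        select updateCnt(k, h, ah, bh, -1)
        from hs;
      }
    };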
    
  UDA for Returning Frequent Items:
    aggregate getWinFreqs(item int, win int):(index int, item int, minC int) {
      table tupleCnt(cnt int);
      table tIndex(index int);
    
      /* estimates the count of one item and returns it tagged with the report index; */
      /* the return columns are declared in the order in which they are inserted below */
      aggregate getItemFreqs(item int, M int, index int):(index int, item int, minC int) {
        table minCTable(minC int);
    
        /* minimum counter of item1 over all hash functions in hs */
        aggregate getMinC(item1 int, h int, ah int, bh int):int {
          table minTable(val int);
          initialize: {
            insert into minTable select cnt from S
              where index1 = ((ah*item1+bh)%(select P from mPMValues))%(select m from mPMValues)
                    and index2 = h;
          }
          iterate: {
            insert into minTable select cnt from S
              where index1 = ((ah*item1+bh)%(select P from mPMValues))%(select m from mPMValues)
                    and index2 = h;
          }
          terminate: {
            insert into return select min(val) from minTable;
          }
        };
    
        initialize:iterate: {
          delete from minCTable where 1=1;
          insert into minCTable select getMinC(item, h, ah, bh) from hs;
          insert into return select index, item, minC from minCTable;
        }
      };
    
    
      initialize: {
        insert into tupleCnt values(1);
        insert into tIndex values(1);
      }
      iterate: {
        update tupleCnt set cnt = cnt+1;
    
        insert into return
              select getItemFreqs(ids, M, index) from tIndex, itemsIds,mPMValues
              where (select cnt from tupleCnt)%win = 0;
    
        update tIndex set index = index+1 where (select cnt from tupleCnt)%win = 0;
      }
    };
    
  Calling the UDAs: maintain the hash tables over a 1000-tuple sliding window, and return the frequent items every 100 tuples:
    select hcount(item) over (rows 999 preceding) from items;
    select getWinFreqs(item, 100) from items;
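The interplay of the two queries, window maintenance on every tuple and a report every `win` tuples, can be pictured with the Python sketch below. It is an illustration under stated assumptions, not a translation of the ESL: the function `run` is hypothetical, exact per-item counts stand in for the hash-table counters, and the counters are assumed to be updated before the report for the same tuple is produced. The report index starts at 1 and grows by one per report, as tIndex does in getWinFreqs.

```python
from collections import deque


def run(stream, window=1000, report_every=100):
    """Yield (report_index, counts) once every `report_every` tuples."""
    recent = deque()  # the tuples currently inside the window
    counts = {}       # exact counts, standing in for the sketch counters
    for n, item in enumerate(stream, start=1):
        recent.append(item)
        counts[item] = counts.get(item, 0) + 1
        if len(recent) > window:         # the oldest tuple expires
            old = recent.popleft()
            counts[old] -= 1
        if n % report_every == 0:        # same test as cnt % win = 0
            yield n // report_every, dict(counts)


reports = list(run([i % 3 for i in range(250)], window=100, report_every=100))
for index, counts in reports:
    print(index, counts)
```

A window of 100 is used in the example only to keep the input short; `rows 999 preceding` in the ESL query corresponds to `window=1000`, the current tuple plus the 999 before it.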
    

Reference

"Dynamically Maintaining Frequent Items over a Data Stream" by Cheqing Jin et al


Copyright © 2001-2002 UCLA Web Information System Laboratory. All Rights Reserved.
Maintained by Yijian Bai.