
Comments (9)

king821221 commented on August 26, 2024

Also, as suggested by the paper, algorithms A-Res and A-ExpJ can be used for weighted random sampling with replacement from data streams by running k concurrent instances of A-Res or A-ExpJ, each with reservoir size 1, and merging the k reservoirs.
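
A minimal sketch of that idea, assuming plain arrays instead of Pig tuples: k independent size-1 A-Res reservoirs are fed from a single pass over the data, and each keeps the item with the largest key u^(1/w). The class and method names (WithReplacementSketch, sample) are hypothetical and not part of DataFu.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical illustration, not DataFu code.
final class WithReplacementSketch {
    /**
     * Draws k indices with replacement in one pass over the weights.
     * Each of the k size-1 reservoirs independently keeps the item
     * whose key u^(1/w) is largest. Assumes every weight is > 0.
     */
    static List<Integer> sample(double[] weights, int k, Random rng) {
        List<Integer> out = new ArrayList<>();
        if (weights.length == 0) {
            return out; // nothing to sample from
        }
        double[] bestKey = new double[k];
        int[] bestIndex = new int[k];
        Arrays.fill(bestKey, -1.0); // below any real key, so the first item always enters
        for (int i = 0; i < weights.length; i++) {
            for (int r = 0; r < k; r++) {
                // a fresh random number per (item, reservoir) keeps the reservoirs independent
                double key = Math.pow(rng.nextDouble(), 1.0 / weights[i]);
                if (key > bestKey[r]) {
                    bestKey[r] = key;
                    bestIndex[r] = i;
                }
            }
        }
        for (int idx : bestIndex) {
            out.add(idx); // "merging" the k reservoirs is just collecting their single items
        }
        return out;
    }
}
```

Calling WithReplacementSketch.sample(new double[] {1.0, 9.0}, 5, new Random()) returns five indices, and the same index may appear more than once.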

from datafu.

king821221 commented on August 26, 2024

A paper with a more detailed theoretical analysis of WRS, written by the same author: http://arxiv.org/pdf/1012.0256.pdf


matthayes commented on August 26, 2024

Good suggestion and thanks for the links to the papers, I'll take a look!


king821221 commented on August 26, 2024

After some initial trials, the algorithm A-Res in the algorithm report, which is also the WRS-W algorithm in the more theoretical paper, is relatively easy to implement under the ReservoirSample framework. The only update needed to ReservoirSample is to allow a customized score for ScoredTuple instead of the default Math.random(). So I have defined an inner static interface in ScoredTuple as follows:
public static interface ScoreGenerator
{
    double generateScore(Tuple sample) throws ExecException;
}

ReservoirSample itself then needs a score generator equivalent to Math.random():

public static class PureRandomScoreGenerator implements ScoreGenerator
{
    public PureRandomScoreGenerator() {}

    public double generateScore(Tuple sample)
    {
        return Math.random();
    }
}

For A-Res, there is a WeightedReservoirSample class that extends ReservoirSample; it has a score generator which uses the weight of each sample to generate the sample's score. The weighted score generator could be defined as follows:

static class InverseWeightScoreGenerator implements ScoredTuple.ScoreGenerator
{
    // fallback weight used when the input weight is not positive
    private static final double EPSILON = 1e-7;

    //index of the weight field in the input tuple
    private int weightIdx;

    InverseWeightScoreGenerator(int weightIdx) 
    {
        this.weightIdx = weightIdx;
    }

    public double generateScore(Tuple sample) throws ExecException
    {
        double weight = ((Number)sample.get(this.weightIdx)).doubleValue();
        if(Double.compare(weight, 0.0) <= 0)
        {
            weight = EPSILON;
        }
        // A-Res key: u^(1/w), with u uniform in [0, 1)
        return Math.pow(Math.random(), 1/weight);
    }
}

The algorithm A-ExpJ is interesting to explore. It should work for streaming data using accumulate(). For the algebraic case, the Initial() class could use the exponential jumps, but Intermediate() and Final() may not be able to. I need more time to investigate this.

Sharing another paper, which specifically discusses how to use jumps in reservoir sampling to save random number generation time: http://www.cs.umd.edu/~samir/498/vitter.pdf.


matthayes commented on August 26, 2024

Yeah, you're right. It seems that A-Res (or A-ES in the other paper) can quite easily be implemented using the existing Reservoir framework as you describe. This approach looks great to me. My only comment is that for InverseWeightScoreGenerator, I would rather throw an exception if the weight is not > 0 than use an implicit epsilon weight. Weight > 0 should be part of the contract for this UDF.
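
A minimal sketch of the stricter contract, assuming the weight has already been extracted from the tuple as a double. The class name StrictWeightScore is hypothetical, and IllegalArgumentException stands in for whatever exception type the UDF would actually throw (the thread does not specify one).

```java
import java.util.Random;

// Hypothetical illustration, not DataFu code.
final class StrictWeightScore {
    /**
     * Returns the A-Res key u^(1/weight), rejecting weights that are
     * zero, negative, NaN or infinite instead of silently replacing them.
     */
    static double score(double weight, Random rng) {
        // !(weight > 0.0) is also true for NaN, so NaN is rejected here too
        if (!(weight > 0.0) || Double.isInfinite(weight)) {
            throw new IllegalArgumentException(
                "weight must be a finite number > 0, got " + weight);
        }
        return Math.pow(rng.nextDouble(), 1.0 / weight);
    }
}
```

Failing fast keeps a data problem (a missing or negative weight) visible, whereas an epsilon weight would quietly make the item almost impossible to select.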

To satisfy my curiosity, I ran a test to see what the probability is that an item is selected at any given step in the sampling algorithm. If the algorithm works correctly, then the probability an item is selected at a particular step should be P(i) = w_i/Sum(w_i).

    @Test
    public void reservoirTest() throws ExecException
    {
      TupleFactory factory = TupleFactory.getInstance();
      Tuple t1 = factory.newTuple("a");
      Tuple t2 = factory.newTuple("b");
      Tuple t3 = factory.newTuple("c");
      Tuple t4 = factory.newTuple("d");
      Tuple t5 = factory.newTuple("e");

      Double w1 = 0.1,
             w2 = 0.4,
             w3 = 0.8,
             w4 = 5.0,
             w5 = 7.0;

      System.out.println("Expected: ");
      System.out.println("a: " + w1/(w1+w2+w3+w4+w5));
      System.out.println("b: " + w2/(w1+w2+w3+w4+w5));
      System.out.println("c: " + w3/(w1+w2+w3+w4+w5));
      System.out.println("d: " + w4/(w1+w2+w3+w4+w5));
      System.out.println("e: " + w5/(w1+w2+w3+w4+w5));

      Map<String,Integer> counts = new HashMap<String,Integer>();
      counts.put("a", 0);
      counts.put("b", 0);
      counts.put("c", 0);
      counts.put("d", 0);
      counts.put("e", 0);

      int numTrials = 1000000;
      for (int i=0; i<numTrials; i++)
      {    
        ScoredTuple st1 = new ScoredTuple(getScore(w1),t1);
        ScoredTuple st2 = new ScoredTuple(getScore(w2),t2);
        ScoredTuple st3 = new ScoredTuple(getScore(w3),t3);
        ScoredTuple st4 = new ScoredTuple(getScore(w4),t4);
        ScoredTuple st5 = new ScoredTuple(getScore(w5),t5);

        Reservoir reservoir = new Reservoir(1);

        reservoir.consider(st1);
        reservoir.consider(st2);
        reservoir.consider(st3);
        reservoir.consider(st4);
        reservoir.consider(st5);

        Tuple result = reservoir.poll().getTuple();
        String key = (String)result.get(0);
        counts.put(key, counts.get(key)+1);
      }


      System.out.println("Actual:");
      List<String> keys = new ArrayList<String>(counts.keySet());
      Collections.sort(keys);
      for (String key : keys)
      {
        System.out.println(key + " " + counts.get(key)/(double)numTrials);
      }
    }

Below is the output, which matches expectation.

Expected: 
a: 0.007518796992481203
b: 0.03007518796992481
c: 0.06015037593984962
d: 0.37593984962406013
e: 0.5263157894736842
Actual:
a 0.00753
b 0.030121
c 0.0602
d 0.37574
e 0.526409
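
For a reservoir of size 1, the agreement above can also be checked on paper. The following is a short derivation sketch, assuming independent keys k_i = u_i^{1/w_i} with u_i uniform on (0, 1) and writing W for the sum of all weights (W is notation introduced here, not used elsewhere in the thread). Each key has distribution function P(k_i <= x) = x^{w_i}, and item i is selected exactly when every other key is smaller:

```latex
\[
P(i)
  = \int_0^1 \underbrace{w_i\, x^{w_i - 1}}_{\text{density of } k_i}
    \prod_{j \neq i} \underbrace{x^{w_j}}_{P(k_j \le x)} \, dx
  = w_i \int_0^1 x^{W - 1} \, dx
  = \frac{w_i}{W},
\qquad W = \sum_j w_j .
\]
```

With the weights in the test, W = 13.3, so for example P(e) = 7.0 / 13.3 ≈ 0.5263, which is the expected value printed above.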

I'm not sure about A-ExpJ, I'll have to read about this some more.

I say go ahead and implement WeightedReservoirSample and send a pull request :)

By the way, functionally this should be the same as the other sampling UDF we have, named WeightedSample. However, WeightedReservoirSample should be more scalable, as WeightedSample must have the entire input DataBag available in memory.


king821221 commented on August 26, 2024

Matt,

Thanks for doing the test. Following your approach, I ran a similar test on the item selection probability of the A-ES algorithm in two scenarios, streaming accumulate and algebraic, with different reservoir sizes. The algebraic test simulates 6 mappers, each processing at most ('z' - 'a' + 1)/5 records; 3 combiners, each processing the intermediate output of at most 2 mappers; and a single reducer.

Reservoir size = 1

Item Key, Relative Weight, Prob in Accumulate, Prob in Algebraic
a, 0.0011926058437686346, 0.0011913, 0.0011982
b, 0.0041741204531902205, 0.0041797, 0.0042096
c, 0.007155635062611807, 0.0071823, 0.0071578
d, 0.010137149672033394, 0.0101473, 0.0101771
e, 0.013118664281454981, 0.0130835, 0.013146
f, 0.016100178890876567, 0.0162021, 0.0161025
g, 0.019081693500298154, 0.0190917, 0.0190899
h, 0.02206320810971974, 0.022064, 0.0220665
i, 0.025044722719141328, 0.0249758, 0.025031
j, 0.028026237328562912, 0.0280669, 0.0280236
k, 0.0310077519379845, 0.0310296, 0.0309236
l, 0.03398926654740608, 0.0339153, 0.0340092
m, 0.03697078115682767, 0.0369121, 0.0370534
n, 0.03995229576624926, 0.0399809, 0.0399577
o, 0.042933810375670844, 0.0429687, 0.0429506
p, 0.04591532498509243, 0.0459207, 0.0458607
q, 0.04889683959451401, 0.04892, 0.0487893
r, 0.0518783542039356, 0.0519111, 0.0519265
s, 0.054859868813357186, 0.0548843, 0.0548824
t, 0.057841383422778773, 0.0577482, 0.0578938
u, 0.06082289803220036, 0.0608782, 0.0608574
v, 0.06380441264162194, 0.0638068, 0.0638006
w, 0.06678592725104353, 0.0666936, 0.0668446
x, 0.06976744186046512, 0.0697849, 0.069749
y, 0.0727489564698867, 0.0728888, 0.072769
z, 0.07573047107930829, 0.0755722, 0.07553

Reservoir size = 15

Item Key, Relative Weight, Prob in Accumulate, Prob in Algebraic
a, 0.0011926058437686346, 0.002039913333333333, 0.0020385
b, 0.0041741204531902205, 0.006890593333333334, 0.0069048266666666665
c, 0.007155635062611807, 0.011433306666666667, 0.01143422
d, 0.010137149672033394, 0.015684846666666665, 0.01566496
e, 0.013118664281454981, 0.019638053333333332, 0.01962222
f, 0.016100178890876567, 0.02326916, 0.023283493333333332
g, 0.019081693500298154, 0.026654206666666666, 0.02664202
h, 0.02206320810971974, 0.029789493333333333, 0.029796826666666668
i, 0.025044722719141328, 0.0326792, 0.032689006666666666
j, 0.028026237328562912, 0.03535410666666667, 0.03535218666666667
k, 0.0310077519379845, 0.03780555333333333, 0.03780592666666667
l, 0.03398926654740608, 0.04007218666666667, 0.04007675333333333
m, 0.03697078115682767, 0.042164073333333336, 0.04215794
n, 0.03995229576624926, 0.04408344, 0.04407983333333333
o, 0.042933810375670844, 0.045832413333333336, 0.04584804
p, 0.04591532498509243, 0.04745052, 0.04743756
q, 0.04889683959451401, 0.04892004, 0.04890682666666667
r, 0.0518783542039356, 0.05028523333333333, 0.050301853333333334
s, 0.054859868813357186, 0.05154288666666667, 0.051549573333333334
t, 0.057841383422778773, 0.05270928, 0.05270800666666667
u, 0.06082289803220036, 0.053765313333333335, 0.053752626666666664
v, 0.06380441264162194, 0.05473123333333333, 0.05472701333333333
w, 0.06678592725104353, 0.05562057333333333, 0.055639393333333335
x, 0.06976744186046512, 0.05646065333333333, 0.05645708
y, 0.0727489564698867, 0.05721247333333333, 0.05721626666666667
z, 0.07573047107930829, 0.057911246666666666, 0.05790704666666667

The output shows that, for both accumulate and algebraic, A-ES produces selection probabilities close to w/sum(w), so the algorithm should work in both modes.

The test code:

public class WeightedSamplingTests extends PigTests
{
double getScore(double rand, double weight)
{
return Math.pow(rand, 1/weight);
}

class Pair
{
    Tuple t;
    double w;

    public Pair(Tuple t, double w) {this.t = t; this.w = w;}
}

private Reservoir testAccumulate(List<Pair> tws, int size) throws ExecException {
    Reservoir reservoir = new Reservoir(size);
    for(int j = 0; j < tws.size(); j++)
    { 
       ScoredTuple st = new ScoredTuple(getScore(Math.random(), tws.get(j).w), tws.get(j).t);
       //System.out.println(String.format("st: (%f, %f, %s)", tws.get(j).w, st.getScore(), st.getTuple()));
       reservoir.consider(st);
    }
    //System.out.println("-----------------------");
    return reservoir;
}

private Reservoir testAlgebraic(List<Pair> tws, int size, int partitions, int combiners) throws ExecException {
    List<Pair> subtws = new ArrayList<Pair>();
    List<Reservoir> subreservoirs = new ArrayList<Reservoir>();
    for(int i = 0; i < tws.size(); i++)
    {
       if (subtws.size() * partitions >= tws.size()) {
          Reservoir subresevoir = new Reservoir(size);
          for(int j = 0; j < subtws.size(); j++) {
              ScoredTuple st = new ScoredTuple(getScore(Math.random(), subtws.get(j).w), subtws.get(j).t);
              //System.out.println(String.format("st: (%f, %s)", st.getScore(), st.getTuple()));
              subresevoir.consider(st);
          }
          subreservoirs.add(subresevoir);
          subtws.clear();
          //System.out.println("************************");
       }
       subtws.add(tws.get(i));
    }
    if(subtws.size() > 0) {
       Reservoir subresevoir = new Reservoir(size);
       for(int j = 0; j < subtws.size(); j++) { 
          ScoredTuple st = new ScoredTuple(getScore(Math.random(), subtws.get(j).w), subtws.get(j).t);
          //System.out.println(String.format("st: (%f, %s)", st.getScore(), st.getTuple()));
          subresevoir.consider(st);
       }
       subreservoirs.add(subresevoir);
       //System.out.println("************************");
    }
    List<Reservoir> intermediateReservoirs = new ArrayList<Reservoir>();
    Reservoir intermReservoir = new Reservoir(size);
    int combined = 0;
    for(int i = 0; i < subreservoirs.size(); i++) {
       if(combined >= combiners) {
          intermediateReservoirs.add(intermReservoir);
          intermReservoir = new Reservoir(size);
          combined = 0;
          //System.out.println("#####################");
       }
       Reservoir r = subreservoirs.get(i);
       for (ScoredTuple st : r) { 
           //System.out.println(String.format("choose interm st: (%f, %s)", st.getScore(), st.getTuple()));
           intermReservoir.consider(st);
       }
       combined++;
    }
    if(combined > 0) {
       intermediateReservoirs.add(intermReservoir);
       //System.out.println("#####################");
    }
    Reservoir finalReservoir = new Reservoir(size);
    for(Reservoir r : intermediateReservoirs) {
       for (ScoredTuple st : r) { 
           //System.out.println(String.format("choose final st: (%f, %s)", st.getScore(), st.getTuple()));
           finalReservoir.consider(st);
       }
    }
    //System.out.println("-----------------------");
    return finalReservoir;
}

@Test
public void reservoirTest() throws ExecException
{
  TupleFactory factory = TupleFactory.getInstance();
  int i = 0;
  double sum = 0.0;
  List<Pair> tws = new ArrayList<Pair>();
  Map<String,Integer> acc_counts = new HashMap<String,Integer>();
  Map<String,Integer> alg_counts = new HashMap<String,Integer>();

  for(char c = 'a'; c <= 'z'; c=(char)((int)c + 1))
  {
      Tuple t = factory.newTuple(1);
      t.set(0, String.format("%c", c)); 
      Double w = 0.1 + i * 0.25;
      sum += w;
      tws.add(new Pair(t,w));
      acc_counts.put(String.format("%c", c), 0); 
      alg_counts.put(String.format("%c", c), 0); 
      i++;
  }

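  int numTrials = 10000000;
  // reservoir size under test; the tables above come from runs with 1 and 15
  int reservoir_size = 15;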

  for(i = 0; i < numTrials; i++) {
    Collections.shuffle(tws); 
    Reservoir reservoir = testAccumulate(tws,  reservoir_size); 
    while(reservoir.size() > 0) {
       Tuple result = reservoir.poll().getTuple();
       String key = (String)result.get(0);
       acc_counts.put(key, (acc_counts.get(key) == null ? 0: acc_counts.get(key) ) +1);
    }
    reservoir = testAlgebraic(tws,  reservoir_size, 5, 2); 
    while(reservoir.size() > 0) {
       Tuple result = reservoir.poll().getTuple();
       String key = (String)result.get(0);
       alg_counts.put(key, (alg_counts.get(key) == null ? 0: alg_counts.get(key) ) +1);
    }
  }
  List<String> keys = new ArrayList<String>(acc_counts.keySet());
  Collections.sort(keys);

  System.out.println("Output:");
  for(i = 0; i < tws.size(); i++)
  {
      Tuple t = tws.get(i).t;
      Double w = tws.get(i).w;
      System.out.println(t.get(0) + "\t" + w / sum + "\t" + acc_counts.get((String)t.get(0)) / (double)numTrials + "\t" + alg_counts.get((String)t.get(0)) / (double)numTrials);
  }
}

}


matthayes commented on August 26, 2024

Thanks for the sample code you posted! I've enhanced it with a comparison to the WeightedSample UDF, which implements the algorithm that uses P(i)=w_i/Sum(w_i) for each step. This should be the baseline for comparison.

I've been thinking about the Accumulator and Algebraic versions. It seems to me that if Accumulator works, then so should Algebraic, because they each just take the top N of the scores.
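
A small sketch of why that holds, assuming scores are plain doubles: taking the top N of each partition and then the top N of the merged survivors gives the same result as taking the top N of everything at once, because an item in the global top N is necessarily in the top N of its own partition. TopNMergeSketch and topN are hypothetical names, not DataFu classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration, not DataFu code.
final class TopNMergeSketch {
    /** Returns the n largest scores in ascending order, using a bounded min-heap. */
    static List<Double> topN(List<Double> scores, int n) {
        PriorityQueue<Double> heap = new PriorityQueue<>(); // smallest score at the head
        for (double s : scores) {
            heap.add(s);
            if (heap.size() > n) {
                heap.poll(); // evict the current minimum, like Reservoir.consider()
            }
        }
        List<Double> out = new ArrayList<>(heap);
        Collections.sort(out);
        return out;
    }
}
```

In the algebraic case each mapper and combiner plays the role of one topN call on its partition, and the reducer plays the role of the final topN over the merged survivors. The scores themselves are never regenerated along the way, which is what makes the two paths equivalent.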

Here is the modified code I used to test against baseline:

package datafu.test.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.testng.annotations.Test;

import datafu.pig.sampling.Reservoir;
import datafu.pig.sampling.ScoredTuple;
import datafu.pig.sampling.WeightedSample;

public class WeightedSamplingTests extends PigTests
{
  WeightedSample weightedSample = new WeightedSample();

    double getScore(double rand, double weight)
    {
        return Math.pow(rand, 1/weight);
    }

    class Pair
    {
        Tuple t;
        double w;

        public Pair(Tuple t, double w) {this.t = t; this.w = w;}
    }

    private Reservoir testWeightedSample(List<Pair> tws, int size) throws ExecException {


      List<Tuple> tuples = new ArrayList<Tuple>(tws.size());
      for (Pair p : tws)
      {
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, p.t.get(0));
        t.set(1, ((Double)p.w).intValue());
        tuples.add(t);
      }

      DataBag b = BagFactory.getInstance().newDefaultBag(tuples);     

      Tuple t = TupleFactory.getInstance().newTuple(3);
      t.set(0, b);
      t.set(1, 1);
      t.set(2, size);


      DataBag result;
      try
      {
        result = weightedSample.exec(t);
      }
      catch (IOException e)
      {
        throw new RuntimeException(e);
      }

      Reservoir reservoir = new Reservoir(size);

      for (Tuple rt : result)
      {
        // score doesn't matter in this case, we're just using the reservoir as a way to return the tuples
        String val = (String)rt.get(0);
        Tuple it = TupleFactory.getInstance().newTuple(1);
        it.set(0, val);
        ScoredTuple st = new ScoredTuple(1.0, it);
        reservoir.consider(st);
      }

      return reservoir;
    }

    private Reservoir testAccumulate(List<Pair> tws, int size) throws ExecException {
      Reservoir reservoir = new Reservoir(size);
      for(int j = 0; j < tws.size(); j++)
      { 
         ScoredTuple st = new ScoredTuple(getScore(Math.random(), tws.get(j).w), tws.get(j).t);
         reservoir.consider(st);
      }
      return reservoir;
  }

    private Reservoir testAlgebraic(List<Pair> tws, int size, int partitions, int combiners) throws ExecException {
        List<Pair> subtws = new ArrayList<Pair>();
        List<Reservoir> subreservoirs = new ArrayList<Reservoir>();
        for(int i = 0; i < tws.size(); i++)
        {
           if (subtws.size() * partitions >= tws.size()) {
              Reservoir subresevoir = new Reservoir(size);
              for(int j = 0; j < subtws.size(); j++) {
                  ScoredTuple st = new ScoredTuple(getScore(Math.random(), subtws.get(j).w), subtws.get(j).t);
                  subresevoir.consider(st);
              }
              subreservoirs.add(subresevoir);
              subtws.clear();
           }
           subtws.add(tws.get(i));
        }
        if(subtws.size() > 0) {
           Reservoir subresevoir = new Reservoir(size);
           for(int j = 0; j < subtws.size(); j++) { 
              ScoredTuple st = new ScoredTuple(getScore(Math.random(), subtws.get(j).w), subtws.get(j).t);
              subresevoir.consider(st);
           }
           subreservoirs.add(subresevoir);
        }
        List<Reservoir> intermediateReservoirs = new ArrayList<Reservoir>();
        Reservoir intermReservoir = new Reservoir(size);
        int combined = 0;
        for(int i = 0; i < subreservoirs.size(); i++) {
           if(combined >= combiners) {
              intermediateReservoirs.add(intermReservoir);
              intermReservoir = new Reservoir(size);
              combined = 0;
           }
           Reservoir r = subreservoirs.get(i);
           for (ScoredTuple st : r) { 
               intermReservoir.consider(st);
           }
           combined++;
        }
        if(combined > 0) {
           intermediateReservoirs.add(intermReservoir);
        }
        Reservoir finalReservoir = new Reservoir(size);
        for(Reservoir r : intermediateReservoirs) {
           for (ScoredTuple st : r) { 
               finalReservoir.consider(st);
           }
        }
        return finalReservoir;
    }

    @Test
    public void reservoirTest() throws ExecException
    {
      int sampleSize = 10;

      TupleFactory factory = TupleFactory.getInstance();
      int i = 0;
      double sum = 0.0;
      List<Pair> tws = new ArrayList<Pair>();
      Map<String,Integer> acc_counts = new HashMap<String,Integer>();
      Map<String,Integer> alg_counts = new HashMap<String,Integer>();
      Map<String,Integer> ws_counts = new HashMap<String,Integer>();

      for(char c = 'a'; c <= 'z'; c=(char)((int)c + 1))
      {
          Tuple t = factory.newTuple(1);
          t.set(0, String.format("%c", c)); 
          Double w = 1.0 + i * 2.0;
          sum += w;
          tws.add(new Pair(t,w));
          acc_counts.put(String.format("%c", c), 0); 
          alg_counts.put(String.format("%c", c), 0); 
          ws_counts.put(String.format("%c", c), 0); 
          i++;
      }

      int numTrials = 1000000;

      for(i = 0; i < numTrials; i++) {
        Collections.shuffle(tws); 
        Reservoir reservoir = testAccumulate(tws,  sampleSize); 
        while(reservoir.size() > 0) {
           Tuple result = reservoir.poll().getTuple();
           String key = (String)result.get(0);
           acc_counts.put(key, (acc_counts.get(key) == null ? 0: acc_counts.get(key) ) +1);
        }

        reservoir = testWeightedSample(tws,  sampleSize); 
        while(reservoir.size() > 0) {
           Tuple result = reservoir.poll().getTuple();
           String key = (String)result.get(0);
           ws_counts.put(key, (ws_counts.get(key) == null ? 0: ws_counts.get(key) ) +1);
        }

        reservoir = testAlgebraic(tws,  sampleSize, 5, 2); 
        while(reservoir.size() > 0) {
           Tuple result = reservoir.poll().getTuple();
           String key = (String)result.get(0);
           alg_counts.put(key, (alg_counts.get(key) == null ? 0: alg_counts.get(key) ) +1);
        }
      }
      List<String> keys = new ArrayList<String>(acc_counts.keySet());
      Collections.sort(keys);

      System.out.println("Output:");

      Collections.sort(tws, new Comparator<Pair>() {

        @Override
        public int compare(Pair o1, Pair o2)
        {
          try
          {
            return ((String)o1.t.get(0)).compareTo((String)o2.t.get(0));
          }
          catch (ExecException e)
          {
            throw new RuntimeException(e);
          }
        }

      });

      System.out.println("char" + "\t" + "rel_weight" + "\t" + "ws" + "\t" + "acc" + "\t" + "alg");

      for(i = 0; i < tws.size(); i++)
      {
          Tuple t = tws.get(i).t;
          Double w = tws.get(i).w;
          System.out.println(t.get(0) + "\t" + w / sum + "\t" 
              + ws_counts.get((String)t.get(0)) / (double)numTrials + "\t" 
              + acc_counts.get((String)t.get(0)) / (double)numTrials + "\t" 
              + alg_counts.get((String)t.get(0)) / (double)numTrials);
      }
    }
}

I only tried the sample size of 10. The output is below. This shows that all three implementations select items with the same probabilities. This convinces me that the idea is sound.

char  rel_weight              ws        acc       alg
a     0.0014792899408284023   0.019785  0.019497  0.019745
b     0.004437869822485207    0.057928  0.058386  0.058345
c     0.0073964497041420114   0.094893  0.094921  0.095233
d     0.010355029585798817    0.131173  0.130529  0.131068
e     0.013313609467455622    0.165654  0.165503  0.165255
f     0.016272189349112426    0.198347  0.199659  0.199265
g     0.019230769230769232    0.231491  0.23091   0.230101
h     0.022189349112426034    0.263127  0.262375  0.26221
i     0.02514792899408284     0.291829  0.291917  0.291397
j     0.028106508875739646    0.320628  0.320679  0.320857
k     0.03106508875739645     0.348081  0.348236  0.349276
l     0.034023668639053255    0.374501  0.374937  0.375048
m     0.03698224852071006     0.399236  0.400102  0.40042
n     0.03994082840236687     0.426052  0.424482  0.425138
o     0.042899408284023666    0.447925  0.447625  0.448216
p     0.04585798816568047     0.470937  0.470315  0.470248
q     0.04881656804733728     0.492232  0.492417  0.492145
r     0.051775147928994084    0.512246  0.513338  0.512722
s     0.05473372781065089     0.533265  0.532445  0.533208
t     0.057692307692307696    0.551851  0.553025  0.55205
u     0.060650887573964495    0.570772  0.569647  0.569817
v     0.06360946745562131     0.58743   0.588411  0.586838
w     0.06656804733727811     0.60423   0.604707  0.603842
x     0.0695266272189349      0.620592  0.620412  0.621465
y     0.07248520710059171     0.635125  0.635678  0.63451
z     0.07544378698224852     0.65067   0.649847  0.651581


king821221 commented on August 26, 2024

Sharing my experiment with the A-ExpJ algorithm, including validation of the sample selection probability against w/sum(w) and against the WeightedSample UDF, and a running-time comparison against the non-jump algorithms.

From the experiment, the sample selection probability of A-ExpJ is close to w/sum(w) and to the output of the WeightedSample UDF. A-ExpJ also runs much faster than the non-jump algorithms on a large data set: roughly 7 to 10 times faster in the timings under (2).
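
For readers who have not seen A-ExpJ, here is a minimal self-contained sketch of the exponential-jump idea. It follows my reading of the algorithm description in the Efraimidis and Spirakis paper, uses a plain PriorityQueue over array indices rather than DataFu's Reservoir, and assumes all weights are > 0. The names AExpJSketch, Keyed, sample and uniform are hypothetical. The speedup comes from drawing random numbers once per replacement rather than once per item.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

// Hypothetical illustration, not the code used for the experiment in this thread.
final class AExpJSketch {
    /** A reservoir entry: the item's position in the input and its key. */
    static final class Keyed {
        final double key;
        final int index;
        Keyed(double key, int index) { this.key = key; this.index = index; }
    }

    /** Samples up to k distinct indices; assumes every weight is > 0. */
    static List<Integer> sample(double[] weights, int k, Random rng) {
        PriorityQueue<Keyed> heap =
            new PriorityQueue<>(Comparator.comparingDouble((Keyed e) -> e.key));
        int i = 0;
        // Fill phase: the first k items enter with ordinary A-Res keys u^(1/w).
        for (; i < weights.length && i < k; i++) {
            heap.add(new Keyed(Math.pow(uniform(rng), 1.0 / weights[i]), i));
        }
        while (i < weights.length && !heap.isEmpty()) {
            double threshold = heap.peek().key; // smallest key in the reservoir
            // Total weight to skip before the next replacement.
            double jump = Math.log(uniform(rng)) / Math.log(threshold);
            double skipped = 0.0;
            while (i < weights.length && skipped + weights[i] < jump) {
                skipped += weights[i]; // skipped items cost no random numbers
                i++;
            }
            if (i == weights.length) {
                break; // the jump ran past the end of the input
            }
            // Item i replaces the minimum; its key is drawn from (threshold, 1].
            double w = weights[i];
            double tw = Math.pow(threshold, w);
            double r2 = tw + (1.0 - tw) * uniform(rng);
            heap.poll();
            heap.add(new Keyed(Math.pow(r2, 1.0 / w), i));
            i++;
        }
        List<Integer> out = new ArrayList<>();
        for (Keyed e : heap) {
            out.add(e.index);
        }
        return out;
    }

    /** Uniform in (0, 1], which avoids log(0). */
    private static double uniform(Random rng) {
        return 1.0 - rng.nextDouble();
    }
}
```

With k = 1 and weights {1.0, 9.0}, the second item replaces the first exactly when r >= u^9, which happens with probability 1 - 1/10 = 0.9, matching w/sum(w).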

(1) Verify sample selection probability

The following is the average squared error between the observed sample selection probability and the expected probability (w/sum(w)) for each algorithm:

err_ws (weighted sample): 1.5984219682027621E-7
err_acc (weighted reservoir sample, accumulate): 1.5097790244528676E-7
err_alg (weighted reservoir sample, algebraic): 1.544331066166316E-7
err_skip_acc (weighted reservoir sample with A-ExpJ, accumulate): 1.4900214947861513E-7
err_skip_alg (weighted reservoir sample with A-ExpJ, algebraic): 1.5166721806389135E-7
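
The code that computed these error figures is not shown in the thread. A minimal sketch of one way to compute such a metric, assuming the observed selection frequencies have already been normalized so that they are comparable with w/sum(w), might look like this. SelectionError and meanSquaredError are hypothetical names.

```java
// Hypothetical illustration, not the code used for the figures above.
final class SelectionError {
    /**
     * Mean of (observed[i] - weights[i] / sum(weights))^2 over all items.
     * Assumes both arrays have the same non-zero length.
     */
    static double meanSquaredError(double[] observed, double[] weights) {
        if (observed.length != weights.length || weights.length == 0) {
            throw new IllegalArgumentException("arrays must be non-empty and of equal length");
        }
        double sum = 0.0;
        for (double w : weights) {
            sum += w;
        }
        double err = 0.0;
        for (int i = 0; i < weights.length; i++) {
            double diff = observed[i] - weights[i] / sum; // deviation from w/sum(w)
            err += diff * diff;
        }
        return err / weights.length;
    }
}
```

For example, observed frequencies {0.5, 0.5} against weights {1.0, 3.0} give expected probabilities {0.25, 0.75} and a mean squared error of 0.0625.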

(2) Verify performance

The following is the total processing time in milliseconds for each algorithm, sampling 100 out of 260K species per iteration, over 100 iterations:

accumulateDuration  algebraicDuration  accumulateExpJDuration  algebraicExpJDuration
7507                7428               727                     1082

The experiment code, based on Matt's refactored test code:

package datafu.test.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.testng.annotations.Test;

import datafu.pig.sampling.Reservoir;
import datafu.pig.sampling.ScoredTuple;
import datafu.pig.sampling.WeightedSample;

public class WeightedSamplingTests extends PigTests
{
WeightedSample weightedSample = new WeightedSample();

double getScore(double rand, double weight)
{
    return Math.pow(rand, 1/weight);
}

class Pair
{
    Tuple t;
    double w;

    public Pair(Tuple t, double w) {this.t = t; this.w = w;}
}

private Reservoir testWeightedSample(List<Pair> tws, int size) throws ExecException {


  List<Tuple> tuples = new ArrayList<Tuple>(tws.size());
  for (Pair p : tws)
  {
    Tuple t = TupleFactory.getInstance().newTuple(2);
    t.set(0, p.t.get(0));
    t.set(1, ((Double)p.w).intValue());
    tuples.add(t);
  }

  DataBag b = BagFactory.getInstance().newDefaultBag(tuples);     

  Tuple t = TupleFactory.getInstance().newTuple(3);
  t.set(0, b);
  t.set(1, 1);
  t.set(2, size);


  DataBag result;
  try
  {
    result = weightedSample.exec(t);
  }
  catch (IOException e)
  {
    throw new RuntimeException(e);
  }

  Reservoir reservoir = new Reservoir(size);

  for (Tuple rt : result)
  {
    // score doesn't matter in this case, we're just using the reservoir as a way to return the tuples
    String val = (String)rt.get(0);
    Tuple it = TupleFactory.getInstance().newTuple(1);
    it.set(0, val);
    ScoredTuple st = new ScoredTuple(1.0, it);
    reservoir.consider(st);
  }

  return reservoir;
}

private Reservoir testAccumulate(List<Pair> tws, int size) throws ExecException {
  Reservoir reservoir = new Reservoir(size);
  for(int j = 0; j < tws.size(); j++)
  { 
     ScoredTuple st = new ScoredTuple(getScore(Math.random(), tws.get(j).w), tws.get(j).t);
     reservoir.consider(st);
  }
  return reservoir;

}

private Reservoir testAlgebraic(List<Pair> tws, int size, int partitions, int combiners) throws ExecException {
    List<Pair> subtws = new ArrayList<Pair>();
    List<Reservoir> subreservoirs = new ArrayList<Reservoir>();
    for(int i = 0; i < tws.size(); i++)
    {
       if (subtws.size() * partitions >= tws.size()) {
          Reservoir subresevoir = new Reservoir(size);
          for(int j = 0; j < subtws.size(); j++) {
              ScoredTuple st = new ScoredTuple(getScore(Math.random(), subtws.get(j).w), subtws.get(j).t);
              subresevoir.consider(st);
          }
          subreservoirs.add(subresevoir);
          subtws.clear();
       }
       subtws.add(tws.get(i));
    }
    if(subtws.size() > 0) {
       Reservoir subresevoir = new Reservoir(size);
       for(int j = 0; j < subtws.size(); j++) { 
          ScoredTuple st = new ScoredTuple(getScore(Math.random(), subtws.get(j).w), subtws.get(j).t);
          subresevoir.consider(st);
       }
       subreservoirs.add(subresevoir);
    }
    List<Reservoir> intermediateReservoirs = new ArrayList<Reservoir>();
    Reservoir intermReservoir = new Reservoir(size);
    int combined = 0;
    for(int i = 0; i < subreservoirs.size(); i++) {
       if(combined >= combiners) {
          intermediateReservoirs.add(intermReservoir);
          intermReservoir = new Reservoir(size);
          combined = 0;
       }
       Reservoir r = subreservoirs.get(i);
       for (ScoredTuple st : r) { 
           intermReservoir.consider(st);
       }
       combined++;
    }
    if(combined > 0) {
       intermediateReservoirs.add(intermReservoir);
    }
    Reservoir finalReservoir = new Reservoir(size);
    for(Reservoir r : intermediateReservoirs) {
       for (ScoredTuple st : r) { 
           finalReservoir.consider(st);
       }
    }
    return finalReservoir;
}

private Reservoir testAccumulateExpJ(List<Pair> tws, int size) throws ExecException {
    return accumulateExpJ(tws, size);
}

private Reservoir accumulateExpJ(List<Pair> tws, int size) throws ExecException {
    Reservoir reservoir = new Reservoir(size);
    double particalWeightSum = 0;
    int j = 0;
    for(; j < size && j < tws.size(); j++) {
      ScoredTuple st = new ScoredTuple(getScore(Math.random(), tws.get(j).w), tws.get(j).t);
      reservoir.consider(st); 
    }
    double r = Math.random();
    ScoredTuple head = reservoir.peek();
    double TW = head.getScore();
    double xw = Math.log(r + 0.00001) / Math.log(TW + 0.00001);
    for(; j < tws.size(); j++)
    {
      if(Double.compare(particalWeightSum, xw) < 0 && Double.compare(particalWeightSum + tws.get(j).w, xw) >= 0)
      {
         reservoir.poll();
         double tw = Math.pow(TW, tws.get(j).w);
         /*
         * A trick from the paper's demo code: reuse the random number r drawn at the
         * beginning of each jump, normalizing it before using it as the new random number.
         * This gives better running performance and a selection probability close to w/sum(w).
         * Performance (durations in milliseconds):
         * accumulateDuration algebraicDuration accumulateExpJDuration algebraicExpJDuration
         * 7432    7496    717     1342
         * Mean squared error:
         * err_ws  err_acc err_alg err_skip_acc    err_skip_alg
         * 1.5416302647468705E-7   1.543968619383665E-7    1.5367478372261782E-7   1.936841827740179E-7 1.589753533080795E-7
         * The normalization follows from the jump condition. Since 0 < TW < 1, log(TW) is
         * negative, so multiplying through by log(TW) flips the inequalities:
         *     particalWeightSum < log(r) / log(TW) <= particalWeightSum + tws.get(j).w
         *     => log(TW ^ particalWeightSum) > log(r) >= log(TW ^ (particalWeightSum + tws.get(j).w))
         *     => TW ^ particalWeightSum > r >= TW ^ (particalWeightSum + tws.get(j).w)
         * Hence (r - low) / (high - low) below lies in [0, 1):
         double low = Math.pow(TW, particalWeightSum + tws.get(j).w);
         double high = Math.pow(TW, particalWeightSum);
         double r2 = tw + (1.0 - tw) * (r - low) / (high - low);
         */
         double r2 = tw + (1.0 - tw) * Math.random();
         ScoredTuple st = new ScoredTuple(getScore(r2, tws.get(j).w), tws.get(j).t); 
         reservoir.consider(st);
         head = reservoir.peek();
         TW = head.getScore();
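         // Draw a fresh random number before computing the next jump length;
         // otherwise the second jump reuses the r that already decided the first one.
         r = Math.random();
         xw = Math.log(r + 0.00001) / Math.log(TW + 0.00001);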
         particalWeightSum = 0;
      } else {
         particalWeightSum += tws.get(j).w;
      }
    } 

    return reservoir;
}

private Reservoir testAlgebraicExpJ(List<Pair> tws, int size, int partitions, int combiners) throws ExecException {
    List<Pair> subtws = new ArrayList<Pair>();
    List<Reservoir> subreservoirs = new ArrayList<Reservoir>();
    for(int i = 0; i < tws.size(); i++)
    {
       if (subtws.size() * partitions >= tws.size()) {
          Reservoir subresevoir = accumulateExpJ(subtws, size); 
          subreservoirs.add(subresevoir);
          subtws.clear();
       }
       subtws.add(tws.get(i));
    }
    if(subtws.size() > 0) {
       Reservoir subresevoir = accumulateExpJ(subtws, size); 
       subreservoirs.add(subresevoir);
    }
    List<Reservoir> intermediateReservoirs = new ArrayList<Reservoir>();
    Reservoir intermReservoir = new Reservoir(size);
    int combined = 0;
    for(int i = 0; i < subreservoirs.size(); i++) {
       if(combined >= combiners) {
          intermediateReservoirs.add(intermReservoir);
          intermReservoir = new Reservoir(size);
          combined = 0;
       }
       Reservoir r = subreservoirs.get(i);
       for (ScoredTuple st : r) { 
           intermReservoir.consider(st);
       }
       combined++;
    }
    if(combined > 0) {
       intermediateReservoirs.add(intermReservoir);
    }
    Reservoir finalReservoir = new Reservoir(size);
    for(Reservoir r : intermediateReservoirs) {
       for (ScoredTuple st : r) { 
           finalReservoir.consider(st);
       }
    }
    return finalReservoir;
}


@Test
public void reservoirTest() throws ExecException
{
  int sampleSize = 10;

  TupleFactory factory = TupleFactory.getInstance();
  int i = 0;
  double sum = 0.0;
  List<Pair> tws = new ArrayList<Pair>();
  Map<String,Integer> acc_counts = new HashMap<String,Integer>();
  Map<String,Integer> alg_counts = new HashMap<String,Integer>();
  Map<String,Integer> ws_counts = new HashMap<String,Integer>();
  Map<String,Integer> skip_acc_counts = new HashMap<String,Integer>();
  Map<String,Integer> skip_alg_counts = new HashMap<String,Integer>();

  for(int id = 0; id < 3; id++)
  {
     for(char c = 'a'; c <= 'z'; c=(char)((int)c + 1))
     {
         Tuple t = factory.newTuple(1);
         t.set(0, String.format("%c%d", c, id)); 
         Double w = 1.0 + i * 2.0;
         sum += w;
         tws.add(new Pair(t,w));
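         // Key the counters by the full tuple id (e.g. "a0"), which is the key read back
         // from sampled tuples; keying by the letter alone leaves ids that are never
         // sampled without an entry, so the later get() would unbox null.
         String key = String.format("%c%d", c, id);
         acc_counts.put(key, 0); 
         alg_counts.put(key, 0); 
         ws_counts.put(key, 0); 
         skip_acc_counts.put(key, 0); 
         skip_alg_counts.put(key, 0); 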
         i++;
      }
  }

  int numTrials = 1000000;

  for(i = 0; i < numTrials; i++) {
    Collections.shuffle(tws); 
    Reservoir reservoir = testAccumulate(tws,  sampleSize); 
    while(reservoir.size() > 0) {
       Tuple result = reservoir.poll().getTuple();
       String key = (String)result.get(0);
       acc_counts.put(key, (acc_counts.get(key) == null ? 0: acc_counts.get(key) ) +1);
    }

    reservoir = testWeightedSample(tws,  sampleSize); 
    while(reservoir.size() > 0) {
       Tuple result = reservoir.poll().getTuple();
       String key = (String)result.get(0);
       ws_counts.put(key, (ws_counts.get(key) == null ? 0: ws_counts.get(key) ) +1);
    }

    reservoir = testAlgebraic(tws,  sampleSize, 3, 3); 
    while(reservoir.size() > 0) {
       Tuple result = reservoir.poll().getTuple();
       String key = (String)result.get(0);
       alg_counts.put(key, (alg_counts.get(key) == null ? 0: alg_counts.get(key) ) +1);
    }

    reservoir = testAccumulateExpJ(tws, sampleSize);
    while(reservoir.size() > 0) {
       Tuple result = reservoir.poll().getTuple();
       String key = (String)result.get(0);
       skip_acc_counts.put(key, (skip_acc_counts.get(key) == null ? 0: skip_acc_counts.get(key) ) +1);
    }

    reservoir = testAlgebraicExpJ(tws, sampleSize, 3, 3);
    while(reservoir.size() > 0) {
       Tuple result = reservoir.poll().getTuple();
       String key = (String)result.get(0);
       skip_alg_counts.put(key, (skip_alg_counts.get(key) == null ? 0: skip_alg_counts.get(key) ) +1);
    }

  }
  List<String> keys = new ArrayList<String>(acc_counts.keySet());
  Collections.sort(keys);

  System.out.println("Output:");

  Collections.sort(tws, new Comparator<Pair>() {

    @Override
    public int compare(Pair o1, Pair o2)
    {
      try
      {
        return ((String)o1.t.get(0)).compareTo((String)o2.t.get(0));
      }
      catch (ExecException e)
      {
        throw new RuntimeException(e);
      }
    }

  });

  int sum_ws, sum_acc, sum_alg, sum_skip_acc, sum_skip_alg;

  sum_ws = sum_acc = sum_alg = sum_skip_acc = sum_skip_alg = 0;

  for(i = 0; i < tws.size(); i++) {
      Tuple t = tws.get(i).t;
      sum_ws += ws_counts.get((String)t.get(0));
      sum_acc += acc_counts.get((String)t.get(0));
      sum_alg += alg_counts.get((String)t.get(0));
      sum_skip_acc += skip_acc_counts.get((String)t.get(0));
      sum_skip_alg += skip_alg_counts.get((String)t.get(0));
  }

  System.out.println("char" + "\t" + "rel_weight" + "\t" + "ws" + "\t" + "acc" + "\t" + "alg" + "\t" + "acc_expj" + "\t" + "alg_expj");

  double err_ws, err_acc, err_alg, err_skip_acc, err_skip_alg;

  err_ws = err_acc = err_alg = err_skip_acc = err_skip_alg = 0;

  for(i = 0; i < tws.size(); i++)
  {
      Tuple t = tws.get(i).t;
      Double w = tws.get(i).w;
      double prob_w = w / sum;
      double prob_ws = ws_counts.get((String)t.get(0)) / (double)sum_ws;
      double prob_acc = acc_counts.get((String)t.get(0)) / (double)sum_acc;
      double prob_alg = alg_counts.get((String)t.get(0)) / (double)sum_alg;
      double prob_skip_acc = skip_acc_counts.get((String)t.get(0)) / (double)sum_skip_acc;
      double prob_skip_alg = skip_alg_counts.get((String)t.get(0)) / (double)sum_skip_alg; 
      System.out.println(t.get(0) + "\t" + prob_w + "\t" 
          + prob_ws + "\t" 
          + prob_acc + "\t" 
          + prob_alg + "\t"
          + prob_skip_acc + "\t"
          + prob_skip_alg);
      err_ws += Math.pow(prob_w - prob_ws, 2);
      err_acc += Math.pow(prob_w - prob_acc, 2);
      err_alg += Math.pow(prob_w - prob_alg, 2);
      err_skip_acc += Math.pow(prob_w - prob_skip_acc, 2);
      err_skip_alg += Math.pow(prob_w - prob_skip_alg, 2);
  }

  err_ws /= tws.size();
  err_acc /= tws.size();
  err_alg /= tws.size();
  err_skip_acc /= tws.size();
  err_skip_alg /= tws.size();

  System.out.println("err_ws" + "\t" + "err_acc" + "\t" + "err_alg" + "\t" + "err_skip_acc" + "\t" + "err_skip_alg");

  System.out.println(err_ws + "\t" 
      + err_acc + "\t"
      + err_alg + "\t"
      + err_skip_acc + "\t"
      + err_skip_alg); 
}

@Test
public void reservoirExpJPerfTest() throws ExecException
{
  int sampleSize = 100;

  TupleFactory factory = TupleFactory.getInstance();
  int i = 0;
  List<Pair> tws = new ArrayList<Pair>();

  for(int id = 0; id < 10000; id++)
  {
     for(char c = 'a'; c <= 'z'; c=(char)((int)c + 1))
     {
         Tuple t = factory.newTuple(1);
         t.set(0, String.format("%c%d", c, id)); 
         Double w = 0.001 + i * 0.0002;
         tws.add(new Pair(t,w));
         i++;
      }
  }

  List<List<Pair>> test_tws = new ArrayList<List<Pair>>();

  int numTrials = 100;

  for(i = 0; i < numTrials; i++) {
    Collections.shuffle(tws); 
    List<Pair> shuffled_tws = new ArrayList<Pair>();
    shuffled_tws.addAll(tws);
    test_tws.add(shuffled_tws);
  }

  long testAccumulateDuration, testAlgebraicDuration, testAccumulateExpJDuration, testAlgebraicExpJDuration;

  testAccumulateDuration = testAlgebraicDuration = testAccumulateExpJDuration = testAlgebraicExpJDuration = 0;

  long startTime = System.currentTimeMillis();

  Reservoir reservoir = null;

  for(i = 0; i < numTrials; i++) {
    reservoir = testAccumulate(test_tws.get(i),  sampleSize); 
    reservoir = null; 
  }

  testAccumulateDuration = System.currentTimeMillis() - startTime; 

  startTime = System.currentTimeMillis();

  for(i = 0; i < numTrials; i++) {
    reservoir = testAlgebraic(test_tws.get(i),  sampleSize, 10, 5); 
    reservoir = null; 
  }

  testAlgebraicDuration = System.currentTimeMillis() - startTime; 

  startTime = System.currentTimeMillis();

  for(i = 0; i < numTrials; i++) {
    reservoir = testAccumulateExpJ(test_tws.get(i), sampleSize);
    reservoir = null; 
  }

  testAccumulateExpJDuration = System.currentTimeMillis() - startTime; 

  startTime = System.currentTimeMillis();

  for(i = 0; i < numTrials; i++) {
    reservoir = testAlgebraicExpJ(test_tws.get(i), sampleSize, 10, 5);
    reservoir = null; 
  }

  testAlgebraicExpJDuration = System.currentTimeMillis() - startTime; 

  System.out.println("Output:");

  System.out.println("accumulateDuration" + "\t" + "algebraicDuration" + "\t" + "accumulateExpJDuration" + "\t" + "algebraicExpJDuration");

  System.out.println(testAccumulateDuration + "\t" + testAlgebraicDuration + "\t" + testAccumulateExpJDuration + "\t" + testAlgebraicExpJDuration);

}
}
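
For anyone who wants to try the jump logic outside of Pig, here is a minimal standalone sketch of A-ExpJ. It is not the DataFu code: the names ExpJSketch, Scored and sample are hypothetical, a java.util.PriorityQueue stands in for Reservoir, and weights are assumed to be strictly positive. Unlike the test code above it draws a fresh random number for every jump and does not add the 0.00001 epsilon.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

// Standalone A-ExpJ sketch; every name here is hypothetical, not DataFu API.
class ExpJSketch {

    // An item with its key u^(1/w); a stand-in for ScoredTuple.
    static final class Scored implements Comparable<Scored> {
        final String item;
        final double key;

        Scored(String item, double key) {
            this.item = item;
            this.key = key;
        }

        @Override
        public int compareTo(Scored other) {
            return Double.compare(key, other.key);
        }
    }

    // Returns up to k items sampled without replacement; weights must be > 0.
    static List<String> sample(List<String> items, double[] weights, int k, Random rnd) {
        List<String> result = new ArrayList<>();
        if (k <= 0) {
            return result;
        }
        // Min-heap: the head holds the smallest key, i.e. the threshold T_w.
        PriorityQueue<Scored> heap = new PriorityQueue<>();
        int i = 0;
        // Fill phase: the first k items get plain A-Res keys u^(1/w).
        for (; i < items.size() && i < k; i++) {
            heap.add(new Scored(items.get(i), Math.pow(rnd.nextDouble(), 1.0 / weights[i])));
        }
        while (i < items.size()) {
            double tw = heap.peek().key;
            // Jump length in units of weight, from a fresh random number.
            double xw = Math.log(rnd.nextDouble()) / Math.log(tw);
            double skipped = 0.0;
            // Skip items while the cumulative skipped weight stays below xw.
            while (i < items.size() && skipped + weights[i] < xw) {
                skipped += weights[i];
                i++;
            }
            if (i >= items.size()) {
                break;
            }
            // Item i ends the jump: it replaces the minimum, with its random
            // number drawn uniformly from (T_w^w, 1) so its key exceeds T_w.
            double low = Math.pow(tw, weights[i]);
            double r2 = low + (1.0 - low) * rnd.nextDouble();
            heap.poll();
            heap.add(new Scored(items.get(i), Math.pow(r2, 1.0 / weights[i])));
            i++;
        }
        for (Scored s : heap) {
            result.add(s.item);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c", "d", "e");
        double[] weights = {1.0, 1.0, 1.0, 1.0, 1000.0};
        System.out.println(sample(items, weights, 2, new Random()));
    }
}
```

With the weights in main, "e" should show up in almost every sample, which is a quick sanity check that heavier items are favored.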

from datafu.

matthayes avatar matthayes commented on August 26, 2024

Closing as now tracked by DATAFU-16.

from datafu.
