Reservoir sampling is a family of randomized algorithms for choosing a random sample of k items from a list L of n items, where n is either very large or unknown until the list has been traversed. In most applications, n is so large that the items can only be streamed to the algorithm rather than fit into memory all at once. When the sampling is done, each item in the list should have equal probability k/n of being chosen. Reservoir sampling is thus an important technique for sampling from large datasets, or so-called big data sources.
In this post, I will first review the basic single-worker sequential algorithm and show how to extend it to a distributed/parallel environment.
A simple sequential algorithm runs in O(n) time and uses O(k) space. The basic idea is to make a single pass over the list L while maintaining and updating a reservoir list R. The algorithm stores the first k items of L in R. It then iterates through the rest of L. For the i-th item of L where i > k, it reserves this item with probability k/i. If it decides to reserve a new item, it randomly evicts an old item from the reservoir list R and puts the newly reserved one in its place. It can be shown by total probability and induction that, when the whole list L has been traversed, the reservoir list R contains k uniform random samples of L. The algorithm can be expressed in the Java code below (I am being a bit lazy by writing n in the second for-loop's condition. In reality, items are usually streamed in and n is not known in advance. The algorithm still works, however, because the loop body never uses n; one simply keeps iterating until the stream is exhausted):
// fill the reservoir with the first k items
for (int i = 0; i < k; ++i)
    R[i] = L[i];
// for each later item, keep it with probability k/(i+1)
for (int i = k; i < n; ++i) {
    int j = rand.nextInt(i + 1);
    if (j < k)
        R[j] = L[i];   // replace a uniformly chosen slot in the reservoir
}
return R;
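For completeness, here is a minimal, self-contained sketch of the same idea written against an Iterator, so that n never needs to be known in advance; the class and method names are my own illustration, not part of any particular library.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

public class ReservoirSampler {
    // Returns k uniform random samples from the items produced by the iterator.
    // If the stream holds fewer than k items, all of them are returned.
    public static <T> List<T> sample(Iterator<T> stream, int k, Random rand) {
        List<T> reservoir = new ArrayList<>(k);
        int i = 0;
        while (stream.hasNext()) {
            T item = stream.next();
            if (i < k) {
                reservoir.add(item);              // fill the reservoir with the first k items
            } else {
                int j = rand.nextInt(i + 1);      // uniform index in [0, i]
                if (j < k)
                    reservoir.set(j, item);       // keep the new item with probability k/(i+1)
            }
            ++i;
        }
        return reservoir;
    }
}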
Now let’s consider a distributed environment such as Hadoop, where the input stream consists of several sub-streams and each sub-stream is fed to a single worker. How can we extend the simple algorithm to efficiently sample all sub-streams in parallel and still generate k uniform samples from the entire input stream in the end?
Without loss of generality, let us assume there are two sub-streams of sizes m and n, respectively, both far greater than k. In the first step of the algorithm, the workers process their own sub-streams in parallel using the basic algorithm. When both workers finish their sub-stream traversal, two reservoir lists R and S have been generated. Each worker also counts the number of items in its own sub-stream during the traversal, so m and n are known by the time R and S are available.
The critical step is to combine the two reservoir lists into k items. To do this, we assign weights to items according to the sizes of the sub-streams from which they were sampled in the first step, and then run a second sampling phase of k iterations. In each iteration, we flip a biased coin: with probability p = m/(m+n) we move one random item from reservoir list R into the final list, and with probability 1-p we move one random item from reservoir list S. After the k-th iteration, we have the final reservoir list for the entire stream. This algorithm is described as follows:
// step 1: each worker samples its own sub-stream and records its length
for (sub-stream s : sub-streams) do in parallel {
    run the simple sequential reservoir sampling on s and count the length of s;
}
// step 2: merge the two reservoirs R (from the sub-stream of length m)
// and S (from the sub-stream of length n) into the final reservoir T
double p = (double) m / (m + n);
for (int i = 0; i < k; ++i) {
    double j = rand.nextDouble();
    if (j <= p)
        move a random item from R to T;
    else
        move a random item from S to T;
}
return T;
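Written out as ordinary Java, the merge step might look like the sketch below. It assumes the two workers have already produced the reservoir lists R and S together with the counts m and n (the names follow the text above; everything else is illustrative), and that m and n are both at least k, so neither reservoir can run empty.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ReservoirMerge {
    // Merges two reservoirs R and S, sampled from sub-streams of lengths m and n,
    // into a single reservoir of size k for the combined stream.
    public static <T> List<T> merge(List<T> R, long m, List<T> S, long n, int k, Random rand) {
        List<T> merged = new ArrayList<>(k);
        double p = (double) m / (m + n);                       // weight of the first sub-stream
        for (int i = 0; i < k; ++i) {
            if (rand.nextDouble() < p)
                merged.add(R.remove(rand.nextInt(R.size())));  // move a random item from R
            else
                merged.add(S.remove(rand.nextInt(S.size())));  // move a random item from S
        }
        return merged;
    }
}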
It can be shown by induction that in each iteration of the second sampling phase, every item in the entire stream has probability 1/(m+n) of being chosen: for instance, in the first iteration, an item from the first sub-stream sits in R with probability k/m and is then selected with probability m/(m+n) * 1/k, which multiplies out to 1/(m+n). Again, by total probability, any item has probability k/(m+n) of being chosen over the whole execution of the algorithm, so the algorithm generates k uniform random samples from the entire stream. The nice part of this algorithm is that it runs in O(max(m,n)) time and uses O(k) space.
To generalize the algorithm to cases with more than two sub-streams, one only needs to combine the reservoir lists in pairs, which can also be done in parallel. The proof technique remains the same and is thus omitted.
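As a rough illustration of this pairwise combination, one can carry each reservoir around together with the length of the sub-stream it was sampled from and fold the merge step over any number of workers, for example inside a parallel reduce. The small holder class below reuses the merge method sketched above and is purely illustrative.

import java.util.List;
import java.util.Random;

// Illustrative holder: a reservoir plus the length of the sub-stream it was sampled from.
class WeightedReservoir<T> {
    final List<T> items;
    final long count;

    WeightedReservoir(List<T> items, long count) {
        this.items = items;
        this.count = count;
    }

    // Combine two weighted reservoirs pairwise; the counts simply add up,
    // so the combination can be applied in any order across many workers.
    WeightedReservoir<T> combine(WeightedReservoir<T> other, int k, Random rand) {
        List<T> merged = ReservoirMerge.merge(this.items, this.count,
                                              other.items, other.count, k, rand);
        return new WeightedReservoir<>(merged, this.count + other.count);
    }
}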
I have implemented this algorithm under an extended version of the Java 8 Stream programming model, which I will probably discuss in a post or two later. I have been running the algorithm on different distributed computing platforms, including Hadoop, both as a standalone application and as a subroutine of other applications. It fits these platforms well and is very efficient at generating uniform random samples from large datasets.
