I just spent a few hours trying to understand the algorithms behind sampling without replacement, and the topic is more complex than I initially thought. That's exciting! For the benefit of future readers (have a good day!) I document my insights here, including a ready-to-use function further below that respects the given inclusion probabilities. A nice and quick mathematical overview of the various methods can be found here: Tillé: Algorithms of sampling with equal or unequal probabilities. For example, Jason's method can be found on page 46. The caveat with his method is that the inclusion probabilities are not proportional to the weights, as also noted in the document. In fact, the i-th inclusion probability can be computed recursively as follows:
def inclusion_probability(i, weights, k):
    """
    Computes the inclusion probability of the i-th element
    in a randomly sampled k-tuple using Jason's algorithm
    (see https://stackoverflow.com/a/2149533/7729124)
    """
    if k <= 0:
        return 0
    cum_p = 0
    for j, weight in enumerate(weights):
        # compute the probability of j being selected considering the weights
        p = weight / sum(weights)
        if i == j:
            # if this is the target element, we don't have to go deeper,
            # since we know that i is included
            cum_p += p
        else:
            # if this is not the target element, then we compute the conditional
            # inclusion probability of i under the constraint that j is included
            cond_i = i if i < j else i - 1
            cond_weights = weights[:j] + weights[j+1:]
            cond_p = inclusion_probability(cond_i, cond_weights, k - 1)
            cum_p += p * cond_p
    return cum_p
And we can check the validity of the function above by comparing
In : for i in range(3): print(i, inclusion_probability(i, [1,2,3], 2))
0 0.41666666666666663
1 0.7333333333333333
2 0.85
to
In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: random_weighted_sample_no_replacement([(1,'a'),(2,'b'),(3,'c')],2))
Out: Counter({'a': 4198, 'b': 7268, 'c': 8534})
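The recursion can also be cross-checked without simulation noise. The following is a minimal sketch, assuming integer weights and assuming that Jason's algorithm draws one element at a time with probability proportional to the remaining weights. The name `exact_inclusion_probabilities` is introduced here for illustration only. It enumerates every ordered draw of length k and adds up the exact probabilities as fractions:

```python
from fractions import Fraction
from itertools import permutations


def exact_inclusion_probabilities(weights, k):
    """Exact inclusion probabilities for successive weighted draws (integer weights)."""
    totals = [Fraction(0)] * len(weights)
    # every ordered k-tuple of distinct indices is one possible run of the sampler
    for order in permutations(range(len(weights)), k):
        prob = Fraction(1)
        remaining = sum(weights)
        for idx in order:
            # probability of drawing idx next, given what is still available
            prob *= Fraction(weights[idx], remaining)
            remaining -= weights[idx]
        # every index in this run is included with the probability of the run
        for idx in order:
            totals[idx] += prob
    return totals


print(exact_inclusion_probabilities([1, 2, 3], 2))
# [Fraction(5, 12), Fraction(11, 15), Fraction(17, 20)]
```

5/12, 11/15 and 17/20 agree with the values 0.4167, 0.7333 and 0.85 computed above, and they sum to k = 2.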
One way, also suggested in the document above, to specify the inclusion probabilities is to compute the weights from them. The whole complexity of the question at hand stems from the fact that one cannot do that directly, since one basically has to invert the recursion formula; I claim this is impossible symbolically. Numerically it can be done using all kinds of methods, e.g. Newton's method. However, the cost of inverting the Jacobian in plain Python quickly becomes unbearable, so I really recommend looking into numpy.random.choice in this case.
Luckily there is a method using plain Python which might or might not be sufficiently performant for your purposes; it works great if there aren't that many different weights. You can find the algorithm on pages 75 and 76. It works by splitting the sampling process into parts with equal inclusion probabilities, i.e. we can use random.sample again! I am not going to explain the principle here since the basics are nicely presented on page 69. Here is the code with hopefully a sufficient amount of comments:
import random


def sample_no_replacement_exact(items, k, best_effort=False, random_=None, ε=1e-9):
    """
    Returns a random sample of k elements from items, where items is a list of
    tuples (weight, element). The inclusion probability of an element in the
    final sample is given by
        k * weight / sum(weights).
    Note that the function raises if an inclusion probability cannot be
    satisfied, e.g. the following call is obviously illegal:
        sample_no_replacement_exact([(1,'a'),(2,'b')],2)
    Since selecting two elements means selecting both all the time,
    'b' cannot be selected twice as often as 'a'. In general it can be hard to
    spot if the weights are illegal and the function does *not* always raise
    an exception in that case. To remedy the situation you can pass
    best_effort=True which redistributes the inclusion probability mass
    if necessary. Note that the inclusion probabilities will change
    if deemed necessary.
    The algorithm is based on the splitting procedure on page 75/76 in:
    http://www.eustat.eus/productosServicios/52.1_Unequal_prob_sampling.pdf
    Additional information can be found here:
    https://stackoverflow.com/questions/2140787/
    :param items: list of tuples of type weight,element
    :param k: length of resulting sample
    :param best_effort: fix inclusion probabilities if necessary,
                        (optional, defaults to False)
    :param random_: random module to use (optional, defaults to the
                    standard random module)
    :param ε: fuzziness parameter when testing for zero in the context
              of floating point arithmetic (optional, defaults to 1e-9)
    :return: random sample set of size k
    :exception: throws ValueError in case of bad parameters,
                throws AssertionError in case of algorithmic impossibilities
    """
    # random_ defaults to the standard random module
    if random_ is None:
        random_ = random
    # special case empty return set
    if k <= 0:
        return set()
    if k > len(items):
        raise ValueError("resulting tuple length exceeds number of elements (k > n)")
    # sort items by weight
    items = sorted(items, key=lambda item: item[0])
    # extract the weights and elements
    weights, elements = list(zip(*items))
    # compute the inclusion probabilities (short: π) of the elements
    scaling_factor = k / sum(weights)
    π = [scaling_factor * weight for weight in weights]
    # in case of best_effort: if an inclusion probability exceeds 1,
    # try to rebalance the probabilities such that:
    # a) no probability exceeds 1,
    # b) the probabilities still sum to k, and
    # c) the probability masses flow from top to bottom:
    #    [0.2, 0.3, 1.5] -> [0.2, 0.8, 1]
    # (remember that π is sorted)
    if best_effort and π[-1] > 1 + ε:
        # probability mass we still have to distribute
        debt = 0.
        for i in reversed(range(len(π))):
            if π[i] > 1.:
                # an 'offender', take away excess
                debt += π[i] - 1.
                π[i] = 1.
            else:
                # case π[i] <= 1, i.e. 'safe' element
                # maximum we can transfer from debt to π[i] and still not
                # exceed 1 is computed by the minimum of:
                # a) 1 - π[i], and
                # b) debt
                max_transfer = min(debt, 1. - π[i])
                debt -= max_transfer
                π[i] += max_transfer
        assert debt < ε, "best effort rebalancing failed (impossible)"
    # make sure we are talking about probabilities
    if any(not (0 - ε <= π_i <= 1 + ε) for π_i in π):
        raise ValueError("inclusion probabilities not satisfiable: {}"
                         .format(list(zip(π, elements))))
    # special case equal probabilities
    # (up to fuzziness parameter, remember that π is sorted)
    if π[-1] < π[0] + ε:
        return set(random_.sample(elements, k))
    # compute the two possible lambda values, see formula 7 on page 75
    # (remember that π is sorted)
    λ1 = π[0] * len(π) / k
    λ2 = (1 - π[-1]) * len(π) / (len(π) - k)
    λ = min(λ1, λ2)
    # there are two cases now, see also page 69
    # CASE 1:
    # with probability λ we are in the equal probability case
    # where all elements have the same inclusion probability
    if random_.random() < λ:
        return set(random_.sample(elements, k))
    # CASE 2:
    # with probability 1-λ we are in the case of a new sample without
    # replacement problem which is strictly simpler,
    # it has the following new probabilities (see page 75, π^{(2)}):
    new_π = [
        (π_i - λ * k / len(π))
        /
        (1 - λ)
        for π_i in π
    ]
    new_items = list(zip(new_π, elements))
    # the first few probabilities might be 0, remove them
    # NOTE: we make sure that floating point issues do not arise
    #       by using the fuzziness parameter
    while new_items and new_items[0][0] < ε:
        new_items = new_items[1:]
    # the last few probabilities might be 1, remove them and mark them as selected
    # NOTE: we make sure that floating point issues do not arise
    #       by using the fuzziness parameter
    selected_elements = set()
    while new_items and new_items[-1][0] > 1 - ε:
        selected_elements.add(new_items[-1][1])
        new_items = new_items[:-1]
    # the algorithm reduces the length of the sample problem,
    # it is guaranteed that:
    # if λ = λ1: the first item has probability 0
    # if λ = λ2: the last item has probability 1
    assert len(new_items) < len(items), "problem was not simplified (impossible)"
    # recursive call with the simpler sample problem
    # NOTE: we have to make sure that the selected elements are included
    return sample_no_replacement_exact(
        new_items,
        k - len(selected_elements),
        best_effort=best_effort,
        random_=random_,
        ε=ε
    ) | selected_elements
Example:
In : sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c')],2)
Out: {'b', 'c'}
In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c'),(4,'d')],2))
Out: Counter({'a': 2048, 'b': 4051, 'c': 5979, 'd': 7922})
The weights sum up to 10, hence the inclusion probabilities compute to: a → 20%, b → 40%, c → 60%, d → 80%. (Sum: 200% = k.) It works!
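To make the splitting idea concrete, here is a small sketch that performs a single splitting step on these inclusion probabilities using exact fractions. The helper name `split_once` is made up for this illustration; it only reproduces the λ and π^{(2)} computations from the function above and assumes sorted probabilities that are not all equal, with 0 < k < n:

```python
from fractions import Fraction


def split_once(pi, k):
    """One splitting step for ascending inclusion probabilities pi that sum to k."""
    n = len(pi)
    # the two candidate values for λ, the smaller one is used
    lam1 = pi[0] * n / k
    lam2 = (1 - pi[-1]) * n / (n - k)
    lam = min(lam1, lam2)
    # inclusion probabilities of the remaining, simpler problem
    residual = [(p - lam * k / n) / (1 - lam) for p in pi]
    return lam, residual


# weights 1, 2, 3, 4 with k = 2 give π = 1/5, 2/5, 3/5, 4/5
pi = [Fraction(2 * w, 10) for w in (1, 2, 3, 4)]
lam, residual = split_once(pi, 2)
print(lam)       # 2/5
print(residual)  # [Fraction(0, 1), Fraction(1, 3), Fraction(2, 3), Fraction(1, 1)]
```

So with probability 2/5 a plain random.sample is used, and with probability 3/5 a is dropped, d is always selected, and one of b and c is drawn with probabilities 1/3 and 2/3. For a this gives 2/5 · 1/2 = 1/5 and for d it gives 2/5 · 1/2 + 3/5 = 4/5, as required.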
One word of caution for production use of this function: it can be very hard to spot illegal inputs for the weights. An obvious illegal example is
In : sample_no_replacement_exact([(1,'a'),(2,'b')],2)
ValueError: inclusion probabilities not satisfiable: [(0.6666666666666666, 'a'), (1.3333333333333333, 'b')]
b cannot appear twice as often as a since both always have to be selected. There are more subtle examples. To avoid an exception in production, just use best_effort=True, which rebalances the inclusion probability mass such that we always have a valid distribution. Obviously this might change the inclusion probabilities.
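To see what the rebalancing does to the probabilities, the relevant loop can be isolated. This is only a sketch mirroring the best_effort branch of the function above; the standalone helper `rebalance` is a hypothetical name, and it expects the inclusion probabilities sorted in ascending order. Fractions are used so the result is exact:

```python
from fractions import Fraction


def rebalance(pi):
    """Cap ascending inclusion probabilities at 1 and push the excess downwards."""
    pi = list(pi)
    debt = 0  # probability mass that still has to be redistributed
    for i in reversed(range(len(pi))):
        if pi[i] > 1:
            # an 'offender': take away the excess
            debt += pi[i] - 1
            pi[i] = 1
        else:
            # a 'safe' element: fill it up as far as the debt and the cap allow
            transfer = min(debt, 1 - pi[i])
            debt -= transfer
            pi[i] += transfer
    return pi


print(rebalance([Fraction(1, 5), Fraction(3, 10), Fraction(3, 2)]))
# [Fraction(1, 5), Fraction(4, 5), 1]
```

The sum stays at k = 2, but the second element is now selected with probability 0.8 instead of 0.3, which is the kind of change in the inclusion probabilities to keep in mind when passing best_effort=True.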