[Solved-3 Solutions] In Pig, computing median in MapReduce?



What is median?

  • The "median" is the "middle" value in a list of numbers. To find the median, the numbers have to be listed in numerical order from smallest to largest, so the list may have to be sorted first. When the list has an even number of values, the median is the mean of the two middle values.

Problem:

Can someone explain the computation of median/quantiles in MapReduce?

Solution 1:

  • Finding the median (middle number) of a series requires that a single reducer is passed the entire set of numbers, so it can determine which is the 'middle' value.
  • Depending on the range and uniqueness of values in your input set, we could introduce a combiner to output the frequency of each value, reducing the number of map outputs sent to the single reducer. The reducer can then consume the sorted value / frequency pairs to identify the median.
  • Another way to scale this (again, if we know the range and rough distribution of values) is to use a custom partitioner that distributes the keys by range buckets (0-99 go to reducer 0, 100-199 to reducer 1, and so on).
  • This will, however, require a secondary job to examine the reducer outputs and perform the final median calculation (knowing, for example, the number of keys in each reducer, we can calculate which reducer output contains the median, and at which offset).
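
The last two points can be sketched in a few lines of Python. This is only a local simulation under stated assumptions, not Hadoop code: the values are non-negative integers, the bucket width is 100 as in the example above, and the names bucket_for, locate_rank and median_via_range_buckets are hypothetical.

```python
def bucket_for(value, bucket_width=100):
    """Range partitioner: 0-99 -> reducer 0, 100-199 -> reducer 1, and so on."""
    return value // bucket_width


def locate_rank(bucket_counts, rank):
    """Map a 0-based global rank to (reducer index, offset in that reducer's sorted output)."""
    seen = 0
    for reducer, count in enumerate(bucket_counts):
        if rank < seen + count:
            return reducer, rank - seen
        seen += count
    raise ValueError("rank is outside the data set")


def median_via_range_buckets(values, bucket_width=100):
    """Simulate the range-partitioned job plus the secondary lookup step."""
    # Each list stands in for one reducer; every reducer sorts only its own bucket.
    reducers = [[] for _ in range(max(values) // bucket_width + 1)]
    for v in values:
        reducers[bucket_for(v, bucket_width)].append(v)
    outputs = [sorted(r) for r in reducers]

    # Secondary step: only the per-reducer counts are needed to find the median.
    counts = [len(o) for o in outputs]
    n = sum(counts)
    middle_ranks = [(n - 1) // 2, n // 2]  # the same rank twice when n is odd
    middle = [outputs[r][off] for r, off in
              (locate_rank(counts, rank) for rank in middle_ranks)]
    return sum(middle) / 2
```

Because the buckets are ordered by range, concatenating the reducer outputs gives the globally sorted list, which is why the cumulative counts are enough to find the right reducer and offset.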

Solution 2:

  1. Have a mapper for each partition compute the desired quantiles and output them to a new data set. This data set should be several orders of magnitude smaller.
  2. Within this data set, compute the quantiles again, similar to "median of medians". These can serve as initial estimates.
  3. Repartition the data according to these quantiles. The goal is that, in the end, the true quantile is guaranteed to be in one partition, and there is at most one of the desired quantiles in each partition.
  4. Within each of the partitions, perform a QuickSelect (O(n) on average) to find the true quantile.
  • Each of the steps runs in linear time. The most costly step is step 3, as it requires the whole data set to be redistributed, so it generates O(n) network traffic.
  • We can probably optimize the process by choosing "alternate" quantiles for the first iteration. Say we want to find the global median.
  • We can't easily find it in a single linear pass, but we can probably narrow it down to 1/k of the data set when the data is split into k partitions. So instead of having each node report only its median, have each node additionally report the objects at (k-1)/(2k) and (k+1)/(2k).
  • This should allow us to significantly narrow down the range of values in which the true median must lie.
  • So in the next step, each node can send the objects that are within the desired range to a single master node, which chooses the median within this range only.
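
The narrowing idea in the last few points can be sketched as a local Python simulation. It rests on several assumptions: each inner list stands in for one node's partition, at least one partition is non-empty, the result is the lower median, and the range is taken from the smallest reported (k-1)/(2k) object to the largest reported (k+1)/(2k) object, which is one conservative way to pick it. The names local_quantiles and distributed_median are hypothetical.

```python
def local_quantiles(sorted_part, fractions):
    """Objects at the given rank fractions, each given as (numerator, denominator)."""
    last = len(sorted_part) - 1
    return [sorted_part[num * last // den] for num, den in fractions]


def distributed_median(partitions):
    """Exact lower median; only a narrowed range of values reaches the master."""
    k = len(partitions)
    parts = [sorted(p) for p in partitions if p]
    n = sum(len(p) for p in parts)
    target = (n - 1) // 2  # 0-based global rank of the lower median

    # Step 1: each node reports its objects at (k-1)/(2k) and (k+1)/(2k).
    fractions = [(k - 1, 2 * k), (k + 1, 2 * k)]
    reports = [local_quantiles(p, fractions) for p in parts]
    lo = min(r[0] for r in reports)
    hi = max(r[1] for r in reports)

    # Step 2: each node sends how many of its values fall below the range,
    # plus the values inside [lo, hi].
    below = sum(sum(1 for v in p if v < lo) for p in parts)
    candidates = sorted(v for p in parts for v in p if lo <= v <= hi)

    # Step 3: the master checks that the target rank is inside the range,
    # then picks the median from the candidates only.
    assert below <= target < below + len(candidates)
    return candidates[target - below]
```

How much the range shrinks depends on how evenly the values are spread across partitions. The count of values below the range is what lets the master translate the global rank into an index among the candidates.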

Solution 3:

  • In many real-world scenarios, the cardinality of values in a dataset will be relatively small. In such cases, the problem can be efficiently solved with two MapReduce jobs:
    1. Calculate frequencies of values in your dataset (Word Count job, basically)
    2. Identity mapper + a reducer which calculates the median based on <value, frequency> pairs
  • Job 1 will drastically reduce the amount of data and can be executed fully in parallel. The reducer of job 2 will only have to process n items (n = cardinality of your value set) instead of all values, as with the naive approach.

Below is an example reducer for job 2.

import sys

# Each input line is "value<TAB>count". Parse the values as numbers and sort
# them numerically, since the shuffle may have ordered the keys as text.
pairs = []
for line in sys.stdin:
    item, count = line.strip().split("\t", 1)
    pairs.append((float(item), int(count)))
pairs.sort()

# Store in memory a mapping of a value to the half-open range of 1-based
# indexes it has in a sorted list of all values
item_to_index_range = []
total_count = 0
for item, count in pairs:
    new_total_count = total_count + count
    item_to_index_range.append((item, (total_count + 1, new_total_count + 1)))
    total_count = new_total_count

# Calculate index(es) of middle items
middle_items_indexes = [total_count // 2 + 1]
if total_count % 2 == 0:
    middle_items_indexes.append(total_count // 2)

# Retrieve middle item(s)
middle_items = []
for i in middle_items_indexes:
    for item, (start, end) in item_to_index_range:
        if start <= i < end:
            middle_items.append(item)
            break  # index found, move on to the next middle index

print(sum(middle_items) / len(middle_items))
  • A combiner can also be used to calculate the frequencies of values. However, in MapReduce, combiners are not guaranteed to be executed, so the job must still produce correct results without one.
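
Since a combiner may or may not run, the reducer of job 1 has to sum partial counts rather than count records. A minimal sketch, assuming Hadoop Streaming style "value<TAB>count" lines that arrive grouped by value and a mapper that emits "value<TAB>1"; the function name sum_counts is hypothetical:

```python
from itertools import groupby


def sum_counts(lines):
    """Sum the counts per value; input lines must arrive grouped by value."""
    parsed = (line.rstrip("\n").split("\t", 1) for line in lines)
    for value, group in groupby(parsed, key=lambda pair: pair[0]):
        total = sum(int(count) for _, count in group)
        yield f"{value}\t{total}"
```

Because the output has the same shape as the input, the same function can serve as both the combiner and the reducer, and the totals are the same whether the combiner ran zero, one, or several times.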
