from mrjob.job import MRJob from mrjob.step import MRStep import re WORD_RE = re.compile(r"[\w']+") class MRWordFrequencyCount(MRJob): def mapper(self, _, line): for word in WORD_RE.findall(line): yield word, 1 def steps(self): return [ MRStep(mapper=self.mapper, reducer=self.reducer_count_words), MRStep(reducer=self.reducer_find_max_word) ] def reducer_count_words(self, key, values): yield None, (sum(values), key) def reducer_find_max_word(self, _, kvp): yield max(kvp) if __name__ == '__main__': MRWordFrequencyCount.run()