blob: ea2afd164e208f22a343d4cee58b29be6d6c0786 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
# Matches each run of word characters and apostrophes, so a contraction
# such as "don't" is extracted as a single token.
WORD_RE = re.compile(r"[\w']+")
class MRWordFrequencyCount(MRJob):
    """Two-step MapReduce job that reports the most frequent word.

    Step 1 tokenizes every input line with ``WORD_RE`` and totals the
    occurrences of each distinct word. Step 2 receives every
    ``(count, word)`` pair under one shared key and emits the largest.
    Words are counted exactly as they appear (no case folding).
    """

    def mapper(self, _, line):
        """Emit ``(word, 1)`` for every token found in ``line``.

        The input key is ignored; only the line text is used.
        """
        for word in WORD_RE.findall(line):
            yield word, 1

    def combiner_count_words(self, key, values):
        """Pre-sum the partial counts for ``key`` before the shuffle.

        Summation is associative, so the reducer produces the same totals
        whether or not (and however many times) the combiner runs; it only
        reduces the number of pairs handed to the reducers.
        """
        yield key, sum(values)

    def steps(self):
        """Return the pipeline: count words, then select the maximum."""
        return [
            MRStep(
                mapper=self.mapper,
                combiner=self.combiner_count_words,
                reducer=self.reducer_count_words,
            ),
            MRStep(reducer=self.reducer_find_max_word),
        ]

    def reducer_count_words(self, key, values):
        """Total the counts for one word and emit ``(None, (total, word))``.

        Every pair is emitted under the same ``None`` key so that the next
        step sees all of them in a single reducer call. The total is placed
        first so that ``max()`` orders the pairs by count.
        """
        yield None, (sum(values), key)

    def reducer_find_max_word(self, _, kvp):
        """Emit the pair with the highest count.

        ``kvp`` iterates over the ``(count, word)`` pairs produced by
        ``reducer_count_words``; the shared key is ignored. Pairs compare
        element by element, so ties on count are broken by the greatest
        word. The winning two-element pair is yielded directly, which
        makes the count the output key and the word the output value.
        """
        yield max(kvp)
# Launch the job only when this file is executed as a script, not on import.
if __name__ == '__main__':
    MRWordFrequencyCount.run()
|