# Source: root/BDA/tp2.py
# blob: 6b5b00e6841d79352356ed5070838429fc2df677
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

# Matches maximal runs of word characters (\w) and apostrophes, so a
# contraction such as "don't" is kept as a single token.
WORD_RE = re.compile(r"[\w']+")

class MRWordFrequencyCount(MRJob):
    """Two-step job that reports the most frequent word in the input.

    Step 1 counts the occurrences of every word; step 2 picks the
    (count, word) pair with the highest count.
    """

    def steps(self):
        """Return the job's steps: count the words, then select the maximum."""
        count_step = MRStep(mapper=self.mapper,
                            reducer=self.reducer_count_words)
        max_step = MRStep(reducer=self.reducer_find_max_word)
        return [count_step, max_step]

    def mapper(self, _, line):
        """Yield ``(word, 1)`` for each WORD_RE match found in ``line``."""
        # WORD_RE has no capture groups, so group() is the whole match.
        for match in WORD_RE.finditer(line):
            yield match.group(), 1

    def reducer_count_words(self, key, values):
        """Sum the counts of one word and emit ``None, (total, word)``.

        Every pair is emitted under the same key (None) so that the next
        step receives all of them in a single reducer call.
        """
        total = sum(values)
        yield None, (total, key)

    def reducer_find_max_word(self, _, kvp):
        """Yield the largest ``(count, word)`` pair.

        The pair itself is yielded, so its two items become the output
        key (count) and value (word).
        """
        most_frequent = max(kvp)
        yield most_frequent

class MRLongestWord(MRJob):
    """Find the longest word in the input.

    The job emits a single record whose key is the length of the longest
    word and whose value is that word. When several words share the
    maximum length, ordinary pair comparison decides, i.e. the word that
    compares greatest wins.
    """

    def mapper(self, _, line):
        """Yield ``None, (length, word)`` for each WORD_RE match in ``line``."""
        # All candidates share the key None so that a single reducer call
        # can compare every word. Emitting the final (length, word) shape
        # here keeps the result correct whether or not the combiner runs.
        for word in WORD_RE.findall(line):
            yield None, (len(word), word)

    def combiner(self, _, len_word):
        """Pre-reduce the local candidates to the longest one.

        ``len_word`` is an iterator of ``(length, word)`` pairs; it is
        consumed with ``max`` rather than indexed, because it is not a
        sequence. Input and output have the same shape, so this step is a
        pure optimisation.
        """
        yield None, max(len_word)

    def reducer(self, _, len_word):
        """Yield the overall longest pair as key=length, value=word."""
        yield max(len_word)

if __name__ == '__main__':
    # Only the longest-word job is run; call MRWordFrequencyCount.run()
    # here instead to run the word-frequency job.
    MRLongestWord.run()