Implementing Grouped Sorting with MapReduce
Published: 2021-06-29 12:24:03
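Using the scores from a competition as an example, we show how to:
- get the top three scores among men and the top three among women
- output everyone's scores grouped by age, in descending order
- write the equivalent SQL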
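0. Background
0.1 Grouping and sorting with MapReduce:
- Grouping is the equivalent of SQL GROUP BY. In MapReduce it is implemented through partitioning. For example, when processing mobile internet access logs, records with a mobile phone number and records without one can be split into two groups.
- Sorting in the map and reduce phases compares k2 only; v2 takes no part in the comparison. To sort on v2 as well, combine k2 and v2 into a new class and use that class as k2, so that both take part in the comparison.
- Grouping also compares k2.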
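The following plain-Java sketch makes the k2/v2 point concrete. It uses no Hadoop classes, and the class and field names are hypothetical. The composite key promotes a former value (the score) into the key. Sorting a list of such keys with `Collections.sort` stands in for the shuffle's sort by key. The group field decides first, and the score only breaks ties within a group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CompositeKeyDemo {
    // Hypothetical composite key: the original k2 (group) plus the former v2 (score).
    // In Hadoop this class would implement WritableComparable instead of Comparable.
    static final class CompositeKey implements Comparable<CompositeKey> {
        final String group; // original k2, e.g. the gender
        final int score;    // former v2, moved into the key so it takes part in sorting

        CompositeKey(String group, int score) {
            this.group = group;
            this.score = score;
        }

        @Override
        public int compareTo(CompositeKey o) {
            int byGroup = group.compareTo(o.group); // primary: group ascending
            if (byGroup != 0) {
                return byGroup;
            }
            return Integer.compare(o.score, score); // secondary: score descending
        }

        @Override
        public String toString() {
            return group + ":" + score;
        }
    }

    // Stands in for the shuffle: records are ordered purely by their key.
    static List<CompositeKey> sortLikeShuffle(List<CompositeKey> keys) {
        List<CompositeKey> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        List<CompositeKey> keys = Arrays.asList(
                new CompositeKey("male", 98), new CompositeKey("female", 40),
                new CompositeKey("male", 100), new CompositeKey("female", 100));
        System.out.println(sortLikeShuffle(keys)); // [female:100, female:40, male:100, male:98]
    }
}
```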
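0.2 Data preparation:
Create the file score.txt with one record per line: name, age, gender and score, separated by tabs (the mappers below split each line on "\t"). Then upload it to HDFS with the hadoop fs -put command.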
jangz	23	male	98
John	34	male	100
Tom	45	male	99
Lily	32	female	40
Linda	34	female	100
Chaces	28	male	98
Dong	29	male	30
Daniel	33	male	100
Marvin	24	male	100
Chaos	30	female	84
Mei	23	female	90
Newhire	18	female	100
Summer	59	male	90
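1. Top three scores among men and among women
Problem analysis:
p1: We need the top three scores of each group, so the data has to be grouped once. How should it be grouped?
p2: To pick out the top three scores, the data has to be sorted. How should it be sorted?
p3: How do we separate men from women?
Approach:
s1: In MapReduce, grouping corresponds to partitioning, so we implement the grouping with partitions. What should we group by? Certainly not the score. The natural choice is two groups, one for men and one for women, so all we need is a custom Partitioner#getPartition. (This solves p1 and p3.)
s2: If the data is already sorted, getting the top three of each group just means taking the first three records of each group. The shuffle provides that order, because it sorts the map output keys with Document#compareTo (score descending, then name ascending). This is the simplest approach: the reducer only has to emit the first three keys it receives. (This solves p2.)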
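The flow of s1 and s2 can be simulated in plain Java on the sample data before looking at the Hadoop job. This is a sketch, not the job itself:

- `Doc` is a hypothetical stand-in for the Document bean.
- `partition()` assumes an explicit female-to-0, male-to-1 mapping.
- Sorting each partition's list stands in for the shuffle.
- Keeping the first k entries stands in for the reducer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class Top3ByGenderSimulation {
    // Hypothetical stand-in for the Document bean; plain Java, no Hadoop types.
    static final class Doc {
        final String name;
        final int age;
        final String gender;
        final int score;

        Doc(String name, int age, String gender, int score) {
            this.name = name;
            this.age = age;
            this.gender = gender;
            this.score = score;
        }
    }

    // Same order as Document#compareTo: score descending, then name ascending.
    static final Comparator<Doc> ORDER = (a, b) ->
            a.score != b.score ? Integer.compare(b.score, a.score) : a.name.compareTo(b.name);

    // Same parsing as MyMapper: tab-separated name, age, gender, score.
    static Doc parse(String line) {
        String[] f = line.split("\t");
        return new Doc(f[0], Integer.parseInt(f[1]), f[2], Integer.parseInt(f[3]));
    }

    // Assumed explicit mapping: female -> 0, male -> 1.
    static int partition(Doc d, int numPartitions) {
        return ("female".equals(d.gender) ? 0 : 1) % numPartitions;
    }

    // Partition, sort each partition (shuffle), keep the first k names (reducer).
    static Map<Integer, List<String>> topKPerPartition(List<String> lines, int numPartitions, int k) {
        Map<Integer, List<Doc>> partitions = new TreeMap<>();
        for (String line : lines) {
            Doc d = parse(line);
            partitions.computeIfAbsent(partition(d, numPartitions), p -> new ArrayList<>()).add(d);
        }
        Map<Integer, List<String>> result = new TreeMap<>();
        for (Map.Entry<Integer, List<Doc>> e : partitions.entrySet()) {
            List<Doc> docs = e.getValue();
            docs.sort(ORDER);
            List<String> top = new ArrayList<>();
            for (int i = 0; i < Math.min(k, docs.size()); i++) {
                top.add(docs.get(i).name);
            }
            result.put(e.getKey(), top);
        }
        return result;
    }

    static final List<String> SAMPLE = Arrays.asList(
            "jangz\t23\tmale\t98", "John\t34\tmale\t100", "Tom\t45\tmale\t99",
            "Lily\t32\tfemale\t40", "Linda\t34\tfemale\t100", "Chaces\t28\tmale\t98",
            "Dong\t29\tmale\t30", "Daniel\t33\tmale\t100", "Marvin\t24\tmale\t100",
            "Chaos\t30\tfemale\t84", "Mei\t23\tfemale\t90", "Newhire\t18\tfemale\t100",
            "Summer\t59\tmale\t90");

    public static void main(String[] args) {
        // Prints {0=[Linda, Newhire, Mei], 1=[Daniel, John, Marvin]}
        System.out.println(topKPerPartition(SAMPLE, 2, 3));
    }
}
```

The name tie-breaker matters here. Three men scored 100, and it is the name order that decides which three of them appear and in what order.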
Implementation:
A custom bean to hold each record:
package mapreduce.topk2.top;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Document implements WritableComparable<Document> {
    private String name;
    private Integer age;
    private String gender;
    private Integer score;

    // Hadoop creates keys by reflection, so a no-arg constructor is required
    public Document() {
    }

    public Document(String name, Integer age, String gender, Integer score) {
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.score = score;
    }

    public void set(String name, Integer age, String gender, Integer score) {
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.score = score;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
        out.writeUTF(gender);
        out.writeInt(score);
    }

    // Fields must be read back in exactly the order write() wrote them
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.age = in.readInt();
        this.gender = in.readUTF();
        this.score = in.readInt();
    }

    /**
     * Sort by score descending, then by name ascending.
     */
    @Override
    public int compareTo(Document o) {
        // Compare the Integer values with equals(); != would compare object references
        if (!this.score.equals(o.score)) {
            return -this.score.compareTo(o.score);
        } else {
            return this.name.compareTo(o.name);
        }
    }

    // TextOutputFormat writes keys using toString()
    @Override
    public String toString() {
        return name + "\t" + age + "\t" + gender + "\t" + score;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }
    public Integer getScore() { return score; }
    public void setScore(Integer score) { this.score = score; }
}
package mapreduce.topk2.top;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class Top3GroupByGenderExample extends Configured implements Tool {
    private static final Logger log = Logger.getLogger(Top3GroupByGenderExample.class);

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            log.error("Usage: Top3GroupByGenderExample <in> <out>");
            System.exit(2);
        }
        System.exit(ToolRunner.run(conf, new Top3GroupByGenderExample(), otherArgs));
    }

    @Override
    public int run(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(getConf());
        Path outPath = new Path(args[1]);
        // Delete an existing output directory, otherwise the job refuses to start
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        Job job = Job.getInstance(getConf(), "Top3GroupByGenderExampleJob");
        job.setJarByClass(Top3GroupByGenderExample.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Document.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // One partition, and therefore one reduce task, per gender
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Document.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, outPath);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Document, NullWritable> {
        private Document document = new Document();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            log.info("MyMapper in<" + key.get() + "," + value.toString() + ">");
            // Input line: name \t age \t gender \t score
            String line = value.toString();
            String[] infos = line.split("\t");
            String name = infos[0];
            Integer age = Integer.parseInt(infos[1]);
            String gender = infos[2];
            Integer score = Integer.parseInt(infos[3]);
            document.set(name, age, gender, score);
            context.write(document, NullWritable.get());
            log.info("MyMapper out<" + document + ">");
        }
    }

    public static class MyPartitioner extends Partitioner<Document, NullWritable> {
        @Override
        public int getPartition(Document key, NullWritable value, int numPartitions) {
            // Explicit mapping: female -> 0, male -> 1. Hashing the gender string happens to
            // split "female"/"male" the same way with 2 reducers, but hash placement does not
            // guarantee that two different values land in different partitions.
            return ("female".equals(key.getGender()) ? 0 : 1) % numPartitions;
        }
    }

    public static class MyReducer extends Reducer<Document, NullWritable, Document, NullWritable> {
        private final int k = 3;
        // Per reduce task; each task handles exactly one gender partition
        private int counter = 0;

        @Override
        protected void reduce(Document key, Iterable<NullWritable> v2s, Context context)
                throws IOException, InterruptedException {
            log.info("MyReducer in<" + key + ">");
            // Keys arrive sorted by Document#compareTo, so the first k records are the top k.
            // Iterating the values advances the reused key object to each record in the group.
            for (NullWritable v : v2s) {
                if (counter >= k) {
                    return;
                }
                context.write(key, NullWritable.get());
                counter++;
                log.info("MyReducer out<" + key + ">");
            }
        }
    }
}
Results: [Figure: run command and job output]
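2. Output everyone's scores grouped by age, in descending order
The idea is simple: first group by age range (under 20, 20 to 50, and over 50; see MyPartitioner), then output each group's scores in descending order.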
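The sketch below applies the same idea to the sample data in plain Java. The class names are hypothetical and no Hadoop is involved. People are split into the three age ranges used by MyPartitioner, and each range is sorted by score.

`reduceCalls()` counts how many reduce() calls a sorted partition would produce. It assumes Hadoop's default behaviour when no grouping comparator is set: consecutive keys that compare as equal under the sort order share one call. The count shows why the key needs a tie-breaker. Ordering by score alone merges the 11 people in the 20-to-50 range into only 7 groups.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AgeGroupSortSimulation {
    // Hypothetical stand-in for the Person bean (plain Java, no Hadoop).
    static final class Person {
        final String name;
        final int age;
        final int score;

        Person(String name, int age, int score) {
            this.name = name;
            this.age = age;
            this.score = score;
        }
    }

    // Score descending, then name ascending.
    static final Comparator<Person> SCORE_THEN_NAME = (a, b) ->
            a.score != b.score ? Integer.compare(b.score, a.score) : a.name.compareTo(b.name);

    // Score descending only: people with equal scores compare as equal keys.
    static final Comparator<Person> SCORE_ONLY = (a, b) -> Integer.compare(b.score, a.score);

    // Same age ranges as MyPartitioner: under 20 -> 0, 20 to 50 -> 1, over 50 -> 2.
    static int partition(Person p) {
        if (p.age < 20) {
            return 0;
        }
        return p.age <= 50 ? 1 : 2;
    }

    static Map<Integer, List<Person>> partitionAndSort(List<Person> people, Comparator<Person> order) {
        Map<Integer, List<Person>> parts = new TreeMap<>();
        for (Person p : people) {
            parts.computeIfAbsent(partition(p), k -> new ArrayList<>()).add(p);
        }
        parts.values().forEach(list -> list.sort(order));
        return parts;
    }

    // One reduce() call per run of consecutive keys that the comparator treats as equal.
    static int reduceCalls(List<Person> sorted, Comparator<Person> order) {
        int calls = 0;
        for (int i = 0; i < sorted.size(); i++) {
            if (i == 0 || order.compare(sorted.get(i - 1), sorted.get(i)) != 0) {
                calls++;
            }
        }
        return calls;
    }

    static List<String> names(List<Person> people) {
        List<String> out = new ArrayList<>();
        for (Person p : people) {
            out.add(p.name);
        }
        return out;
    }

    static final List<Person> SAMPLE = Arrays.asList(
            new Person("jangz", 23, 98), new Person("John", 34, 100), new Person("Tom", 45, 99),
            new Person("Lily", 32, 40), new Person("Linda", 34, 100), new Person("Chaces", 28, 98),
            new Person("Dong", 29, 30), new Person("Daniel", 33, 100), new Person("Marvin", 24, 100),
            new Person("Chaos", 30, 84), new Person("Mei", 23, 90), new Person("Newhire", 18, 100),
            new Person("Summer", 59, 90));

    public static void main(String[] args) {
        Map<Integer, List<Person>> parts = partitionAndSort(SAMPLE, SCORE_THEN_NAME);
        parts.forEach((p, list) -> System.out.println(p + " " + names(list)));
        List<Person> middle = parts.get(1);
        System.out.println(reduceCalls(middle, SCORE_THEN_NAME) + " vs " + reduceCalls(middle, SCORE_ONLY));
    }
}
```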
Implementation:
Custom bean:
package mapreduce.topk2.groupsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Person implements WritableComparable<Person> {
    private String name;
    private Integer age;
    private String gender;
    private Integer score;

    // Hadoop creates keys by reflection, so a no-arg constructor is required
    public Person() {
    }

    public Person(String name, Integer age, String gender, Integer score) {
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.score = score;
    }

    public void set(String name, Integer age, String gender, Integer score) {
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.score = score;
    }

    // TextOutputFormat writes keys using toString()
    @Override
    public String toString() {
        return name + "\t" + age + "\t" + gender + "\t" + score;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
        out.writeUTF(gender);
        out.writeInt(score);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.age = in.readInt();
        this.gender = in.readUTF();
        this.score = in.readInt();
    }

    /**
     * Sort by score desc, then by name asc. Without the name tie-breaker, people with
     * equal scores compare as equal keys and are grouped into a single reduce() call.
     */
    @Override
    public int compareTo(Person o) {
        int byScore = -this.score.compareTo(o.score);
        return byScore != 0 ? byScore : this.name.compareTo(o.name);
    }

    public String getName() { return name; }
    public Integer getAge() { return age; }
    public String getGender() { return gender; }
    public Integer getScore() { return score; }
}
package mapreduce.topk2.groupsort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Title: GroupByAgeDescScoreExample
 * Description:
 * @author jangz
 * @date 2017/9/29 14:14
 */
public class GroupByAgeDescScoreExample extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.out.println("Usage: GroupByAgeDescScoreExample <in> <out>");
            System.exit(2);
        }
        System.exit(ToolRunner.run(conf, new GroupByAgeDescScoreExample(), otherArgs));
    }

    @Override
    public int run(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(getConf());
        Path outPath = new Path(args[1]);
        // Delete an existing output directory, otherwise the job refuses to start
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        Job job = Job.getInstance(getConf(), "GroupByAgeDescScoreExampleJob");
        job.setJarByClass(GroupByAgeDescScoreExample.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Person.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // One reduce task per age range; must match the three values MyPartitioner returns
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(3);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Person.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, outPath);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Person, NullWritable> {
        private Person person = new Person();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("MyMapper in<" + key.get() + "," + value.toString() + ">");
            // Input line: name \t age \t gender \t score
            String line = value.toString();
            String[] infos = line.split("\t");
            String name = infos[0];
            Integer age = Integer.parseInt(infos[1]);
            String gender = infos[2];
            Integer score = Integer.parseInt(infos[3]);
            person.set(name, age, gender, score);
            context.write(person, NullWritable.get());
            System.out.println("MyMapper out<" + person + ">");
        }
    }

    public static class MyPartitioner extends Partitioner<Person, NullWritable> {
        @Override
        public int getPartition(Person key, NullWritable value, int numPartitions) {
            // Age ranges: under 20 -> 0, 20 to 50 -> 1, over 50 -> 2 (requires 3 reduce tasks)
            Integer age = key.getAge();
            if (age < 20) {
                return 0;
            } else if (age <= 50) {
                return 1;
            } else {
                return 2;
            }
        }
    }

    public static class MyReducer extends Reducer<Person, NullWritable, Person, NullWritable> {
        @Override
        protected void reduce(Person key, Iterable<NullWritable> v2s, Context context)
                throws IOException, InterruptedException {
            System.out.println("MyReducer in<" + key + ">");
            // Write once per value, so records whose keys compare equal are not dropped;
            // the reused key object is advanced to each record as the values are iterated
            for (NullWritable v : v2s) {
                context.write(key, NullWritable.get());
                System.out.println("MyReducer out<" + key + ">");
            }
        }
    }
}
[Figure: run command and job output]
3. Equivalent SQL
SQL script:
CREATE TABLE score (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(50),
    age INT,
    gender VARCHAR(10),
    score INT
);

INSERT INTO score(name, age, gender, score)
VALUES
    ('jangz', 23, 'male', 98),
    ('John', 34, 'male', 100),
    ('Tom', 45, 'male', 99),
    ('Lily', 32, 'female', 40),
    ('Linda', 34, 'female', 100),
    ('Chaces', 28, 'male', 98),
    ('Dong', 29, 'male', 30),
    ('Daniel', 33, 'male', 100),
    ('Marvin', 24, 'male', 100),
    ('Chaos', 30, 'female', 84),
    ('Mei', 23, 'female', 90),
    ('Newhire', 18, 'female', 100),
    ('Summer', 59, 'male', 90);
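3.1 Top three scores among men and among women
Following MapReduce's divide-and-conquer idea, query each group separately and combine the results with UNION ALL: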
(SELECT name, age, gender, score
 FROM score
 WHERE gender = 'female'
 ORDER BY gender, score DESC, name
 LIMIT 3)
UNION ALL
(SELECT name, age, gender, score
 FROM score
 WHERE gender = 'male'
 ORDER BY gender, score DESC, name
 LIMIT 3);
3.2 Everyone's scores grouped by age, in descending order
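-- GROUP BY age would collapse the rows to one per age, so sort instead, using the
-- same age ranges as MyPartitioner (under 20, 20 to 50, over 50)
SELECT name, age, gender, score
FROM score
ORDER BY CASE WHEN age < 20 THEN 0 WHEN age <= 50 THEN 1 ELSE 2 END,
         score DESC,
         name;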
Summary
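1. Sorting in the map and reduce phases compares k2; v2 does not take part in the comparison. On the map side, the output is partitioned, sorted and optionally combined. On the reduce side, the map outputs fetched from the mappers are merge-sorted before reduce() is called.
2. For convenience, we can define a custom bean that implements the WritableComparable interface and overrides write, readFields and compareTo. Remember to override toString as well. TextOutputFormat writes keys and values with toString(), so without it the final reduce output will not look as expected.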
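The write/readFields contract can be checked without a cluster. The sketch below uses only java.io and a hypothetical `Record` class with the same fields as the beans. It serializes a record the way write() does, then reads it back with readFields() in the same field order. The copy is printed through toString(), which is the form TextOutputFormat writes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WritableRoundTrip {
    // Hypothetical record mirroring the beans' fields and (de)serialization order; no Hadoop.
    static final class Record {
        String name;
        int age;
        String gender;
        int score;

        void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeInt(age);
            out.writeUTF(gender);
            out.writeInt(score);
        }

        // Must read the fields in exactly the order write() wrote them
        void readFields(DataInput in) throws IOException {
            name = in.readUTF();
            age = in.readInt();
            gender = in.readUTF();
            score = in.readInt();
        }

        @Override
        public String toString() {
            return name + "\t" + age + "\t" + gender + "\t" + score;
        }
    }

    // Serialize to bytes, then fill a fresh, empty instance from those bytes.
    static Record roundTrip(Record r) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            r.write(out);
            out.flush();
            Record copy = new Record(); // empty instance, like one created by the no-arg constructor
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Record r = new Record();
        r.name = "Mei";
        r.age = 23;
        r.gender = "female";
        r.score = 90;
        System.out.println(roundTrip(r)); // Mei	23	female	90 (tab-separated)
    }
}
```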
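3. To group data of different kinds, such as by age range, we can implement the grouping through partitioning.
To group data of the same kind, such as records with a composite key k2 made of an IP address (fixed length) and its occurrence count, we can use a GroupingComparator.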
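The plain-Java sketch below simulates what a grouping comparator changes, using a hypothetical `IpCount` key.

- Keys are sorted by the full composite key: IP ascending, then count descending.
- A new reduce() group starts only when the IP changes.
- Each group therefore sees all counts for one IP, with the largest first.

In Hadoop the grouping comparator would be a WritableComparator subclass registered with Job#setGroupingComparatorClass; that wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingComparatorSimulation {
    // Hypothetical composite key k2 = (ip, count).
    static final class IpCount {
        final String ip;
        final int count;

        IpCount(String ip, int count) {
            this.ip = ip;
            this.count = count;
        }
    }

    // Sort comparator: ip ascending, then count descending.
    static final Comparator<IpCount> SORT = (a, b) -> {
        int c = a.ip.compareTo(b.ip);
        return c != 0 ? c : Integer.compare(b.count, a.count);
    };

    // Grouping comparator: only the ip decides which keys share one reduce() call.
    static final Comparator<IpCount> GROUP = (a, b) -> a.ip.compareTo(b.ip);

    // Sort all keys, then start a new group whenever GROUP says the previous key differs.
    static Map<String, List<Integer>> simulate(List<IpCount> keys) {
        List<IpCount> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        IpCount prev = null;
        List<Integer> current = null;
        for (IpCount k : sorted) {
            if (prev == null || GROUP.compare(prev, k) != 0) {
                current = new ArrayList<>();
                groups.put(k.ip, current);
            }
            current.add(k.count);
            prev = k;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<IpCount> keys = java.util.Arrays.asList(
                new IpCount("192.168.0.1", 3), new IpCount("10.0.0.2", 5),
                new IpCount("192.168.0.1", 7), new IpCount("10.0.0.2", 1));
        System.out.println(simulate(keys)); // {10.0.0.2=[5, 1], 192.168.0.1=[7, 3]}
    }
}
```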
4. What is MapReduce? MapReduce is a software framework for processing massive data sets in parallel, reliably and fault-tolerantly, on large clusters.
The code is also available for download if you want to package and run it yourself. Written by the blogger; for reference only!
Reposted from: https://buildupchao.blog.csdn.net/article/details/78067776