Mapreduce统计hdfs文档单词分组总数并落地到关系型数据库
发布日期:2021-11-15 02:11:44
浏览次数:1
分类:技术文章
本文共 7251 字,大约阅读时间需要 24 分钟。
package com.zicontd.mifss.datawork.dboutput;
import java.io.DataInput;
import java.io.DataOutput;/**
-
Title: DiseaseStatistics
-
Description:
-
Company: http://www.zicontd.com/
- @author WHQ
- @date 2019年5月1日 */ /*********************** Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。 通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记), 而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将作为map方法的结果输出, 其余的工作都交有MapReduce框架处理。
Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。
Map过程输出中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入, 所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。
此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。 还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。 任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。*********************/
import java.io.IOException;
import java.net.URI; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.StringTokenizer; import java.util.concurrent.ConcurrentHashMap;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.log4j.Logger;import com.zicontd.mifss.datawork.dbinput.KC86DATA_HDFS;
import com.zicontd.mifss.datawork.dboutput.KC86DATA_ORACLE.KC86Entity; import ;public class DiseaseStatistics {
/*private static final String input_path = "hdfs://IP:9000/home/kc86_intput";private static final String driver = "com.mysql.jdbc.Driver";private static final String url = "jdbc:mysql://localhost:3306/zicontd?useUnicode=true&characterEncoding=UTF-8";private static final String username = "root";private static final String password = "root";*/private static final String input_path = "hdfs://IP:9000/home/kc86_intput";private static final String driver = "oracle.jdbc.driver.OracleDriver";private static final String url = "jdbc:oracle:thin:@IP:1521:orcl";private static final String username = "USERNAME";private static final String password = "PASSWORD";private final static Logger log = Logger.getLogger(DiseaseStatistics.class);public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //当前运行环境中配置的HADOOP_USER_NAME属性值 String hadoop_user_name = System.getenv("HADOOP_USER_NAME"); System.setProperty("HADOOP_USER_NAME", hadoop_user_name); Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, driver, url,username,password); Job job = Job.getInstance(conf); job.setJarByClass(DiseaseStatistics.class); job.setMapperClass(TokenizerMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 输出格式 job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Bt.class); job.setOutputValueClass(NullWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(DBOutputFormat.class); FileSystem fs = FileSystem.get(URI.create(input_path), conf,"root"); Path p = new Path(input_path); if(!fs.exists(p)){ //fs.delete(p,true); //System.out.println("输出路径存在,已删除!"); log.info("输出路径不存在"); } //读path路径下的文件 FileInputFormat.setInputPaths(job, p); //此处给出的字段名称约束 一定要和Student类中描述读进或者写出的字段名称个数一致 String[] fieldNames = new String[] {"NAME","NUM"}; DBOutputFormat.setOutput(job, "\"KC86_DiseaseStatistics\"", fieldNames); System.exit(job.waitForCompletion(true) ? 
0 : 1); }//通过继承类Mapper来实现map处理逻辑public static class TokenizerMapper extends Mapper{ private static final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String newStr = value.toString().trim(); String newStr1 = newStr.substring(newStr.indexOf("[")+1,newStr.indexOf("]")); String[] sp = newStr1.split(","); StringBuffer AKC185sb = new StringBuffer(); for(String s1:sp) { String tempS = s1; String[] s = tempS.split("="); //获取"AKC185"的值 if(s[0].trim().equals("AKC185")) { AKC185sb.append(s[1].trim()); } } /* StringTokenizer itr = new StringTokenizer(AKC185sb.toString()); while(itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(word, one); }*/ word.set(AKC185sb.toString().trim()); context.write(word, one); }} //继承Reducer类,重写其方法 public static class IntSumReducer extends Reducer { LongWritable result = new LongWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //定义一个计数器 int sum = 0; IntWritable val; //通过value这个迭代器,遍历这一组kv中所有的value,进行累加 for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) { val = (IntWritable)i$.next(); } this.result.set(sum); Bt bt = new Bt(key.toString(), result.get()); context.write(bt, null); } } public static class Bt implements Writable, DBWritable { private String bname; private Long num; /** * @param bname * @param num */ public Bt(String bname, Long num) { super(); this.bname = bname; this.num = num; } public String getBname() { return bname; } public void setBname(String bname) { this.bname = bname; } public Long getNum() { return num; } public void setNum(Long num) { this.num = num; } @Override public void write(PreparedStatement statement) throws SQLException { // TODO Auto-generated method stub statement.setString(1, bname); statement.setLong(2, num); } 
@Override public void readFields(ResultSet resultSet) throws SQLException { // TODO Auto-generated method stub bname = resultSet.getString(1); num = resultSet.getLong(2); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(this.bname); out.writeLong(this.num); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.bname =in.readUTF(); this.num =in.readLong(); } }
}
转载地址:https://blog.csdn.net/qq_28358461/article/details/89886472 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
能坚持,总会有不一样的收获!
[***.219.124.196]2024年03月23日 11时27分42秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
java开发招聘试题_客户化开发招聘试题-Java开发.doc
2019-04-21
java jdk win10 1335_win10下安装java jdk,tomcat
2019-04-21
php中的变量名称用什么表示,PHP变量,方法,类等名称中的有效字符是什么?
2019-04-21
solr比mysql的优势_Solr与Elasticsearch的优缺点比较总结和归纳
2019-04-21
华为博士招聘上机考试题目_牛客网-华为-2020届校园招聘上机考试-3
2019-04-21
python中for可以做变量名吗_Python中使用动态变量名的方法
2019-04-21
mysql 日期转换天数_MySQL 日期操作 增减天数、时间转换、时间戳
2019-04-21
java对象去重复_JAVA中List对象去除重复值的方法
2019-04-21
java bss_[转] .bss段和.data段的区别
2019-04-21
java上传图片损坏_大神求助 上传图片后 图片损坏
2019-04-21
java socket唯一标识符_Java Socket编程之常识网络基础知识
2019-04-21
java给xyz大小排序_java递归实现string xyz排序
2019-04-21
arctime必须要java_Arctime使用教程 Arctime常见问题解答
2019-04-21
mysql 自适应字段宽度_box-sizing解决自适应布局容器宽度问题
2019-04-21
java 配置文件配置路径_Java读取配置文件路径设置
2019-04-21