Mapreduce统计hdfs文档单词分组总数并落地到关系型数据库
发布日期:2021-11-15 02:11:44 浏览次数:1 分类:技术文章

本文共 7251 字,大约阅读时间需要 24 分钟。

package com.zicontd.mifss.datawork.dboutput;

import java.io.DataInput;

import java.io.DataOutput;

/**

  • Title: DiseaseStatistics

  • Description:

  • Company: http://www.zicontd.com/

  • @author WHQ
  • @date 2019年5月1日
    /
    /
    ***********************
    Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。
    通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记),
    而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将作为map方法的结果输出,
    其余的工作都交有MapReduce框架处理。

Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。

Map过程输出中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,
所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。

在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。

此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。
还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。
任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。

*********************/

import java.io.IOException;

import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.log4j.Logger;

import com.zicontd.mifss.datawork.dbinput.KC86DATA_HDFS;

import com.zicontd.mifss.datawork.dboutput.KC86DATA_ORACLE.KC86Entity;
import ;

public class DiseaseStatistics {

/*private static final String input_path = "hdfs://IP:9000/home/kc86_intput";private static final String driver = "com.mysql.jdbc.Driver";private static final String url = "jdbc:mysql://localhost:3306/zicontd?useUnicode=true&characterEncoding=UTF-8";private static final String username = "root";private static final String password = "root";*/private static final String input_path = "hdfs://IP:9000/home/kc86_intput";private static final String driver = "oracle.jdbc.driver.OracleDriver";private static final String url = "jdbc:oracle:thin:@IP:1521:orcl";private static final String username = "USERNAME";private static final String password = "PASSWORD";private final static Logger log = Logger.getLogger(DiseaseStatistics.class);public static void  main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {	//当前运行环境中配置的HADOOP_USER_NAME属性值	String hadoop_user_name = System.getenv("HADOOP_USER_NAME");	System.setProperty("HADOOP_USER_NAME", hadoop_user_name);		Configuration conf = new Configuration();	DBConfiguration.configureDB(conf, driver, url,username,password);		Job job = Job.getInstance(conf);		job.setJarByClass(DiseaseStatistics.class);	job.setMapperClass(TokenizerMapper.class);    job.setMapOutputKeyClass(Text.class);    job.setMapOutputValueClass(IntWritable.class);            // 输出格式	job.setReducerClass(IntSumReducer.class);    job.setOutputKeyClass(Bt.class);    job.setOutputValueClass(NullWritable.class);        job.setInputFormatClass(TextInputFormat.class);    job.setOutputFormatClass(DBOutputFormat.class);  		    FileSystem fs = FileSystem.get(URI.create(input_path), conf,"root");	Path p = new Path(input_path);	if(!fs.exists(p)){		//fs.delete(p,true);		//System.out.println("输出路径存在,已删除!");		log.info("输出路径不存在");	}		//读path路径下的文件	FileInputFormat.setInputPaths(job, p);	//此处给出的字段名称约束 一定要和Student类中描述读进或者写出的字段名称个数一致	String[] fieldNames = new String[] {"NAME","NUM"};	DBOutputFormat.setOutput(job, "\"KC86_DiseaseStatistics\"", fieldNames);	System.exit(job.waitForCompletion(true) ? 0 : 1);		}//通过继承类Mapper来实现map处理逻辑public static class TokenizerMapper extends Mapper
{ private static final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String newStr = value.toString().trim(); String newStr1 = newStr.substring(newStr.indexOf("[")+1,newStr.indexOf("]")); String[] sp = newStr1.split(","); StringBuffer AKC185sb = new StringBuffer(); for(String s1:sp) { String tempS = s1; String[] s = tempS.split("="); //获取"AKC185"的值 if(s[0].trim().equals("AKC185")) { AKC185sb.append(s[1].trim()); } } /* StringTokenizer itr = new StringTokenizer(AKC185sb.toString()); while(itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(word, one); }*/ word.set(AKC185sb.toString().trim()); context.write(word, one); }} //继承Reducer类,重写其方法 public static class IntSumReducer extends Reducer
{ LongWritable result = new LongWritable(); @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //定义一个计数器 int sum = 0; IntWritable val; //通过value这个迭代器,遍历这一组kv中所有的value,进行累加 for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) { val = (IntWritable)i$.next(); } this.result.set(sum); Bt bt = new Bt(key.toString(), result.get()); context.write(bt, null); } } public static class Bt implements Writable, DBWritable { private String bname; private Long num; /** * @param bname * @param num */ public Bt(String bname, Long num) { super(); this.bname = bname; this.num = num; } public String getBname() { return bname; } public void setBname(String bname) { this.bname = bname; } public Long getNum() { return num; } public void setNum(Long num) { this.num = num; } @Override public void write(PreparedStatement statement) throws SQLException { // TODO Auto-generated method stub statement.setString(1, bname); statement.setLong(2, num); } @Override public void readFields(ResultSet resultSet) throws SQLException { // TODO Auto-generated method stub bname = resultSet.getString(1); num = resultSet.getLong(2); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(this.bname); out.writeLong(this.num); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.bname =in.readUTF(); this.num =in.readLong(); } }

}

转载地址:https://blog.csdn.net/qq_28358461/article/details/89886472 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:message from server: "Host '192.168.76.89' is not allowed to connect to this MySQL server
下一篇:Oracle数据库实现自增长

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2024年03月23日 11时27分42秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

java开发招聘试题_客户化开发招聘试题-Java开发.doc 2019-04-21
java jdk win10 1335_win10下安装java jdk,tomcat 2019-04-21
java list二分查找_java中的ArrayList和LinkedList的二分查找速度比 | 学步园 2019-04-21
php中的变量名称用什么表示,PHP变量,方法,类等名称中的有效字符是什么? 2019-04-21
pic32mx是什么cpu_PIC32MX单片机外设库使用(Ⅰ)- 系统时钟及I/O口基本设置 2019-04-21
用c 在mysql上存图片_C 批量保存图片进 mysql 利用MYSQL_BIND插入longblob 2019-04-21
mysql 1045 28000_mysql报关于用户密码1045(28000),几种处理方法 (zhuan) 2019-04-21
solr比mysql的优势_Solr与Elasticsearch的优缺点比较总结和归纳 2019-04-21
华为博士招聘上机考试题目_牛客网-华为-2020届校园招聘上机考试-3 2019-04-21
python中for可以做变量名吗_Python中使用动态变量名的方法 2019-04-21
mysql 日期转换天数_MySQL 日期操作 增减天数、时间转换、时间戳 2019-04-21
java对象去重复_JAVA中List对象去除重复值的方法 2019-04-21
java bss_[转] .bss段和.data段的区别 2019-04-21
java上传图片损坏_大神求助 上传图片后 图片损坏 2019-04-21
java socket唯一标识符_Java Socket编程之常识网络基础知识 2019-04-21
java给xyz大小排序_java递归实现string xyz排序 2019-04-21
arctime必须要java_Arctime使用教程 Arctime常见问题解答 2019-04-21
mysql pxc mysql5.7_mysql之PXC5.7.18集群系列——1. Percona XtraDB Cluster 搭建 2019-04-21
mysql 自适应字段宽度_box-sizing解决自适应布局容器宽度问题 2019-04-21
java 配置文件配置路径_Java读取配置文件路径设置 2019-04-21