使用 DBOutputFormat 将 HDFS 文件系统中的数据导入数据库
发布日期:2021-11-15 02:11:39
浏览次数:2
分类:技术文章
本文共 4485 字,大约阅读时间需要 14 分钟。
接上一篇文章
public class HDFS_TO_MYSQL
{private final static Logger log = Logger.getLogger(KC86DATA_HDFS.class);
/* private static final String out_path = “hdfs://IP:9000/home/kc86_intput”;
private static final String driver = “oracle.jdbc.driver.OracleDriver”; private static final String url = “jdbc:oracle:thin:@IP:1521:orcl”; private static final String username = “root”; private static final String password = “root”;*/private static final String out_path = "hdfs://IP:9000/home/kc86_intput";private static final String driver = "com.mysql.jdbc.Driver";private static final String url = "jdbc:mysql://IP:3306/zicontd";private static final String username = "root";private static final String password = "root"; /** * 读取数据库中kc86表中返回的实体类 * */ public static class KC86Entity implements Writable, DBWritable { private String AAZ217; private String AAC001; /** * 无参构造器 */ public KC86Entity() { super(); // TODO Auto-generated constructor stub } public KC86Entity(String aAZ217, String aAC001) { super(); AAZ217 = aAZ217; AAC001 = aAC001; } ***//get set方法省略*** @Override public String toString() { return "KC86Entity [AAZ217=" + AAZ217 + ", AAC001=" + AAC001 + " "]"; } @Override public void write(PreparedStatement statement) throws SQLException { // TODO Auto-generated method stub statement.setString(1, AAZ217); statement.setString(2, AAC001); } @Override public void readFields(ResultSet resultSet) throws SQLException { // TODO Auto-generated method stub AAZ217 = resultSet.getString(1); AAC001 = resultSet.getString(2); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub AAZ217 =in.readUTF(); AAC001 =in.readUTF(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(AAZ217); out.writeUTF(AAC001); } } static class MyDBOutputFormatMRMapper extends Mapper{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { System.out.println("打印出原始value的值"+value+"长度为"+value.getLength()); String getStr = value.toString().substring(12,value.toString().length()-1); System.out.println("打印出value的值"+getStr); String[] 
sp = getStr.split(","); //把等号后面的值取出来,存放到新的String[]下。 String[] split = new String[sp.length]; for(int i = 0;i { /* @Override protected void reduce(KC86Entity key, Iterable value, Reducer .Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub super.reduce(key, value, context); }*/ @Override protected void reduce(KC86Entity key, Iterable value, Reducer .Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub for(NullWritable nvl : value){ context.write(key, nvl); } } } /** * 读取HDFS中的数据到数据库表 * @throws IOException * @throws InterruptedException * @throws ClassNotFoundException * */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //当前运行环境中配置的HADOOP_USER_NAME属性值 String hadoop_user_name = System.getenv("HADOOP_USER_NAME"); System.setProperty("HADOOP_USER_NAME", hadoop_user_name); Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, driver, url,username,password); Job job = Job.getInstance(conf); job.setJarByClass(HDFS_TO_MYSQL.class); job.setMapperClass(MyDBOutputFormatMRMapper.class); job.setMapOutputKeyClass(KC86Entity.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(MyDBOutputFormatMRReducer.class); job.setOutputKeyClass(KC86Entity.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(0); //job.setOutputFormatClass(DBOutputFormat.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(DBOutputFormat.class); //此处给出的字段名称约束 一定要和Student类中描述读进或者写出的字段名称个数一致 String[] str = new String[] {"AAZ217","AAC001"}; DBOutputFormat.setOutput(job, "KC86_test1", str); //FileSystem fs = FileSystem.get(conf); FileSystem fs = FileSystem.get(URI.create(out_path), conf,"root"); Path p = new Path(out_path); if(!fs.exists(p)){ //fs.delete(p,true); //System.out.println("输出路径存在,已删除!"); log.info("输出路径不存在"); } //读path路径下的文件 FileInputFormat.setInputPaths(job, p); 
job.waitForCompletion(true); //job.waitForCompletion(true); }
}
转载地址:https://blog.csdn.net/qq_28358461/article/details/89443631。如侵犯您的版权,请留言告知原文章地址,我们会尽快删除此文章。给您带来不便,敬请谅解!
发表评论
最新留言
关注你微信了!
[***.104.42.241]2024年04月13日 00时40分39秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
LeetCode 训练场:1460. 通过翻转子数组使两个数组相等
2019-04-26
LeetCode 训练场:1464. 数组中两元素的最大乘积
2019-04-26
LeetCode 训练场:977. 有序数组的平方
2019-04-26
LeetCode 训练场:905. 按奇偶排序数组
2019-04-26
LeetCode 训练场:283. 移动零
2019-04-26
LeetCode 训练场:1051. 高度检查器
2019-04-26
LeetCode 训练场:144. 二叉树的前序遍历
2019-04-26
LeetCode 训练场:94. 二叉树的中序遍历
2019-04-26
LeetCode 训练场:145. 二叉树的后序遍历
2019-04-26
程序员面试金典:面试题 02.03. 删除中间节点
2019-04-26
LeetCode 训练场:237. 删除链表中的节点
2019-04-26
链表是啥?如何搞定它?
2019-04-26
LeetCode 训练场:35. 搜索插入位置
2019-04-26
LeetCode 训练场:342. 4 的幂
2019-04-26
1、Maven 简介
2019-04-26
2、Maven 常用命令
2019-04-26
3、IDEA 创建 Maven 版 Hello World
2019-04-26
4、Maven 依赖管理
2019-04-26
1、Spring MVC是什么?
2019-04-26
1、什么是 MyBatis
2019-04-26