DBOutputFormat: Writing Data from the HDFS File System into a Database

This continues from the previous article, which used DBInputFormat to read rows from a database into HDFS. Here we go in the opposite direction: a MapReduce job reads lines from an HDFS file and inserts them into a MySQL table through DBOutputFormat.
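The input file is expected to contain one record per line in the toString() format written by the previous job, along these lines (the field values below are invented for illustration):

KC86Entity [AAZ217=123456, AAC001=440102]

The mapper strips the fixed 12-character "KC86Entity [" prefix and the trailing "]", splits the remainder on commas, and keeps only the text after each equals sign, so each line yields one row for the target table.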

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.log4j.Logger;

public class HDFS_TO_MYSQL {

    private final static Logger log = Logger.getLogger(HDFS_TO_MYSQL.class);

    /* Oracle variant of the connection settings, kept from the original post:
    private static final String out_path = "hdfs://IP:9000/home/kc86_intput";
    private static final String driver   = "oracle.jdbc.driver.OracleDriver";
    private static final String url      = "jdbc:oracle:thin:@IP:1521:orcl";
    private static final String username = "root";
    private static final String password = "root"; */

    // Note: out_path is actually the job's *input* directory here; the name
    // is a leftover from the previous post.
    private static final String out_path = "hdfs://IP:9000/home/kc86_intput";
    private static final String driver   = "com.mysql.jdbc.Driver";
    private static final String url      = "jdbc:mysql://IP:3306/zicontd";
    private static final String username = "root";
    private static final String password = "root";

    /**
     * Entity class mapped to the kc86 table.
     */
    public static class KC86Entity implements Writable, DBWritable {

        private String AAZ217;
        private String AAC001;

        /** No-arg constructor (required by the MapReduce framework). */
        public KC86Entity() {
            super();
        }

        public KC86Entity(String aAZ217, String aAC001) {
            super();
            AAZ217 = aAZ217;
            AAC001 = aAC001;
        }

        // getters and setters omitted

        @Override
        public String toString() {
            return "KC86Entity [AAZ217=" + AAZ217 + ", AAC001=" + AAC001 + "]";
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, AAZ217);
            statement.setString(2, AAC001);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            AAZ217 = resultSet.getString(1);
            AAC001 = resultSet.getString(2);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            AAZ217 = in.readUTF();
            AAC001 = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(AAZ217);
            out.writeUTF(AAC001);
        }
    }

    static class MyDBOutputFormatMRMapper extends Mapper<LongWritable, Text, KC86Entity, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("raw value: " + value + ", length: " + value.getLength());
            // Strip the leading "KC86Entity [" (12 characters) and the trailing "]".
            String getStr = value.toString().substring(12, value.toString().length() - 1);
            System.out.println("trimmed value: " + getStr);
            String[] sp = getStr.split(",");
            // Take the value after each equals sign and store it in a new String[].
            // (The loop body was truncated in the original post; this is the
            // straightforward reconstruction implied by the comment above.)
            String[] split = new String[sp.length];
            for (int i = 0; i < sp.length; i++) {
                split[i] = sp[i].split("=")[1].trim();
            }
            context.write(new KC86Entity(split[0], split[1]), NullWritable.get());
        }
    }

    static class MyDBOutputFormatMRReducer extends Reducer<KC86Entity, NullWritable, KC86Entity, NullWritable> {

        @Override
        protected void reduce(KC86Entity key, Iterable<NullWritable> value, Context context)
                throws IOException, InterruptedException {
            for (NullWritable nvl : value) {
                context.write(key, nvl);
            }
        }
    }

    /**
     * Reads data from HDFS and writes it into the database table.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // HADOOP_USER_NAME configured in the current environment.
        // (System.setProperty throws a NullPointerException if it is unset.)
        String hadoop_user_name = System.getenv("HADOOP_USER_NAME");
        System.setProperty("HADOOP_USER_NAME", hadoop_user_name);

        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, driver, url, username, password);

        Job job = Job.getInstance(conf);
        job.setJarByClass(HDFS_TO_MYSQL.class);

        job.setMapperClass(MyDBOutputFormatMRMapper.class);
        job.setMapOutputKeyClass(KC86Entity.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(MyDBOutputFormatMRReducer.class);
        job.setOutputKeyClass(KC86Entity.class);
        job.setOutputValueClass(NullWritable.class);
        // With zero reduce tasks the reducer above never runs: map output goes
        // straight to DBOutputFormat. (If reduces were enabled, the key class
        // would also need to implement WritableComparable for the shuffle.)
        job.setNumReduceTasks(0);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        // The field names given here must match, in number and order, the
        // fields that the entity class reads and writes.
        String[] str = new String[] {"AAZ217", "AAC001"};
        DBOutputFormat.setOutput(job, "KC86_test1", str);

        FileSystem fs = FileSystem.get(URI.create(out_path), conf, "root");
        Path p = new Path(out_path);
        if (!fs.exists(p)) {
            // fs.delete(p, true);
            // System.out.println("path exists, deleted!");
            log.info("input path does not exist");
        }
        // Read the files under this path.
        FileInputFormat.setInputPaths(job, p);
        job.waitForCompletion(true);
    }
}
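For reference, DBOutputFormat builds its prepared statement from the table and field names passed to setOutput() (see DBOutputFormat.constructQuery() in Hadoop), so the configuration above should yield the equivalent of:

INSERT INTO KC86_test1 (AAZ217, AAC001) VALUES (?, ?);

KC86Entity.write(PreparedStatement) fills the two placeholders in that same order, which is why the field-name array, the entity's write() method, and the target table definition must all agree on field count and order. Two practical caveats: the MySQL JDBC driver jar has to be on the job's classpath at runtime, and HADOOP_USER_NAME must be exported in the environment before submitting the job, since main() reads it with System.getenv.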

Reprinted from: https://blog.csdn.net/qq_28358461/article/details/89443631

Previous: A bug when reading files from HDFS into an Oracle database with MapReduce
Next: DBInputFormat: reading data from a database into the HDFS file system
