hbase数据导入:
参考网上的示例,把代码copy下来后,发现运行总是报错:
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, recieved org.apache.hadoop.io.LongWritable;
原因是Mapper必须按照新版API声明泛型参数,也就是写成extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>
必须这样写,不能简单地写成extends Mapper:不带泛型时,自己写的map方法并没有覆盖父类的map方法,框架执行的是默认的map(原样输出),所以输出的key还是LongWritable,
代码还是贴出来:
生成hfile的代码:
package com.bonc.db;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TestHFileToHBase { public static class TestHFileToHBaseMapper extends Mapper{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] values = value.toString().split("\\|"); ImmutableBytesWritable rowkey = new ImmutableBytesWritable( values[0].toString().trim().getBytes()); KeyValue kvProtocol; if (values.length>1){ kvProtocol = new KeyValue(values[0].toString().trim().getBytes(), "url_type".getBytes(), "type".getBytes(),System.currentTimeMillis(), values[1].toString().trim() .getBytes()); }else{ kvProtocol=new KeyValue(values[0].toString().trim().getBytes(), "url_type".getBytes(), "type".getBytes(),System.currentTimeMillis(), "NULL".getBytes()); } context.write(rowkey, kvProtocol); // KeyValue kvSrcip = new KeyValue(row, "SRCIP".getBytes(), // "SRCIP".getBytes(), values[1].getBytes()); // context.write(k, kvSrcip);// HFileOutputFormat.getRecordWriter } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf, "TestHFileToHBase"); job.setJarByClass(TestHFileToHBase.class); job.setOutputKeyClass(ImmutableBytesWritable.class); 
job.setOutputValueClass(KeyValue.class); job.setMapperClass(TestHFileToHBaseMapper.class); job.setReducerClass(KeyValueSortReducer.class);// job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class); job.setOutputFormatClass(HFileOutputFormat.class); // job.setNumReduceTasks(4); // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class); // HBaseAdmin admin = new HBaseAdmin(conf); HTable table = new HTable(conf, "url_rule"); HFileOutputFormat.configureIncrementalLoad(job, table); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
hfile导入到表的代码:
1 package com.bonc.db; 2 import java.io.IOException; 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.hbase.HBaseConfiguration; 7 import org.apache.hadoop.hbase.client.HTable; 8 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; 9 import org.apache.hadoop.hbase.util.Bytes; 10 11 public class TestLoadIncrementalHFileToHBase { 12 13 // private static final byte[] TABLE = Bytes.toBytes("hua"); 14 // private static final byte[] QUALIFIER = Bytes.toBytes("PROTOCOLID"); 15 // private static final byte[] FAMILY = Bytes.toBytes("PROTOCOLID"); 16 17 public static void main(String[] args) throws IOException { 18 Configuration conf = HBaseConfiguration.create(); 19 // byte[] TABLE = Bytes.toBytes("hua"); 20 byte[] TABLE = Bytes.toBytes(args[0]); 21 HTable table = new HTable(TABLE); 22 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 23 loader.doBulkLoad(new Path(args[1]), table); 24 // loader.doBulkLoad(new Path("/hua/testHFileResult/"), table); 25 } 26 27 }
悲剧的是在从hfile导入到表的时候报错:
java.io.IOException: java.io.IOException: Failed rename of maprfs://133.0.76.41:7222/user/hbasetest/url_type/4447706551787973235 to maprfs://133.0.76.41:7222/hbase/url_rule/732e6d3d150caa8bd3d8d228e3d9c9a0/url_type/914168143008836217
at org.apache.hadoop.hbase.regionserver.StoreFile.rename(StoreFile.java:512)
虽然解决办法在这里:
但是我实在是没看懂。so,我采用了最原始的方法:
split将文件分割成小文件,然后:
1 package com.bonc.db; 2 import java.io.BufferedReader; 3 import java.io.FileReader; 4 import java.io.IOException; 5 6 import org.apache.hadoop.hbase.client.HTable; 7 import org.apache.hadoop.hbase.client.HTablePool; 8 import org.apache.hadoop.hbase.client.Put; 9 10 import com.bonc.URLMatch.HBaseMain;11 public class URL_HBase {12 13 public static void main(String[] args) throws IOException{14 //文件绝对路径改成你自己的文件路径15 FileReader fr=new FileReader(args[0]);16 //可以换成工程目录下的其他文本文件17 HTablePool pool = new HTablePool(HBaseMain.conf, 1000);18 HTable table = (HTable) pool.getTable("url_rule");19 BufferedReader br=new BufferedReader(fr);20 while(br.readLine()!=null){21 String[] s=br.readLine().toString().split("\\|");22 if(s.length>1){23 Put put = new Put(s[0].trim().getBytes());24 put.add("url_type".getBytes(), "type".getBytes(), s[1].trim().getBytes());25 table.put(put);26 }else{27 System.out.println(s);28 }29 }30 br.close();31 }32 }
终于成功了,希望有人能够帮我翻译一下,怎么解决是个什么意思。。唉。