本站分享:AI、大数据、数据分析师培训认证考试,包括:Python培训Excel培训Matlab培训SPSS培训SAS培训R语言培训Hadoop培训Amos培训Stata培训Eviews培训

MapReduce操作HBase错误一例

hadoop培训 cdadata 2864℃

MapReduce操作HBase错误一例

运行HBase时常会遇到个错误,我就有这样的经历。

ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times

检查日志:org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 42, server = 41)

如果是这个错误,说明RPC协议不一致所造成的,解决方法:将hbase/lib目录下的hadoop-core的jar文件删除,将hadoop目录下的hadoop-0.20.2-core.jar拷贝到hbase/lib下面,然后重新启动hbase即可。第二种错误是:没有启动hadoop,先启用hadoop,再启用hbase。

在Eclipse开发中,需要加入hadoop所有的jar包以及HBase二个jar包(hbase,zooKooper)。

HBase基础可见帖子:http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499112.html

建表,通过HBaseAdmin类中的create静态方法来创建表。

HTable类是操作表,例如,静态方法put可以插入数据,该类初始化时可以传递一个行键,静态方法getScanner()可以获得某一列上的所有数据,返回Result类,Result类中有个静态方法getFamilyMap()可以获得以列名为key,值为value,这刚好与hadoop中map结果是一样的。

  1. package test;
  2. import java.io.IOException;
  3. import java.util.Map;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.hbase.HBaseConfiguration;
  6. import org.apache.hadoop.hbase.HColumnDescriptor;
  7. import org.apache.hadoop.hbase.HTableDescriptor;
  8. import org.apache.hadoop.hbase.client.HBaseAdmin;
  9. import org.apache.hadoop.hbase.client.HTable;
  10. import org.apache.hadoop.hbase.client.Put;
  11. import org.apache.hadoop.hbase.client.Result;
  12. public class Htable {
  13.     /**
  14.      * @param args
  15.      */
  16.     public static void main(String[] args) throws IOException {
  17.         // TODO Auto-generated method stub
  18.         Configuration hbaseConf = HBaseConfiguration.create();
  19.         HBaseAdmin admin = new HBaseAdmin(hbaseConf);
  20.         HTableDescriptor htableDescriptor = new HTableDescriptor(“table”
  21.                 .getBytes());  //set the name of table
  22.         htableDescriptor.addFamily(new HColumnDescriptor(“fam1”)); //set the name of column clusters
  23.         admin.createTable(htableDescriptor); //create a table 
  24.         HTable table = new HTable(hbaseConf, “table”); //get instance of table.
  25.         for (int i = 0; i < 3; i++) {   //for is number of rows
  26.             Put putRow = new Put((“row” + i).getBytes()); //the ith row
  27.             putRow.add(“fam1”.getBytes(), “col1”.getBytes(), “vaule1”
  28.                     .getBytes());  //set the name of column and value.
  29.             putRow.add(“fam1”.getBytes(), “col2”.getBytes(), “vaule2”
  30.                     .getBytes());
  31.             putRow.add(“fam1”.getBytes(), “col3”.getBytes(), “vaule3”
  32.                     .getBytes());
  33.             table.put(putRow);
  34.         }
  35.         for(Result result: table.getScanner(“fam1”.getBytes())){//get data of column clusters 
  36.             for(Map.Entry<byte[], byte[]> entry : result.getFamilyMap(“fam1”.getBytes()).entrySet()){//get collection of result
  37.                 String column = new String(entry.getKey());
  38.                 String value = new String(entry.getValue());
  39.                 System.out.println(column+“,”+value);
  40.             }
  41.         }
  42.         admin.disableTable(“table”.getBytes()); //disable the table
  43.         admin.deleteTable(“table”.getBytes());  //drop the tbale
  44.     }
  45. }

以上代码不难看懂。

下面介绍一下,用mapreduce怎样操作HBase,主要对HBase中的数据进行读取。

现在有一些大的文件,需要存入HBase中,其思想是先把文件传到HDFS上,利用map阶段读取<key,value>对,可在reduce把这些键值对上传到HBase中。

  1. package test;
  2. import java.io.IOException;
  3. import java.util.Map;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.hbase.HBaseConfiguration;
  6. import org.apache.hadoop.hbase.HColumnDescriptor;
  7. import org.apache.hadoop.hbase.HTableDescriptor;
  8. import org.apache.hadoop.hbase.client.HBaseAdmin;
  9. import org.apache.hadoop.hbase.client.HTable;
  10. import org.apache.hadoop.hbase.client.Put;
  11. import org.apache.hadoop.hbase.client.Result;
  12. public class Htable {
  13.     /**
  14.      * @param args
  15.      */
  16.     public static void main(String[] args) throws IOException {
  17.         // TODO Auto-generated method stub
  18.         Configuration hbaseConf = HBaseConfiguration.create();
  19.         HBaseAdmin admin = new HBaseAdmin(hbaseConf);
  20.         HTableDescriptor htableDescriptor = new HTableDescriptor(“table”
  21.                 .getBytes());  //set the name of table
  22.         htableDescriptor.addFamily(new HColumnDescriptor(“fam1”)); //set the name of column clusters
  23.         admin.createTable(htableDescriptor); //create a table 
  24.         HTable table = new HTable(hbaseConf, “table”); //get instance of table.
  25.         for (int i = 0; i < 3; i++) {   //for is number of rows
  26.             Put putRow = new Put((“row” + i).getBytes()); //the ith row
  27.             putRow.add(“fam1”.getBytes(), “col1”.getBytes(), “vaule1”
  28.                     .getBytes());  //set the name of column and value.
  29.             putRow.add(“fam1”.getBytes(), “col2”.getBytes(), “vaule2”
  30.                     .getBytes());
  31.             putRow.add(“fam1”.getBytes(), “col3”.getBytes(), “vaule3”
  32.                     .getBytes());
  33.             table.put(putRow);
  34.         }
  35.         for(Result result: table.getScanner(“fam1”.getBytes())){//get data of column clusters 
  36.             for(Map.Entry<byte[], byte[]> entry : result.getFamilyMap(“fam1”.getBytes()).entrySet()){//get collection of result
  37.                 String column = new String(entry.getKey());
  38.                 String value = new String(entry.getValue());
  39.                 System.out.println(column+“,”+value);
  40.             }
  41.         }
  42.         admin.disableTable(“table”.getBytes()); //disable the table
  43.         admin.deleteTable(“table”.getBytes());  //drop the tbale
  44.     }
  45. }

Reduce类,主要是将键值传到HBase表中

  1. package test;
  2. import java.io.IOException;
  3. import org.apache.hadoop.hbase.client.Put;
  4. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  5. import org.apache.hadoop.hbase.mapreduce.TableReducer;
  6. import org.apache.hadoop.io.Text;
  7. public class ReducerClass extends TableReducer<Text,Text,ImmutableBytesWritable>{
  8.     public void reduce(Text key,Iterable<Text> values,Context context){
  9.         String k = key.toString();
  10.         StringBuffer str=null;
  11.         for(Text value: values){
  12.             str.append(value.toString());
  13.         }
  14.         String v = new String(str);
  15.         Put putrow = new Put(k.getBytes());
  16.         putrow.add(“fam1”.getBytes(), “name”.getBytes(), v.getBytes());
  17.     }
  18. }

由上面可知ReducerClass继承TableReduce,在hadoop里面ReducerClass继承Reducer类。它的原型为:TableReducer<KeyIn,Values,KeyOut>可以看出,HBase里面是读出的Key类型是ImmutableBytesWritable。

Map,Reduce,以及Job的配置分离,比较清晰,mahout也是采用这种构架。

  1. package test;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.conf.Configured;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.hbase.HBaseConfiguration;
  6. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  11. import org.apache.hadoop.util.Tool;
  12. public class Driver extends Configured implements Tool{
  13.     @Override
  14.     public static void run(String[] arg0) throws Exception {
  15.         // TODO Auto-generated method stub
  16.         Configuration conf = HBaseConfiguration.create();
  17.         conf.set(“hbase.zookeeper.quorum.”“localhost”);
  18.         Job job = new Job(conf,“Hbase”);
  19.         job.setJarByClass(TxtHbase.class);
  20.         Path in = new Path(arg0[0]);
  21.         job.setInputFormatClass(TextInputFormat.class);
  22.         FileInputFormat.addInputPath(job, in);
  23.         job.setMapperClass(MapperClass.class);
  24.         job.setMapOutputKeyClass(Text.class);
  25.         job.setMapOutputValueClass(Text.class);
  26.         TableMapReduceUtil.initTableReducerJob(“table”, ReducerClass.class, job);
  27.        job.waitForCompletion(true);
  28.     }
  29. }

Driver中job配置的时候没有设置 job.setReduceClass(); 而是用 TableMapReduceUtil.initTableReducerJob(“tab1”, THReducer.class, job); 来执行reduce类。

主函数

  1. package test;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.util.ToolRunner;
  4. public class TxtHbase {
  5.     public static void main(String [] args) throws Exception{        Driver.run(new Configuration(),new THDriver(),args);     } }

读取数据时比较简单,编写Mapper函数,读取<key,value>值就行了。

  1. package test;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hbase.client.Result;
  5. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  6. import org.apache.hadoop.hbase.mapred.TableMap;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapred.MapReduceBase;
  9. import org.apache.hadoop.mapred.OutputCollector;
  10. import org.apache.hadoop.mapred.Reporter;
  11. public class MapperClass extends MapReduceBase implements
  12.         TableMap<Text, Text> {
  13.     static final String NAME = “GetDataFromHbaseTest”;
  14.     private Configuration conf;
  15.     public void map(ImmutableBytesWritable row, Result values,
  16.             OutputCollector<Text, Text> output, Reporter reporter)
  17.             throws IOException {
  18.         StringBuilder sb = new StringBuilder();
  19.         for (Entry<byte[], byte[]> value : values.getFamilyMap(
  20.                 “fam1”.getBytes()).entrySet()) {
  21.             String cell = value.getValue().toString();
  22.             if (cell != null) {
  23.                 sb.append(new String(value.getKey())).append(new String(cell));
  24.             }
  25.         }
  26.         output.collect(new Text(row.get()), new Text(sb.toString()));
  27.     }

要实现这个方法 initTableMapJob(String table, String columns, Class<? extends TableMap> mapper, Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass, Class<? extends org.apache.hadoop.io.Writable> outputValueClass, org.apache.hadoop.mapred.JobConf job, boolean addDependencyJars)。

  1. package test;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.conf.Configured;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.hbase.HBaseConfiguration;
  6. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  11. import org.apache.hadoop.util.Tool;
  12. public class Driver extends Configured implements Tool{
  13.     @Override
  14.     public static void run(String[] arg0) throws Exception {
  15.         // TODO Auto-generated method stub
  16.         Configuration conf = HBaseConfiguration.create();
  17.         conf.set(“hbase.zookeeper.quorum.”“localhost”);
  18.         Job job = new Job(conf,“Hbase”);
  19.         job.setJarByClass(TxtHbase.class);
  20.         job.setInputFormatClass(TextInputFormat.class);
  21.         job.setMapOutputKeyClass(Text.class);
  22.         job.setMapOutputValueClass(Text.class);
  23.         TableMapReduceUtilinitTableMapperJob(“table”, args0[0],MapperClass.class, job);         job.waitForCompletion(true); } }

主函数

  1. package test;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.util.ToolRunner;
  4. public class TxtHbase {
  5.     public static void main(String [] args) throws Exception{
  6.         Driver.run(new Configuration(),new THDriver(),args);
  7.     }
  8. }
  9. ——————————————————————————–

转载请注明:数据分析 » MapReduce操作HBase错误一例

喜欢 (0)or分享 (0)