六狼论坛 (Liulang Forum)

Splitting a TableSplit so that multiple mappers read at the same time

Posted by the original poster on 2013-1-30 01:57:18
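By default, each region becomes one TableSplit and is read by a single mapper. A single mapper reads slowly, so the idea is to cut the default TableSplit into several splits so that Hadoop can read the region with multiple mappers.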

Unlike Hadoop's file-based input, HBase does not let you adjust the split size (and thereby get multiple mappers) through the parameters mapred.min.split.size and mapred.max.split.size.
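For contrast, the following is a minimal sketch of the arithmetic that the new-API Hadoop FileInputFormat applies to files: the split size is max(minSize, min(maxSize, blockSize)), where minSize and maxSize come from the two properties above. The class and method names here are hypothetical; the point is only that this formula works on file lengths and block sizes, which is why it has no effect on TableInputFormat.

```java
// Sketch of the file split-size rule used by Hadoop's new-API FileInputFormat:
// splitSize = max(minSize, min(maxSize, blockSize)).
// TableInputFormat does not use this; it creates one split per region instead.
final class FileSplitSizeSketch {
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Rough split count for one file (ignores Hadoop's 10% "slop" allowance on the last split).
    static long approxSplitCount(long fileLength, long splitSize) {
        return (fileLength + splitSize - 1) / splitSize;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Setting maxSize below the block size produces more, smaller splits (more mappers).
        long size = computeSplitSize(64 * mb, 1, 16 * mb);
        System.out.println(size / mb + " MB per split, "
            + approxSplitCount(256 * mb, size) + " splits for a 256 MB file");
    }
}
```

Lowering maxSize below the block size is the usual way to get more mappers for a file; HBase regions have no equivalent knob, hence the custom getSplits() below.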

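So far I can think of two approaches: one is to modify TableInputFormatBase so that the default single TableSplit is cut into several; the other is to handle it with a Coprocessor. Here I chose to modify TableInputFormatBase.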

HBase: The Definitive Guide explains how to combine HBase with MapReduce. To use an HBase table as the data source, you typically call the following helper:
TableMapReduceUtil.initTableMapperJob(table[0].getBytes(), scan,UserViewHisMapper2.class, Text.class, Text.class,genRecommendations);
This method ultimately does its setup by calling the following overload:

public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper,
    Class<? extends WritableComparable> outputKeyClass,
    Class<? extends Writable> outputValueClass, Job job,
    boolean addDependencyJars)
throws IOException {
  initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
      outputValueClass, job, addDependencyJars, TableInputFormat.class);
}

So the approach is to modify TableInputFormat. Its core logic, however, is inherited from TableInputFormatBase:

public class TableInputFormat extends TableInputFormatBase implements Configurable

What actually needs to change, then, is TableInputFormatBase, specifically this method:

public List<InputSplit> getSplits(JobContext context) throws IOException {}

The core of this method is to get the start and end row keys of every region of the table and turn each region (clipped to the scan's start and stop rows) into one TableSplit:

  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (table == null) {
      throw new IOException("No table was provided.");
    }
    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
    if (keys == null || keys.getFirst() == null ||
        keys.getFirst().length == 0) {
      throw new IOException("Expecting at least one region.");
    }
    int count = 0;
    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
      if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
        continue;
      }
      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
        getHostname();
      byte[] startRow = scan.getStartRow();
      byte[] stopRow = scan.getStopRow();
      // determine if the given start an stop key fall into the region
      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
          (stopRow.length == 0 ||
           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
        byte[] splitStart = startRow.length == 0 ||
          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
            keys.getFirst()[i] : startRow;
        byte[] splitStop = (stopRow.length == 0 ||
          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
          keys.getSecond()[i].length > 0 ?
            keys.getSecond()[i] : stopRow;
        InputSplit split = new TableSplit(table.getTableName(),
          splitStart, splitStop, regionLocation);
        splits.add(split);
        if (LOG.isDebugEnabled())
          LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
      }
    }
    return splits;
  }
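The region/scan intersection above is easy to misread because an empty byte[] means "unbounded" on every side (scan start, scan stop, region start, region end). Below is a dependency-free sketch of just that clipping step. The class name RegionClipSketch is hypothetical, and compare() is a stand-in for HBase's Bytes.compareTo, which orders row keys as unsigned bytes, lexicographically.

```java
// Sketch of the per-region clipping in TableInputFormatBase.getSplits().
// An empty byte[] means "unbounded" for the scan start/stop and region start/end.
final class RegionClipSketch {
    // Unsigned lexicographic comparison (the row-key ordering HBase uses).
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    }

    // Returns {splitStart, splitStop}, or null if the scan range misses the region.
    static byte[][] clip(byte[] regionStart, byte[] regionEnd,
                         byte[] scanStart, byte[] scanStop) {
        boolean overlaps =
            (scanStart.length == 0 || regionEnd.length == 0 || compare(scanStart, regionEnd) < 0)
            && (scanStop.length == 0 || compare(scanStop, regionStart) > 0);
        if (!overlaps) return null;
        // Start at whichever is later: the region start or the scan start.
        byte[] start = (scanStart.length == 0 || compare(regionStart, scanStart) >= 0)
            ? regionStart : scanStart;
        // Stop at whichever is earlier; an empty region end (last region) defers to the scan stop.
        byte[] stop = ((scanStop.length == 0 || compare(regionEnd, scanStop) <= 0)
            && regionEnd.length > 0) ? regionEnd : scanStop;
        return new byte[][] {start, stop};
    }
}
```

For example, with two regions ["", "m") and ["m", "") and a scan from "c" to "p", the first region is clipped to ["c", "m") and the second to ["m", "p").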

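What needs to happen here is to subdivide the rows that would belong to one TableSplit into as many smaller splits as desired. I did not find a lightweight way to do this, so the only option was brute force: iterate over the region, fetch every row key of the TableSplit, and then cut the list into pieces.
Here is my implementation: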

// Extra imports needed in TableInputFormatBase:
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;

public List<InputSplit> getSplits(JobContext context) throws IOException {
  if (table == null) {
    throw new IOException("No table was provided.");
  }
  Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
  if (keys == null || keys.getFirst() == null
      || keys.getFirst().length == 0) {
    throw new IOException("Expecting at least one region.");
  }
  int count = 0;
  List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
  for (int i = 0; i < keys.getFirst().length; i++) {
    if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
      continue;
    }
    String regionLocation =
        table.getRegionLocation(keys.getFirst()[i], true).getHostname();
    byte[] startRow = scan.getStartRow();
    byte[] stopRow = scan.getStopRow();
    // determine if the given start and stop keys fall into the region
    if ((startRow.length == 0 || keys.getSecond()[i].length == 0
        || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0)
        && (stopRow.length == 0
        || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
      byte[] splitStart = startRow.length == 0
          || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0
          ? keys.getFirst()[i] : startRow;
      byte[] splitStop = (stopRow.length == 0
          || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0)
          && keys.getSecond()[i].length > 0
          ? keys.getSecond()[i] : stopRow;

      // Scan only the row keys in this region's range (KeyOnlyFilter strips values).
      Scan scan1 = new Scan();
      scan1.setStartRow(splitStart);
      scan1.setStopRow(splitStop);
      scan1.setFilter(new KeyOnlyFilter());
      // Rows fetched per RPC. setBatch() would instead cap columns per Result,
      // splitting wide rows into several Results and duplicating row keys.
      scan1.setCaching(500);
      // All row keys of this region, kept as raw bytes:
      // new String(...)/getBytes() round trips can corrupt binary keys.
      List<byte[]> rows = new ArrayList<byte[]>();
      ResultScanner resultscanner = table.getScanner(scan1);
      try {
        for (Result rs : resultscanner) {
          if (!rs.isEmpty()) {
            rows.add(rs.getRow());
          }
        }
      } finally {
        resultscanner.close();
      }
      // mappersPerSplit: int field of this class, set from the job configuration.
      // Never cut more pieces than there are rows (avoids empty/duplicate splits
      // and an IndexOutOfBoundsException on an empty region).
      int pieces = Math.max(1, Math.min(mappersPerSplit, rows.size()));
      int splitSize = rows.size() / pieces;
      for (int j = 0; j < pieces; j++) {
        // The first piece starts at the region start and the last piece ends at the
        // region stop; stop rows are exclusive, so the last row key must not be a stop row.
        byte[] subStart = (j == 0) ? splitStart : rows.get(j * splitSize);
        byte[] subStop = (j == pieces - 1) ? splitStop : rows.get((j + 1) * splitSize);
        TableSplit tablesplit = new TableSplit(table.getTableName(),
            subStart, subStop, regionLocation);
        splits.add(tablesplit);
        if (LOG.isDebugEnabled()) {
          // Use count, not the region loop index i, so logging never skips regions.
          LOG.debug("getSplits: split -> " + (count++) + " -> " + tablesplit);
        }
      }
    }
  }
  return splits;
}
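The partitioning step can be checked on its own, without a cluster. The following pure-Java sketch (the class and method names are hypothetical, and byte[] ranges stand in for TableSplit) cuts a region's sorted row keys into at most n contiguous [start, stop) ranges. It assumes stop keys are exclusive, as in an HBase Scan, so the last range ends at the region's stop key rather than at the last row key, and it never produces more ranges than there are rows.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split a region's sorted row keys into at most n contiguous [start, stop) ranges.
final class SubSplitSketch {
    static List<byte[][]> subSplits(List<byte[]> rows, byte[] regionStart,
                                    byte[] regionStop, int n) {
        // At least one range; never more ranges than rows.
        int pieces = Math.max(1, Math.min(n, rows.size()));
        int size = rows.size() / pieces;
        List<byte[][]> out = new ArrayList<>();
        for (int j = 0; j < pieces; j++) {
            // First range begins at the region start, last range ends at the region stop.
            byte[] start = (j == 0) ? regionStart : rows.get(j * size);
            byte[] stop = (j == pieces - 1) ? regionStop : rows.get((j + 1) * size);
            out.add(new byte[][] {start, stop});
        }
        return out;
    }
}
```

With ten rows "r0".."r9", an unbounded region and n = 3, this yields ["", "r3"), ["r3", "r6") and ["r6", ""); the remainder rows go to the last range.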

The number of splits each region is cut into (mappersPerSplit in the code) is set through the job configuration.
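The post does not say which configuration key is used. As a minimal sketch, assuming a hypothetical key name and using java.util.Properties as a stand-in for Hadoop's Configuration (inside TableInputFormat, which implements Configurable, one would read the value in setConf() with Configuration.getInt(name, defaultValue)), the value could be read and sanitized like this:

```java
import java.util.Properties;

// Sketch: read the number of sub-splits per region from configuration.
// Properties stands in for Hadoop's Configuration; the key name is hypothetical.
final class SplitConfigSketch {
    static final String KEY = "example.tablesplit.mappers.per.region"; // hypothetical key

    static int mappersPerSplit(Properties conf) {
        String raw = conf.getProperty(KEY, "1"); // default: one split per region
        int n;
        try {
            n = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            n = 1; // fall back to the default on a malformed value
        }
        return Math.max(1, n); // a region always yields at least one split
    }
}
```

Clamping to at least 1 keeps a zero or negative setting from producing no splits (and no mappers) for a region.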