yangyiqian 发表于 2013-1-15 02:23:54

转:多路归并排序(远远大于内存的数据进行外排序)

http://hi.baidu.com/qq350884961/blog/item/8eeafd4583308d89b2b7dc2c.html对远远大于内存的数据进行外排序,在多路比较的时候用败者树效率会更高。这个算法可以在建立倒排索引的时候使用package my.sort;import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.BufferedWriter;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.File;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.FileWriter;import java.io.IOException;import java.util.ArrayList;import java.util.Arrays;import java.util.Iterator;import java.util.Random;/*** 基于大数据量的外排序算法,分为二路归并和多路归并* @author java2king* @link http://hi.baidu.com/qq350884961*/public class ExternalSort {      public static int ITEM_COUNT = 10000000; //总数       public static int BUFFER_SIZE = 1024*4*1000;// 一次缓冲读取            public static int FILE_COUNT = 1024*1000*1*4;// 每个文件的记录数1            public static File MAIN_FILE = new File("mainset");//要排序的文件      /**   * 二路归并   * @param file   * @return   * @throws IOException   */    public File sort(File file) throws IOException {          ArrayList<File> files = split(file);          return process(files);      }      /**   * 多路归并   * @param file   * @throws IOException   */    public void mSort(File file) throws IOException{          ArrayList<File> files = split(file);          multipleMerge(files);                }      // recursive method to merge the lists until we are left with a      // single merged list      private File process(ArrayList<File> list) throws IOException {          if (list.size() == 1) {            return list.get(0);          }          ArrayList<File> inter = new ArrayList<File>();          for (Iterator<File> itr = list.iterator(); itr.hasNext();) {            File one = itr.next();            if (itr.hasNext()) {                  File two = itr.next();                  inter.add(merge(one, two));            } else {                // return one;                  inter.add(one);            }          }          return process(inter);      }      /**   * Splits the original file into a number of sub files.       */    private ArrayList<File> split(File file) throws IOException {          ArrayList<File> files = new ArrayList<File>();          int[] buffer = new int;          FileInputStream fr = new FileInputStream(file);          BufferedInputStream bin = new BufferedInputStream(fr,BUFFER_SIZE);          DataInputStream din=new DataInputStream(bin);            boolean fileComplete = false;                  while (!fileComplete) {            int index = buffer.length;            for (int i = 0; i < buffer.length && !fileComplete; i++) {                  try {                     buffer = din.readInt();                  } catch (Exception e) {                      fileComplete = true;                      index = i;                  }            }            if (index != 0 && buffer > -1) {                  Arrays.sort(buffer, 0, index);                  File f = new File("set" + new Random().nextInt());         //       File temp = File.createTempFile("josp", ".tmp", f);                     FileOutputStream writer = new FileOutputStream(f);                  BufferedOutputStream bOutputStream = new BufferedOutputStream(writer);                                 DataOutputStream dout=new DataOutputStream(bOutputStream);                   for (int j = 0; j < index; j++) {                      dout.writeInt(buffer);                  }                  dout.close();                  bOutputStream.close();                  writer.close();                  files.add(f);                               }          }          din.close();          bin.close();          fr.close();          return files;      }      /**   * 多路归并   * @param list   * @throws IOException   */    private void multipleMerge(ArrayList<File> list) throws IOException {                  int fileSize = list.size();                  if(fileSize == 1){            return;          }                  ArrayList<DataInputStream> dinlist = new ArrayList<DataInputStream>();                  int[] ext = new int;//比较数组      // File output = new File("multipleMerged");          FileOutputStream os = new FileOutputStream(MAIN_FILE);          BufferedOutputStream bout = new BufferedOutputStream(os);          DataOutputStream dout = new DataOutputStream(bout);          for (int i = 0; i < fileSize; i++) {            try {                  dinlist.add(i, new DataInputStream(new BufferedInputStream(                        new FileInputStream(list.get(i)), BUFFER_SIZE)));            } catch (Exception e) {                  e.printStackTrace();            }          }          int index = 0;          for (int i = 0; i < fileSize; i++) {            try {                  ext = dinlist.get(i).readInt();            } catch (Exception e) {                  System.err.println("file_" + i + "为空");                  ext = -1;            }          }          int count = fileSize;          int[] sum = new int;                  while (count > 1) {            index = getMinIndex(ext);            dout.writeInt(ext);            sum++;            try {                  ext = dinlist.get(index).readInt();            } catch (Exception e) {                  ext = -1;                  count--;                  dinlist.get(index).close();          //      System.err.println(index + "空,写进:" +sum);                              }          }          int sIndex = getSIndex(ext);          dout.writeInt(ext);          while (true) {            try {                  dout.writeInt(dinlist.get(sIndex).readInt());            } catch (Exception e) {                  dinlist.get(sIndex).close();                  break;            }          }          dout.close();      }      //找到剩下的最后一个文件输入流      public int getSIndex(int[] ext){          int result = 0;          for (int i = 0; i < ext.length; i++) {            if(ext!= -1){                  result = i;                  break;            }          }          return result;      }      //找到数据中最小的一个      public int getMinIndex(int[] ext){          int min = 2147483647;          int index = -1;          for (int i = 0; i < ext.length; i++) {            if(ext != -1 && ext < min){                  min = ext;                  index = i;            }          }          return index;      }      /**   * 二路归并   *       * @param one   * @param two   * @return   * @throws IOException   */    private File merge(File one, File two) throws IOException {          FileInputStream fis1 = new FileInputStream(one);          FileInputStream fis2 = new FileInputStream(two);          BufferedInputStream bin1 = new BufferedInputStream(fis1,BUFFER_SIZE);          BufferedInputStream bin2 = new BufferedInputStream(fis2,BUFFER_SIZE);                  DataInputStream din1=new DataInputStream(bin1);            DataInputStream din2=new DataInputStream(bin2);                      File output = new File("merged" + new Random().nextInt());          FileOutputStream os = new FileOutputStream(output);          BufferedOutputStream bout = new BufferedOutputStream(os);          DataOutputStream dout=new DataOutputStream(bout);                int a = -1;//= din1.readInt();          int b = -1;//= din2.readInt();                  boolean finished = false;          boolean emptyA = false;//          int flag = 0;          while (!finished) {            if (flag != 1) {                  try {                      a = din1.readInt();                  } catch (Exception e) {                      emptyA = true;                      break;                  }            }            if (flag != 2) {                  try {                      b = din2.readInt();                  } catch (Exception e) {                      emptyA = false;                      break;                  }            }            if(a > b){                  dout.writeInt(b);                  flag = 1;            }else if( a < b){                  dout.writeInt(a);                  flag = 2;            }else if(a == b){                  dout.write(a);                  dout.write(b);                  flag = 0;            }          }          finished = false;          if(emptyA){            dout.writeInt(b);            while(!finished){                  try {                      b = din2.readInt();                  } catch (Exception e) {                      break;                  }                  dout.writeInt(b);            }          }else{            dout.writeInt(a);            while(!finished){                  try {                      a = din1.readInt();                  } catch (Exception e) {                      break;                  }                  dout.writeInt(a);            }          }          dout.close();          os.close();          bin1.close();          bin2.close();          bout.close();          return output;      }             /**   * @param args   * @throws IOException   */    public static void main(String[] args) throws IOException {                        Random random = new Random(System.currentTimeMillis());          FileOutputStream fw = new FileOutputStream(MAIN_FILE);          BufferedOutputStream bout = new BufferedOutputStream(fw);          DataOutputStream dout=new DataOutputStream(bout);                      for (int i = 0; i < ITEM_COUNT; i++) {            int ger = random.nextInt();            ger = ger < 0 ? -ger : ger;            dout.writeInt(ger);          }          dout.close();          bout.close();          fw.close();          ExternalSort sort = new ExternalSort();          System.out.println("Original:");          long start = System.currentTimeMillis();          sort.mSort(MAIN_FILE);                           long end = System.currentTimeMillis();          System.out.println((end - start)/1000 + "s");          recordFile((end - start)/1000 ,true);      }      private static void recordFile(long time,boolean isBuffer)            throws FileNotFoundException, IOException {          BufferedWriter bw = new BufferedWriter(new FileWriter("log",true));          bw.write("FILE_COUNT = "+FILE_COUNT+";对"+ ITEM_COUNT + "条数据 "+ ITEM_COUNT*4/(1024*1204) +"MB排序耗时:" + time + "s ");          if(isBuffer){            bw.write(" 使用缓冲:"+BUFFER_SIZE*4/(1024*1204) +"MB");          }          bw.newLine();          bw.close();      }}
页: [1]
查看完整版本: 转:多路归并排序(远远大于内存的数据进行外排序)