六狼论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博账号登陆

只需一步,快速开始

搜索
查看: 43|回复: 0

java中的自动并行运算小试

[复制链接]

升级  30%

3

主题

3

主题

3

主题

童生

Rank: 1

积分
15
 楼主| 发表于 2013-1-27 05:01:01 | 显示全部楼层 |阅读模式
前天看了 T1 大大的“关于两个世界体系的对话”http://www.iteye.com/topic/231515,获益良多,随笔写下2个类把文章中的 (1+2)*(3+4) 并行问题用java解决掉。
基于这个解决方案的java代码是可以(理论上)自动被并行运算的(在方法调用这个层面上),并行部分就是对所有无相关参数并行计算

闲话少说,上代码

客户端
为了让效果明显,加法和乘法里面都sleep了1000ms
Test类有3个method, add(a,b) , mul(a,b,c), trace(starttime,result),在传统的顺序执行中是这样调用的
Test t=new Test();int a=t.add(1,2);int b=t.add(3,4);int c=t.add(2,5);int result=t.mul(a,b,c);运行结果main: + 1 2 => 3main: + 3 4 => 7main: + 2 5 => 7main: * 3 7 7 => 147147 | time cost : 4003 | freeM/totalM : 1816576/2031616
用了并行运算就要这样调用
Test t=new Test();Func add=Func.define(t,"add");Func mul=Func.define(t,"mul");Var a=add.c(1,2);Var b=add.c(3,4);Var c=add.c(2,5);Var result=mul.c(a,b,c);result.run();运行结果Thread-1: + 1 2 => 3Thread-2: + 3 4 => 7Thread-3: + 2 5 => 7Thread-6: * 3 7 7 => 147147 | time cost : 2010 | freeM/totalM : 1716992/2031616

用了并发的写法以后,3个加法是同时由3个不同线程执行的,直到3个加法都执行完毕,乘法才被调用。在一个完全按照sequence方式写的并行程序的运行时间大大的减少了,4003ms => 2010ms

Test.java
/* * Created on 2008-9-9 * Title: * Description:  * @author Gordon Hu * @version 1.0 */package gordon.concurrence;public class Test {        public static void main(String[] args) throws Exception {        Test t=new Test();        boolean parallel=true;        if(parallel){            Func add=Func.define(t,"add");            Func mul=Func.define(t,"mul");            Func println=Func.define(t,"trace");            Var a=add.c(1,2);            Var b=add.c(3,4);            Var c=add.c(2,5);            Var i3=mul.c(a,b,c);            long start=System.currentTimeMillis();            Var timeMe=println.c(start,i3);            timeMe.run();        }else{            long START=System.currentTimeMillis();            int A=t.add(1,2);            int B=t.add(3,4);            int C=t.add(2,5);            int I3=t.mul(A,B,C);            t.trace(START,I3);        }    }        public int add(Integer a,Integer b){    int rst=a+b;    try {            Thread.currentThread().sleep(1000);        } catch (InterruptedException e) {            // TODO        }        System.out.println(Thread.currentThread().getName()+ ": + "+a+" "+b+" => "+rst);        return rst;    }    public int mul(Integer a,Integer b,Integer c){        int rst=a*b*c;        try {            Thread.currentThread().sleep(1000);        } catch (InterruptedException e) {            // TODO        }        System.out.println(Thread.currentThread().getName()+ ": * "+a+" "+b+" "+c+" => "+rst);        return rst;    }    public void trace(Long start,Object o){        System.out.println(o+" | time cost : "+(System.currentTimeMillis()-start)+" | freeM/totalM : "+Runtime.getRuntime().freeMemory()+"/"+Runtime.getRuntime().totalMemory());    }}


帮助类
Var.java
/* * Created on 2008-9-9 * Title: * Description:  * @author Gordon Hu * @version 1.0 */package gordon.concurrence;import java.util.HashSet;import java.util.Set;public abstract class Var implements Runnable {    public abstract Func getFunc();    public abstract void run();    public abstract void update(Var n,Object rst);    private Set<Var> updater=new HashSet<Var>();    public void setUpdater(Set<Var> updater) {        this.updater.addAll(updater);    }        public void addUpdater(Var updater){        this.updater.add(updater);    }    public Set<Var> getUpdater() {        return updater;    }}
每个Var其实就是一个方法运行时候所需要的context,包括input parameters和return结果要调用的callback

Threading.java
/* * Created on 2008-9-9 * Title: * Description:  * @author Gordon Hu * @version 1.0 */package gordon.concurrence;public class Threading {    public static void runThread(Runnable r){        new Thread(r).start();    }}
这个类就是所有异步调用的底层设施了,我这里就直接new thread了,可以用threadpool之类的,也可以用grid computation,比如coherence之类的


接下来这个Func其实就是些反射动态调用并且暴露一个统一的类似function的接口。
动态调用里面我用了方法名字匹配(不包括参数类型,只要名字符合的第一个方法就被调用,这样主要是因为本人倾向可变长参数,不喜欢overloading,所以像System.out对象里面的println方法就不能直接用)。另外还负责了生成一个Var。
Func.java
/* * Created on 2008-9-9 * Title: * Description:  * @author Gordon Hu * @version 1.0 */package gordon.concurrence;import java.util.ArrayList;import java.util.List;public abstract class Func {    public abstract Object apply(Object... args);    public Var c(final Object... args){        final Func self=this;        Var n=new Var(){                //....create a Var and bind args to it                //....具体实现省略,详情看附件中的src            };        //set callbacks        for(Object o:args){            if(o instanceof Var)((Var)o).addUpdater(n);        }        return n;    }            public static Func define(final Object worker,final String method){        //define a Func from a given object and its method name        return new Func(){                public Object apply(Object... args) {                    return run(worker,method,args);                }            };    }        public static Runnable getUpdateThread(final Var updater,final Var node,final Object rst){        return new Runnable(){            public void run(){                updater.update(node,rst);            }        };    }    public static Object run(Object worker, String action,                                 Object... arguments) {        //dynamically call run a method        //execute worker.action(arguments) and return result        //....具体实现省略,详情看附件中的src    }}
您需要登录后才可以回帖 登录 | 立即注册 新浪微博账号登陆

本版积分规则

快速回复 返回顶部 返回列表