|
前天看了 T1 大大的“关于两个世界体系的对话”http://www.iteye.com/topic/231515,获益良多,随笔写下2个类把文章中的 (1+2)*(3+4) 并行问题用java解决掉。
基于这个解决方案的java代码是可以(理论上)自动被并行运算的(在方法调用这个层面上),并行部分就是对所有无相关参数并行计算
闲话少说,上代码
客户端
为了让效果明显,加法和乘法里面都sleep了1000ms
Test类有3个method, add(a,b) , mul(a,b,c), trace(starttime,result),在传统的顺序执行中是这样调用的
Test t=new Test();int a=t.add(1,2);int b=t.add(3,4);int c=t.add(2,5);int result=t.mul(a,b,c);运行结果main: + 1 2 => 3main: + 3 4 => 7main: + 2 5 => 7main: * 3 7 7 => 147147 | time cost : 4003 | freeM/totalM : 1816576/2031616
用了并行运算就要这样调用
Test t=new Test();Func add=Func.define(t,"add");Func mul=Func.define(t,"mul");Var a=add.c(1,2);Var b=add.c(3,4);Var c=add.c(2,5);Var result=mul.c(a,b,c);result.run();运行结果Thread-1: + 1 2 => 3Thread-2: + 3 4 => 7Thread-3: + 2 5 => 7Thread-6: * 3 7 7 => 147147 | time cost : 2010 | freeM/totalM : 1716992/2031616
用了并发的写法以后,3个加法是同时由3个不同线程执行的,直到3个加法都执行完毕,乘法才被调用。在一个完全按照sequence方式写的并行程序的运行时间大大的减少了,4003ms => 2010ms。
Test.java
/* * Created on 2008-9-9 * Title: * Description: * @author Gordon Hu * @version 1.0 */package gordon.concurrence;public class Test { public static void main(String[] args) throws Exception { Test t=new Test(); boolean parallel=true; if(parallel){ Func add=Func.define(t,"add"); Func mul=Func.define(t,"mul"); Func println=Func.define(t,"trace"); Var a=add.c(1,2); Var b=add.c(3,4); Var c=add.c(2,5); Var i3=mul.c(a,b,c); long start=System.currentTimeMillis(); Var timeMe=println.c(start,i3); timeMe.run(); }else{ long START=System.currentTimeMillis(); int A=t.add(1,2); int B=t.add(3,4); int C=t.add(2,5); int I3=t.mul(A,B,C); t.trace(START,I3); } } public int add(Integer a,Integer b){ int rst=a+b; try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { // TODO } System.out.println(Thread.currentThread().getName()+ ": + "+a+" "+b+" => "+rst); return rst; } public int mul(Integer a,Integer b,Integer c){ int rst=a*b*c; try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { // TODO } System.out.println(Thread.currentThread().getName()+ ": * "+a+" "+b+" "+c+" => "+rst); return rst; } public void trace(Long start,Object o){ System.out.println(o+" | time cost : "+(System.currentTimeMillis()-start)+" | freeM/totalM : "+Runtime.getRuntime().freeMemory()+"/"+Runtime.getRuntime().totalMemory()); }}
帮助类
Var.java
/* * Created on 2008-9-9 * Title: * Description: * @author Gordon Hu * @version 1.0 */package gordon.concurrence;import java.util.HashSet;import java.util.Set;public abstract class Var implements Runnable { public abstract Func getFunc(); public abstract void run(); public abstract void update(Var n,Object rst); private Set<Var> updater=new HashSet<Var>(); public void setUpdater(Set<Var> updater) { this.updater.addAll(updater); } public void addUpdater(Var updater){ this.updater.add(updater); } public Set<Var> getUpdater() { return updater; }}
每个Var其实就是一个方法运行时候所需要的context,包括input parameters和return结果要调用的callback
Threading.java
/* * Created on 2008-9-9 * Title: * Description: * @author Gordon Hu * @version 1.0 */package gordon.concurrence;public class Threading { public static void runThread(Runnable r){ new Thread(r).start(); }}
这个类就是所有异步调用的底层设施了,我这里就直接new thread了,可以用threadpool之类的,也可以用grid computation,比如coherence之类的
接下来这个Func其实就是些反射动态调用并且暴露一个统一的类似function的接口。
动态调用里面我用了方法名字匹配(不包括参数类型,只要名字符合的第一个方法就被调用,这样主要是因为本人倾向可变长参数,不喜欢overloading,所以像System.out对象里面的println方法就不能直接用)。另外还负责了生成一个Var。
Func.java
/* * Created on 2008-9-9 * Title: * Description: * @author Gordon Hu * @version 1.0 */package gordon.concurrence;import java.util.ArrayList;import java.util.List;public abstract class Func { public abstract Object apply(Object... args); public Var c(final Object... args){ final Func self=this; Var n=new Var(){ //....create a Var and bind args to it //....具体实现省略,详情看附件中的src }; //set callbacks for(Object o:args){ if(o instanceof Var)((Var)o).addUpdater(n); } return n; } public static Func define(final Object worker,final String method){ //define a Func from a given object and its method name return new Func(){ public Object apply(Object... args) { return run(worker,method,args); } }; } public static Runnable getUpdateThread(final Var updater,final Var node,final Object rst){ return new Runnable(){ public void run(){ updater.update(node,rst); } }; } public static Object run(Object worker, String action, Object... arguments) { //dynamically call run a method //execute worker.action(arguments) and return result //....具体实现省略,详情看附件中的src }} |
|