xiaoshan5634 发表于 2013-2-1 12:51:30

tomcat线程池的实现

Tomcat的线程池主要使用了ThreadPool.java、ThreadPoolRunnable.java、ThreadWithAttributes.java,其中ThreadPoolRunnable.java是一个接口,所有的需要使用线程池的服务都可以实现这个接口。而实现的核心则再ThreadPool.java中的两个内部类:MonitorRunnable.java和ControlRunnable.java。
MonitorRunnable.java在线程池启动之后定期(60s)的扫描线程数,如果空闲的线程大于最大空闲线程数,则结束多余的线程。
ControlRunnable.java是所有启动的线程,若由任务需要执行,ThreadPool会先找一个空闲的ControlRunnable来执行,若没有空闲的,则创建,若现有的busy线程已经达到最大值,则等待。任务执行结束后通知ControlRunnable继续wait,直到有任务执行或被MonitorRunnable回收。
若要使用线程池可以实现Runnable接口,或者可以实现ThreadPoolRunnable 接口,当然自己还可以扩展这个类,以便实现更多的使用线程池的方式。
ThreadPool.java

package com.xiao.tomcatthreadpool;import java.util.Hashtable;public class ThreadPool {public static final int MAX_THREADS = 10;    public static final int MAX_THREADS_MIN = 4;    public static final int MAX_SPARE_THREADS = 5;    public static final int MIN_SPARE_THREADS = 2;    public static final int WORK_WAIT_TIMEOUT = 10*1000;      private String name = "TP";    private boolean isDaemon;    private boolean stopThePool;    private int maxSpareThreads;    private int minSpareThreads;    private int currentThreadCount;    private int currentThreadsBusy;    private int maxThreads;      private int threadPriority = Thread.NORM_PRIORITY;    private int sequence = 0;      private ControlRunnable[] pool;    private MonitorRunnable monitor;      protected Hashtable threads=new Hashtable();      public ThreadPool() {    maxThreads = MAX_THREADS;    maxSpareThreads = MAX_SPARE_THREADS;    minSpareThreads = MIN_SPARE_THREADS;    currentThreadCount = 0;    currentThreadsBusy = 0;    stopThePool = false;    }      public static ThreadPool createThreadPool() {    return new ThreadPool();    }      public synchronized void start() {    stopThePool = false;    currentThreadCount = 0;    currentThreadsBusy = 0;    pool = new ControlRunnable;    openThreads(minSpareThreads);    if (maxSpareThreads < maxThreads) {            monitor = new MonitorRunnable(this);      }    }      public void run(Runnable r) {    ControlRunnable c = findControlRunnable();      c.runIt(r);    }      public void runIt(ThreadPoolRunnable r) {    if(null == r) {            throw new NullPointerException();      }      ControlRunnable c = findControlRunnable();      c.runIt(r);    }      public ControlRunnable findControlRunnable() {    ControlRunnable c;    if ( stopThePool ) {            throw new IllegalStateException();      }    synchronized(this) {    while (currentThreadsBusy == currentThreadCount) {    if (currentThreadCount < maxThreads) {                  int toOpen = currentThreadCount + minSpareThreads;                  openThreads(toOpen);                } else {                try {                        this.wait();                  }                  catch(InterruptedException e) {                                        }                }    }    if(0 == currentThreadCount || stopThePool) {                throw new IllegalStateException();            }    int pos = currentThreadCount - currentThreadsBusy - 1;            c = pool;            pool = null;            currentThreadsBusy++;    }    return c;    }      public void openThreads(int toOpen) {    if(toOpen > maxThreads) {    toOpen = maxThreads;    }    for(int i=currentThreadCount; i<toOpen; i++) {    pool = new ControlRunnable(this);    }    currentThreadCount = toOpen;    }      public void addThread(ThreadWithAttributes t, ControlRunnable r) {    threads.put(t, r);    }      public void removeThread(Thread t) {    threads.remove(t);    }      public synchronized void notifyThreadEnd(ControlRunnable r) {    currentThreadCount --;    currentThreadsBusy --;    notify();    }      public synchronized void returnController(ControlRunnable r) {    if(0 == currentThreadCount || stopThePool) {            r.terminate();            return;      }    currentThreadsBusy--;    pool = r;      notify();    }      public synchronized void checkSpareControllers() {    if(stopThePool) {            return;      }    if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {    int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads;    for(int i=0; i<toFree; i++) {    ControlRunnable cr = pool;    cr.terminate();    pool = null;    currentThreadCount --;    }    }      }      /**   * MonitorRunnable主要任务是监控线程数   * 如果线程数大于最大线程则回收线程   */    public static class MonitorRunnable implements Runnable {    ThreadPool tp;    Thread t;    boolean shouldTerminate;    int interval = WORK_WAIT_TIMEOUT;    public MonitorRunnable(ThreadPool tp) {    this.tp = tp;    this.start();    }      public void setInterval(int i) {    interval = i;    }    public void start() {    shouldTerminate = false;    t = new Thread(this);    t.setDaemon(tp.getDaemon());      t.setName(tp.getName() + "-Monitor");    t.start();    }      public void stop() {    terminal();    }      public synchronized void terminal() {    this.shouldTerminate = true;    this.notify();    }      public void run() {    while(true) {    try {    synchronized(this) {    this.wait(interval);    }    if(shouldTerminate) {                        break;                  }    //System.out.println("currentThreadCount=" + currentThreadCount + " currentThreadsBusy=" + currentThreadsBusy + " ");    tp.checkSpareControllers();    } catch(InterruptedException e) {      }    }    }    }      public static class ControlRunnable implements Runnable {    private ThreadPool tp;    private boolean shouldTerminate;    private ThreadWithAttributes   t;      private ThreadPoolRunnable   toRun;      private Runnable toRunRunnable;      private boolean    shouldRun;      public ControlRunnable(ThreadPool tp) {    toRun = null;    shouldTerminate = false;            shouldRun = false;            this.tp = tp;            t = new ThreadWithAttributes(tp, this);            t.setDaemon(true);            t.setName(tp.getName() + "-Processor" + tp.incSequence());            t.setPriority(tp.getThreadPriority());            tp.addThread(t, this);            t.start();    }      public void run() {    boolean _shouldRun = false;            boolean _shouldTerminate = false;            ThreadPoolRunnable _toRun = null;            try {            while(true) {            try {            synchronized(this) {            System.out.println("shouldRun=" + shouldRun);            while(!shouldRun && !shouldTerminate) {            this.wait();            }            _shouldRun = shouldRun;                            _shouldTerminate = shouldTerminate;                            _toRun = toRun;            }            if (_shouldTerminate)                            break;            try {            if(_shouldRun) {            if (_toRun != null) {            _toRun.runIt(t.getThreadData(tp));            } else if (toRunRunnable != null) {            toRunRunnable.run();            } else {            }            }            } catch(Throwable r) {            _shouldTerminate = true;                            _shouldRun = false;                            tp.notifyThreadEnd(this);            } finally {            if(_shouldRun) {            shouldRun = false;            tp.returnController(this);            }                        }            if (_shouldTerminate) {                            break;                        }            } catch(InterruptedException e) {                        }            }            } finally {            tp.removeThread(Thread.currentThread());            }                }      public synchronized void runIt(Runnable toRun) {      this.toRunRunnable = toRun;            shouldRun = true;            this.notify();      }      public synchronized void runIt(ThreadPoolRunnable toRun) {      this.toRun = toRun;            shouldRun = true;            this.notify();      }      public void stop() {            this.terminate();      }      public void kill() {            t.stop();      }      public synchronized void terminate() {    shouldTerminate = true;    this.notify();    }    }      public String getName() {    return name;    }      public boolean getDaemon() {    return isDaemon;    }public int getThreadPriority() {return threadPriority;}public int incSequence() {return sequence ++;}public void setThreadPriority(int threadPriority) {this.threadPriority = threadPriority;}} ThreadWithAttributes.java

package com.xiao.tomcatthreadpool;import java.util.Hashtable;public class ThreadWithAttributes extends Thread {      private Object control;    public static int MAX_NOTES=16;    private Object notes[]=new Object;    private Hashtable attributes=new Hashtable();    private String currentStage;    private Object param;      private Object thData[];    public ThreadWithAttributes(Object control, Runnable r) {      super(r);      this.control=control;    }      public final Object[] getThreadData(Object control ) {      return thData;    }      public final void setThreadData(Object control, Object thData[] ) {      this.thData=thData;    }    public final void setNote( Object control, int id, Object value ) {      if( this.control != control ) return;      notes=value;    }    public final String getCurrentStage(Object control) {      if( this.control != control ) return null;      return currentStage;    }    /** Information about the current request ( or the main object   * we are processing )   */    public final Object getParam(Object control) {      if( this.control != control ) return null;      return param;    }    public final void setCurrentStage(Object control, String currentStage) {      if( this.control != control ) return;      this.currentStage = currentStage;    }    public final void setParam( Object control, Object param ) {      if( this.control != control ) return;      this.param=param;    }    public final Object getNote(Object control, int id ) {      if( this.control != control ) return null;      return notes;    }    public final Hashtable getAttributes(Object control) {      if( this.control != control ) return null;      return attributes;    }} ThreadPoolRunnable.java

package com.xiao.tomcatthreadpool;public interface ThreadPoolRunnable {public Object[] getInitData();public void runIt(Object thData[]);}
页: [1]
查看完整版本: tomcat线程池的实现