tomcat线程池的实现
Tomcat的线程池主要使用了ThreadPool.java、ThreadPoolRunnable.java、ThreadWithAttributes.java,其中ThreadPoolRunnable.java是一个接口,所有的需要使用线程池的服务都可以实现这个接口。而实现的核心则再ThreadPool.java中的两个内部类:MonitorRunnable.java和ControlRunnable.java。MonitorRunnable.java在线程池启动之后定期(60s)的扫描线程数,如果空闲的线程大于最大空闲线程数,则结束多余的线程。
ControlRunnable.java是所有启动的线程,若由任务需要执行,ThreadPool会先找一个空闲的ControlRunnable来执行,若没有空闲的,则创建,若现有的busy线程已经达到最大值,则等待。任务执行结束后通知ControlRunnable继续wait,直到有任务执行或被MonitorRunnable回收。
若要使用线程池可以实现Runnable接口,或者可以实现ThreadPoolRunnable 接口,当然自己还可以扩展这个类,以便实现更多的使用线程池的方式。
ThreadPool.java
package com.xiao.tomcatthreadpool;import java.util.Hashtable;public class ThreadPool {public static final int MAX_THREADS = 10; public static final int MAX_THREADS_MIN = 4; public static final int MAX_SPARE_THREADS = 5; public static final int MIN_SPARE_THREADS = 2; public static final int WORK_WAIT_TIMEOUT = 10*1000; private String name = "TP"; private boolean isDaemon; private boolean stopThePool; private int maxSpareThreads; private int minSpareThreads; private int currentThreadCount; private int currentThreadsBusy; private int maxThreads; private int threadPriority = Thread.NORM_PRIORITY; private int sequence = 0; private ControlRunnable[] pool; private MonitorRunnable monitor; protected Hashtable threads=new Hashtable(); public ThreadPool() { maxThreads = MAX_THREADS; maxSpareThreads = MAX_SPARE_THREADS; minSpareThreads = MIN_SPARE_THREADS; currentThreadCount = 0; currentThreadsBusy = 0; stopThePool = false; } public static ThreadPool createThreadPool() { return new ThreadPool(); } public synchronized void start() { stopThePool = false; currentThreadCount = 0; currentThreadsBusy = 0; pool = new ControlRunnable; openThreads(minSpareThreads); if (maxSpareThreads < maxThreads) { monitor = new MonitorRunnable(this); } } public void run(Runnable r) { ControlRunnable c = findControlRunnable(); c.runIt(r); } public void runIt(ThreadPoolRunnable r) { if(null == r) { throw new NullPointerException(); } ControlRunnable c = findControlRunnable(); c.runIt(r); } public ControlRunnable findControlRunnable() { ControlRunnable c; if ( stopThePool ) { throw new IllegalStateException(); } synchronized(this) { while (currentThreadsBusy == currentThreadCount) { if (currentThreadCount < maxThreads) { int toOpen = currentThreadCount + minSpareThreads; openThreads(toOpen); } else { try { this.wait(); } catch(InterruptedException e) { } } } if(0 == currentThreadCount || stopThePool) { throw new IllegalStateException(); } int pos = currentThreadCount - currentThreadsBusy - 1; c = pool; pool = null; currentThreadsBusy++; } return c; } public void openThreads(int toOpen) { if(toOpen > maxThreads) { toOpen = maxThreads; } for(int i=currentThreadCount; i<toOpen; i++) { pool = new ControlRunnable(this); } currentThreadCount = toOpen; } public void addThread(ThreadWithAttributes t, ControlRunnable r) { threads.put(t, r); } public void removeThread(Thread t) { threads.remove(t); } public synchronized void notifyThreadEnd(ControlRunnable r) { currentThreadCount --; currentThreadsBusy --; notify(); } public synchronized void returnController(ControlRunnable r) { if(0 == currentThreadCount || stopThePool) { r.terminate(); return; } currentThreadsBusy--; pool = r; notify(); } public synchronized void checkSpareControllers() { if(stopThePool) { return; } if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) { int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads; for(int i=0; i<toFree; i++) { ControlRunnable cr = pool; cr.terminate(); pool = null; currentThreadCount --; } } } /** * MonitorRunnable主要任务是监控线程数 * 如果线程数大于最大线程则回收线程 */ public static class MonitorRunnable implements Runnable { ThreadPool tp; Thread t; boolean shouldTerminate; int interval = WORK_WAIT_TIMEOUT; public MonitorRunnable(ThreadPool tp) { this.tp = tp; this.start(); } public void setInterval(int i) { interval = i; } public void start() { shouldTerminate = false; t = new Thread(this); t.setDaemon(tp.getDaemon()); t.setName(tp.getName() + "-Monitor"); t.start(); } public void stop() { terminal(); } public synchronized void terminal() { this.shouldTerminate = true; this.notify(); } public void run() { while(true) { try { synchronized(this) { this.wait(interval); } if(shouldTerminate) { break; } //System.out.println("currentThreadCount=" + currentThreadCount + " currentThreadsBusy=" + currentThreadsBusy + " "); tp.checkSpareControllers(); } catch(InterruptedException e) { } } } } public static class ControlRunnable implements Runnable { private ThreadPool tp; private boolean shouldTerminate; private ThreadWithAttributes t; private ThreadPoolRunnable toRun; private Runnable toRunRunnable; private boolean shouldRun; public ControlRunnable(ThreadPool tp) { toRun = null; shouldTerminate = false; shouldRun = false; this.tp = tp; t = new ThreadWithAttributes(tp, this); t.setDaemon(true); t.setName(tp.getName() + "-Processor" + tp.incSequence()); t.setPriority(tp.getThreadPriority()); tp.addThread(t, this); t.start(); } public void run() { boolean _shouldRun = false; boolean _shouldTerminate = false; ThreadPoolRunnable _toRun = null; try { while(true) { try { synchronized(this) { System.out.println("shouldRun=" + shouldRun); while(!shouldRun && !shouldTerminate) { this.wait(); } _shouldRun = shouldRun; _shouldTerminate = shouldTerminate; _toRun = toRun; } if (_shouldTerminate) break; try { if(_shouldRun) { if (_toRun != null) { _toRun.runIt(t.getThreadData(tp)); } else if (toRunRunnable != null) { toRunRunnable.run(); } else { } } } catch(Throwable r) { _shouldTerminate = true; _shouldRun = false; tp.notifyThreadEnd(this); } finally { if(_shouldRun) { shouldRun = false; tp.returnController(this); } } if (_shouldTerminate) { break; } } catch(InterruptedException e) { } } } finally { tp.removeThread(Thread.currentThread()); } } public synchronized void runIt(Runnable toRun) { this.toRunRunnable = toRun; shouldRun = true; this.notify(); } public synchronized void runIt(ThreadPoolRunnable toRun) { this.toRun = toRun; shouldRun = true; this.notify(); } public void stop() { this.terminate(); } public void kill() { t.stop(); } public synchronized void terminate() { shouldTerminate = true; this.notify(); } } public String getName() { return name; } public boolean getDaemon() { return isDaemon; }public int getThreadPriority() {return threadPriority;}public int incSequence() {return sequence ++;}public void setThreadPriority(int threadPriority) {this.threadPriority = threadPriority;}} ThreadWithAttributes.java
package com.xiao.tomcatthreadpool;import java.util.Hashtable;public class ThreadWithAttributes extends Thread { private Object control; public static int MAX_NOTES=16; private Object notes[]=new Object; private Hashtable attributes=new Hashtable(); private String currentStage; private Object param; private Object thData[]; public ThreadWithAttributes(Object control, Runnable r) { super(r); this.control=control; } public final Object[] getThreadData(Object control ) { return thData; } public final void setThreadData(Object control, Object thData[] ) { this.thData=thData; } public final void setNote( Object control, int id, Object value ) { if( this.control != control ) return; notes=value; } public final String getCurrentStage(Object control) { if( this.control != control ) return null; return currentStage; } /** Information about the current request ( or the main object * we are processing ) */ public final Object getParam(Object control) { if( this.control != control ) return null; return param; } public final void setCurrentStage(Object control, String currentStage) { if( this.control != control ) return; this.currentStage = currentStage; } public final void setParam( Object control, Object param ) { if( this.control != control ) return; this.param=param; } public final Object getNote(Object control, int id ) { if( this.control != control ) return null; return notes; } public final Hashtable getAttributes(Object control) { if( this.control != control ) return null; return attributes; }} ThreadPoolRunnable.java
package com.xiao.tomcatthreadpool;public interface ThreadPoolRunnable {public Object[] getInitData();public void runIt(Object thData[]);}
页:
[1]