java

导航

教你一个可直接套用的线程池实例

来源 :中华考试网 2020-12-02

  该实例来源于Apache项目源代码,源程序有800余行,功能比较全 面,而且是非常完善的,并且运行于诸多服务器如tomcat上,就是分析起来有点繁琐。如果开发人员直接把这段程序拿来修改后使用到自己的开发项目中,不失为拿来主义的上策。

  本文对该源程序进行了修改和简化,对其中核 心部分进行分析,然后创建测试类进行测试。读者学习之后可以直接模仿和套用,而不需要花费大量时间自己亲自再去写线程池程序了。

  关键的程序有2个:线程池类(ThreadPool)和线程池接口(ThreadPoolRunnable)。

  首先看较复杂的线程池类程序,文件名为ThreadPool.java,有200余行,需要读者有耐心,内容如下:

  ---------------------------------------------------------------------------------------

  import java.util.Vector;

  public class ThreadPool {

  public static final int MAX_THREADS = 100;

  public static final int MAX_SPARE_THREADS = 50;

  public static final int MIN_SPARE_THREADS = 10;

  public static final int WORK_WAIT_TIMEOUT = 60 * 1000;

  protected Vector pool;

  protected MonitorRunnable monitor;

  protected int maxThreads;

  protected int minSpareThreads;

  protected int maxSpareThreads;

  protected int currentThreadCount;

  protected int currentThreadsBusy;

  填写下面表单即可预约申请免费试听java课程!害怕学不会?助教陪读,随时解惑!担心就业?一地学习,可全国推荐就业!

  protected boolean stopThePool;

  public ThreadPool() {

  maxThreads = MAX_THREADS;

  maxSpareThreads = MAX_SPARE_THREADS;

  minSpareThreads = MIN_SPARE_THREADS;

  currentThreadCount = 0;

  currentThreadsBusy = 0;

  stopThePool = false;

  }

  public synchronized void start() {

  adjustLimits();

  openThreads(minSpareThreads);

  monitor = new MonitorRunnable(this);

  }

  public void setMaxThreads(int maxThreads) {

  this.maxThreads = maxThreads;

  }

  public int getMaxThreads() {

  return maxThreads;

  }

  public void setMinSpareThreads(int minSpareThreads) {

  this.minSpareThreads = minSpareThreads;

  }

  public int getMinSpareThreads() {

  return minSpareThreads;

  }

  public void setMaxSpareThreads(int maxSpareThreads) {

  this.maxSpareThreads = maxSpareThreads;

  }

  public int getMaxSpareThreads() {

  return maxSpareThreads;

  }

  public void runIt(ThreadPoolRunnable r) {

  if (null == r) {

  throw new NullPointerException();

  }

  if (0 == currentThreadCount || stopThePool) {

  throw new IllegalStateException();

  }

  ControlRunnable c = null;

  synchronized (this) {

  if (currentThreadsBusy == currentThreadCount) {

  if (currentThreadCount < maxThreads) {

  int toOpen = currentThreadCount + minSpareThreads;

  openThreads(toOpen);

  } else {

  while (currentThreadsBusy == currentThreadCount) {

  try {

  this.wait();

  }catch (InterruptedException e) {

  }

  if (0 == currentThreadCount || stopThePool) {

  throw new IllegalStateException();

  }

  }

  }

  }

  c = (ControlRunnable) pool.lastElement();

  pool.removeElement(c);

  currentThreadsBusy++;

  }

  c.runIt(r);

  }

  public synchronized void shutdown() {

  if (!stopThePool) {

  stopThePool = true;

  monitor.terminate();

  monitor = null;

  for (int i = 0; i < (currentThreadCount - currentThreadsBusy); i++) {

  try {

  ((ControlRunnable) (pool.elementAt(i))).terminate();

  } catch (Throwable t) {

  }

  }

  currentThreadsBusy = currentThreadCount = 0;

  pool = null;

  notifyAll();

  }

  }

  protected synchronized void checkSpareControllers() {

  if (stopThePool) {

  return;

  }

  if ((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {

  int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads;

  for (int i = 0; i < toFree; i++) {

  ControlRunnable c = (ControlRunnable) pool.firstElement();

  pool.removeElement(c);

  c.terminate();

  currentThreadCount--;

  }

  }

  }

  protected synchronized void returnController(ControlRunnable c) {

  if (0 == currentThreadCount || stopThePool) {

  c.terminate();

  return;

  }

  currentThreadsBusy--;

  pool.addElement(c);

  notify();

  }

  protected synchronized void notifyThreadEnd() {

  currentThreadsBusy--;

  currentThreadCount--;

  notify();

  openThreads(minSpareThreads);

  }

  protected void adjustLimits() {

  if (maxThreads <= 0) {

  maxThreads = MAX_THREADS;

  }

  if (maxSpareThreads >= maxThreads) {

  maxSpareThreads = maxThreads;

  }

  if (maxSpareThreads <= 0) {

  if (1 == maxThreads) {

  maxSpareThreads = 1;

  } else {

  maxSpareThreads = maxThreads / 2;

  }

  }

  if (minSpareThreads > maxSpareThreads) {

  minSpareThreads = maxSpareThreads;

  }

  if (minSpareThreads <= 0) {

  if (1 == maxSpareThreads) {

  minSpareThreads = 1;

  } else {

  minSpareThreads = maxSpareThreads / 2;

  }

  }

  }

  protected void openThreads(int toOpen) {

  if (toOpen > maxThreads) {

  toOpen = maxThreads;

  }

  if (0 == currentThreadCount) {

  pool = new Vector(toOpen);

  }

  for (int i = currentThreadCount; i < toOpen; i++) {

  pool.addElement(new ControlRunnable(this));

  }

  currentThreadCount = toOpen;

  }

  class MonitorRunnable implements Runnable {

  ThreadPool p;

  Thread t;

  boolean shouldTerminate;

  MonitorRunnable(ThreadPool p) {

  shouldTerminate = false;

  this.p = p;

  t = new Thread(this);

  t.start();

  }

  public void run() {

  while (true) {

  try {

  synchronized (this) {

  this.wait(WORK_WAIT_TIMEOUT);

  }

  if (shouldTerminate) {

  break;

  }

  p.checkSpareControllers();

  } catch (Throwable t) {

  t.printStackTrace();

  }

  }

  }

  public synchronized void terminate() {

  shouldTerminate = true;

  this.notify();

  }

  }

  class ControlRunnable implements Runnable {

  ThreadPool p;

  Thread t;

  ThreadPoolRunnable toRun;

  boolean shouldTerminate;

  boolean shouldRun;

  boolean noThData;

  Object thData[] = null;

  ControlRunnable(ThreadPool p) {

  toRun = null;

  shouldTerminate = false;

  shouldRun = false;

  this.p = p;

  t = new Thread(this);

  t.start();

  noThData = true;

  thData = null;

  }

  public void run() {

  while (true) {

  try {

  synchronized (this) {

  if (!shouldRun && !shouldTerminate) {

  this.wait();

  }

  }

  if (shouldTerminate) {

  break;

  }

  try {

  if (noThData) {

  thData = toRun.getInitData();

  noThData = false;

  }

  if (shouldRun) {

  toRun.runIt(thData);

  }

  } catch (Throwable t) {

  System.err.println("ControlRunnable Throwable: ");

  t.printStackTrace();

  shouldTerminate = true;

  shouldRun = false;

  p.notifyThreadEnd();

  } finally {

  if (shouldRun) {

  shouldRun = false;

  p.returnController(this);

  }

  }

  if (shouldTerminate) {

  break;

  }

  } catch (InterruptedException ie) {

  }

  }

  }

  public synchronized void runIt(ThreadPoolRunnable toRun) {

  if (toRun == null) {

  throw new NullPointerException("No Runnable");

  }

  this.toRun = toRun;

  shouldRun = true;

  this.notify();

  }

  public synchronized void terminate() {

  shouldTerminate = true;

  this.notify();

  }

  }

  }

  ---------------------------------------------------------------------------------------

  以上程序中,关键的是openThreads方法、runIt方法以及2个内部类:MonitorRunnable和ControlRunnable。

  刚开始运行的时候,线程池会往Vector对象里装入minSpareThreads个元素,每个元素都是ControlRunnable线程 类,ControlRunnable类在其构造方法中启动线程。如果shouldRun和shouldTerminate都是false的话,线程就等 待。如果shouldRun为true,就调用ThreadPoolRunnable的runIt(Object[])方法,该接口的方法就是我们需要在 自己的任务类中覆盖的方法。

  如果minSpareThreads个线程都处于Busy后,线程池会再创建出minSpareThreads个线程。MonitorRunnable是 用来监视线程池运行情况的,其线程间隔60秒(WORK_WAIT_TIMEOUT)调用一次线程池类的checkSpareControllers方 法,如果发现(currentThreadCount - currentThreadsBusy) > maxSpareThreads,就会调用ControlRunnable类的terminate方法删除空闲线程,准备删除的线程是否空闲是通过 shouldTerminate参数来判断的。

  线程池接口ThreadPoolRunnable有2个空方法getInitData和runIt,我们一般自己创建一个任务类实现这个线程池接口就可以了,把具体的任务内容放在任务类的runIt方法中。如果不想用getInitData,就让它返回null值。

  线程池接口程序很简单,文件名为ThreadPoolRunnable.java,就几行,内容如下:

  ---------------------------------------------------------------------------------------------

  public interface ThreadPoolRunnable {

  public Object[] getInitData();

  public void runIt(Object thData[]);

  }

  ---------------------------------------------------------------------------------------------

  线程池类和线程池接口都已经说完,下面就举个例子说说怎么使用它们了。

  我们的任务还是扫描端口(请参考我的“多线程程序模型研究”),文件名为TestThreadPool.java,内容如下:

  ---------------------------------------------------------------------------------------------

  import java.net.InetAddress;

  import java.net.Socket;

  public class TestThreadPool {

  public static void main(String[] args) {

  String host = null; //第一个参数,目标主机。

  int beginport = 1; //第二个参数,开始端口。

  int endport = 65535; //第三个参数,结束端口。

  try{

  host = args[0];

  beginport = Integer.parseInt(args[1]);

  endport = Integer.parseInt(args[2]);

  if(beginport <= 0 || endport >= 65536 || beginport > endport){

  throw new Exception("Port is illegal");

  }

  }catch(Exception e){

  System.out.println("Usage: java PortScannerSingleThread host beginport endport");

  System.exit(0);

  }

  ThreadPool tp = new ThreadPool();

  tp.setMaxThreads(100);

  tp.setMaxSpareThreads(50);

  tp.setMinSpareThreads(10);

  tp.start();

  for(int i = beginport; i <= endport; i++){

  Task task = new Task(host,i);

  tp.runIt(task);

  }

  }

  }

  class Task implements ThreadPoolRunnable{

  String host;

  int port;

  Task(String host, int port){

  this.host = host;

  this.port = port;

  }

  public Object[] getInitData(){

  return null;

  }

  public void runIt(Object thData[]){

  Socket s = null;

  try{

  s = new Socket(InetAddress.getByName(host),port);

  System.out.println("The port " + port + " at " + host + " is open.");

  }catch(Exception e){

  }finally{

  try{

  if(s != null) s.close();

  }catch(Exception e){

  }

  }

  }

  }

  -----------------------------------------------------------------------------------------------

  在TestThreadPool类的main方法中定义了三个参数,分别是目标主机IP地址,开始端口和结束端口。然后通过new ThreadPool()创建线程池类,并通过setMaxThreads、setMaxSpareThreads和 setMinSpareThreads设置线程池的maxThreads、maxSpareThreads和minSpareThreads参数。 Task类通过实现ThreadPoolRunnable接口,在runIt中定义了具体内容(创建Socket对象达到扫描端口的目的)。

  以上程序都在JDK1.4.2的环境下编译并运行通过,输入 java TestThreadPool 10.1.1.182 1 10000 运行得出如下结果:

  The port 25 at 10.1.1.182 is open.

  The port 110 at 10.1.1.182 is open.

  The port 135 at 10.1.1.182 is open.

  ...

  通过以上的线程池类、线程池接口的分析和介绍,读者可以在理解的基础上,直接把这个线程池类和接口拿来,应用到自己的开发项目中。


  如果你现在想学习Java,赢取高薪工作机会,非常简单,填写下面信息,学好Java技术高薪工作机会唾手可得。

分享到

您可能感兴趣的文章