Java priority thread pool executor


首先要了解的是线程池(ThreadPoolExecutor)的设计

线程池最重要的一个属性 workQueue,没有提交运行的任务,都放入了这个队列当中,所以,只要控住了这个队列即可。常用的队列有:

  • SynchronousQueue
  • LinkedBlockingDeque
  • ArrayBlockingQueue

默认没有基于 PriorityBlockingQueue 实现的 ThreadPool. 下面描述一下自己开发一个基于 PriorityBlockingQueue 队列的优先级线程池步骤。

定义 PriorityThreadPoolExecutor

public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {

    //省略 

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new PriorityFutureTask(runnable, value);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new PriorityFutureTask(callable);
    }

}

为什么需要这样的一个类,来继承 ThreadPoolExecutor ? 可以看看这篇文章ThreadPoolExecutor的PriorityBlockingQueue类型转化问题

在执行器 submit 的时候,会通过 newTaskFor 来创建 RunnableFuture. 覆写这两个方法,是为了用上自定义的 FutureTask.

public class PriorityFutureTask<V>
        extends FutureTask<V> implements Comparable<PriorityFutureTask<V>> {

    private Object object;

    public PriorityFutureTask(Callable<V> callable) {
        super(callable);
        object = callable;
    }

    public PriorityFutureTask(Runnable runnable, V result) {
        super(runnable, result);
        object = runnable;
    }

    @Override
    public int compareTo(PriorityFutureTask<V> o) {
        if (this == o) {
            return 0;
        }
        if (o == null) {
            return -1; // high priority
        }
        if (object != null && o.object != null) {
            if ((object instanceof Comparable)) {
                return ((Comparable) object).compareTo(o.object);
            }
        }
        return 0;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        } else if (!(obj instanceof PriorityFutureTask)) {
            return false;
        } else {
            PriorityFutureTask<V> c = (PriorityFutureTask<V>) obj;
            return object.equals(c.object);
        }
    }

    @Override
    public int hashCode() {
        return object.hashCode();
    }

    public Object getObject() {
        return object;
    }

    public void setObject(Object object) {
        this.object = object;
    }
}

该方法主要是保存了构造方法传入的 Runnable 并实现了排序器。因为 workQueue 存的就是这个对象,如果想要调用执行器的 remove 方法从执行器中的 workQueue 移除待提交任务,就需要对传入的这个 Runnable 对象做一些处理

public class MiniTask extends AbstractTask<Integer> implements Runnable, Comparable {

    private Mini mini;

    public MiniTask(Mini mini) {
        this.mini = mini;
        this.priority = 1;
    }
    public MiniTask(Mini mini, Integer priority) {
        this.mini = mini;
        this.priority = priority;
    }

    public void run() {
        //to do
        System.out.println("Running - [" + mini.getMessage() + "], priority is [" + priority + "]" + " --- " + System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public int hashCode() {
        return mini.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        } else if (obj instanceof PriorityFutureTask) {
            PriorityFutureTask p = (PriorityFutureTask) obj;
            return this.equals(p.getObject());
        } else if (obj instanceof MiniTask) {
            MiniTask t = (MiniTask) obj;
            return mini.equals(t.mini);
        }
        return false;
    }

    public int compareTo(Object o) {
        if (o == null) {
            return -1;
        } else if (!(o instanceof MiniTask)) {
            return -1;
        }
        MiniTask m = (MiniTask) o;
        return Integer.compare(priority, m.priority);
    }
}

这个类的重点在 equal 方法,这里面判断了 MiniTaskPriorityFutureTask 的 equal 关系。

点击查看项目源码 通过运行 App 来看最终效果, 日志如下

Running - [A02], priority is [5] --- 1534225081301
Remove v6: true
Running - [A01], priority is [1] --- 1534225081301
Executor core pool size: 1
Running - [A04], priority is [8] --- 1534225083308
Running - [A12], priority is [8] --- 1534225085309
Running - [A10], priority is [8] --- 1534225087309
Running - [A08], priority is [5] --- 1534225089310
Running - [A11], priority is [4] --- 1534225091310
Running - [A13], priority is [4] --- 1534225093310
Running - [A05], priority is [4] --- 1534225095310
Running - [A09], priority is [3] --- 1534225097311
Running - [A03], priority is [3] --- 1534225099311
Running - [A07], priority is [1] --- 1534225101311