半夏的博客

liangbogopher's blog

再谈Java并发

关于java线程池中的 allowCoreThreadTimeOut 和 Guava Futures的使用。

之前写过一篇关于java线程池的文章:Java线程池和队列

要做异步任务执行队列,有个如下需求:

1) 有个线程池用于执行任务

2) 有个无界队列,用于缓存未执行的任务

3) 没有任务执行时,我希望线程池中的线程停掉

这看似是个很正常的需求,但是用JDK1.5(我的工作地方JDK还是1.5的)实现,真得很困难的。

ThreadPoolExecutor中线程池有 corePoolSize 和 maximumPoolSize 两个参数。JDK1.5 中线程池至少保持 corePoolSize 的线程,所以为了满足上面的需求,corePoolSize必须被设置为0。这时,如果JDK1.5中队列不满的话,是不会创建大于corePoolSize大小的线程数的。也就是,corePoolSize为0时,队列满了,才会创建新的线程,这显然不满足我的需求。

在JDK1.6,发现ThreadPoolExecutor多了一个allowCoreThreadTimeOut方法。这个方法是允许线程数低于corePoolSize时,线程也因为空闲而终止。有了这个方法,实现上面的需求就非常简单了。将 corePoolSize 和 maximumPoolSize 设置为相同的大小,allowCoreThreadTimeOut设置为true,加上一个无界队列,就OK了。

allowCoreThreadTimeOut源码

	/**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //allowCoreThreadTimeOut为ture,则timed始终为true
            //此时只要timeout时间一过,则会销毁线程,则线程数可能会销毁至0
            //效果跟cachedThreadPool一致
			  
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

参数说明

allowCoreThreadTimeOut为true时,则线程池数量最后可以销毁到0个。 allowCoreThreadTimeOut为false时:超过核心线程数时,而且(超过最大值或者超过timeout),就会销毁。

没有timeout值的时候始终保持在maxPoolSize水平;

如果有timeout情况,那么保持在corePoolSize水平。默认是cachedThreadPool才会设置timeout为60秒,其他Executors造出来的timeout为0,即没有timeout。而cachedThreadPool的corePoolSize为0,即cachedThreadPool,最后线程数量为0.
(1)当没有超过核心线程时,不会销毁线程
(2)当超过核心线程数:再判断,如果超过最大值,则销毁;如果超过timeout,则销毁

线程池执行服务对象

import com.google.common.base.Joiner;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.concurrent.*;

public final class ExecutorServiceObject {

    private final ThreadPoolExecutor threadPoolExecutor;

    private final BlockingQueue<Runnable> workQueue;

    public ExecutorServiceObject(final String namingPattern, final int threadSize) {
        workQueue = new LinkedBlockingQueue<>();
        threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS, workQueue,
                new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
    }

    /**
     * 创建线程池服务对象.
     *
     * @return 线程池服务对象
     */
    public ExecutorService createExecutorService() {
        return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
    }

    public boolean isShutdown() {
        return threadPoolExecutor.isShutdown();
    }

    /**
     * 获取当前活跃的线程数.
     *
     * @return 当前活跃的线程数
     */
    public int getActiveThreadCount() {
        return threadPoolExecutor.getActiveCount();
    }

    /**
     * 获取待执行任务数量.
     *
     * @return 待执行任务数量
     */
    public int getWorkQueueSize() {
        return workQueue.size();
    }
}

这里我们使用到了MoreExecutors类,其中包含了大量的静态方法用于处理Executor, ExecutorService和ThreadPool实例,翻开源码,整理其中的公共方法,如下:

getExitingExecutorService(ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit):将给定的ThreadPoolExecutor转换成ExecutorService实例,在程序完成时退出, 它是通过使用守护线程和添加一个关闭钩子来等待他们完成。

getExitingScheduledExecutorService(ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit):将给定的ScheduledThreadPoolExecutor转换成ScheduledExecutorService实例,在程序完成时退出, 它是通过使用守护线程和添加一个关闭钩子来等待他们完成。

addDelayedShutdownHook(ExecutorService service, long terminationTimeout, TimeUnit timeUnit):添加一个关闭的钩子来等待给定的ExecutorService中的线程完成。

getExitingExecutorService(ThreadPoolExecutor executor):将给定的ThreadPoolExecutor转换成ExecutorService实例,在程序完成时退出, 它是通过使用守护线程和添加一个关闭钩子来等待他们完成。

getExitingScheduledExecutorService(ScheduledThreadPoolExecutor executor):将给定的ThreadPoolExecutor转换成ScheduledExecutorService实例,在程序完成时退出, 它是通过使用守护线程和添加一个关闭钩子来等待他们完成。

sameThreadExecutor():创建一个ExecutorService实例,运行线程中的每一个任务。

listeningDecorator(ExecutorService delegate):创建一个ExecutorService实例,通过线程提交或者唤醒其他线程提交ListenableFutureTask到给定的ExecutorService实例。

listeningDecorator(ScheduledExecutorService delegate):创建一个ScheduledExecutorService实例,通过线程提交或者唤醒其他线程提交ListenableFutureTask到给定的ExecutorService实例。

platformThreadFactory():返回一个默认的线程工厂用于创建新的线程。

shutdownAndAwaitTermination(ExecutorService service, long timeout, TimeUnit unit):逐渐关闭指定的ExecutorService,首先会禁用新的提交, 然后会取消现有的任务。

测试用例

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThreadTest {

//    private static AtomicInteger threadcounter = new AtomicInteger(1);
//    private static ExecutorService executor = Executors.newFixedThreadPool(100, r -> {
//        Thread thread = new Thread(r);
//        thread.setName("Thread " + threadcounter.getAndIncrement());
//        System.out.println(thread.getName());
//        return thread;
//    });

    private static ExecutorService executor = new ExecutorServiceObject("thread-test", 100).createExecutorService();

    private static AtomicBoolean isConnected = new AtomicBoolean(false);

    public static void main(String[] args) throws Exception {

        for (int i = 0; i < 1000; i++) {
            process(i);
        }

        // 模拟服务器断连
        sleepMs(10 * 1000);

        isConnected.set(true);

        // 模拟线程池销毁线程后
        sleepMs(80 * 1000);
        System.out.println("============================分割线==========================");
        for (int i = 0; i < 1000; i++) {
            process(i);
        }
    }

    public synchronized static void reconnect() {
        if (!isConnected.get()) {
            sleepMs(3000);
        }
    }

    public static void process(int index) {
        executor.execute(() -> {
            System.out.println("thread_name: " + Thread.currentThread().getName() + ", index: " + index);

            if (isConnected.get()) {
                sleepMs(10);
            } else {
                reconnect();

                process(index);
            }
        });
    }

    public static void sleepMs(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行的结果太多这里就不贴了。

Guava Futures

Java 5中引入了concurrent包,其中提供了许多重要的并发设计,其中一个便是Future对象,Future用于表示一个异步计算任务,我们通常需要启动一个Executor实例,之后调用submit方法获取到Future对象,并通过future.get()方法获取线程执行完成后的结果,ListenableFuture接口继承了Future接口进行了扩展,允许我们注册一个Callback函数,并在任务完成后执行。

一个简单的Future栗子


ExecutorService executor = Executors.newCachedThreadPool();

Future<Integer> future = executor.submit(new Callable<Integer>() {

    @Override

    public Integer call() throws Exception {

        //这里调用一些处理逻辑

        return 1 + 1;
    }
});

获得ListenableFuture接口

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(executor);

一个ListenableFuture栗子

public class Test1 {

    public static void main(String[] args) {
        int NUM_THREADS = 10;//10个线程

        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUM_THREADS));

        ListenableFuture<String> listenableFuture =

                executorService.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        return null;
                    }

                });

        listenableFuture.addListener(new Runnable() {

            @Override
            public void run() {
                //在Future任务完成之后运行的一些方法
                System.out.println("methodToRunOnFutureTaskCompletion");
            }

        }, executorService);
    }
}

不过,ListenableFuture.addListener有一个小的缺陷,我们没有办法接收返回的对象,这就导致在任务执行失败或成功的时候,我们不能执行其他的操作,不过Guava 提供了FutureCallback接口来弥补这个缺陷。

Guava Futures

public class FuturesTest {

    @Test
    public void v1() throws Exception {
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

        ListenableFuture future1 = service.submit(new Callable<Integer>() {
            public Integer call() throws InterruptedException {
                Thread.sleep(1000);
                System.out.println("call future 1 ...");
                return 1;
            }
        });

        ListenableFuture future2 = service.submit(new Callable<Integer>() {
            public Integer call() throws InterruptedException {
                Thread.sleep(1000);

//                throw new RuntimeException("call future 2 ...");

                System.out.println("call future 2 ...");
                return 2;
            }
        });

        final ListenableFuture allFutures = Futures.allAsList(future1, future2);
//        final ListenableFuture allFutures = Futures.successfulAsList(future1, future2);

        final ListenableFuture transform = Futures.transform(allFutures, new AsyncFunction<List<Integer>, Boolean>() {
            @Override
            public ListenableFuture apply(List<Integer> results) throws Exception {
                System.out.println("results: " + results);
                return Futures.immediateFuture(String.format("success future: %d", results.size()));
            }
        });

        Futures.addCallback(transform, new FutureCallback<Object>() {

            public void onSuccess(Object result) {
                System.out.println();
                System.out.printf("onSuccess: %s%n", result);
            }

            public void onFailure(Throwable thrown) {
                System.out.printf("onFailure: %s%n", thrown.getMessage());
            }
        });

        System.out.println(transform.get());
    }
}

在Guava中Futures对于Future扩展还有:

transform:对于ListenableFuture的返回值进行转换。
allAsList:对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个Future返回值组成的List对象。注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。
successfulAsList:和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。
immediateFuture/immediateCancelledFuture:立即返回一个待返回值的ListenableFuture。
makeChecked: 将ListenableFuture 转换成CheckedFuture。CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get 方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易
JdkFutureAdapters.listenInPoolThread(future): guava同时提供了将JDK Future转换为ListenableFuture的接口函数。

Top