线程池操作 import java.util.HashMap;import java.util.Map;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import org.slf4j.Logger;import org.slf4j.Logger
import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author FSJ * */ public class ThreadPoolFactory { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolFactory.class); private static ThreadPoolFactory instance = null; private static Object initLock = new Object(); private Map添加执行任务threadThreadMap = new HashMap (); private final String POOL_DEFAULT = "defaultPool"; private ThreadPoolFactory() { initial(); } private void initial() { // 添加默认队列 if (!threadThreadMap.containsKey(POOL_DEFAULT)) { this.addThreadPool(POOL_DEFAULT, 3, 50, 5, 50, true); } } /** * 添加线程队列 * * @param name * 队列名称 * @param corePoolSize * 核心队列大小 * @param maximumPoolSize * 最大队列大小 * @param keepAliveTime * 活动时间(单位:秒) * @param workQueueSize * 工作队列大小 * @param allowCoreThreadTimeOut */ public void addThreadPool(String name, int corePoolSize, int maximumPoolSize, int keepAliveTime, int workQueueSize, boolean allowCoreThreadTimeOut) { threadThreadMap.put(name, new ThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue(workQueueSize), new ThreadPoolExecutor.DiscardOldestPolicy(), allowCoreThreadTimeOut)); } public static ThreadPoolFactory getInstance() { if (instance == null) { synchronized (initLock) { if (instance == null) { instance = new ThreadPoolFactory(); } } } return instance; } /** * 根据线程池的名称获取线程池对象 * * @param threadPoolName * 线程池名称 * @return 返回一个非空的线程池对象(没有指定的线程池,返回defaultPool名称的线程池) */ public ThreadPool getThreadPool(String threadPoolName) { if (threadThreadMap.containsKey(threadPoolName)) { return (ThreadPool) threadThreadMap.get(threadPoolName); } else { return (ThreadPool) threadThreadMap.get(POOL_DEFAULT); } } public void addTask(Runnable task) { addTask(POOL_DEFAULT, task); } /** * 往线程池中增加任务处理类 * * @param threadPoolName * 线程池名称 * @param task * 任务处理类 */ public void addTask(String threadPoolName, Runnable task) { long startTime = System.currentTimeMillis(); ThreadPool threadPool = null; String poolName = threadPoolName; if (threadThreadMap.containsKey(poolName)) { threadPool = (ThreadPool) threadThreadMap.get(poolName); } else { poolName = POOL_DEFAULT; threadPool = (ThreadPool) threadThreadMap.get(POOL_DEFAULT); } threadPool.addTask(task); threadPool.setAccessTime(DateUtils.getCurrentDateTime()); logger.debug("Task.add:[" + DateUtils.elaps(startTime) + "][" + poolName + "]" + task); } }
import java.util.Date; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author FSJ * */ public class ThreadPool { private static final Logger log = LoggerFactory.getLogger(ThreadPool.class); private static Object initLock = new Object(); private ThreadPoolExecutor threadPoolExecutor = null; // private ExecutorService executorService; private Date accessTime = null; private String name; public ThreadPool(String threadPoolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler, boolean allowCoreThreadTimeOut) { this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy()); setName(threadPoolName); // this.threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut); } /** * 添加执行任务 * * @param task */ public void addTask(Runnable task) { long addTaskTime = System.currentTimeMillis(); synchronized (initLock) { this.threadPoolExecutor.execute(task); } log.info("线程池:" + getName() + "新增任务" + task + " 耗时:" + DateUtils.elaps(addTaskTime)); } public String submitTask(Callable task) { String result = ""; long addTaskTime = System.currentTimeMillis(); synchronized (initLock) { try { Future future = this.threadPoolExecutor.submit(task); result = (String) future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } log.info("线程池:" + getName() + "submit任务" + task + " 耗时:" + DateUtils.elaps(addTaskTime)); return result; } /** * * 返回核心线程数。 */ public int getCorePoolSize() { return threadPoolExecutor.getCorePoolSize(); } /** * * 返回允许的最大线程数。 */ public int getMaximumPoolSize() { return threadPoolExecutor.getMaximumPoolSize(); } /** * * 返回池中的当前线程数。 */ public int getPoolSize() { return threadPoolExecutor.getPoolSize(); } /** * 返回队列大小 * */ public synchronized int getQueueSize() { return threadPoolExecutor.getQueue().size(); } /** * 返回曾经同时位于池中的最大线程数。 * */ public int getLargestPoolSize() { return threadPoolExecutor.getLargestPoolSize(); } /** * 返回线程保持活动的时间,该时间就是超过核心池大小的线程可以在终止前保持空闲的时间值。 * */ public long getKeepAliveTime() { return threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); } /** * 返回返回曾计划执行的近似任务总数。 * */ public long getTaskCount() { return threadPoolExecutor.getTaskCount(); } /** * 返回已完成执行的近似任务总数。 * */ public long getCompletedTaskCount() { return threadPoolExecutor.getCompletedTaskCount(); } /** * 加入到线程池的日期 * */ public Date getAccessTime() { return accessTime; } public void setAccessTime(Date joinTime) { this.accessTime = joinTime; } public String getName() { return name; } public void setName(String name) { this.name = name; } }