// 线程池操作 (thread pool operations)
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author FSJ
*
*/
public class ThreadPoolFactory {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolFactory.class);
private static ThreadPoolFactory instance = null;
private static Object initLock = new Object();
private Map
threadThreadMap = new HashMap
(); private final String POOL_DEFAULT = "defaultPool"; private ThreadPoolFactory() { initial(); } private void initial() { // 添加默认队列 if (!threadThreadMap.containsKey(POOL_DEFAULT)) { this.addThreadPool(POOL_DEFAULT, 3, 50, 5, 50, true); } } /** * 添加线程队列 * * @param name * 队列名称 * @param corePoolSize * 核心队列大小 * @param maximumPoolSize * 最大队列大小 * @param keepAliveTime * 活动时间(单位:秒) * @param workQueueSize * 工作队列大小 * @param allowCoreThreadTimeOut */ public void addThreadPool(String name, int corePoolSize, int maximumPoolSize, int keepAliveTime, int workQueueSize, boolean allowCoreThreadTimeOut) { threadThreadMap.put(name, new ThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue(workQueueSize), new ThreadPoolExecutor.DiscardOldestPolicy(), allowCoreThreadTimeOut)); } public static ThreadPoolFactory getInstance() { if (instance == null) { synchronized (initLock) { if (instance == null) { instance = new ThreadPoolFactory(); } } } return instance; } /** * 根据线程池的名称获取线程池对象 * * @param threadPoolName * 线程池名称 * @return 返回一个非空的线程池对象(没有指定的线程池,返回defaultPool名称的线程池) */ public ThreadPool getThreadPool(String threadPoolName) { if (threadThreadMap.containsKey(threadPoolName)) { return (ThreadPool) threadThreadMap.get(threadPoolName); } else { return (ThreadPool) threadThreadMap.get(POOL_DEFAULT); } } public void addTask(Runnable task) { addTask(POOL_DEFAULT, task); } /** * 往线程池中增加任务处理类 * * @param threadPoolName * 线程池名称 * @param task * 任务处理类 */ public void addTask(String threadPoolName, Runnable task) { long startTime = System.currentTimeMillis(); ThreadPool threadPool = null; String poolName = threadPoolName; if (threadThreadMap.containsKey(poolName)) { threadPool = (ThreadPool) threadThreadMap.get(poolName); } else { poolName = POOL_DEFAULT; threadPool = (ThreadPool) threadThreadMap.get(POOL_DEFAULT); } threadPool.addTask(task); threadPool.setAccessTime(DateUtils.getCurrentDateTime()); logger.debug("Task.add:[" + DateUtils.elaps(startTime) 
+ "][" + poolName + "]" + task); } }
// 添加执行任务 (adding tasks for execution)
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Named wrapper around a {@link ThreadPoolExecutor}.
 *
 * <p>Besides delegating task submission and statistics to the executor, the wrapper carries
 * a pool name and a caller-maintained access timestamp. Task submission relies on the
 * executor's own thread-safety; no additional lock is taken.
 *
 * @author FSJ
 */
public class ThreadPool {

    private static final Logger log = LoggerFactory.getLogger(ThreadPool.class);

    private final ThreadPoolExecutor threadPoolExecutor;
    private Date accessTime = null;
    private String name;

    /**
     * Creates the underlying executor.
     *
     * @param threadPoolName         name of this pool
     * @param corePoolSize           core number of threads
     * @param maximumPoolSize        maximum number of threads
     * @param keepAliveTime          idle time after which surplus threads terminate
     * @param unit                   unit of {@code keepAliveTime}
     * @param workQueue              queue holding tasks that are waiting for a thread
     * @param handler                policy applied when a task cannot be accepted; when
     *                               {@code null}, {@link ThreadPoolExecutor.AbortPolicy} is used
     * @param allowCoreThreadTimeOut whether core threads may also time out when idle; only
     *                               applied when {@code keepAliveTime} is positive, because
     *                               the executor rejects a zero keep-alive in that mode
     */
    public ThreadPool(String threadPoolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, boolean allowCoreThreadTimeOut) {
        RejectedExecutionHandler effectiveHandler = handler;
        if (effectiveHandler == null) {
            effectiveHandler = new ThreadPoolExecutor.AbortPolicy();
        }
        this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, effectiveHandler);
        if (allowCoreThreadTimeOut && keepAliveTime > 0) {
            this.threadPoolExecutor.allowCoreThreadTimeOut(true);
        }
        this.name = threadPoolName;
    }

    /**
     * Hands a task to the executor for asynchronous execution.
     *
     * @param task task to run
     */
    public void addTask(Runnable task) {
        long addTaskTime = System.currentTimeMillis();
        this.threadPoolExecutor.execute(task);
        log.info("线程池:" + getName() + "新增任务" + task + " 耗时:" + DateUtils.elaps(addTaskTime));
    }

    /**
     * Submits a task and blocks until it has finished.
     *
     * <p>The calling thread waits for the result, but no lock is held while waiting, so
     * other threads (including tasks running in this pool) can keep submitting work.
     *
     * @param task task to run; its result is cast to {@link String}
     * @return the task's result, or an empty string when the wait was interrupted or the
     *         task threw an exception
     * @throws ClassCastException if the task returns a non-null value that is not a String
     */
    public String submitTask(Callable<?> task) {
        String result = "";
        long addTaskTime = System.currentTimeMillis();
        try {
            Future<?> future = this.threadPoolExecutor.submit(task);
            result = (String) future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            log.error("线程池:" + getName() + "submit任务" + task + " 被中断", e);
        } catch (ExecutionException e) {
            log.error("线程池:" + getName() + "submit任务" + task + " 执行失败", e);
        }
        log.info("线程池:" + getName() + "submit任务" + task + " 耗时:" + DateUtils.elaps(addTaskTime));
        return result;
    }

    /**
     * Returns the core number of threads.
     */
    public int getCorePoolSize() {
        return threadPoolExecutor.getCorePoolSize();
    }

    /**
     * Returns the maximum allowed number of threads.
     */
    public int getMaximumPoolSize() {
        return threadPoolExecutor.getMaximumPoolSize();
    }

    /**
     * Returns the current number of threads in the pool.
     */
    public int getPoolSize() {
        return threadPoolExecutor.getPoolSize();
    }

    /**
     * Returns the number of tasks currently waiting in the work queue.
     */
    public int getQueueSize() {
        return threadPoolExecutor.getQueue().size();
    }

    /**
     * Returns the largest number of threads that have ever been in the pool at once.
     */
    public int getLargestPoolSize() {
        return threadPoolExecutor.getLargestPoolSize();
    }

    /**
     * Returns the thread keep-alive time, expressed in seconds.
     */
    public long getKeepAliveTime() {
        return threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
    }

    /**
     * Returns the approximate total number of tasks that have ever been scheduled.
     */
    public long getTaskCount() {
        return threadPoolExecutor.getTaskCount();
    }

    /**
     * Returns the approximate total number of tasks that have completed execution.
     */
    public long getCompletedTaskCount() {
        return threadPoolExecutor.getCompletedTaskCount();
    }

    /**
     * Returns the timestamp last stored through {@link #setAccessTime(Date)}, or
     * {@code null} if none has been stored.
     */
    public Date getAccessTime() {
        return accessTime;
    }

    /**
     * Stores the access timestamp of this pool.
     *
     * @param joinTime timestamp to store
     */
    public void setAccessTime(Date joinTime) {
        this.accessTime = joinTime;
    }

    /**
     * Returns the name of this pool.
     */
    public String getName() {
        return name;
    }

    /**
     * Changes the name of this pool.
     *
     * @param name new pool name
     */
    public void setName(String name) {
        this.name = name;
    }
}
