当前位置 : 主页 > 编程语言 > java >

ThreadPoolExecutor支持写文件再处理的策略

来源:互联网 收集:自由互联 发布时间:2021-06-28
RetryWithFileExecutor.java /** * */package gist.util.concurrent;import java.io.BufferedWriter;import java.io.File;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.IOException;import java.io.OutputStreamWri
RetryWithFileExecutor.java
/**
 * 
 */
package gist.util.concurrent;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * RetryWithFileExecutor的功能: 在任务超过线程池处理能力时,先写文件再读取处理
 * 
 * 任务数据保存在数据文件dir/name_yyyyMMdd.txt文件, 每天一个文件,数据按行分隔
 * 数据文件读取进度保存在dir/name_yyyyMMdd.meta, 默认启动时读取处理当天的数据文件,和当前时间12小时内的昨天数据文件
 * 
 * dir: 数据文件和元文件保存目录
 * name: 文件名称
 * codec: 任务的编解码
 * handler: retryExecutor调用handler.retry(codec.decode(data));
 * taskExecutor: 执行任务
 * writeT: 处理taskExecutor拒绝的任务,写入数据文件
 * retryExecutor: 定时调度程序,延迟5s执行下一次重试操作,更新meta的读取进度
 * 
* * Test: {@link #main} * *

* TODO: *

  • 写的时候磁盘满的异常情况未处理
  • *
  • 没有权限创建文件
  • * *

    * * @author dzh * */ public class RetryWithFileExecutor { static Logger LOG = LoggerFactory.getLogger(RetryWithFileExecutor.class); public static final String NAME = "retrywithfileexecutor.name"; public static final String DIR = "retrywithfileexecutor.dir"; public static final String NTHREADS = "retrywithfileexecutor.nthreads"; public static final String CAPACITY = "retrywithfileexecutor.capacity"; private ExecutorService taskExecutor; private ScheduledExecutorService retryExecutor; private String charsetName = "utf-8"; /** * directory which saved retry files * *
         * file prefix:
         * write data-file: dir/name_yyyyMMdd.txt 
         * write meta-file: dir/name_yyyyMMdd.meta
         * 
    */ private String dir; private String name; private RetryCodec codec; private RetryHandler handler; // write queue private BlockingQueue wq = new LinkedBlockingQueue<>(); private Thread writeT; private volatile boolean shutdown = false; // task queue private BlockingQueue tq = new LinkedBlockingQueue<>(); private RetryWithFileExecutor(Properties conf) { this.name = conf.getProperty(NAME); this.dir = conf.getProperty(DIR); LOG.info("{} retry folder: {}", name, dir); if (!checkOrNewDir(new File(dir))) { LOG.error("{} failed to start. Please check directory {}", name, dir); return; } LOG.info("taskExecutor start"); int nThreads = Integer.parseInt(conf.getProperty(NTHREADS, String.valueOf(Runtime.getRuntime().availableProcessors() - 1))); int capacity = Integer.parseInt(conf.getProperty(CAPACITY, String.valueOf(10000))); tq = new ArrayBlockingQueue (capacity); taskExecutor = new ThreadPoolExecutor(0, nThreads, 2000L, TimeUnit.MILLISECONDS, tq, newThreadFactory(), newRejectedExecutionHandler()) { protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new SourceFutureTask (runnable, value); } }; Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { shutdown(); } }); startWriteThread(); startRetryThread(); } private void shutdown() { LOG.info("taskExecutor shutdown"); try { if (taskExecutor != null) { taskExecutor.shutdown(); taskExecutor.awaitTermination(30, TimeUnit.SECONDS); } } catch (Exception e) {} if (!tq.isEmpty()) { LOG.info("write tq {}", tq.size()); Runnable r = null; while ((r = tq.poll()) != null) { if (r instanceof WithSource) { writeFile(((WithSource ) r).getSource()); } } } shutdown = true; LOG.info("writeThread shutdown"); writeT.interrupt(); try { writeT.join(10000L); } catch (InterruptedException e) {} LOG.info("retryExecutor shutdown"); try { if (retryExecutor != null) { retryExecutor.shutdown(); retryExecutor.awaitTermination(30, TimeUnit.SECONDS); } } catch (Exception e) {} } private boolean checkOrNewDir(File f) { if (f == null) throw new NullPointerException("file is null"); if (f.isDirectory() && !f.exists()) { LOG.info("create {}", f.getAbsolutePath()); f.mkdirs(); } return true; } public RetryCodec getCodec() { return codec; } public void setCodec(RetryCodec codec) { this.codec = codec; } public static final RetryWithFileExecutor newExecutor(String name, String dir, RetryCodec codec, RetryHandler h) { Properties conf = new Properties(); conf.put(NAME, name); conf.put(DIR, dir); conf.put(NTHREADS, String.valueOf(Runtime.getRuntime().availableProcessors() - 1)); conf.put(CAPACITY, "10000"); return newExecutor(conf, codec, h); } public static final RetryWithFileExecutor newExecutor(Properties conf, RetryCodec codec, RetryHandler h) { RetryWithFileExecutor r = new RetryWithFileExecutor(conf); r.codec = codec; r.handler = h; if (r.codec == null) throw new NullPointerException("RetryWithFileExecutor.codec is null"); if (r.handler == null) throw new NullPointerException("RetryWithFileExecutor.handler is null"); return r; } /** *
         * Write source into data file.
         * 
         * 
    */ private void startWriteThread() { LOG.info("writeThread start"); writeT = new Thread("RetryWithFileExecutor.writeThread") { public void run() { BufferedWriter out = null; WRITE_DATA: for (;;) { if (out != null) { try { out.flush(); out.close(); } catch (IOException e) { LOG.error(e.getMessage(), e); } } File dataFile = checkDataFile(System.currentTimeMillis(), true); try { out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(dataFile, true), charsetName)); } catch (UnsupportedEncodingException | FileNotFoundException e) { LOG.error(e.getMessage(), e.fillInStackTrace()); } for (;;) { if (shutdown && tq.size() == 0 && wq.size() == 0) break WRITE_DATA; File chkFile = checkDataFile(System.currentTimeMillis(), true); if (!chkFile.equals(dataFile)) { continue WRITE_DATA; } String data = null; try { data = wq.poll(5, TimeUnit.SECONDS);// TODO } catch (InterruptedException e) { continue; } if (data != null) { try { out.write(data); out.newLine(); } catch (IOException e) { LOG.error(e.getMessage(), e); } } else { try { out.flush(); } catch (IOException e) {} } } } if (out != null) { try { out.flush(); out.close(); } catch (IOException e) { LOG.error(e.getMessage(), e); } } LOG.info("writeThread exit"); } }; writeT.start(); } private File checkDataFile(long ts, boolean create) { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); String tsStr = sdf.format(new Date(ts)); String fileBase = this.dir + this.name; String data = fileBase + "_" + tsStr + ".txt"; File dataFile = new File(data); if (!dataFile.exists() && create) { try { LOG.info("create data file {}", dataFile.getAbsolutePath()); dataFile.createNewFile(); } catch (IOException e) { LOG.error(e.getMessage(), e); } } return dataFile; } /** * record dataFile's retry position * * @param ts * @param create * @return */ private File checkMetaFile(long ts, boolean create) { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); String tsStr = sdf.format(new Date(ts)); String fileBase = this.dir + this.name; String meta = fileBase + "_" + tsStr + ".meta"; File metaFile = new File(meta); if (!metaFile.exists() && create) { try { LOG.info("create meta file {}", metaFile.getAbsolutePath()); metaFile.createNewFile(); } catch (IOException e) { LOG.error(e.getMessage(), e); } } return metaFile; } /** * Read and retry to publish */ private void startRetryThread() { LOG.info("retryExecutor start"); retryExecutor = Executors.newScheduledThreadPool(1); retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { long today = System.currentTimeMillis(); // today retryWithHandler(today); // check 12hours-before TODO long h12 = today - 12 * 3600 * 1000; File h12Data = checkDataFile(h12, false); File nowData = checkDataFile(today, false); if (h12Data.exists() && !h12Data.equals(nowData)) { retryWithHandler(h12); } } catch (Exception e) { LOG.error(e.getMessage(), e); } } }, 1000, 5000, TimeUnit.MILLISECONDS);// TODO } private void retryWithHandler(long ts) { try { File file = checkDataFile(ts, false); if (file.exists()) { try (RandomAccessFile metaFile = new RandomAccessFile(checkMetaFile(ts, true), "rw"); RandomAccessFile dataFile = new RandomAccessFile(file, "r");) { long dataSize = dataFile.length(); if (dataSize > 0) { MappedByteBuffer metaBuf = metaFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 8); long pos = metaBuf.getLong(); LOG.info("{} init size-{} pos-{}", checkMetaFile(ts, false).getAbsolutePath(), dataSize, pos); while (pos < dataSize) { if (shutdown) break; dataFile.seek(pos); String data = dataFile.readLine(); try { handler.retry(codec.decode(data)); } catch (Exception e) { LOG.error(e.getMessage(), e); } pos = dataFile.getFilePointer(); metaBuf.rewind(); metaBuf.putLong(pos); LOG.debug("retry size-{} pos-{} data-{}", dataSize, pos, data); } metaBuf.force(); } } } } catch (Exception e) { LOG.error(e.getMessage(), e); } } public Future submit(SourceRunnable r) { return taskExecutor.submit(r, r.getSource()); } private ThreadFactory newThreadFactory() { return new ThreadFactory() { private final ThreadGroup group = Thread.currentThread().getThreadGroup(); private final AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, RetryWithFileExecutor.this.name + "-" + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }; } private RejectedExecutionHandler newRejectedExecutionHandler() { return new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r instanceof WithSource) { writeFile(((WithSource ) r).getSource()); } } }; } protected void writeFile(Object source) { String enc = codec.encode(source); if (wq.size() > 100000) { // discard TODO LOG.error("discard {} because wq.size is over max-size", enc); } if (enc != null) wq.offer(enc); } public abstract static class SourceRunnable implements Runnable, WithSource { private V source; public SourceRunnable(V s) { this.source = s; } public V getSource() { return source; } } public static class SourceFutureTask extends FutureTask implements WithSource { private V source; public SourceFutureTask(Runnable runnable, V result) { super(runnable, result); this.source = result; } public V getSource() { return source; } } public static interface WithSource { V getSource(); } public static interface RetryCodec { String encode(Object obj); Object decode(String enstr); } public static interface RetryHandler { void retry(Object obj); } /** * Test example * * {@link #RetryWithFileExecutor(String, String)} * int nThreads = 1; * int maxE = 1; */ public static void main(String[] args) { RetryCodec SimpleCodec = new RetryCodec() { @Override public String encode(Object obj) { if (obj == null) return null; return String.valueOf(obj); } @Override public Object decode(String enstr) { return Integer.parseInt(enstr); } }; final RetryHandler retryHandler = new RetryHandler() { @Override public void retry(Object obj) { LOG.info("retry {}", obj); } }; Properties conf = new Properties(); conf.put(NAME, "retry"); conf.put(DIR, "/Users/dzh/"); conf.put(NTHREADS, "1"); conf.put(CAPACITY, "1"); RetryWithFileExecutor RetryExecutor = RetryWithFileExecutor.newExecutor(conf, SimpleCodec, retryHandler); for (int i = 0; i < 100; i++) { RetryExecutor.submit(new SourceRunnable (i) { @Override public void run() { LOG.info("task {}", getSource()); try { Thread.sleep(1000L); } catch (InterruptedException e) {} } }); } try { Thread.sleep(10 * 1000); } catch (InterruptedException e) {} System.exit(0); } }
    网友评论