我这里需要通过多线程去处理数据,然后在所有数据都处理完成后再往下执行。这里就用到了CountDownLatch。把countdownlatch作为参数传入到每个线程类里,在线程中处理完数据后执行count
我这里需要通过多线程去处理数据,然后在所有数据都处理完成后再往下执行。这里就用到了CountDownLatch。把countdownlatch作为参数传入到每个线程类里,在线程中处理完数据后执行countdown方法。在所有countdownlatch归零后,其await方法结束阻塞状态而往下执行。
具体代码如下:
将多线程任务提交线程池
@Bean(name = "ggnews_executor") public Executor postExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(1); executor.setQueueCapacity(1); executor.setKeepAliveSeconds(120); executor.setThreadNamePrefix("executor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); return executor; } //通过定时任务调用的fetch方法,为了避免定时任务在多次执行中失效,通异步指定线程池的方式进行调用 @Async("ggnews_executor") public void fetch() { if(fetchFlag.getAndSet(false)) { List<FetchTag> tags = fetchTagService.selectFetchTagList(fetchTag); CountDownLatch downLatch = new CountDownLatch(tags.size()); for (FetchTag tag : tags) { FetchTag tagNew; try { tagNew =(FetchTag) tag.clone(); } catch (Throwable e) { log.error("",e); continue; } //作为参数将CountDownLatch传入 InnerRunner innerRunner = new InnerRunner(downLatch, tagNew); executor.execute(innerRunner); } try { //等待线程执行完毕,如果十分钟后还没结束也会停止阻塞状态 downLatch.await(10,TimeUnit.MINUTES); fetchFlag.getAndSet(true); } catch (Throwable e) { log.error("fetch()方法发生错误:{}", e); fetchFlag.getAndSet(true); //e.printStackTrace(); } finally { fetchFlag.getAndSet(true); } } else { log.info("=======上次抓取尚未结束========="); } }
InnerRunner为要执行具体任务的线程类
private class InnerRunner implements Runnable { private CountDownLatch downLatch; private FetchTag tag; private InnerRunner(CountDownLatch downLatch, FetchTag tag) { this.downLatch = downLatch; this.tag = tag; } @Override public void run() { //将countDown方法移入到具体方法中的finally块中,以保证即使在抛出异常的情况下也算执行了此次任务,countdown会被执行 fetchGG(tag.getTag(), downLatch); //downLatch.countDown(); this.tag = null; } }
private static final String GOOGLE_URL_IN = "https://news.google.com/rss/search?hl=hi&gl=IN&ceid=IN:hi&q="; public void fetchGG(String tag, CountDownLatch downLatch) { try { Document document = Jsoup.parse(new URL(GOOGLE_URL_IN + URLEncoder.encode("\"" + tag + "\"", "utf-8")), 30000); Elements elements = document.getElementsByTag("item"); int rank = 1; for (Element element : elements) { String sourceTitle = element.getElementsByTag("title").get(0).text(); log.info("source title:" + sourceTitle); } } catch (Throwable e) { log.info("fetch google url error", e); } finally { //肯定会被执行 downLatch.countDown(); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持易盾网络。