色综合图-色综合图片-色综合图片二区150p-色综合图区-玖玖国产精品视频-玖玖香蕉视频

您的位置:首頁技術(shù)文章
文章詳情頁

Java 自定義線程池和線程總數(shù)控制操作

瀏覽:68日期:2022-08-16 09:13:03
1 概述

池化是常見的思想,線程池是非常典型的池化的實(shí)現(xiàn),《Java并發(fā)編程實(shí)戰(zhàn)》也大篇幅去講解了Java中的線程池。本文實(shí)現(xiàn)一個(gè)簡單的線程池。

2 核心類【1】接口定義

public interface IThreadPool<Job extends Runnable> { /** * 關(guān)閉線程池 */ public void shutAlldown(); /** * 執(zhí)行任務(wù) * * @param job 任務(wù) */ public void execute(Job job); /** * 添加工作者 * * @param addNum 添加數(shù) */ public void addWorkers(int addNum); /** * 減少工作者 * * @param reduceNum 減少數(shù)目 */ public void reduceWorkers(int reduceNum);}【2】實(shí)現(xiàn)類

線程池的核心是維護(hù)了1個(gè)任務(wù)列表和1個(gè)工作者列表。

import java.util.ArrayList;import java.util.Collections;import java.util.LinkedList;import java.util.List; public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> { // 默認(rèn)線程數(shù) private static int DEAFAULT_SIZE = 5; // 最大線程數(shù) private static int MAX_SIZE = 10; // 任務(wù)列表 private LinkedList<Job> tasks = new LinkedList<Job>(); // 工作線程列表 private List<Worker> workers = Collections .synchronizedList(new ArrayList<Worker>()); /** * 默認(rèn)構(gòu)造函數(shù) */ public XYThreadPool() { initWokers(DEAFAULT_SIZE); } /** * 執(zhí)行線程數(shù) * * @param threadNums 線程數(shù) */ public XYThreadPool(int workerNum) { workerNum = workerNum <= 0 ? DEAFAULT_SIZE : workerNum > MAX_SIZE ? MAX_SIZE : workerNum; initWokers(workerNum); } /** * 初始化線程池 * * @param threadNums 線程數(shù) */ public void initWokers(int threadNums) { for (int i = 0; i < threadNums; i++) { Worker worker = new Worker(); worker.start(); workers.add(worker); } // 添加關(guān)閉鉤子 Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { shutAlldown(); } }); } @Override public void shutAlldown() { for (Worker worker : workers) { worker.shutdown(); } } @Override public void execute(Job job) { synchronized (tasks) { // 提交任務(wù)就是將任務(wù)對象加入任務(wù)隊(duì)列,等待工作線程去處理 tasks.addLast(job); tasks.notifyAll(); } } @Override public void addWorkers(int addNum) { // 新線程數(shù)必須大于零,并且線程總數(shù)不能大于最大線程數(shù) if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) { initWokers(addNum); } else { System.out.println('addNum too large'); } } @Override public void reduceWorkers(int reduceNum) { if ((workers.size() - reduceNum <= 0)) System.out.println('thread num too small'); else { // 暫停指定數(shù)量的工作者 int count = 0; while (count != reduceNum) { for (Worker w : workers) { w.shutdown(); count++; } } } } /** * 工作線程 */ class Worker extends Thread { private volatile boolean flag = true; @Override public void run() { while (flag) { Job job = null; // 加鎖(若只有一個(gè)woker可不必加鎖,那就是所謂的單線程的線程池,線程安全) synchronized (tasks) { // 任務(wù)隊(duì)列為空 while (tasks.isEmpty()) { try { // 阻塞,放棄對象鎖,等待被notify喚醒 tasks.wait(); System.out.println('block when tasks is empty'); } catch (InterruptedException e) { e.printStackTrace(); } } // 不為空取出任務(wù) job = tasks.removeFirst(); System.out.println('get job:' + job + ',do biz'); job.run(); } } } public void shutdown() { flag = false; } }}

(1) 當(dāng)調(diào)用wait()方法時(shí)線程會(huì)放棄對象鎖,進(jìn)入等待此對象的等待鎖定池,只有針對此對象調(diào)用notify()方法后本線程才進(jìn)入對象鎖定池準(zhǔn)備

(2) Object的方法:void notify(): 喚醒一個(gè)正在等待該對象的線程。void notifyAll(): 喚醒所有正在等待該對象的線程。

notifyAll使所有原來在該對象上等待被notify的線程統(tǒng)統(tǒng)退出wait狀態(tài),變成等待該對象上的鎖,一旦該對象被解鎖,它們會(huì)去競爭。

notify只是選擇一個(gè)wait狀態(tài)線程進(jìn)行通知,并使它獲得該對象上的鎖,但不驚動(dòng)其它同樣在等待被該對象notify的線程們,當(dāng)?shù)谝粋€(gè)線程運(yùn)行完畢以后釋放對象上的鎖,此時(shí)如果該對象沒有再次使用notify語句,即便該對象已經(jīng)空閑,其他wait狀態(tài)等待的線程由于沒有得到該對象的通知,繼續(xù)處在wait狀態(tài),直到這個(gè)對象發(fā)出一個(gè)notify或notifyAll,它們等待的是被notify或notifyAll,而不是鎖。

3 無需控制線程總數(shù)

每調(diào)用一次就會(huì)創(chuàng)建一個(gè)擁有10個(gè)線程工作者的線程池。

public class TestService1 { public static void main(String[] args) { // 啟動(dòng)10個(gè)線程 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10); pool.execute(new Runnable() { @Override public void run() { System.out.println('====1 test===='); } }); }} public class TestService2 { public static void main(String[] args) { // 啟動(dòng)10個(gè)線程 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10); pool.execute(new Runnable() { @Override public void run() { System.out.println('====2 test===='); } }); }}4 控制線程總數(shù)

在項(xiàng)目中所有的線程調(diào)用,一般都共用1個(gè)固定工作者數(shù)大小的線程池。

import javax.annotation.PostConstruct;import org.springframework.stereotype.Component;import com.xy.pool.XYThreadPool; /** * 統(tǒng)一線程池管理類 */@Componentpublic class XYThreadManager { private XYThreadPool<Runnable> executorPool; @PostConstruct public void init() { executorPool = new XYThreadPool<Runnable>(10); } public XYThreadPool<Runnable> getExecutorPool() { return executorPool; }} import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service; @Service('testService3')public class TestService3 { @Autowired private XYThreadManager threadManager; public void test() { threadManager.getExecutorPool().execute(new Runnable() { @Override public void run() { System.out.println('====3 test===='); } }); }} import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service; @Service('testService4')public class TestService4 { @Autowired private XYThreadManager threadManager; public void test() { threadManager.getExecutorPool().execute(new Runnable() { @Override public void run() { System.out.println('====4 test===='); } }); }} import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext; public class TestMain { @SuppressWarnings('resource') public static void main(String[] args) { ApplicationContext atc = new ClassPathXmlApplicationContext('applicationContext.xml'); TestService3 t3 = (TestService3) atc.getBean('testService3'); t3.test(); TestService4 t4 = (TestService4) atc.getBean('testService4'); t4.test(); } }

補(bǔ)充:論如何優(yōu)雅的自定義ThreadPoolExecutor線程池

前言

線程池想必大家也都用過,JDK的Executors 也自帶一些線程池。但是不知道大家有沒有想過,如何才是最優(yōu)雅的方式去使用過線程池嗎? 生產(chǎn)環(huán)境要怎么去配置自己的線程池才是合理的呢?

今天周末,剛好有時(shí)間來總結(jié)一下自己所認(rèn)為的’優(yōu)雅’, 如有問題歡迎大家指正。

線程池使用規(guī)則

要使用好線程池,那么一定要遵循幾個(gè)規(guī)則:

線程個(gè)數(shù)大小的設(shè)置

線程池相關(guān)參數(shù)配置

利用Hook嵌入你的行為

線程池的關(guān)閉

線程池配置相關(guān)

線程池大小的設(shè)置

這其實(shí)是一個(gè)面試的考點(diǎn),很多面試官會(huì)問你線程池coreSize 的大小來考察你對于線程池的理解。

首先針對于這個(gè)問題,我們必須要明確我們的需求是計(jì)算密集型還是IO密集型,只有了解了這一點(diǎn),我們才能更好的去設(shè)置線程池的數(shù)量進(jìn)行限制。

1、計(jì)算密集型:

顧名思義就是應(yīng)用需要非常多的CPU計(jì)算資源,在多核CPU時(shí)代,我們要讓每一個(gè)CPU核心都參與計(jì)算,將CPU的性能充分利用起來,這樣才算是沒有浪費(fèi)服務(wù)器配置,如果在非常好的服務(wù)器配置上還運(yùn)行著單線程程序那將是多么重大的浪費(fèi)。對于計(jì)算密集型的應(yīng)用,完全是靠CPU的核數(shù)來工作,所以為了讓它的優(yōu)勢完全發(fā)揮出來,避免過多的線程上下文切換,比較理想方案是:

線程數(shù) = CPU核數(shù)+1,也可以設(shè)置成CPU核數(shù)*2,但還要看JDK的版本以及CPU配置(服務(wù)器的CPU有超線程)。

一般設(shè)置CPU * 2即可。

2、IO密集型

我們現(xiàn)在做的開發(fā)大部分都是WEB應(yīng)用,涉及到大量的網(wǎng)絡(luò)傳輸,不僅如此,與數(shù)據(jù)庫,與緩存間的交互也涉及到IO,一旦發(fā)生IO,線程就會(huì)處于等待狀態(tài),當(dāng)IO結(jié)束,數(shù)據(jù)準(zhǔn)備好后,線程才會(huì)繼續(xù)執(zhí)行。因此從這里可以發(fā)現(xiàn),對于IO密集型的應(yīng)用,我們可以多設(shè)置一些線程池中線程的數(shù)量,這樣就能讓在等待IO的這段時(shí)間內(nèi),線程可以去做其它事,提高并發(fā)處理效率。那么這個(gè)線程池的數(shù)據(jù)量是不是可以隨便設(shè)置呢?當(dāng)然不是的,請一定要記得,線程上下文切換是有代價(jià)的。目前總結(jié)了一套公式,對于IO密集型應(yīng)用:

線程數(shù) = CPU核心數(shù)/(1-阻塞系數(shù)) 這個(gè)阻塞系數(shù)一般為0.8~0.9之間,也可以取0.8或者0.9。

套用公式,對于雙核CPU來說,它比較理想的線程數(shù)就是20,當(dāng)然這都不是絕對的,需要根據(jù)實(shí)際情況以及實(shí)際業(yè)務(wù)來調(diào)整:final int poolSize = (int)(cpuCore/(1-0.9))

針對于阻塞系數(shù),《Programming Concurrency on the JVM Mastering》即《Java 虛擬機(jī)并發(fā)編程》中有提到一句話:

對于阻塞系數(shù),我們可以先試著猜測,抑或采用一些細(xì)嫩分析工具或java.lang.management API 來確定線程花在系統(tǒng)/IO操作上的時(shí)間與CPU密集任務(wù)所耗的時(shí)間比值。

線程池相關(guān)參數(shù)配置

說到這一點(diǎn),我們只需要謹(jǐn)記一點(diǎn),一定不要選擇沒有上限限制的配置項(xiàng)。

這也是為什么不建議使用Executors 中創(chuàng)建線程的方法。

比如,Executors.newCachedThreadPool的設(shè)置與無界隊(duì)列的設(shè)置因?yàn)槟承┎豢深A(yù)期的情況,線程池會(huì)出現(xiàn)系統(tǒng)異常,導(dǎo)致線程暴增的情況或者任務(wù)隊(duì)列不斷膨脹,內(nèi)存耗盡導(dǎo)致系統(tǒng)崩潰和異常。 我們推薦使用自定義線程池來避免該問題,這也是在使用線程池規(guī)范的首要原則! 小心無大錯(cuò),千萬別過度自信!

可以看下Executors中四個(gè)創(chuàng)建線程池的方法:

//使用無界隊(duì)列public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //線程池?cái)?shù)量是無限的public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

其他的就不再列舉了,大家可以自行查閱源碼。

第二,合理設(shè)置線程數(shù)量、和線程空閑回收時(shí)間,根據(jù)具體的任務(wù)執(zhí)行周期和時(shí)間去設(shè)定,避免頻繁的回收和創(chuàng)建,雖然我們使用線程池的目的是為了提升系統(tǒng)性能和吞吐量,但是也要考慮下系統(tǒng)的穩(wěn)定性,不然出現(xiàn)不可預(yù)期問題會(huì)很麻煩!

第三,根據(jù)實(shí)際場景,選擇適用于自己的拒絕策略。進(jìn)行補(bǔ)償,不要亂用JDK支持的自動(dòng)補(bǔ)償機(jī)制!盡量采用自定義的拒絕策略去進(jìn)行兜底!

第四,線程池拒絕策略,自定義拒絕策略可以實(shí)現(xiàn)RejectedExecutionHandler接口。

JDK自帶的拒絕策略如下:

AbortPolicy:直接拋出異常阻止系統(tǒng)正常工作。

CallerRunsPolicy:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)。

DiscardOldestPolicy:丟棄最老的一個(gè)請求,嘗試再次提交當(dāng)前任務(wù)。

DiscardPolicy:丟棄無法處理的任務(wù),不給予任何處理。

利用Hook

利用Hook,留下線程池執(zhí)行軌跡:

ThreadPoolExecutor提供了protected類型可以被覆蓋的鉤子方法,允許用戶在任務(wù)執(zhí)行之前會(huì)執(zhí)行之后做一些事情。我們可以通過它來實(shí)現(xiàn)比如初始化ThreadLocal、收集統(tǒng)計(jì)信息、如記錄日志等操作。這類Hook如beforeExecute和afterExecute。另外還有一個(gè)Hook可以用來在任務(wù)被執(zhí)行完的時(shí)候讓用戶插入邏輯,如rerminated 。

如果hook方法執(zhí)行失敗,則內(nèi)部的工作線程的執(zhí)行將會(huì)失敗或被中斷。

我們可以使用beforeExecute和afterExecute來記錄線程之前前和后的一些運(yùn)行情況,也可以直接把運(yùn)行完成后的狀態(tài)記錄到ELK等日志系統(tǒng)。

關(guān)閉線程池

內(nèi)容當(dāng)線程池不在被引用并且工作線程數(shù)為0的時(shí)候,線程池將被終止。我們也可以調(diào)用shutdown來手動(dòng)終止線程池。如果我們忘記調(diào)用shutdown,為了讓線程資源被釋放,我們還可以使用keepAliveTime和allowCoreThreadTimeOut來達(dá)到目的!

當(dāng)然,穩(wěn)妥的方式是使用虛擬機(jī)Runtime.getRuntime().addShutdownHook方法,手工去調(diào)用線程池的關(guān)閉方法!

線程池使用實(shí)例

線程池核心代碼:

public class AsyncProcessQueue { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * Task 包裝類<br> * 此類型的意義是記錄可能會(huì)被 Executor 吃掉的異常<br> */ public static class TaskWrapper implements Runnable { private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class); private final Runnable gift; public TaskWrapper(final Runnable target) { this.gift = target; } @Override public void run() { // 捕獲異常,避免在 Executor 里面被吞掉了 if (gift != null) { try { gift.run(); } catch (Exception e) { _LOGGER.error('Wrapped target execute exception.', e); } } } } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * 執(zhí)行指定的任務(wù) * * @param task * @return */ public static boolean execute(final Runnable task) { return AsyncProcessor.executeTask(new TaskWrapper(task)); }}public class AsyncProcessor { static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class); /** * 默認(rèn)最大并發(fā)數(shù)<br> */ private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2; /** * 線程池名稱格式 */ private static final String THREAD_POOL_NAME = 'ExternalConvertProcessPool-%d'; /** * 線程工廠名稱 */ private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME) .daemon(true).build(); /** * 默認(rèn)隊(duì)列大小 */ private static final int DEFAULT_SIZE = 500; /** * 默認(rèn)線程存活時(shí)間 */ private static final long DEFAULT_KEEP_ALIVE = 60L; /**NewEntryServiceImpl.java:689 * Executor */ private static ExecutorService executor; /** * 執(zhí)行隊(duì)列 */ private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE); static { // 創(chuàng)建 Executor // 此處默認(rèn)最大值改為處理器數(shù)量的 4 倍 try { executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, executeQueue, FACTORY); // 關(guān)閉事件的掛鉤 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { AsyncProcessor.LOGGER.info('AsyncProcessor shutting down.'); executor.shutdown(); try { // 等待1秒執(zhí)行關(guān)閉 if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { AsyncProcessor.LOGGER.error('AsyncProcessor shutdown immediately due to wait timeout.'); executor.shutdownNow(); } } catch (InterruptedException e) { AsyncProcessor.LOGGER.error('AsyncProcessor shutdown interrupted.'); executor.shutdownNow(); } AsyncProcessor.LOGGER.info('AsyncProcessor shutdown complete.'); } })); } catch (Exception e) { LOGGER.error('AsyncProcessor init error.', e); throw new ExceptionInInitializerError(e); } } /** * 此類型無法實(shí)例化 */ private AsyncProcessor() { } /** * 執(zhí)行任務(wù),不管是否成功<br> * 其實(shí)也就是包裝以后的 {@link Executer} 方法 * * @param task * @return */ public static boolean executeTask(Runnable task) { try { executor.execute(task); } catch (RejectedExecutionException e) { LOGGER.error('Task executing was rejected.', e); return false; } return true; } /** * 提交任務(wù),并可以在稍后獲取其執(zhí)行情況<br> * 當(dāng)提交失敗時(shí),會(huì)拋出 {@link } * * @param task * @return */ public static <T> Future<T> submitTask(Callable<T> task) { try { return executor.submit(task); } catch (RejectedExecutionException e) { LOGGER.error('Task executing was rejected.', e); throw new UnsupportedOperationException('Unable to submit the task, rejected.', e); } }}

使用方式:

AsyncProcessQueue.execute(new Runnable() { @Override public void run() {//do something }});

可以根據(jù)自己的使用場景靈活變更,我這里并沒有用到beforeExecute和afterExecute以及拒絕策略。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持好吧啦網(wǎng)。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。

標(biāo)簽: Java
相關(guān)文章:
主站蜘蛛池模板: 久久久久亚洲精品中文字幕 | 久草免费在线视频 | 亚洲精品亚洲人成人网 | 高清国产露脸捆绑01经典 | 日韩在线免费 | 欧美一区二区三区gg高清影视 | 日本午夜高清视频 | 72种姿势欧美久久久久大黄蕉 | 国产成人久久 | 91亚洲精品国产第一区 | 亚洲视频免费看 | 国产一级一级一级国产片 | 中文字幕亚洲综合久久男男 | 国产在线精品一区二区中文 | 国产成人精品高清在线观看99 | 一级做a爰片久久毛片欧美 一级做a爰片久久毛片人呢 | a级毛片免费高清毛片视频 a级毛片免费高清视频 | 日本久久久 | 一级做a爰片久久毛片人呢 一级做a爰片久久毛片唾 | 一区二区三区免费在线视频 | 久久久久久久久久久视频国内精品视频 | 国产一级特黄全黄毛片 | 国产精品深爱在线 | www.av在线.com | 亚洲精品久久久中文字 | 美女张开腿让男生桶出水 | 日韩欧美亚洲每的更新在线 | 91久久精品一区二区 | 欧美精品专区55页 | 男人看片网址 | 欧美性猛片xxxxⅹ免费 | 国产视频久久 | 国产美女自拍视频 | 日本美女性爱 | 国产精品久久久久久久毛片 | 国产成人综合91香蕉 | 久久久久久一级毛片免费野外 | 偷拍精品视频一区二区三区 | 一区二区在线看 | 美国毛片免费一级 | 精品三级内地国产在线观看 |