importjava.util.Random;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.LinkedBlockingQueue;importjava.util.concurrent.TimeUnit;publicclassT05_LinkedBlockingQueue{staticBlockingQueue<String> strs =newLinkedBlockingQueue<>();staticRandom r =newRandom();publicstaticvoidmain(String[] args){newThread(()->{for(int i =0; i <100; i++){try{strs.put("a"+ i);//如果满了,当前线程就会等待(实现阻塞),等多会有空位,将值插入TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));}catch(InterruptedException e){e.printStackTrace();}}},"p1").start();for(int i =0; i <5; i++){newThread(()->{for(;;){try{System.out.println(Thread.currentThread().getName()+" take -"+ strs.take());//取内容,如果空了,当前线程就会等待(实现阻塞)}catch(InterruptedException e){e.printStackTrace();}}},"c"+ i).start();}}}
5、ArrayBlockingQueue 有界阻塞队列(因为Array需要指定长度)
importjava.util.Random;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.TimeUnit;publicclassT06_ArrayBlockingQueue{staticBlockingQueue<String> strs =newArrayBlockingQueue<>(10);staticRandom r =newRandom();publicstaticvoidmain(String[] args)throwsInterruptedException{for(int i =0; i <10; i++){strs.put("a"+ i);}//strs.put("aaa"); //满了就会等待,程序阻塞//strs.add("aaa");//strs.offer("aaa");strs.offer("aaa",1,TimeUnit.SECONDS);System.out.println(strs);}}
importjava.util.Calendar;importjava.util.Random;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.DelayQueue;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;publicclassT07_DelayQueue{staticBlockingQueue<MyTask> tasks =newDelayQueue<>();staticRandom r =newRandom();staticclassMyTaskimplementsDelayed{String name;long runningTime;MyTask(String name,long rt){this.name = name;this.runningTime = rt;}@OverridepublicintcompareTo(Delayed o){if(this.getDelay(TimeUnit.MILLISECONDS)< o.getDelay(TimeUnit.MILLISECONDS))return-1;elseif(this.getDelay(TimeUnit.MILLISECONDS)> o.getDelay(TimeUnit.MILLISECONDS))return1;elsereturn0;}@OverridepubliclonggetDelay(TimeUnit unit){return unit.convert(runningTime -System.currentTimeMillis(),TimeUnit.MILLISECONDS);}@OverridepublicStringtoString(){return name +" "+ runningTime;}}publicstaticvoidmain(String[] args)throwsInterruptedException{long now =System.currentTimeMillis();MyTask t1 =newMyTask("t1", now +1000);MyTask t2 =newMyTask("t2", now +2000);MyTask t3 =newMyTask("t3", now +1500);MyTask t4 =newMyTask("t4", now +2500);MyTask t5 =newMyTask("t5", now +500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for(int i=0; i<5; i++){System.out.println(tasks.take());//获取的是toString方法返回值}}}
7、特殊的阻塞队列2:PriorityQueque 优先队列(二叉树算法,就是排序)
importjava.util.PriorityQueue;publicclassT07_01_PriorityQueque{publicstaticvoidmain(String[] args){PriorityQueue<String> q =newPriorityQueue<>();q.add("c");q.add("e");q.add("a");q.add("d");q.add("z");for(int i =0; i <5; i++){System.out.println(q.poll());}}}
publicstaticvoidmain(String[] args)throwsExecutionException,InterruptedException{Callable<String> c =newCallable(){@OverridepublicStringcall()throwsException{return"Hello Callable";}};ExecutorService service =Executors.newCachedThreadPool();Future<String> future = service.submit(c);//异步System.out.println(future.get());//阻塞service.shutdown();}
importjava.util.Random;importjava.util.concurrent.Executors;importjava.util.concurrent.ScheduledExecutorService;importjava.util.concurrent.TimeUnit;publicclassT10_ScheduledPool{publicstaticvoidmain(String[] args){ScheduledExecutorService service =Executors.newScheduledThreadPool(4);service.scheduleAtFixedRate(()->{//提交延时任务try{TimeUnit.MILLISECONDS.sleep(newRandom().nextInt(1000));}catch(InterruptedException e){e.printStackTrace();}System.out.println(Thread.currentThread().getName());},0,500,TimeUnit.MILLISECONDS);//指定延时时间和单位,第一个任务延时0毫秒,之后的任务,延时500毫秒}}
9、手写拒绝策略小例子
importjava.util.concurrent.*;publicclassT14_MyRejectedHandler{publicstaticvoidmain(String[] args){ExecutorService service =newThreadPoolExecutor(4,4,0,TimeUnit.SECONDS,newArrayBlockingQueue<>(6),Executors.defaultThreadFactory(),newMyHandler());//将手写拒绝策略传入}staticclassMyHandlerimplementsRejectedExecutionHandler{//1、继承RejectedExecutionHandler@OverridepublicvoidrejectedExecution(Runnable r,ThreadPoolExecutor executor){//2、重写方法//log("r rejected")//伪代码,表示通过log4j.log()报一下日志,拒绝的时间,线程名//save r kafka mysql redis//可以尝试保存队列//try 3 times //可以尝试几次,比如3次,重新去抢队列,3次还不行就丢弃if(executor.getQueue().size()<10000){//尝试条件,如果size>10000了,就执行拒绝策略//try put again();//如果小于10000,尝试将其放到队列中}}}}
publicvoidexecute(Runnable command){if(command ==null)thrownewNullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();// worker数量比核心线程数小,直接创建worker执行任务if(workerCountOf(c)< corePoolSize){if(addWorker(command,true))return;c = ctl.get();}// worker数量超过核心线程数,任务直接进入队列if(isRunning(c)&& workQueue.offer(command)){int recheck = ctl.get();// 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。// 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。if(!isRunning(recheck)&&remove(command))reject(command);// 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0elseif(workerCountOf(recheck)==0)addWorker(null,false);}// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。// 这儿有3点需要注意:// 1. 线程池不是运行状态时,addWorker内部会判断线程池状态// 2. addWorker第2个参数表示是否创建核心线程// 3. addWorker返回false,则说明任务执行失败,需要执行reject操作elseif(!addWorker(command,false))reject(command);}
4、addworker源码解析
privatebooleanaddWorker(Runnable firstTask,boolean core){retry:// 外层自旋for(;;){int c = ctl.get();int rs =runStateOf(c);// 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价// (rs > SHUTDOWN) || // (rs == SHUTDOWN && firstTask != null) || // (rs == SHUTDOWN && workQueue.isEmpty())// 1. 线程池状态大于SHUTDOWN时,直接返回false// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false// Check if queue empty only if necessary.if(rs >= SHUTDOWN &&!(rs == SHUTDOWN &&firstTask ==null&&! workQueue.isEmpty()))returnfalse;// 内层自旋for(;;){int wc =workerCountOf(c);// worker数量超过容量,直接返回falseif(wc >= CAPACITY ||wc >=(core ? corePoolSize : maximumPoolSize))returnfalse;// 使用CAS的方式增加worker数量。// 若增加成功,则直接跳出外层循环进入到第二部分if(compareAndIncrementWorkerCount(c))break retry;c = ctl.get();// Re-read ctl// 线程池状态发生变化,对外层循环进行自旋if(runStateOf(c)!= rs)continue retry;// 其他情况,直接内层循环进行自旋即可// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted =false;boolean workerAdded =false;Worker w =null;try{w =newWorker(firstTask);finalThread t = w.thread;if(t !=null){finalReentrantLock mainLock =this.mainLock;// worker的添加必须是串行的,因此需要加锁mainLock.lock();try{// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.// 这儿需要重新检查线程池状态int rs =runStateOf(ctl.get());if(rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask ==null)){// worker已经调用过了start()方法,则不再创建workerif(t.isAlive())// precheck that t is startablethrownewIllegalThreadStateException();// worker创建并添加到workers成功workers.add(w);// 更新`largestPoolSize`变量int s = workers.size();if(s > largestPoolSize)largestPoolSize = s;workerAdded =true;}}finally{mainLock.unlock();}// 启动worker线程if(workerAdded){t.start();workerStarted =true;}}}finally{// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作if(! workerStarted)addWorkerFailed(w);}return workerStarted;}
5、线程池worker任务单元
privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/privatestaticfinallong serialVersionUID =6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */finalThread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatilelong completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask){setState(-1);// inhibit interrupts until runWorkerthis.firstTask = firstTask;// 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前workerthis.thread =getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */publicvoidrun(){runWorker(this);}// 省略代码...}
6、核心线程执行逻辑-runworker
finalvoidrunWorker(Worker w){Thread wt =Thread.currentThread();Runnable task = w.firstTask;w.firstTask =null;// 调用unlock()是为了让外部可以中断w.unlock();// allow interrupts// 这个变量用于判断是否进入过自旋(while循环)boolean completedAbruptly =true;try{// 这儿是自旋// 1. 如果firstTask不为null,则执行firstTask;// 2. 如果firstTask为null,则调用getTask()从队列获取任务。// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待while(task !=null||(task =getTask())!=null){// 这儿对worker进行加锁,是为了达到下面的目的// 1. 降低锁范围,提升性能// 2. 保证每个worker执行的任务是串行的w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// 如果线程池正在停止,则对当前线程进行中断操作if((runStateAtLeast(ctl.get(), STOP)||(Thread.interrupted()&&runStateAtLeast(ctl.get(), STOP)))&&!wt.isInterrupted())wt.interrupt();// 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。// 这两个方法在当前类里面为空实现。try{beforeExecute(wt, task);Throwable thrown =null;try{task.run();}catch(RuntimeException x){thrown = x;throw x;}catch(Error x){thrown = x;throw x;}catch(Throwable x){thrown = x;thrownewError(x);}finally{afterExecute(task, thrown);}}finally{// 帮助gctask =null;// 已完成任务数加一 w.completedTasks++;w.unlock();}}completedAbruptly =false;}finally{// 自旋操作被退出,说明线程池正在结束processWorkerExit(w, completedAbruptly);}}