前端技术
HTML
CSS
Javascript
前端框架和UI库
VUE
ReactJS
AngularJS
JQuery
NodeJS
JSON
Element-UI
Bootstrap
Material UI
服务端和客户端
Java
Python
PHP
Golang
Scala
Kotlin
Groovy
Ruby
Lua
.net
c#
c++
后端WEB和工程框架
SpringBoot
SpringCloud
Struts2
MyBatis
Hibernate
Tornado
Beego
Go-Spring
Go Gin
Go Iris
Dubbo
HessianRPC
Maven
Gradle
数据库
MySQL
Oracle
Mongo
中间件与web容器
Redis
MemCache
Etcd
Cassandra
Kafka
RabbitMQ
RocketMQ
ActiveMQ
Nacos
Consul
Tomcat
Nginx
Netty
大数据技术
Hive
Impala
ClickHouse
DorisDB
Greenplum
PostgreSQL
HBase
Kylin
Hadoop
Apache Pig
ZooKeeper
SeaTunnel
Sqoop
Datax
Flink
Spark
Mahout
数据搜索与日志
ElasticSearch
Apache Lucene
Apache Solr
Kibana
Logstash
数据可视化与OLAP
Apache Atlas
Superset
Saiku
Tesseract
系统与容器
Linux
Shell
Docker
Kubernetes
[嵌套列表列的DataFrame展开操作 ]的搜索结果
这里是文章列表。热门标签的颜色随机变换,标签颜色没有特殊含义。
点击某个标签可搜索标签相关的文章。
点击某个标签可搜索标签相关的文章。
转载文章
...指向堆外内存(直接被操作系统管理的内存),JVM无法对其回收 当虚引用对象被回收时,JVM的垃圾回收无法自动回收堆外内存, 但是此时,虚引用对象被回收,会将其放在队列中 操作人员,看到队列中有对象被回收,就进行相应操作,回收堆内存 如何回收堆外内存 C和C++有函数可以用 java现在也提供了Unsafe类可以操作堆外内存,具体请参考上一篇博客,总之,JDK1.8只能通过反射来用,JDK1.9以上可以通过new Unsafe对象来用 Unsafe类的方法有: copyMemory():直接访问内存 allocateMemory():直接分配内存,这就必须手动回收内存了 freeMemory():回收内存 下面是一个虚引用例子,自己看吧,懂得自然懂,现在看不懂的,先收藏或者保存上,以后回来看 / 一个对象是否有虚引用的存在,完全不会对其生存时间构成影响, 也无法通过虚引用来获取一个对象的实例。 为一个对象设置虚引用关联的唯一目的就是能在这个对象被收集器回收时收到一个系统通知。 虚引用和弱引用对关联对象的回收都不会产生影响,如果只有虚引用活着弱引用关联着对象, 那么这个对象就会被回收。它们的不同之处在于弱引用的get方法,虚引用的get方法始终返回null, 弱引用可以使用ReferenceQueue,虚引用必须配合ReferenceQueue使用。 jdk中直接内存的回收就用到虚引用,由于jvm自动内存管理的范围是堆内存, 而直接内存是在堆内存之外(其实是内存映射文件,自行去理解虚拟内存空间的相关概念), 所以直接内存的分配和回收都是有Unsafe类去操作,java在申请一块直接内存之后, 会在堆内存分配一个对象保存这个堆外内存的引用, 这个对象被垃圾收集器管理,一旦这个对象被回收, 相应的用户线程会收到通知并对直接内存进行清理工作。 事实上,虚引用有一个很重要的用途就是用来做堆外内存的释放, DirectByteBuffer就是通过虚引用来实现堆外内存的释放的。/import java.lang.ref.PhantomReference;import java.lang.ref.Reference;import java.lang.ref.ReferenceQueue;import java.util.LinkedList;import java.util.List;public class T04_PhantomReference {private static final List<Object> LIST = new LinkedList<>();private static final ReferenceQueue<M> QUEUE = new ReferenceQueue<>();public static void main(String[] args) {PhantomReference<M> phantomReference = new PhantomReference<>(new M(), QUEUE);new Thread(() -> {while (true) {LIST.add(new byte[1024 1024]);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}System.out.println(phantomReference.get());} }).start();new Thread(() -> {while (true) {Reference<? extends M> poll = QUEUE.poll();if (poll != null) {System.out.println("--- 虚引用对象被jvm回收了 ---- " + poll);} }}).start();try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} }} 2、容器 1、发展历史(一定要了解) map容器你需要了解的历史 JDK早期,java提供了Vector和Hashtable两个容器,这两个容器,很多操作都加了锁Synchronized,对于某些不需要用锁的情况下,就显得十分影响性能,所以现在基本没人用这两个容器,但是面试经常问这两个容器里面的数据结构等内容 后来,出现了HashMap,此容器完全不加锁,是用的最多的容器 但是完全不加锁未免不完善,所以java提供了如下方式,将HashMap变为加锁的 //通过Collections.synchronizedMap(HashMap)方法,将其变为加锁Map集合,其中泛型随意,UUID只是举例。static Map<UUID, UUID> m = Collections.synchronizedMap(new HashMap<UUID, UUID>()); 通过阅读源码发现,上面方法将HashMap变为加锁,也是使用Synchronized,只是锁的内容更细,但并不比HashTable效率高多少 所以衍生除了新的容器ConcurrentHashMap ConcurrentHashMap 此容器,插入效率不如上面的,因为它做了各种判断和CAS,但是差距不是特别大 读取效率很高,100个线程同时访问,每个线程读取一百万次实测 Hashtable 39s ,SynchronizedHashMap 38s ,ConcurrentHashMap 1.7s 前两个将近40秒,ConcurrentHashMap只需要不到2s,由此可见此容器读取效率极高 2、为什么推荐使用Queue来做高并发 为什么推荐Queue(队列) Queue接口提供了很多针对多线程非常友好的API(offer ,peek和poll,其中BlockingQueue还添加了put和take可以阻塞),可以说专门为多线程高并发而创造的接口,所以一般我们使用Queue而不用List 以下代码分别使用链表LinkList和ConcurrentQueue,对比一下速度 LinkList用了5s多,ConcurrentQueue几乎瞬间完成 Concurrent接口就是专为多线程设计,多线程设计要多考虑Queue(高并发用)的使用,少使用List / 有N张火车票,每张票都有一个编号 同时有10个窗口对外售票 请写一个模拟程序 分析下面的程序可能会产生哪些问题? 重复销售?超量销售? 使用Vector或者Collections.synchronizedXXX 分析一下,这样能解决问题吗? 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步 就像这个程序,判断size和进行remove必须是一整个的原子操作 @author 马士兵/import java.util.LinkedList;import java.util.List;import java.util.concurrent.TimeUnit;public class TicketSeller3 {static List<String> tickets = new LinkedList<>();static {for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);}public static void main(String[] args) {for(int i=0; i<10; i++) {new Thread(()->{while(true) {synchronized(tickets) {if(tickets.size() <= 0) break;try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("销售了--" + tickets.remove(0));} }}).start();} }} 队列 import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;public class TicketSeller4 {static Queue<String> tickets = new ConcurrentLinkedQueue<>();static {for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);}public static void main(String[] args) {for(int i=0; i<10; i++) {new Thread(()->{while(true) {String s = tickets.poll();if(s == null) break;else System.out.println("销售了--" + s);} }).start();} }} 3、多线程常用容器 1、ConcurrentHashMap(无序)和ConcurrentSkipListMap(有序,链表,使用跳表数据结构,让查询更快) 跳表:http://blog.csdn.net/sunxianghuang/article/details/52221913 import java.util.;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentSkipListMap;import java.util.concurrent.CountDownLatch;public class T01_ConcurrentMap {public static void main(String[] args) {Map<String, String> map = new ConcurrentHashMap<>();//Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序//Map<String, String> map = new Hashtable<>();//Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX//TreeMapRandom r = new Random();Thread[] ths = new Thread[100];CountDownLatch latch = new CountDownLatch(ths.length);long start = System.currentTimeMillis();for(int i=0; i<ths.length; i++) {ths[i] = new Thread(()->{for(int j=0; j<10000; j++) map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000));latch.countDown();});}Arrays.asList(ths).forEach(t->t.start());try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(map.size());} } 2、CopyOnWriteList(写时复制)和CopyOnWriteSet 适用于,高并发是,读的多,写的少的情况 当我们写的时候,将容器复制,让写线程去复制的线程写(写的时候加锁) 而读线程依旧去读旧的(读的时候不加锁) 当写完,将对象指向复制后的已经写完的容器,原来容器销毁 大大提高读的效率 / 写时复制容器 copy on write 多线程环境下,写时效率低,读时效率高 适合写少读多的环境 @author 马士兵/import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.Random;import java.util.Vector;import java.util.concurrent.CopyOnWriteArrayList;public class T02_CopyOnWriteList {public static void main(String[] args) {List<String> lists = //new ArrayList<>(); //这个会出并发问题!//new Vector();new CopyOnWriteArrayList<>();Random r = new Random();Thread[] ths = new Thread[100];for(int i=0; i<ths.length; i++) {Runnable task = new Runnable() {@Overridepublic void run() {for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000));} };ths[i] = new Thread(task);}runAndComputeTime(ths);System.out.println(lists.size());}static void runAndComputeTime(Thread[] ths) {long s1 = System.currentTimeMillis();Arrays.asList(ths).forEach(t->t.start());Arrays.asList(ths).forEach(t->{try {t.join();} catch (InterruptedException e) {e.printStackTrace();} });long s2 = System.currentTimeMillis();System.out.println(s2 - s1);} } 3、synchronizedList和ConcurrentLinkedQueue package com.mashibing.juc.c_025;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;public class T04_ConcurrentQueue {public static void main(String[] args) {List<String> strsList = new ArrayList<>();List<String> strsSync = Collections.synchronizedList(strsList);//加锁ListQueue<String> strs = new ConcurrentLinkedQueue<>();//Concurrent链表队列,就是读快for(int i=0; i<10; i++) {strs.offer("a" + i); //add添加,但是不同点是,此方法会返回一个布尔值}System.out.println(strs);System.out.println(strs.size());System.out.println(strs.poll());//取出,取完后将元素去除System.out.println(strs.size());System.out.println(strs.peek());//取出,但是不会将元素从队列删除System.out.println(strs.size());//双端队列Deque} } 4、LinkedBlockingQueue 链表阻塞队列(无界链表,可以一直装东西,直到内存满(其实,也不是无限,其长度Integer.MaxValue就是上限,毕竟最大就这么大)) 主要体现在put和take方法,put添加的时候,如果队列满了,就阻塞当前线程,直到队列有空位,继续插入。take方法取的时候,如果没有值,就阻塞,等有值了,立马去取 import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class T05_LinkedBlockingQueue {static BlockingQueue<String> strs = new LinkedBlockingQueue<>();static Random r = new Random();public static void main(String[] args) {new Thread(() -> {for (int i = 0; i < 100; i++) {try {strs.put("a" + i); //如果满了,当前线程就会等待(实现阻塞),等多会有空位,将值插入TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();} }}, "p1").start();for (int i = 0; i < 5; i++) {new Thread(() -> {for (;;) {try {System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //取内容,如果空了,当前线程就会等待(实现阻塞)} catch (InterruptedException e) {e.printStackTrace();} }}, "c" + i).start();} }} 5、ArrayBlockingQueue 有界阻塞队列(因为Array需要指定长度) import java.util.Random;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;public class T06_ArrayBlockingQueue {static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);static Random r = new Random();public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10; i++) {strs.put("a" + i);}//strs.put("aaa"); //满了就会等待,程序阻塞//strs.add("aaa");//strs.offer("aaa");strs.offer("aaa", 1, TimeUnit.SECONDS);System.out.println(strs);} } 6、特殊的阻塞队列1:DelayQueue 延时队列(按时间进行调度,就是隔多长时间运行,谁隔的少,谁先) 以下例子中,我们添加线程到队列顺序为t12345,正常情况下,会按照顺序运行,但是这里有了延时时间,也就是时间越短,越先执行 步骤很简单,拿到延时队列 指定构造方法 继承 implements Delayed 重写 compareTo和getDelay import java.util.Calendar;import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;public class T07_DelayQueue {static BlockingQueue<MyTask> tasks = new DelayQueue<>();static Random r = new Random();static class MyTask implements Delayed {String name;long runningTime;MyTask(String name, long rt) {this.name = name;this.runningTime = rt;}@Overridepublic int compareTo(Delayed o) {if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))return -1;else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1;else return 0;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return name + " " + runningTime;} }public static void main(String[] args) throws InterruptedException {long now = System.currentTimeMillis();MyTask t1 = new MyTask("t1", now + 1000);MyTask t2 = new MyTask("t2", now + 2000);MyTask t3 = new MyTask("t3", now + 1500);MyTask t4 = new MyTask("t4", now + 2500);MyTask t5 = new MyTask("t5", now + 500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for(int i=0; i<5; i++) {System.out.println(tasks.take());//获取的是toString方法返回值} }} 7、特殊的阻塞队列2:PriorityQueque 优先队列(二叉树算法,就是排序) import java.util.PriorityQueue;public class T07_01_PriorityQueque {public static void main(String[] args) {PriorityQueue<String> q = new PriorityQueue<>();q.add("c");q.add("e");q.add("a");q.add("d");q.add("z");for (int i = 0; i < 5; i++) {System.out.println(q.poll());} }} 8、特殊的阻塞队列3:SynchronusQueue 同步队列(线程池用处非常大) 此队列容量为0,当插入元素时,必须同时有个线程往外取 就是说,当你往这个队列里面插入一个元素,它就拿着这个元素站着(阻塞),直到有个取元素的线程来,它就把元素交给它 就是用来同步数据的,也就是线程间交互数据用的一个特殊队列 package com.mashibing.juc.c_025;import java.util.concurrent.BlockingQueue;import java.util.concurrent.SynchronousQueue;public class T08_SynchronusQueue { //容量为0public static void main(String[] args) throws InterruptedException {BlockingQueue<String> strs = new SynchronousQueue<>();new Thread(()->{//这个线程就是消费者,来取值try {System.out.println(strs.take());//和同步队列要值} catch (InterruptedException e) {e.printStackTrace();} }).start();strs.put("aaa"); //阻塞等待消费者消费,就拿着aaa站着,等线程来取//strs.put("bbb");//strs.add("aaa");System.out.println(strs.size());} } 9、特殊的阻塞队列4:TransferQueue 传递队列 此队列加入了一个方法transfer()用来向队列添加元素 但是和put()方法不同的是,put添加完元素就走了 而这个方法,添加完自己就阻塞了,直到有人将这个元素取走,它才继续工作(省去我们手动阻塞) import java.util.concurrent.LinkedTransferQueue;public class T09_TransferQueue {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();} }).start();strs.transfer("aaa");//放东西到队列,同时阻塞等待消费者线程,取走元素//strs.put("aaa");//如果用put就和普通队列一样,放完东西就走了/new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();} }).start();/} } 3、线程池 线程池 由于单独创建线程,十分影响效率,而且无法对线程集中管理,一旦疏落,可能线程无限执行,浪费资源 线程池就是一个存储线程的游泳池,而每个线程就是池子里面的赛道 池子里的线程不执行任何任务,只是提供一个资源 而谁提交了任务,比如我想来游泳,那么池子就给你一个赛道,让你游泳 比如它想练憋气,那么给它一个赛道练憋气 当他们用完,走了,那么后面其它人再过来继续用 这就是线程池,始终只有这几个线程,不做实现,而是借用这几个线程的用户,自己掌控用这些线程资源做什么(提交任务给线程,线程空闲就帮他们完成任务) 线程池的两种类型(两类,不是两个) ThreadPoolExecutor(简称TPE) ForkJoinPool(分解汇总任务(将任务细化,最后汇总结果),少量线程执行多个任务(子任务,TPE做不到先执行子任务),CPU密集型) Executors(注意这后面有s) 它可以说是线程池工厂类,我们一般通过它创建线程池,并且它为我们封装了线程 1、常用类 Executor ExecutorService 扩展了execute方法,具有一个返回值 规定了异步执行机制,提供了一些执行器方法,比如shutdown()关闭等 但是它不知道执行器中的线程何时执行完 Callable 对Runnable进行了扩展,实现Callable的调用,可以有返回值,表示线程的状态 但是无法返回线程执行结果 Future 获得未来线程执行结果 由此,我们可以得知线程池基本的一个使用步骤 其中service.submit():为异步提交,也就是说,主线程该干嘛干嘛,我是异步执行的,和同步不一样(当前线程执行完,主线程才能继续执行,叫同步) futuer.get():获取结果集结果,此时因为异步,主线程执行到这里,结果集可能还没封装好,所以此时如果没有值,就阻塞,直到结果集出来 public static void main(String[] args) throws ExecutionException, InterruptedException {Callable<String> c = new Callable() {@Overridepublic String call() throws Exception {return "Hello Callable";} };ExecutorService service = Executors.newCachedThreadPool();Future<String> future = service.submit(c); //异步System.out.println(future.get());//阻塞service.shutdown();} 2、FutureTask 可充当任务的结果集 上面我们介绍Future是用来得到任务的执行结果的 而FutureTask,可以当做一个任务用,并且返回任务的结果,也就是可以跑线程,然后还可以得到线程结果 public static void main(String[] args) throws InterruptedException, ExecutionException {FutureTask<Integer> task = new FutureTask<>(()->{TimeUnit.MILLISECONDS.sleep(500);return 1000;}); //new Callable () { Integer call();}new Thread(task).start();System.out.println(task.get()); //阻塞} 3、CompletableFuture 非常灵活的任务结果集 一个非常灵活的结果集 他可以将很多执行不同任务的线程的结果进行汇总 比如一个网站,它可以启动多个线程去各大电商网站,比如淘宝,京东,收集某些或某一个商品的价格 最后,将获取的数据进行整合封装 最终,客户就可以通过此网站,获取某类商品在各网站的价格信息 / 假设你能够提供一个服务 这个服务查询各大电商网站同一类产品的价格并汇总展示 @author 马士兵 http://mashibing.com/import java.io.IOException;import java.util.Random;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;public class T06_01_CompletableFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {long start, end;/start = System.currentTimeMillis();priceOfTM();priceOfTB();priceOfJD();end = System.currentTimeMillis();System.out.println("use serial method call! " + (end - start));/start = System.currentTimeMillis();CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(()->priceOfTM());CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(()->priceOfTB());CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(()->priceOfJD());CompletableFuture.allOf(futureTM, futureTB, futureJD).join();//当所有结果集都获取到,才汇总阻塞CompletableFuture.supplyAsync(()->priceOfTM()).thenApply(String::valueOf).thenApply(str-> "price " + str).thenAccept(System.out::println);end = System.currentTimeMillis();System.out.println("use completable future! " + (end - start));try {System.in.read();} catch (IOException e) {e.printStackTrace();} }private static double priceOfTM() {delay();return 1.00;}private static double priceOfTB() {delay();return 2.00;}private static double priceOfJD() {delay();return 3.00;}/private static double priceOfAmazon() {delay();throw new RuntimeException("product not exist!");}/private static void delay() {int time = new Random().nextInt(500);try {TimeUnit.MILLISECONDS.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}System.out.printf("After %s sleep!\n", time);} } 4、TPE型线程池1:ThreadPoolExecutor 原理及其参数 线程池由两个集合组成,一个集合存储线程,一个集合存储任务 存储线程:可以规定大小,最多可以有多少个,以及指定核心线程数量(不会被回收) 任务队列:存储任务 细节:初始线程池没有线程,当有一个任务来,线程池起一个线程,又有一个任务来,再起一个线程,直到达到核心线程数量 核心线程数量达到时,新来的任务将存储到任务队列中等待核心线程处理完成,直到任务队列也满了 当任务队列满了,此时再次启动一个线程(非核心线程,一旦空闲,达到指定时间将会消失),直到达到线程最大数量 当线程容器和任务容器都满了,又来了线程,将会执行拒绝策略 上面的细节涉及的所有步骤内容,均由创建线程池的参数执行 下面是ThreadPoolExecutor构造方法参数的源码注释 / 用给定的初始值,创建一个新的线程池 @param corePoolSize 核心线程数量 @param maximumPoolSize 最大线程数量 @param keepAliveTime 当线程数大于核心线程数量时,空闲的线程可生存的时间 @param unit 时间单位 @param workQueue 任务队列,只能包含由execute提交的Runnable任务 @param threadFactory 工厂,用于创建线程给线程池调度的工厂,可以自定义 @param handler 拒绝策略(可以自定义,JDK默认提供4种),当线程边界和队列容量已经满了,新来线程被阻塞时使用的处理程序/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) JDK提供的4种拒绝策略,不常用,一般都是自己定义拒绝策略 Abort:抛异常 Discard:扔掉,不抛异常 DiscardOldest:扔掉排队时间最久的(将队列中排队时间最久的扔掉,然后让新来的进来) CallerRuns:调用者处理任务(谁通过execute方法提交任务,谁处理) ThreadPoolExecutor继承关系 继承关系:ThreadPoolExecutor->AbstractExectorService类->ExectorService接口->Exector接口 Executors(注意这后面有s) 它可以说是线程池工厂类,我们一般通过它创建线程池,并且它为我们封装了线程 看看下面创建线程池,哪里用到了它 使用实例 import java.io.IOException;import java.util.concurrent.;public class T05_00_HelloThreadPool {static class Task implements Runnable {private int i;public Task(int i) {this.i = i;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " Task " + i);try {System.in.read();} catch (IOException e) {e.printStackTrace();} }@Overridepublic String toString() {return "Task{" +"i=" + i +'}';} }public static void main(String[] args) {ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(4),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());//创建线程池,核心2个,最大4个,空闲线程存活时间60s,任务队列容量4,使用默认线程工程,创建线程。拒绝策略是JDK提供的for (int i = 0; i < 8; i++) {tpe.execute(new Task(i));//供提交8次任务}System.out.println(tpe.getQueue());//查看任务队列tpe.execute(new Task(100));//提交新的任务System.out.println(tpe.getQueue());tpe.shutdown();//关闭线程池} } 5、TPE型线程池2:SingleThreadPool 单例线程池(只有一个线程) 为什么有单例线程池 有任务队列,有线程池管理机制 Executors(注意这后面有s) 它可以说是线程池工厂类,我们一般通过它创建线程池,并且它为我们封装了线程 看看下面哪里用到了它 /创建单例线程池,扔5个任务进去,查看输出结果,看看有几个线程执行任务/import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class T07_SingleThreadPool {public static void main(String[] args) {ExecutorService service = Executors.newSingleThreadExecutor();for(int i=0; i<5; i++) {final int j = i;service.execute(()->{System.out.println(j + " " + Thread.currentThread().getName());});} }} 6、TPE型线程池3:CachedPool 缓存,存储线程池 此线程池没有核心线程,来一个任务启动一个线程(最多Integer.MaxValue,不会放在任务队列,因为任务队列容量为0),每个线程空闲后,只能活60s 实例 import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class T07_SingleThreadPool {public static void main(String[] args) {ExecutorService service = Executors.newSingleThreadExecutor();//通过Executors获取池子for(int i=0; i<5; i++) {final int j = i;service.execute(()->{//提交任务System.out.println(j + " " + Thread.currentThread().getName());});}service.shutdown();} } 7、TPE型线程池4:FixedThreadPool 固定线程池 此线次池,用于创建一个固定线程数量的线程池,不会回收 实例 import java.util.ArrayList;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class T09_FixedThreadPool {public static void main(String[] args) throws InterruptedException, ExecutionException {//并发执行long start = System.currentTimeMillis();getPrime(1, 200000); long end = System.currentTimeMillis();System.out.println(end - start);//输出并发执行耗费时间final int cpuCoreNum = 4;//并行执行ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20MyTask t2 = new MyTask(80001, 130000);MyTask t3 = new MyTask(130001, 170000);MyTask t4 = new MyTask(170001, 200000);Future<List<Integer>> f1 = service.submit(t1);Future<List<Integer>> f2 = service.submit(t2);Future<List<Integer>> f3 = service.submit(t3);Future<List<Integer>> f4 = service.submit(t4);start = System.currentTimeMillis();f1.get();f2.get();f3.get();f4.get();end = System.currentTimeMillis();System.out.println(end - start);//输出并行耗费时间}static class MyTask implements Callable<List<Integer>> {int startPos, endPos;MyTask(int s, int e) {this.startPos = s;this.endPos = e;}@Overridepublic List<Integer> call() throws Exception {List<Integer> r = getPrime(startPos, endPos);return r;} }static boolean isPrime(int num) {for(int i=2; i<=num/2; i++) {if(num % i == 0) return false;}return true;}static List<Integer> getPrime(int start, int end) {List<Integer> results = new ArrayList<>();for(int i=start; i<=end; i++) {if(isPrime(i)) results.add(i);}return results;} } 8、TPE型线程池5:ScheduledPool 预定,延时线程池 根据延时时间(隔多长时间后运行),排序,哪个线程先执行,用户只需要指定核心线程数量 此线程池返回的池对象,和提交任务方法都不一样,比较涉及到时间 import java.util.Random;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class T10_ScheduledPool {public static void main(String[] args) {ScheduledExecutorService service = Executors.newScheduledThreadPool(4);service.scheduleAtFixedRate(()->{//提交延时任务try {TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());}, 0, 500, TimeUnit.MILLISECONDS);//指定延时时间和单位,第一个任务延时0毫秒,之后的任务,延时500毫秒} } 9、手写拒绝策略小例子 import java.util.concurrent.;public class T14_MyRejectedHandler {public static void main(String[] args) {ExecutorService service = new ThreadPoolExecutor(4, 4,0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),Executors.defaultThreadFactory(),new MyHandler());//将手写拒绝策略传入}static class MyHandler implements RejectedExecutionHandler {//1、继承RejectedExecutionHandler@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {//2、重写方法//log("r rejected")//伪代码,表示通过log4j.log()报一下日志,拒绝的时间,线程名//save r kafka mysql redis//可以尝试保存队列//try 3 times //可以尝试几次,比如3次,重新去抢队列,3次还不行就丢弃if(executor.getQueue().size() < 10000) {//尝试条件,如果size>10000了,就执行拒绝策略//try put again();//如果小于10000,尝试将其放到队列中} }} } 10、ForkJoinPool线程池1:ForkJoinPool 前面我们讲过线程分为两大类,TPE和FJP ForkJoinPool(分解汇总任务(将任务细化,最后汇总结果),少量线程执行多个任务(子任务,TPE做不到先执行子任务),CPU密集型) 适合将大任务切分成多个小任务运行 两个方法,fork():分子任务,将子任务分配到线程池中 join():当前任务的计算结果,如果有子任务,等子任务结果返回后再汇总 下面实例实现,一百万个随机数求和,由两种方法实现,一种ForkJoinPool分任务并行,一种使用单线程做 import java.io.IOException;import java.util.Arrays;import java.util.Random;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveAction;import java.util.concurrent.RecursiveTask;public class T12_ForkJoinPool {//1000000个随机数求和static int[] nums = new int[1000000];//一堆数static final int MAX_NUM = 50000;//分任务时,每个任务的操作量不能多于50000个,否则就继续细分static Random r = new Random();//使用随机数将数组初始化static {for(int i=0; i<nums.length; i++) {nums[i] = r.nextInt(100);}System.out.println("---" + Arrays.stream(nums).sum()); //stream api 单线程就这么做,一个一个加}//分任务,需要继承,可以继承RecursiveAction(不需要返回值,一般用在不需要返回值的场景)或//RecursiveTask(需要返回值,我们用这个,因为我们需要最后获取求和结果)两个更好实现的类,//他俩继承与ForkJoinTaskstatic class AddTaskRet extends RecursiveTask<Long> {private static final long serialVersionUID = 1L;int start, end;AddTaskRet(int s, int e) {start = s;end = e;}@Overrideprotected Long compute() {if(end-start <= MAX_NUM) {//如果任务操作数小于规定的最大操作数,就进行运算,long sum = 0L;for(int i=start; i<end; i++) sum += nums[i];return sum;//返回结果} //如果分配的操作数大于规定,就继续细分(简单的重中点分,两半)int middle = start + (end-start)/2;//获取中间值AddTaskRet subTask1 = new AddTaskRet(start, middle);//传入起始值和中间值,表示一个子任务AddTaskRet subTask2 = new AddTaskRet(middle, end);//中间值和结尾值,表示一个子任务subTask1.fork();//分任务subTask2.fork();//分任务return subTask1.join() + subTask2.join();//最后返回结果汇总} }public static void main(String[] args) throws IOException {/ForkJoinPool fjp = new ForkJoinPool();AddTask task = new AddTask(0, nums.length);fjp.execute(task);/ForkJoinPool fjp = new ForkJoinPool();//创建线程池AddTaskRet task = new AddTaskRet(0, nums.length);//创建任务fjp.execute(task);//传入任务long result = task.join();//返回汇总结果System.out.println(result);//System.in.read();} } 11、ForkJoinPool线程池2:WorkStealingPool 任务偷取线程池 原来的线程池,都是有一个任务队列,而这个不同,它给每个线程都分配了一个任务队列 当某一个线程的任务队列没有任务,并且自己空闲,它就去其它线程的任务队列中偷任务,所以叫任务偷取线程池 细节:当线程自己从自己的任务队列拿任务时,不需要加锁,但是偷任务时,因为有两个线程,可能发生同步问题,需要加锁 此线程继承FJP 实例 import java.io.IOException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class T11_WorkStealingPool {public static void main(String[] args) throws IOException {ExecutorService service = Executors.newWorkStealingPool();System.out.println(Runtime.getRuntime().availableProcessors());service.execute(new R(1000));service.execute(new R(2000));service.execute(new R(2000));service.execute(new R(2000)); //daemonservice.execute(new R(2000));//由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出System.in.read(); }static class R implements Runnable {int time;R(int t) {this.time = t;}@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(time + " " + Thread.currentThread().getName());} }} 12、流式API:ParallelStreamAPI 不懂的请参考:https://blog.csdn.net/grd_java/article/details/110265219 实例 import java.util.ArrayList;import java.util.List;import java.util.Random;public class T13_ParallelStreamAPI {public static void main(String[] args) {List<Integer> nums = new ArrayList<>();Random r = new Random();for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000));//System.out.println(nums);long start = System.currentTimeMillis();nums.forEach(v->isPrime(v));long end = System.currentTimeMillis();System.out.println(end - start);//使用parallel stream apistart = System.currentTimeMillis();nums.parallelStream().forEach(T13_ParallelStreamAPI::isPrime);//并行流,将任务切分成子任务执行end = System.currentTimeMillis();System.out.println(end - start);}static boolean isPrime(int num) {for(int i=2; i<=num/2; i++) {if(num % i == 0) return false;}return true;} } 13、总结 总结 Callable相当于一Runnable但是它有返回值 Future:存储执行完产生的结果 FutureTask 相当于Future+Runnable,既可以执行任务,又能获取任务执行的Future结果 CompletableFuture 可以多任务异步,并对多任务控制,整合任务结果,细化完美,比如可以一个任务完成就可以整合结果,也可以所有任务完成才整合结果 4、ThreadPoolExecutor源码解析 依然只讲重点,实际还需要大家按照上篇博客中看源码的方式来看 1、常用变量的解释 // 1. ctl,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 2. COUNT_BITS,Integer.SIZE为32,所以COUNT_BITS为29private static final int COUNT_BITS = Integer.SIZE - 3;// 3. CAPACITY,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 4. 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATEDprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctl// 5. runStateOf(),获取线程池状态,通过按位与操作,低29位将全部变成0private static int runStateOf(int c) { return c & ~CAPACITY; }// 6. workerCountOf(),获取线程池worker数量,通过按位与操作,高3位将全部变成0private static int workerCountOf(int c) { return c & CAPACITY; }// 7. ctlOf(),根据线程池状态和线程池worker数量,生成ctl值private static int ctlOf(int rs, int wc) { return rs | wc; }/ Bit field accessors that don't require unpacking ctl. These depend on the bit layout and on workerCount being never negative./// 8. runStateLessThan(),线程池状态小于xxprivate static boolean runStateLessThan(int c, int s) {return c < s;}// 9. runStateAtLeast(),线程池状态大于等于xxprivate static boolean runStateAtLeast(int c, int s) {return c >= s;} 2、构造方法 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 基本类型参数校验if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();// 空指针校验if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;// 根据传入参数unit和keepAliveTime,将存活时间转换为纳秒存到变量keepAliveTime 中this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;} 3、提交执行task的过程 public void execute(Runnable command) {if (command == null)throw new NullPointerException();/ Proceed in 3 steps: 1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false. 2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none. 3. If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task./int c = ctl.get();// worker数量比核心线程数小,直接创建worker执行任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// worker数量超过核心线程数,任务直接进入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。// 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。if (! isRunning(recheck) && remove(command))reject(command);// 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。// 这儿有3点需要注意:// 1. 线程池不是运行状态时,addWorker内部会判断线程池状态// 2. addWorker第2个参数表示是否创建核心线程// 3. addWorker返回false,则说明任务执行失败,需要执行reject操作else if (!addWorker(command, false))reject(command);} 4、addworker源码解析 private boolean addWorker(Runnable firstTask, boolean core) {retry:// 外层自旋for (;;) {int c = ctl.get();int rs = runStateOf(c);// 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价// (rs > SHUTDOWN) || // (rs == SHUTDOWN && firstTask != null) || // (rs == SHUTDOWN && workQueue.isEmpty())// 1. 线程池状态大于SHUTDOWN时,直接返回false// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 内层自旋for (;;) {int wc = workerCountOf(c);// worker数量超过容量,直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 使用CAS的方式增加worker数量。// 若增加成功,则直接跳出外层循环进入到第二部分if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl// 线程池状态发生变化,对外层循环进行自旋if (runStateOf(c) != rs)continue retry;// 其他情况,直接内层循环进行自旋即可// else CAS failed due to workerCount change; retry inner loop} }boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// worker的添加必须是串行的,因此需要加锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.// 这儿需要重新检查线程池状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// worker已经调用过了start()方法,则不再创建workerif (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// worker创建并添加到workers成功workers.add(w);// 更新largestPoolSize变量int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;} } finally {mainLock.unlock();}// 启动worker线程if (workerAdded) {t.start();workerStarted = true;} }} finally {// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作if (! workerStarted)addWorkerFailed(w);}return workerStarted;} 5、线程池worker任务单元 private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/ This class will never be serialized, but we provide a serialVersionUID to suppress a javac warning./private static final long serialVersionUID = 6138294804551838833L;/ Thread this worker is running in. Null if factory fails. /final Thread thread;/ Initial task to run. Possibly null. /Runnable firstTask;/ Per-thread task counter /volatile long completedTasks;/ Creates with given first task and thread from ThreadFactory. @param firstTask the first task (null if none)/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前workerthis.thread = getThreadFactory().newThread(this);}/ Delegates main run loop to outer runWorker /public void run() {runWorker(this);}// 省略代码...} 6、核心线程执行逻辑-runworker final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 调用unlock()是为了让外部可以中断w.unlock(); // allow interrupts// 这个变量用于判断是否进入过自旋(while循环)boolean completedAbruptly = true;try {// 这儿是自旋// 1. 如果firstTask不为null,则执行firstTask;// 2. 如果firstTask为null,则调用getTask()从队列获取任务。// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待while (task != null || (task = getTask()) != null) {// 这儿对worker进行加锁,是为了达到下面的目的// 1. 降低锁范围,提升性能// 2. 保证每个worker执行的任务是串行的w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// 如果线程池正在停止,则对当前线程进行中断操作if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();// 执行任务,且在执行前后通过beforeExecute()和afterExecute()来扩展其功能。// 这两个方法在当前类里面为空实现。try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);} } finally {// 帮助gctask = null;// 已完成任务数加一 w.completedTasks++;w.unlock();} }completedAbruptly = false;} finally {// 自旋操作被退出,说明线程池正在结束processWorkerExit(w, completedAbruptly);} } 本篇文章为转载内容。原文链接:https://blog.csdn.net/grd_java/article/details/113116244。 该文由互联网用户投稿提供,文中观点代表作者本人意见,并不代表本站的立场。 作为信息平台,本站仅提供文章转载服务,并不拥有其所有权,也不对文章内容的真实性、准确性和合法性承担责任。 如发现本文存在侵权、违法、违规或事实不符的情况,请及时联系我们,我们将第一时间进行核实并删除相应内容。
2023-07-21 16:19:45
328
转载
转载文章
...的状态信息,实现连续操作间的关联和认证,这对于处理需要保持登录状态或进行多次交互的网页抓取任务尤为关键。
2023-03-01 12:40:55
563
转载
站内搜索
用于搜索本网站内部文章,支持栏目切换。
知识学习
实践的时候请根据实际情况谨慎操作。
随机学习一条linux命令:
pkill process_name
- 结束与指定名称匹配的进程。
推荐内容
推荐本栏目内的其它文章,看看还有哪些文章让你感兴趣。
2023-04-28
2023-08-09
2023-06-18
2023-04-14
2023-02-18
2023-04-17
2024-01-11
2023-10-03
2023-09-09
2023-06-13
2023-08-07
2023-03-11
历史内容
快速导航到对应月份的历史文章列表。
随便看看
拉到页底了吧,随便看看还有哪些文章你可能感兴趣。
时光飞逝
"流光容易把人抛,红了樱桃,绿了芭蕉。"