Java-阻塞队列-线程池-ThreadPoolExecutor-AtomicInteger-CountDownLatch-Semaphore

tech2023-01-23  54

1.阻塞队列

​ 组成: ​ ArrayBlockingQueue: 底层是数组,元素是有限的. ​ LinkedBlockingDeque: 底层是链表,元素是无限的.(并不是真正的无限,最大是int的最大值) ​ 方法: ​ public void put(E e); //存入元素.该方法是阻塞的.当容器中满了的时候,该方法就会停止在那里,等待着容器空 ​ public E take(); //获取元素.该方法是阻塞的.当容器中空的时候,该方法会停止在那里,等待着容器中存入元素 ​

import java.util.concurrent.ArrayBlockingQueue; public class Demo { public static void main(String[] args) { // 创建阻塞队列的对象,容量为 1 ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1); Foodie foodie = new Foodie(arrayBlockingQueue); Cooker cooker = new Cooker(arrayBlockingQueue); foodie.start(); cooker.start(); } } import java.util.concurrent.ArrayBlockingQueue; public class Cooker extends Thread { private ArrayBlockingQueue<String> bd; public Cooker(ArrayBlockingQueue<String> bd) { this.bd = bd; } @Override public void run() { while (true) { try { bd.put("汉堡包"); System.out.println("厨师放入一个汉堡包"); } catch (InterruptedException e) { e.printStackTrace(); } } } } import java.util.concurrent.ArrayBlockingQueue; public class Foodie extends Thread{ private ArrayBlockingQueue<String> bd; public Foodie(ArrayBlockingQueue<String> bd) { this.bd = bd; } @Override public void run() { while (true) { try { String take = bd.take(); System.out.println("吃货将" + take +"拿出来吃了"); } catch (InterruptedException e) { e.printStackTrace(); } } } }

2.线程池:

​ 概述: ​ 线程池,就是存储线程的池子.线程池可以创建和回收程池.

​ 线程:由计算机分配的.

​ 申请线程的过程,是一个非常耗时的操作.

​ 线程本身是需要占用系统资源的. 创建: ​ ExecutorService es = Executors.newCachedThreadPool(); //创建一个默认线程池.(内部最多有int最大值个线程) ​ ExecutorService es = Executors.newFixedThreadPool(int num); //创建一个线程池.(内部线程最大数量为num) ​ 使用: ​ submit(Runnable task); //提交任务.(把要执行的任务交给线程池,由线程池分配线程执行) ​ shutdown(); //关闭线程池 ​ 提高效率,节省资源

例子:

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestDemo { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); es.submit(new MyRunnable()); es.submit(new Thread(){ @Override public void run() { super.run(); } }); es.shutdown(); } } class MyRunnable implements Runnable{ @Override public void run() { for (int i = 0; i < 50; i++) { System.out.println(Thread.currentThread().getName()+":"+i); } } } import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MyThreadPoolDemo2 { public static void main(String[] args) { //创建线程池 最多有10个线程 ExecutorService executorService = Executors.newFixedThreadPool(10); executorService.submit(()->{ System.out.println(Thread.currentThread().getName() + "在执行了"); }); executorService.submit(()->{ System.out.println(Thread.currentThread().getName() + "在执行了"); }); //关闭线程池 executorService.shutdown(); } }

3.ThreadPoolExecutor(了解)

​ 概述: ​ 用于自定义线程池. ​ 构造: ​ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler); ​ 参数: ​ corePoolSize: //核心线程的最大值,不能小于0 ​ maximumPoolSize://最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize ​ keepAliveTime: //空闲线程最大存活时间,不能小于0 ​ unit: //空闲线程最大存活时间的时间单位,一般使用TimeUtil选项 ​ workQueue: //任务队列(排队的数量),不能为null ​ threadFactory: //创建线程方式,一般为"Executors.defaultThreadFactory()",不能为null ​ handler: //任务的拒绝策略(多余任务的处理方案),不能为null ​ 拒绝策略: ​ ThreadPoolExecutor.AbortPolicy: //丢弃任务并抛出RejectedExecutionException异常。是默认的策略。 ​ ThreadPoolExecutor.DiscardPolicy: //丢弃任务,但是不抛出异常 这是不推荐的做法。 ​ ThreadPoolExecutor.DiscardOldestPolicy://抛弃队列中等待最久的任务 然后把当前任务加入队列中。 ​ ThreadPoolExecutor.CallerRunsPolicy: //调用任务的run()方法绕过线程池直接执行。 ​ 练习:

public class MyThreadPoolDemo3 { public static void main(String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3,2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); //调用线程任务 pool.submit(new MyRunnable()); pool.submit(new MyRunnable()); pool.shutdown(); /* pool-1-thread-1在执行 pool-1-thread-2在执行 */ } }

4.可见性问题

​ 概述: ​ 多线程之间共享数据,在A线程修改数据的情况下,B没有看到A修改后的最新数据,这种现象就是"多线程的可见性问题". ​ 解决: ​ 1:使用volatile关键字,修饰该数据.

public class Test02 { public volatile static int money = 10000; public static void main(String[] args) { Girl girl = new Girl(); girl.setName("小路同学"); girl.start(); Boy boy = new Boy(); boy.setName("小张同学"); boy.start(); } } class Boy extends Thread{ @Override public void run() { //等待100ms try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //修改money Test02.money = 9000; } } class Girl extends Thread{ @Override public void run() { while(Test02.money == 10000){ } System.out.println("money不是一万了"); } }

​ 2:可以使用synchronized加锁解决. ​

public class Test01 { public static void main(String[] args) { Boy boy = new Boy(); boy.setName("小张同学"); boy.start(); Girl girl = new Girl(); girl.setName("小路同学"); girl.start(); } } class Money{ public static int money = 10000; //定义一个锁对象 public static Object lock = new Object(); } class Boy extends Thread{ @Override public void run() { //等待100ms synchronized (Money.lock) { try { Thread.sleep(100); //修改money Money.money = 9000; } catch (InterruptedException e) { e.printStackTrace(); } } } } class Girl extends Thread{ @Override public void run() { while (true){ synchronized (Money.lock){ if(Money.money != 10000){ System.out.println("money不是一万了"); break; } } } } }

5.原子性问题

​ 概述: ​ 原子性指的是"多个操作,要么一起成功,要么一起失败". ​ 多线程中的原子性问题,指的是:多个多线操作共同处理数据,并且出现了数据丢失的这种现象,就叫做"多线程的原子性问题" ​ 解决: ​ 使用原子性安全的类.

10000+10000 可能会出现不等于两万

public class Test { public static AtomicInteger count= new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { MyThread1 myThread1 = new MyThread1(); MyThread2 myThread2 = new MyThread2(); myThread1.start(); myThread2.start(); Thread.sleep(2000); System.out.println(Test.count); } } class MyThread1 extends Thread { @Override public void run() { for (int i = 0; i < 10000; i++) { Test.count.addAndGet(1); } } } class MyThread2 extends Thread { @Override public void run() { for (int i = 0; i < 10000; i++) { Test.count.addAndGet(1); } } }

6.AtomicInteger

​ 概述: 是一个原子性的Integer类.内部的操作可以保证多线程的原子性. ​ 构造: ​ public AtomicInteger(); //默认值为0 ​ public AtomicInteger(int num); //默认值为num ​ 方法: ​ public int addAndGet(int delta); //添加指定的值(可正可负),并返回添加后的值 ​ public int get(); //添加指定的值(可正可负),并返回添加后的值 ​

public class AtomicIntegerTest { public static void main(String[] args) { // public AtomicInteger(); 默认值为0 AtomicInteger ac1 = new AtomicInteger(); System.out.println(ac1.get());//0 // public AtomicInteger(int num); 默认值为num AtomicInteger ac2 = new AtomicInteger(10); System.out.println(ac2.get());//10 //getAndIncrement 原子当前值加1 返回自增前的值 AtomicInteger ac3 = new AtomicInteger(10); int andIncrement = ac3.getAndIncrement(); System.out.println(andIncrement);//10 System.out.println(ac3.get());//11 AtomicInteger ac4 = new AtomicInteger(10); int i = ac3.incrementAndGet(); System.out.println(i);//自增后的值 11 System.out.println(ac4.get()); // public int addAndGet(int delta); 添加指定的值(可正可负),并返回添加后的值 AtomicInteger ac5 = new AtomicInteger(10); int a = ac5.addAndGet(20); System.out.println(a);//30 System.out.println(ac5.get());//30 //getAndSet AtomicInteger ac6 = new AtomicInteger(10); int andSet = ac6.getAndSet(20); System.out.println(andSet);//10 System.out.println(ac6.get());//20 // public int get(); 添加指定的值(可正可负),并返回添加后的值 } }

7.并发包

​ 概述: 并发包就是一些线程安全的集合类. ​ 分类: ​ ConcurrentHashMap //线程安全的HsahMap ​ CopyOnWriteArrayList//线程安全的ArrayList ​ CopyOnWriteArraySet //线程安全的HsahSet ​

public class ConcurrentHashMapTest { public static void main(String[] args) throws InterruptedException { ConcurrentHashMap<String,String> ch = new ConcurrentHashMap(); Thread t1 = new Thread(()->{ for (int i = 0; i < 25; i++) { ch.put(i+"",i+""); } }); Thread t2 = new Thread(()->{ for (int i = 25; i < 51; i++) { ch.put(i+"",i+""); } }); t1.start(); t2.start(); System.out.println("--------------------"); //为了让t1和t2有足够的时间 Thread.sleep(1000); for (int i = 0; i < 51; i++) { System.out.println(ch.get(i+"")); } } }

8.CountDownLatch

​ 概述: 在多线程情况下,可以控制线程的执行顺序.确定某个线程在其他线程执行之后再执行. ​ 构造: ​ public CountDownLatch(int num); //创建一个对象,指定一个计数器 ​ 方法: ​ public void await(); //当前线程等待,当计数器归零的时候,自动唤醒等待的线程 ​ public void countDown();//计数器-1 ​

public class CountDownLatchTest { public static void main(String[] args) { //创建CountDownLatch对象 CountDownLatch countDownLatch = new CountDownLatch(3); //创建4个线程对象并开启 Mother mother = new Mother(countDownLatch); mother.start(); Child1 c1 = new Child1(countDownLatch); c1.setName("小美"); Child2 c2 = new Child2(countDownLatch); c2.setName("小在"); Child3 c3 = new Child3(countDownLatch); c3.setName("小分"); c1.start(); c2.start(); c3.start(); } } class Mother extends Thread{ private CountDownLatch countDownLatch; public Mother(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { //等待 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("收拾碗筷"); } }

9.Semaphore

​ 概述: 可以控制"同时运行的线程的数量". ​ 构造: ​ public Semaphore(int num); //最多允许同时运行num个线程(可以少,但不可以多) ​ 方法: ​ void acquire(); //获取"许可证",运行线程运行 ​ void release(); //释放"许可证"

public class SemaphoreTest { public static void main(String[] args) { //创建对象 MyRunnable myRunnable = new MyRunnable(); for (int i = 0; i < 10; i++) { new Thread(myRunnable).start(); } } } class MyRunnable implements Runnable { //获得管理员对象 private Semaphore semaphore = new Semaphore(2); @Override public void run() { //获得通行证 try { semaphore.acquire(); //开始行驶 System.out.println("获得了通行证开始行驶"); Thread.sleep(2000); System.out.println("归还通行证"); //归还通行证 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }
最新回复(0)