我们先演示一下在多线程环境下,对共享资源操作不使用锁可能存在的问题。在介绍jvm 的锁synchronized ,java并发包的CAS 以及AQS 中的lock 锁。后面我们会模仿AQS自己手写一个简单的lock 锁。
我们定义一个变量 i ,使用100个线程,循环100次累加,我们期望的计算结果是10000。先不使用锁看运行的结果是否和我们的预期值一致。 定义一个NoLockAdd 类,代码如下:
/** * 无锁的累加 * @author yangyanping * @date 2020-09-04 */ public class NoLockAdd { private int sum; public int add() { return sum++; } public int getSum() { return sum; } }编写一个main方法,开启100个线程,循环100次调用NoLockAdd的add方法
public class TestLock { public static void main(String[] args) throws Exception { testNoLock(); } private static void testNoLock() throws Exception { NoLockAdd noLockAdd = new NoLockAdd(); for (int i = 0; i < 100; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { for (int k = 0; k < 100; k++) { noLockAdd.add(); Thread.sleep(1); } } catch (Exception ex) { } } }); thread.start(); } Thread.sleep(5000); System.out.println(noLockAdd.getSum()); } }运行main方法,打印的结果为9851,并不是我们期望的结果值10000,为什么呢?
9851 Process finished with exit code 0我们在idea 的Terminal 窗口中使用javap 命令 看下NoLockAdd java的字节码 javap -p -c NoLockAdd.class
Compiled from "NoLockAdd.java" public class com.yyp.redis.lock.NoLockAdd { public com.yyp.redis.lock.NoLockAdd(); Code: 0: aload_0 1: invokespecial #1 // Method java/lang/Object."<init>":()V 4: return public int add(); Code: 0: aload_0 1: dup 2: getfield #2 // Field sum:I 5: dup_x1 6: iconst_1 7: iadd 8: putfield #2 // Field sum:I 11: ireturn public int getSum(); Code: 0: aload_0 1: getfield #2 // Field sum:I 4: ireturn }我们主要看 add() 方法的字节码,发现一条语句 sum++ 被编译了 多条 指令
指令描述aload_0从局部变量0中装载引用类型值入栈。iconst_11(int)值入栈。iadd将栈顶两int类型数相加,结果入栈。ireturn返回int类型值。我们来看下JVM的内存模型。i++ 的多条字节码 指令在执行中,可能是下图的一种情况。
悲观锁:即很悲观,每次拿数据的时候都觉得数据会被人更改,所以拿数据的时候就把这条记录锁掉,这样别人就没法改这条数据了,一直到你的锁释放。
synchronized 是Java中的关键字,是一种同步锁。可修饰实例方法,静态方法,代码块。它是一种悲观锁,一旦某一个线程获取到锁,其他需要锁的线程就会挂起
我们定义一个SysAdd 类,在add 方法上添加 synchronized 关键字
/** * synchronized * @author yangyanping * @date 2020-09-04 */ public class SysAdd { private int sum; public void add() { synchronized (this) { sum++; } } public int getSum() { return sum; } }编写main 方法 测试SysAdd 的累加
public class TestLock { public static void main(String[] args) throws Exception { testAtomicAdd(); } private static void testAtomicAdd() throws Exception{ SysAdd sysAdd = new SysAdd(); for (int i = 0; i < 100; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { for (int k = 0; k < 100; k++) { sysAdd.add(); Thread.sleep(1); } } catch (Exception ex) { } } }); thread.start(); } Thread.sleep(5000); System.out.println(sysAdd.getSum()); } }运行结果为10000 和 我们期望的结果也一致:
10000 Process finished with exit code 0使用使用javap查看这段代码的字节码文件
ZBMAC-2f32839f6:lock yangyanping$ javap -v SysAdd.class Classfile /Users/yangyanping/Downloads/code/0818/yypredis/target/classes/com/yyp/redis/lock/SysAdd.class Last modified 2020-9-6; size 574 bytes MD5 checksum e56c8113a4571915bbc3a56347584f24 Compiled from "SysAdd.java" public class com.yyp.redis.lock.SysAdd minor version: 0 major version: 52 flags: ACC_PUBLIC, ACC_SUPER Constant pool: #1 = Methodref #4.#23 // java/lang/Object."<init>":()V #2 = Fieldref #3.#24 // com/yyp/redis/lock/SysAdd.sum:I #3 = Class #25 // com/yyp/redis/lock/SysAdd #4 = Class #26 // java/lang/Object #5 = Utf8 sum #6 = Utf8 I #7 = Utf8 <init> #8 = Utf8 ()V #9 = Utf8 Code #10 = Utf8 LineNumberTable #11 = Utf8 LocalVariableTable #12 = Utf8 this #13 = Utf8 Lcom/yyp/redis/lock/SysAdd; #14 = Utf8 add #15 = Utf8 StackMapTable #16 = Class #25 // com/yyp/redis/lock/SysAdd #17 = Class #26 // java/lang/Object #18 = Class #27 // java/lang/Throwable #19 = Utf8 getSum #20 = Utf8 ()I #21 = Utf8 SourceFile #22 = Utf8 SysAdd.java #23 = NameAndType #7:#8 // "<init>":()V #24 = NameAndType #5:#6 // sum:I #25 = Utf8 com/yyp/redis/lock/SysAdd #26 = Utf8 java/lang/Object #27 = Utf8 java/lang/Throwable { public com.yyp.redis.lock.SysAdd(); descriptor: ()V flags: ACC_PUBLIC Code: stack=1, locals=1, args_size=1 0: aload_0 1: invokespecial #1 // Method java/lang/Object."<init>":()V 4: return LineNumberTable: line 10: 0 LocalVariableTable: Start Length Slot Name Signature 0 5 0 this Lcom/yyp/redis/lock/SysAdd; public void add(); descriptor: ()V flags: ACC_PUBLIC Code: stack=3, locals=3, args_size=1 0: aload_0 1: dup 2: astore_1 3: monitorenter 4: aload_0 5: dup 6: getfield #2 // Field sum:I 9: iconst_1 10: iadd 11: putfield #2 // Field sum:I 14: aload_1 15: monitorexit 16: goto 24 19: astore_2 20: aload_1 21: monitorexit 22: aload_2 23: athrow 24: return Exception table: from to target type 4 16 19 any 19 22 19 any LineNumberTable: line 14: 0 line 15: 4 line 16: 14 line 17: 24 LocalVariableTable: Start Length Slot Name Signature 0 25 0 this Lcom/yyp/redis/lock/SysAdd; StackMapTable: number_of_entries = 2 frame_type = 255 /* full_frame */ offset_delta = 19 locals = [ class com/yyp/redis/lock/SysAdd, class java/lang/Object ] stack = [ class java/lang/Throwable ] frame_type = 250 /* chop */ offset_delta = 4 public int getSum(); descriptor: ()I flags: ACC_PUBLIC Code: stack=1, locals=1, args_size=1 0: aload_0 1: getfield #2 // Field sum:I 4: ireturn LineNumberTable: line 20: 0 LocalVariableTable: Start Length Slot Name Signature 0 5 0 this Lcom/yyp/redis/lock/SysAdd; }我们重点关注下,同步代码块:
public void add(); descriptor: ()V flags: ACC_PUBLIC Code: stack=3, locals=3, args_size=1 0: aload_0 1: dup 2: astore_1 3: monitorenter 4: aload_0 5: dup 6: getfield #2 // Field sum:I 9: iconst_1 10: iadd 11: putfield #2 // Field sum:I 14: aload_1 15: monitorexit 16: goto 24 19: astore_2 20: aload_1 21: monitorexit 22: aload_2 23: athrow 24: returnCAS: 乐观锁,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。
我们使用java 并发包里的原子类操作AtomicInteger 来进行累加。 定义AtomicAdd类
/** * AtomicInteger的累加 * @author yangyanping * @date 2020-09-04 */ public class AtomicAdd { private AtomicInteger sum = new AtomicInteger(0); public void add() { sum.incrementAndGet(); } public int getSum() { return sum.get(); } }编写main 方法 测试AtomicAdd 的累加
public class TestLock { public static void main(String[] args) throws Exception { testAtomicAdd(); } private static void testAtomicAdd() throws Exception{ AtomicAdd atomicAdd = new AtomicAdd(); for (int i = 0; i < 100; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { for (int k = 0; k < 100; k++) { atomicAdd.add(); Thread.sleep(1); } } catch (Exception ex) { } } }); thread.start(); } Thread.sleep(5000); System.out.println(atomicAdd.getSum()); } }运行结果为 10000,和我们期望的结果一致
10000 Process finished with exit code 0AQS 类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…。
我们使用java 并发包里的Lock 锁 LockAdd 类定义如下,也可以实现 安全的计数。
/** * ReentrantLock 使用 * @author yangyanping * @date 2020-09-04 */ public class LockAdd { private int sum; private final Lock lock = new ReentrantLock(); public void add() { lock.lock(); try { sum++; } finally { lock.unlock(); } } public int getSum() { return sum; } }基础知识
LinkedBlockingQueue 内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全。
LockSupport 用来创建锁和其他同步类的基本线程阻塞原语。 此类以及每个使用它的线程与一个许可关联(从 Semaphore 类的意义上说)。如果该许可可用,并且可在进程中使用,则调用 park 将立即返回;否则可能 阻塞。如果许可尚不可用,则可以调用 unpark 使其可用。(但与 Semaphore 不同的是,许可不能累积,并且最多只能有一个许可。) park 和 unpark 方法提供了阻塞和解除阻塞线程的有效方法,并且不会遇到导致过时方法 Thread.suspend 和 Thread.resume 因为以下目的变得不可用的问题:由于许可的存在,调用 park 的线程和另一个试图将其 unpark 的线程之间的竞争将保持活性。此外,如果调用者线程被中断,并且支持超时,则 park 将返回。park 方法还可以在其他任何时间“毫无理由”地返回,因此通常必须在重新检查返回条件的循环里调用此方法。从这个意义上说,park 是“忙碌等待”的一种优化,它不会浪费这么多的时间进行自旋,但是必须将它与 unpark 配对使用才更高效。
我们模仿ReentrantLock,自己动手写一个YypLock 锁。
/** * 自定义锁 * @author yangyanping * @date 2020-09-3 */ public class YypLock implements Lock { /** * 独占资源所有者 */ private AtomicReference<Thread> owner = new AtomicReference<>(); /** * 等待的线程 */ private LinkedBlockingQueue<Thread> queue = new LinkedBlockingQueue(); @Override public void lock() { //当前线程 Thread thread = Thread.currentThread(); //获取锁失败,进入循环 while (!owner.compareAndSet(null, thread)) { //添加数据到等待队列 queue.offer(thread); //线程等待 LockSupport.park(); //线程被唤醒后,从等待集合中移除 queue.remove(thread); } } @Override public void unlock() { Thread thread = Thread.currentThread(); //释放锁 if (owner.compareAndSet(thread, null)) { Iterator<Thread> iterator = queue.iterator(); while (iterator.hasNext()) { Thread next = iterator.next(); if (next != null) { //唤醒线程 LockSupport.unpark(next); } } } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }定义YypLockAdd 类,使用YypLock 锁
/** * 使用自定义的Lock锁 * @author yangyanping * @date 2020-09-04 */ public class YypLockAdd { private int sum; private final YypLock lock = new YypLock(); public void add() { lock.lock(); try { sum++; } finally { lock.unlock(); } } public int getSum() { return sum; } }我们使用YypLock锁,测试并发计算的结果sum=10000
public class TestLock { public static void main(String[] args) throws Exception { testAtomicAdd(); } private static void testAtomicAdd() throws Exception{ YypLockAdd yypLockAdd = new YypLockAdd(); for (int i = 0; i < 100; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { for (int k = 0; k < 100; k++) { yypLockAdd.add(); Thread.sleep(1); } } catch (Exception ex) { } } }); thread.start(); } Thread.sleep(5000); System.out.println(yypLockAdd.getSum()); } }运行程序,输出如下:
10000 Process finished with exit code 0