手记

并发与锁

并发与锁

先列个提纲逐步细化。

一、 JUC AQS

1) AbstractQueuedSynchronizer 的结构

static final class Node {    static final Node SHARED = new Node(); // 共享模式的标记
    static final int CANCELLED = 1; // 标志着线程被取消
    static final int SIGNAL = -1; //标志着后在自己释放锁放弃等待时需要唤醒继节点(用于互斥锁)
    static final int CONDITION = -2; //标志着当前节点在条件等待队列中(由于条件是基于互斥锁的)
    static final int PROPAGATE = -3; // 标志着此链表存放都是共享锁等待节点,需要广播
    volatile int waitStatus; // 默认状态是0, 小于0的状态都是有特殊作用,大于0的状态表示已取消
    volatile Node prev;    volatile Node next;    volatile Thread thread; // 该节点对应的线程

    /**如果是SHARED,表示当前节点是共享模式,
    *  如果是null,当前节点是独占模式,
    *  如果是其他值,当前节点也是独占模式,不过这个值也是Condition队列的下一个节点。
    */
    Node nextWaiter;    // 是不是共享模式
    final boolean isShared() {        return nextWaiter == SHARED;
    }
}

2)互斥锁的获取过程

  • CAS争夺所有权
    没有在等待队列中就是个CAS没什么要说的,如果前驱结点是头节点,那就需要再次尝试CAS争夺所有权。需要注意的是无论锁可中断与否,此时获取锁的成功都是不响应中断请求的。

  • CAS 的精髓
    CAS的精髓有两个。首先是可以对当前值是预期相等才能CAS充分利用,结合不等式传递性,这样不仅可以做等值替换,大于小于都能做 stackoverflow上有例子就不列举了。再一个是增加的过程是逐步改变的从全局看是不能跳跃的;例如rotate 回转数的问题,netty线程老代码取余数的显然会被鄙视,后来2的指数用位运算回转勉强。

import java.util.concurrent.atomic.AtomicInteger;public class RotateAtomicInteger {  private final AtomicInteger val;  private final int min;  private final int max;  public RotateAtomicInteger( int min, int max) {    if(max-min<2){      throw new IllegalArgumentException("可能有ABA问题");
    }    this.val = new AtomicInteger(min);    this.min = min;    this.max = max;
  }  public int getAndRotateIncr(){    for(;;){      int cur=val.get();//因为如果cas失败说明之前有人成功过所以这个值就不能再用了
      int next = cur==max ? min :cur+1;//全局来看是不可跳跃
      if(val.compareAndSet(cur, next)){        return cur;
      }      
    }
  }

}

3) 获取失败挂起的过程

  • 入队的注意点
    如果竞争失败就加入队列,加入队列如果为空就从头插,否则从尾部插入,插入后不能闲着还要再试一次,即便失败了还不能立即park,要绕过取消的等待者,并且CAS标记前驱结点出队时别忘了唤醒自己。CAS失败了入队后的各种尝试还得再来一遍。

  • 如何防止丢失唤醒
    上面这样做就够了吗,显然还不行;满足自己去“睡”的条件的check和“睡”显然不是一个原子操作。在check后和“睡着”前如果条件变了就没人叫醒。 unpark和park早就预防相关问题,猜想内部是有变量记忆了上次的操作后状态,同时基于操作系统提供的锁保证了原子性。

4) 唤醒的过程

  • 释放锁唤醒等待的过程
    先释放锁标记,再置通知标记为取消状态,然后从后往前找离自己最近的未取消结点唤醒,为什么0状态也生效刚好上面已经解释过了。另外没区分条件队列,所以虚假唤醒这也是原因之一。

  • 如何防止惊群问题
    锁的唤醒通常是有唤醒一个等待线程和唤醒全部等待线程的;通常在编程条件下如果想减少全部唤醒的引发不必要的竞争时,还要注意虚假 唤醒问题,被唤醒的线程如果条件不满足条件释放锁后造成唤醒链脱节的问题。而这种情况又不能简单在自己释放锁前去唤醒其他等待者。 所以比如在leader-follower模式中,如果leader线程只在原leader线程离任时唤醒任意一个等待线程时,则当线程都在follower模式下因为没有唤醒者陷入停机状态,所以在每次完成工作时还要尝试自荐为leader。如果leader在离任时唤起其他所有线程显然是惊群的。

5) 条件等待队列

-为什么nextWaiter属性没用volatile修饰
首先如果要条件等待或者唤醒必须在持有锁(夹杂volatile中间)保证了可见性。

  • 等待条件的流程 待续

  • 唤醒条件的流程 待续

6) AQS小结

  • 为什么要双向链表?用带头尾指针的单向链表行不行呢?
    我认为问题比较麻烦,不能说完全不可以。主要问题是中间节点取消等待获取锁时,摘除这个节点需要对前一节点的后继指针做CAS,例如:A->B->C->D 这种情况,摘除B时需要对A的后继指针做CAS,此时如果又要摘除C由于C没有稳定前驱指针做不了。如果CAS前先原子更新当前节点为取消状态,然后对前一节点的两个字段同时做CAS(状态和后继指针域16字节的不知道除了X86其他平台是否支持),如果前一节点的状态已经为取消状态了,需要从头再查找当前线程在链表中的位置进而取得其前一节点重试。再有一种办法就是标记删除了,这个方式累积过多的无效节点显然是不合适的。总之这两种办法要么时间复杂度不符合要么空间复杂度不符合。

  • 流言:cas+clh队列语义上等价于锁
    这个说法的问题在于线程加入clh队列后,需要借助cpu指令中的屏蔽中断与禁止抢占保证调度切换的原子性。另外前面提到的获取锁的小过程不响应中断也是个启示。

  • 双检锁的简单优化
    代码注释强调了关键的逻辑和注意点。重点对比findById与findByIdSync两个方法。

import java.lang.reflect.Field;import java.util.HashMap;import java.util.Map;import java.util.concurrent.LinkedTransferQueue;import java.util.concurrent.locks.LockSupport;import sun.misc.Unsafe;//只是示意代码没测试慎用public class Dcl { 
    private static final Unsafe unsafe = getUnsafeInstance();    private static final long readableOffset;    // 1 can read , 0 can not read
    private volatile int readable=1;    private Map<String,Object> map=new HashMap<>();    /**
     * 可以用数组 cas索引(类似ringbuffer)保存,不过扩容有点麻烦,不过通常线程数不会太大。。。
     * 这里用带头尾指针的单链表cas尾插法存储等待线程显然比较合适
     * 现在为突出双检锁,用了AQS显然违反自举原则
     */
    private LinkedTransferQueue<Thread> list = new LinkedTransferQueue<>();    private boolean compareAndSwapReadable(int expect, int update) {        return unsafe.compareAndSwapInt(this, readableOffset, expect, update);
    }    
    private Object getCache(String id) {        return map.get(id);//TODO
    }    private Object loadFromDb(String id) {        return null; //TODO 
    }    private void setCache(String id, Object value) {        //copy on write 这样可以是线程非安全的map  TODO
    }    //volatile + unsafe cas + park/unpark 版本
    public Object findById(String id) throws InterruptedException {
        Object data;        for (;;) {
            data = getCache(id);            if (data != null) {                return data;
            }            if (readable==1 && compareAndSwapReadable(1, 0)) { //volatile读减少cas
                data = getCache(id); 
                if (data == null) {
                    data = loadFromDb(id);
                    setCache(id, data);
                }
                readable = 1;                for (Thread th = list.peek(); th != null; th = list.peek()) {
                    LockSupport.unpark(th);//即便unpark早于park也不会有问题因为有记忆功能
                }                return data;
            } else {//cas失败就不用像双检锁那样再次过独木桥
                list.add(Thread.currentThread());                if (readable == 0) {
                    LockSupport.park();
                }
                list.remove(Thread.currentThread());
            }
        }
    }    // synchronized 双检锁版本
    public Object findByIdSync(String id) {
        Object data;
        data = getCache(id);        if (data != null) {            return data;
        }        synchronized (this) { // 防止并发,不过loadFromDb期间的线程都会阻塞在这里后面依次拿到锁通过
            data = getCache(id);            if (data != null) {// 这个条件可以取反优化流程,只是为了和下面的双检测对比
                return data;
            }
            data = loadFromDb(id);
            setCache(id, data);            return data;
        }
    }    
    private static Unsafe getUnsafeInstance(){        try {
            Field theUnsafeInstance;
            theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafeInstance.setAccessible(true);            return (Unsafe) theUnsafeInstance.get(Unsafe.class);
        } catch (Exception e) {            throw new RuntimeException(e);
        }       
    }    
    static {        try {
            Field f = Dcl.class.getDeclaredField("readable");
            f.setAccessible(true);
            readableOffset = unsafe.objectFieldOffset(f);
        } catch (Exception ex) {            throw new Error(ex);
        }
    }

}

二、从缓存一致性协议和内存屏障看 volatile atomic

  • 1)背景:cpu要提高性能执行,派发调度打乱了原有指令的顺序。同时为提高读写性能多了多级cache,在cache和执行引擎中间还有对外不可见的读写缓冲区。另一个是缓存类似硬件的hashmap,其中地址为key数据为value还有额外的有效性表示。为了有均衡的性能缓存采用通常是全相连与组相连结合的方式。而且缓存为了调高有效载荷和充分发挥程序的局部性采用是较大单元存储的,通常是64B。(另外本文整体都是以x86体系考虑)


    skylake_server架构图

注意几个关键(load buffer, store buffer, WC buffer)

注意外部传来的失效队列



作者:七赤九紫星
链接:https://www.jianshu.com/p/7f7e0cb14d46


0人推荐
随时随地看视频
慕课网APP