// 公平锁获取protectedfinalbooleantryAcquire(int acquires) {finalThread current =Thread.currentThread();int c =getState();if (c ==0) {// 需要考虑队列 查询是否有任何线程等待获取的时间比当前线程长。if (!hasQueuedPredecessors()&&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);returntrue; } }elseif (current ==getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc <0)thrownewError("Maximum lock count exceeded");setState(nextc);returntrue; }returnfalse; }}publicfinalbooleanhasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;// 判断队列是否为空。return h != t && ((s =h.next) ==null||s.thread!=Thread.currentThread()); }// 非公平锁finalbooleannonfairTryAcquire(int acquires) {finalThread current =Thread.currentThread();int c =getState();if (c ==0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);returntrue; } }elseif (current ==getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc <0) // overflowthrownewError("Maximum lock count exceeded");setState(nextc);returntrue; }returnfalse; }
protectedfinalbooleantryAcquire(int acquires) {/* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */Thread current =Thread.currentThread();// 1. 获取写锁当前的同步状态int c =getState();// 2. 获取写锁获取的次数int w =exclusiveCount(c);if (c !=0) {// (Note: if c != 0 and w == 0 then shared count != 0)// 3.1 当读锁已被读线程获取或者当前线程不是已经获取写锁的线程的话// 当前线程获取写锁失败if (w ==0|| current !=getExclusiveOwnerThread())returnfalse;if (w +exclusiveCount(acquires)> MAX_COUNT)thrownewError("Maximum lock count exceeded");// Reentrant acquire// 3.2 当前线程获取写锁,支持可重复加锁setState(c + acquires);returntrue; }// 3.3 写锁未被任何线程获取,当前线程可获取写锁if (writerShouldBlock()||!compareAndSetState(c, c + acquires))returnfalse;setExclusiveOwnerThread(current);returntrue;}
protectedfinalinttryAcquireShared(int unused) {/* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */Thread current =Thread.currentThread();int c =getState();//1. 如果写锁已经被获取并且获取写锁的线程不是当前线程的话,当前// 线程获取读锁失败返回-1if (exclusiveCount(c)!=0&&getExclusiveOwnerThread()!= current)return-1;int r =sharedCount(c);if (!readerShouldBlock()&& r < MAX_COUNT &&//2. 当前线程获取读锁compareAndSetState(c, c + SHARED_UNIT)) {//3. 下面的代码主要是新增的一些功能,比如getReadHoldCount()方法//返回当前获取读锁的次数if (r ==0) { firstReader = current; firstReaderHoldCount =1; } elseif (firstReader == current) { firstReaderHoldCount++; } else {HoldCounter rh = cachedHoldCounter;if (rh ==null||rh.tid!=getThreadId(current)) cachedHoldCounter = rh =readHolds.get();elseif (rh.count==0)readHolds.set(rh);rh.count++; }return1; }//4. 处理在第二步中CAS操作失败的自旋已经实现重入性returnfullTryAcquireShared(current);}
voidprocessCachedData() {rwl.readLock().lock();if (!cacheValid) {// Must release read lock before acquiring write lockrwl.readLock().unlock();rwl.writeLock().lock();try {// Recheck state because another thread might have// acquired write lock and changed state before we did.if (!cacheValid) { data =... cacheValid =true; }// Downgrade by acquiring read lock before releasing write lockrwl.readLock().lock(); } finally {rwl.writeLock().unlock(); // Unlock write, still hold read } }try {use(data); } finally {rwl.readLock().unlock(); } }}
Condition
任何一个java对象都天然继承于Object类,在线程间实现通信的往往会应用到Object的几个方法,比如wait(),wait(long timeout),wait(long timeout, int nanos)与notify(),notifyAll()几个方法实现等待/通知机制,同样的, 在java Lock体系下依然会有同样的方法实现等待/通知机制。从整体上来看Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制,而Condition与Lock配合完成等待通知机制,前者是java底层级别的,后者是语言级别的,具有更高的可控制性和扩展性。两者除了在使用方式上不同外,在功能特性上还是有很多的不同:
sun.misc.Unsafe U 在ConcurrentHashMapde的实现中可以看到大量的U.compareAndSwapXXXX的方法去修改ConcurrentHashMap的一些属性。这些方法实际上是利用了CAS算法保证了线程安全性,这是一种乐观策略,假设每一次操作都不会产生冲突,当且仅当冲突发生的时候再去尝试。而CAS操作依赖于现代处理器指令集,通过底层CMPXCHG指令实现。CAS(V,O,N)核心思想为:若当前变量实际值V与期望的旧值O相同,则表明该变量没被其他线程进行修改,因此可以安全的将新值N赋值给变量;若当前变量实际值V与期望的旧值O不相同,则表明该变量已经被其他线程做了处理,此时将新值N赋给变量操作就是不安全的,在进行重试。而在大量的同步组件和并发容器的实现中使用CAS是通过sun.misc.Unsafe类实现的,该类提供了一些可以直接操控内存和线程的底层操作,可以理解为java中的“指针”。该成员变量的获取是在静态代码块中:
***Nodesfor use in TreeBins*/staticfinalclassTreeNode<K,V> extendsNode<K,V> {TreeNode<K,V> parent; // red-black tree linksTreeNode<K,V> left;TreeNode<K,V> right;TreeNode<K,V> prev; // needed to unlink next upon deletionboolean red;...... }
staticfinalclassTreeBin<K,V> extendsNode<K,V> {TreeNode<K,V> root;volatileTreeNode<K,V> first;volatileThread waiter;volatileint lockState;// values for lockStatestaticfinalint WRITER =1; // set while holding write lockstaticfinalint WAITER =2; // set when waiting for write lockstaticfinalint READER =4; // increment value for setting read lock...... }
publicbooleanoffer(E e) {checkNotNull(e);finalNode<E> newNode =newNode<E>(e);for (Node<E> t = tail, p = t;;) {Node<E> q =p.next;if (q ==null) {// p is last node cas获取pif (p.casNext(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this queue,// and for newNode to become "live".if (p != t) // hop two nodes at a timecasTail(t, newNode); // Failure is OK.returntrue; }// Lost CAS race to another thread; re-read next }elseif (p == q)// We have fallen off list. If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }