本文共 4910 字,大约阅读时间需要 16 分钟。
package cn.edu.sdk;import sun.misc.Unsafe;import java.lang.reflect.Field;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.locks.LockSupport;public class MyAQS { /* * 当前加锁状态 记录加锁的次数 (没有实现重入 AQS 没有实现重入 而是在ReentrankLock中实现重入) * */ private volatile int state = 0; /* * 记录当前加锁的线程 这两个属性 和 synchronized monitor对象 中的是一样的 owner 和 recursion * */ private Thread threadHolder; /* * 使用一个队列进行加锁失败的线程的保存 * */ private ConcurrentLinkedQueuewaiters = new ConcurrentLinkedQueue<>(); /* * 使用unsafe类 进行cas操作 * */ private static Unsafe unsafe; static { try { unsafe = MyAQS.getUnsafeInstance(); } catch (Exception e) { e.printStackTrace(); } } public static Unsafe getUnsafeInstance() throws Exception { // 通过反射获取rt.jar下的Unsafe类 Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafeInstance.setAccessible(true); // return (Unsafe) theUnsafeInstance.get(null);是等价的 return (Unsafe) theUnsafeInstance.get(Unsafe.class); } public int getState() { return state; } public void setState(int state) { state = state; } public Thread getThreadHolder() { return threadHolder; } public void setThreadHolder(Thread threadHolder) { this.threadHolder = threadHolder; } public boolean acquire() throws Exception { // 使用cas判断是否是能进行加锁 int c = getState(); if (c == 0){ long offset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state")); // 这里使用的是公平锁的思想 没有排队的才能进行判断是否是能加锁 // 要不就是排序对队列中没有 等待的线程(第一次加锁) // 要不就是 当前线程是 队列中的第一个 // 没有这个条件就是 导致只有一个线程才能唤醒 Thread thread = Thread.currentThread(); if ( (waiters.size() == 0 || thread == waiters.peek())&& unsafe.compareAndSwapInt(state, offset, 0 , 1)){ // 进行持有锁线程的设置 threadHolder = thread; return true; } } return false; } // 加锁方法 public void lock() throws Exception { if (acquire()){ // 获取到了锁 return; } // 没有加锁成功的话 Thread c = Thread.currentThread(); waiters.add(c); for(;;){ // 只有加上锁才能跳出循环 if ((c == waiters.peek() ) && acquire()){ // 加锁 就是从队列中出队 waiters.poll(); break; } // 为了减少cpu的消耗 将没有加锁的线程放到 阻塞线程 等待队列中// Thread.yield(); 不一定成功 还有还有优先级的问题// TimeUnit.SECONDS.sleep(1); 加锁操作的事件不确定 也不好 浪费cpu // 也是有问题的 阻塞之后需要重新启动 LockSupport.park(); // 进行排队 同时需要这个队列是线程安全的 防止一个线程永远的阻塞 } } public void unLock() throws Exception { // 判断是否是持有锁的线程进行解锁 if (threadHolder != Thread.currentThread()){ throw new RuntimeException(" 不是持有锁的线程不能枷锁 "); } // 进行解锁 唤醒排队的线程 进行加锁 // 使用cas 进行解锁 long offset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state")); boolean isUnLock = unsafe.compareAndSwapInt(state, offset, 1, 0); if (isUnLock){ // 通知排队的线程加锁 setThreadHolder(null); //获取队列头的线程 进行加锁 Thread first = waiters.peek(); if (first != null){ // 进行唤醒 唤醒的是队列中的头部的thread LockSupport.unpark(first); } } }}
package cn.edu.sdk;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;public class TestThread { public static void main(String[] args) { Order order = new Order(); AtomicInteger integer = new AtomicInteger(0); for (int i = 0; i < 50; i++) { new Thread(() ->{ for (int j = 0; j < 20; j++) { order.descOrder(); integer.getAndIncrement(); } }, String.valueOf(i)).start(); } // 两个线程 一个就是 主线程 另一个就是 GC线程 保证前面的五十个线程执行完 while (Thread.activeCount() > 2){ Thread.yield(); } //可以看出一共是操作1000次 实现加锁操作: System.out.println(integer); }}class Order{ private int orders = 10; // 线程操作资源类 private MyAQS myAQS = new MyAQS(); public void descOrder() { try { myAQS.lock(); if (orders > 0){ // 模拟订单操作的时间200毫秒 TimeUnit.MILLISECONDS.sleep(200); System.out.println("线程" + Thread.currentThread().getName() + "---->获取订单" + orders --); } } catch (Exception e) { e.printStackTrace(); }finally { try { myAQS.unLock(); } catch (Exception e) { e.printStackTrace(); } } }}
转载地址:http://gfxrn.baihongyu.com/