CP-12-synchronized通知等待机制

模拟实现一个BlockQueue,实现方法:

  1. put: 队列为满时,阻塞线程等待,直到队列不满时才能添加新元素
  2. take: 队列为空时,阻塞线程等待,直到队列不空时才能获取元素
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    public class MyBlockQueue {

    //1 需要一个承装元素的集合
    private LinkedList<Object> list = new LinkedList<Object>();

    //2 需要一个计数器
    private AtomicInteger count = new AtomicInteger(0);

    //3 需要制定上限和下限
    private final int minSize = 0;

    private final int maxSize ;

    //4 构造方法
    public MyBlockQueue(int size){
    this.maxSize = size;
    }

    //5 初始化一个对象 用于加锁
    private final Object lock = new Object();

    //put(anObject): 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续.
    public void put(Object obj){
    synchronized (lock) {
    while(count.get() == this.maxSize){
    try {
    lock.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    //1 加入元素
    list.add(obj);
    //2 计数器累加
    count.incrementAndGet();
    //3 通知另外一个线程(唤醒)
    lock.notify(); //释放锁
    System.out.println("新加入的元素为:" + obj);
    }
    }


    //take: 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入.
    public Object take(){
    Object ret = null;
    synchronized (lock) {
    while(count.get() == this.minSize){
    try {
    lock.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    //1 做移除元素操作
    ret = list.removeFirst();
    //2 计数器递减
    count.decrementAndGet();
    //3 唤醒另外一个线程
    lock.notify();
    }
    return ret;
    }

    public int getSize(){
    return this.count.get();
    }


    public static void main(String[] args) {
    // 1. 初始化容量为5
    final MyBlockQueue mq = new MyBlockQueue(5);
    mq.put("a");
    mq.put("b");
    mq.put("c");
    mq.put("d");
    mq.put("e");

    System.out.println("当前容器的长度:" + mq.getSize());

    // 2. 再添加元素,因为容器满,会阻塞该线程
    Thread t1 = new Thread(new Runnable() {
    @Override
    public void run() {
    mq.put("f");
    mq.put("g");
    }
    },"t1");

    t1.start();

    // 3. 取出元素,容器有空余位置,在2中阻塞的线程会被通知,继续添加新元素
    Thread t2 = new Thread(new Runnable() {
    @Override
    public void run() {
    Object o1 = mq.take();
    System.out.println("移除的元素为:" + o1);
    Object o2 = mq.take();
    System.out.println("移除的元素为:" + o2);
    }
    },"t2");


    try {
    TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    t2.start();
    }
    }