查看: 1693|回复: 0

[Java语言] 详细分析Java并发集合ArrayBlockingQueue的用法

发表于 2018-5-2 08:00:01

在上一章中,我们介绍了阻塞队列BlockingQueue,下面我们介绍它的常用实现类ArrayBlockingQueue。

一. 用数组来实现队列

因为队列这种数据结构的特殊要求,所以它天然适合用链表的方式来实现,用两个变量分别记录链表头和链表尾,当删除或插入队列时,只要改变链表头或链表尾就可以了,而且链表使用引用的方式链接的,所以它的容量几乎是无限的。
那么怎么使用数组来实现队列,我们需要四个变量:Object[] array来存储队列中元素,headIndex和tailIndex分别记录队列头和队列尾,count记录队列的个数。

  1. 因为数组的长度是固定,所以当count==array.length时,表示队列已经满了,当count==0的时候,表示队列是空的。
  2. 当添加元素的时候,将array[tailIndex] = e将tailIndex位置设置成新元素,之后将tailIndex++自增,然后将count++自增。但是有两点需要注意,在添加之前必须先判断队列是否已满,不然会出现覆盖已有元素。当tailIndex的值等于数组最后一个位置的时候,需要将tailIndex=0,循环利用数组
  3. 当删除元素的时候,将先记录下array[headIndex] 元素,之后将headIndex++自增,然后将count--自减。但是有两点需要注意要注意,在删除之前,必须先判断队列是否为空,不然可能会删除已删除的元素。

这里用了一个很巧妙的方式,我们知道当向队列中插入一个元素,那么就占用了数组的一个位置,当删除一个元素的时候,那么其实数组的这个位置就空闲出来了,表示这个位置又可以插入新元素了。

所以我们插入新元素前,必须检查队列是否已满,删除元素之前,必须检查队列是否为空。

二. ArrayBlockingQueue中重要成员变量

  1. /** 储存队列的中元素 */
  2. final Object[] items;
  3. /** 队列头的位置 */
  4. int takeIndex;
  5. /** 队列尾的位置 */
  6. int putIndex;
  7. /** 当前队列拥有的元素个数 */
  8. int count;
  9. /** 用来保证多线程操作共享变量的安全问题 */
  10. final ReentrantLock lock;
  11. /** 当队列为空时,就会调用notEmpty的wait方法,让当前线程等待 */
  12. private final Condition notEmpty;
  13. /** 当队列为满时,就会调用notFull的wait方法,让当前线程等待 */
  14. private final Condition notFull;
复制代码

就是多了lock、notEmpty、notFull变量来实现多线程安全和线程等待条件的,它们三个是怎么操作的呢?

2.1 lock的作用

因为ArrayBlockingQueue是在多线程下操作的,所以修改items、takeIndex、putIndex和count这些成员变量时,必须要考虑多线程安全问题,所以这里使用lock独占锁,来保证并发操作的安全。

2.2 notEmpty与notFull的作用

因为阻塞队列必须实现,当队列为空或队列已满的时候,队列的读取或插入操作要等待。所以我们想到了并发框架下的Condition对象,使用它来控制。

在AQS中,我们介绍了这个类的作用:

  1. await系列方法,会释放当前锁,并让当前线程等待。
  2. signal与signalAll方法,会唤醒当前线程。其实它并不是唤醒当前线程,而是将在这个Condition条件上等待的线程,添加到lock锁上的等待线程池中,所以当锁被释放时,会唤醒lock锁上的等待线程池中一个线程。具体在AQS中有源码分析。

三. 添加元素方法

3.1 add(E e)与offer(E e)方法:

  1. // 调用AbstractQueue父类中的方法。
  2. public boolean add(E e) {
  3. // 通过调用offer来时实现
  4. if (offer(e))
  5. return true;
  6. else
  7. throw new IllegalStateException("Queue full");
  8. }
  9. //向队列末尾新添加元素。返回true表示添加成功,false表示添加失败,不会抛出异常
  10. public boolean offer(E e) {
  11. checkNotNull(e);
  12. final ReentrantLock lock = this.lock;
  13. // 使用lock来保证,多线程修改成员属性的安全
  14. lock.lock();
  15. try {
  16. // 队列已满,添加元素失败,返回false。
  17. if (count == items.length)
  18. return false;
  19. else {
  20. // 调用enqueue方法将元素插入队列中
  21. enqueue(e);
  22. return true;
  23. }
  24. } finally {
  25. lock.unlock();
  26. }
  27. }
复制代码

add方法是调用offer方法实现的。在offer方法中,必须先判断队列是否已满,如果已满就直接返回false,而不会阻塞当前线程。如果不满就调用enqueue方法将元素插入队列中。

注意:这里使用lock.lock()是保证同一时间只有一个线程修改成员变量,防止出现并发操作问题。虽然它也会阻塞当前线程,但是它并不是条件等待,只是因为锁被其他线程持有,而ArrayBlockingQueue中方法操作时间都不长,这里相当于不阻塞线程。

3.2 put方法

  1. // 向队列末尾新添加元素,如果队列已满,当前线程就等待。响应中断异常
  2. public void put(E e) throws InterruptedException {
  3. checkNotNull(e);
  4. final ReentrantLock lock = this.lock;
  5. // 使用lock来保证,多线程修改成员属性的安全
  6. lock.lockInterruptibly();
  7. try {
  8. // 队列已满,则调用notFull.await()方法,让当前线程等待,直到队列不是满的
  9. while (count == items.length)
  10. notFull.await();
  11. // 调用enqueue方法将元素插入队列中
  12. enqueue(e);
  13. } finally {
  14. lock.unlock();
  15. }
  16. }
复制代码

与offer方法大体流程一样,只是当队列已满的时候,会调用notFull.await()方法,让当前线程阻塞等待,直到队列被别的线程移除了元素,队列不满的时候,会唤醒这个等待线程。

3.3 offer(E e, long timeout, TimeUnit unit)方法

  1. /**
  2. * 向队列末尾新添加元素,如果队列中没有可用空间,当前线程就等待,
  3. * 如果等待时间超过timeout了,那么返回false,表示添加失败
  4. */
  5. public boolean offer(E e, long timeout, TimeUnit unit)
  6. throws InterruptedException {
  7. checkNotNull(e);
  8. // 计算一共最多等待的时间值nanos
  9. long nanos = unit.toNanos(timeout);
  10. final ReentrantLock lock = this.lock;
  11. // 使用lock来保证,多线程修改成员属性的安全
  12. lock.lockInterruptibly();
  13. try {
  14. // 队列已满
  15. while (count == items.length) {
  16. // nanos <= 0表示最大等待时间已到,那么不用再等待了,返回false,表示添加失败。
  17. if (nanos <= 0)
  18. return false;
  19. // 调用notFull.awaitNanos(nanos)方法,超时nanos时间会被自动唤醒,
  20. // 如果被提前唤醒,那么返回剩余的时间
  21. nanos = notFull.awaitNanos(nanos);
  22. }
  23. // 调用enqueue方法将元素插入队列中
  24. enqueue(e);
  25. return true;
  26. } finally {
  27. lock.unlock();
  28. }
  29. }
复制代码

与put的方法大体流程一样,只不过是调用notFull.awaitNanos(nanos)方法,让当前线程等待,并设置等待时间。

四. 删除元素方法

4.1 remove()和poll()方法:

  1. // 调用AbstractQueue父类中的方法。
  2. public E remove() {
  3. // 通过调用poll来时实现
  4. E x = poll();
  5. if (x != null)
  6. return x;
  7. else
  8. throw new NoSuchElementException();
  9. }
  10. // 删除队列第一个元素(即队列头),并返回它。如果队列是空的,它不会抛出异常,而是会返回null。
  11. public E poll() {
  12. final ReentrantLock lock = this.lock;
  13. // 使用lock来保证,多线程修改成员属性的安全
  14. lock.lock();
  15. try {
  16. // 如果count == 0,列表为空,就返回null,否则调用dequeue方法,返回列表头元素
  17. return (count == 0) ? null : dequeue();
  18. } finally {
  19. lock.unlock();
  20. }
  21. }
复制代码

remove方法是调用poll()方法实现的。在 poll()方法中,如果列表为空,就返回null,否则调用dequeue方法,返回列表头元素。

4.2 take()方法

  1. /**
  2. * 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。响应中断异常
  3. */
  4. public E take() throws InterruptedException {
  5. final ReentrantLock lock = this.lock;
  6. // 使用lock来保证,多线程修改成员属性的安全
  7. lock.lockInterruptibly();
  8. try {
  9. // 如果队列为空,就调用notEmpty.await()方法,让当前线程等待。
  10. // 直到有别的线程向队列中插入元素,那么这个线程会被唤醒。
  11. while (count == 0)
  12. notEmpty.await();
  13. // 调用dequeue方法,返回列表头元素
  14. return dequeue();
  15. } finally {
  16. lock.unlock();
  17. }
  18. }
复制代码

take()方法当队列为空的时候,当前线程必须等待,直到有别的线程向队列中插入新元素,那么这个线程会被唤醒。

4.3 poll(long timeout, TimeUnit unit)方法

  1. /**
  2. * 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。
  3. * 如果等待时间超过timeout了,那么返回false,表示获取元素失败
  4. */
  5. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  6. // 计算一共最多等待的时间值nanos
  7. long nanos = unit.toNanos(timeout);
  8. final ReentrantLock lock = this.lock;
  9. // 使用lock来保证,多线程修改成员属性的安全
  10. lock.lockInterruptibly();
  11. try {
  12. // 队列为空
  13. while (count == 0) {
  14. // nanos <= 0表示最大等待时间已到,那么不用再等待了,返回null。
  15. if (nanos <= 0)
  16. return null;
  17. // 调用notEmpty.awaitNanos(nanos)方法让档期线程等待,并设置超时时间。
  18. nanos = notEmpty.awaitNanos(nanos);
  19. }
  20. // 调用dequeue方法,返回列表头元素
  21. return dequeue();
  22. } finally {
  23. lock.unlock();
  24. }
  25. }
复制代码

与take()方法流程一样,只不过调用awaitNanos(nanos)方法,让当前线程等待,并设置等待时间。

五. 查看元素的方法

5.1 element()与peek() 方法

  1. // 调用AbstractQueue父类中的方法。
  2. public E element() {
  3. E x = peek();
  4. if (x != null)
  5. return x;
  6. else
  7. throw new NoSuchElementException();
  8. }
  9. // 查看队列头元素
  10. public E peek() {
  11. final ReentrantLock lock = this.lock;
  12. // 使用lock来保证,多线程修改成员属性的安全
  13. lock.lock();
  14. try {
  15. // 返回当前队列头的元素
  16. return itemAt(takeIndex); // null when queue is empty
  17. } finally {
  18. lock.unlock();
  19. }
  20. }
复制代码

六. 其他重要方法

6.1 enqueue和dequeue方法

  1. // 将元素x插入队列尾
  2. private void enqueue(E x) {
  3. // assert lock.getHoldCount() == 1;
  4. // assert items[putIndex] == null; //当前putIndex位置元素一定是null
  5. final Object[] items = this.items;
  6. items[putIndex] = x;
  7. // 如果putIndex等于items.length,那么将putIndex重新设置为0
  8. if (++putIndex == items.length)
  9. putIndex = 0;
  10. // 队列数量加一
  11. count++;
  12. // 因为插入一个元素,那么当前队列肯定不为空,那么唤醒在notEmpty条件下等待的一个线程
  13. notEmpty.signal();
  14. }
  15. // 删除队列头的元素,返回它
  16. private E dequeue() {
  17. // assert lock.getHoldCount() == 1;
  18. // assert items[takeIndex] != null;
  19. final Object[] items = this.items;
  20. // 得到当前队列头的元素
  21. @SuppressWarnings("unchecked")
  22. E x = (E) items[takeIndex];
  23. // 将当前队列头位置设置为null
  24. items[takeIndex] = null;
  25. if (++takeIndex == items.length)
  26. takeIndex = 0;
  27. // 队列数量减一
  28. count--;
  29. if (itrs != null)
  30. itrs.elementDequeued();
  31. // 因为删除了一个元素,那么队列肯定不满了,那么唤醒在notFull条件下等待的一个线程
  32. notFull.signal();
  33. return x;
  34. }
复制代码

这两个方法分别代表,向队列中插入元素和从队列中删除元素。而且它们要唤醒等待的线程。当插入元素后,那么队列一定不为空,那么就要唤醒在notEmpty条件下等待的线程。当删除元素后,那么队列一定不满,那么就要唤醒在notFull条件下等待的线程。

6.2 remove(Object o)方法

  1. // 删除队列中对象o元素,最多删除一条
  2. public boolean remove(Object o) {
  3. if (o == null) return false;
  4. final Object[] items = this.items;
  5. // 使用lock来保证,多线程修改成员属性的安全
  6. final ReentrantLock lock = this.lock;
  7. lock.lock();
  8. try {
  9. // 当队列中有值的时候,才进行删除。
  10. if (count > 0) {
  11. // 队列尾下一个位置
  12. final int putIndex = this.putIndex;
  13. // 队列头的位置
  14. int i = takeIndex;
  15. do {
  16. // 当前位置元素与被删除元素相同
  17. if (o.equals(items[i])) {
  18. // 删除i位置元素
  19. removeAt(i);
  20. // 返回true
  21. return true;
  22. }
  23. if (++i == items.length)
  24. i = 0;
  25. // 当i==putIndex表示遍历完所有元素
  26. } while (i != putIndex);
  27. }
  28. return false;
  29. } finally {
  30. lock.unlock();
  31. }
  32. }
复制代码

从队列中删除指定对象o,那么就要遍历队列,删除第一个与对象o相同的元素,如果队列中没有对象o元素,那么返回false删除失败。

这里有两点需要注意:

如何遍历队列,就是从队列头遍历到队列尾。就要靠takeIndex和putIndex两个变量了。

为什么Object[] items = this.items;这句代码没有放到同步锁lock代码块内。items是成员变量,那么多线程操作的时候,不会有并发问题么?

这个是因为items是个引用变量,不是基本数据类型,而且我们对队列的插入和删除操作,都是针对这一个items数组,没有改变数组的引用,所以在lock代码中,items会得到其他线程对它最新的修改。但是如果这里将int putIndex = this.putIndex;方法lock代码块外面,就会产生问题。

removeAt(final int removeIndex)方法

  1. // 删除队列removeIndex位置的元素
  2. void removeAt(final int removeIndex) {
  3. // assert lock.getHoldCount() == 1;
  4. // assert items[removeIndex] != null;
  5. // assert removeIndex >= 0 && removeIndex < items.length;
  6. final Object[] items = this.items;
  7. // 表示删除元素是列表头,就容易多了,与dequeue方法流程差不多
  8. if (removeIndex == takeIndex) {
  9. // 移除removeIndex位置元素
  10. items[takeIndex] = null;
  11. // 到了数组末尾,就要转到数组头位置
  12. if (++takeIndex == items.length)
  13. takeIndex = 0;
  14. // 队列数量减一
  15. count--;
  16. if (itrs != null)
  17. itrs.elementDequeued();
  18. } else {
  19. // an "interior" remove
  20. final int putIndex = this.putIndex;
  21. for (int i = removeIndex;;) {
  22. int next = i + 1;
  23. if (next == items.length)
  24. next = 0;
  25. // 还没有到队列尾,那么就将后一个位置元素覆盖前一个位置的元素
  26. if (next != putIndex) {
  27. items[i] = items[next];
  28. i = next;
  29. } else {
  30. // 将队列尾元素置位null
  31. items[i] = null;
  32. // 重新设置putIndex的值
  33. this.putIndex = i;
  34. break;
  35. }
  36. }
  37. // 队列数量减一
  38. count--;
  39. if (itrs != null)
  40. itrs.removedAt(removeIndex);
  41. }
  42. // 因为删除了一个元素,那么队列肯定不满了,那么唤醒在notFull条件下等待的一个线程
  43. notFull.signal();
  44. }
复制代码

在队列中删除指定位置的元素。需要注意的是删除之后的数组还能保持队列形式,分为两种情况:

  1. 如果删除位置是队列头,那么简单,只需要将队列头的位置元素设置为null,将将队列头位置加一。
  2. 如果删除位置不是队列头,那么麻烦了,这个时候,我们就要将从removeIndex位置后的元素全部左移一位,覆盖前一个元素。最后将原来队列尾的元素置位null。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持程序员之家。



回复

使用道具 举报