012-Java常用队列

tech2025-08-22  3

概念

队列是一种先进先出(FIFO)的抽象数据结构。在Java中,队列使用了两种数据类型来实现,分别是数组和链表这两种数据结构。所有的队列都实现了Queue接口。

分类及特性

队列分为阻塞和非阻塞队列两种。

如上图(IDEA生成),阻塞队列实现了BlockingQueue接口,包含LinkedBlockingDeque,LinkedBlockingQueue,LinkedTransferQueue,ArrayBlockingQueue,PriorityBlockingQueue,DelayQueue和SynchronousQueue。

非阻塞队列包含LinkedList,PriorityQueue和ConcurrentLinkedQueue。

常用队列比较

队列底层数据结构锁阻塞机制其他ArrayBlockingQueue数组读写都采用一把ReentrantLock 锁取数据基于notEmpty的Condition,加数据基于notFull的Condition维护两个序列来达到FIFO,取数据的takeIndex,加数据的putIndex;LinkedBlockingQueue链表读写锁分离分别基于读写锁的Condition进行阻塞或者唤醒不指定容量时队列大小默认为Integer.MAX_VALUE。读写锁分离,在高并发下生产者和消费者能同时操作队列,提高并发性能PriorityBlockingQueue数组读锁基于读锁的notEmpty的Condition带有优先级的队列,初始容量为11,会自动扩容,所以一般把它当做无界的队列,put不需要锁。其队列元素必须实现Comparator接口,以便其基于完全二叉树的最小堆和最大堆排序DelayQueue数组读写采用一把ReentrantLock锁take取数据时为空阻塞,无边界所以加入数据不会阻塞带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等ConcurrentLinkedQueue链表CAS无锁无不需要在每个操作时使用锁,所以扩展性表现要更加优异,在常见的多线程访问场景,一般可以提供较高吞吐量

队列的常用操作(实现Queue接口)

操作异常版返回值版插入元素add(e) 满异常IllegalStateExceptionoffer(e) 满返回false删除元素remove() 空异常NoSuchElementExceptionpoll() 空返回null获取队首元素element() 空异常NoSuchElementExceptionpeek() 空返回null

阻塞队列额外操作(实现BlockingQueue接口, BlockingQueue继承了Queue接口,所以实现了BlockingQueue接口的类以上操作都支持)

操作阻塞版带超时的阻塞版插入元素put(e) 满则阻塞等待offer(e, time, unit) 满则等待至超时时间后退出阻塞删除元素take() 空则阻塞等待poll(time, unit) 空则等待至超时时间后退出阻塞

实例

LinkedBlockingQueue

import java.util.concurrent.*; public class QueueTest { private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(20); public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(10); service.submit(new Consumer("consumer1")); service.submit(new Consumer("consumer2")); service.submit(new Consumer("consumer3")); service.submit(new Consumer("consumer4")); service.submit(new Consumer("consumer5")); service.submit(new Producer("producer1")); service.submit(new Producer("producer2")); service.submit(new Producer("producer3")); service.shutdown(); } static class Producer implements Runnable { String name; public Producer(String name) { this.name = name; } public void run() { for (int i = 1; i <= 5; i++) { try { System.out.println(name + " producing: " + i); if (!linkedBlockingQueue.offer(i + "")) { System.out.println("仓库满了"); } //当队列满时阻塞 //linkedBlockingQueue.put(i+""); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer implements Runnable { String name; public Consumer(String name) { this.name = name; } public void run() { for (int i = 0; i < 5; i++) { try { //当队列空时阻塞 //System.out.println(name + " consuming: " + linkedBlockingQueue.take()); String element = linkedBlockingQueue.peek(); if (element == null) { System.out.println("没有货了"); } Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } }

PriorityBlockingQueue

import java.util.concurrent.PriorityBlockingQueue; public class PriorityBlockingQueueTest { public static void main(String[] args) throws InterruptedException { PriorityBlockingQueue queue = new PriorityBlockingQueue(); //PriorityBlockingQueue的元素必须实现Comparable接口 queue.add(new Task(1, "1号任务")); queue.add(new Task(4, "4号任务")); queue.add(new Task(3, "3号任务")); queue.add(new Task(2, "2号任务")); queue.add(new Task(1, "1号任务")); queue.add(new Task(5, "5号任务")); //[1号任务, 1号任务, 3号任务, 4号任务, 2号任务, 5号任务] System.out.println(queue); //1号任务 System.out.println(queue.take()); //[1号任务, 2号任务, 3号任务, 4号任务, 5号任务] System.out.println(queue); //1号任务 System.out.println(queue.take()); //[2号任务, 4号任务, 3号任务, 5号任务] System.out.println(queue); //2号任务 System.out.println(queue.take()); //[3号任务, 4号任务, 5号任务] System.out.println(queue); //3号任务 System.out.println(queue.take()); //[4号任务, 5号任务] System.out.println(queue); //4号任务 System.out.println(queue.take()); //[5号任务] System.out.println(queue); //5号任务 System.out.println(queue.take()); //[] System.out.println(queue); //队列为空阻塞 System.out.println(queue.take()); } static class Task implements Comparable<Task> { private int num; private String name; public Task(int num, String name) { this.num = num; this.name = name; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } @Override public int compareTo(Task o) { if(o== null) { return 1; } return this.num-o.getNum(); } public String toString(){ return name; } } }

DelayQueue

import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class DelayQueueTest { public static void main(String[] args) throws InterruptedException { //超时任务列表 DelayQueue queue = new DelayQueue(); long start = System.currentTimeMillis(); for (int i = 1; i <= 10; i++) { String name = "task"+i; int spend = new Random().nextInt(10); queue.add(new Task(name, start, spend)); } //任务剩余时间少的任务将出列 System.out.println(queue); System.out.println(queue.take()); System.out.println(queue); System.out.println(queue.take()); System.out.println(queue); System.out.println(queue.take()); } //超时任务 static class Task implements Delayed { //任务名称 private String name; //任务开始时间 private long start; //任务完成需要花费的时间(秒数) private int spend; //计算delay的时间单位 private TimeUnit timeUnit = TimeUnit.MILLISECONDS; public Task(String name, long start, int spend) { this.spend = spend; this.name = name; this.start = start; } @Override public long getDelay(TimeUnit unit) { //计算任务剩余时间 return this.spend * 1000 + this.start - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { if(o == null){ return 1; } return (int)(this.getDelay(timeUnit) - o.getDelay(timeUnit)); } @Override public String toString(){ return this.name + ":" + this.spend; } } }

总结

在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

使用非阻塞队列,虽然能即时返回结果(消费结果),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)

在实际使用过程中对于LinkedBlockingQueue类的队列默认大小为Integer.MAX_VALUE,最好根据实际情况指定一个大小防止意外内存溢出的情况。

DelayQueue和PriorityBlockingQueue都用到了PriorityQueue具有优先级队列的特性,加入队列的元素必须实现Comparable接口,当元素超时才会出列,只保证最先出列的元素排在队首,其他的顺序不保证。

Java知识点总结系列目录

最新回复(0)