并发编程之Queue
发布日期:2021-06-28 21:08:32 浏览次数:2 分类:技术文章

本文共 8001 字,大约阅读时间需要 26 分钟。

1、在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪一种都继承自Queue。

 2、ConcurrentLinkedQueue

ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,该队列不允许null元素。

ConcurrentLinkedQueue重要方法:

        add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,这两个方法没有任何区别)

       poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。

示例如下:

package com.bfbc.bfbc.coll013;import java.util.concurrent.ConcurrentLinkedQueue;public class UseQueue {   public static void main(String[] args) throws Exception {            //高性能无阻塞无界队列:ConcurrentLinkedQueue      ConcurrentLinkedQueue
q = new ConcurrentLinkedQueue
(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.add("e"); System.out.println(q.poll()); //a 从头部取出元素,并从队列里删除 System.out.println(q.size()); //4 System.out.println(q.peek()); //b System.out.println(q.size()); //4 }}

3、BlockingQueue接口

ArrayBlockingQueue:基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出货主先进后出,也叫有界队列,在很多场合非常适合使用。

示例如下:

package com.bfbc.coll013;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.TimeUnit;public class UseQueue {   public static void main(String[] args) throws Exception {      ArrayBlockingQueue
array = new ArrayBlockingQueue
(5); array.put("a"); array.put("b"); array.add("c"); array.add("d"); array.add("e"); array.add("f"); System.out.println(array.offer("a", 3, TimeUnit.SECONDS)); }}

LinkedBlockingQueue:基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。它是一个无界队列。

示例如下:

package com.bfbc.base.coll013;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.concurrent.LinkedBlockingQueue;public class UseQueue {   public static void main(String[] args) throws Exception {      //阻塞队列      LinkedBlockingQueue
q = new LinkedBlockingQueue
(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.offer("e"); q.add("f"); System.out.println(q.size()); for (Iterator iterator = q.iterator(); iterator.hasNext();) { String string = (String) iterator.next(); System.out.println(string); } List
list = new ArrayList
(); //批量从q中取3个元素放入list System.out.println(q.drainTo(list, 3)); System.out.println(list.size()); for (String string : list) { System.out.println(string); } }}

SynchronousQueue:一种没有缓冲的队列,生产者生产的数据直接会被消费者获取并消费。

PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。

示例如下:

package com.bfbc.base.coll013;public class Task implements Comparable
{ private int id ; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public int compareTo(Task task) { return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0); } public String toString(){ return this.id + "," + this.name; } }
package com.bfbc.base.coll013;import java.util.concurrent.PriorityBlockingQueue;public class UsePriorityBlockingQueue {      public static void main(String[] args) throws Exception{                  PriorityBlockingQueue
q = new PriorityBlockingQueue
(); Task t1 = new Task(); t1.setId(3); t1.setName("id为3"); Task t2 = new Task(); t2.setId(4); t2.setName("id为4"); Task t3 = new Task(); t3.setId(1); t3.setName("id为1"); //return this.id > task.id ? 1 : 0; q.add(t1); //3 q.add(t2); //4 q.add(t3); //1 for(Task t: q) { System.out.println("容器:" + q); System.out.println(q.take().getId()); } // 1 3 4// System.out.println("容器:" + q);// System.out.println(q.take().getId());// System.out.println("容器:" + q);// System.out.println(q.take().getId());// System.out.println(q.take().getId()); }}

DelayQueue:带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。

示例如下:

package com.bfbc.base.coll013;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;public class Wangmin implements Delayed {          private String name;      //身份证      private String id;      //截止时间      private long endTime;      //定义时间工具类    private TimeUnit timeUnit = TimeUnit.SECONDS;          public Wangmin(String name,String id,long endTime){          this.name=name;          this.id=id;          this.endTime = endTime;      }            public String getName(){          return this.name;      }            public String getId(){          return this.id;      }            /**      * 用来判断是否到了截止时间      */      @Override      public long getDelay(TimeUnit unit) {         //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);       return endTime - System.currentTimeMillis();    }        /**      * 相互批较排序用      */      @Override      public int compareTo(Delayed delayed) {         Wangmin w = (Wangmin)delayed;          return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;      }       }

 

 

package com.bfbc.base.coll013;import java.util.concurrent.DelayQueue;public class WangBa implements Runnable {          private DelayQueue
queue = new DelayQueue
(); public boolean yinye =true; public void shangji(String name,String id,int money){ Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis()); System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机..."); this.queue.add(man); } public void xiaji(Wangmin man){ System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机..."); } @Override public void run() { while(yinye){ try { Wangmin man = queue.take(); xiaji(man); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String args[]){ try{ System.out.println("网吧开始营业"); WangBa siyu = new WangBa(); Thread shangwang = new Thread(siyu); shangwang.start(); siyu.shangji("路人甲", "123", 1); siyu.shangji("路人乙", "234", 10); siyu.shangji("路人丙", "345", 5); } catch(Exception e){ e.printStackTrace(); } } }

4、BlockingQueue接口的重要方法

放入数据:

offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。(本方法不阻塞当前执行方法的线程)

offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞知道BlockingQueue里面有空间再继续。

获取数据:

poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。

take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;

drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法可以提升获取数据效率;不需要多次分批加锁或释放锁。

5、Deque双端队列

Deque不允许在队列的头部或尾部进行出队和入队操作

LinkedBlockingQueue是一个线程安全的双端队列实现,可以说他是最为复杂的一种队列,在内部实现维护了前端和后端节点,但是其没有实现读写分离,因此同一时间只能有一个线程对其进行操作。在高并发中性能要远低于其他BlockingQueue。更要低于ConcurrentLinkedQueue,在jdk早期有一个非线程安全的Deque就是ArrayDeque了,java6里添加了LinkedBlockingQueue来弥补多线程场景下线程安全的问题。

转载地址:https://blog.csdn.net/yangweitusgsg/article/details/105459238 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:并发编程之一
下一篇:并发编程之三

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年04月29日 02时40分34秒