Java 线程之间通信
发布日期:2021-06-30 17:39:11 浏览次数:2 分类:技术文章

本文共 16879 字,大约阅读时间需要 56 分钟。

目录

 


概念

用多线程的目的:更好的利用CPU的资源。因为所有的多线程代码都可以用单线程来实现。

多线程:指的是这个程序(一个进程)运行时产生了不止一个线程。

并行:多个CPU实例或者多台机器同时执行一段处理逻辑,是真正的同时。

并发:通过CPU调度算法,让用户看上去同时执行,实际上从CPU操作层面不是真正的同时。

线程安全:经常用来描绘一段代码。指在并发的情况下,该代码经过多线程使用,线程的调度顺序不影响任何结果。这个时候使用多线程,我们只需要关注系统的内存,CPU是不是够用即可。

线程不安全:线程的调度顺序会影响最终结果,如不加事务的转账代码。

同步:Java中的同步指的是通过人为的控制和调度,保证共享资源的多线程访问称为线程安全,来保证结果的准确。在保证结果准确的同时,提高性能,才是优秀的程序。线程安全的优先级高于性能。

 

原理

                          

        Java的并发采用的是共享变量模型,Java线程之间的通信由Java内存模型控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。线程之间的共享变量存储在主内存中(Main Memory),每一个线程都有自己的本地内存(Local Memory),本地内存中存储着读/写共享变量的副本。从上图中可以看出,如果线程A与线程B之间要通信的话,必须要经历下面两个步骤:

1、线程A把本地内存A中更新过的共享变量刷新到主内存中去。

2、线程B到内存中去读取线程A之前已更新过的共享变量。

                            

       

        如上图所示,本地内存A和本地内存B由主内存中共享变量x的副本。假设在最开始时,这3个内存中的x值都为0。线程A在执行时,把更新后的x值(假设值为1)临时存放在自己的本地内存A中。当线程A和线程B需要通信时,线程A首先会把自己本地内存中修改后的x值刷新到主内存中,此时主内存中的x值变为1.随后,线程B到主内存中去读取线程A更新后的x值,此时线程B的本地内存的x值也变为1。

       从整体上看,这就是线程A在向线程B发送消息,而且这个消息必须经过主内存。JMM通过控制主内存与每个线程的本地内存之间的交互,来为Java程序员提供内存可见性保证。

实现

1. 第一种解法,包含多种小的不同实现方式,但一个共同点就是靠一个共享变量来做控制;

a. 利用最基本的synchronizednotifywait

public class MethodOne {    private final ThreadToGo threadToGo = new ThreadToGo();    public Runnable newThreadOne() {        final String[] inputArr = Helper.buildNoArr(52);        return new Runnable() {            private String[] arr = inputArr;            public void run() {                try {                    for (int i = 0; i < arr.length; i=i+2) {                        synchronized (threadToGo) {                            while (threadToGo.value == 2)                                threadToGo.wait();                            Helper.print(arr[i], arr[i + 1]);                            threadToGo.value = 2;                            threadToGo.notify();                        }                    }                } catch (InterruptedException e) {                    System.out.println("Oops...");                }            }        };    }    public Runnable newThreadTwo() {        final String[] inputArr = Helper.buildCharArr(26);        return new Runnable() {            private String[] arr = inputArr;            public void run() {                try {                    for (int i = 0; i < arr.length; i++) {                        synchronized (threadToGo) {                            while (threadToGo.value == 1)                                threadToGo.wait();                            Helper.print(arr[i]);                            threadToGo.value = 1;                            threadToGo.notify();                        }                    }                } catch (InterruptedException e) {                    System.out.println("Oops...");                }            }        };    }    class ThreadToGo {        int value = 1;    }    public static void main(String args[]) throws InterruptedException {        MethodOne one = new MethodOne();        Helper.instance.run(one.newThreadOne());        Helper.instance.run(one.newThreadTwo());        Helper.instance.shutdown();    }}

b. 利用LockCondition

public class MethodTwo {    private Lock lock = new ReentrantLock(true);    private Condition condition = lock.newCondition();    private final ThreadToGo threadToGo = new ThreadToGo();    public Runnable newThreadOne() {        final String[] inputArr = Helper.buildNoArr(52);        return new Runnable() {            private String[] arr = inputArr;            public void run() {                for (int i = 0; i < arr.length; i=i+2) {                    try {                        lock.lock();                        while(threadToGo.value == 2)                            condition.await();                        Helper.print(arr[i], arr[i + 1]);                        threadToGo.value = 2;                        condition.signal();                    } catch (InterruptedException e) {                        e.printStackTrace();                    } finally {                        lock.unlock();                    }                }            }        };    }    public Runnable newThreadTwo() {        final String[] inputArr = Helper.buildCharArr(26);        return new Runnable() {            private String[] arr = inputArr;            public void run() {                for (int i = 0; i < arr.length; i++) {                    try {                        lock.lock();                        while(threadToGo.value == 1)                            condition.await();                        Helper.print(arr[i]);                        threadToGo.value = 1;                        condition.signal();                    } catch (Exception e) {                        e.printStackTrace();                    } finally {                        lock.unlock();                    }                }            }        };    }    class ThreadToGo {        int value = 1;    }    public static void main(String args[]) throws InterruptedException {        MethodTwo two = new MethodTwo();        Helper.instance.run(two.newThreadOne());        Helper.instance.run(two.newThreadTwo());        Helper.instance.shutdown();    }}

c. 利用volatile:

volatile修饰的变量值直接存在main memory里面,子线程对该变量的读写直接写入main memory,而不是像其它变量一样在local thread里面产生一份copy。volatile能保证所修饰的变量对于多个线程可见性,即只要被修改,其它线程读到的一定是最新的值。 

public class MethodThree {    private volatile ThreadToGo threadToGo = new ThreadToGo();    class ThreadToGo {        int value = 1;    }    public Runnable newThreadOne() {        final String[] inputArr = Helper.buildNoArr(52);        return new Runnable() {            private String[] arr = inputArr;            public void run() {                for (int i = 0; i < arr.length; i=i+2) {                    while(threadToGo.value==2){}                    Helper.print(arr[i], arr[i + 1]);                    threadToGo.value=2;                }            }        };    }    public Runnable newThreadTwo() {        final String[] inputArr = Helper.buildCharArr(26);        return new Runnable() {            private String[] arr = inputArr;            public void run() {                for (int i = 0; i < arr.length; i++) {                    while(threadToGo.value==1){}                    Helper.print(arr[i]);                    threadToGo.value=1;                }            }        };    }    public static void main(String args[]) throws InterruptedException {        MethodThree three = new MethodThree();        Helper.instance.run(three.newThreadOne());        Helper.instance.run(three.newThreadTwo());        Helper.instance.shutdown();    }}

d. 利用AtomicInteger: 

public class MethodFive {    private AtomicInteger threadToGo = new AtomicInteger(1);    public Runnable newThreadOne() {        final String[] inputArr = Helper.buildNoArr(52);        return new Runnable() {            private String[] arr = inputArr;            public void run() {                for (int i = 0; i < arr.length; i=i+2) {                    while(threadToGo.get()==2){}                    Helper.print(arr[i], arr[i + 1]);                    threadToGo.set(2);                }            }        };    }    public Runnable newThreadTwo() {        final String[] inputArr = Helper.buildCharArr(26);        return new Runnable() {            private String[] arr = inputArr;            public void run() {                for (int i = 0; i < arr.length; i++) {                    while(threadToGo.get()==1){}                    Helper.print(arr[i]);                    threadToGo.set(1);                }            }        };    }    public static void main(String args[]) throws InterruptedException {        MethodFive five = new MethodFive();        Helper.instance.run(five.newThreadOne());        Helper.instance.run(five.newThreadTwo());        Helper.instance.shutdown();    }}

2. 第二种解法,是利用CyclicBarrierAPI;

CyclicBarrier可以实现让一组线程在全部到达Barrier时(执行await()),再一起同时执行,并且所有线程释放后,还能复用它,即为Cyclic。

CyclicBarrier类提供两个构造器:

 

public CyclicBarrier(int parties, Runnable barrierAction) {}public CyclicBarrier(int parties) {}
public class MethodFour{      private final CyclicBarrier barrier;      private final List
list; public MethodFour() { list = Collections.synchronizedList(new ArrayList
()); barrier = new CyclicBarrier(2,newBarrierAction()); } public Runnable newThreadOne() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0, j=0; i < arr.length; i=i+2,j++) { try { list.add(arr[i]); list.add(arr[i+1]); barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } }; } public Runnable newThreadTwo() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i++) { try { list.add(arr[i]); barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } }; } private Runnable newBarrierAction(){ return new Runnable() { @Override public void run() { Collections.sort(list); list.forEach(c->System.out.print(c)); list.clear(); } }; } public static void main(String args[]){ MethodFour four = new MethodFour(); Helper.instance.run(four.newThreadOne()); Helper.instance.run(four.newThreadTwo()); Helper.instance.shutdown(); }}

 这里多说一点,这个API其实还是利用lockcondition,无非是多个线程去争抢CyclicBarrier的instance的lock罢了,最终barrierAction执行时,是在抢到CyclicBarrierinstance的那个线程上执行的。

 

3. 第三种解法,是利用PipedInputStreamAPI;

这里用流在两个线程间通信,但是Java中的Stream是单向的,所以在两个线程中分别建了一个input和output。这显然是一种很搓的方式,不过也算是一种通信方式吧……-_-T,执行的时候那种速度简直。。。请不要BS我。

public class MethodSix {    private final PipedInputStream inputStream1;    private final PipedOutputStream outputStream1;    private final PipedInputStream inputStream2;    private final PipedOutputStream outputStream2;    private final byte[] MSG;    public MethodSix() {        inputStream1 = new PipedInputStream();        outputStream1 = new PipedOutputStream();        inputStream2 = new PipedInputStream();        outputStream2 = new PipedOutputStream();        MSG = "Go".getBytes();        try {            inputStream1.connect(outputStream2);            inputStream2.connect(outputStream1);        } catch (IOException e) {            e.printStackTrace();        }    }    public void shutdown() throws IOException {        inputStream1.close();        inputStream2.close();        outputStream1.close();        outputStream2.close();    }    public Runnable newThreadOne() {        final String[] inputArr = Helper.buildNoArr(52);        return new Runnable() {            private String[] arr = inputArr;            private PipedInputStream in = inputStream1;            private PipedOutputStream out = outputStream1;            public void run() {                for (int i = 0; i < arr.length; i=i+2) {                    Helper.print(arr[i], arr[i + 1]);                    try {                        out.write(MSG);                        byte[] inArr = new byte[2];                        in.read(inArr);                        while(true){                            if("Go".equals(new String(inArr)))                                break;                        }                    } catch (IOException e) {                        e.printStackTrace();                    }                }            }        };    }    public Runnable newThreadTwo() {        final String[] inputArr = Helper.buildCharArr(26);        return new Runnable() {            private String[] arr = inputArr;            private PipedInputStream in = inputStream2;            private PipedOutputStream out = outputStream2;            public void run() {                for (int i = 0; i < arr.length; i++) {                    try {                        byte[] inArr = new byte[2];                        in.read(inArr);                        while(true){                            if("Go".equals(new String(inArr)))                                break;                        }                        Helper.print(arr[i]);                        out.write(MSG);                    } catch (IOException e) {                        e.printStackTrace();                    }                }            }        };    }    public static void main(String args[]) throws IOException {        MethodSix six = new MethodSix();        Helper.instance.run(six.newThreadOne());        Helper.instance.run(six.newThreadTwo());        Helper.instance.shutdown();        six.shutdown();    }

4. 第四种解法,是利用BlockingQueue

 

BlockingQueue定义的常用方法如下:

  • add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常。
  • offer(Object):表示如果可能的话,将Object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
  • put(Object):把Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里有空间再继续。
  • poll(time):获取并删除BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。当不传入time值时,立刻返回。
  • peek():立刻获取BlockingQueue里排在首位的对象,但不从队列里删除,如果队列为空,则返回null。
  • take():获取并删除BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。

BlockingQueue有四个具体的实现类:

  • ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
  • LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO顺序排序的。
  • PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
  • SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。

这里我用了两种玩法:

  • 一种是共享一个queue,根据peekpoll的不同来实现;
  • 第二种是两个queue,利用take()会自动阻塞来实现。
public class MethodSeven {    private final LinkedBlockingQueue
queue = new LinkedBlockingQueue<>(); public Runnable newThreadOne() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i=i+2) { Helper.print(arr[i], arr[i + 1]); queue.offer("TwoToGo"); while(!"OneToGo".equals(queue.peek())){} queue.poll(); } } }; } public Runnable newThreadTwo() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i++) { while(!"TwoToGo".equals(queue.peek())){} queue.poll(); Helper.print(arr[i]); queue.offer("OneToGo"); } } }; } private final LinkedBlockingQueue
queue1 = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue
queue2 = new LinkedBlockingQueue<>(); public Runnable newThreadThree() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i=i+2) { Helper.print(arr[i], arr[i + 1]); try { queue2.put("TwoToGo"); queue1.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }; } public Runnable newThreadFour() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i++) { try { queue2.take(); Helper.print(arr[i]); queue1.put("OneToGo"); } catch (InterruptedException e) { e.printStackTrace(); } } } }; } public static void main(String args[]) throws InterruptedException { MethodSeven seven = new MethodSeven(); Helper.instance.run(seven.newThreadOne()); Helper.instance.run(seven.newThreadTwo()); Thread.sleep(2000); System.out.println(""); Helper.instance.run(seven.newThreadThree()); Helper.instance.run(seven.newThreadFour()); Helper.instance.shutdown(); }

参考:

 

转载地址:https://lemonstone.blog.csdn.net/article/details/85246370 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Java 面试之语言基础
下一篇:消息中间件kafka与activeMQ、rabbitMQ、zeroMQ、rocketMQ的比较

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年04月13日 05时06分03秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章