ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

多线程中生产消费模型

2021-11-30 19:59:57  阅读:154  来源: 互联网

标签:抽水 objs 消费 模型 注水 线程 多线程 唤醒 public


一、生产消费模型

实际生活中,需要操作共享的某个资源(水池),但是对这个共享资源的操作方式不同(部分是注水[生产]、部分是抽水[消费])。把这种现象我们可以称为生产和消费模型。

生产:它可以采用部分线程进行模拟。多个线程同时给水池中注水。

消费:它可以采用部分线程进行模拟。多个线程同时从水池中抽水。

对资源的不同的操作方式,每种方式都可以让部分的线程去负责。多个不同的线程,他们对相同的资源(超市、水池等)操作方式不一致。

这个时候我们不能使用一个run方法对线程的任务进行封装。所以这里就需要定义不同的线程任务类,描述不同的线程的任务。

通过不同的线程操作,来控制同一个资源,这种现象就属于生产消费模型

二、 简单的实现生产消费模型

创建公共资源类
1.创建公共资源类

package com.wangxing.test1;
/*
 * 被多个线程操作的共享数据的共享资源类
 */
public class Resoucre {
	//保存共享资源的数组【资源池】
	private Object objs[]=new Object[1];
	//记录生产和消费的方法
	private int num=1;
	
	//生产的方法
	public void production(){
		objs[0]="水"+num;
		System.out.println(Thread.currentThread().getName() + "正在生产:" + objs[0]);
		num++;
	}
	//消费的方法
	public void delete(){
		System.out.println(Thread.currentThread().getName() + "消费中:" + objs[0]);
		objs[0]=null;
	}
}

2.生产共享资源的目标类

package com.wangxing.test1;
/*
 * 生产共享资源的目标类
 */
public class product implements Runnable{
	//定义共享资源对象
	private Resoucre resoucre;
	//通过构造方法传入共享资源对象
	public product(Resoucre resoucre){
		this.resoucre=resoucre;
	}
	//重写Runnable接口的run方法(为了共享字眼选择这个)
	@Override
	public void run() {
		//持续注水
		while(true){
			//访问共享资源的生产方法
			resoucre.production();
		}
	}
}

3.消费共享资源的目标类

package com.wangxing.test1;

public class XiaoFei implements Runnable{
	// 定义共享资源对象
	private Resoucre resoucre;

	// 通过构造方法传入共享资源对象
	public XiaoFei(Resoucre resoucre){
			this.resoucre=resoucre;
		}

	// 重写Runnable接口的run方法(为了共享字眼选择这个)
	@Override
	public void run() {
		//持续
		while(true){
			// 访问共享资源的生产方法
			resoucre.delete();
		}
	}
}

4.测试

package com.wangxing.test1;

import javax.annotation.Resource;

public class Main {

	public static void main(String[] args) {
		//创建共享资源类对象
		Resoucre resoucre=new Resoucre();
		//创建生产共享资源的目标类对象
		product sc=new product(resoucre);
		//创建消费共享资源的目标类对象
		XiaoFei xFei=new XiaoFei(resoucre);
		//创建生产者线程对象	
		Thread scth=new Thread(sc);
		//创建消费者线程对象
		Thread xfTh=new Thread(xFei);
		//启动生产和消费线程
		scth.start();
		xfTh.start();
	}
}

有时会出现生产者注水为null的情况:

有两个线程分别是生产者负责注水的线程和消费者负责抽水线程。
假设CPU在消费者线程上,那么消费者正要打印了抽水为null的情况下,还没有将数组空间赋值为null之前,CPU切换到生产者,生产者将水注到数组空间中之后,还没有打印,CPU又切回到消费者线程上,消费者线程就会将数组空间立刻赋值为null。CPU如果再切回到生产者线程上,打印出来的注水就是null。

有时会出现消费者抽水为null的情况:

有两个线程分别是生产者负责注水的线程和消费者负责抽水线程。
假设CPU在消费者线程上,那么消费者打印完抽水为”水1”的情况下,还没有将数组空间赋值为null之前,CPU切换到生产者,生产者将水注到数组空间中之后,打印出正要注进入的水是:水2,CPU又切回到消费者线程上,消费者线程就会将数组空间立刻赋值为null。CPU如果再切回到生产者线程上,执行了注水次数加1之后。CPU如果再切回到消费者线程上,这是消费者线程就会输出抽水为null的情况。

上面的这两个问题就是因为当前线程正在访问的共享资源的时候,其他的线程也可以访问共享资源所产生的。所以线程操作共享数据时,需要进行线程同步。

线程同步能够保证注水的时候不能抽水,或者抽水的时候不能给当前这个空间注水。
修改Resource为注水和抽水方法添加同步代码块保证注水的时候不能抽水,或者抽水的时候不能给当前这个空间注水。

package com.wangxing.test1;
/*
 * 被多个线程操作的共享数据的共享资源类
 */
public class Resoucre {
	//保存共享资源的数组【资源池】
	private Object objs[]=new Object[1];
	//记录生产和消费的方法
	private int num=1;
	//创建同步对象
	private static final Object loc=new Object();
	
	//生产的方法
	public void production(){
		synchronized(loc){
			objs[0]="水"+num;
			System.out.println(Thread.currentThread().getName() + "正在生产:" + objs[0]);
			num++;
		}
	}
	//消费的方法
	public void delete(){
		synchronized(loc){
			System.out.println(Thread.currentThread().getName() + "消费中:" + objs[0]);
			objs[0]=null;
		}
	}
}

上面执行完成以后会出现多次注水没有抽水,或者多次抽水,没有注水的问题?

要解决上面这个多次操作的问题,首先需要先判断是否满足抽水或者注水的条件。

什么时候抽水:当数组空间中不是null的时候可以进行抽水。

什么时候注水:数组空间为null的时候才能注水。

如果不满足注水的时候,但是当前正好CPU在注水的线程上,这时就必须让这个注水的线程等待,等到可以注水的时候将本次注水的动作做完。

如果不满足抽水的时候,但是当前正好CPU在抽水的线程上,必须让抽水的线程等待,等到数组有水的时候将本次的抽水的动作做完。

需要使用Java中线程等待和唤醒机制(线程间的通信

等待:如果判断发现不满足,这个线程就要等待。等待到满足操作的时候,才能继续进行执行。

注水线程注水结束之后,应该告诉抽水线程可以抽水。同样道理,抽水线程抽完水之后,应该告诉注水线程可以注水了。

唤醒:当某个一方操作完成之后,需要将处于另外一方操作的等待的线程等待的状态恢复到可以操作的状态(把一方通知另外一方的这个操作称为线程的唤醒)。

在Java提供两个不同的方法分别代表等待和唤醒:

等待和唤醒的方法没有定义在Thread类中,而是定义在Object类中(因为只有同步的锁才能让线程等待或者将等待的线程唤醒,而同步的锁是任意对象,等待和唤醒的方法只能定义在Object类中)

 void

wait() 在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,导致当前线程等待

 void

notify() 唤醒在此对象监视器上等待的单个线程。

 void

notifyAll() 唤醒在此对象监视器上等待的所有线程。

注意:等待和唤醒(线程通信)必须位于同步中。因为等待和唤醒必须使用当前的锁才完成。
修改Resource为注水和抽水方法添加线程等待和唤醒操作

package com.wangxing.test1;
/*
 * 被多个线程操作的共享数据的共享资源类
 */
public class Resoucre {
	//保存共享资源的数组【资源池】
	private Object objs[]=new Object[1];
	//记录生产和消费的方法
	private int num=1;
	//创建同步对象
	private static final Object loc=new Object();
	
	//生产的方法
	public void production() throws Exception{
		synchronized(loc){
			//生产时判断数组是否有水
			//有水,就无需注水,如果此时正好切换到注水线程,
			//那么生产线程就应该等待
			if (objs[0]!=null) {
				//注水线程等待
				loc.wait();
			}
			objs[0]="水"+num;
			System.out.println(Thread.currentThread().getName() + "正在生产:" + objs[0]);
			num++;
			//唤醒消费线程运行
			loc.notify();
		}
	}
	//消费的方法
	public void delete() throws Exception{
		synchronized(loc){
			//消费时判断数组是否有水
			//没水,就无需抽水,如果此时正好切换到抽水线程,
			//那么消费线程就应该等待
			if(objs[0]==null){
				loc.wait();
			}
			System.out.println(Thread.currentThread().getName() + "消费中:" + objs[0]);
			objs[0]=null;
			//唤醒生产线程
			loc.notify();
		}
	}
}

 上面的程序处理好了单线程的注水和抽水动作。

下面我们将程序修改成多注水和所抽水的情况。
修改主类多创建几个注水和抽水线程对象,并启动运行。

package com.wangxing.test1;

import javax.annotation.Resource;

public class Main {

	public static void main(String[] args) {
		//创建共享资源类对象
		Resoucre resoucre=new Resoucre();
		//创建生产共享资源的目标类对象
		product sc=new product(resoucre);
		//创建消费共享资源的目标类对象
		XiaoFei xFei=new XiaoFei(resoucre);
		//创建生产者线程对象	
		Thread scth=new Thread(sc);
		Thread scth2=new Thread(sc);
		//创建消费者线程对象
		Thread xfTh=new Thread(xFei);
		Thread xfTh2=new Thread(xFei);
		//启动生产和消费线程
		scth.start();
		scth2.start();
		xfTh.start();
		xfTh2.start();
	}
}

 将单注水和单抽水修改为两个注水和两个抽水,结果程序中又出现了多次注水,或者多次抽水的现象。
发生这个现象原因:是因为在唤醒的时候,抽水的线程将另外一个抽水的线程唤醒了。或者注水的线程将另外一个注水的线程唤醒了。只要自己同伴线程将自己唤醒之后,这时被唤醒的线程就可以继续操作。导致出现了多次注水,或者多次抽水的现象。
解决上面的问题:将判断有没有水的if修改为while即可。唤醒之后可以继续判断。

package com.wangxing.test1;
/*
 * 被多个线程操作的共享数据的共享资源类
 */
public class Resoucre {
	//保存共享资源的数组【资源池】
	private Object objs[]=new Object[1];
	//记录生产和消费的方法
	private int num=1;
	//创建同步对象
	private static final Object loc=new Object();
	
	//生产的方法
	public void production() throws Exception{
		synchronized(loc){
			//生产时判断数组是否有水
			//有水,就无需注水,如果此时正好切换到注水线程,
			//那么生产线程就应该等待
			while(objs[0]!=null) {
				//注水线程等待
				loc.wait();
			}
			objs[0]="水"+num;
			System.out.println(Thread.currentThread().getName() + "正在生产:" + objs[0]);
			num++;
			//唤醒消费线程运行
			loc.notify();
		}
	}
	//消费的方法
	public void delete() throws Exception{
		synchronized(loc){
			//消费时判断数组是否有水
			//没水,就无需抽水,如果此时正好切换到抽水线程,
			//那么消费线程就应该等待
			while(objs[0]==null){
				loc.wait();
			}
			System.out.println(Thread.currentThread().getName() + "消费中:" + objs[0]);
			objs[0]=null;
			//唤醒生产线程
			loc.notify();
		}
	}
}

 修改为while之后,程序又出现了新的问题:死锁(所有的线程都处于等待状态了。外面没有可以执行的线程了)。

解决方案:只能使用notifyAll唤醒所有线程。每次在唤醒的时候都是唤醒所有线程,即使唤醒了自己的同伴,也无所谓,因为还要继续判断,这样一定还会等待,但是唤醒唤醒中一定有另外一方的线程,它们肯定不会等待。它们不等待,就会去操作,它们操作完成也唤醒所有。

package com.wangxing.test1;
/*
 * 被多个线程操作的共享数据的共享资源类
 */
public class Resoucre {
	//保存共享资源的数组【资源池】
	private Object objs[]=new Object[1];
	//记录生产和消费的方法
	private int num=1;
	//创建同步对象
	private static final Object loc=new Object();
	
	//生产的方法
	public void production() throws Exception{
		synchronized(loc){
			//生产时判断数组是否有水
			//有水,就无需注水,如果此时正好切换到注水线程,
			//那么生产线程就应该等待
			while(objs[0]!=null) {
				//注水线程等待
				loc.wait();
			}
			objs[0]="水"+num;
			System.out.println(Thread.currentThread().getName() + "正在生产:" + objs[0]);
			num++;
			//唤醒消费线程运行
			loc.notifyAll();
		}
	}
	//消费的方法
	public void delete() throws Exception{
		synchronized(loc){
			//消费时判断数组是否有水
			//没水,就无需抽水,如果此时正好切换到抽水线程,
			//那么消费线程就应该等待
			while(objs[0]==null){
				loc.wait();
			}
			System.out.println(Thread.currentThread().getName() + "消费中:" + objs[0]);
			objs[0]=null;
			//唤醒生产线程
			loc.notifyAll();
		}
	}
}
package com.wangxing.test1;
/*
 * 生产共享资源的目标类
 */
public class product implements Runnable{
	//定义共享资源对象
	private Resoucre resoucre;
	//通过构造方法传入共享资源对象
	public product(Resoucre resoucre){
		this.resoucre=resoucre;
	}
	//重写Runnable接口的run方法(为了共享字眼选择这个)
	//定义共享的生产资源
	private int i=50;
	@Override
	public void run() {
		//持续注水
		while(i>1){
			//访问共享资源的生产方法
			try {
				resoucre.production();
				i--;
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}	
	}
}

 多生产多消费的程序中,为了保证不出现全部线程被wait的情况,只能在唤醒的时候使用notifyAll将所有处于等待的线程唤醒。这样每次都可以保证一定会有存活的线程。但是这种唤醒效率太低了,经常会发生生产方唤醒自己的同伴线程,或者是消费方唤醒自己的同伴线程。

在JDK5中提供Condition接口。它用来代替等待和唤醒机制。
java.util.concurrent.locks接口 Condition
public interface Condition
在JDK5之前,一个同步的锁下面的等待和唤醒无法辨别当前让等待或唤醒的线程到底属于生产还是属于消费。而Condition接口,它可以创建出不同的等待和唤醒的对象,然后可以用在不同的场景下:
可以创建一个Condition对象,专门负责生产。
可以创建一个Condition对象,专门负责消费。
可以通过负责生产的Condition对象专门监视负责生产的线程。通过负责消费的Condition监视消费的线程。等待和唤醒的时候,可以使用各自的Condition对象。

 void

await()造成当前线程在接到信号或被中断之前一直处于等待状态。

 void

signal()唤醒一个等待线程。

 void

signalAll() 唤醒所有等待线程。

注意:如果要想使用Condition接口,同步必须使用Lock接口。
如果程序中同步使用的同步代码块,等待和唤醒只能使用Object中的wait、notify、notifyAll方法。
只有同步使用的Lock接口,等待和唤醒才能使用Condition接口。
其他同上只需要更改共享数据的共享资源类Resource类

package com.wangxing.test2;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/*
 * 被多个线程操作的共享数据的共享资源类
 */
public class Resoucre {
	// 保存共享资源的数组【资源池】
	private Object objs[] = new Object[1];
	// 记录生产和消费的方法
	private int num = 1;
	// 创建Lock接口,作为同步的锁
	private Lock lock = new ReentrantLock();
	// 创建监视生产的线程
	private Condition scCondition = lock.newCondition();
	// 创建监视消费的线程
	private Condition xfCondition = lock.newCondition();

	// 生产的方法
	public void production() {
		try {
			// 线程同步,获取锁
			lock.lock();
			// 生产时判断数组是否有水
			// 有水,就无需注水,如果此时正好切换到注水线程,
			// 那么生产线程就应该等待
			while (objs[0] != null) {
				// 生产线程等待
				scCondition.await();
			}
			objs[0] = "水" + num;
			System.out.println(Thread.currentThread().getName() + "正在生产:" + objs[0]);
			num++;
			// 唤醒消费线程运行
			xfCondition.signal();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 关锁
			lock.unlock();
		}
	}

	// 消费的方法
	public void delete(){
		try {
			//线程同步,获取锁
			lock.lock();
			// 消费时判断数组是否有水
			// 没水,就无需抽水,如果此时正好切换到抽水线程,
			// 那么消费线程就应该等待
			while (objs[0] == null){
				//消费线程等待
				xfCondition.await();
			}
			System.out.println(Thread.currentThread().getName() + "消费中:" + objs[0]);
			objs[0] = null;
			// 唤醒生产线程
			scCondition.signal();
		}catch (Exception e) {
				e.printStackTrace();
		}finally{
			//关锁
			lock.unlock();
		}
	}
}

为什么Lock接口替换同步代码块?

不使用Lock接口完成线程同步,那么我们就得使用同步代码【synchronized】,实现线程同步。如果我们使用同步代码【synchronized】,实现线程同步的话,这时我们就只能使用Object类提供的wait、notify、notifyAll方法。来实现线程的等待和唤醒操作。缺点就是通过Object类提供的notifyAll这个方法会唤醒所有的等待线程,这时就就可能会唤醒自己的同伴线程,如果唤醒的是自己的同伴线程的话,那么程序就会多执行一次是否注水/抽水的判断过程,这样程序的执行效率就会降低。为了提高程序的运行效率,我们就需要在唤醒等待的线程的时候,只唤醒注水线程/抽水线程,而不会唤醒自己的同伴线程,这时我们就需要使用Condition 接口提供的等待和唤醒方法【await(),signal(),signalAll()】,因为他可以只唤醒对方线程,而不会唤醒同伴线程。Condition 接口在使用的时候是需要Lock接口对象的newCondition 方法才能创建出Condition 接口对象。所以我们在此处就使用Lock接口对象实现线程同步,来代替同步代码【synchronized】,实现线程同步。

同步代码【synchronized】-----Object类提供的wait、notify、notifyAll方法

Lock接口对象实现线程同步----Condition 接口提供的await(),signal(),signalAll()方法

signalAll这个唤醒全部线程在什么情况下使用?

signalAll这个方法是Condition 接口提供的唤醒所有等待线程,出现死锁的情况的时候可以使用signalAll这个方法,唤醒同一类的等待线程。

等待与唤醒机制的方式有2种,区别

Object类提供的wait、notify、notifyAll方法

Condition 接口提供的await(),signal(),signalAll()方法

同步代码【synchronized】实现线程同步

Lock接口对象实现线程同步

效率低

效率高

notify 与notifyAll的区别

notify

notifyAll

只随机唤醒一个 wait 线程

唤醒所有 wait 线程

可能会导致死锁

不会导致死锁

唤醒等待的线程不分彼此

signalsignalAll的区别

signal

signalAll

只随机唤醒一个 wait 线程【同一类】

唤醒所有 wait 线程【同一类】

sleep 与wait的区别

sleep

wait

Thread

Object

依赖于系统时钟和CPU调度机制

线程调用notify()或者notifyAll()方法

不释放已获取的锁资源

释放已获取的锁资源

标签:抽水,objs,消费,模型,注水,线程,多线程,唤醒,public
来源: https://blog.csdn.net/slom_fxt/article/details/121637151

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有