生产者消费者问题,也称有限缓冲问题,是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。

生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。

摘自维基百科

并发

生产者消费者问题的本质在于并发时的数据安全问题。如果在并发的时候不存在数据共享,则不存在安全问题,只存在性能问题;如果存在数据共享(主要是写,只有读也没问题),则还需要考虑数据的同步问题。

Java中处理线程安全问题的方式(我知道的)有以下几个:

  • 使用synchronized或者和wait()notify()的配合
  • 使用volatile同步内存修改
  • 使用实现了java.util.concurrent.locks.Lock的锁
  • 如果是集合还可以使用:
    • Hashtable
    • Vector
    • Collections封装原基本集合类:
      • Collections.synchronizedMap()
      • Collections.synchronizedList()
      • Collections.synchronizedSet()
    • java.util.concurrent包下提供的线程安全的集合类,以下列出部分:
      • CopyOnWriteArrayList
      • CopyOnWriteArraySet
      • ConcurrentHashMap
      • ConcurrentLinkedQueue
      • LinkedBlockingQueue

synchronized

使用synchronized关键字须遵循内置的锁等待-释放机制。所有的锁都是块结构的。当进入一个同步方法或同步块的时候必须获得该锁,而退出的时候(即使是异常退出)必须释放这个锁。

锁操作是建立在独立的线程上的而不是独立的调用基础上。一个线程能够进入一个同步代码的条件是当前锁未被占用或者是当前线程已经占用了这个锁,否则线程就会阻塞住。

单独使用

// 实例方法同步,同步在拥有该方法的对象上
public synchronized void setValue(int value){
  mValue = value;
}

// 静态方法同步,同步在拥有该方法的 类对象 上
public synchronized static int getValue(){
	return sValue;
}

// 实例方法中的同步块,同步在括号中的监视器对象上
public void setValue(int value){
	synchronized(this){
		mValue = value;
	}
}

// 静态方法中的同步块,同步在该方法所属的 类对象 上
public class Test{
	private static int sValue;
	public static void setValue(int value){
		synchronized(Test.class){
			sValue = value;
		}
	}
}

配合wait和notify

public class BlockingArray {
	private final int limit;
	private final Object lock = new Object();
	private final Object[] items;
	
	public BlockingArray(int capacity){
		limit = capacity;
		items = new Object[limit];
	}
	
	public void blockingRemove() {
		try {
			synchronized (lock) {
				while (isEmpty()) {
					//empty to wait
					lock.wait();
					//awake from empty
				}
				Object item = removeFirst();// remove the first item in array
				log("remove : " + item.toString());
				lock.notify();
			}
		} catch (InterruptedException e) {
		}
	}
	
	public void blockingAdd(Object item) {
		try {
			synchronized (lock) {
				while (isFull()) {
					//full to wait
					lock.wait();
					//awake from full
				}
				addLast(item);
				log("add : " + item.toString());
				lock.notify();
			}
		} catch (InterruptedException e) {
		}
	}
	
	public int size(){...}
	
	public boolean isEmpty() {...}
	
	public boolean isFull() {...}
	
	private void log(String msg) {...}
}

其中size() isEmpty() isFull() 方法里为public,所以也应该处理线程安全问题。

volatile

每一个Object类及其子类的实例都拥有一个锁。其中,标量类型int,float等不是对象类型,但是标量类型可以通过其包装类来作为锁。单独的成员变量是不能被标明为同步的。锁只能用在使用了这些变量的方法上。但是成员变量可以被声明为volatile,这种方式会影响该变量的原子性,可见性以及排序性。

类似的,持有标量变量元素的数组对象拥有锁,但是其中的标量元素却不拥有锁。(也就是说,没有办法将数组成员声明为volatile类型的)。如果锁住了一个数组并不代表其数组成员都可以被原子的锁定。也没有能在一个原子操作中锁住多个对象的方法。

Lock

Lock接口提供了一组广泛、灵活的锁操作。Lock必须被显式地创建、锁定和释放。和关键字synchronized相比,JDK1.5的时候使用synchronized,吞吐量严重下降,而Lock则基本保持稳定;到了JDK1.6时,对synchronized做了很多优化,性能上不比Lock差,所以仅当synchronized不能实现需求的时候,才优先考虑Lock来进行同步。

Lock接口有三个实现类:

  • ReentrantLock 重入锁
  • ReetrantReadWriteLock.ReadLock 读锁
  • ReetrantReadWriteLock.WriteLock 写锁

用法上ReentrantLocksynchronized很像,相对于synchronized而言,ReentrantLock有几个高级一点的功能:

  • 等待可中断:当持有锁的线程长期不释放锁时,正在等待的线程可以选择放弃等待,改为处理其他事情。
ReentrantLock lock = new ReentrantLock();   
//获取响应中断锁,如果当前线程被打断,这里的等待会被放弃
lock.lockInterruptibly();  
try {
	// do something  
}finally{  
    lock.unlock();		//保证锁一定会释放  
} 
  • 可实现公平锁:多个线程在等待同一个锁时,必须按照申请锁的时间顺序排队等待,而非公平锁则不保证这点,在锁释放时,任何一个等待锁的线程都有机会获得锁。synchronized中的锁时非公平锁,ReentrantLock默认情况下也是非公平锁。
ReentrantLock lock = new ReentrantLock(true);
  • 锁可以绑定多个条件ReentrantLock可以同时绑定多个Condition,也就是说有多个关联条件的时候,不需要创建额外的锁,通过ReentrantLock. newCondition()绑定多个Condition即可。
package pub.fury.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by zhenxi on 2016/9/28.
 * producer & consumer problem
 */
public class PCPLock {
	private static final int L = 10;
	private static final Random RANDOM = new Random();
	
	public static void main(String[] args) throws InterruptedException {
		final BufferedList<String> list = new BufferedList<String>(5);
		
		int processors = Runtime.getRuntime().availableProcessors();
//		ScheduledExecutorService service = Executors.newScheduledThreadPool(processors + 1);
		for (int i = 0; i < processors; i++) {
//			service.execute(new Producer(list));
			new Thread(new Consumer(list), "Consumer-" + i).start();
		}
		Thread.sleep(1000);
		for (int i = 0; i < processors; i++) {
//			service.execute(new Consumer(list));
			new Thread(new Producer(list), "Producer-" + i).start();
		}
	}
	
	public static class Producer implements Runnable {
		private final BufferedList<String> list;
		private static final int limit = L;
		private int count = 0;
		
		public Producer(BufferedList<String> list) {
			this.list = list;
		}
		
		@Override
		public void run() {
			for (; ; ) {
				try {
					if (count < limit) {
						list.add(System.nanoTime() + "");
						count++;
						Thread.sleep(RANDOM.nextInt(1000));
					} else {
						log("生产完成");
						return;
					}
				} catch (InterruptedException e) {
					log(e.toString());
				}
			}
		}
	}
	
	public static class Consumer implements Runnable {
		
		private final BufferedList list;
		private static final int limit = L;
		private int count = 0;
		
		public Consumer(BufferedList list) {
			this.list = list;
		}
		
		@Override
		public void run() {
			for (; ; ) {
				try {
					if (count < limit) {
						list.remove();
						count++;
						Thread.sleep(RANDOM.nextInt(1000));
					} else {
						log("消费完成");
						return;
					}
				} catch (InterruptedException e) {
					log(e.toString());
				}
			}
		}
	}
	
	public static class BufferedList<T> {
		//		private final Queue<T> items;
		private final List<T> items;
		private final int limit;
		private static final Lock lock = new ReentrantLock();
		private static final Condition notFull = lock.newCondition();
		private static final Condition notEmpty = lock.newCondition();
		
		public BufferedList(int capacity) {
			limit = capacity;
//			items = Collections.synchronizedList(new ArrayList<T>(capacity));
//			items = new ConcurrentLinkedQueue<T>();
			items = new ArrayList<T>(limit);
		}
		
		public void add(T t) throws InterruptedException {
			lock.lock();
			try {
				while (isFull()) {
					log("Full!");
					notFull.await();
				}
				items.add(t);
				log("add : " + t.toString());
				notEmpty.signal();
			} catch (InterruptedException e) {
				log("Interrupted!");
			} finally {
				lock.unlock();
			}
		}
		
		public void remove() throws InterruptedException {
			lock.lock();
			try {
				while (isEmpty()) {
					log("Empty");
					notEmpty.await();
				}
				T item = items.remove(0);
				log("remove : " + item.toString());
				notFull.signal();
			} catch (InterruptedException e) {
				log("Interrupted!");
			} finally {
				lock.unlock();
			}
		}
		
		public int size() {
			lock.lock();
			try {
				return items.size();
			} finally {
				lock.unlock();
			}
		}
		
		private boolean isEmpty() {
			return items.isEmpty();
		}
		
		private boolean isFull() {
			return items.size() >= limit;
		}
		
		private void log(String msg) {
			PCPLock.log(msg + "\t size:[" + size() + "]");
		}
	}
	
	public static void log(String msg) {
		String tName = Thread.currentThread().getName();
		System.out.println("[" + tName + "]" + msg);
	}
}

集合

java.util包中的集合类返回 fail-fast 迭代器,意味着它们假设线程在集合内容中进行迭代时,集合不会更改它的内容。如果 fail-fast 迭代器检测到在迭代过程中进行了更改操作,那么它会抛出 ConcurrentModificationException,这是不可控异常。

当某一个线程A通过iterator去遍历某集合的过程中,若该集合的内容被其他线程所改变了;那么线程A访问集合时,就会抛出ConcurrentModificationException异常,产生fail-fast事件。

java.util.concurrent集合返回的迭代器称为弱一致的(weakly consistent)迭代器。对于这些类,如果元素自从迭代开始已经删除,且尚未由 next() 方法返回,那么它将不返回到调用者。如果元素自迭代开始已经添加,那么它可能返回调用者,也可能不返回。在一次迭代中,无论如何更改底层集合,元素不会被返回两次。

正是因为使用Vector或使用同步的List封装器,返回的迭代器是 fail-fast 的,这意味着如果在迭代过程中任何其他线程修改 List,迭代可能失败。这个时候CopyOnWriteArrayListCopyOnWriteArraySet就派上用场了,同样的还包括ConcurrentHashMap等。

参考

Doug Lea 大神的文章

java.util.concurrent介绍