有限容量BlockingQueue:消费者生产者
有限容量的BlockingQueue实现工作队列,用于生产者消费者问题。#include<iostream> #include<string> #include<vector> #include<deque> #include<assert.h> #include<pthread.h> #include<unistd.h> #include<boost/noncopyable.hpp> #include<boost/shared_ptr.hpp> #include<boost/weak_ptr.hpp> #include<boost/function.hpp> #include<boost/bind.hpp> #include<boost/circular_buffer.hpp> using namespace std; using namespace boost; class Mutex:public noncopyable{//互斥量的封装 public: Mutex(){ pthread_mutex_init(&mutex,NULL); } void lock(){ pthread_mutex_lock(&mutex); } void unlock(){ pthread_mutex_unlock(&mutex); } ~Mutex(){ pthread_mutex_destroy(&mutex); } pthread_mutex_t* getMutex(){ return &mutex; } private: mutable pthread_mutex_t mutex; }; class MutexLockGuard:noncopyable{//RAII管理互斥量 public: explicit MutexLockGuard(Mutex& mutex):mutex_(mutex){ mutex_.lock(); } ~MutexLockGuard(){ mutex_.unlock(); } private: Mutex& mutex_;//注意是引用,Mutex继承了noncopyable后不能拷贝构造 }; class Condition:public noncopyable{//条件变量的封装 public: explicit Condition(Mutex& mutex):mutex_(mutex){ pthread_cond_init(&cond,NULL); } ~Condition(){ pthread_cond_destroy(&cond); } void wait(){ pthread_cond_wait(&cond,mutex_.getMutex()); } void notify(){ pthread_cond_signal(&cond); } void notifyALL(){ pthread_cond_broadcast(&cond); } private: Mutex& mutex_;//注意是引用 pthread_cond_t cond; }; template<typename T> class BlockingQueue:noncopyable{ public: explicit BlockingQueue(int x):mutex(),full(mutex),empty(mutex),Q(x){} void put(T a){ MutexLockGuard guard(mutex); while(Q.full()){//若队列满,则等待空条件empty empty.wait();//等待消费者消费 } assert(!Q.full()); Q.push_back(a); full.notifyALL();//通知消费者 } T take(){ MutexLockGuard guard(mutex); while(Q.empty()){//若队列空,则等待满条件full full.wait();//等待生产者生产 } assert(!Q.empty()); T front(Q.front()); front(); Q.pop_front(); empty.notify();//通知生产者 return front; } private: Mutex mutex; Condition full;//满条件 Condition empty;//空条件 circular_buffer<T> Q;//boost的循环队列 }; class test{//任务内容 public: explicit test(int x):data(x){} void show(){ cout<<"show 
"<<data<<endl; } private: int data; }; typedef function<void()> Functor;//任务T BlockingQueue<Functor> taskQueue(10); bool running=true;//终止线程标志 void* producer(void* arg){//生产者线程 int i=0; while(running){ //usleep(100); test one(i++); Functor task=bind(&test::show,one);//注意bind会拷贝参数。该处不能用&one. taskQueue.put(task); } } void* customer(void* arg){//消费者线程 while(running){ Functor task=taskQueue.take(); task(); } } int main(){ pthread_t pid0; pthread_t pid[2]; pthread_create(&pid0,NULL,producer,NULL); for(int i=0;i<2;i++){ pthread_create(&pid[i],NULL,customer,NULL); } sleep(1); running=false;//终止线程 pthread_join(pid0,NULL); return 0; }
补充:软件开发, C++