当前位置:编程学习 > C/C++ >>

有限容量BlockingQueue:消费者生产者

有限容量的BlockingQueue实现工作队列,用于生产者消费者问题。
 
#include<iostream>  
#include<string>  
#include<vector>  
#include<deque>  
#include<assert.h>  
#include<pthread.h>  
#include<unistd.h>  
#include<boost/noncopyable.hpp>  
#include<boost/shared_ptr.hpp>  
#include<boost/weak_ptr.hpp>  
#include<boost/function.hpp>  
#include<boost/bind.hpp>  
#include<boost/circular_buffer.hpp>  
using namespace std;  
using namespace boost;  
class Mutex:public noncopyable{//互斥量的封装  
    public:  
        Mutex(){  
            pthread_mutex_init(&mutex,NULL);  
        }  
        void lock(){  
            pthread_mutex_lock(&mutex);  
        }  
        void unlock(){  
            pthread_mutex_unlock(&mutex);  
        }  
        ~Mutex(){  
            pthread_mutex_destroy(&mutex);  
        }  
        pthread_mutex_t* getMutex(){  
            return &mutex;  
        }  
    private:  
        mutable pthread_mutex_t mutex;  
};  
class MutexLockGuard:noncopyable{//RAII管理互斥量  
    public:  
        explicit MutexLockGuard(Mutex& mutex):mutex_(mutex){  
            mutex_.lock();  
        }  
        ~MutexLockGuard(){  
            mutex_.unlock();  
        }  
    private:  
        Mutex& mutex_;//注意是引用,Mutex继承了noncopyable后不能拷贝构造  
};  
class Condition:public noncopyable{//条件变量的封装  
    public:  
        explicit Condition(Mutex& mutex):mutex_(mutex){  
            pthread_cond_init(&cond,NULL);  
        }  
        ~Condition(){  
            pthread_cond_destroy(&cond);  
        }  
        void wait(){  
            pthread_cond_wait(&cond,mutex_.getMutex());  
        }  
        void notify(){  
            pthread_cond_signal(&cond);  
        }  
        void notifyALL(){  
            pthread_cond_broadcast(&cond);  
        }  
    private:  
        Mutex& mutex_;//注意是引用  
        pthread_cond_t cond;  
};  
template<typename T>  
class BlockingQueue:noncopyable{  
    public:  
        explicit BlockingQueue(int x):mutex(),full(mutex),empty(mutex),Q(x){}  
        void put(T a){  
            MutexLockGuard guard(mutex);  
            while(Q.full()){//若队列满,则等待空条件empty  
                empty.wait();//等待消费者消费  
            }  
            assert(!Q.full());  
            Q.push_back(a);  
            full.notifyALL();//通知消费者  
        }  
        T take(){  
            MutexLockGuard guard(mutex);  
            while(Q.empty()){//若队列空,则等待满条件full  
                full.wait();//等待生产者生产  
            }  
            assert(!Q.empty());  
            T front(Q.front());  
            front();  
            Q.pop_front();  
            empty.notify();//通知生产者  
            return front;  
        }  
    private:  
        Mutex mutex;  
        Condition full;//满条件  
        Condition empty;//空条件  
        circular_buffer<T> Q;//boost的循环队列  
};  
class test{//任务内容  
    public:  
        explicit test(int x):data(x){}  
        void show(){  
            cout<<"show "<<data<<endl;  
        }  
    private:  
        int data;  
};  
typedef function<void()> Functor;//任务T  
BlockingQueue<Functor> taskQueue(10);  
bool running=true;//终止线程标志  
void* producer(void* arg){//生产者线程  
    int i=0;  
    while(running){  
        //usleep(100);  
        test one(i++);  
        Functor task=bind(&test::show,one);//注意bind会拷贝参数。该处不能用&one.  
        taskQueue.put(task);  
    }  
}  
void* customer(void* arg){//消费者线程  
    while(running){  
        Functor task=taskQueue.take();  
        task();  
    }  
}  
int main(){  
    pthread_t pid0;  
    pthread_t pid[2];  
    pthread_create(&pid0,NULL,producer,NULL);  
    for(int i=0;i<2;i++){  
        pthread_create(&pid[i],NULL,customer,NULL);  
    }  
    sleep(1);  
    running=false;//终止线程  
    pthread_join(pid0,NULL);  
    return 0;  
}  

 

 
补充:软件开发 , C++ ,
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,