00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 #ifndef AMINO_WS_SCHEDULER_H
00018 #define AMINO_WS_SCHEDULER_H
00019 
#include <assert.h>

#include <iostream>
#include <stdexcept>

#include <amino/bdeque.h>
00024 
00025 #ifdef DEBUG_SCHEDULER
00026 #include <sstream>
00027 using namespace std;
00028 #endif
00029 
00030 namespace amino {
00038 template<typename TaskType>
00039 class ws_scheduler {
00040 private:
00041         typedef BlockingDeque<TaskType *>* pBlockingDeque;
00042         int fThreadNum;
00043         pBlockingDeque * queues;
00044         int randInd;
00045 public:
00046         ws_scheduler(int threadNum) {
00047                 randInd = 0;
00048                 fThreadNum = threadNum;
00049                 queues = new pBlockingDeque[threadNum];
00050                 for (int i = 0; i < threadNum; i++) {
00051                         queues[i] = new BlockingDeque<TaskType *> ();
00052                 }
00053         }
00054 
00055         virtual ~ws_scheduler() {
00056                 for (int i = 0; i < fThreadNum; i++) {
00057                         delete queues[i];
00058                 }
00059                 delete[] queues;
00060         }
00061 
00069         void shutdown() {
00070                 for (int i = 0; i < fThreadNum; i++) {
00071                         queues[i]->pushLeft(NULL);
00072                 }
00073         }
00074 
00081         void addTask(TaskType * task) {
00082         addTask(randInd++, task);
00083         }
00084 
00093         void addTask(int index, TaskType* task) {
00094                 index = index % fThreadNum;
00095                 queues[index]->pushLeft(task);
00096         }
00097 
00107         TaskType * getTask(int threadId) {
00108                 pBlockingDeque curQ = queues[threadId];
00109 
00110                 TaskType * res;
00111                 if (curQ->popRight(res)) {
00112                         if (res != NULL)
00113                                 return res;
00114                         
00115                         if (!curQ->empty()) {
00116                                 cerr<< "Current queue is not empty after retrieved end signal\n";
00117                                 curQ ->pushLeft(NULL);
00118                                 if(curQ->popRight(res))
00119                     assert(res!=NULL);
00120                                 return res;
00121                         }
00122 
00123                         return NULL;
00124 
00125                 } else {
00126                         return stealing(threadId);
00127                 }
00128         }
00129 private:
00130         TaskType * stealing(int threadId) {
00131                 for (int i = 0; i < fThreadNum - 1; i++) {
00132                         TaskType * res;
00133                         if (queues[(i + threadId) % fThreadNum]->popLeft(res)) {
00134                                 
00135                                 if (res != NULL) {
00136                                         return res;
00137                                 }
00138 
00139                                 
00140 #ifdef DEBUGJ
00141                                 if(!queues[(i+threadId)%fThreadNum]->empty()) {
00142                                         cout<<"Queue is not empty after retrieved end signal\n";
00143                                 }
00144 #endif
00145                                 
00146                                 queues[(i + threadId) % fThreadNum]->pushLeft(res);
00147                                 break;
00148                         }
00149                 }
00150                 TaskType * ret;
00151                 queues[threadId]->takeRight(ret);
00152                 return ret;
00153         }
00154 };
00155 }
00156 ;
00157 
00158 #endif