00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifndef AMINO_WS_SCHEDULER_H
00018 #define AMINO_WS_SCHEDULER_H
00019
#include <assert.h>

#include <iostream>
#include <stdexcept>

#include <amino/bdeque.h>
00024
00025 #ifdef DEBUG_SCHEDULER
00026 #include <sstream>
00027 using namespace std;
00028 #endif
00029
00030 namespace amino {
00038 template<typename TaskType>
00039 class ws_scheduler {
00040 private:
00041 typedef BlockingDeque<TaskType *>* pBlockingDeque;
00042 int fThreadNum;
00043 pBlockingDeque * queues;
00044 int randInd;
00045 public:
00046 ws_scheduler(int threadNum) {
00047 randInd = 0;
00048 fThreadNum = threadNum;
00049 queues = new pBlockingDeque[threadNum];
00050 for (int i = 0; i < threadNum; i++) {
00051 queues[i] = new BlockingDeque<TaskType *> ();
00052 }
00053 }
00054
00055 virtual ~ws_scheduler() {
00056 for (int i = 0; i < fThreadNum; i++) {
00057 delete queues[i];
00058 }
00059 delete[] queues;
00060 }
00061
00069 void shutdown() {
00070 for (int i = 0; i < fThreadNum; i++) {
00071 queues[i]->pushLeft(NULL);
00072 }
00073 }
00074
00081 void addTask(TaskType * task) {
00082 addTask(randInd++, task);
00083 }
00084
00093 void addTask(int index, TaskType* task) {
00094 index = index % fThreadNum;
00095 queues[index]->pushLeft(task);
00096 }
00097
00107 TaskType * getTask(int threadId) {
00108 pBlockingDeque curQ = queues[threadId];
00109
00110 TaskType * res;
00111 if (curQ->popRight(res)) {
00112 if (res != NULL)
00113 return res;
00114
00115 if (!curQ->empty()) {
00116 cerr<< "Current queue is not empty after retrieved end signal\n";
00117 curQ ->pushLeft(NULL);
00118 if(curQ->popRight(res))
00119 assert(res!=NULL);
00120 return res;
00121 }
00122
00123 return NULL;
00124
00125 } else {
00126 return stealing(threadId);
00127 }
00128 }
00129 private:
00130 TaskType * stealing(int threadId) {
00131 for (int i = 0; i < fThreadNum - 1; i++) {
00132 TaskType * res;
00133 if (queues[(i + threadId) % fThreadNum]->popLeft(res)) {
00134
00135 if (res != NULL) {
00136 return res;
00137 }
00138
00139
00140 #ifdef DEBUGJ
00141 if(!queues[(i+threadId)%fThreadNum]->empty()) {
00142 cout<<"Queue is not empty after retrieved end signal\n";
00143 }
00144 #endif
00145
00146 queues[(i + threadId) % fThreadNum]->pushLeft(res);
00147 break;
00148 }
00149 }
00150 TaskType * ret;
00151 queues[threadId]->takeRight(ret);
00152 return ret;
00153 }
00154 };
00155 }
00156 ;
00157
00158 #endif