00001 /* 00002 * (c) Copyright 2008, IBM Corporation. 00003 * Licensed under the Apache License, Version 2.0 (the "License"); 00004 * you may not use this file except in compliance with the License. 00005 * You may obtain a copy of the License at 00006 * 00007 * http://www.apache.org/licenses/LICENSE-2.0 00008 * 00009 * Unless required by applicable law or agreed to in writing, software 00010 * distributed under the License is distributed on an "AS IS" BASIS, 00011 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00012 * See the License for the specific language governing permissions and 00013 * limitations under the License. 00014 * 00015 * Change History: 00016 * 00017 * yy-mm-dd Developer Defect Description 00018 * -------- --------- ------ ----------- 00019 * 08-07-28 ganzhi N/A Initial version 00020 */ 00021 #ifndef AMINO_UTIL_THREAD_EXECUTOR 00022 #define AMINO_UTIL_THREAD_EXECUTOR 00023 00024 #include <pthread.h> 00025 #include <string> 00026 #include <stdio.h> 00027 #include <memory> 00028 00029 #include <amino/thread.h> 00030 #include <amino/exec_serv.h> 00031 #include <amino/condition.h> 00032 #include <amino/mutex.h> 00033 #include <amino/lock.h> 00034 #include <amino/ws_scheduler.h> 00035 #include <amino/util.h> 00036 #include <amino/future.h> 00037 #include <amino/ftask.h> 00038 00039 #ifdef DEBUG_EXEC 00040 #include <sstream> 00041 #endif 00042 00043 namespace amino { 00044 using namespace std; 00045 00046 class ThreadPoolExecutor; 00047 typedef ws_scheduler<Runnable> Scheduler; 00048 typedef Scheduler * pScheduler; 00049 00058 class ThreadPoolExecutor:public ExecutorService{ 00066 class WorkerThread:public Thread{ 00067 private: 00068 ThreadPoolExecutor * pExec; 00069 int threadId; 00070 public: 00071 WorkerThread(ThreadPoolExecutor * tp, int index){ 00072 pExec = tp; 00073 threadId = index; 00074 } 00075 00080 virtual void* run() { 00081 while(true){ 00082 Runnable * task = pExec->scheduler->getTask(threadId); 00083 //NULL means end of task queue 00084 if(task==NULL){ 00085 #ifdef DEBUG_EXEC 00086 ostringstream output; 00087 output<<"Thread Exit: "<<pExec->active_count->load()<<endl; 00088 cout<<output.str(); 00089 #endif 00090 unique_lock<mutex> llock(pExec->f_mutex_finish); 00091 int count = pExec->active_count->fetch_sub(1, memory_order_relaxed); 00092 assert(count>0); 00093 SC_FENCE; 00094 if(count==1){ 00095 #ifdef DEBUG 00096 cout<<"I'm notifying finish of running()\n"; 00097 #endif 00098 pExec->notify_finish(); 00099 } 00100 return NULL; 00101 } 00102 00103 #ifdef DEBUG_EXEC 00104 cout<<"A real task\n"; 00105 #endif 00106 task->run(); 00107 } 00108 } 00109 }; 00110 00111 friend class WorkerThread; 00112 typedef WorkerThread * pThread; 00113 00114 private: 00118 pScheduler scheduler; 00119 00120 /* Number of working threads */ 00121 int fThreadNum; 00122 00123 /* Array of threads */ 00124 WorkerThread ** threads; 00125 00126 /* Count of working threads */ 00127 atomic<int> * active_count; 00128 00129 void init(int threadNum){ 00130 fShutdown = false; 00131 00132 fThreadNum = threadNum; 00133 00134 scheduler = new Scheduler(fThreadNum); 00135 active_count = new atomic<int>(); 00136 active_count->store(threadNum, memory_order_relaxed); 00137 00138 threads = new pThread[threadNum]; 00139 for(int i=0;i<threadNum;i++){ 00140 threads[i] = new WorkerThread(this, i); 00141 threads[i]->start(); 00142 } 00143 } 00144 00145 public: 00146 ThreadPoolExecutor(){ 00147 init(getProcessNum()); 00148 } 00149 00155 ThreadPoolExecutor(int threadNum){ 00156 init(threadNum); 00157 } 00158 00164 virtual ~ThreadPoolExecutor(){ 00165 if(!finished()){ 00166 if(!fShutdown) 00167 shutdown(); 00168 waitTermination(); 00169 // There is no problem if you used waitTermination() and get this error. 00170 // Some exceptions might skipped your waitTermination() 00171 // throw logic_error("Some threads are still running"); 00172 } 00173 00174 for(int i=0;i<fThreadNum;i++){ 00175 delete threads[i]; 00176 } 00177 00178 delete [] threads; 00179 delete active_count; 00180 00181 delete scheduler; 00182 } 00183 00189 void execute(Runnable* task){ 00190 if(fShutdown) 00191 throw logic_error("Already shutdown\n!"); 00192 //TODO what will happen if shutdown becomes true just at this moments? 00193 scheduler->addTask(task); 00194 } 00195 00201 virtual void shutdown(){ 00202 fShutdown = true; 00203 scheduler->shutdown(); 00204 } 00205 00206 virtual void halt(){ 00207 fShutdown = true; 00208 } 00209 00210 virtual void waitTermination(){ 00211 for(int i=0;i<fThreadNum;i++){ 00212 threads[i]->join(); 00213 } 00214 } 00215 00216 virtual bool waitTermination(int timeOut){ 00217 bool res=ExecutorService::waitTermination(timeOut); 00218 if(res) 00219 for(int i=0;i<fThreadNum;i++){ 00220 threads[i]->join(); 00221 } 00222 #ifdef DEBUG 00223 if(!res) 00224 assert(!finished()); 00225 00226 if(res) 00227 assert(finished()); 00228 00229 stringstream ss; 00230 ss<<"Timed Wait Output: "<< res<<endl; 00231 cout<<ss.str(); 00232 #endif 00233 return res; 00234 } 00235 00236 protected: 00237 virtual bool finished(){ 00238 #ifdef DEBUG 00239 ostringstream output; 00240 output<<"Finish_Active_Count: "<<active_count->load(memory_order_relaxed)<<" in "<<this<<endl; 00241 cout<<output.str()<<flush; 00242 assert(active_count->load(memory_order_relaxed)>=0); 00243 #endif 00244 unique_lock<mutex> llock(f_mutex_finish); 00245 return active_count->load(memory_order_relaxed)==0; 00246 } 00247 }; 00248 } 00249 #endif