00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef AMINO_ACCUMULATE_H
00023 #define AMINO_ACCUMULATE_H
00024
00025 #include <numeric>
00026
00027 #include <amino/thread.h>
00028 #include <amino/ftask.h>
00029
00030 namespace amino {
00031 template<typename iteratorT, typename T, typename Executor>
00032 T accumulate(Executor exec, iteratorT first, iteratorT last) {
00033 int procCount = getProcessNum();
00034 return accumulate(exec, procCount, first, last);
00035 }
00036
00037 template<typename iteratorT, typename FuncT, typename T, typename Executor>
00038 T accumulate(Executor exec, iteratorT first, iteratorT last, FuncT func) {
00039 int procCount = getProcessNum();
00040 return accumulate(exec, procCount, first, last, func);
00041 }
00042
00043 template<typename iteratorT, typename T>
00044 class CallStdAccumulate: public Runnable {
00045 private:
00046 iteratorT f_first, f_last;
00047 T result;
00048 public:
00049 CallStdAccumulate() {
00050 }
00051 void setup(iteratorT first, iteratorT last) {
00052 f_first = first;
00053 f_last = last;
00054 }
00055
00056 T getResult() {
00057 return result;
00058 }
00059
00060 void* run() {
00061 assert(f_first + 1 <= f_last);
00062 result = std::accumulate(f_first + 1, f_last, *f_first);
00063 return NULL;
00064 }
00065 };
00066
00067 template<typename iteratorT, typename T, typename FuncT>
00068 class CallStdAccumulate_Func: public Runnable {
00069 private:
00070 iteratorT f_first, f_last;
00071 FuncT* f_func;
00072 T result;
00073 public:
00074 CallStdAccumulate_Func() {
00075 }
00076
00077 void setup(iteratorT first, iteratorT last, FuncT& func) {
00078 f_first = first;
00079 f_last = last;
00080 f_func = &func;
00081 }
00082 T getResult() {
00083 return result;
00084 }
00085
00086 void* run() {
00087 result = std::accumulate(f_first + 1, f_last, *f_first, *f_func);
00088 return NULL;
00089 }
00090 };
00091
00092 template<typename iteratorT, typename T, typename Executor>
00093 T accumulate(Executor& exec, int threadNum, iteratorT first, iteratorT last) {
00094 const int MULTI_NUM = 3;
00095
00096 if (last - first < MULTI_NUM * threadNum)
00097 return std::accumulate(first + 1, last, *first);
00098
00099 int step = (last - first) / threadNum;
00100
00101 typedef CallStdAccumulate<iteratorT, T> Caller;
00102 Caller* pCallers = new Caller[threadNum];
00103 FutureTask **pFTasks = new FutureTask*[threadNum];
00104 T* result = new T[threadNum];
00105
00106 for (int i = 0; i < threadNum - 1; ++i) {
00107 pCallers[i].setup(first + i * step, first + (i + 1) * step);
00108 pFTasks[i] = new FutureTask(pCallers + i);
00109 exec.execute(pFTasks[i]);
00110 }
00111
00112 pCallers[threadNum - 1].setup(first + (threadNum - 1) * step, last);
00113 pFTasks[threadNum - 1] = new FutureTask(pCallers + threadNum - 1);
00114 exec.execute(pFTasks[threadNum - 1]);
00115
00116 for (int i = 0; i < threadNum; ++i) {
00117 pFTasks[i] -> get();
00118 result[i] = pCallers[i].getResult();
00119 }
00120 T ret = std::accumulate(result + 1, result + threadNum, *result);
00121
00122 delete[] pCallers;
00123 for (int i = 0; i < threadNum; ++i) {
00124 delete pFTasks[i];
00125 }
00126 delete[] pFTasks;
00127 delete[] result;
00128
00129 return ret;
00130 }
00131
00132 template<typename iteratorT, typename T, typename FuncT, typename Executor>
00133 T accumulate(Executor& exec, int threadNum, iteratorT first, iteratorT last,
00134 FuncT func) {
00135 const int MULTI_NUM = 3;
00136
00137 if (last - first < MULTI_NUM * threadNum)
00138 return std::accumulate(first + 1, last, *first);
00139
00140 int step = (last - first) / threadNum;
00141
00142 typedef CallStdAccumulate_Func<iteratorT, T, FuncT> Caller;
00143 Caller* pCallers = new Caller[threadNum];
00144 FutureTask **pFTasks = new FutureTask*[threadNum];
00145 T* result = new T[threadNum];
00146
00147 for (int i = 0; i < threadNum - 1; ++i) {
00148 pCallers[i].setup(first + i * step, first + (i + 1) * step, func);
00149 pFTasks[i] = new FutureTask(pCallers + i);
00150 exec.execute(pFTasks[i]);
00151 }
00152
00153 pCallers[threadNum - 1].setup(first + (threadNum - 1) * step, last, func);
00154 pFTasks[threadNum - 1] = new FutureTask(pCallers + threadNum - 1);
00155 exec.execute(pFTasks[threadNum - 1]);
00156
00157 for (int i = 0; i < threadNum; ++i) {
00158 pFTasks[i] -> get();
00159 result[i] = pCallers[i].getResult();
00160 }
00161 T ret = std::accumulate(result + 1, result + threadNum, *result, func);
00162
00163 delete[] pCallers;
00164 for (int i = 0; i < threadNum; ++i) {
00165 delete pFTasks[i];
00166 }
00167 delete[] pFTasks;
00168 delete[] result;
00169
00170 return ret;
00171 }
00172
00173 }
00174 ;
00175
00176 #endif