00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef PARALLEL_SORT_H
00023 #define PARALLEL_SORT_H
00024
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <stdexcept>
#include <vector>
#include <assert.h>

#include <amino/aasort.h>
#include <amino/executor.h>
#include <amino/future.h>
#include <amino/ftask.h>

#ifdef DEBUG
#include <iostream>
#include <sstream>
using namespace std;
#endif
00038
00039 namespace amino{
00040
00048 template<typename _sort, typename rand_access_iter>
00049 class SortTask:public Runnable{
00050 private:
00051 _sort fsorter;
00052 rand_access_iter fstart;
00053 rand_access_iter fend;
00054 public:
00055 SortTask(rand_access_iter start, rand_access_iter end, _sort sorter){
00056 fstart=start;
00057 fend=end;
00058 fsorter=sorter;
00059 }
00060
00061 void * run(){
00062 fsorter(fstart, fend);
00063 return NULL;
00064 }
00065 };
00066
00074 template<typename _merger, typename rand_access_iter>
00075 class MergerTask:public Runnable{
00076 private:
00077 _merger fmerger;
00078 rand_access_iter fstart;
00079 rand_access_iter fmiddle;
00080 rand_access_iter fend;
00081 public:
00082 MergerTask(rand_access_iter start, rand_access_iter middle,
00083 rand_access_iter end, _merger merger){
00084 fstart=start;
00085 fend=end;
00086 fmiddle = middle;
00087 fmerger=merger;
00088 }
00089
00090 void * run(){
00091 #ifdef DEBUG
00092 stringstream ss;
00093 ss<<"Merging: "<<fstart<<","<<fmiddle<<","<<fend<<endl;
00094 cout<<ss.str();
00095 #endif
00096 fmerger(fstart, fmiddle, fend);
00097 #ifdef DEBUG
00098 cout<<"Merging Completed!\n";
00099 #endif
00100 return NULL;
00101 }
00102 };
00103
00108 template<typename value_type, typename local_sort, typename merge_sort,
00109 typename rand_access_iter, typename ExecutorType>
00110 void parallel_sort(rand_access_iter start, rand_access_iter end,
00111 local_sort _sort, merge_sort _merge,
00112 int threadNum, ExecutorType * executor){
00113
00114
00115 int length=end-start;
00116 int divNum = 64*threadNum;
00117 int step = length/divNum;
00118 if(step<32) {
00119 step=32;
00120 if(32>length)
00121 step = length;
00122 divNum = length/step;
00123 }
00124 int first_step= step+(length- step*divNum);
00125
00126 typedef SortTask<local_sort, rand_access_iter> * pSortTask;
00127 typedef FutureTask * pFutureTask;
00128 pSortTask * sortTasks= new pSortTask[divNum];
00129 FutureTask ** futures = new pFutureTask[divNum];
00130 sortTasks[0] = new SortTask<local_sort, rand_access_iter>(start,
00131 start+first_step, _sort);
00132 futures[0] = new FutureTask(sortTasks[0]);
00133 executor->execute(futures[0]);
00134
00135 vector<int> steps;
00136 steps.push_back(0);
00137
00138 int pos=first_step;
00139 steps.push_back(first_step);
00140 for(int i=1;i<divNum;i++){
00141 #ifdef DEBUG
00142 ostringstream output;
00143 output<<"PSort POS: "<<pos<<endl;
00144 cout<<output.str();
00145 #endif
00146 sortTasks[i] = new SortTask<local_sort, rand_access_iter>(start+pos,
00147 start+pos+step, _sort);
00148 futures[i] = new FutureTask(sortTasks[i]);
00149 executor->execute(futures[i]);
00150 pos+=step;
00151 steps.push_back(pos);
00152 }
00153
00154 for(int i=0;i<divNum;i++){
00155 futures[i]->get();
00156 delete futures[i];
00157 delete sortTasks[i];
00158 }
00159
00160 delete [] futures;
00161 delete [] sortTasks;
00162
00163
00164 typedef MergerTask<merge_sort, rand_access_iter> * pMergeTask;
00165 pMergeTask * mergeTasks = new pMergeTask[divNum/2];
00166 futures = new pFutureTask[divNum/2];
00167 while(steps.size()>2){
00168 unsigned int pos = 0;
00169 while(pos<steps.size()-2){
00170 int start_i = steps[pos];
00171 int middle_i = steps[pos+1];
00172 int end_i = steps[pos+2];
00173 int i=pos/2;
00174 mergeTasks[i] = new MergerTask<merge_sort, rand_access_iter>(
00175 start+start_i, start+middle_i, start+end_i,_merge);
00176 #ifdef DEBUG
00177 stringstream ss;
00178 ss<<"Executing MergerTask "<<i<<endl;
00179 cout<<ss.str();
00180 #endif
00181 futures[i] = new FutureTask(mergeTasks[i]);
00182 executor->execute(futures[i]);
00183
00184 pos +=2;
00185 }
00186
00187 pos = 0;
00188 while(pos<steps.size()-2){
00189 int i=pos/2;
00190 futures[i]->get();
00191 delete futures[i];
00192 delete mergeTasks[i];
00193 pos += 2;
00194 }
00195
00196
00197 pos = 0;
00198 for(vector<int>::iterator i=steps.begin();i<steps.end();){
00199 if(pos%2==0||(i+1)==steps.end())
00200 i++;
00201 else
00202 i= steps.erase(i);
00203 pos++;
00204 }
00205 }
00206
00207 delete [] futures;
00208 delete [] mergeTasks;
00209
00210 }
00211
/**
 * Function object forwarding to std::sort. It is used instead of a pointer to
 * a std::sort specialization because taking the address of a standard library
 * function is not portable (C++20 [namespace.std] makes it unspecified,
 * possibly ill-formed), while a function object is always well-defined.
 */
struct StdSortAdapter{
    template<typename Iter>
    void operator()(Iter first, Iter last) const{
        std::sort(first, last);
    }
};

/** Function object forwarding to std::inplace_merge (see StdSortAdapter). */
struct StdInplaceMergeAdapter{
    template<typename Iter>
    void operator()(Iter first, Iter middle, Iter last) const{
        std::inplace_merge(first, middle, last);
    }
};

/**
 * Convenience overload: sorts the array [start, end) with operator<, using
 * std::sort for each chunk and std::inplace_merge to combine sorted runs.
 *
 * @param start, end  Array range to sort.
 * @param threadNum   Forwarded to the generic overload (split granularity).
 * @param executor    Executor that runs the sort and merge tasks.
 */
template<typename ExecutorType, typename value_type>
void parallel_sort(value_type * start, value_type * end,
                   int threadNum, ExecutorType * executor){
    parallel_sort<value_type>(start, end, StdSortAdapter(),
                              StdInplaceMergeAdapter(), threadNum, executor);
}
00219
00223
00224
00225
00226
00227
00228
00229
00230 }
00231
00232 #endif