00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef LOCKFREE_PRIORITYQUEUE_H_
00024 #define LOCKFREE_PRIORITYQUEUE_H_
00025
#include <amino/smr.h>

#include <stdint.h>

#include <cassert>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <vector>
00032
00033 namespace amino {
00034 using namespace internal;
00035
// ---------------------------------------------------------------------------
// Pointer-tagging helpers.
//
// The queue stores a "logically deleted" flag in the low bit of node and
// value pointers.  All conversions go through uintptr_t: the previous casts
// through `long` truncate pointers on platforms where long is narrower than a
// pointer (e.g. LLP64 / 64-bit Windows).
//
// GET_UNMARKED / GET_UNMARKED_VALUE / GET_MARKED / GET_MARKED_VALUE expand to
// casts that name PQNode<E> / Value<E>, so they may only be used where a
// template parameter called E is in scope.
// ---------------------------------------------------------------------------

// Identity wrappers around a node pointer (no bookkeeping is performed here).
#define COPY_NODE(node) (node)
#define RELEASE_NODE(node) (node)

// Yields NULL when the pointer carries the deletion mark, else the pointer.
#define READ_NODE(node) (IS_MARKED(node) ? NULL : (node))

// Strip the two low tag bits and return a plain node / value pointer.
#define GET_UNMARKED(p) ((PQNode<E>*)(((uintptr_t)(p)) & ~((uintptr_t)3)))
#define GET_UNMARKED_VALUE(p) ((Value<E>*)(((uintptr_t)(p)) & ~((uintptr_t)3)))

// Set the deletion mark (bit 0) on a node / value pointer.
#define GET_MARKED(p) ((PQNode<E>*)(((uintptr_t)(p)) | ((uintptr_t)1)))

#define GET_MARKED_VALUE(p) ((Value<E>*)(((uintptr_t)(p)) | ((uintptr_t)1)))

// Non-zero when the deletion mark (bit 0) is set.
#define IS_MARKED(p) (((uintptr_t)(p)) & ((uintptr_t)1))

// Size of every node's next[] array; randomLevel() returns at most MAXLEVEL - 1.
#define MAXLEVEL 10
// Probability used by randomLevel() to grow a new node by one more level.
#define SLCONST 0.5
00050
/**
 * Heap-allocated box around one element of type E.
 *
 * PQNode keeps a pointer to a Value and tags that pointer's low bit to flag a
 * node as logically deleted, which is why the element lives behind a pointer
 * instead of directly inside the node.
 */
template<typename E> struct Value {
    E v; ///< The wrapped element.

    /// Value-initializes the element, so scalar E starts at zero instead of
    /// holding an indeterminate value.
    Value() :
        v() {
    }

    /// Wraps a copy of @p value.
    Value(const E& value) :
        v(value) {
    }
};
00061
00062 template<typename E> struct PQNode {
00063 int level;
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078 int validLevel;
00079 atomic<Value<E>*> value;
00080 PQNode<E>* prev;
00081 atomic<PQNode<E>*> next[MAXLEVEL];
00082 E key;
00083
00084 PQNode() :
00085 level(0), validLevel(-1), prev(NULL),key() {
00086 value.store(NULL);
00087 for (int i = 0; i < MAXLEVEL; ++i) {
00088 next[i].store(NULL, memory_order_relaxed);
00089 }
00090 }
00091
00092 PQNode(const E& v) :
00093 prev(NULL), validLevel(-1), level(0), key(v) {
00094 value.store(new Value<E> (v), memory_order_relaxed);
00095 for (int i = 0; i < MAXLEVEL; ++i) {
00096 next[i].store(NULL, memory_order_relaxed);
00097 }
00098 }
00099
00100 PQNode(int l, const E& v) :
00101 level(l), validLevel(-1), prev(NULL), key(v) {
00102 value.store(new Value<E> (v), memory_order_relaxed);
00103 for (int i = 0; i < MAXLEVEL; ++i) {
00104 next[i].store(NULL, memory_order_relaxed);
00105 }
00106 }
00107
00108 ~PQNode() {
00109 delete GET_UNMARKED_VALUE(value.load(memory_order_relaxed));
00110 }
00111 };
00112
00113 template<typename E> class LockFreePriorityQueue {
00114
00115 PQNode<E>* head;
00116
00117 PQNode<E>* tail;
00118
00119 PQNode<E>* INVALID;
00120
00121 SMR<PQNode<E>, MAXLEVEL>* mm;
00122 typedef typename SMR<PQNode<E>, MAXLEVEL>::HP_Rec HazardP;
00123
00124 public:
00125 LockFreePriorityQueue() {
00126 mm = getSMR<PQNode<E>, MAXLEVEL> ();
00127 head = new PQNode<E> ();
00128 head->validLevel = MAXLEVEL - 1;
00129 tail = new PQNode<E> ();
00130 tail->validLevel = MAXLEVEL - 1;
00131 INVALID = new PQNode<E> ();
00132
00133 for (int i = 0; i < MAXLEVEL; ++i) {
00134 head->next[i].store(tail, memory_order_relaxed);
00135 }
00136
00137 srand(time(NULL));
00138 }
00139
00140 ~LockFreePriorityQueue() {
00141 PQNode<E> *curr = head->next[0].load(memory_order_relaxed);
00142 PQNode<E> *tmp = NULL;
00143 while (tail != curr) {
00144 cout << "destructing node .........." << endl;
00145 tmp = curr;
00146 curr = curr->next[0].load(memory_order_relaxed);
00147 delete tmp;
00148 }
00149
00150 delete head;
00151 delete tail;
00152 delete INVALID;
00153
00154 }
00155
00156 bool empty() {
00157 return head->next[0].load(memory_order_relaxed) == tail;
00158 }
00159
00160 int size() {
00161 int size = 0;
00162 for (PQNode<E>* itr = head->next[0].load(memory_order_relaxed); itr
00163 != tail; itr = itr->next[0].load(memory_order_relaxed)) {
00164 ++size;
00165 }
00166 return size;
00167 }
00168
00169 bool peek(E& top) {
00170 top = head->next[0].load(memory_order_relaxed)->value.load(
00171 memory_order_relaxed)->v;
00172 if (top == tail) {
00173 return false;
00174 } else {
00175 return true;
00176 }
00177 }
00178
00179 bool enqueue(const E& value) {
00180 return insert(value);
00181 }
00182
00183 bool dequeue(E& result) {
00184 return deleteMin(result);
00185 }
00186
00187 bool insert(const E& value) {
00188
00189 E key = value;
00190 int curLevel = randomLevel();
00191 PQNode<E>* savedNodes[curLevel];
00192 HazardP * hp = mm->getHPRec();
00193 #ifdef FREELIST
00194 PQNode<E>* newNode = mm->newNode(hp);
00195 newNode->level = curLevel;
00196 newNode->key = value;
00197 newNode->value.store(new Value<E> (value), memory_order_relaxed);
00198 #else
00199 PQNode<E>* newNode = new PQNode<E> (curLevel, value);
00200 #endif
00201
00202
00203
00204 try_again:
00205 PQNode<E>* node1 = COPY_NODE(head);
00206 mm->employ(hp,0,head);
00207 if(node1 != head) {
00208 goto try_again;
00209 }
00210
00211 PQNode<E>* node2;
00212 Value<E>* value2;
00213
00214
00215
00216
00217
00218
00219
00220
00221 for (int i = MAXLEVEL - 1; i >= 1; --i) {
00222 node2 = scanKey(node1, i, key);
00223
00224 mm->retire(hp,node2);
00225 if (i < curLevel) {
00226 savedNodes[i] = COPY_NODE(node1);
00227
00228 mm->employ(hp,i,node1);
00229 if(node1 != savedNodes[i]) {
00230 goto try_again;
00231 }
00232 }
00233 }
00234
00235 while (true) {
00236 node2 = scanKey(node1, 0, key);
00237 value2 = node2->value.load(memory_order_relaxed);
00238
00239
00240
00241
00242
00243 if ((node2 != tail) && (!IS_MARKED(value2) && (node2->key == key))) {
00244 if (node2->value.compare_swap(value2, new Value<E> (value))) {
00245 cout << "found same value: " << value2->v << ":" << value
00246 << endl;
00247
00248
00249
00250 mm->retire(hp,node1);
00251 mm->retire(hp,node2);
00252 for (int i = 1; i < curLevel; ++i) {
00253
00254 mm->retire(hp,savedNodes[i]);
00255 }
00256
00257
00258 delete value2;
00259 mm->delNode(hp,newNode);
00260 cout << "found same value" << endl;
00261 return true;
00262 } else {
00263
00264 mm->retire(hp,node2);
00265 continue;
00266 }
00267 }
00268
00269
00270
00271
00272
00273
00274 newNode->next[0].store(node2, memory_order_relaxed);
00275
00276 mm->retire(hp,node2);
00277 if (node1->next[0].compare_swap(node2, newNode)) {
00278
00279
00280 mm->retire(hp,node1);
00281 break;
00282 }
00283
00284 }
00285 for (int i = 1; i < curLevel; ++i) {
00286 newNode->validLevel = i;
00287 node1 = savedNodes[i];
00288 while (true) {
00289 node2 = scanKey(node1, i, key);
00290 newNode->next[i].store(node2);
00291
00292 mm->retire(hp,node2);
00293
00294
00295
00296
00297
00298 if (IS_MARKED(newNode->value.load(memory_order_relaxed))
00299 || node1->next[i].compare_swap(node2, newNode)) {
00300
00301 mm->retire(hp,node1);
00302 break;
00303 }
00304
00305 }
00306 }
00307
00308 newNode->validLevel = curLevel;
00309 if (IS_MARKED(newNode->value.load(memory_order_relaxed))) {
00310 newNode = helpDelete(newNode, 0);
00311 }
00312
00313
00314 return true;
00315 }
00316
00317 bool deleteMin(E& result) {
00318 PQNode<E>* node1;
00319 PQNode<E>* node2;
00320 PQNode<E>* last;
00321 Value<E>* value;
00322
00323 HazardP * hp = mm->getHPRec();
00324
00325
00326
00327
00328 try_again:
00329 PQNode<E>* prev = COPY_NODE(head);
00330 mm->employ(hp,0,head);
00331 if(prev != head) {
00332 goto try_again;
00333 }
00334
00335 while (true) {
00336 node1 = readNext(prev, 0);
00337 if (node1 == tail) {
00338
00339
00340 mm->retire(hp,prev);
00341 mm->retire(hp,node1);
00342 return false;
00343 }
00344 retry: value = node1->value.load(memory_order_relaxed);
00345 result = value->v;
00346 if (!IS_MARKED(value)) {
00347
00348
00349
00350
00351
00352
00353
00354
00355 if (node1->value.compare_swap(value, GET_MARKED_VALUE(value))) {
00356 node1->prev = prev;
00357 break;
00358 } else {
00359 goto retry;
00360 }
00361 } else if (IS_MARKED(value)) {
00362 node1 = helpDelete(node1, 0);
00363 }
00364
00365 mm->retire(hp,prev);
00366 prev = node1;
00367 }
00368
00369
00370
00371
00372
00373 assert(node1 != tail && node1 != head);
00374 for (int i = 0; i < node1->level; ++i) {
00375 do {
00376 node2 = node1->next[i].load(memory_order_relaxed);
00377 } while (!IS_MARKED(node2) && !node1->next[i].compare_swap(node2, GET_MARKED(node2)));
00378 }
00379 prev = COPY_NODE(head);
00380 mm->employ(hp,0,head);
00381 if(prev != head) {
00382 goto try_again;
00383 }
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395 for (int i = node1->level - 1; i >= 0; --i) {
00396 while (true) {
00397 assert(node1 != tail && node1 != head);
00398 if (GET_UNMARKED(node1->next[i].load(memory_order_relaxed)) == INVALID) {
00399 break;
00400 }
00401 last = scanKey(prev, i, node1->key);
00402
00403 mm->retire(hp,last);
00404 if (last != node1 || GET_UNMARKED(node1->next[i].load(memory_order_relaxed)) == INVALID) {
00405 break;
00406 }
00407 if (prev->next[i].compare_swap(node1, GET_UNMARKED(
00408 node1->next[i].load(memory_order_relaxed)))) {
00409 node1->next[i].store(INVALID, memory_order_relaxed);
00410 break;
00411 }
00412 if (GET_UNMARKED(node1->next[i].load(memory_order_relaxed)) == INVALID) {
00413 break;
00414 }
00415
00416 }
00417 }
00418
00419 mm->retire(hp,prev);
00420
00421
00422
00423 mm->delNode(node1);
00424 return true;
00425 }
00426
00427 void dumpQueue() {
00428 cout << "dumping queue...................." << endl;
00429 cout << "head:" << head << "->";
00430 for (PQNode<E>* itr = head->next[0].load(memory_order_relaxed); itr
00431 != tail; itr = itr->next[0].load(memory_order_relaxed)) {
00432 cout << GET_UNMARKED(itr) << "->";
00433 }
00434 cout << tail << ":tail" << endl;
00435 }
00436
00437 private:
00445 PQNode<E>* readNext(PQNode<E>*& node, int curLevel) {
00446
00447 PQNode<E>* nextNode;
00448 PQNode<E>* tmpNode;
00449
00450 assert(node != tail);
00451 if (IS_MARKED(node->value.load(memory_order_relaxed))) {
00452 node = helpDelete(node, curLevel);
00453 }
00454 tmpNode = node->next[curLevel].load(memory_order_relaxed);
00455 nextNode = READ_NODE(tmpNode);
00456 while (NULL == nextNode) {
00457 node = helpDelete(node, curLevel);
00458 tmpNode = node->next[curLevel].load(memory_order_relaxed);
00459 nextNode = READ_NODE(tmpNode);
00460 }
00461 assert(!IS_MARKED(nextNode));
00462 return nextNode;
00463 }
00464
00469 PQNode<E>* scanKey(PQNode<E>*& node, int curLevel, const E& expectedKey) {
00470 HazardP * hp = mm->getHPRec();
00471
00472 PQNode<E>* nextNode = readNext(node, curLevel);
00473
00474 while ((nextNode != tail) && ((nextNode == head) || nextNode->key
00475 < expectedKey)) {
00476
00477 mm->retire(hp,node);
00478
00479 node = nextNode;
00480 nextNode = readNext(node, curLevel);
00481 }
00482
00483 return nextNode;
00484 }
00485
00486
00487
00488
00489 PQNode<E>* helpDelete(PQNode<E>* node, int curLevel) {
00490
00491
00492 HazardP * hp = mm->getHPRec();
00493
00494 PQNode<E>* node2;
00495 PQNode<E>* last;
00496
00497
00498
00499
00500 for (int i = curLevel; i < node->level; ++i) {
00501 do {
00502 node2 = node->next[i].load(memory_order_relaxed);
00503 } while (!IS_MARKED(node2) && !node->next[i].compare_swap(node2, GET_MARKED(
00504 node2)));
00505 }
00506
00507
00508
00509
00510 PQNode<E>* prev = node->prev;
00511 if (NULL == prev || (curLevel >= prev->validLevel)) {
00512
00513
00514
00515 try_again:
00516 prev = COPY_NODE(head);
00517 mm->employ(hp,0,head);
00518 if(prev != head) {
00519 goto try_again;
00520 }
00521
00522 for (int i = MAXLEVEL - 1; i >= curLevel; --i) {
00523 node2 = scanKey(prev, i, node->key);
00524
00525 mm->retire(hp, node2);
00526 }
00527 } else {
00528
00529 }
00530
00531
00532
00533
00534
00535
00536 while (true) {
00537 assert(node != tail && node != head);
00538 if (GET_UNMARKED(node->next[curLevel].load(memory_order_relaxed)) == INVALID) {
00539 break;
00540 }
00541 last = scanKey(prev, curLevel, node->key);
00542
00543 mm->retire(hp,last);
00544 if (last != node || GET_UNMARKED(node->next[curLevel].load(memory_order_relaxed)) == INVALID) {
00545 break;
00546 }
00547 if (prev->next[curLevel].compare_swap(node, GET_UNMARKED(
00548 node->next[curLevel].load(memory_order_relaxed)))) {
00549 node->next[curLevel].store(INVALID);
00550 break;
00551 }
00552 if (GET_UNMARKED(node->next[curLevel].load(memory_order_relaxed)) == INVALID) {
00553 break;
00554 }
00555
00556 }
00557
00558 mm->retire(hp,node);
00559 return prev;
00560 }
00561
00562 int randomLevel() {
00563 int v = 1;
00564 while (((static_cast<double> (rand()) / RAND_MAX) < SLCONST) && (v
00565 < MAXLEVEL - 1)) {
00566 ++v;
00567 }
00568
00569 return v;
00570 }
00571
00572 };
00573
00574 }
00575
00576 #endif