00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #ifndef QUEUE_SHAVIT_H_
00017 #define QUEUE_SHAVIT_H_
00018 #include "smr.h"
00019 #include <iostream>
00020
00021 namespace amino {
00022 using namespace internal;
00023
00061 template<typename T> class ShavitQueue {
00062 private:
00066 class QueueItem {
00067 public:
00071 T data;
00075 QueueItem* next;
00080 QueueItem* prev;
00081
00082 QueueItem() :
00083 data() {
00084 next = NULL;
00085 prev = NULL;
00086 }
00087
00088 QueueItem(const T& val) :
00089 data(val) {
00090 next = NULL;
00091 prev = NULL;
00092 }
00093
00094 QueueItem(QueueItem* n) :
00095 data() {
00096 next = n;
00097 prev = NULL;
00098 }
00099
00100 inline QueueItem* getNext() {
00101 return prev;
00102 }
00103 };
00104
00105
00106
00107
00108
00109 QueueItem *dummy;
00110
00114 atomic<QueueItem *> head;
00118 atomic<QueueItem *> tail;
00119
00120
00121
00122 SMR<QueueItem, 2>* mm;
00123 typedef typename SMR<QueueItem, 2>::HP_Rec HazardP;
00124
00129 static const bool hasBackoff = false;
00130
00131
00132 ShavitQueue(const ShavitQueue& q) {
00133 }
00134
00135 ShavitQueue& operator=(const ShavitQueue& q) {
00136 }
00137
00138 void fixList(QueueItem *tail, QueueItem *head) {
00139 QueueItem *curNode, *curNodeNext, *nextNodePrev;
00140 curNode = tail;
00141 while ((this->head.load(memory_order_relaxed) == head) && (curNode
00142 != head)) {
00143 curNodeNext = curNode->next;
00144 if (NULL == curNodeNext) {
00145 assert(false);
00146 }
00147 nextNodePrev = curNodeNext->prev;
00148 if (nextNodePrev != curNode) {
00149 curNodeNext->prev = curNode;
00150 }
00151 curNode = curNodeNext;
00152 }
00153 }
00154 public:
00158 ShavitQueue() {
00159 mm = getSMR<QueueItem, 2> ();
00160 dummy = new QueueItem();
00161 head.store(dummy, memory_order_relaxed);
00162 tail.store(dummy, memory_order_relaxed);
00163 }
00164
00168 ~ShavitQueue() {
00169 QueueItem *first = head.load(memory_order_relaxed);
00170 if (first == dummy) {
00171 first = first->prev;
00172 }
00173 while (tail.load(memory_order_relaxed) != dummy) {
00174 if (first == tail.load(memory_order_relaxed)) {
00175 delete first;
00176 break;
00177 }
00178 QueueItem *prev = first -> prev;
00179 delete first;
00180 first = prev;
00181 }
00182
00183 delete dummy;
00184 }
00185
00191 void enqueue(const T& x) {
00192 HazardP * hp = mm->getHPRec();
00193 #ifdef FREELIST
00194 QueueItem *node = mm->newNode(hp);
00195 node->data =x;
00196 #else
00197 QueueItem *node = new QueueItem(x);
00198 #endif
00199
00200 int wait_for_backoff = 10000;
00201 while (true) {
00202 QueueItem * t = tail.load(memory_order_relaxed);
00203
00204
00205
00206 mm->employ(hp,0, t);
00207
00208 if (tail.load(memory_order_relaxed) != t) {
00209 continue;
00210 }
00211
00212 node->next = t;
00213 if (tail.compare_swap(t, node)) {
00214 t->prev = node;
00215 break;
00216 }
00217
00218 if (hasBackoff) {
00219 usleep(wait_for_backoff);
00220 wait_for_backoff <<= 1;
00221 }
00222 }
00223 }
00224
00233 bool dequeue(T& val) {
00234 QueueItem *tl, *hd, *fstNodePrev;
00235
00236 HazardP * hp = mm->getHPRec();
00237
00238 int wait_for_backoff = 10000;
00239 while (true) {
00240 hd = head.load(memory_order_relaxed);
00241
00242
00243 mm->employ(hp, 0, hd);
00244
00245 if (head.load(memory_order_relaxed) != hd) {
00246 continue;
00247 }
00248
00249 tl = tail.load(memory_order_relaxed);
00250
00251 fstNodePrev = hd->prev;
00252
00253
00254 mm->employ(hp, 1, fstNodePrev);
00255 if (hd->prev != fstNodePrev) {
00256 continue;
00257 }
00258
00259 val = hd->data;
00260
00261 if (head.load(memory_order_relaxed) == hd) {
00262
00263
00264
00265 if (hd != dummy) {
00266 if (tail.load(memory_order_relaxed) != head.load(
00267 memory_order_relaxed)) {
00272 if (NULL == fstNodePrev) {
00273 fixList(tl, hd);
00274 continue;
00275 }
00276 } else {
00277 dummy->next = tl;
00278 dummy->prev = NULL;
00279 if (tail.compare_swap(tl, dummy)) {
00280 hd->prev = dummy;
00281 }
00282 continue;
00283 }
00284
00285 if (head.compare_swap(hd, fstNodePrev)) {
00286
00287
00288 mm->delNode(hp, hd);
00289 return true;
00290 }
00291 } else {
00292 if (tl == hd) {
00293
00294 return false;
00295 } else {
00296 if (NULL == fstNodePrev) {
00297 fixList(tl, hd);
00298 continue;
00299 }
00300
00301 head.compare_swap(hd, fstNodePrev);
00302 }
00303 }
00304 }
00305 if (hasBackoff) {
00306 usleep(wait_for_backoff);
00307 wait_for_backoff <<= 1;
00308 }
00309 }
00310 }
00311
00316 bool empty() {
00317 return head.load(memory_order_relaxed) == dummy && head.load(memory_order_relaxed) == tail.load(memory_order_relaxed);
00318 }
00319
00324 int size() {
00325 int ret = 0;
00326 QueueItem *first = head.load(memory_order_relaxed);
00327 if (first == dummy) {
00328 first = first->prev;
00329 }
00330 while (tail.load(memory_order_relaxed) != dummy) {
00331 ++ret;
00332 if (first == tail.load(memory_order_relaxed))
00333 break;
00334 first = first->prev;
00335 }
00336 return ret;
00337 }
00338
00348 bool peekFront(T& ret) {
00349 while (true) {
00350 QueueItem *front = head.load(memory_order_relaxed);
00351 ret = front->data;
00352 if (dummy != front) {
00353 return true;
00354 }
00355
00356 QueueItem *end = tail.load(memory_order_relaxed);
00357 if (front == end) {
00358 return false;
00359 } else {
00360 QueueItem *fstNodePrev = front->prev;
00361 if (NULL == fstNodePrev) {
00362 fixList(end, front);
00363 continue;
00364 }
00365 head.compare_swap(front, fstNodePrev);
00366 }
00367 return true;
00368 }
00369 }
00370 };
00371 }
00372 #endif