From 1a00f09ac03f7b61edbd88454a0695c9364b705b Mon Sep 17 00:00:00 2001 From: ponchio Date: Tue, 29 Nov 2011 12:41:28 +0000 Subject: [PATCH] fixed flushing problems. --- wrap/gcache/cache.h | 39 ++++++++++++++++---- wrap/gcache/controller.h | 78 ++++++++++++++++++++++++++-------------- wrap/gcache/door.h | 76 ++++++++++++++++++++++++++++++++------- 3 files changed, 146 insertions(+), 47 deletions(-) diff --git a/wrap/gcache/cache.h b/wrap/gcache/cache.h index 1bbad0fb..b2bb2f78 100644 --- a/wrap/gcache/cache.h +++ b/wrap/gcache/cache.h @@ -19,6 +19,8 @@ using namespace std; /** Cache virtual base class. You are required to implement the pure virtual functions get, drop and size. */ +template class Transfer; + template class Cache: public Provider { @@ -29,10 +31,15 @@ public: bool quit; ///keeps track of changes (if 1 then something was loaded or dropped QAtomicInt changed; + ///callback for changed + void (*callback)(void *data); ///data is fetched from here Provider *input; + ///threads running over cache... + std::vector *> transfers; + protected: ///max space available quint64 s_max; @@ -58,15 +65,16 @@ public: ///empty the cache. Make sure no resource is locked before calling this. /// Require pause or stop before. Ensure there no locked item void flush() { - std::vector tokens; + //std::vector tokens; { for(int i = 0; i < this->heap.size(); i++) { Token *token = &(this->heap[i]); - tokens.push_back(token); + //tokens.push_back(token); s_curr -= drop(token); assert(!(token->count >= Token::LOCKED)); if(final) token->count.testAndSetOrdered(Token::READY, Token::CACHE); + input->heap.push(token); } this->heap.clear(); } @@ -76,11 +84,11 @@ public: } //assert(s_curr == 0); - { +/* { for(unsigned int i = 0; i < tokens.size(); i++) { input->heap.push(tokens[i]); } - } + }*/ } ///empty the cache. Make sure no resource is locked before calling this. @@ -112,6 +120,7 @@ public: } } + virtual void abort() {} protected: ///return the space used in the cache by the loaded resource virtual int size(Token *token) = 0; @@ -119,6 +128,8 @@ protected: virtual int get(Token *token) = 0; ///return amount removed virtual int drop(Token *token) = 0; + ///make sure the get function do not access token after abort is returned. + @@ -136,16 +147,15 @@ protected: 2) make room until eliminating an element would leave space. */ begin(); while(!this->quit) { - input->check_queue.enter(true); //wait for cache below to load something or priorities to change - + input->check_queue.enter(); //wait for cache below to load something or priorities to change if(this->quit) break; if(unload() || load()) { changed.testAndSetOrdered(0, 1); //if not changed, set as changed input->check_queue.open(); //we signal ourselves to check again } + input->check_queue.leave(); } - flush(); this->quit = false; //in case someone wants to restart; end(); } @@ -255,4 +265,19 @@ protected: } }; + +template +class Transfer: public QThread { + public: + Transfer(Cache *_cache): cache(_cache) {} + private: + Cache *cache; + + void run() { + cache->loop(); + //end(); + } +}; + + #endif // GCACHE_H diff --git a/wrap/gcache/controller.h b/wrap/gcache/controller.h index 3f8e59e3..0d3d2311 100644 --- a/wrap/gcache/controller.h +++ b/wrap/gcache/controller.h @@ -25,7 +25,7 @@ class Controller { std::vector *> caches; Controller(): paused(false), stopped(true) {} - ~Controller() { finish(); } + ~Controller() { if(!stopped) finish(); } ///called before the cache is started to add a cache in the chain /** The order in which the caches are added is from the lowest to the highest. */ @@ -81,7 +81,7 @@ class Controller { ///start the various cache threads. void start() { - if(!stopped) return; + assert(stopped); assert(!paused); assert(caches.size() > 1); caches.back()->final = true; @@ -89,11 +89,25 @@ class Controller { caches[i]->start(); stopped = false; } - ///stops the ache threads + + ///stops the cache threads void stop() { - if(stopped) return; - if(paused) resume(); - //stop threads + assert(!paused); + assert(!stopped); + + //signal al caches to quit + for(int i = 0; i < caches.size(); i++) + caches[i]->quit = true; + //abort current gets + for(int i = 0; i < caches.size(); i++) + caches[i]->abort(); + //make sure all caches actually run a cycle. + for(unsigned int i = 0; i < caches.size(); i++) + caches[i]->input->check_queue.open(); + + for(int i = 0; i < caches.size(); i++) + caches[i]->wait(); +/* //stop threads for(int i = caches.size()-1; i >= 0; i--) { caches[i]->quit = true; //hmmmmmmmmmmmmmm not very clean. if(i == 0) @@ -101,48 +115,57 @@ class Controller { else caches[i-1]->check_queue.open(); //cache i listens on queue i-1 caches[i]->wait(); - } + } */ stopped = true; } void finish() { - flush(); stop(); + flush(); } void pause() { - if(paused) return; - provider.check_queue.lock(); - for(unsigned int i = 0; i < caches.size()-1; i++) { - caches[i]->check_queue.lock(); - } -/* provider.heap_lock.lock(); - for(unsigned int i = 0; i < caches.size(); i++) - caches[i]->heap_lock.lock(); */ + assert(!stopped); + assert(!paused); + + //lock all doors. + for(unsigned int i = 1; i < caches.size(); i++) + caches[i]->input->check_queue.lock(); + + //abort all pending calls + for(unsigned int i = 1; i < caches.size(); i++) + caches[i]->abort(); + + //make sure no cache is running (must be done after abort! otherwise we have to wait for the get) + for(unsigned int i = 0; i < caches.size()-1; i++) + caches[i]->input->check_queue.room.lock(); + paused = true; } void resume() { - if(!paused) return; - provider.check_queue.unlock(); - for(unsigned int i = 0; i < caches.size()-1; i++) - caches[i]->check_queue.unlock(); + assert(!stopped); + assert(paused); + + //unlock and open all doors + for(unsigned int i = 1; i < caches.size(); i++) { + caches[i]->input->check_queue.unlock(); + caches[i]->input->check_queue.open(); + } + + //allow all cache to enter again. + for(unsigned int i = 0; i < caches.size()-1; i++) + caches[i]->input->check_queue.room.unlock(); -/* provider.heap_lock.unlock(); - for(unsigned int i = 0; i < caches.size(); i++) - caches[i]->heap_lock.unlock(); */ paused = false; } ///empty all caches AND REMOVES ALL TOKENS! void flush() { - bool running = !stopped; - stop(); for(int i = (int)caches.size()-1; i >= 0; i--) caches[i]->flush(); provider.heap.clear(); - if(running) - start(); } + bool isChanged() { bool c = false; for(int i = (int)caches.size() -1; i >= 0; i--) { @@ -150,6 +173,7 @@ class Controller { } return c; } + bool isWaiting() { bool waiting = true; for(int i = (int)caches.size() -1; i >= 0; i--) { diff --git a/wrap/gcache/door.h b/wrap/gcache/door.h index cdc24735..ce19a3c8 100644 --- a/wrap/gcache/door.h +++ b/wrap/gcache/door.h @@ -26,21 +26,62 @@ #define CACHE_DOOR_H #include +#include +#include +#include +#include -//#define USE_SEMAPHORES -#ifdef USE_SEMAPHORES + +#define METHOD_2 + +#ifdef METHOD_1 + +class QDoor { + private: + QSemaphore door; + QMutex room; //lock when entering. unlock when exiting + QAtomicInt key; //keep tracks of door status + + public: + QDoor():key(0) {} + void open() { + if(key.testAndSetOrdered(0, 1)) + door.release(1); + } + + void enter() { + door.acquire(1); //here I am sure that key is 1 + //if here a open appends will have no effect. + key.testAndSetOrdered(1, 0); + room.lock(); + } + void leave() { + room.unlock(); + } + void lock() { + int r = key.fetchAndStoreOrdered(-1); + if(r == 1) //if the door was open + door.tryAcquire(1); //might file if whe are between enter acquire and key = 0. + } + void unlock() { + key = 0; + } +}; +#endif + +#ifdef METHOD_2 //a door needs to be open for the thread to continue, //if it is open the thread enter and closes the door //this mess is to avoid [if(!open.available()) open.release(1)] -#include - class QDoor { private: QSemaphore _open; QSemaphore _close; + public: + QMutex room; QDoor(): _open(0), _close(1) {} //this means closed void open() { @@ -48,7 +89,7 @@ class QDoor { _open.release(1); //open } void close() { - if(_open.tryAcquire(1)) //check not already cloed + if(_open.tryAcquire(1)) //check not already closed _close.release(1); } void enter(bool close = false) { @@ -57,8 +98,9 @@ class QDoor { _close.release(1); //close door behind else _open.release(1); //leave door opened + room.lock(); } - bool isOpen() { return _open.available() == 1; } + void leave() { room.unlock(); } void lock() { //door might be open or closed, but we might happen just in the middle @@ -68,18 +110,24 @@ class QDoor { } void unlock(bool open = false) { if(open) - _open.release(1) + _open.release(1); else _close.release(1); } + bool isWaiting() { + if(_open.tryAcquire(1)) { + _close.release(1); + return false; + } + return true; + } }; -#else +#endif -#include -#include +#ifdef METHOD_3 /** A wait condition class that works as a door. Should check if the semaphore version is faster. @@ -99,8 +147,8 @@ class QDoor { } ///attempt to enter the door. if the door is closed the thread will wait until the door is opened. - /** if close is true, the door will be closed after the thread is awakened, this allows to - have only one thread entering the door each time open() is called */ + /// if close is true, the door will be closed after the thread is awakened, this allows to + /// have only one thread entering the door each time open() is called void enter(bool close = false) { m.lock(); waiting = true; @@ -112,6 +160,7 @@ class QDoor { waiting = false; m.unlock(); } + void leave() {} bool isWaiting() { m.lock(); bool w = waiting; @@ -132,6 +181,7 @@ class QDoor { bool waiting; }; -#endif //ifdef USE_SEMAPHORES +#endif + #endif //CACHE_DOOR_H