diff options
Diffstat (limited to 'cpp/src/IceGrid/SessionManager.h')
-rw-r--r-- | cpp/src/IceGrid/SessionManager.h | 166 |
1 files changed, 91 insertions, 75 deletions
diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h index 44261f603a0..243fe8b7957 100644 --- a/cpp/src/IceGrid/SessionManager.h +++ b/cpp/src/IceGrid/SessionManager.h @@ -5,22 +5,18 @@ #ifndef ICE_GRID_SESSION_MANAGER_H #define ICE_GRID_SESSION_MANAGER_H -#include <IceUtil/Handle.h> -#include <IceUtil/Mutex.h> -#include <IceUtil/Monitor.h> -#include <IceUtil/Thread.h> - #include <Ice/Logger.h> #include <Ice/LoggerUtil.h> #include <IceGrid/Registry.h> #include <IceGrid/Internal.h> +#include <IceGrid/Util.h> namespace IceGrid { template<class TPrx> -class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> +class SessionKeepAliveThread { #if defined(__clang__) # pragma clang diagnostic push @@ -47,21 +43,28 @@ class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<I public: - SessionKeepAliveThread(const InternalRegistryPrx& registry, const Ice::LoggerPtr& logger) : - IceUtil::Thread("IceGrid session keepalive thread"), + SessionKeepAliveThread(const std::shared_ptr<InternalRegistryPrx>& registry, + const std::shared_ptr<Ice::Logger>& logger) : _registry(registry), _logger(logger), _state(InProgress), - _nextAction(None) + _nextAction(None), + _thread([this] { run(); }) + { + } + + virtual ~SessionKeepAliveThread() { + assert(_state == Destroyed); } - virtual void + void run() { - TPrx session; - InternalRegistryPrx registry; - IceUtil::Time timeout = IceUtil::Time::seconds(10); + using namespace std::chrono_literals; + std::shared_ptr<TPrx> session; + std::shared_ptr<InternalRegistryPrx> registry; + std::chrono::seconds timeout = 10s; Action action = Connect; try @@ -69,7 +72,7 @@ public: while(true) { { - Lock sync(*this); + std::unique_lock<std::mutex> lock(_mutex); if(_state == Destroyed) { break; @@ -98,7 +101,7 @@ public: { _nextAction = Connect; } - notifyAll(); + _condVar.notify_all(); // // Wait if there's nothing to do and if we are @@ -108,12 +111,12 @@ public: { if(_state == Connected || action == Connect || action == KeepAlive) { - IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic); - IceUtil::Time wakeTime = now + timeout; + auto now = std::chrono::steady_clock::now(); + auto wakeTime = now + timeout; while(_state != Destroyed && _nextAction == None && wakeTime > now) { - timedWait(wakeTime - now); - now = IceUtil::Time::now(IceUtil::Time::Monotonic); + _condVar.wait_for(lock, wakeTime - now); + now = std::chrono::steady_clock::now(); } } if(_nextAction == None) @@ -130,23 +133,27 @@ public: assert(_nextAction != None); action = _nextAction; - registry = InternalRegistryPrx::uncheckedCast( - _registry->ice_timeout(static_cast<int>(timeout.toMilliSeconds()))); + assert(timeout != 0s); + using namespace std::chrono; + + registry = Ice::uncheckedCast<InternalRegistryPrx>(_registry->ice_timeout(secondsToInt(timeout))); _nextAction = None; _state = InProgress; - notifyAll(); + _condVar.notify_all(); } switch(action) { case Connect: assert(!session); - session = createSession(registry, timeout); + { + session = createSession(registry, timeout); + } break; case Disconnect: assert(session); destroySession(session); - session = 0; + session = nullptr; break; case KeepAlive: assert(session); @@ -186,57 +193,53 @@ public: bool isWaitingForCreate() { - Lock sync(*this); + std::lock_guard<std::mutex> lock(_mutex); return _state != Destroyed && _state != Connected; } virtual bool waitForCreate() { - Lock sync(*this); - while(_state != Destroyed && _state != Connected) - { - wait(); - } + std::unique_lock<std::mutex> lock(_mutex); + _condVar.wait(lock, [this] { return _state == Destroyed || _state == Connected; }); return _state != Destroyed; } void tryCreateSession() { + std::lock_guard<std::mutex> lock(_mutex); + if(_state == Destroyed) { - Lock sync(*this); - if(_state == Destroyed) - { - return; - } + return; + } - if(_state == Connected) - { - _nextAction = KeepAlive; - } - else - { - _nextAction = Connect; - } - notifyAll(); + if(_state == Connected) + { + _nextAction = KeepAlive; } + else + { + _nextAction = Connect; + } + _condVar.notify_all(); } void - waitTryCreateSession(const IceUtil::Time& timeout = IceUtil::Time()) + waitTryCreateSession(std::chrono::seconds timeout = std::chrono::seconds(0)) { - Lock sync(*this); + std::unique_lock<std::mutex> lock(_mutex); // Wait until the action is executed and the state changes. while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress) { - if(timeout == IceUtil::Time()) + using namespace std::chrono_literals; + if(timeout == 0s) { - wait(); + _condVar.wait(lock); } else { - if(!timedWait(timeout)) + if(_condVar.wait_for(lock, timeout) == std::cv_status::timeout) { break; } @@ -247,19 +250,19 @@ public: void destroyActiveSession() { - Lock sync(*this); + std::lock_guard<std::mutex> lock(_mutex); if(_state == Destroyed || _state == Disconnected) { return; } _nextAction = Disconnect; - notifyAll(); + _condVar.notify_all(); } bool terminateIfDisconnected() { - Lock sync(*this); + std::lock_guard<std::mutex> lock(_mutex); if(_state != Disconnected) { return false; // Nothing we can do for now. @@ -267,14 +270,14 @@ public: assert(_state != Destroyed); _state = Destroyed; _nextAction = None; - notifyAll(); + _condVar.notify_all(); return true; } void terminate(bool destroySession = true) { - Lock sync(*this); + std::lock_guard<std::mutex> lock(_mutex); if(_state == Destroyed) { return; @@ -282,67 +285,80 @@ public: assert(_state != Destroyed); _state = Destroyed; _nextAction = destroySession ? Disconnect : None; - notifyAll(); + _condVar.notify_all(); + } + + void + join() + { + _thread.join(); } bool isDestroyed() { - Lock sync(*this); + std::lock_guard<std::mutex> lock(_mutex); return _state == Destroyed; } - TPrx + std::shared_ptr<TPrx> getSession() { - Lock sync(*this); + std::lock_guard<std::mutex> lock(_mutex); return _session; } void - setRegistry(const InternalRegistryPrx& registry) + setRegistry(const std::shared_ptr<InternalRegistryPrx>& registry) { - Lock sync(*this); + std::lock_guard<std::mutex> lock(_mutex); _registry = registry; } - InternalRegistryPrx + std::shared_ptr<InternalRegistryPrx> getRegistry() const { - Lock sync(*this); + std::lock_guard<std::mutex> lock(_mutex); return _registry; } - virtual TPrx createSession(InternalRegistryPrx&, IceUtil::Time&) = 0; - virtual void destroySession(const TPrx&) = 0; - virtual bool keepAlive(const TPrx&) = 0; + virtual std::shared_ptr<TPrx> createSession(std::shared_ptr<InternalRegistryPrx>&, std::chrono::seconds&) = 0; + virtual void destroySession(const std::shared_ptr<TPrx>&) = 0; + virtual bool keepAlive(const std::shared_ptr<TPrx>&) = 0; protected: - InternalRegistryPrx _registry; - Ice::LoggerPtr _logger; - TPrx _session; + std::shared_ptr<InternalRegistryPrx> _registry; + std::shared_ptr<Ice::Logger> _logger; + std::shared_ptr<TPrx> _session; State _state; Action _nextAction; + + mutable std::mutex _mutex; + std::condition_variable _condVar; + std::thread _thread; }; -class SessionManager : public IceUtil::Monitor<IceUtil::Mutex> +class SessionManager { public: - SessionManager(const Ice::CommunicatorPtr&, const std::string&); - virtual ~SessionManager(); + SessionManager(const std::shared_ptr<Ice::Communicator>&, const std::string&); + virtual ~SessionManager() = default; virtual bool isDestroyed() = 0; protected: - std::vector<IceGrid::QueryPrx> findAllQueryObjects(bool); + std::vector<std::shared_ptr<IceGrid::QueryPrx>> findAllQueryObjects(bool); - Ice::CommunicatorPtr _communicator; + std::shared_ptr<Ice::Communicator> _communicator; std::string _instanceName; - InternalRegistryPrx _master; - std::vector<IceGrid::QueryPrx> _queryObjects; + std::shared_ptr<InternalRegistryPrx> _master; + std::vector<std::shared_ptr<IceGrid::QueryPrx>> _queryObjects; + + std::mutex _mutex; + std::condition_variable _condVar; }; }; |