summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/SessionManager.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/SessionManager.h')
-rw-r--r--cpp/src/IceGrid/SessionManager.h166
1 files changed, 91 insertions, 75 deletions
diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h
index 44261f603a0..243fe8b7957 100644
--- a/cpp/src/IceGrid/SessionManager.h
+++ b/cpp/src/IceGrid/SessionManager.h
@@ -5,22 +5,18 @@
#ifndef ICE_GRID_SESSION_MANAGER_H
#define ICE_GRID_SESSION_MANAGER_H
-#include <IceUtil/Handle.h>
-#include <IceUtil/Mutex.h>
-#include <IceUtil/Monitor.h>
-#include <IceUtil/Thread.h>
-
#include <Ice/Logger.h>
#include <Ice/LoggerUtil.h>
#include <IceGrid/Registry.h>
#include <IceGrid/Internal.h>
+#include <IceGrid/Util.h>
namespace IceGrid
{
template<class TPrx>
-class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex>
+class SessionKeepAliveThread
{
#if defined(__clang__)
# pragma clang diagnostic push
@@ -47,21 +43,28 @@ class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<I
public:
- SessionKeepAliveThread(const InternalRegistryPrx& registry, const Ice::LoggerPtr& logger) :
- IceUtil::Thread("IceGrid session keepalive thread"),
+ SessionKeepAliveThread(const std::shared_ptr<InternalRegistryPrx>& registry,
+ const std::shared_ptr<Ice::Logger>& logger) :
_registry(registry),
_logger(logger),
_state(InProgress),
- _nextAction(None)
+ _nextAction(None),
+ _thread([this] { run(); })
+ {
+ }
+
+ virtual ~SessionKeepAliveThread()
{
+ assert(_state == Destroyed);
}
- virtual void
+ void
run()
{
- TPrx session;
- InternalRegistryPrx registry;
- IceUtil::Time timeout = IceUtil::Time::seconds(10);
+ using namespace std::chrono_literals;
+ std::shared_ptr<TPrx> session;
+ std::shared_ptr<InternalRegistryPrx> registry;
+ std::chrono::seconds timeout = 10s;
Action action = Connect;
try
@@ -69,7 +72,7 @@ public:
while(true)
{
{
- Lock sync(*this);
+ std::unique_lock<std::mutex> lock(_mutex);
if(_state == Destroyed)
{
break;
@@ -98,7 +101,7 @@ public:
{
_nextAction = Connect;
}
- notifyAll();
+ _condVar.notify_all();
//
// Wait if there's nothing to do and if we are
@@ -108,12 +111,12 @@ public:
{
if(_state == Connected || action == Connect || action == KeepAlive)
{
- IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
- IceUtil::Time wakeTime = now + timeout;
+ auto now = std::chrono::steady_clock::now();
+ auto wakeTime = now + timeout;
while(_state != Destroyed && _nextAction == None && wakeTime > now)
{
- timedWait(wakeTime - now);
- now = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ _condVar.wait_for(lock, wakeTime - now);
+ now = std::chrono::steady_clock::now();
}
}
if(_nextAction == None)
@@ -130,23 +133,27 @@ public:
assert(_nextAction != None);
action = _nextAction;
- registry = InternalRegistryPrx::uncheckedCast(
- _registry->ice_timeout(static_cast<int>(timeout.toMilliSeconds())));
+ assert(timeout != 0s);
+ using namespace std::chrono;
+
+ registry = Ice::uncheckedCast<InternalRegistryPrx>(_registry->ice_timeout(secondsToInt(timeout)));
_nextAction = None;
_state = InProgress;
- notifyAll();
+ _condVar.notify_all();
}
switch(action)
{
case Connect:
assert(!session);
- session = createSession(registry, timeout);
+ {
+ session = createSession(registry, timeout);
+ }
break;
case Disconnect:
assert(session);
destroySession(session);
- session = 0;
+ session = nullptr;
break;
case KeepAlive:
assert(session);
@@ -186,57 +193,53 @@ public:
bool
isWaitingForCreate()
{
- Lock sync(*this);
+ std::lock_guard<std::mutex> lock(_mutex);
return _state != Destroyed && _state != Connected;
}
virtual bool
waitForCreate()
{
- Lock sync(*this);
- while(_state != Destroyed && _state != Connected)
- {
- wait();
- }
+ std::unique_lock<std::mutex> lock(_mutex);
+ _condVar.wait(lock, [this] { return _state == Destroyed || _state == Connected; });
return _state != Destroyed;
}
void
tryCreateSession()
{
+ std::lock_guard<std::mutex> lock(_mutex);
+ if(_state == Destroyed)
{
- Lock sync(*this);
- if(_state == Destroyed)
- {
- return;
- }
+ return;
+ }
- if(_state == Connected)
- {
- _nextAction = KeepAlive;
- }
- else
- {
- _nextAction = Connect;
- }
- notifyAll();
+ if(_state == Connected)
+ {
+ _nextAction = KeepAlive;
}
+ else
+ {
+ _nextAction = Connect;
+ }
+ _condVar.notify_all();
}
void
- waitTryCreateSession(const IceUtil::Time& timeout = IceUtil::Time())
+ waitTryCreateSession(std::chrono::seconds timeout = std::chrono::seconds(0))
{
- Lock sync(*this);
+ std::unique_lock<std::mutex> lock(_mutex);
// Wait until the action is executed and the state changes.
while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress)
{
- if(timeout == IceUtil::Time())
+ using namespace std::chrono_literals;
+ if(timeout == 0s)
{
- wait();
+ _condVar.wait(lock);
}
else
{
- if(!timedWait(timeout))
+ if(_condVar.wait_for(lock, timeout) == std::cv_status::timeout)
{
break;
}
@@ -247,19 +250,19 @@ public:
void
destroyActiveSession()
{
- Lock sync(*this);
+ std::lock_guard<std::mutex> lock(_mutex);
if(_state == Destroyed || _state == Disconnected)
{
return;
}
_nextAction = Disconnect;
- notifyAll();
+ _condVar.notify_all();
}
bool
terminateIfDisconnected()
{
- Lock sync(*this);
+ std::lock_guard<std::mutex> lock(_mutex);
if(_state != Disconnected)
{
return false; // Nothing we can do for now.
@@ -267,14 +270,14 @@ public:
assert(_state != Destroyed);
_state = Destroyed;
_nextAction = None;
- notifyAll();
+ _condVar.notify_all();
return true;
}
void
terminate(bool destroySession = true)
{
- Lock sync(*this);
+ std::lock_guard<std::mutex> lock(_mutex);
if(_state == Destroyed)
{
return;
@@ -282,67 +285,80 @@ public:
assert(_state != Destroyed);
_state = Destroyed;
_nextAction = destroySession ? Disconnect : None;
- notifyAll();
+ _condVar.notify_all();
+ }
+
+ void
+ join()
+ {
+ _thread.join();
}
bool
isDestroyed()
{
- Lock sync(*this);
+ std::lock_guard<std::mutex> lock(_mutex);
return _state == Destroyed;
}
- TPrx
+ std::shared_ptr<TPrx>
getSession()
{
- Lock sync(*this);
+ std::lock_guard<std::mutex> lock(_mutex);
return _session;
}
void
- setRegistry(const InternalRegistryPrx& registry)
+ setRegistry(const std::shared_ptr<InternalRegistryPrx>& registry)
{
- Lock sync(*this);
+ std::lock_guard<std::mutex> lock(_mutex);
_registry = registry;
}
- InternalRegistryPrx
+ std::shared_ptr<InternalRegistryPrx>
getRegistry() const
{
- Lock sync(*this);
+ std::lock_guard<std::mutex> lock(_mutex);
return _registry;
}
- virtual TPrx createSession(InternalRegistryPrx&, IceUtil::Time&) = 0;
- virtual void destroySession(const TPrx&) = 0;
- virtual bool keepAlive(const TPrx&) = 0;
+ virtual std::shared_ptr<TPrx> createSession(std::shared_ptr<InternalRegistryPrx>&, std::chrono::seconds&) = 0;
+ virtual void destroySession(const std::shared_ptr<TPrx>&) = 0;
+ virtual bool keepAlive(const std::shared_ptr<TPrx>&) = 0;
protected:
- InternalRegistryPrx _registry;
- Ice::LoggerPtr _logger;
- TPrx _session;
+ std::shared_ptr<InternalRegistryPrx> _registry;
+ std::shared_ptr<Ice::Logger> _logger;
+ std::shared_ptr<TPrx> _session;
State _state;
Action _nextAction;
+
+ mutable std::mutex _mutex;
+ std::condition_variable _condVar;
+ std::thread _thread;
};
-class SessionManager : public IceUtil::Monitor<IceUtil::Mutex>
+class SessionManager
{
public:
- SessionManager(const Ice::CommunicatorPtr&, const std::string&);
- virtual ~SessionManager();
+ SessionManager(const std::shared_ptr<Ice::Communicator>&, const std::string&);
+ virtual ~SessionManager() = default;
virtual bool isDestroyed() = 0;
protected:
- std::vector<IceGrid::QueryPrx> findAllQueryObjects(bool);
+ std::vector<std::shared_ptr<IceGrid::QueryPrx>> findAllQueryObjects(bool);
- Ice::CommunicatorPtr _communicator;
+ std::shared_ptr<Ice::Communicator> _communicator;
std::string _instanceName;
- InternalRegistryPrx _master;
- std::vector<IceGrid::QueryPrx> _queryObjects;
+ std::shared_ptr<InternalRegistryPrx> _master;
+ std::vector<std::shared_ptr<IceGrid::QueryPrx>> _queryObjects;
+
+ std::mutex _mutex;
+ std::condition_variable _condVar;
};
};