diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-10-23 11:19:55 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-10-23 11:19:55 +0200 |
commit | 113775a1ac1d1a4298ddc1fba661c1bad0b80906 (patch) | |
tree | a85b68e8a8f140fb01e113aa8cdb5fc1a47e62b1 /cpp/src | |
parent | Added support for DESTDIR in gradle build (diff) | |
download | ice-113775a1ac1d1a4298ddc1fba661c1bad0b80906.tar.bz2 ice-113775a1ac1d1a4298ddc1fba661c1bad0b80906.tar.xz ice-113775a1ac1d1a4298ddc1fba661c1bad0b80906.zip |
Fixed ICE-5759: IceGrid activation test failure on Windows
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/AdminSessionI.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReapThread.cpp | 109 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReapThread.h | 42 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 12 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionI.cpp | 47 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionI.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionServantManager.h | 3 |
7 files changed, 156 insertions, 61 deletions
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp index 1af3426363f..97fff196b19 100644 --- a/cpp/src/IceGrid/AdminSessionI.cpp +++ b/cpp/src/IceGrid/AdminSessionI.cpp @@ -146,8 +146,6 @@ AdminSessionI::_register(const SessionServantManagerPtr& servantManager, const I templateId.category = category; _adminCallbackTemplate = _registry->createAdminCallbackProxy(templateId); - - setConnectionCallback(con); } Ice::ObjectPrx session = _servantManager->addSession(this, con, category); diff --git a/cpp/src/IceGrid/ReapThread.cpp b/cpp/src/IceGrid/ReapThread.cpp index 569e6f7a4b2..68eb1181d7d 100644 --- a/cpp/src/IceGrid/ReapThread.cpp +++ b/cpp/src/IceGrid/ReapThread.cpp @@ -13,8 +13,39 @@ using namespace std; using namespace IceGrid; +namespace +{ + +class ConnectionCallbackI : public Ice::ConnectionCallback +{ +public: + + ConnectionCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) + { + } + + virtual void + heartbeat(const Ice::ConnectionPtr& con) + { + _reaper->connectionHeartbeat(con); + } + + virtual void + closed(const Ice::ConnectionPtr& con) + { + _reaper->connectionClosed(con); + } + +private: + + const ReapThreadPtr _reaper; +}; + +} + ReapThread::ReapThread() : IceUtil::Thread("Icegrid reaper thread"), + _callback(new ConnectionCallbackI(this)), _terminated(false) { } @@ -60,21 +91,39 @@ ReapThread::run() { p->item->timestamp(); // This should throw if the reapable is destroyed. ++p; + continue; } else if((IceUtil::Time::now(IceUtil::Time::Monotonic) - p->item->timestamp()) > p->timeout) { reap.push_back(*p); - p = _sessions.erase(p); } else { ++p; + continue; } } catch(const Ice::ObjectNotExistException&) { - p = _sessions.erase(p); } + + // + // Remove the reapable + // + if(p->connection) + { + map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator q = _connections.find(p->connection); + if(q != _connections.end()) + { + q->second.erase(p->item); + if(q->second.empty()) + { + p->connection->setCallback(0); + _connections.erase(q); + } + } + } + p = _sessions.erase(p); } } @@ -100,6 +149,13 @@ ReapThread::terminate() _terminated = true; notify(); reap.swap(_sessions); + + for(map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.begin(); p != _connections.end(); ++p) + { + p->first->setCallback(0); + } + _connections.clear(); + _callback = 0; } for(list<ReapableItem>::iterator p = reap.begin(); p != reap.end(); ++p) @@ -109,7 +165,7 @@ ReapThread::terminate() } void -ReapThread::add(const ReapablePtr& reapable, int timeout) +ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionPtr& connection) { Lock sync(*this); if(_terminated) @@ -132,9 +188,21 @@ ReapThread::add(const ReapablePtr& reapable, int timeout) ReapableItem item; item.item = reapable; + item.connection = connection; item.timeout = timeout == 0 ? IceUtil::Time() : IceUtil::Time::seconds(timeout); _sessions.push_back(item); + if(connection) + { + map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(connection); + if(p == _connections.end()) + { + p = _connections.insert(make_pair(connection, set<ReapablePtr>())).first; + connection->setCallback(_callback); + } + p->second.insert(reapable); + } + if(timeout > 0) { // @@ -154,6 +222,41 @@ ReapThread::add(const ReapablePtr& reapable, int timeout) } } +void +ReapThread::connectionHeartbeat(const Ice::ConnectionPtr& con) +{ + Lock sync(*this); + map<Ice::ConnectionPtr, set<ReapablePtr> >::const_iterator p = _connections.find(con); + if(p == _connections.end()) + { + con->setCallback(0); + return; + } + + for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + { + (*q)->heartbeat(); + } +} + +void +ReapThread::connectionClosed(const Ice::ConnectionPtr& con) +{ + Lock sync(*this); + map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(con); + if(p == _connections.end()) + { + con->setCallback(0); + return; + } + + for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + { + (*q)->destroy(false); + } + _connections.erase(p); +} + // // Returns true if the calculated wake interval is less than the current wake // interval (or if the original wake interval was "forever"). diff --git a/cpp/src/IceGrid/ReapThread.h b/cpp/src/IceGrid/ReapThread.h index 8f3077ef941..44d961a6d0c 100644 --- a/cpp/src/IceGrid/ReapThread.h +++ b/cpp/src/IceGrid/ReapThread.h @@ -17,18 +17,21 @@ #include <Ice/Logger.h> #include <Ice/LocalException.h> #include <Ice/LoggerUtil.h> +#include <Ice/Connection.h> #include <list> namespace IceGrid { -class Reapable : public IceUtil::Shared +class Reapable : public Ice::LocalObject { public: virtual ~Reapable() { } + virtual void heartbeat() const { }; + virtual IceUtil::Time timestamp() const = 0; virtual void destroy(bool) = 0; @@ -81,12 +84,38 @@ public: } } -private: +protected: const Ice::LoggerPtr _logger; const TPtr _session; }; +template<class T> +class SessionReapableWithHeartbeat : public SessionReapable<T> +{ + typedef IceUtil::Handle<T> TPtr; + +public: + + SessionReapableWithHeartbeat(const Ice::LoggerPtr& logger, const TPtr& session) : + SessionReapable<T>(logger, session) + { + } + + virtual void + heartbeat() const + { + try + { + SessionReapable<T>::_session->keepAlive(Ice::Current()); + } + catch(Ice::Exception&) + { + } + } +}; + + class ReapThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> { public: @@ -95,20 +124,27 @@ public: virtual void run(); void terminate(); - void add(const ReapablePtr&, int); + void add(const ReapablePtr&, int, const Ice::ConnectionPtr& = Ice::ConnectionPtr()); + + void connectionHeartbeat(const Ice::ConnectionPtr&); + void connectionClosed(const Ice::ConnectionPtr&); private: bool calcWakeInterval(); + Ice::ConnectionCallbackPtr _callback; IceUtil::Time _wakeInterval; bool _terminated; struct ReapableItem { ReapablePtr item; + Ice::ConnectionPtr connection; IceUtil::Time timeout; }; std::list<ReapableItem> _sessions; + + std::map<Ice::ConnectionPtr, std::set<ReapablePtr> > _connections; }; typedef IceUtil::Handle<ReapThread> ReapThreadPtr; diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index 521ba01dc8d..843b8b15cfa 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -968,7 +968,8 @@ RegistryI::createSession(const string& user, const string& password, const Curre SessionIPtr session = _clientSessionFactory->createSessionServant(user, 0); Ice::ObjectPrx proxy = session->_register(_servantManager, current.con); - _reaper->add(new SessionReapable<SessionI>(_traceLevels->logger, session), _sessionTimeout); + _reaper->add(new SessionReapableWithHeartbeat<SessionI>(_traceLevels->logger, session), _sessionTimeout, + current.con); return SessionPrx::uncheckedCast(proxy); } @@ -1024,7 +1025,8 @@ RegistryI::createAdminSession(const string& user, const string& password, const AdminSessionIPtr session = _adminSessionFactory->createSessionServant(user); Ice::ObjectPrx proxy = session->_register(_servantManager, current.con); - _reaper->add(new SessionReapable<AdminSessionI>(_traceLevels->logger, session), _sessionTimeout); + _reaper->add(new SessionReapableWithHeartbeat<AdminSessionI>(_traceLevels->logger, session), _sessionTimeout, + current.con); return AdminSessionPrx::uncheckedCast(proxy); } @@ -1089,7 +1091,8 @@ RegistryI::createSessionFromSecureConnection(const Current& current) SessionIPtr session = _clientSessionFactory->createSessionServant(userDN, 0); Ice::ObjectPrx proxy = session->_register(_servantManager, current.con); - _reaper->add(new SessionReapable<SessionI>(_traceLevels->logger, session), _sessionTimeout); + _reaper->add(new SessionReapableWithHeartbeat<SessionI>(_traceLevels->logger, session), _sessionTimeout, + current.con); return SessionPrx::uncheckedCast(proxy); } @@ -1143,7 +1146,8 @@ RegistryI::createAdminSessionFromSecureConnection(const Current& current) // AdminSessionIPtr session = _adminSessionFactory->createSessionServant(userDN); Ice::ObjectPrx proxy = session->_register(_servantManager, current.con); - _reaper->add(new SessionReapable<AdminSessionI>(_traceLevels->logger, session), _sessionTimeout); + _reaper->add(new SessionReapableWithHeartbeat<AdminSessionI>(_traceLevels->logger, session), _sessionTimeout, + current.con); return AdminSessionPrx::uncheckedCast(proxy); } diff --git a/cpp/src/IceGrid/SessionI.cpp b/cpp/src/IceGrid/SessionI.cpp index f7f656e846f..6afba2bbf10 100644 --- a/cpp/src/IceGrid/SessionI.cpp +++ b/cpp/src/IceGrid/SessionI.cpp @@ -62,43 +62,6 @@ newAllocateObject(const SessionIPtr& session, const IceUtil::Handle<T>& cb) return new AllocateObject<T>(session, cb); } -class ConnectionCallbackI : public Ice::ConnectionCallback -{ -public: - - ConnectionCallbackI(const BaseSessionIPtr& session) : _session(session) - { - } - - virtual void - heartbeat(const Ice::ConnectionPtr&) - { - try - { - _session->keepAlive(Ice::Current()); - } - catch(const Ice::Exception&) - { - } - } - - virtual void - closed(const Ice::ConnectionPtr&) - { - try - { - _session->destroyImpl(false); - } - catch(const Ice::Exception&) - { - } - } - -private: - - const BaseSessionIPtr _session; -}; - } BaseSessionI::BaseSessionI(const string& id, const string& prefix, const DatabasePtr& database) : @@ -121,12 +84,6 @@ BaseSessionI::~BaseSessionI() } void -BaseSessionI::setConnectionCallback(const Ice::ConnectionPtr& con) -{ - con->setCallback(new ConnectionCallbackI(this)); -} - -void BaseSessionI::keepAlive(const Ice::Current& current) { Lock sync(*this); @@ -212,10 +169,6 @@ SessionI::_register(const SessionServantManagerPtr& servantManager, const Ice::C // This is supposed to be called after creation only, no need to synchronize. // _servantManager = servantManager; - if(con) - { - setConnectionCallback(con); - } return _servantManager->addSession(this, con, ""); } diff --git a/cpp/src/IceGrid/SessionI.h b/cpp/src/IceGrid/SessionI.h index 5885a10e12f..c8aba2224b4 100644 --- a/cpp/src/IceGrid/SessionI.h +++ b/cpp/src/IceGrid/SessionI.h @@ -55,8 +55,6 @@ protected: BaseSessionI(const std::string&, const std::string&, const DatabasePtr&); - void setConnectionCallback(const Ice::ConnectionPtr&); - const std::string _id; const std::string _prefix; const TraceLevelsPtr _traceLevels; diff --git a/cpp/src/IceGrid/SessionServantManager.h b/cpp/src/IceGrid/SessionServantManager.h index 5fba8a18b64..c1d7ca0ba4f 100644 --- a/cpp/src/IceGrid/SessionServantManager.h +++ b/cpp/src/IceGrid/SessionServantManager.h @@ -44,6 +44,9 @@ public: Ice::ObjectPrx add(const Ice::ObjectPtr&, const Ice::ObjectPtr&); void remove(const Ice::Identity&); + void connectionHeartbeat(const Ice::ConnectionPtr&); + void connectionClosed(const Ice::ConnectionPtr&); + private: Ice::ObjectPrx addImpl(const Ice::ObjectPtr&, const Ice::ObjectPtr&); |