diff options
Diffstat (limited to 'cpp/src/IceGrid/AdminSessionI.cpp')
-rw-r--r-- | cpp/src/IceGrid/AdminSessionI.cpp | 318 |
1 files changed, 148 insertions, 170 deletions
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp index 947298e7842..4120f274f28 100644 --- a/cpp/src/IceGrid/AdminSessionI.cpp +++ b/cpp/src/IceGrid/AdminSessionI.cpp @@ -8,6 +8,7 @@ #include <IceGrid/AdminSessionI.h> #include <IceGrid/AdminI.h> #include <IceGrid/Database.h> +#include <IceGrid/Internal.h> #include <IceSSL/Plugin.h> @@ -19,67 +20,45 @@ namespace class SubscriberForwarderI : public Ice::BlobjectArrayAsync { - class CallbackI: public IceUtil::Shared - { - public: - - virtual void - exception(const Ice::Exception& ex, const Ice::AMD_Object_ice_invokePtr& amdCB) - { - try - { - ex.ice_throw(); - } - catch(const Ice::Exception&) - { - // Throw ObjectNotExistException, the subscriber is unreachable - amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__)); - } - } - - virtual void - response(bool ok, - const pair<const Ice::Byte*, const Ice::Byte*>& outP, - const Ice::AMD_Object_ice_invokePtr& amdCB) - { - amdCB->ice_response(ok, outP); - } - }; - public: - SubscriberForwarderI(const Ice::ObjectPrx& proxy) : - _proxy(proxy), - _callback(newCallback_Object_ice_invoke(new CallbackI(), &CallbackI::response, &CallbackI::exception)) + SubscriberForwarderI(const std::shared_ptr<Ice::ObjectPrx>& proxy) : + _proxy(proxy) { } - virtual void - ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB, - const pair<const Ice::Byte*, const Ice::Byte*>& inParams, - const Ice::Current& current) + void + ice_invokeAsync(pair<const Ice::Byte*, const Ice::Byte*> inParams, + function<void(bool, const pair<const Ice::Byte*, const Ice::Byte*>&)> response, + function<void(exception_ptr)> exception, const Ice::Current& current) override { - _proxy->begin_ice_invoke(current.operation, current.mode, inParams, current.ctx, _callback, amdCB); + _proxy->ice_invokeAsync(current.operation, current.mode, inParams, + move(response), + [exception = move(exception)] (exception_ptr) + { + // Throw ObjectNotExistException, the subscriber is unreachable + exception(make_exception_ptr(Ice::ObjectNotExistException(__FILE__, __LINE__))); + }, + nullptr, current.ctx); } private: - const Ice::ObjectPrx _proxy; - const Ice::Callback_Object_ice_invokePtr _callback; + const shared_ptr<Ice::ObjectPrx> _proxy; }; } -FileIteratorI::FileIteratorI(const AdminSessionIPtr& session, - const FileReaderPrx& reader, +FileIteratorI::FileIteratorI(const shared_ptr<AdminSessionI>& session, + const shared_ptr<FileReaderPrx>& reader, const string& filename, - Ice::Long offset, - int messageSizeMax) : + long long offset, + int messageMaxSize) : _session(session), _reader(reader), _filename(filename), _offset(offset), - _messageSizeMax(messageSizeMax - 256) // Room for the header + _messageMaxSize(messageMaxSize - 256) // Room for the header { } @@ -88,13 +67,11 @@ FileIteratorI::read(int size, Ice::StringSeq& lines, const Ice::Current&) { try { - return _reader->read(_filename, _offset, size > _messageSizeMax ? _messageSizeMax : size, _offset, lines); + return _reader->read(_filename, _offset, size > _messageMaxSize ? _messageMaxSize : size, _offset, lines); } - catch(const Ice::LocalException& ex) + catch(const std::exception& ex) { - ostringstream os; - os << ex; - throw FileNotAvailableException(os.str()); + throw FileNotAvailableException(ex.what()); } } @@ -104,7 +81,8 @@ FileIteratorI::destroy(const Ice::Current& current) _session->removeFileIterator(current.id, current); } -AdminSessionI::AdminSessionI(const string& id, const DatabasePtr& db, int timeout, const RegistryIPtr& registry) : +AdminSessionI::AdminSessionI(const string& id, const shared_ptr<Database>& db, chrono::seconds timeout, + const shared_ptr<RegistryI>& registry) : BaseSessionI(id, "admin", db), _timeout(timeout), _replicaName(registry->getName()), @@ -112,12 +90,9 @@ AdminSessionI::AdminSessionI(const string& id, const DatabasePtr& db, int timeou { } -AdminSessionI::~AdminSessionI() -{ -} - -Ice::ObjectPrx -AdminSessionI::_register(const SessionServantManagerPtr& servantManager, const Ice::ConnectionPtr& con) +shared_ptr<Ice::ObjectPrx> +AdminSessionI::_register(const shared_ptr<SessionServantManager>& servantManager, + const shared_ptr<Ice::Connection>& con) { // // This is supposed to be called after creation only, no need to synchronize. @@ -137,117 +112,123 @@ AdminSessionI::_register(const SessionServantManagerPtr& servantManager, const I _adminCallbackTemplate = _registry->createAdminCallbackProxy(templateId); } - Ice::ObjectPrx session = _servantManager->addSession(this, con, category); + auto self = static_pointer_cast<AdminSessionI>(shared_from_this()); + + auto session = _servantManager->addSession(self, con, category); - _admin = AdminPrx::uncheckedCast(_servantManager->add(new AdminI(_database, _registry, this), this)); + _admin = Ice::uncheckedCast<AdminPrx>(_servantManager->add(make_shared<AdminI>(_database, _registry, self), self)); return session; } -AdminPrx +shared_ptr<AdminPrx> AdminSessionI::getAdmin(const Ice::Current&) const { return _admin; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> AdminSessionI::getAdminCallbackTemplate(const Ice::Current&) const { return _adminCallbackTemplate; } void -AdminSessionI::setObservers(const RegistryObserverPrx& registryObserver, - const NodeObserverPrx& nodeObserver, - const ApplicationObserverPrx& appObserver, - const AdapterObserverPrx& adapterObserver, - const ObjectObserverPrx& objectObserver, +AdminSessionI::setObservers(shared_ptr<RegistryObserverPrx> registryObserver, + shared_ptr<NodeObserverPrx> nodeObserver, + shared_ptr<ApplicationObserverPrx> appObserver, + shared_ptr<AdapterObserverPrx> adapterObserver, + shared_ptr<ObjectObserverPrx> objectObserver, const Ice::Current& current) { - Lock sync(*this); + lock_guard lock(_mutex); + if(_destroyed) { throw Ice::ObjectNotExistException(__FILE__, __LINE__, current.id, "", ""); } - const int t = _timeout * 1000; - const Ice::LocatorPrx l = _registry->getLocator(); + const auto t = secondsToInt(_timeout); + assert(t != 0); + const auto l = _registry->getLocator(); if(registryObserver) { - setupObserverSubscription(RegistryObserverTopicName, + setupObserverSubscription(TopicName::RegistryObserver, addForwarder(registryObserver->ice_timeout(t)->ice_locator(l))); } else { - setupObserverSubscription(RegistryObserverTopicName, Ice::ObjectPrx()); + setupObserverSubscription(TopicName::RegistryObserver, nullptr); } if(nodeObserver) { - setupObserverSubscription(NodeObserverTopicName, + setupObserverSubscription(TopicName::NodeObserver, addForwarder(nodeObserver->ice_timeout(t)->ice_locator(l))); } else { - setupObserverSubscription(NodeObserverTopicName, Ice::ObjectPrx()); + setupObserverSubscription(TopicName::NodeObserver, nullptr); } if(appObserver) { - setupObserverSubscription(ApplicationObserverTopicName, + setupObserverSubscription(TopicName::ApplicationObserver, addForwarder(appObserver->ice_timeout(t)->ice_locator(l))); } else { - setupObserverSubscription(ApplicationObserverTopicName, Ice::ObjectPrx()); + setupObserverSubscription(TopicName::ApplicationObserver, nullptr); } if(adapterObserver) { - setupObserverSubscription(AdapterObserverTopicName, + setupObserverSubscription(TopicName::AdapterObserver, addForwarder(adapterObserver->ice_timeout(t)->ice_locator(l))); } else { - setupObserverSubscription(AdapterObserverTopicName, Ice::ObjectPrx()); + setupObserverSubscription(TopicName::AdapterObserver, nullptr); } if(objectObserver) { - setupObserverSubscription(ObjectObserverTopicName, + setupObserverSubscription(TopicName::ObjectObserver, addForwarder(objectObserver->ice_timeout(t)->ice_locator(l))); } else { - setupObserverSubscription(ObjectObserverTopicName, Ice::ObjectPrx()); + setupObserverSubscription(TopicName::ObjectObserver, nullptr); } } void -AdminSessionI::setObserversByIdentity(const Ice::Identity& registryObserver, - const Ice::Identity& nodeObserver, - const Ice::Identity& appObserver, - const Ice::Identity& adapterObserver, - const Ice::Identity& objectObserver, +AdminSessionI::setObserversByIdentity(Ice::Identity registryObserver, + Ice::Identity nodeObserver, + Ice::Identity appObserver, + Ice::Identity adapterObserver, + Ice::Identity objectObserver, const Ice::Current& current) { - Lock sync(*this); + lock_guard lock(_mutex); + if(_destroyed) { throw Ice::ObjectNotExistException(__FILE__, __LINE__, current.id, "", ""); } - setupObserverSubscription(RegistryObserverTopicName, addForwarder(registryObserver, current), true); - setupObserverSubscription(NodeObserverTopicName, addForwarder(nodeObserver, current), true); - setupObserverSubscription(ApplicationObserverTopicName, addForwarder(appObserver, current), true); - setupObserverSubscription(AdapterObserverTopicName, addForwarder(adapterObserver, current), true); - setupObserverSubscription(ObjectObserverTopicName, addForwarder(objectObserver, current), true); + setupObserverSubscription(TopicName::RegistryObserver, addForwarder(registryObserver, current), true); + setupObserverSubscription(TopicName::NodeObserver, addForwarder(nodeObserver, current), true); + setupObserverSubscription(TopicName::ApplicationObserver, addForwarder(appObserver, current), true); + setupObserverSubscription(TopicName::AdapterObserver, addForwarder(adapterObserver, current), true); + setupObserverSubscription(TopicName::ObjectObserver, addForwarder(objectObserver, current), true); } int AdminSessionI::startUpdate(const Ice::Current& current) { - Lock sync(*this); + lock_guard lock(_mutex); + if(_destroyed) { throw Ice::ObjectNotExistException(__FILE__, __LINE__, current.id, "", ""); @@ -260,7 +241,8 @@ AdminSessionI::startUpdate(const Ice::Current& current) void AdminSessionI::finishUpdate(const Ice::Current& current) { - Lock sync(*this); + lock_guard lock(_mutex); + if(_destroyed) { throw Ice::ObjectNotExistException(__FILE__, __LINE__, current.id, "", ""); @@ -275,12 +257,12 @@ AdminSessionI::getReplicaName(const Ice::Current&) const return _replicaName; } -FileIteratorPrx -AdminSessionI::openServerLog(const string& id, const string& path, int nLines, const Ice::Current& current) +shared_ptr<FileIteratorPrx> +AdminSessionI::openServerLog(string id, string path, int nLines, const Ice::Current& current) { try { - return addFileIterator(_database->getServer(id)->getProxy(false, 5), "#" + path, nLines, current); + return addFileIterator(_database->getServer(move(id))->getProxy(false, 5s), "#" + move(path), nLines, current); } catch(const SynchronizationException&) { @@ -288,12 +270,12 @@ AdminSessionI::openServerLog(const string& id, const string& path, int nLines, c } } -FileIteratorPrx -AdminSessionI::openServerStdOut(const string& id, int nLines, const Ice::Current& current) +shared_ptr<FileIteratorPrx> +AdminSessionI::openServerStdOut(string id, int nLines, const Ice::Current& current) { try { - return addFileIterator(_database->getServer(id)->getProxy(false, 5), "stdout", nLines, current); + return addFileIterator(_database->getServer(move(id))->getProxy(false, 5s), "stdout", nLines, current); } catch(const SynchronizationException&) { @@ -301,12 +283,12 @@ AdminSessionI::openServerStdOut(const string& id, int nLines, const Ice::Current } } -FileIteratorPrx -AdminSessionI::openServerStdErr(const string& id, int nLines, const Ice::Current& current) +shared_ptr<FileIteratorPrx> +AdminSessionI::openServerStdErr(string id, int nLines, const Ice::Current& current) { try { - return addFileIterator(_database->getServer(id)->getProxy(false, 5), "stderr", nLines, current); + return addFileIterator(_database->getServer(move(id))->getProxy(false, 5s), "stderr", nLines, current); } catch(const SynchronizationException&) { @@ -314,44 +296,44 @@ AdminSessionI::openServerStdErr(const string& id, int nLines, const Ice::Current } } -FileIteratorPrx -AdminSessionI::openNodeStdOut(const string& name, int nLines, const Ice::Current& current) +shared_ptr<FileIteratorPrx> +AdminSessionI::openNodeStdOut(string name, int nLines, const Ice::Current& current) { - return addFileIterator(_database->getNode(name)->getProxy(), "stdout", nLines, current); + return addFileIterator(_database->getNode(move(name))->getProxy(), "stdout", nLines, current); } -FileIteratorPrx -AdminSessionI::openNodeStdErr(const string& name, int nLines, const Ice::Current& current) +shared_ptr<FileIteratorPrx> +AdminSessionI::openNodeStdErr(string name, int nLines, const Ice::Current& current) { - return addFileIterator(_database->getNode(name)->getProxy(), "stderr", nLines, current); + return addFileIterator(_database->getNode(move(name))->getProxy(), "stderr", nLines, current); } -FileIteratorPrx -AdminSessionI::openRegistryStdOut(const string& name, int nLines, const Ice::Current& current) +shared_ptr<FileIteratorPrx> +AdminSessionI::openRegistryStdOut(string name, int nLines, const Ice::Current& current) { - FileReaderPrx reader; + shared_ptr<FileReaderPrx> reader; if(name == _replicaName) { reader = _database->getReplicaCache().getInternalRegistry(); } else { - reader = _database->getReplica(name)->getProxy(); + reader = _database->getReplica(move(name))->getProxy(); } return addFileIterator(reader, "stdout", nLines, current); } -FileIteratorPrx -AdminSessionI::openRegistryStdErr(const string& name, int nLines, const Ice::Current& current) +shared_ptr<FileIteratorPrx> +AdminSessionI::openRegistryStdErr(string name, int nLines, const Ice::Current& current) { - FileReaderPrx reader; + shared_ptr<FileReaderPrx> reader; if(name == _replicaName) { reader = _database->getReplicaCache().getInternalRegistry(); } else { - reader = _database->getReplica(name)->getProxy(); + reader = _database->getReplica(move(name))->getProxy(); } return addFileIterator(reader, "stderr", nLines, current); } @@ -363,7 +345,7 @@ AdminSessionI::destroy(const Ice::Current&) } void -AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer, bool forwarder) +AdminSessionI::setupObserverSubscription(TopicName name, const shared_ptr<Ice::ObjectPrx>& observer, bool forwarder) { if(_observers.find(name) != _observers.end() && _observers[name].first != observer) { @@ -390,27 +372,28 @@ AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& o } } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> AdminSessionI::addForwarder(const Ice::Identity& id, const Ice::Current& current) { if(id.name.empty()) { - return Ice::ObjectPrx(); + return nullptr; } return addForwarder(current.con->createProxy(id)->ice_encodingVersion(current.encoding)); } -Ice::ObjectPrx -AdminSessionI::addForwarder(const Ice::ObjectPrx& prx) +shared_ptr<Ice::ObjectPrx> +AdminSessionI::addForwarder(const shared_ptr<Ice::ObjectPrx>& prx) { - return _registry->getRegistryAdapter()->addWithUUID(new SubscriberForwarderI(prx)); + return _registry->getRegistryAdapter()->addWithUUID(make_shared<SubscriberForwarderI>(prx)); } -FileIteratorPrx -AdminSessionI::addFileIterator(const FileReaderPrx& reader, const string& filename, int nLines, +shared_ptr<FileIteratorPrx> +AdminSessionI::addFileIterator(const shared_ptr<FileReaderPrx>& reader, const string& filename, int nLines, const Ice::Current& current) { - Lock sync(*this); + lock_guard lock(_mutex); + if(_destroyed) { throw Ice::ObjectNotExistException(__FILE__, __LINE__, current.id, "", ""); @@ -420,29 +403,29 @@ AdminSessionI::addFileIterator(const FileReaderPrx& reader, const string& filena // Always call getOffsetFromEnd even if nLines < 0. This allows to // throw right away if the file doesn't exit. // - Ice::Long offset; + long long offset; try { offset = reader->getOffsetFromEnd(filename, nLines); } - catch(const Ice::LocalException& ex) + catch(const std::exception& ex) { - ostringstream os; - os << ex; - throw FileNotAvailableException(os.str()); + throw FileNotAvailableException(ex.what()); } - Ice::PropertiesPtr properties = reader->ice_getCommunicator()->getProperties(); - int messageSizeMax = properties->getPropertyAsIntWithDefault("Ice.MessageSizeMax", 1024) * 1024; + auto properties = reader->ice_getCommunicator()->getProperties(); + int messageMaxSize = properties->getPropertyAsIntWithDefault("Ice.MessageMaxSize", 1024) * 1024; - Ice::ObjectPrx obj = _servantManager->add(new FileIteratorI(this, reader, filename, offset, messageSizeMax), this); - return FileIteratorPrx::uncheckedCast(obj); + auto self = static_pointer_cast<AdminSessionI>(shared_from_this()); + auto obj = _servantManager->add(make_shared<FileIteratorI>(self, reader, filename, offset, messageMaxSize), self); + return Ice::uncheckedCast<FileIteratorPrx>(obj); } void AdminSessionI::removeFileIterator(const Ice::Identity& id, const Ice::Current&) { - Lock sync(*this); + lock_guard lock(_mutex); + _servantManager->remove(id); } @@ -451,7 +434,7 @@ AdminSessionI::destroyImpl(bool shutdown) { BaseSessionI::destroyImpl(shutdown); - _servantManager->removeSession(this); + _servantManager->removeSession(shared_from_this()); try { @@ -466,18 +449,18 @@ AdminSessionI::destroyImpl(bool shutdown) // // Unsubscribe from the topics. // - setupObserverSubscription(RegistryObserverTopicName, 0); - setupObserverSubscription(NodeObserverTopicName, 0); - setupObserverSubscription(ApplicationObserverTopicName, 0); - setupObserverSubscription(AdapterObserverTopicName, 0); - setupObserverSubscription(ObjectObserverTopicName, 0); + setupObserverSubscription(TopicName::RegistryObserver, nullptr); + setupObserverSubscription(TopicName::NodeObserver, nullptr); + setupObserverSubscription(TopicName::ApplicationObserver, nullptr); + setupObserverSubscription(TopicName::AdapterObserver, nullptr); + setupObserverSubscription(TopicName::ObjectObserver, nullptr); } } -AdminSessionFactory::AdminSessionFactory(const SessionServantManagerPtr& servantManager, - const DatabasePtr& database, - const ReapThreadPtr& reaper, - const RegistryIPtr& registry) : +AdminSessionFactory::AdminSessionFactory(const shared_ptr<SessionServantManager>& servantManager, + const shared_ptr<Database>& database, + const shared_ptr<ReapThread>& reaper, + const shared_ptr<RegistryI>& registry) : _servantManager(servantManager), _database(database), _timeout(registry->getSessionTimeout(Ice::emptyCurrent)), @@ -487,35 +470,29 @@ AdminSessionFactory::AdminSessionFactory(const SessionServantManagerPtr& servant { if(_servantManager) // Not set if Glacier2 session manager adapter not enabled { - Ice::PropertiesPtr props = database->getCommunicator()->getProperties(); + auto props = database->getCommunicator()->getProperties(); const_cast<bool&>(_filters) = props->getPropertyAsIntWithDefault("IceGrid.Registry.AdminSessionFilters", 0) > 0; } } -Glacier2::SessionPrx -AdminSessionFactory::createGlacier2Session(const string& sessionId, const Glacier2::SessionControlPrx& ctl) +shared_ptr<Glacier2::SessionPrx> +AdminSessionFactory::createGlacier2Session(const string& sessionId, const shared_ptr<Glacier2::SessionControlPrx>& ctl) { assert(_servantManager); - AdminSessionIPtr session = createSessionServant(sessionId); - Ice::ObjectPrx proxy = session->_register(_servantManager, 0); + auto session = createSessionServant(sessionId); + auto proxy = session->_register(_servantManager, nullptr); - int timeout = 0; + chrono::seconds timeout = 0s; if(ctl) { try { if(_filters) { - Ice::IdentitySeq ids; - Ice::Identity queryId; - queryId.category = _database->getInstanceName(); - queryId.name = "Query"; - ids.push_back(queryId); - - _servantManager->setSessionControl(session, ctl, ids); + _servantManager->setSessionControl(session, ctl, { {"Query", _database->getInstanceName() } }); } - timeout = ctl->getSessionTimeout(); + timeout = chrono::seconds(ctl->getSessionTimeout()); } catch(const Ice::LocalException& e) { @@ -528,39 +505,40 @@ AdminSessionFactory::createGlacier2Session(const string& sessionId, const Glacie } } - _reaper->add(new SessionReapable<AdminSessionI>(_database->getTraceLevels()->logger, session), timeout); - return Glacier2::SessionPrx::uncheckedCast(proxy); + _reaper->add(make_shared<SessionReapable<AdminSessionI>>(_database->getTraceLevels()->logger, session), timeout); + return Ice::uncheckedCast<Glacier2::SessionPrx>(proxy); } -AdminSessionIPtr +shared_ptr<AdminSessionI> AdminSessionFactory::createSessionServant(const string& id) { - return new AdminSessionI(id, _database, _timeout, _registry); + return make_shared<AdminSessionI>(id, _database, _timeout, _registry); } -const TraceLevelsPtr& +const shared_ptr<TraceLevels>& AdminSessionFactory::getTraceLevels() const { return _database->getTraceLevels(); } -AdminSessionManagerI::AdminSessionManagerI(const AdminSessionFactoryPtr& factory) : _factory(factory) +AdminSessionManagerI::AdminSessionManagerI(const shared_ptr<AdminSessionFactory>& factory) : _factory(factory) { } -Glacier2::SessionPrx -AdminSessionManagerI::create(const string& userId, const Glacier2::SessionControlPrx& ctl, const Ice::Current&) +shared_ptr<Glacier2::SessionPrx> +AdminSessionManagerI::create(string userId, shared_ptr<Glacier2::SessionControlPrx> ctl, + const Ice::Current&) { - return _factory->createGlacier2Session(userId, ctl); + return _factory->createGlacier2Session(move(userId), move(ctl)); } -AdminSSLSessionManagerI::AdminSSLSessionManagerI(const AdminSessionFactoryPtr& factory) : _factory(factory) +AdminSSLSessionManagerI::AdminSSLSessionManagerI(const shared_ptr<AdminSessionFactory>& factory) : _factory(factory) { } -Glacier2::SessionPrx -AdminSSLSessionManagerI::create(const Glacier2::SSLInfo& info, - const Glacier2::SessionControlPrx& ctl, +shared_ptr<Glacier2::SessionPrx> +AdminSSLSessionManagerI::create(Glacier2::SSLInfo info, + shared_ptr<Glacier2::SessionControlPrx> ctl, const Ice::Current&) { string userDN; @@ -568,7 +546,7 @@ AdminSSLSessionManagerI::create(const Glacier2::SSLInfo& info, { try { - IceSSL::CertificatePtr cert = IceSSL::Certificate::decode(info.certs[0]); + auto cert = IceSSL::Certificate::decode(info.certs[0]); userDN = cert->getSubjectDN(); } catch(const Ice::Exception& ex) @@ -580,5 +558,5 @@ AdminSSLSessionManagerI::create(const Glacier2::SSLInfo& info, } } - return _factory->createGlacier2Session(userDN, ctl); + return _factory->createGlacier2Session(move(userDN), move(ctl)); } |