diff options
author | Joe George <joe@zeroc.com> | 2021-01-28 16:26:44 -0500 |
---|---|---|
committer | Joe George <joe@zeroc.com> | 2021-02-01 16:59:30 -0500 |
commit | 92a6531e409f2691d82591e185a92299d415fc0f (patch) | |
tree | 60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceGrid/LocatorRegistryI.cpp | |
parent | Port Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff) | |
download | ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2 ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz ice-92a6531e409f2691d82591e185a92299d415fc0f.zip |
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceGrid/LocatorRegistryI.cpp')
-rw-r--r-- | cpp/src/IceGrid/LocatorRegistryI.cpp | 382 |
1 files changed, 183 insertions, 199 deletions
diff --git a/cpp/src/IceGrid/LocatorRegistryI.cpp b/cpp/src/IceGrid/LocatorRegistryI.cpp index 52e254e2ee0..e6a172f3f41 100644 --- a/cpp/src/IceGrid/LocatorRegistryI.cpp +++ b/cpp/src/IceGrid/LocatorRegistryI.cpp @@ -7,6 +7,7 @@ #include <IceGrid/ReplicaSessionManager.h> #include <IceGrid/Database.h> #include <IceGrid/Util.h> +#include <IceGrid/Internal.h> using namespace std; using namespace IceGrid; @@ -14,241 +15,175 @@ using namespace IceGrid; namespace IceGrid { -template<class AmdCB> -class SetDirectProxyCB : public LocatorRegistryI::AdapterSetDirectProxyCB +tuple<function<void()>, function<void(exception_ptr)>> +newSetDirectProxyCB(function<void()> responseCb, + function<void(exception_ptr)> exceptionCb, + const shared_ptr<TraceLevels>& traceLevels, const string& id, + const shared_ptr<Ice::ObjectPrx>& proxy) { -public: - - SetDirectProxyCB(const AmdCB& cb, - const TraceLevelsPtr& traceLevels, - const string& id, - const Ice::ObjectPrx& proxy) : - _cb(cb), _traceLevels(traceLevels), _id(id), _proxy(proxy) - { - } - - virtual void response() + auto response = [traceLevels, id, proxy, responseCb = move(responseCb)] () { - if(_traceLevels->locator > 1) + if(traceLevels->locator > 1) { - Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); - out << "registered adapter `" << _id << "' endpoints: `"; - out << (_proxy ? _proxy->ice_toString() : string("")) << "'"; + Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); + out << "registered adapter `" << id << "' endpoints: `"; + out << (proxy ? proxy->ice_toString() : string("")) << "'"; } - _cb->ice_response(); - } + responseCb(); + }; - virtual void exception(const ::Ice::Exception& ex) + auto exception = [traceLevels, id, exceptionCb = move(exceptionCb)](auto exptr) { - if(_traceLevels->locator > 1) + if(traceLevels->locator > 1) { - Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); - out << "failed to register adapter `" << _id << "' endpoints:\n" << ex; + try + { + rethrow_exception(exptr); + } + catch(const std::exception& ex) + { + Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); + out << "failed to register adapter `" << id << "' endpoints:\n" << ex; + } } try { - ex.ice_throw(); + rethrow_exception(exptr); } catch(const AdapterActiveException&) { - _cb->ice_exception(Ice::AdapterAlreadyActiveException()); + exceptionCb(make_exception_ptr(Ice::AdapterAlreadyActiveException())); return; } catch(const Ice::ObjectNotExistException&) { - _cb->ice_exception(Ice::AdapterNotFoundException()); // Expected if the adapter was destroyed. + exceptionCb(make_exception_ptr(Ice::AdapterNotFoundException())); // Expected if the adapter was destroyed). return; } catch(const Ice::Exception&) { - _cb->ice_exception(Ice::AdapterNotFoundException()); + exceptionCb(make_exception_ptr(Ice::AdapterNotFoundException())); return; } + }; - assert(false); - } - -private: - - const AmdCB _cb; - const TraceLevelsPtr _traceLevels; - const string _id; - const Ice::ObjectPrx _proxy; -}; - -template<class AmdCB> SetDirectProxyCB<AmdCB>* -newSetDirectProxyCB(const AmdCB& cb, const TraceLevelsPtr& traceLevels, const string& id, const Ice::ObjectPrx& p) -{ - return new SetDirectProxyCB<AmdCB>(cb, traceLevels, id, p); + return { move(response), move(exception) }; } -class ServerSetProcessCB : public virtual IceUtil::Shared +class SetAdapterDirectProxyCallback final : public SynchronizationCallback { public: - ServerSetProcessCB(const Ice::AMD_LocatorRegistry_setServerProcessProxyPtr& cb, - const TraceLevelsPtr& traceLevels, - const string& id, - const Ice::ObjectPrx& proxy) : - _cb(cb), _traceLevels(traceLevels), _id(id), _proxy(proxy) - { - } - - virtual void ice_response() - { - if(_traceLevels->locator > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); - out << "registered server `" << _id << "' process proxy: `"; - out << (_proxy ? _proxy->ice_toString() : string("")) << "'"; - } - _cb->ice_response(); - } - - virtual void ice_exception(const ::Ice::Exception& ex) - { - if(_traceLevels->locator > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->locatorCat); - out << "failed to register server process proxy `" << _id << "':\n" << ex; - } - - try - { - ex.ice_throw(); - } - catch(const Ice::ObjectNotExistException&) - { - // Expected if the server was destroyed. - _cb->ice_exception(Ice::ServerNotFoundException()); - return; - } - catch(const Ice::LocalException&) - { - _cb->ice_exception(Ice::ServerNotFoundException()); - return; - } - - assert(false); - } - -private: - - const Ice::AMD_LocatorRegistry_setServerProcessProxyPtr _cb; - const TraceLevelsPtr _traceLevels; - const string _id; - const Ice::ObjectPrx _proxy; -}; -typedef IceUtil::Handle<ServerSetProcessCB> ServerSetProcessCBPtr; - -class SetAdapterDirectProxyCallback : public SynchronizationCallback -{ -public: - - SetAdapterDirectProxyCallback(const LocatorRegistryIPtr& registry, - const LocatorRegistryI::AdapterSetDirectProxyCBPtr& amiCB, + SetAdapterDirectProxyCallback(const shared_ptr<LocatorRegistryI>& registry, + function<void()> response, + function<void(exception_ptr)> exception, const string& adapterId, const string& replicaGroupId, - const Ice::ObjectPrx& proxy) : - _registry(registry), _amiCB(amiCB), _adapterId(adapterId), _replicaGroupId(replicaGroupId), _proxy(proxy) + const shared_ptr<Ice::ObjectPrx>& proxy) : + _registry(registry), + _response(move(response)), + _exception(move(exception)), + _adapterId(adapterId), + _replicaGroupId(replicaGroupId), + _proxy(proxy) { } - virtual void - synchronized() + void + synchronized() override { try { - _registry->setAdapterDirectProxy(_amiCB, _adapterId, _replicaGroupId, _proxy); + _registry->setAdapterDirectProxy(_adapterId, _replicaGroupId, _proxy, _response, _exception); } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - _amiCB->exception(ex); + _exception(current_exception()); } } - virtual void - synchronized(const Ice::Exception& ex) + void + synchronized(exception_ptr ex) override { - try - { - ex.ice_throw(); - } - catch(const Ice::Exception& e) - { - _amiCB->exception(e); - } + _exception(ex); } private: - const LocatorRegistryIPtr _registry; - const LocatorRegistryI::AdapterSetDirectProxyCBPtr _amiCB; + const shared_ptr<LocatorRegistryI> _registry; + const function<void()> _response; + const function<void(exception_ptr)> _exception; const string _adapterId; const string _replicaGroupId; - const Ice::ObjectPrx _proxy; + const shared_ptr<Ice::ObjectPrx> _proxy; }; -class SetServerProcessProxyCallback : public SynchronizationCallback +class SetServerProcessProxyCallback final : public SynchronizationCallback { public: - SetServerProcessProxyCallback(const LocatorRegistryIPtr& registry, - const Ice::AMD_LocatorRegistry_setServerProcessProxyPtr& cb, + SetServerProcessProxyCallback(const shared_ptr<LocatorRegistryI>& registry, + const function<void()> response, + const function<void(exception_ptr)> exception, const string& id, - const Ice::ProcessPrx& proxy) : - _registry(registry), _cb(cb), _id(id), _proxy(proxy) + const shared_ptr<Ice::ProcessPrx>& proxy) : + _registry(registry), + _response(move(response)), + _exception(move(exception)), + _id(id), + _proxy(proxy) { } - virtual void - synchronized() + void + synchronized() override { try { - _registry->setServerProcessProxy_async(_cb, _id, _proxy, Ice::Current()); + _registry->setServerProcessProxyAsync(_id, _proxy, _response, _exception, Ice::Current()); } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - _cb->ice_exception(ex); + _exception(current_exception()); } } - virtual void - synchronized(const Ice::Exception& sex) + void + synchronized(exception_ptr exptr) override { try { - sex.ice_throw(); + rethrow_exception(exptr); } catch(const ServerNotExistException&) { - _cb->ice_exception(Ice::ServerNotFoundException()); + _exception(make_exception_ptr(Ice::ServerNotFoundException())); } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - const TraceLevelsPtr traceLevels = _registry->getTraceLevels(); + auto traceLevels = _registry->getTraceLevels(); if(traceLevels->locator > 0) { Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); - out << "couldn't register server `" << _id << "' process proxy:\n" << toString(ex); + out << "couldn't register server `" << _id << "' process proxy:\n" << toString(current_exception()); } - _cb->ice_exception(Ice::ServerNotFoundException()); + _exception(make_exception_ptr(Ice::ServerNotFoundException())); } } private: - const LocatorRegistryIPtr _registry; - const Ice::AMD_LocatorRegistry_setServerProcessProxyPtr _cb; + const shared_ptr<LocatorRegistryI> _registry; + const function<void()> _response; + const function<void(exception_ptr)> _exception; const string _id; - const Ice::ProcessPrx _proxy; + const shared_ptr<Ice::ProcessPrx> _proxy; }; }; -LocatorRegistryI::LocatorRegistryI(const DatabasePtr& database, +LocatorRegistryI::LocatorRegistryI(const shared_ptr<Database>& database, bool dynamicRegistration, bool master, ReplicaSessionManager& session) : @@ -260,36 +195,42 @@ LocatorRegistryI::LocatorRegistryI(const DatabasePtr& database, } void -LocatorRegistryI::setAdapterDirectProxy_async(const Ice::AMD_LocatorRegistry_setAdapterDirectProxyPtr& cb, - const string& adapterId, - const Ice::ObjectPrx& proxy, +LocatorRegistryI::setAdapterDirectProxyAsync(string adapterId, shared_ptr<Ice::ObjectPrx> proxy, + function<void()> response, + function<void(exception_ptr)> exception, const Ice::Current&) { - setAdapterDirectProxy(newSetDirectProxyCB(cb, _database->getTraceLevels(), adapterId, proxy), - adapterId, + auto [responseCb, exceptionCb] = newSetDirectProxyCB(move(response), move(exception), _database->getTraceLevels(), + adapterId, proxy); + + setAdapterDirectProxy(adapterId, "", - proxy); + proxy, + move(responseCb), + move(exceptionCb)); } void -LocatorRegistryI::setReplicatedAdapterDirectProxy_async( - const Ice::AMD_LocatorRegistry_setReplicatedAdapterDirectProxyPtr& cb, - const string& adapterId, - const string& replicaGroupId, - const Ice::ObjectPrx& proxy, - const Ice::Current&) +LocatorRegistryI::setReplicatedAdapterDirectProxyAsync(string adapterId, string replicaGroupId, + shared_ptr<Ice::ObjectPrx> proxy, + function<void()> response, + function<void(exception_ptr)> exception, + const Ice::Current&) { - setAdapterDirectProxy(newSetDirectProxyCB(cb, _database->getTraceLevels(), adapterId, proxy), - adapterId, + auto [responseCb, exceptionCb] = newSetDirectProxyCB(move(response), move(exception), _database->getTraceLevels(), + adapterId, proxy); + setAdapterDirectProxy(adapterId, replicaGroupId, - proxy); + proxy, + move(responseCb), + move(exceptionCb)); } void -LocatorRegistryI::setServerProcessProxy_async(const Ice::AMD_LocatorRegistry_setServerProcessProxyPtr& cb, - const string& id, - const Ice::ProcessPrx& proxy, - const Ice::Current&) +LocatorRegistryI::setServerProcessProxyAsync(string id, shared_ptr<Ice::ProcessPrx> proxy, + function<void()> response, + function<void(exception_ptr)> exception, + const Ice::Current&) { try { @@ -301,7 +242,7 @@ LocatorRegistryI::setServerProcessProxy_async(const Ice::AMD_LocatorRegistry_set // is needed for the session activation mode for cases where // the server is released during the server startup. // - ServerPrx server; + shared_ptr<ServerPrx> server; while(true) { try @@ -311,46 +252,86 @@ LocatorRegistryI::setServerProcessProxy_async(const Ice::AMD_LocatorRegistry_set } catch(const SynchronizationException&) { - if(_database->getServer(id)->addSyncCallback(new SetServerProcessProxyCallback(this, cb, id, proxy))) + auto cb = make_shared<SetServerProcessProxyCallback>(shared_from_this(), response, + exception, id, proxy); + if(_database->getServer(id)->addSyncCallback(move(cb))) { return; } } } - server->begin_setProcess(proxy, IceGrid::newCallback_Server_setProcess( - new ServerSetProcessCB(cb, _database->getTraceLevels(), id, proxy), - &ServerSetProcessCB::ice_response, - &ServerSetProcessCB::ice_exception)); + server->setProcessAsync(proxy, + [id, proxy, response, traceLevels = _database->getTraceLevels()] + { + if(traceLevels->locator > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); + out << "registered server `" << id << "' process proxy: `"; + out << (proxy ? proxy->ice_toString() : string("")) << "'"; + } + response(); + }, + [id, exception, traceLevels = _database->getTraceLevels()] (exception_ptr exptr) + { + if(traceLevels->locator > 1) + { + try + { + rethrow_exception(exptr); + } + catch(const std::exception& ex) + { + Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); + out << "failed to register server process proxy `" << id << "':\n" << ex; + } + } + + try + { + rethrow_exception(exptr); + } + catch(const Ice::ObjectNotExistException&) + { + // Expected if the server was destroyed. + exception(make_exception_ptr(Ice::ServerNotFoundException())); + return; + } + catch(const Ice::LocalException&) + { + exception(make_exception_ptr(Ice::ServerNotFoundException())); + return; + } + }); } catch(const ServerNotExistException&) { - cb->ice_exception(Ice::ServerNotFoundException()); + exception(make_exception_ptr(Ice::ServerNotFoundException())); } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - const TraceLevelsPtr traceLevels = _database->getTraceLevels(); + auto traceLevels = _database->getTraceLevels(); if(traceLevels->locator > 0) { Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); - out << "couldn't register server `" << id << "' process proxy:\n" << toString(ex); + out << "couldn't register server `" << id << "' process proxy:\n" << toString(current_exception()); } - cb->ice_exception(Ice::ServerNotFoundException()); + exception(make_exception_ptr(Ice::ServerNotFoundException())); } } void -LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirectProxyCBPtr& amiCB, - const string& adapterId, - const string& replicaGroupId, - const Ice::ObjectPrx& proxy) +LocatorRegistryI::setAdapterDirectProxy(string adapterId, string replicaGroupId, + shared_ptr<Ice::ObjectPrx> proxy, + function<void()> response, + function<void(exception_ptr)> exception) { // // Ignore request with empty adapter id. // if(adapterId.empty()) { - amiCB->response(); + response(); return; } @@ -362,7 +343,7 @@ LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirect // // Get the adapter from the registry and set its direct proxy. // - AdapterPrx adapter; + shared_ptr<AdapterPrx> adapter; while(true) { try @@ -376,17 +357,19 @@ LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirect } catch(const SynchronizationException&) { - if(_database->addAdapterSyncCallback(adapterId, new SetAdapterDirectProxyCallback( - this, amiCB, adapterId, replicaGroupId, proxy))) + if(_database->addAdapterSyncCallback(adapterId, + make_shared<SetAdapterDirectProxyCallback>(shared_from_this(), + response, exception, + adapterId, + replicaGroupId, + proxy))) { return; } } } - adapter->begin_setDirectProxy(proxy, IceGrid::newCallback_Adapter_setDirectProxy(amiCB, - &LocatorRegistryI::AdapterSetDirectProxyCB::response, - &LocatorRegistryI::AdapterSetDirectProxyCB::exception)); + adapter->setDirectProxyAsync(proxy, response, exception); return; } catch(const AdapterNotExistException&) @@ -396,13 +379,13 @@ LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirect throw Ice::AdapterNotFoundException(); } } - catch(const Ice::Exception& ex) + catch(const Ice::Exception&) { - const TraceLevelsPtr traceLevels = _database->getTraceLevels(); + auto traceLevels = _database->getTraceLevels(); if(traceLevels->locator > 0) { Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); - out << "couldn't register adapter `" << adapterId << "' endpoints:\n" << toString(ex); + out << "couldn't register adapter `" << adapterId << "' endpoints:\n" << toString(current_exception()); } throw Ice::AdapterNotFoundException(); } @@ -413,7 +396,7 @@ LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirect try { _database->setAdapterDirectProxy(adapterId, replicaGroupId, proxy); - amiCB->response(); + response(); return; } catch(const AdapterExistsException&) @@ -422,7 +405,7 @@ LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirect } catch(const DeploymentException& ex) { - const TraceLevelsPtr traceLevels = _database->getTraceLevels(); + auto traceLevels = _database->getTraceLevels(); if(traceLevels->locator > 0) { Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); @@ -433,10 +416,10 @@ LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirect } else { - ReplicaSessionPrx session = _session.getSession(); + auto session = _session.getSession(); if(!session) { - const TraceLevelsPtr traceLevels = _database->getTraceLevels(); + auto traceLevels = _database->getTraceLevels(); if(traceLevels->locator > 0) { Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); @@ -449,7 +432,7 @@ LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirect try { session->setAdapterDirectProxy(adapterId, replicaGroupId, proxy); - amiCB->response(); + response(); return; } catch(const AdapterExistsException&) @@ -460,13 +443,14 @@ LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirect { throw Ice::AdapterNotFoundException(); // Dynamic registration not allowed on the master. } - catch(const Ice::LocalException& ex) + catch(const Ice::LocalException&) { - const TraceLevelsPtr traceLevels = _database->getTraceLevels(); + auto traceLevels = _database->getTraceLevels(); if(traceLevels->locator > 0) { Ice::Trace out(traceLevels->logger, traceLevels->locatorCat); - out << "couldn't register adapter `" << adapterId << "' endpoints with master:\n" << toString(ex); + out << "couldn't register adapter `" << adapterId << "' endpoints with master:\n" + << toString(current_exception()); } throw Ice::AdapterNotFoundException(); } @@ -476,7 +460,7 @@ LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirect throw Ice::AdapterNotFoundException(); } -const TraceLevelsPtr& +const shared_ptr<TraceLevels>& LocatorRegistryI::getTraceLevels() const { return _database->getTraceLevels(); |