diff options
author | Jose <jose@zeroc.com> | 2012-08-14 00:05:17 +0200 |
---|---|---|
committer | Jose <jose@zeroc.com> | 2012-08-14 00:05:17 +0200 |
commit | b3a890faf5841289df3299bac6ada7120b324c16 (patch) | |
tree | eacdb167a4d1e6310bfd49827f90fb2ddf2c144a /cpp/src | |
parent | missing filter properties (diff) | |
download | ice-b3a890faf5841289df3299bac6ada7120b324c16.tar.bz2 ice-b3a890faf5841289df3299bac6ada7120b324c16.tar.xz ice-b3a890faf5841289df3299bac6ada7120b324c16.zip |
ICE-4809 - AMI metadata in Ice services internal definitions
Diffstat (limited to 'cpp/src')
35 files changed, 744 insertions, 1260 deletions
diff --git a/cpp/src/Glacier2/Blobject.cpp b/cpp/src/Glacier2/Blobject.cpp index ac0571fd6de..c64e3def10e 100755 --- a/cpp/src/Glacier2/Blobject.cpp +++ b/cpp/src/Glacier2/Blobject.cpp @@ -26,122 +26,10 @@ const string clientTraceRequest = "Glacier2.Client.Trace.Request"; const string serverTraceOverride = "Glacier2.Server.Trace.Override"; const string clientTraceOverride = "Glacier2.Client.Trace.Override"; -class AMI_Array_Object_ice_invokeTwowayI : public AMI_Array_Object_ice_invoke -{ -public: - - AMI_Array_Object_ice_invokeTwowayI(const AMD_Object_ice_invokePtr& amdCB, - const InstancePtr& instance, - const ConnectionPtr& connection) : - _amdCB(amdCB), - _instance(instance), - _connection(connection) - { - } - - virtual void - ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams) - { - _amdCB->ice_response(ok, outParams); - } - - virtual void - ice_exception(const Exception& ex) - { - // - // If the connection has been lost, destroy the session. - // - if(_connection) - { - if(dynamic_cast<const Ice::SocketException*>(&ex) || - dynamic_cast<const Ice::TimeoutException*>(&ex) || - dynamic_cast<const Ice::ProtocolException*>(&ex)) - { - try - { - _instance->sessionRouter()->destroySession(_connection); - } - catch(const Exception&) - { - } - } - } - - _amdCB->ice_exception(ex); - } - -private: - - const AMD_Object_ice_invokePtr _amdCB; - const InstancePtr _instance; - const ConnectionPtr _connection; -}; - -class AMI_Array_Object_ice_invokeOnewayI : public AMI_Array_Object_ice_invoke, public Ice::AMISentCallback -{ -public: - - AMI_Array_Object_ice_invokeOnewayI(const AMD_Object_ice_invokePtr& amdCB, - const InstancePtr& instance, - const ConnectionPtr& connection) : - _amdCB(amdCB), - _instance(instance), - _connection(connection) - { - } - - virtual void - ice_response(bool, const pair<const Byte*, const Byte*>&) - { - assert(false); - } - - virtual void - ice_sent() - { -#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); -#else - _amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); -#endif - } - - virtual void - ice_exception(const Exception& ex) - { - // - // If the connection has been lost, destroy the session. - // - if(_connection) - { - if(dynamic_cast<const Ice::SocketException*>(&ex) || - dynamic_cast<const Ice::TimeoutException*>(&ex) || - dynamic_cast<const Ice::ProtocolException*>(&ex)) - { - try - { - _instance->sessionRouter()->destroySession(_connection); - } - catch(const Exception&) - { - } - } - } - - _amdCB->ice_exception(ex); - } - -private: - - const AMD_Object_ice_invokePtr _amdCB; - const InstancePtr _instance; - const ConnectionPtr _connection; -}; - } Glacier2::Blobject::Blobject(const InstancePtr& instance, const ConnectionPtr& reverseConnection, - const Ice::Context& context) : + const Context& context) : _instance(instance), _reverseConnection(reverseConnection), _forwardContext(_reverseConnection ? @@ -180,8 +68,52 @@ Glacier2::Blobject::destroy() } void +Glacier2::Blobject::invokeResponse(bool ok, const pair<const Byte*, const Byte*>& outParams, + const InvokeCookiePtr& cookie) +{ + cookie->cb()->ice_response(ok, outParams); +} + +void +Glacier2::Blobject::invokeSent(bool sent, const InvokeCookiePtr& cookie) +{ + if(sent) + { +#if (defined(_MSC_VER) && (_MSC_VER >= 1600)) + cookie->cb()->ice_response(true, pair<const Byte*, const Byte*>(nullptr, nullptr)); +#else + cookie->cb()->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); +#endif + } +} + +void +Glacier2::Blobject::invokeException(const Exception& ex, const InvokeCookiePtr& cookie) +{ + // + // If the connection has been lost, destroy the session. + // + if(_reverseConnection) + { + if(dynamic_cast<const SocketException*>(&ex) || + dynamic_cast<const TimeoutException*>(&ex) || + dynamic_cast<const ProtocolException*>(&ex)) + { + try + { + _instance->sessionRouter()->destroySession(_reverseConnection); + } + catch(const Exception&) + { + } + } + } + cookie->cb()->ice_exception(ex); +} + +void Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amdCB, - const std::pair<const Ice::Byte*, const Ice::Byte*>& inParams, const Current& current) + const std::pair<const Byte*, const Byte*>& inParams, const Current& current) { // // Set the correct facet on the proxy. @@ -398,50 +330,44 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amd try { - AMI_Array_Object_ice_invokePtr amiCB; - Ice::AMISentCallback* sentCB = 0; + Callback_Object_ice_invokePtr amiCB; if(proxy->ice_isTwoway()) { - amiCB = new AMI_Array_Object_ice_invokeTwowayI(amdCB, _instance, _reverseConnection); + amiCB = newCallback_Object_ice_invoke(this, &Blobject::invokeResponse, &Blobject::invokeException); } else { - AMI_Array_Object_ice_invokeOnewayI* cb = - new AMI_Array_Object_ice_invokeOnewayI(amdCB, _instance, _reverseConnection); - amiCB = cb; - sentCB = cb; + amiCB = newCallback_Object_ice_invoke(this, &Blobject::invokeException, &Blobject::invokeSent); } - bool sent; if(_forwardContext) { if(_context.size() > 0) { - Ice::Context ctx = current.ctx; + Context ctx = current.ctx; ctx.insert(_context.begin(), _context.end()); - sent = proxy->ice_invoke_async(amiCB, current.operation, current.mode, inParams, ctx); + proxy->begin_ice_invoke(current.operation, current.mode, inParams, ctx, amiCB, + new InvokeCookie(amdCB)); } else { - sent = proxy->ice_invoke_async(amiCB, current.operation, current.mode, inParams, current.ctx); + proxy->begin_ice_invoke(current.operation, current.mode, inParams, current.ctx, amiCB, + new InvokeCookie(amdCB)); } } else { if(_context.size() > 0) { - sent = proxy->ice_invoke_async(amiCB, current.operation, current.mode, inParams, _context); + proxy->begin_ice_invoke(current.operation, current.mode, inParams, _context, amiCB, + new InvokeCookie(amdCB)); } else { - sent = proxy->ice_invoke_async(amiCB, current.operation, current.mode, inParams); + proxy->begin_ice_invoke(current.operation, current.mode, inParams, amiCB, + new InvokeCookie(amdCB)); } } - - if(sent && sentCB) - { - sentCB->ice_sent(); - } } catch(const LocalException& ex) { diff --git a/cpp/src/Glacier2/Blobject.h b/cpp/src/Glacier2/Blobject.h index 05e6c63fe3b..dff1fbe91ca 100644 --- a/cpp/src/Glacier2/Blobject.h +++ b/cpp/src/Glacier2/Blobject.h @@ -25,6 +25,31 @@ public: void destroy(); + class InvokeCookie : public Ice::LocalObject + { + public: + + InvokeCookie(const Ice::AMD_Object_ice_invokePtr& cb) : + _cb(cb) + { + } + + Ice::AMD_Object_ice_invokePtr cb() + { + return _cb; + } + + private: + + Ice::AMD_Object_ice_invokePtr _cb; + }; + typedef IceUtil::Handle<InvokeCookie> InvokeCookiePtr; + + void invokeResponse(bool, const std::pair<const Ice::Byte*, const Ice::Byte*>&, + const InvokeCookiePtr&); + void invokeSent(bool, const InvokeCookiePtr&); + void invokeException(const Ice::Exception&, const InvokeCookiePtr&); + protected: void invoke(Ice::ObjectPrx&, const Ice::AMD_Object_ice_invokePtr&, diff --git a/cpp/src/Glacier2/RouterI.cpp b/cpp/src/Glacier2/RouterI.cpp index 058b1bf7104..b0624578d41 100644 --- a/cpp/src/Glacier2/RouterI.cpp +++ b/cpp/src/Glacier2/RouterI.cpp @@ -65,7 +65,7 @@ Glacier2::RouterI::~RouterI() } void -Glacier2::RouterI::destroy(const AMI_Session_destroyPtr& amiCB) +Glacier2::RouterI::destroy(const Callback_Session_destroyPtr& asyncCB) { if(_session) { @@ -91,11 +91,11 @@ Glacier2::RouterI::destroy(const AMI_Session_destroyPtr& amiCB) if(_context.size() > 0) { - _session->destroy_async(amiCB, _context); + _session->begin_destroy(_context, asyncCB); } else { - _session->destroy_async(amiCB); + _session->begin_destroy(asyncCB); } } diff --git a/cpp/src/Glacier2/RouterI.h b/cpp/src/Glacier2/RouterI.h index 619ef513b48..30ddb91024e 100644 --- a/cpp/src/Glacier2/RouterI.h +++ b/cpp/src/Glacier2/RouterI.h @@ -32,7 +32,7 @@ public: const FilterManagerPtr&, const Ice::Context&); virtual ~RouterI(); - void destroy(const AMI_Session_destroyPtr&); + void destroy(const Callback_Session_destroyPtr&); virtual Ice::ObjectPrx getClientProxy(const Ice::Current& = Ice::Current()) const; virtual Ice::ObjectPrx getServerProxy(const Ice::Current& = Ice::Current()) const; diff --git a/cpp/src/Glacier2/SessionRouterI.cpp b/cpp/src/Glacier2/SessionRouterI.cpp index 0764521248b..6f4546a5463 100644 --- a/cpp/src/Glacier2/SessionRouterI.cpp +++ b/cpp/src/Glacier2/SessionRouterI.cpp @@ -21,6 +21,7 @@ using namespace std; using namespace Ice; +using namespace Glacier2; namespace Glacier2 { @@ -146,48 +147,33 @@ public: _password(password) { } + - class CheckPermissionsCB : public AMI_PermissionsVerifier_checkPermissions + void + checkPermissionsResponse(bool ok, const string& reason) { - public: - - CheckPermissionsCB(const UserPasswordCreateSessionPtr& session, bool hasSessionManager) : - _session(session), - _hasSessionManager(hasSessionManager) + if(ok) { + authorized(_sessionRouter->_sessionManager); } - - virtual void - ice_response(bool ok, const string& reason) + else { - if(ok) - { - _session->authorized(_hasSessionManager); - } - else - { - _session->exception(PermissionDeniedException(reason.empty() ? string("permission denied") : reason)); - } + exception(PermissionDeniedException(reason.empty() ? string("permission denied") : reason)); } + } - virtual void - ice_exception(const Ice::Exception& ex) + void + checkPermissionsException(const Ice::Exception& ex) + { + if(dynamic_cast<const CollocationOptimizationException*>(&ex)) { - if(dynamic_cast<const CollocationOptimizationException*>(&ex)) - { - _session->authorizeCollocated(); - } - else - { - _session->unexpectedAuthorizeException(ex); - } + authorizeCollocated(); } - - private: - - const UserPasswordCreateSessionPtr _session; - const bool _hasSessionManager; - }; + else + { + unexpectedAuthorizeException(ex); + } + } virtual void authorize() @@ -196,8 +182,11 @@ public: Ice::Context ctx = _current.ctx; ctx.insert(_context.begin(), _context.end()); - AMI_PermissionsVerifier_checkPermissionsPtr cb = new CheckPermissionsCB(this, _sessionRouter->_sessionManager); - _sessionRouter->_verifier->checkPermissions_async(cb, _user, _password, ctx); + + _sessionRouter->_verifier->begin_checkPermissions(_user, _password, ctx, + newCallback_PermissionsVerifier_checkPermissions(this, + &UserPasswordCreateSession::checkPermissionsResponse, + &UserPasswordCreateSession::checkPermissionsException)); } virtual void @@ -229,48 +218,16 @@ public: return FilterManager::create(_instance, _user, true); } - class CreateCB : public AMI_SessionManager_create - { - public: - - CreateCB(const CreateSessionPtr& session) : _session(session) - { - } - - virtual void - ice_response(const SessionPrx& session) - { - _session->sessionCreated(session); - } - - virtual void - ice_exception(const Ice::Exception& ex) - { - try - { - ex.ice_throw(); - } - catch(const CannotCreateSessionException& ex) - { - _session->exception(ex); - } - catch(const Ice::Exception& ex) - { - _session->unexpectedCreateSessionException(ex); - } - } - - private: - - const CreateSessionPtr _session; - }; - virtual void createSession() { Ice::Context ctx = _current.ctx; ctx.insert(_context.begin(), _context.end()); - _sessionRouter->_sessionManager->create_async(new CreateCB(this), _user, _control, ctx); + _sessionRouter->_sessionManager->begin_create(_user, _control, ctx, + newCallback_SessionManager_create( + static_cast<CreateSession*>(this), + &CreateSession::sessionCreated, + &CreateSession::createException)); } virtual void @@ -302,49 +259,32 @@ public: _sslInfo(sslInfo) { } - - class AuthorizeCB : public AMI_SSLPermissionsVerifier_authorize + + void + authorizeResponse(bool ok, const string& reason) { - public: - - AuthorizeCB(const SSLCreateSessionPtr& session, bool hasSessionManager) : - _session(session), - _hasSessionManager(hasSessionManager) + if(ok) { + authorized(_sessionRouter->_sslSessionManager); } - - virtual void - ice_response(bool ok, const string& reason) + else { - if(ok) - { - _session->authorized(_hasSessionManager); - } - else - { - _session->exception(PermissionDeniedException(reason.empty() ? string("permission denied") - : reason)); - } + exception(PermissionDeniedException(reason.empty() ? string("permission denied") : reason)); } + } - virtual void - ice_exception(const Ice::Exception& ex) + void + authorizeException(const Ice::Exception& ex) + { + if(dynamic_cast<const CollocationOptimizationException*>(&ex)) { - if(dynamic_cast<const CollocationOptimizationException*>(&ex)) - { - _session->authorizeCollocated(); - } - else - { - _session->unexpectedAuthorizeException(ex); - } + authorizeCollocated(); } - - private: - - const SSLCreateSessionPtr _session; - const bool _hasSessionManager; - }; + else + { + unexpectedAuthorizeException(ex); + } + } virtual void authorize() @@ -353,9 +293,10 @@ public: Ice::Context ctx = _current.ctx; ctx.insert(_context.begin(), _context.end()); - - AMI_SSLPermissionsVerifier_authorizePtr cb = new AuthorizeCB(this, _sessionRouter->_sslSessionManager); - _sessionRouter->_sslVerifier->authorize_async(cb, _sslInfo, ctx); + _sessionRouter->_sslVerifier->begin_authorize(_sslInfo, ctx, + newCallback_SSLPermissionsVerifier_authorize(this, + &SSLCreateSession::authorizeResponse, + &SSLCreateSession::authorizeException)); } virtual void @@ -387,48 +328,16 @@ public: return FilterManager::create(_instance, _user, false); } - class CreateCB : public AMI_SSLSessionManager_create - { - public: - - CreateCB(const CreateSessionPtr& session) : _session(session) - { - } - - virtual void - ice_response(const SessionPrx& session) - { - _session->sessionCreated(session); - } - - virtual void - ice_exception(const Ice::Exception& ex) - { - try - { - ex.ice_throw(); - } - catch(const CannotCreateSessionException& ex) - { - _session->exception(ex); - } - catch(const Ice::Exception& ex) - { - _session->unexpectedCreateSessionException(ex); - } - } - - private: - - const CreateSessionPtr _session; - }; - virtual void createSession() { Ice::Context ctx = _current.ctx; ctx.insert(_context.begin(), _context.end()); - _sessionRouter->_sslSessionManager->create_async(new CreateCB(this), _sslInfo, _control, ctx); + _sessionRouter->_sslSessionManager->begin_create(_sslInfo, _control, ctx, + newCallback_SSLSessionManager_create( + static_cast<CreateSession*>(this), + &CreateSession::sessionCreated, + &CreateSession::createException)); } virtual void @@ -449,44 +358,9 @@ private: const SSLInfo _sslInfo; }; -class DestroyCB : public AMI_Session_destroy -{ -public: - - DestroyCB(int traceLevel, const LoggerPtr& logger) - { - if(traceLevel > 0) - { - _logger = logger; - } - } - - virtual void - ice_response() - { - } - - virtual void - ice_exception(const Ice::Exception& ex) - { - if(_logger) - { - Trace out(_logger, "Glacier2"); - out << "exception while destroying session\n" << ex; - } - } - -private: - - LoggerPtr _logger; -}; - } -using namespace Glacier2; - -Glacier2::CreateSession::CreateSession(const SessionRouterIPtr& sessionRouter, const string& user, - const Ice::Current& current) : +CreateSession::CreateSession(const SessionRouterIPtr& sessionRouter, const string& user, const Ice::Current& current) : _instance(sessionRouter->_instance), _sessionRouter(sessionRouter), _user(user), @@ -523,7 +397,7 @@ Glacier2::CreateSession::CreateSession(const SessionRouterIPtr& sessionRouter, c } void -Glacier2::CreateSession::create() +CreateSession::create() { try { @@ -539,13 +413,13 @@ Glacier2::CreateSession::create() } void -Glacier2::CreateSession::addPendingCallback(const CreateSessionPtr& callback) +CreateSession::addPendingCallback(const CreateSessionPtr& callback) { _pendingCallbacks.push_back(callback); } void -Glacier2::CreateSession::authorized(bool createSession) +CreateSession::authorized(bool createSession) { // // Create the filter manager now as it's required for the session control object. @@ -572,7 +446,7 @@ Glacier2::CreateSession::authorized(bool createSession) } void -Glacier2::CreateSession::unexpectedAuthorizeException(const Ice::Exception& ex) +CreateSession::unexpectedAuthorizeException(const Ice::Exception& ex) { if(_sessionRouter->sessionTraceLevel() >= 1) { @@ -583,7 +457,24 @@ Glacier2::CreateSession::unexpectedAuthorizeException(const Ice::Exception& ex) } void -Glacier2::CreateSession::sessionCreated(const SessionPrx& session) +CreateSession::createException(const Ice::Exception& ex) +{ + try + { + ex.ice_throw(); + } + catch(const CannotCreateSessionException& ex) + { + exception(ex); + } + catch(const Ice::Exception& ex) + { + unexpectedCreateSessionException(ex); + } +} + +void +CreateSession::sessionCreated(const SessionPrx& session) { // // Create the session router object. @@ -633,7 +524,7 @@ Glacier2::CreateSession::sessionCreated(const SessionPrx& session) { if(session) { - session->destroy_async(new DestroyCB(0, 0)); + session->begin_destroy(); } unexpectedCreateSessionException(ex); return; @@ -659,7 +550,7 @@ Glacier2::CreateSession::sessionCreated(const SessionPrx& session) } void -Glacier2::CreateSession::unexpectedCreateSessionException(const Ice::Exception& ex) +CreateSession::unexpectedCreateSessionException(const Ice::Exception& ex) { if(_sessionRouter->sessionTraceLevel() >= 1) { @@ -670,7 +561,7 @@ Glacier2::CreateSession::unexpectedCreateSessionException(const Ice::Exception& } void -Glacier2::CreateSession::exception(const Ice::Exception& ex) +CreateSession::exception(const Ice::Exception& ex) { try { @@ -699,11 +590,11 @@ Glacier2::CreateSession::exception(const Ice::Exception& ex) } } -Glacier2::SessionRouterI::SessionRouterI(const InstancePtr& instance, - const PermissionsVerifierPrx& verifier, - const SessionManagerPrx& sessionManager, - const SSLPermissionsVerifierPrx& sslVerifier, - const SSLSessionManagerPrx& sslSessionManager) : +SessionRouterI::SessionRouterI(const InstancePtr& instance, + const PermissionsVerifierPrx& verifier, + const SessionManagerPrx& sessionManager, + const SSLPermissionsVerifierPrx& sslVerifier, + const SSLSessionManagerPrx& sslSessionManager) : _instance(instance), _sessionTraceLevel(_instance->properties()->getPropertyAsInt("Glacier2.Trace.Session")), _rejectTraceLevel(_instance->properties()->getPropertyAsInt("Glacier2.Client.Trace.Reject")), @@ -716,6 +607,7 @@ Glacier2::SessionRouterI::SessionRouterI(const InstancePtr& instance, _routersByConnectionHint(_routersByConnection.end()), _routersByCategoryHint(_routersByCategory.end()), _sessionPingCallback(newCallback_Object_ice_ping(this, &SessionRouterI::sessionPingException)), + _sessionDestroyCallback(newCallback_Session_destroy(this, &SessionRouterI::sessionDestroyException)), _destroy(false) { @@ -772,7 +664,7 @@ Glacier2::SessionRouterI::SessionRouterI(const InstancePtr& instance, _instance->setSessionRouter(this); } -Glacier2::SessionRouterI::~SessionRouterI() +SessionRouterI::~SessionRouterI() { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); @@ -784,11 +676,11 @@ Glacier2::SessionRouterI::~SessionRouterI() } void -Glacier2::SessionRouterI::destroy() +SessionRouterI::destroy() { map<ConnectionPtr, RouterIPtr> routers; SessionThreadPtr sessionThread; - + Callback_Session_destroyPtr destroyCallback; { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); @@ -805,6 +697,7 @@ Glacier2::SessionRouterI::destroy() sessionThread = _sessionThread; _sessionThread = 0; + swap(destroyCallback, _sessionDestroyCallback); // Break cyclic reference count. _sessionPingCallback = 0; // Break cyclic reference count. } @@ -814,7 +707,7 @@ Glacier2::SessionRouterI::destroy() // for(map<ConnectionPtr, RouterIPtr>::iterator p = routers.begin(); p != routers.end(); ++p) { - p->second->destroy(new DestroyCB(_sessionTraceLevel, _instance->logger())); + p->second->destroy(destroyCallback); } if(sessionThread) @@ -825,19 +718,19 @@ Glacier2::SessionRouterI::destroy() } ObjectPrx -Glacier2::SessionRouterI::getClientProxy(const Current& current) const +SessionRouterI::getClientProxy(const Current& current) const { return getRouter(current.con, current.id)->getClientProxy(current); // Forward to the per-client router. } ObjectPrx -Glacier2::SessionRouterI::getServerProxy(const Current& current) const +SessionRouterI::getServerProxy(const Current& current) const { return getRouter(current.con, current.id)->getServerProxy(current); // Forward to the per-client router. } void -Glacier2::SessionRouterI::addProxy(const ObjectPrx& proxy, const Current& current) +SessionRouterI::addProxy(const ObjectPrx& proxy, const Current& current) { ObjectProxySeq seq; seq.push_back(proxy); @@ -845,7 +738,7 @@ Glacier2::SessionRouterI::addProxy(const ObjectPrx& proxy, const Current& curren } ObjectProxySeq -Glacier2::SessionRouterI::addProxies(const ObjectProxySeq& proxies, const Current& current) +SessionRouterI::addProxies(const ObjectProxySeq& proxies, const Current& current) { // // Forward to the per-client router. @@ -854,7 +747,7 @@ Glacier2::SessionRouterI::addProxies(const ObjectProxySeq& proxies, const Curren } string -Glacier2::SessionRouterI::getCategoryForClient(const Ice::Current& current) const +SessionRouterI::getCategoryForClient(const Ice::Current& current) const { // Forward to the per-client router. if(_instance->serverObjectAdapter()) @@ -868,7 +761,7 @@ Glacier2::SessionRouterI::getCategoryForClient(const Ice::Current& current) cons } void -Glacier2::SessionRouterI::createSession_async(const AMD_Router_createSessionPtr& amdCB, const std::string& userId, +SessionRouterI::createSession_async(const AMD_Router_createSessionPtr& amdCB, const std::string& userId, const std::string& password, const Current& current) { if(!_verifier) @@ -882,7 +775,7 @@ Glacier2::SessionRouterI::createSession_async(const AMD_Router_createSessionPtr& } void -Glacier2::SessionRouterI::createSessionFromSecureConnection_async( +SessionRouterI::createSessionFromSecureConnection_async( const AMD_Router_createSessionFromSecureConnectionPtr& amdCB, const Current& current) { if(!_sslVerifier) @@ -932,13 +825,13 @@ Glacier2::SessionRouterI::createSessionFromSecureConnection_async( } void -Glacier2::SessionRouterI::destroySession(const Current& current) +SessionRouterI::destroySession(const Current& current) { destroySession(current.con); } void -Glacier2::SessionRouterI::refreshSession(const Ice::Current& current) +SessionRouterI::refreshSession(const Ice::Current& current) { RouterIPtr router = getRouter(current.con, current.id, false); if(!router) @@ -951,7 +844,7 @@ Glacier2::SessionRouterI::refreshSession(const Ice::Current& current) // Ping the session to ensure it does not timeout. // assert(_sessionPingCallback); - Glacier2::SessionPrx session = router->getSession(); + SessionPrx session = router->getSession(); if(session) { session->begin_ice_ping(_sessionPingCallback, current.con); @@ -959,7 +852,7 @@ Glacier2::SessionRouterI::refreshSession(const Ice::Current& current) } void -Glacier2::SessionRouterI::destroySession(const ConnectionPtr& connection) +SessionRouterI::destroySession(const ConnectionPtr& connection) { RouterIPtr router; @@ -1010,18 +903,17 @@ Glacier2::SessionRouterI::destroySession(const ConnectionPtr& connection) Trace out(_instance->logger(), "Glacier2"); out << "destroying session\n" << router->toString(); } - - router->destroy(new DestroyCB(_sessionTraceLevel, _instance->logger())); + router->destroy(_sessionDestroyCallback); } Ice::Long -Glacier2::SessionRouterI::getSessionTimeout(const Ice::Current&) const +SessionRouterI::getSessionTimeout(const Ice::Current&) const { return _sessionTimeout.toSeconds(); } RouterIPtr -Glacier2::SessionRouterI::getRouter(const ConnectionPtr& connection, const Ice::Identity& id, bool close) const +SessionRouterI::getRouter(const ConnectionPtr& connection, const Ice::Identity& id, bool close) const { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); @@ -1059,7 +951,7 @@ Glacier2::SessionRouterI::getRouter(const ConnectionPtr& connection, const Ice:: } RouterIPtr -Glacier2::SessionRouterI::getRouter(const string& category) const +SessionRouterI::getRouter(const string& category) const { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); @@ -1089,7 +981,7 @@ Glacier2::SessionRouterI::getRouter(const string& category) const } void -Glacier2::SessionRouterI::expireSessions() +SessionRouterI::expireSessions() { vector<RouterIPtr> routers; @@ -1142,19 +1034,29 @@ Glacier2::SessionRouterI::expireSessions() Trace out(_instance->logger(), "Glacier2"); out << "expiring session\n" << (*p)->toString(); } - - (*p)->destroy(new DestroyCB(_sessionTraceLevel, _instance->logger())); + (*p)->destroy(_sessionDestroyCallback); } } void -Glacier2::SessionRouterI::sessionPingException(const Ice::Exception&, const Ice::ConnectionPtr& con) +SessionRouterI::sessionPingException(const Ice::Exception&, const Ice::ConnectionPtr& con) { destroySession(con); } +void +SessionRouterI::sessionDestroyException(const Ice::Exception& ex) +{ + if(_sessionTraceLevel > 0) + { + Trace out(_instance->logger(), "Glacier2"); + out << "exception while destroying session\n" << ex; + } +} + + bool -Glacier2::SessionRouterI::startCreateSession(const CreateSessionPtr& cb, const ConnectionPtr& connection) +SessionRouterI::startCreateSession(const CreateSessionPtr& cb, const ConnectionPtr& connection) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); @@ -1211,7 +1113,7 @@ Glacier2::SessionRouterI::startCreateSession(const CreateSessionPtr& cb, const C } void -Glacier2::SessionRouterI::finishCreateSession(const ConnectionPtr& connection, const RouterIPtr& router) +SessionRouterI::finishCreateSession(const ConnectionPtr& connection, const RouterIPtr& router) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); @@ -1229,7 +1131,7 @@ Glacier2::SessionRouterI::finishCreateSession(const ConnectionPtr& connection, c if(_destroy) { - router->destroy(new DestroyCB(0, 0)); + router->destroy(_sessionDestroyCallback); CannotCreateSessionException exc; exc.reason = "router is shutting down"; @@ -1256,7 +1158,7 @@ Glacier2::SessionRouterI::finishCreateSession(const ConnectionPtr& connection, c } } -Glacier2::SessionRouterI::SessionThread::SessionThread(const SessionRouterIPtr& sessionRouter, +SessionRouterI::SessionThread::SessionThread(const SessionRouterIPtr& sessionRouter, const IceUtil::Time& sessionTimeout) : IceUtil::Thread("Glacier2 session thread"), _sessionRouter(sessionRouter), @@ -1264,13 +1166,13 @@ Glacier2::SessionRouterI::SessionThread::SessionThread(const SessionRouterIPtr& { } -Glacier2::SessionRouterI::SessionThread::~SessionThread() +SessionRouterI::SessionThread::~SessionThread() { assert(!_sessionRouter); } void -Glacier2::SessionRouterI::SessionThread::destroy() +SessionRouterI::SessionThread::destroy() { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); _sessionRouter = 0; @@ -1278,7 +1180,7 @@ Glacier2::SessionRouterI::SessionThread::destroy() } void -Glacier2::SessionRouterI::SessionThread::run() +SessionRouterI::SessionThread::run() { while(true) { diff --git a/cpp/src/Glacier2/SessionRouterI.h b/cpp/src/Glacier2/SessionRouterI.h index a494d1158bf..ecf6ff120fc 100644 --- a/cpp/src/Glacier2/SessionRouterI.h +++ b/cpp/src/Glacier2/SessionRouterI.h @@ -51,6 +51,8 @@ public: void unexpectedCreateSessionException(const Ice::Exception&); void exception(const Ice::Exception&); + + void createException(const Ice::Exception&); virtual void authorize() = 0; virtual void createSession() = 0; @@ -110,6 +112,7 @@ public: private: void sessionPingException(const Ice::Exception&, const ::Ice::ConnectionPtr&); + void sessionDestroyException(const Ice::Exception&); bool startCreateSession(const CreateSessionPtr&, const Ice::ConnectionPtr&); void finishCreateSession(const Ice::ConnectionPtr&, const RouterIPtr&); @@ -153,6 +156,7 @@ private: std::map<Ice::ConnectionPtr, CreateSessionPtr> _pending; Ice::Callback_Object_ice_pingPtr _sessionPingCallback; + Callback_Session_destroyPtr _sessionDestroyCallback; bool _destroy; }; diff --git a/cpp/src/Glacier2Lib/Application.cpp b/cpp/src/Glacier2Lib/Application.cpp index ee748dcae32..806071206be 100644 --- a/cpp/src/Glacier2Lib/Application.cpp +++ b/cpp/src/Glacier2Lib/Application.cpp @@ -23,73 +23,44 @@ string Glacier2::Application::_category; namespace { -class SessionPingThread : virtual public IceUtil::Shared +class SessionPingThreadI : virtual public IceUtil::Thread { public: - virtual void done() = 0; -}; -typedef IceUtil::Handle<SessionPingThread> SessionPingThreadPtr; - -class AMI_Router_refreshSessionI : public Glacier2::AMI_Router_refreshSession -{ - -public: - - AMI_Router_refreshSessionI(Glacier2::Application* app, const SessionPingThreadPtr& pinger) : + SessionPingThreadI(Glacier2::Application* app, const Glacier2::RouterPrx& router, IceUtil::Int64 period) : _app(app), - _pinger(pinger) - { - } - - void - ice_response() + _router(router), + _period(period), + _done(false) { + assert(_period); } void - ice_exception(const Ice::Exception& ex) + exception(const Ice::Exception& ex) { // - // Here the session has gone. The thread - // terminates, and we notify the - // application that the session has been + // Here the session has been destroyed. The thread terminates, + // and we notify the application that the session has been // destroyed. // - _pinger->done(); + done(); _app->sessionDestroyed(); } -private: - - Glacier2::Application* _app; - SessionPingThreadPtr _pinger; -}; - -class SessionPingThreadI : virtual public IceUtil::Thread, virtual public SessionPingThread -{ - -public: - - SessionPingThreadI(Glacier2::Application* app, const Glacier2::RouterPrx& router, IceUtil::Int64 period) : - _app(app), - _router(router), - _period(period), - _done(false) - { - assert(_period); - } - void run() { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); + + Glacier2::Callback_Router_refreshSessionPtr callback = + Glacier2::newCallback_Router_refreshSession(this, &SessionPingThreadI::exception); while(true) { try { - _router->refreshSession_async(new AMI_Router_refreshSessionI(_app, this)); + _router->begin_refreshSession(callback); } catch(const Ice::CommunicatorDestroyedException&) { diff --git a/cpp/src/Glacier2Lib/SessionHelper.cpp b/cpp/src/Glacier2Lib/SessionHelper.cpp index e5a25ad4095..31cfc92fbee 100644 --- a/cpp/src/Glacier2Lib/SessionHelper.cpp +++ b/cpp/src/Glacier2Lib/SessionHelper.cpp @@ -70,7 +70,6 @@ public: private: - const Glacier2::Callback_Router_refreshSessionPtr _cb; const Glacier2::SessionHelperPtr _session; const Glacier2::RouterPrx _router; Ice::Long _period; @@ -130,8 +129,6 @@ typedef IceUtil::Handle<SessionHelperI> SessionHelperIPtr; SessionRefreshThread::SessionRefreshThread(const Glacier2::SessionHelperPtr& session, const Glacier2::RouterPrx& router, Ice::Long period) : - _cb(Glacier2::newCallback_Router_refreshSession(this, &SessionRefreshThread::success, - &SessionRefreshThread::failure)), _session(session), _router(router), _period(period), @@ -142,12 +139,14 @@ SessionRefreshThread::SessionRefreshThread(const Glacier2::SessionHelperPtr& ses void SessionRefreshThread::run() { + Glacier2::Callback_Router_refreshSessionPtr cb = + Glacier2::newCallback_Router_refreshSession(this, &SessionRefreshThread::failure); IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); while(true) { try { - _router->begin_refreshSession(_cb); + _router->begin_refreshSession(cb); } catch(const Ice::CommunicatorDestroyedException&) { @@ -181,11 +180,6 @@ SessionRefreshThread::done() } void -SessionRefreshThread::success() -{ -} - -void SessionRefreshThread::failure(const Ice::Exception&) { done(); diff --git a/cpp/src/Ice/LocatorInfo.cpp b/cpp/src/Ice/LocatorInfo.cpp index 15c78d1fcd7..ac693041b2e 100644 --- a/cpp/src/Ice/LocatorInfo.cpp +++ b/cpp/src/Ice/LocatorInfo.cpp @@ -31,7 +31,7 @@ IceUtil::Shared* IceInternal::upCast(LocatorTable* p) { return p; } namespace { -class ObjectRequest : public LocatorInfo::Request, public Ice::AMI_Locator_findObjectById +class ObjectRequest : public LocatorInfo::Request { public: @@ -40,37 +40,31 @@ public: assert(ref->isWellKnown()); } - virtual void ice_response(const Ice::ObjectPrx& proxy) - { - response(proxy); - } - - virtual void ice_exception(const Ice::Exception& ex) - { - exception(ex); - } - virtual void send(bool async) { try { if(async) { - _locatorInfo->getLocator()->findObjectById_async(this, _ref->getIdentity()); + _locatorInfo->getLocator()->begin_findObjectById( + _ref->getIdentity(), + newCallback_Locator_findObjectById(static_cast<LocatorInfo::Request*>(this), + &LocatorInfo::Request::response, + &LocatorInfo::Request::exception)); } else { - ice_response(_locatorInfo->getLocator()->findObjectById(_ref->getIdentity())); + response(_locatorInfo->getLocator()->findObjectById(_ref->getIdentity())); } } catch(const Ice::Exception& ex) { - ice_exception(ex); + exception(ex); } } }; -class AdapterRequest : public LocatorInfo::Request, public Ice::AMI_Locator_findAdapterById +class AdapterRequest : public LocatorInfo::Request { public: @@ -79,32 +73,26 @@ public: assert(ref->isIndirect() && !ref->isWellKnown()); } - virtual void ice_response(const Ice::ObjectPrx& proxy) - { - response(proxy); - } - - virtual void ice_exception(const Ice::Exception& ex) - { - exception(ex); - } - virtual void send(bool async) { try { if(async) { - _locatorInfo->getLocator()->findAdapterById_async(this, _ref->getAdapterId()); + _locatorInfo->getLocator()->begin_findAdapterById( + _ref->getAdapterId(), + newCallback_Locator_findAdapterById(static_cast<LocatorInfo::Request*>(this), + &LocatorInfo::Request::response, + &LocatorInfo::Request::exception)); } else { - ice_response(_locatorInfo->getLocator()->findAdapterById(_ref->getAdapterId())); + response(_locatorInfo->getLocator()->findAdapterById(_ref->getAdapterId())); } } catch(const Ice::Exception& ex) { - ice_exception(ex); + exception(ex); } } }; diff --git a/cpp/src/Ice/LocatorInfo.h b/cpp/src/Ice/LocatorInfo.h index 7ec892f1626..b33beadf8b0 100644 --- a/cpp/src/Ice/LocatorInfo.h +++ b/cpp/src/Ice/LocatorInfo.h @@ -108,14 +108,14 @@ public: void addCallback(const ReferencePtr&, const ReferencePtr&, int, const GetEndpointsCallbackPtr&); std::vector<EndpointIPtr> getEndpoints(const ReferencePtr&, const ReferencePtr&, int, bool&); + + void response(const Ice::ObjectPrx&); + void exception(const Ice::Exception&); protected: Request(const LocatorInfoPtr&, const ReferencePtr&); - void response(const Ice::ObjectPrx&); - void exception(const Ice::Exception&); - virtual void send(bool) = 0; const LocatorInfoPtr _locatorInfo; diff --git a/cpp/src/Ice/RouterInfo.cpp b/cpp/src/Ice/RouterInfo.cpp index a07ff0d6cf9..291e0ec03d9 100644 --- a/cpp/src/Ice/RouterInfo.cpp +++ b/cpp/src/Ice/RouterInfo.cpp @@ -166,6 +166,32 @@ IceInternal::RouterInfo::getClientEndpoints() } void +IceInternal::RouterInfo::getClientProxyResponse(const Ice::ObjectPrx& proxy, const GetClientEndpointsCallbackPtr& callback) +{ + callback->setEndpoints(setClientEndpoints(proxy)); +} + +void +IceInternal::RouterInfo::getClientProxyException(const Ice::Exception& ex, const GetClientEndpointsCallbackPtr& callback) +{ + if(dynamic_cast<const Ice::CollocationOptimizationException*>(&ex)) + { + try + { + callback->setEndpoints(getClientEndpoints()); + } + catch(const Ice::LocalException& e) + { + callback->setException(e); + } + } + else + { + callback->setException(dynamic_cast<const Ice::LocalException&>(ex)); + } +} + +void IceInternal::RouterInfo::getClientEndpoints(const GetClientEndpointsCallbackPtr& callback) { vector<EndpointIPtr> clientEndpoints; @@ -180,48 +206,10 @@ IceInternal::RouterInfo::getClientEndpoints(const GetClientEndpointsCallbackPtr& return; } - class Callback : public AMI_Router_getClientProxy - { - public: - - virtual void - ice_response(const Ice::ObjectPrx& clientProxy) - { - _callback->setEndpoints(_routerInfo->setClientEndpoints(clientProxy)); - } - - virtual void - ice_exception(const Ice::Exception& ex) - { - if(dynamic_cast<const Ice::CollocationOptimizationException*>(&ex)) - { - try - { - _callback->setEndpoints(_routerInfo->getClientEndpoints()); - } - catch(const Ice::LocalException& e) - { - _callback->setException(e); - } - } - else - { - _callback->setException(dynamic_cast<const Ice::LocalException&>(ex)); - } - } - - Callback(const RouterInfoPtr& routerInfo, const GetClientEndpointsCallbackPtr& callback) : - _routerInfo(routerInfo), _callback(callback) - { - } - - private: - - const RouterInfoPtr _routerInfo; - const GetClientEndpointsCallbackPtr _callback; - }; - - _router->getClientProxy_async(new Callback(this, callback)); + _router->begin_getClientProxy(newCallback_Router_getClientProxy(this, + &RouterInfo::getClientProxyResponse, + &RouterInfo::getClientProxyException), + callback); } vector<EndpointIPtr> @@ -259,6 +247,34 @@ IceInternal::RouterInfo::addProxy(const ObjectPrx& proxy) addAndEvictProxies(proxy, _router->addProxies(proxies)); } +void +IceInternal::RouterInfo::addProxyResponse(const Ice::ObjectProxySeq& proxies, const AddProxyCookiePtr& cookie) +{ + addAndEvictProxies(cookie->proxy(), proxies); + cookie->cb()->addedProxy(); +} + +void +IceInternal::RouterInfo::addProxyException(const Ice::Exception& ex, const AddProxyCookiePtr& cookie) +{ + if(dynamic_cast<const Ice::CollocationOptimizationException*>(&ex)) + { + try + { + addProxy(cookie->proxy()); + cookie->cb()->addedProxy(); + } + catch(const Ice::LocalException& e) + { + cookie->cb()->setException(e); + } + } + else + { + cookie->cb()->setException(dynamic_cast<const Ice::LocalException&>(ex)); + } +} + bool IceInternal::RouterInfo::addProxy(const Ice::ObjectPrx& proxy, const AddProxyCallbackPtr& callback) { @@ -274,53 +290,15 @@ IceInternal::RouterInfo::addProxy(const Ice::ObjectPrx& proxy, const AddProxyCal } } - class Callback : public AMI_Router_addProxies - { - public: - - virtual void - ice_response(const Ice::ObjectProxySeq& evictedProxies) - { - _routerInfo->addAndEvictProxies(_proxy, evictedProxies); - _callback->addedProxy(); - } - - virtual void - ice_exception(const Ice::Exception& ex) - { - if(dynamic_cast<const Ice::CollocationOptimizationException*>(&ex)) - { - try - { - _routerInfo->addProxy(_proxy); - _callback->addedProxy(); - } - catch(const Ice::LocalException& e) - { - _callback->setException(e); - } - } - else - { - _callback->setException(dynamic_cast<const Ice::LocalException&>(ex)); - } - } - - Callback(const RouterInfoPtr& routerInfo, const Ice::ObjectPrx& proxy, const AddProxyCallbackPtr& callback) : - _routerInfo(routerInfo), _proxy(proxy), _callback(callback) - { - } - - private: - - const RouterInfoPtr _routerInfo; - const Ice::ObjectPrx _proxy; - const AddProxyCallbackPtr _callback; - }; Ice::ObjectProxySeq proxies; proxies.push_back(proxy); - _router->addProxies_async(new Callback(this, proxy, callback), proxies); + AddProxyCookiePtr cookie = new AddProxyCookie(callback, proxy); + _router->begin_addProxies(proxies, + newCallback_Router_addProxies(this, + &RouterInfo::addProxyResponse, + &RouterInfo::addProxyException), + cookie); return false; } diff --git a/cpp/src/Ice/RouterInfo.h b/cpp/src/Ice/RouterInfo.h index 52801b57651..b1c80ed07fb 100644 --- a/cpp/src/Ice/RouterInfo.h +++ b/cpp/src/Ice/RouterInfo.h @@ -47,7 +47,7 @@ class RouterInfo : public IceUtil::Shared, public IceUtil::Mutex { public: - class GetClientEndpointsCallback : virtual public IceUtil::Shared + class GetClientEndpointsCallback : virtual public Ice::LocalObject { public: @@ -74,9 +74,41 @@ public: bool operator<(const RouterInfo&) const; Ice::RouterPrx getRouter() const; + void getClientProxyResponse(const Ice::ObjectPrx&, const GetClientEndpointsCallbackPtr&); + void getClientProxyException(const Ice::Exception&, const GetClientEndpointsCallbackPtr&); std::vector<EndpointIPtr> getClientEndpoints(); void getClientEndpoints(const GetClientEndpointsCallbackPtr&); std::vector<EndpointIPtr> getServerEndpoints(); + + class AddProxyCookie : public Ice::LocalObject + { + public: + + AddProxyCookie(const AddProxyCallbackPtr cb, const Ice::ObjectPrx& proxy) : + _cb(cb), + _proxy(proxy) + { + } + + AddProxyCallbackPtr cb() const + { + return _cb; + } + + Ice::ObjectPrx proxy() const + { + return _proxy; + } + + private: + + const AddProxyCallbackPtr _cb; + const Ice::ObjectPrx _proxy; + }; + typedef IceUtil::Handle<AddProxyCookie> AddProxyCookiePtr; + + void addProxyResponse(const Ice::ObjectProxySeq&, const AddProxyCookiePtr&); + void addProxyException(const Ice::Exception&, const AddProxyCookiePtr&); void addProxy(const Ice::ObjectPrx&); bool addProxy(const Ice::ObjectPrx&, const AddProxyCallbackPtr&); diff --git a/cpp/src/IceBox/ServiceManagerI.cpp b/cpp/src/IceBox/ServiceManagerI.cpp index 0a2a5908543..60f0959dcb5 100644 --- a/cpp/src/IceBox/ServiceManagerI.cpp +++ b/cpp/src/IceBox/ServiceManagerI.cpp @@ -25,35 +25,6 @@ typedef IceBox::Service* (*SERVICE_FACTORY)(CommunicatorPtr); namespace { -template<class T> -class AMICallbackT : public T -{ -public: - - AMICallbackT(const ServiceManagerIPtr& serviceManager, const ServiceObserverPrx& observer) : - _serviceManager(serviceManager), - _observer(observer) - { - } - - virtual void ice_response() - { - // ok, success - } - - virtual void ice_exception(const Ice::Exception& ex) - { - // - // Drop this observer - // - _serviceManager->removeObserver(_observer, ex); - } - -private: - ServiceManagerIPtr _serviceManager; - ServiceObserverPrx _observer; -}; - class PropertiesAdminI : public PropertiesAdmin { public: @@ -127,7 +98,8 @@ struct StartServiceInfo IceBox::ServiceManagerI::ServiceManagerI(CommunicatorPtr communicator, int& argc, char* argv[]) : _communicator(communicator), _pendingStatusChanges(false), - _traceServiceObserver(0) + _traceServiceObserver(0), + _observerCompletedCB(newCallback(this, &ServiceManagerI::observerCompleted)) { _logger = _communicator->getLogger(); _traceServiceObserver = _communicator->getProperties()->getPropertyAsInt("IceBox.Trace.ServiceObserver"); @@ -332,32 +304,12 @@ IceBox::ServiceManagerI::addObserver(const ServiceObserverPrx& observer, const I if(activeServices.size() > 0) { - observer->servicesStarted_async(new AMICallbackT<AMI_ServiceObserver_servicesStarted>(this, observer), - activeServices); + observer->begin_servicesStarted(activeServices, _observerCompletedCB); } } } void -IceBox::ServiceManagerI::removeObserver(const ServiceObserverPrx& observer, const Ice::Exception& ex) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - - // - // It's possible to remove several times the same observer, e.g. multiple concurrent - // requests that fail - // - - set<ServiceObserverPrx>::iterator p = _observers.find(observer); - if(p != _observers.end()) - { - ServiceObserverPrx observer = *p; - _observers.erase(p); - observerRemoved(observer, ex); - } -} - -void IceBox::ServiceManagerI::shutdown(const Current&) { _communicator->shutdown(); @@ -933,6 +885,8 @@ IceBox::ServiceManagerI::stopAll() _services.clear(); servicesStopped(stoppedServices, _observers); + + _observerCompletedCB = 0; // Break cyclic reference count. } void @@ -942,9 +896,7 @@ IceBox::ServiceManagerI::servicesStarted(const vector<string>& services, const s { for(set<ServiceObserverPrx>::const_iterator p = observers.begin(); p != observers.end(); ++p) { - ServiceObserverPrx observer = *p; - observer->servicesStarted_async(new AMICallbackT<AMI_ServiceObserver_servicesStarted>(this, observer), - services); + (*p)->begin_servicesStarted(services, _observerCompletedCB); } } } @@ -956,9 +908,7 @@ IceBox::ServiceManagerI::servicesStopped(const vector<string>& services, const s { for(set<ServiceObserverPrx>::const_iterator p = observers.begin(); p != observers.end(); ++p) { - ServiceObserverPrx observer = *p; - observer->servicesStopped_async(new AMICallbackT<AMI_ServiceObserver_servicesStopped>(this, observer), - services); + (*p)->begin_servicesStopped(services, _observerCompletedCB); } } } @@ -1008,3 +958,29 @@ IceBox::ServiceManagerI::createServiceProperties(const string& service) } return properties; } + +void +ServiceManagerI::observerCompleted(const Ice::AsyncResultPtr& result) +{ + try + { + result->throwLocalException(); + } + catch(const Ice::LocalException& ex) + { + ServiceObserverPrx observer = ServiceObserverPrx::uncheckedCast(result->getProxy()); + IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); + + // + // It's possible to remove several times the same observer, e.g. multiple concurrent + // requests that fail + // + set<ServiceObserverPrx>::iterator p = _observers.find(observer); + if(p != _observers.end()) + { + ServiceObserverPrx observer = *p; + _observers.erase(p); + observerRemoved(observer, ex); + } + } +} diff --git a/cpp/src/IceBox/ServiceManagerI.h b/cpp/src/IceBox/ServiceManagerI.h index a613ae1b3f8..9c381941b49 100644 --- a/cpp/src/IceBox/ServiceManagerI.h +++ b/cpp/src/IceBox/ServiceManagerI.h @@ -39,7 +39,7 @@ public: bool start(); void stop(); - void removeObserver(const ServiceObserverPrx&, const Ice::Exception&); + void observerCompleted(const Ice::AsyncResultPtr&); private: @@ -80,6 +80,7 @@ private: std::set<ServiceObserverPrx> _observers; int _traceServiceObserver; + ::Ice::CallbackPtr _observerCompletedCB; }; typedef IceUtil::Handle<ServiceManagerI> ServiceManagerIPtr; diff --git a/cpp/src/IceGrid/Activator.cpp b/cpp/src/IceGrid/Activator.cpp index 628978b6488..10d0f20d66b 100644 --- a/cpp/src/IceGrid/Activator.cpp +++ b/cpp/src/IceGrid/Activator.cpp @@ -786,6 +786,40 @@ Activator::activate(const string& name, #endif } +namespace +{ + +class ShutdownCallback : public IceUtil::Shared +{ +public: + + ShutdownCallback(const ActivatorPtr& activator, const string& name, const TraceLevelsPtr& traceLevels) : + _activator(activator), _name(name), _traceLevels(traceLevels) + { + + } + + virtual void + exception(const Ice::Exception& ex) + { + Ice::Warning out(_traceLevels->logger); + out << "exception occurred while deactivating `" << _name << "' using process proxy:\n" << ex; + + // + // Send a SIGTERM to the process. + // + _activator->sendSignal(_name, SIGTERM); + } + +private: + + const ActivatorPtr _activator; + const string _name; + const TraceLevelsPtr _traceLevels; +}; + +} + void Activator::deactivate(const string& name, const Ice::ProcessPrx& process) { @@ -800,44 +834,6 @@ Activator::deactivate(const string& name, const Ice::ProcessPrx& process) } #endif - class ShutdownCallback : public Ice::AMI_Process_shutdown - { - public: - - ShutdownCallback(const ActivatorPtr& activator, const string& name, const TraceLevelsPtr& traceLevels) : - _activator(activator), _name(name), _traceLevels(traceLevels) - { - - } - - virtual void - ice_response() - { - // - // Nothing to do, server successfully shutdown, the activator will detect it - // once the pipe is closed. - // - } - - virtual void - ice_exception(const Ice::Exception& ex) - { - Ice::Warning out(_traceLevels->logger); - out << "exception occurred while deactivating `" << _name << "' using process proxy:\n" << ex; - - // - // Send a SIGTERM to the process. - // - _activator->sendSignal(_name, SIGTERM); - } - - private: - - const ActivatorPtr _activator; - const string _name; - const TraceLevelsPtr _traceLevels; - }; - // // Try to shut down the server gracefully using the process proxy. // @@ -848,8 +844,8 @@ Activator::deactivate(const string& name, const Ice::ProcessPrx& process) Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat); out << "deactivating `" << name << "' using process proxy"; } - - process->shutdown_async(new ShutdownCallback(this, name, _traceLevels)); + process->begin_shutdown(Ice::newCallback_Process_shutdown(new ShutdownCallback(this, name, _traceLevels), + &ShutdownCallback::exception)); return; } diff --git a/cpp/src/IceGrid/AdminCallbackRouter.cpp b/cpp/src/IceGrid/AdminCallbackRouter.cpp index 2fc00ab89c9..bb68190a233 100644 --- a/cpp/src/IceGrid/AdminCallbackRouter.cpp +++ b/cpp/src/IceGrid/AdminCallbackRouter.cpp @@ -12,35 +12,20 @@ using namespace Ice; using namespace std; -namespace +void +IceGrid::AdminCallbackRouter::invokeResponse(bool ok, const std::pair<const Byte*, const Byte*>& outParams, + const InvokeCookiePtr& cookie) { + cookie->cb()->ice_response(ok, outParams); +} -class InvokeAMICallback : public AMI_Array_Object_ice_invoke +void +IceGrid::AdminCallbackRouter::invokeException(const Ice::Exception&, const InvokeCookiePtr& cookie) { -public: - - InvokeAMICallback(const AMD_Object_ice_invokePtr& cb) : - _cb(cb) - { - } - - virtual void ice_response(bool ok, const std::pair<const Byte*, const Byte*>& outParams) - { - _cb->ice_response(ok, outParams); - } - - virtual void ice_exception(const Ice::Exception&) - { - _cb->ice_exception(ObjectNotExistException(__FILE__, __LINE__)); // Callback object is unreachable. - } - -private: - AMD_Object_ice_invokePtr _cb; -}; - + // Callback object is unreachable. + cookie->cb()->ice_exception(ObjectNotExistException(__FILE__, __LINE__)); } - void IceGrid::AdminCallbackRouter::addMapping(const string& category, const ConnectionPtr& con) { @@ -92,6 +77,11 @@ IceGrid::AdminCallbackRouter::ice_invoke_async(const AMD_Object_ice_invokePtr& c // // Call with AMI // - target->ice_invoke_async(new InvokeAMICallback(cb), current.operation, current.mode, inParams, current.ctx); + target->begin_ice_invoke(current.operation, current.mode, inParams, current.ctx, + newCallback_Object_ice_invoke( + this, + &AdminCallbackRouter::invokeResponse, + &AdminCallbackRouter::invokeException), + new InvokeCookie(cb)); } diff --git a/cpp/src/IceGrid/AdminCallbackRouter.h b/cpp/src/IceGrid/AdminCallbackRouter.h index 0affceba9ef..8ae022f5759 100644 --- a/cpp/src/IceGrid/AdminCallbackRouter.h +++ b/cpp/src/IceGrid/AdminCallbackRouter.h @@ -28,6 +28,30 @@ public: void addMapping(const std::string&, const Ice::ConnectionPtr&); void removeMapping(const std::string&); + class InvokeCookie : public Ice::LocalObject + { + public: + + InvokeCookie(const Ice::AMD_Object_ice_invokePtr& cb) : + _cb(cb) + { + } + + Ice::AMD_Object_ice_invokePtr cb() + { + return _cb; + } + + private: + + Ice::AMD_Object_ice_invokePtr _cb; + }; + typedef IceUtil::Handle<InvokeCookie> InvokeCookiePtr; + + virtual void invokeResponse(bool, const std::pair<const ::Ice::Byte*, const ::Ice::Byte*>&, + const InvokeCookiePtr&); + + virtual void invokeException(const Ice::Exception&, const InvokeCookiePtr&); virtual void ice_invoke_async(const Ice::AMD_Object_ice_invokePtr&, const std::pair<const Ice::Byte*, const Ice::Byte*>&, diff --git a/cpp/src/IceGrid/AdminI.cpp b/cpp/src/IceGrid/AdminI.cpp index 5c832e4f4a5..7453a93902b 100644 --- a/cpp/src/IceGrid/AdminI.cpp +++ b/cpp/src/IceGrid/AdminI.cpp @@ -394,100 +394,115 @@ AdminI::getServerAdmin(const string& id, const Current& current) const return current.adapter->createProxy(adminId); } -void -AdminI::startServer_async(const AMD_Admin_startServerPtr& amdCB, const string& id, const Current&) +namespace { - ServerProxyWrapper proxy(_database, id); - proxy.useActivationTimeout(); - class StartCB : public AMI_Server_start +class StartCB : virtual public IceUtil::Shared +{ +public: + + StartCB(const ServerProxyWrapper& proxy, const AMD_Admin_startServerPtr& amdCB) : _proxy(proxy), _amdCB(amdCB) { - public: + } - StartCB(const ServerProxyWrapper& proxy, const AMD_Admin_startServerPtr& amdCB) : _proxy(proxy), _amdCB(amdCB) - { - } + virtual void + response() + { + _amdCB->ice_response(); + } - virtual void - ice_response() + virtual void + exception(const Ice::Exception& ex) + { + try { - _amdCB->ice_response(); + _proxy.handleException(ex); + assert(false); } - - virtual void - ice_exception(const Ice::Exception& ex) + catch(const Ice::Exception& ex) { - try - { - _proxy.handleException(ex); - assert(false); - } - catch(const Ice::Exception& ex) - { - _amdCB->ice_exception(ex); - } + _amdCB->ice_exception(ex); } + } - private: +private: + + const ServerProxyWrapper _proxy; + const AMD_Admin_startServerPtr _amdCB; +}; + +} + + +void +AdminI::startServer_async(const AMD_Admin_startServerPtr& amdCB, const string& id, const Current&) +{ + ServerProxyWrapper proxy(_database, id); + proxy.useActivationTimeout(); - const ServerProxyWrapper _proxy; - const AMD_Admin_startServerPtr _amdCB; - }; - // // Since the server might take a while to be activated, we use AMI. // - proxy->start_async(new StartCB(proxy, amdCB)); + proxy->begin_start(newCallback_Server_start(new StartCB(proxy, amdCB), + &StartCB::response, + &StartCB::exception)); } -void -AdminI::stopServer_async(const AMD_Admin_stopServerPtr& amdCB, const string& id, const Current&) +namespace { - ServerProxyWrapper proxy(_database, id); - proxy.useDeactivationTimeout(); - class StopCB : public AMI_Server_stop +class StopCB : virtual public IceUtil::Shared +{ +public: + + StopCB(const ServerProxyWrapper& proxy, const AMD_Admin_stopServerPtr& amdCB) : _proxy(proxy), _amdCB(amdCB) + { + } + + virtual void + response() { - public: + _amdCB->ice_response(); + } - StopCB(const ServerProxyWrapper& proxy, const AMD_Admin_stopServerPtr& amdCB) : _proxy(proxy), _amdCB(amdCB) + virtual void + exception(const Ice::Exception& ex) + { + try { + _proxy.handleException(ex); + assert(false); } - - virtual void - ice_response() + catch(const Ice::TimeoutException&) { _amdCB->ice_response(); } - - virtual void - ice_exception(const Ice::Exception& ex) + catch(const Ice::Exception& ex) { - try - { - _proxy.handleException(ex); - assert(false); - } - catch(const Ice::TimeoutException&) - { - _amdCB->ice_response(); - } - catch(const Ice::Exception& ex) - { - _amdCB->ice_exception(ex); - } + _amdCB->ice_exception(ex); } + } - private: +private: - const ServerProxyWrapper _proxy; - const AMD_Admin_stopServerPtr _amdCB; - }; + const ServerProxyWrapper _proxy; + const AMD_Admin_stopServerPtr _amdCB; +}; + +} + +void +AdminI::stopServer_async(const AMD_Admin_stopServerPtr& amdCB, const string& id, const Current&) +{ + ServerProxyWrapper proxy(_database, id); + proxy.useDeactivationTimeout(); // // Since the server might take a while to be deactivated, we use AMI. // - proxy->stop_async(new StopCB(proxy, amdCB)); + proxy->begin_stop(newCallback_Server_stop(new StopCB(proxy, amdCB), + &StopCB::response, + &StopCB::exception)); } void diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice index ed90c4c757b..1362ff7bbe7 100644 --- a/cpp/src/IceGrid/Internal.ice +++ b/cpp/src/IceGrid/Internal.ice @@ -149,7 +149,7 @@ interface Adapter * adapter direct proxy it's active. * **/ - ["ami", "amd"] Object* activate(); + ["amd"] Object* activate(); /** * @@ -161,7 +161,7 @@ interface Adapter * endpoints if the adapter is already active. * **/ - ["ami", "nonmutating", "cpp:const"] idempotent Object* getDirectProxy() + ["nonmutating", "cpp:const"] idempotent Object* getDirectProxy() throws AdapterNotActiveException; /** @@ -177,7 +177,7 @@ interface Adapter * active adapter. * **/ - ["ami"] void setDirectProxy(Object* proxy) + void setDirectProxy(Object* proxy) throws AdapterActiveException; }; @@ -224,7 +224,7 @@ interface Server extends FileReader * otherwise. * **/ - ["amd", "ami"] void start() + ["amd"] void start() throws ServerStartException; /** @@ -234,7 +234,7 @@ interface Server extends FileReader * amount of time, it will be killed. * **/ - ["amd", "ami"] void stop() + ["amd"] void stop() throws ServerStopException; /** @@ -292,7 +292,7 @@ interface Server extends FileReader * Set the process proxy. * **/ - ["ami", "amd"] void setProcess(Ice::Process* proc); + ["amd"] void setProcess(Ice::Process* proc); }; interface InternalRegistry; @@ -350,7 +350,7 @@ interface Node extends FileReader, ReplicaObserver * they will be created. * **/ - ["amd", "ami"] idempotent Server* loadServer(InternalServerDescriptor svr, + ["amd"] idempotent Server* loadServer(InternalServerDescriptor svr, string replicaName, out AdapterPrxDict adapters, out int actTimeout, @@ -362,7 +362,7 @@ interface Node extends FileReader, ReplicaObserver * Destroy the given server. * **/ - ["amd", "ami"] idempotent void destroyServer(string name, string uuid, int revision, string replicaName) + ["amd"] idempotent void destroyServer(string name, string uuid, int revision, string replicaName) throws DeploymentException; /** @@ -386,7 +386,7 @@ interface Node extends FileReader, ReplicaObserver * replicaAdded below). * **/ - ["ami"] void registerWithReplica(InternalRegistry* replica); + void registerWithReplica(InternalRegistry* replica); /** * @@ -492,7 +492,7 @@ interface NodeSession * of the server. * **/ - ["amd", "ami", "cpp:const"] void waitForApplicationUpdate(string application, int revision); + ["amd", "cpp:const"] void waitForApplicationUpdate(string application, int revision); /** * @@ -572,7 +572,7 @@ interface ReplicaSession * the locator registry interface. * **/ - ["ami"] idempotent void setAdapterDirectProxy(string adapterId, string replicaGroupId, Object* proxy) + idempotent void setAdapterDirectProxy(string adapterId, string replicaGroupId, Object* proxy) throws AdapterNotExistException, AdapterExistsException; /** diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index 6583980e948..ddcc8601758 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -22,22 +22,22 @@ namespace IceGrid // // Callback from asynchronous call to adapter->getDirectProxy() invoked in LocatorI::findAdapterById_async(). // -class AMI_Adapter_getDirectProxyI : public AMI_Adapter_getDirectProxy +class AdapterGetDirectProxyCallback : virtual public IceUtil::Shared { public: - AMI_Adapter_getDirectProxyI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : + AdapterGetDirectProxyCallback(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : _locator(locator), _adapter(adapter) { } - virtual void ice_response(const ::Ice::ObjectPrx& obj) + virtual void response(const ::Ice::ObjectPrx& obj) { assert(obj); _locator->getDirectProxyResponse(_adapter, obj); } - virtual void ice_exception(const ::Ice::Exception& e) + virtual void exception(const ::Ice::Exception& e) { _locator->getDirectProxyException(_adapter, e); } @@ -48,21 +48,21 @@ private: const LocatorAdapterInfo _adapter; }; -class AMI_Adapter_activateI : public AMI_Adapter_activate +class AdapterActivateCallback : virtual public IceUtil::Shared { public: - AMI_Adapter_activateI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : + AdapterActivateCallback(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) : _locator(locator), _adapter(adapter) { } - virtual void ice_response(const ::Ice::ObjectPrx& obj) + virtual void response(const ::Ice::ObjectPrx& obj) { _locator->getDirectProxyResponse(_adapter, obj); } - virtual void ice_exception(const ::Ice::Exception& ex) + virtual void exception(const ::Ice::Exception& ex) { _locator->getDirectProxyException(_adapter, ex); } @@ -936,8 +936,10 @@ LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& re requests.push_back(request); _pendingRequests.insert(make_pair(adapter.id, requests)); } - - adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter)); + adapter.proxy->begin_getDirectProxy(newCallback_Adapter_getDirectProxy( + new AdapterGetDirectProxyCallback(this, adapter), + &AdapterGetDirectProxyCallback::response, + &AdapterGetDirectProxyCallback::exception)); return false; } @@ -1011,9 +1013,11 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice:: (*q)->activating(adapter.id); } - AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter); int timeout = adapter.activationTimeout + adapter.deactivationTimeout; - AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB); + AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->begin_activate( + newCallback_Adapter_activate(new AdapterActivateCallback(this, adapter), + &AdapterActivateCallback::response, + &AdapterActivateCallback::exception)); } else { diff --git a/cpp/src/IceGrid/LocatorRegistryI.cpp b/cpp/src/IceGrid/LocatorRegistryI.cpp index 994d84d0141..6620222da4a 100644 --- a/cpp/src/IceGrid/LocatorRegistryI.cpp +++ b/cpp/src/IceGrid/LocatorRegistryI.cpp @@ -20,7 +20,7 @@ namespace IceGrid { template<class AmdCB> -class SetDirectProxyCB : public AMI_Adapter_setDirectProxy +class SetDirectProxyCB : public LocatorRegistryI::AdapterSetDirectProxyCB { public: @@ -32,7 +32,7 @@ public: { } - virtual void ice_response() + virtual void response() { if(_traceLevels->locator > 1) { @@ -43,7 +43,7 @@ public: _cb->ice_response(); } - virtual void ice_exception(const ::Ice::Exception& ex) + virtual void exception(const ::Ice::Exception& ex) { if(_traceLevels->locator > 1) { @@ -88,14 +88,14 @@ newSetDirectProxyCB(const AmdCB& cb, const TraceLevelsPtr& traceLevels, const st return new SetDirectProxyCB<AmdCB>(cb, traceLevels, id, p); } -class AMI_Server_setProcessI : public AMI_Server_setProcess +class ServerSetProcessCB : virtual public IceUtil::Shared { public: - AMI_Server_setProcessI(const Ice::AMD_LocatorRegistry_setServerProcessProxyPtr& cb, - const TraceLevelsPtr& traceLevels, - const string& id, - const Ice::ObjectPrx& proxy) : + ServerSetProcessCB(const Ice::AMD_LocatorRegistry_setServerProcessProxyPtr& cb, + const TraceLevelsPtr& traceLevels, + const string& id, + const Ice::ObjectPrx& proxy) : _cb(cb), _traceLevels(traceLevels), _id(id), _proxy(proxy) { } @@ -145,13 +145,14 @@ private: const string _id; const Ice::ObjectPrx _proxy; }; +typedef IceUtil::Handle<ServerSetProcessCB> ServerSetProcessCBPtr; class SetAdapterDirectProxyCallback : public SynchronizationCallback { public: SetAdapterDirectProxyCallback(const LocatorRegistryIPtr& registry, - const AMI_Adapter_setDirectProxyPtr& amiCB, + const LocatorRegistryI::AdapterSetDirectProxyCBPtr& amiCB, const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) : @@ -168,7 +169,7 @@ public: } catch(const Ice::Exception& ex) { - _amiCB->ice_exception(ex); + _amiCB->exception(ex); } } @@ -181,14 +182,14 @@ public: } catch(const Ice::Exception& ex) { - _amiCB->ice_exception(ex); + _amiCB->exception(ex); } } private: const LocatorRegistryIPtr _registry; - const AMI_Adapter_setDirectProxyPtr _amiCB; + const LocatorRegistryI::AdapterSetDirectProxyCBPtr _amiCB; const string _adapterId; const string _replicaGroupId; const Ice::ObjectPrx _proxy; @@ -322,8 +323,10 @@ LocatorRegistryI::setServerProcessProxy_async(const Ice::AMD_LocatorRegistry_set } } - AMI_Server_setProcessPtr amiCB = new AMI_Server_setProcessI(cb, _database->getTraceLevels(), id, proxy); - server->setProcess_async(amiCB, proxy); + server->begin_setProcess(proxy, IceGrid::newCallback_Server_setProcess( + new ServerSetProcessCB(cb, _database->getTraceLevels(), id, proxy), + &ServerSetProcessCB::ice_response, + &ServerSetProcessCB::ice_exception)); } catch(const ServerNotExistException&) { @@ -342,7 +345,7 @@ LocatorRegistryI::setServerProcessProxy_async(const Ice::AMD_LocatorRegistry_set } void -LocatorRegistryI::setAdapterDirectProxy(const AMI_Adapter_setDirectProxyPtr& amiCB, +LocatorRegistryI::setAdapterDirectProxy(const LocatorRegistryI::AdapterSetDirectProxyCBPtr& amiCB, const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) @@ -352,7 +355,7 @@ LocatorRegistryI::setAdapterDirectProxy(const AMI_Adapter_setDirectProxyPtr& ami // if(adapterId.empty()) { - amiCB->ice_response(); + amiCB->response(); return; } @@ -386,7 +389,9 @@ LocatorRegistryI::setAdapterDirectProxy(const AMI_Adapter_setDirectProxyPtr& ami } } - adapter->setDirectProxy_async(amiCB, proxy); + adapter->begin_setDirectProxy(proxy, IceGrid::newCallback_Adapter_setDirectProxy(amiCB, + &LocatorRegistryI::AdapterSetDirectProxyCB::response, + &LocatorRegistryI::AdapterSetDirectProxyCB::exception)); return; } catch(const AdapterNotExistException&) @@ -413,7 +418,7 @@ LocatorRegistryI::setAdapterDirectProxy(const AMI_Adapter_setDirectProxyPtr& ami try { _database->setAdapterDirectProxy(adapterId, replicaGroupId, proxy); - amiCB->ice_response(); + amiCB->response(); return; } catch(const AdapterExistsException&) @@ -439,7 +444,7 @@ LocatorRegistryI::setAdapterDirectProxy(const AMI_Adapter_setDirectProxyPtr& ami try { session->setAdapterDirectProxy(adapterId, replicaGroupId, proxy); - amiCB->ice_response(); + amiCB->response(); return; } catch(const AdapterExistsException&) diff --git a/cpp/src/IceGrid/LocatorRegistryI.h b/cpp/src/IceGrid/LocatorRegistryI.h index 44bc8e7007d..b20ad40ccbc 100644 --- a/cpp/src/IceGrid/LocatorRegistryI.h +++ b/cpp/src/IceGrid/LocatorRegistryI.h @@ -11,6 +11,7 @@ #include <IceGrid/Internal.h> #include <Ice/Locator.h> +#include <IceUtil/Shared.h> namespace IceGrid { @@ -26,6 +27,15 @@ class ReplicaSessionManager; class LocatorRegistryI : public Ice::LocatorRegistry { public: + + class AdapterSetDirectProxyCB : virtual public IceUtil::Shared + { + public: + + virtual void response() = 0; + virtual void exception(const ::Ice::Exception&) = 0; + }; + typedef IceUtil::Handle<AdapterSetDirectProxyCB> AdapterSetDirectProxyCBPtr; LocatorRegistryI(const DatabasePtr&, bool, bool, ReplicaSessionManager&); @@ -39,7 +49,7 @@ public: virtual void setServerProcessProxy_async(const Ice::AMD_LocatorRegistry_setServerProcessProxyPtr&, const ::std::string&, const ::Ice::ProcessPrx&, const ::Ice::Current&); - void setAdapterDirectProxy(const AMI_Adapter_setDirectProxyPtr&, const std::string&, const std::string&, + void setAdapterDirectProxy(const AdapterSetDirectProxyCBPtr&, const std::string&, const std::string&, const Ice::ObjectPrx&); const TraceLevelsPtr& getTraceLevels() const; diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp index c3c14d92789..e03a74e51d9 100644 --- a/cpp/src/IceGrid/NodeCache.cpp +++ b/cpp/src/IceGrid/NodeCache.cpp @@ -178,7 +178,7 @@ struct ToInternalServerDescriptor : std::unary_function<CommunicatorDescriptorPt int _iceVersion; }; -class LoadCB : public AMI_Node_loadServer +class LoadCB : virtual public IceUtil::Shared { public: @@ -191,7 +191,7 @@ public: } void - ice_response(const ServerPrx& server, const AdapterPrxDict& adapters, int at, int dt) + response(const ServerPrx& server, const AdapterPrxDict& adapters, int at, int dt) { if(_traceLevels && _traceLevels->server > 1) { @@ -207,7 +207,7 @@ public: } void - ice_exception(const Ice::Exception& ex) + exception(const Ice::Exception& ex) { try { @@ -248,7 +248,7 @@ private: const int _timeout; }; -class DestroyCB : public AMI_Node_destroyServer +class DestroyCB : virtual public IceUtil::Shared { public: @@ -258,7 +258,7 @@ public: } void - ice_response() + response() { if(_traceLevels && _traceLevels->server > 1) { @@ -269,7 +269,7 @@ public: } void - ice_exception(const Ice::Exception& ex) + exception(const Ice::Exception& ex) { try { @@ -308,31 +308,7 @@ private: const string _node; }; -class RegisterCB : public AMI_Node_registerWithReplica -{ -public: - - RegisterCB(const NodeEntryPtr& node) : _node(node) - { - } - - void - ice_response() - { - _node->finishedRegistration(); - } - - void - ice_exception(const Ice::Exception& ex) - { - _node->finishedRegistration(ex); - } - -private: - const NodeEntryPtr _node; -}; - -}; +} NodeCache::NodeCache(const Ice::CommunicatorPtr& communicator, ReplicaCache& replicaCache, const string& replicaName) : _communicator(communicator), @@ -625,8 +601,11 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con } } - AMI_Node_loadServerPtr amiCB = new LoadCB(_cache.getTraceLevels(), entry, _name, sessionTimeout); - node->loadServer_async(amiCB, desc, _cache.getReplicaName()); + node->begin_loadServer(desc, _cache.getReplicaName(), + newCallback_Node_loadServer( + new LoadCB(_cache.getTraceLevels(), entry, _name, sessionTimeout), + &LoadCB::response, + &LoadCB::exception)); } catch(const NodeUnreachableException& ex) { @@ -663,8 +642,10 @@ NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info, in out << "unloading `" << info.descriptor->id << "' on node `" << _name << "'"; } - AMI_Node_destroyServerPtr amiCB = new DestroyCB(_cache.getTraceLevels(), entry, _name); - node->destroyServer_async(amiCB, info.descriptor->id, info.uuid, info.revision, _cache.getReplicaName()); + node->begin_destroyServer(info.descriptor->id, info.uuid, info.revision, _cache.getReplicaName(), + newCallback_Node_destroyServer(new DestroyCB(_cache.getTraceLevels(), entry, _name), + &DestroyCB::response, + &DestroyCB::exception)); } catch(const NodeUnreachableException& ex) { @@ -794,7 +775,10 @@ NodeEntry::checkSession() const // _registering = true; NodeEntry* self = const_cast<NodeEntry*>(this); - _proxy->registerWithReplica_async(new RegisterCB(self), _cache.getReplicaCache().getInternalRegistry()); + _proxy->begin_registerWithReplica(_cache.getReplicaCache().getInternalRegistry(), + newCallback_Node_registerWithReplica(self, + &NodeEntry::finishedRegistration, + &NodeEntry::finishedRegistration)); _proxy = 0; // Registration with the proxy is only attempted once. } diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 2a44b98dbdb..ed33cc0fd36 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -188,7 +188,7 @@ private: string _dest; }; -class NodeUp : public NodeI::Update, public AMI_NodeObserver_nodeUp +class NodeUp : public NodeI::Update { public: @@ -202,7 +202,7 @@ public: { try { - _observer->nodeUp_async(this, _info); + _observer->begin_nodeUp(_info, newCallback(static_cast<NodeI::Update*>(this), &NodeI::Update::completed)); } catch(const Ice::LocalException&) { @@ -210,25 +210,13 @@ public: } return true; } - - virtual void - ice_response() - { - finished(true); - } - - virtual void - ice_exception(const Ice::Exception&) - { - finished(false); - } private: NodeDynamicInfo _info; }; -class UpdateServer : public NodeI::Update, public AMI_NodeObserver_updateServer +class UpdateServer : public NodeI::Update { public: @@ -242,7 +230,9 @@ public: { try { - _observer->updateServer_async(this, _node->getName(), _info); + _observer->begin_updateServer(_node->getName(), + _info, + newCallback(static_cast<NodeI::Update*>(this), &NodeI::Update::completed)); } catch(const Ice::LocalException&) { @@ -250,25 +240,13 @@ public: } return true; } - - virtual void - ice_response() - { - finished(true); - } - - virtual void - ice_exception(const Ice::Exception&) - { - finished(false); - } private: ServerDynamicInfo _info; }; -class UpdateAdapter : public NodeI::Update, public AMI_NodeObserver_updateAdapter +class UpdateAdapter : public NodeI::Update { public: @@ -282,7 +260,9 @@ public: { try { - _observer->updateAdapter_async(this, _node->getName(), _info); + _observer->begin_updateAdapter(_node->getName(), + _info, + newCallback(static_cast<NodeI::Update*>(this), &NodeI::Update::completed)); } catch(const Ice::LocalException&) { @@ -290,18 +270,6 @@ public: } return true; } - - virtual void - ice_response() - { - finished(true); - } - - virtual void - ice_exception(const Ice::Exception&) - { - finished(false); - } private: @@ -384,6 +352,20 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, } } +void +NodeI::Update::completed(const Ice::AsyncResultPtr& result) +{ + try + { + result->throwLocalException(); + finished(true); + } + catch(const Ice::LocalException& ex) + { + finished(false); + } +} + NodeI::~NodeI() { } diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h index 4d29d853147..508de0f2a48 100644 --- a/cpp/src/IceGrid/NodeI.h +++ b/cpp/src/IceGrid/NodeI.h @@ -40,6 +40,7 @@ typedef IceUtil::Handle<NodeI> NodeIPtr; class NodeI : public Node, public IceUtil::Monitor<IceUtil::Mutex> { public: + class Update : virtual public IceUtil::Shared { public: @@ -50,6 +51,8 @@ public: virtual bool send() = 0; void finished(bool); + + void completed(const Ice::AsyncResultPtr&); protected: diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index 1eb80d965f3..4db7ef92f78 100755 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -248,31 +248,6 @@ private: const TraceLevelsPtr _traceLevels; }; -class WaitForApplicationUpdateCB : public AMI_NodeSession_waitForApplicationUpdate -{ -public: - - WaitForApplicationUpdateCB(const ServerIPtr& server) : _server(server) - { - } - - virtual void - ice_response() - { - _server->activate(); - } - - virtual void - ice_exception(const Ice::Exception&) - { - _server->activate(); - } - -private: - - const ServerIPtr _server; -}; - struct EnvironmentEval : std::unary_function<string, string> { @@ -697,6 +672,12 @@ ServerI::~ServerI() } void +ServerI::waitForApplicationUpdateCompleted(const Ice::AsyncResultPtr&) +{ + activate(); +} + +void ServerI::start_async(const AMD_Server_startPtr& amdCB, const Ice::Current&) { start(Manual, amdCB); @@ -1406,8 +1387,8 @@ ServerI::activate() NodeSessionPrx session = _node->getMasterNodeSession(); if(session) { - AMI_NodeSession_waitForApplicationUpdatePtr cb = new WaitForApplicationUpdateCB(this); - _node->getMasterNodeSession()->waitForApplicationUpdate_async(cb, desc->uuid, desc->revision); + _node->getMasterNodeSession()->begin_waitForApplicationUpdate( + desc->uuid, desc->revision, ::Ice::newCallback(this, &ServerI::waitForApplicationUpdateCompleted)); return; } } diff --git a/cpp/src/IceGrid/ServerI.h b/cpp/src/IceGrid/ServerI.h index f033a4d8bec..e40cd51c877 100644 --- a/cpp/src/IceGrid/ServerI.h +++ b/cpp/src/IceGrid/ServerI.h @@ -71,6 +71,8 @@ public: ServerI(const NodeIPtr&, const ServerPrx&, const std::string&, const std::string&, int); virtual ~ServerI(); + void waitForApplicationUpdateCompleted(const Ice::AsyncResultPtr&); + virtual void start_async(const AMD_Server_startPtr&, const ::Ice::Current& = Ice::Current()); virtual void stop_async(const AMD_Server_stopPtr&, const ::Ice::Current& = Ice::Current()); virtual void sendSignal(const std::string&, const ::Ice::Current&); diff --git a/cpp/src/IcePatch2Lib/ClientUtil.cpp b/cpp/src/IcePatch2Lib/ClientUtil.cpp index 4196200775b..78aa0776d3e 100755 --- a/cpp/src/IcePatch2Lib/ClientUtil.cpp +++ b/cpp/src/IcePatch2Lib/ClientUtil.cpp @@ -241,66 +241,6 @@ private: const PatcherFeedbackPtr _feedback; }; -class AMIGetFileInfoSeq : public AMI_FileServer_getFileInfoSeq, public IceUtil::Monitor<IceUtil::Mutex> -{ -public: - - AMIGetFileInfoSeq() : - _done(false) - { - } - - FileInfoSeq - getFileInfoSeq() - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(!_done) - { - wait(); - } - - _done = false; - - if(_exception.get()) - { - auto_ptr<Exception> ex = _exception; - _fileInfoSeq.clear(); - ex->ice_throw(); - } - - FileInfoSeq fileInfoSeq; - fileInfoSeq.swap(_fileInfoSeq); - return fileInfoSeq; - } - - virtual void - ice_response(const FileInfoSeq& fileInfoSeq) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _fileInfoSeq = fileInfoSeq; - _done = true; - notify(); - } - - virtual void - ice_exception(const Exception& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _exception.reset(ex.ice_clone()); - _done = true; - notify(); - } - -private: - - bool _done; - FileInfoSeq _fileInfoSeq; - auto_ptr<Exception> _exception; -}; - -typedef IceUtil::Handle<AMIGetFileInfoSeq> AMIGetFileInfoSeqPtr; - } bool @@ -363,8 +303,8 @@ IcePatch2::Patcher::prepare() throw string("server returned illegal value"); } - AMIGetFileInfoSeqPtr curCB; - AMIGetFileInfoSeqPtr nxtCB; + AsyncResultPtr curCB; + AsyncResultPtr nxtCB; for(int node0 = 0; node0 < 256; ++node0) { @@ -373,9 +313,7 @@ IcePatch2::Patcher::prepare() if(!curCB) { assert(!nxtCB); - curCB = new AMIGetFileInfoSeq; - nxtCB = new AMIGetFileInfoSeq; - _serverCompress->getFileInfoSeq_async(curCB, node0); + curCB = _serverCompress->begin_getFileInfoSeq(node0); } else { @@ -393,10 +331,10 @@ IcePatch2::Patcher::prepare() if(node0Nxt < 256) { - _serverCompress->getFileInfoSeq_async(nxtCB, node0Nxt); + nxtCB = _serverCompress->begin_getFileInfoSeq(node0Nxt); } - FileInfoSeq files = curCB->getFileInfoSeq(); + FileInfoSeq files = _serverCompress->end_getFileInfoSeq(curCB); sort(files.begin(), files.end(), FileInfoLess()); files.erase(unique(files.begin(), files.end(), FileInfoEqual()), files.end()); @@ -733,71 +671,6 @@ IcePatch2::Patcher::updateFiles(const FileInfoSeq& files) return result; } -namespace -{ - -class AMIGetFileCompressed : public AMI_FileServer_getFileCompressed, public IceUtil::Monitor<IceUtil::Mutex> -{ -public: - - AMIGetFileCompressed() : - _done(false) - { - } - - ByteSeq - getFileCompressed() - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(!_done) - { - wait(); - } - - _done = false; - - if(_exception.get()) - { - auto_ptr<Exception> ex = _exception; - _bytes.clear(); - ex->ice_throw(); - } - - ByteSeq bytes; - bytes.swap(_bytes); - return bytes; - } - - virtual void - ice_response(const pair<const Ice::Byte*, const Ice::Byte*>& bytes) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - ByteSeq(bytes.first, bytes.second).swap(_bytes); - _done = true; - notify(); - } - - virtual void - ice_exception(const Exception& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _exception.reset(ex.ice_clone()); - _done = true; - notify(); - } - -private: - - bool _done; - ByteSeq _bytes; - auto_ptr<Exception> _exception; -}; - -typedef IceUtil::Handle<AMIGetFileCompressed> AMIGetFileCompressedPtr; - -} - bool IcePatch2::Patcher::updateFilesInternal(const FileInfoSeq& files, const DecompressorPtr& decompressor) { @@ -814,8 +687,8 @@ IcePatch2::Patcher::updateFilesInternal(const FileInfoSeq& files, const Decompre } } - AMIGetFileCompressedPtr curCB; - AMIGetFileCompressedPtr nxtCB; + AsyncResultPtr curCB; + AsyncResultPtr nxtCB; for(p = files.begin(); p != files.end(); ++p) { @@ -877,9 +750,7 @@ IcePatch2::Patcher::updateFilesInternal(const FileInfoSeq& files, const Decompre if(!curCB) { assert(!nxtCB); - curCB = new AMIGetFileCompressed; - nxtCB = new AMIGetFileCompressed; - _serverNoCompress->getFileCompressed_async(curCB, p->path, pos, _chunkSize); + curCB = _serverNoCompress->begin_getFileCompressed(p->path, pos, _chunkSize); } else { @@ -889,7 +760,7 @@ IcePatch2::Patcher::updateFilesInternal(const FileInfoSeq& files, const Decompre if(pos + _chunkSize < p->size) { - _serverNoCompress->getFileCompressed_async(nxtCB, p->path, pos + _chunkSize, _chunkSize); + nxtCB = _serverNoCompress->begin_getFileCompressed(p->path, pos + _chunkSize, _chunkSize); } else { @@ -902,7 +773,7 @@ IcePatch2::Patcher::updateFilesInternal(const FileInfoSeq& files, const Decompre if(q != files.end()) { - _serverNoCompress->getFileCompressed_async(nxtCB, q->path, 0, _chunkSize); + nxtCB = _serverNoCompress->begin_getFileCompressed(q->path, 0, _chunkSize); } } @@ -910,7 +781,7 @@ IcePatch2::Patcher::updateFilesInternal(const FileInfoSeq& files, const Decompre try { - bytes = curCB->getFileCompressed(); + bytes = _serverNoCompress->end_getFileCompressed(curCB); } catch(const FileAccessException& ex) { diff --git a/cpp/src/IceStorm/Election.ice b/cpp/src/IceStorm/Election.ice index fc8a8025484..3e9c95f6321 100644 --- a/cpp/src/IceStorm/Election.ice +++ b/cpp/src/IceStorm/Election.ice @@ -68,7 +68,7 @@ interface ReplicaObserver * inconsisency was detected. * **/ - ["ami"] void createTopic(LogUpdate llu, string name) + void createTopic(LogUpdate llu, string name) throws ObserverInconsistencyException; /** @@ -83,7 +83,7 @@ interface ReplicaObserver * inconsisency was detected. * **/ - ["ami"] void destroyTopic(LogUpdate llu, string name) + void destroyTopic(LogUpdate llu, string name) throws ObserverInconsistencyException; /** @@ -100,7 +100,7 @@ interface ReplicaObserver * inconsisency was detected. * **/ - ["ami"] void addSubscriber(LogUpdate llu, string topic, IceStorm::SubscriberRecord record) + void addSubscriber(LogUpdate llu, string topic, IceStorm::SubscriberRecord record) throws ObserverInconsistencyException; /** @@ -117,7 +117,7 @@ interface ReplicaObserver * inconsisency was detected. * **/ - ["ami"] void removeSubscriber(LogUpdate llu, string topic, Ice::IdentitySeq subscribers) + void removeSubscriber(LogUpdate llu, string topic, Ice::IdentitySeq subscribers) throws ObserverInconsistencyException; }; diff --git a/cpp/src/IceStorm/IceStormInternal.ice b/cpp/src/IceStorm/IceStormInternal.ice index e664f76abfb..3f89f9d3a03 100644 --- a/cpp/src/IceStorm/IceStormInternal.ice +++ b/cpp/src/IceStorm/IceStormInternal.ice @@ -59,7 +59,7 @@ interface TopicLink * @param events The events to forward. * **/ - ["ami"] void forward(EventDataSeq events); + void forward(EventDataSeq events); }; /** Thrown if the reap call would block. */ @@ -94,7 +94,7 @@ interface TopicInternal extends Topic * @throws ReapWouldBlock Raised if the reap call would block. * **/ - ["ami"] void reap(Ice::IdentitySeq id) throws ReapWouldBlock; + void reap(Ice::IdentitySeq id) throws ReapWouldBlock; }; /** diff --git a/cpp/src/IceStorm/Observers.cpp b/cpp/src/IceStorm/Observers.cpp index a937adf874c..92f74469421 100644 --- a/cpp/src/IceStorm/Observers.cpp +++ b/cpp/src/IceStorm/Observers.cpp @@ -15,82 +15,6 @@ using namespace std; using namespace IceStorm; using namespace IceStormElection; -namespace -{ - -class AMI_ReplicaObserver_createTopicI : public AMI_ReplicaObserver_createTopic, public AMICall -{ -public: - - virtual void ice_response() { response(); } - virtual void ice_exception(const Ice::Exception& e) { exception(e); } -}; -typedef IceUtil::Handle<AMI_ReplicaObserver_createTopicI> AMI_ReplicaObserver_createTopicIPtr; - -class AMI_ReplicaObserver_addSubscriberI : public AMI_ReplicaObserver_addSubscriber, public AMICall -{ -public: - - virtual void ice_response() { response(); } - virtual void ice_exception(const Ice::Exception& e) { exception(e); } -}; -typedef IceUtil::Handle<AMI_ReplicaObserver_addSubscriberI> AMI_ReplicaObserver_addSubscriberIPtr; - -class AMI_ReplicaObserver_removeSubscriberI : public AMI_ReplicaObserver_removeSubscriber, public AMICall -{ -public: - - virtual void ice_response() { response(); } - virtual void ice_exception(const Ice::Exception& e) { exception(e); } -}; -typedef IceUtil::Handle<AMI_ReplicaObserver_removeSubscriberI> AMI_ReplicaObserver_removeSubscriberIPtr; - -class AMI_ReplicaObserver_destroyTopicI : public AMI_ReplicaObserver_destroyTopic, public AMICall -{ -public: - - virtual void ice_response() { response(); } - virtual void ice_exception(const Ice::Exception& e) { exception(e); } -}; -typedef IceUtil::Handle<AMI_ReplicaObserver_destroyTopicI> AMI_ReplicaObserver_destroyTopicIPtr; - -} - -AMICall::AMICall() : - _response(false) -{ -} - -void -AMICall::response() -{ - Lock sync(*this); - _response = true; - notify(); -} -void -AMICall::exception(const IceUtil::Exception& e) -{ - Lock sync(*this); - _response = true; - _ex.reset(e.ice_clone()); - notify(); -} - -void -AMICall::waitResponse() -{ - Lock sync(*this); - while(!_response) - { - wait(); - } - if(_ex.get()) - { - _ex->ice_throw(); - } -} - Observers::Observers(const InstancePtr& instance) : _traceLevels(instance->traceLevels()), _majority(0) @@ -194,9 +118,7 @@ Observers::createTopic(const LogUpdate& llu, const string& name) Lock sync(*this); for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p) { - AMI_ReplicaObserver_createTopicIPtr cb = new AMI_ReplicaObserver_createTopicI; - p->call = cb; - p->observer->createTopic_async(cb, llu, name); + p->result = p->observer->begin_createTopic(llu, name); } wait("createTopic"); } @@ -207,9 +129,7 @@ Observers::destroyTopic(const LogUpdate& llu, const string& id) Lock sync(*this); for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p) { - AMI_ReplicaObserver_destroyTopicIPtr cb = new AMI_ReplicaObserver_destroyTopicI; - p->call = cb; - p->observer->destroyTopic_async(cb, llu, id); + p->result = p->observer->begin_destroyTopic(llu, id); } wait("destroyTopic"); } @@ -221,9 +141,7 @@ Observers::addSubscriber(const LogUpdate& llu, const string& name, const Subscri Lock sync(*this); for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p) { - AMI_ReplicaObserver_addSubscriberIPtr cb = new AMI_ReplicaObserver_addSubscriberI; - p->call = cb; - p->observer->addSubscriber_async(cb, llu, name, rec); + p->result = p->observer->begin_addSubscriber(llu, name, rec); } wait("addSubscriber"); } @@ -234,9 +152,7 @@ Observers::removeSubscriber(const LogUpdate& llu, const string& name, const Ice: Lock sync(*this); for(vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p) { - AMI_ReplicaObserver_removeSubscriberIPtr cb = new AMI_ReplicaObserver_removeSubscriberI; - p->call = cb; - p->observer->removeSubscriber_async(cb, llu, name, id); + p->result = p->observer->begin_removeSubscriber(llu, name, id); } wait("removeSubscriber"); } @@ -249,7 +165,8 @@ Observers::wait(const string& op) { try { - p->call->waitResponse(); + p->result->waitForCompleted(); + p->result->throwLocalException(); } catch(const Ice::Exception& ex) { @@ -261,11 +178,8 @@ Observers::wait(const string& op) int id = p->id; p = _observers.erase(p); - // COMPILERFIX: Just using following causes double unlock with C++Builder 2007 - //IceUtil::Mutex::Lock sync(_reapedMutex); - _reapedMutex.lock(); + IceUtil::Mutex::Lock sync(_reapedMutex); _reaped.push_back(id); - _reapedMutex.unlock(); continue; } ++p; diff --git a/cpp/src/IceStorm/Observers.h b/cpp/src/IceStorm/Observers.h index d7c51f9020f..1cac54a8b8e 100644 --- a/cpp/src/IceStorm/Observers.h +++ b/cpp/src/IceStorm/Observers.h @@ -25,23 +25,6 @@ typedef IceUtil::Handle<TraceLevels> TraceLevelsPtr; namespace IceStormElection { -class AMICall : virtual public IceUtil::Shared, - virtual public IceUtil::Monitor<IceUtil::Mutex> -{ -public: - - AMICall(); - - void response(); - void exception(const IceUtil::Exception& e); - void waitResponse(); - -private: - bool _response; - std::auto_ptr<IceUtil::Exception> _ex; -}; -typedef IceUtil::Handle<AMICall> AMICallPtr; - class Observers : public IceUtil::Shared, public IceUtil::Mutex { public: @@ -72,7 +55,7 @@ private: id(i), observer(o) {} int id; ReplicaObserverPrx observer; - AMICallPtr call; + ::Ice::AsyncResultPtr result; }; std::vector<ObserverInfo> _observers; IceUtil::Mutex _reapedMutex; diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index b854ff2a835..74b15583cc1 100755 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -93,6 +93,11 @@ public: virtual void flush(); + void exception(const Ice::Exception& ex) + { + error(false, ex); + } + void doFlush(); private: @@ -111,7 +116,11 @@ public: virtual void flush(); - void sent(); + void exception(const Ice::Exception& ex) + { + error(true, ex); + } + void sent(bool); private: @@ -146,84 +155,6 @@ private: const TopicLinkPrx _obj; }; -class OnewayIceInvokeI : public Ice::AMI_Object_ice_invoke, public Ice::AMISentCallback -{ -public: - - OnewayIceInvokeI(const SubscriberOnewayPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - ice_response(bool, const std::vector<Ice::Byte>&) - { - assert(false); - } - - virtual void - ice_sent() - { - _subscriber->sent(); - } - - virtual void - ice_exception(const Ice::Exception& e) - { - _subscriber->error(true, e); - } - -private: - - const SubscriberOnewayPtr _subscriber; -}; - -class IceInvokeI : public Ice::AMI_Object_ice_invoke -{ -public: - - IceInvokeI(const SubscriberPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - ice_response(bool, const std::vector<Ice::Byte>&) - { - _subscriber->response(); - } - - virtual void - ice_exception(const Ice::Exception& e) - { - _subscriber->error(true, e); - } - -private: - - const SubscriberPtr _subscriber; -}; - -class FlushBatchI : public Ice::AMI_Object_ice_flushBatchRequests -{ -public: - - FlushBatchI(const SubscriberPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - ice_exception(const Ice::Exception& e) - { - _subscriber->error(false, e); - } - -private: - - const SubscriberPtr _subscriber; -}; - class FlushTimerTask : public IceUtil::TimerTask { public: @@ -313,7 +244,8 @@ SubscriberBatch::doFlush() return; } - _obj->ice_flushBatchRequests_async(new FlushBatchI(this)); + _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(this, + &SubscriberBatch::exception)); // This is significantly faster than the async version, but it can // block the calling thread. Bad news! @@ -361,7 +293,11 @@ SubscriberOneway::flush() _events.erase(_events.begin()); try { - if(!_obj->ice_invoke_async(new OnewayIceInvokeI(this), e->op, e->mode, e->data, e->context)) + Ice::AsyncResultPtr result = _obj->begin_ice_invoke( + e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this, + &SubscriberOneway::exception, + &SubscriberOneway::sent)); + if(!result->sentSynchronously()) { ++_outstanding; } @@ -380,8 +316,13 @@ SubscriberOneway::flush() } void -SubscriberOneway::sent() +SubscriberOneway::sent(bool sentSynchronously) { + if(sentSynchronously) + { + return; + } + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); // Decrement the _outstanding count. @@ -436,7 +377,8 @@ SubscriberTwoway::flush() try { - _obj->ice_invoke_async(new IceInvokeI(this), e->op, e->mode, e->data, e->context); + _obj->begin_ice_invoke(e->op, e->mode, e->data, e->context, + Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed)); } catch(const Ice::Exception& ex) { @@ -449,34 +391,6 @@ SubscriberTwoway::flush() namespace { -class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward -{ -public: - - Topiclink_forwardI(const SubscriberPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - ice_response() - { - _subscriber->response(); - } - - virtual void - ice_exception(const Ice::Exception& e) - { - _subscriber->error(true, e); - } - -private: - - const SubscriberPtr _subscriber; -}; - -} - SubscriberLink::SubscriberLink( const InstancePtr& instance, const SubscriberRecord& rec) : @@ -523,7 +437,7 @@ SubscriberLink::flush() try { ++_outstanding; - _obj->forward_async(new Topiclink_forwardI(this), v); + _obj->begin_forward(v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed)); } catch(const Ice::Exception& ex) { @@ -532,6 +446,8 @@ SubscriberLink::flush() } } +} + SubscriberPtr Subscriber::create( const InstancePtr& instance, @@ -850,30 +766,40 @@ Subscriber::error(bool dec, const Ice::Exception& e) } void -Subscriber::response() +Subscriber::completed(const Ice::AsyncResultPtr& result) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - - // Decrement the _outstanding count. - --_outstanding; - assert(_outstanding >= 0 && _outstanding < _maxOutstanding); - - // - // A successful response means we're no longer retrying, we're - // back active. - // - _currentRetry = 0; - - if(_events.empty() && _outstanding == 0 && _shutdown) + try { - _lock.notify(); + result->throwLocalException(); + + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // Decrement the _outstanding count. + --_outstanding; + assert(_outstanding >= 0 && _outstanding < _maxOutstanding); + + // + // A successful response means we're no longer retrying, we're + // back active. + // + _currentRetry = 0; + + if(_events.empty() && _outstanding == 0 && _shutdown) + { + _lock.notify(); + } + else + { + flush(); + } } - else + catch(const Ice::LocalException& ex) { - flush(); + error(true, ex); } } + void Subscriber::shutdown() { diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h index 75b7e0ea056..52e07c2b137 100644 --- a/cpp/src/IceStorm/Subscriber.h +++ b/cpp/src/IceStorm/Subscriber.h @@ -43,8 +43,8 @@ public: void destroy(); // To be called by the AMI callbacks only. + void completed(const Ice::AsyncResultPtr&); void error(bool, const Ice::Exception&); - void response(); void shutdown(); diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 35e0fe455dc..a19ac8b7289 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -1147,20 +1147,16 @@ TopicImpl::proxy() const namespace { -class TopicInternal_reapI : public AMI_TopicInternal_reap +class TopicInternalReapCB : public IceUtil::Shared { public: - TopicInternal_reapI(const InstancePtr& instance, Ice::Long generation) : + TopicInternalReapCB(const InstancePtr& instance, Ice::Long generation) : _instance(instance), _generation(generation) { } - virtual void ice_response() - { - } - - virtual void ice_exception(const Ice::Exception& ex) + virtual void exception(const Ice::Exception& ex) { TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) @@ -1236,7 +1232,8 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events) // call may raise an exception in the caller (that is directly // call ice_exception) which calls recover() on the node which // would result in a deadlock since the node is locked. - masterInternal->reap_async(new TopicInternal_reapI(_instance, generation), reap); + masterInternal->begin_reap(reap, newCallback_TopicInternal_reap(new TopicInternalReapCB(_instance, generation), + &TopicInternalReapCB::exception)); } void |