diff options
author | Joe George <joe@zeroc.com> | 2015-12-23 14:48:40 -0500 |
---|---|---|
committer | Joe George <joe@zeroc.com> | 2015-12-24 10:01:11 -0500 |
commit | e84da5f580821cae8dab292e19cc1296c07a8ed5 (patch) | |
tree | 1e8783c55c7dccd5adda2b87b47b8a7c118a1147 /cpp/src | |
parent | Fixes related to EnableSharedFromThis (diff) | |
download | ice-e84da5f580821cae8dab292e19cc1296c07a8ed5.tar.bz2 ice-e84da5f580821cae8dab292e19cc1296c07a8ed5.tar.xz ice-e84da5f580821cae8dab292e19cc1296c07a8ed5.zip |
ICE-6898 - "Delegate" functions for ACM callbacks
- Add delegate local interfaces CloseCallback and HeartbeatCallback and
remove ConnectionCallback.
- Replace setCallback by setCloseCallback and setHeartbeatCallback
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Glacier2/SessionRouterI.cpp | 152 | ||||
-rw-r--r-- | cpp/src/Glacier2/SessionRouterI.h | 3 | ||||
-rw-r--r-- | cpp/src/Glacier2Lib/Application.cpp | 30 | ||||
-rw-r--r-- | cpp/src/Glacier2Lib/SessionHelper.cpp | 23 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 104 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 19 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReapThread.cpp | 59 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReapThread.h | 21 | ||||
-rw-r--r-- | cpp/src/Slice/ObjCUtil.cpp | 2 | ||||
-rw-r--r-- | cpp/src/slice2objc/Gen.cpp | 40 | ||||
-rw-r--r-- | cpp/src/slice2objc/Gen.h | 1 |
11 files changed, 285 insertions, 169 deletions
diff --git a/cpp/src/Glacier2/SessionRouterI.cpp b/cpp/src/Glacier2/SessionRouterI.cpp index 47f18f7e8d7..d2c05f27e16 100644 --- a/cpp/src/Glacier2/SessionRouterI.cpp +++ b/cpp/src/Glacier2/SessionRouterI.cpp @@ -36,7 +36,7 @@ public: void finished(const Ice::AsyncResultPtr& r) - { + { Ice::ObjectPrx o = r->getProxy(); try { @@ -68,7 +68,7 @@ public: void finished(const Ice::AsyncResultPtr& r) - { + { Ice::ObjectPrx o = r->getProxy(); try { @@ -84,7 +84,7 @@ public: _router->destroySession(_connection); } } - + private: const SessionRouterIPtr _router; @@ -99,7 +99,7 @@ class SessionControlI : public SessionControl { public: - SessionControlI(const SessionRouterIPtr& sessionRouter, const ConnectionPtr& connection, + SessionControlI(const SessionRouterIPtr& sessionRouter, const ConnectionPtr& connection, const FilterManagerPtr& filterManager) : _sessionRouter(sessionRouter), _connection(connection), @@ -122,7 +122,7 @@ public: virtual IdentitySetPrx identities(const Current&) { - return _filters->identitiesPrx(); + return _filters->identitiesPrx(); } virtual int @@ -130,7 +130,7 @@ public: { return static_cast<int>(_sessionRouter->getSessionTimeout(current)); } - + virtual void destroy(const Current&) { @@ -153,7 +153,7 @@ public: _sessionRouter(sessionRouter) { } - + virtual ObjectPtr locate(const Current& current, LocalObjectPtr&) { @@ -183,7 +183,7 @@ public: _sessionRouter(sessionRouter) { } - + virtual ObjectPtr locate(const Current& current, LocalObjectPtr&) { @@ -209,14 +209,14 @@ class UserPasswordCreateSession : public CreateSession { public: - UserPasswordCreateSession(const AMD_Router_createSessionPtr& amdCB, const string& user, const string& password, + UserPasswordCreateSession(const AMD_Router_createSessionPtr& amdCB, const string& user, const string& password, const Ice::Current& current, const SessionRouterIPtr& sessionRouter) : CreateSession(sessionRouter, user, current), - _amdCB(amdCB), + _amdCB(amdCB), _password(password) { } - + void checkPermissionsResponse(bool ok, const string& reason) @@ -251,7 +251,7 @@ public: Ice::Context ctx = _current.ctx; ctx.insert(_context.begin(), _context.end()); - + _sessionRouter->_verifier->begin_checkPermissions(_user, _password, ctx, newCallback_PermissionsVerifier_checkPermissions(this, &UserPasswordCreateSession::checkPermissionsResponse, @@ -272,10 +272,10 @@ public: _sessionRouter->_sessionManager->begin_create(_user, _control, ctx, newCallback_SessionManager_create( static_cast<CreateSession*>(this), - &CreateSession::sessionCreated, + &CreateSession::sessionCreated, &CreateSession::createException)); } - + virtual void finished(const SessionPrx& session) { @@ -298,14 +298,14 @@ class SSLCreateSession : public CreateSession { public: - SSLCreateSession(const AMD_Router_createSessionFromSecureConnectionPtr& amdCB, const string& user, + SSLCreateSession(const AMD_Router_createSessionFromSecureConnectionPtr& amdCB, const string& user, const SSLInfo& sslInfo, const Ice::Current& current, const SessionRouterIPtr& sessionRouter) : CreateSession(sessionRouter, user, current), - _amdCB(amdCB), + _amdCB(amdCB), _sslInfo(sslInfo) { } - + void authorizeResponse(bool ok, const string& reason) { @@ -339,9 +339,9 @@ public: Ice::Context ctx = _current.ctx; ctx.insert(_context.begin(), _context.end()); - _sessionRouter->_sslVerifier->begin_authorize(_sslInfo, ctx, + _sessionRouter->_sslVerifier->begin_authorize(_sslInfo, ctx, newCallback_SSLPermissionsVerifier_authorize(this, - &SSLCreateSession::authorizeResponse, + &SSLCreateSession::authorizeResponse, &SSLCreateSession::authorizeException)); } @@ -359,10 +359,10 @@ public: _sessionRouter->_sslSessionManager->begin_create(_sslInfo, _control, ctx, newCallback_SSLSessionManager_create( static_cast<CreateSession*>(this), - &CreateSession::sessionCreated, + &CreateSession::sessionCreated, &CreateSession::createException)); } - + virtual void finished(const SessionPrx& session) { @@ -381,32 +381,45 @@ private: const SSLInfo _sslInfo; }; -class ConnectionCallbackI : public Ice::ConnectionCallback +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(const SessionRouterIPtr& sessionRouter) : _sessionRouter(sessionRouter) + CloseCallbackI(const SessionRouterIPtr& sessionRouter) : _sessionRouter(sessionRouter) { } - + virtual void - heartbeat(const Ice::ConnectionPtr& connection) + closed(const Ice::ConnectionPtr& connection) { try { - _sessionRouter->refreshSession(connection); + _sessionRouter->destroySession(connection); } catch(const Ice::Exception&) { } } +private: + + const SessionRouterIPtr _sessionRouter; +}; + +class HeartbeatCallbackI : public Ice::HeartbeatCallback +{ +public: + + HeartbeatCallbackI(const SessionRouterIPtr& sessionRouter) : _sessionRouter(sessionRouter) + { + } + virtual void - closed(const Ice::ConnectionPtr& connection) + heartbeat(const Ice::ConnectionPtr& connection) { try { - _sessionRouter->destroySession(connection); + _sessionRouter->refreshSession(connection); } catch(const Ice::Exception&) { @@ -571,7 +584,7 @@ CreateSession::sessionCreated(const SessionPrx& session) // Notify the router that the creation is finished. // try - { + { _sessionRouter->finishCreateSession(_current.con, router); finished(session); } @@ -601,13 +614,13 @@ void CreateSession::exception(const Ice::Exception& ex) { try - { + { _sessionRouter->finishCreateSession(_current.con, 0); } catch(const Ice::Exception&) { } - + finished(ex); if(_control) @@ -640,7 +653,8 @@ SessionRouterI::SessionRouterI(const InstancePtr& instance, _sslVerifier(sslVerifier), _sslSessionManager(sslSessionManager), _sessionTimeout(IceUtil::Time::seconds(_instance->properties()->getPropertyAsInt("Glacier2.SessionTimeout"))), - _connectionCallback(new ConnectionCallbackI(this)), + _closeCallback(new CloseCallbackI(this)), + _heartbeatCallback(new HeartbeatCallbackI(this)), _sessionThread(_sessionTimeout > IceUtil::Time() ? new SessionThread(this, _sessionTimeout) : 0), _routersByConnectionHint(_routersByConnection.end()), _routersByCategoryHint(_routersByCategory.end()), @@ -671,7 +685,7 @@ SessionRouterI::SessionRouterI(const InstancePtr& instance, // a router servant based on connection information. // _instance->clientObjectAdapter()->addServantLocator(new ClientLocator(this), ""); - + // // If there is a server object adapter, all calls on this adapter // are dispatched to a router servant based on the category field @@ -709,21 +723,22 @@ SessionRouterI::destroy() Callback_Session_destroyPtr destroyCallback; { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + assert(!_destroy); _destroy = true; notify(); - + _routersByConnection.swap(routers); _routersByConnectionHint = _routersByConnection.end(); - + _routersByCategory.clear(); _routersByCategoryHint = _routersByCategory.end(); - + sessionThread = _sessionThread; _sessionThread = 0; - _connectionCallback = 0; + _closeCallback = 0; + _heartbeatCallback = 0; swap(destroyCallback, _sessionDestroyCallback); // Break cyclic reference count. } @@ -780,7 +795,7 @@ SessionRouterI::getCategoryForClient(const Ice::Current& current) const } void -SessionRouterI::createSession_async(const AMD_Router_createSessionPtr& amdCB, const std::string& userId, +SessionRouterI::createSession_async(const AMD_Router_createSessionPtr& amdCB, const std::string& userId, const std::string& password, const Current& current) { if(!_verifier) @@ -913,14 +928,14 @@ SessionRouterI::destroySession(const ConnectionPtr& connection) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + if(_destroy) { throw ObjectNotExistException(__FILE__, __LINE__); } - - map<ConnectionPtr, RouterIPtr>::iterator p; - + + map<ConnectionPtr, RouterIPtr>::iterator p; + if(_routersByConnectionHint != _routersByConnection.end() && _routersByConnectionHint->first == connection) { p = _routersByConnectionHint; @@ -929,17 +944,17 @@ SessionRouterI::destroySession(const ConnectionPtr& connection) { p = _routersByConnection.find(connection); } - + if(p == _routersByConnection.end()) { throw SessionNotExistException(); } - + router = p->second; _routersByConnection.erase(p++); _routersByConnectionHint = p; - + if(_instance->serverObjectAdapter()) { string category = router->getServerProxy(Current())->ice_getIdentity().category; @@ -973,7 +988,7 @@ SessionRouterI::getACMTimeout(const Ice::Current& current) const return current.con->getACM().timeout; } -void +void SessionRouterI::updateSessionObservers() { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); @@ -984,7 +999,7 @@ SessionRouterI::updateSessionObservers() for(map<ConnectionPtr, RouterIPtr>::iterator p = _routersByConnection.begin(); p != _routersByConnection.end(); ++p) { p->second->updateObserver(observer); - } + } } RouterIPtr @@ -1017,7 +1032,7 @@ SessionRouterI::getServerBlobject(const string& category) const { return _routersByCategoryHint->second->getServerBlobject(); } - + map<string, RouterIPtr>::iterator p = routers.find(category); if(p != routers.end()) @@ -1035,30 +1050,30 @@ void SessionRouterI::expireSessions() { vector<RouterIPtr> routers; - + { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + if(_destroy) { return; } - + assert(_sessionTimeout > IceUtil::Time()); IceUtil::Time minTimestamp = IceUtil::Time::now(IceUtil::Time::Monotonic) - _sessionTimeout; - + map<ConnectionPtr, RouterIPtr>::iterator p = _routersByConnection.begin(); - + while(p != _routersByConnection.end()) { if(p->second->getTimestamp() < minTimestamp) { RouterIPtr router = p->second; routers.push_back(router); - + _routersByConnection.erase(p++); _routersByConnectionHint = p; - + if(_instance->serverObjectAdapter()) { string category = router->getServerProxy(Current())->ice_getIdentity().category; @@ -1073,7 +1088,7 @@ SessionRouterI::expireSessions() } } } - + // // We destroy the expired routers outside the thread // synchronization, to avoid deadlocks. @@ -1104,7 +1119,7 @@ SessionRouterI::getRouterImpl(const ConnectionPtr& connection, const Ice::Identi _routersByConnectionHint->second->updateTimestamp(); return _routersByConnectionHint->second; } - + map<ConnectionPtr, RouterIPtr>::iterator p = routers.find(connection); if(p != routers.end()) @@ -1141,7 +1156,7 @@ bool SessionRouterI::startCreateSession(const CreateSessionPtr& cb, const ConnectionPtr& connection) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + if(_destroy) { CannotCreateSessionException exc; @@ -1153,7 +1168,7 @@ SessionRouterI::startCreateSession(const CreateSessionPtr& cb, const ConnectionP // Check whether a session already exists for the connection. // { - map<ConnectionPtr, RouterIPtr>::iterator p; + map<ConnectionPtr, RouterIPtr>::iterator p; if(_routersByConnectionHint != _routersByConnection.end() && _routersByConnectionHint->first == connection) { @@ -1163,7 +1178,7 @@ SessionRouterI::startCreateSession(const CreateSessionPtr& cb, const ConnectionP { p = _routersByConnection.find(connection); } - + if(p != _routersByConnection.end()) { CannotCreateSessionException exc; @@ -1198,14 +1213,14 @@ void SessionRouterI::finishCreateSession(const ConnectionPtr& connection, const RouterIPtr& router) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + // // Signal other threads that we are done with trying to // establish a session for our connection; // _pending.erase(connection); notify(); - + if(!router) { return; @@ -1219,21 +1234,22 @@ SessionRouterI::finishCreateSession(const ConnectionPtr& connection, const Route exc.reason = "router is shutting down"; throw exc; } - + _routersByConnectionHint = _routersByConnection.insert( _routersByConnectionHint, pair<const ConnectionPtr, RouterIPtr>(connection, router)); - + if(_instance->serverObjectAdapter()) { string category = router->getServerProxy()->ice_getIdentity().category; assert(!category.empty()); - pair<map<string, RouterIPtr>::iterator, bool> rc = + pair<map<string, RouterIPtr>::iterator, bool> rc = _routersByCategory.insert(pair<const string, RouterIPtr>(category, router)); assert(rc.second); _routersByCategoryHint = rc.first; } - connection->setCallback(_connectionCallback); + connection->setCloseCallback(_closeCallback); + connection->setHeartbeatCallback(_heartbeatCallback); if(_sessionTraceLevel >= 1) { @@ -1277,7 +1293,7 @@ SessionRouterI::SessionThread::run() { return; } - + assert(_sessionTimeout > IceUtil::Time()); timedWait(_sessionTimeout / 4); diff --git a/cpp/src/Glacier2/SessionRouterI.h b/cpp/src/Glacier2/SessionRouterI.h index 10b784fdadb..252bfcc0c4e 100644 --- a/cpp/src/Glacier2/SessionRouterI.h +++ b/cpp/src/Glacier2/SessionRouterI.h @@ -148,7 +148,8 @@ private: const SSLSessionManagerPrx _sslSessionManager; IceUtil::Time _sessionTimeout; - Ice::ConnectionCallbackPtr _connectionCallback; + Ice::CloseCallbackPtr _closeCallback; + Ice::HeartbeatCallbackPtr _heartbeatCallback; class SessionThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> { diff --git a/cpp/src/Glacier2Lib/Application.cpp b/cpp/src/Glacier2Lib/Application.cpp index 729f90aa21f..dbd21240c8f 100644 --- a/cpp/src/Glacier2Lib/Application.cpp +++ b/cpp/src/Glacier2Lib/Application.cpp @@ -13,7 +13,7 @@ using namespace std; using namespace Ice; - + Ice::ObjectAdapterPtr Glacier2::Application::_adapter; Glacier2::RouterPrxPtr Glacier2::Application::_router; Glacier2::SessionPrxPtr Glacier2::Application::_session; @@ -22,31 +22,26 @@ string Glacier2::Application::_category; namespace { - -class ConnectionCallbackI : public Ice::ConnectionCallback +#ifndef ICE_CPP11_MAPPING // C++98 +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(Glacier2::Application* app) : _app(app) + CloseCallbackI(Glacier2::Application* app) : _app(app) { } virtual void - heartbeat(const Ice::ConnectionPtr&) - { - } - - virtual void closed(const Ice::ConnectionPtr&) { _app->sessionDestroyed(); } - + private: Glacier2::Application* _app; }; - +#endif } string @@ -154,7 +149,7 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat { IceInternal::Application::_communicator = Ice::initialize(args, initData); _router = ICE_UNCHECKED_CAST(Glacier2::RouterPrx, communicator()->getDefaultRouter()); - + if(!_router) { Error out(getProcessLogger()); @@ -188,7 +183,7 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat { Ice::Int acmTimeout = 0; try - { + { acmTimeout = _router->getACMTimeout(); } catch(const Ice::OperationNotExistException&) @@ -204,7 +199,14 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat Ice::ConnectionPtr connection = _router->ice_getCachedConnection(); assert(connection); connection->setACM(acmTimeout, IceUtil::None, Ice::HeartbeatAlways); - connection->setCallback(ICE_MAKE_SHARED(ConnectionCallbackI, this)); +#ifdef ICE_CPP11_MAPPING + connection->setCloseCallback([this](const Ice::ConnectionPtr&) + { + this->sessionDestroyed(); + }); +#else + connection->setCloseCallback(ICE_MAKE_SHARED(CloseCallbackI, this)); +#endif } _category = _router->getCategoryForClient(); diff --git a/cpp/src/Glacier2Lib/SessionHelper.cpp b/cpp/src/Glacier2Lib/SessionHelper.cpp index 541267fb51f..5ef4a88db88 100644 --- a/cpp/src/Glacier2Lib/SessionHelper.cpp +++ b/cpp/src/Glacier2Lib/SessionHelper.cpp @@ -707,16 +707,12 @@ private: const Glacier2::SessionHelperPtr _session; }; -class ConnectionCallbackI : public Ice::ConnectionCallback +#ifndef ICE_CPP11_MAPPING // C++98 +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(const SessionHelperIPtr& sessionHelper) : _sessionHelper(sessionHelper) - { - } - - virtual void - heartbeat(const Ice::ConnectionPtr&) + CloseCallbackI(const SessionHelperIPtr& sessionHelper) : _sessionHelper(sessionHelper) { } @@ -730,6 +726,7 @@ private: SessionHelperIPtr _sessionHelper; }; +#endif } @@ -792,7 +789,15 @@ SessionHelperI::connected(const Glacier2::RouterPrxPtr& router, const Glacier2:: Ice::ConnectionPtr connection = _router->ice_getCachedConnection(); assert(connection); connection->setACM(acmTimeout, IceUtil::None, Ice::HeartbeatAlways); - connection->setCallback(ICE_MAKE_SHARED(ConnectionCallbackI, shared_from_this())); +#ifdef ICE_CPP11_MAPPING + auto self(shared_from_this()); + connection->setCloseCallback([self](const Ice::ConnectionPtr&) + { + self->destroy(); + }); +#else + connection->setCloseCallback(ICE_MAKE_SHARED(CloseCallbackI, shared_from_this())); +#endif } } } @@ -1115,7 +1120,7 @@ Glacier2::SessionFactoryHelper::connect() map<string, string> context; { IceUtil::Mutex::Lock sync(_mutex); - session = ICE_MAKE_SHARED(SessionHelperI, + session = ICE_MAKE_SHARED(SessionHelperI, ICE_MAKE_SHARED(SessionThreadCallback, shared_from_this()), _callback, createInitData(), diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index afd26ebdb32..f86bf0e6b58 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -47,7 +47,6 @@ namespace const ::std::string __flushBatchRequests_name = "flushBatchRequests"; - class TimeoutCallback : public IceUtil::TimerTask { public: @@ -74,7 +73,7 @@ public: DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, - const OutgoingAsyncBasePtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, + const OutgoingAsyncBasePtr& outAsync, const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, BasicStream& stream) : DispatchWorkItem(connection), _connection(connection), @@ -110,7 +109,7 @@ private: const ServantManagerPtr _servantManager; const ObjectAdapterPtr _adapter; const OutgoingAsyncBasePtr _outAsync; - const ConnectionCallbackPtr _heartbeatCallback; + const ICE_HEARTBEAT_CALLBACK _heartbeatCallback; BasicStream _stream; }; @@ -540,7 +539,7 @@ Ice::ConnectionI::updateObserver() } assert(_instance->initializationData().observer); - + ConnectionObserverPtr o = _instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), _endpoint, toConnectionState(_state), @@ -959,50 +958,66 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) #endif void -Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) +#ifdef ICE_CPP11_MAPPING +Ice::ConnectionI::setHeartbeatCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> callback) +#else +Ice::ConnectionI::setHeartbeatCallback(const Ice::HeartbeatCallbackPtr& callback) +#endif { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _heartbeatCallback = callback; +} +void +#ifdef ICE_CPP11_MAPPING +Ice::ConnectionI::setCloseCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> callback) +#else +Ice::ConnectionI::setCloseCallback(const Ice::CloseCallbackPtr& callback) +#endif +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state >= StateClosed) + if(callback) { - if(callback) + class CallbackWorkItem : public DispatchWorkItem { - class CallbackWorkItem : public DispatchWorkItem - { - public: + public: - CallbackWorkItem(const ConnectionIPtr& connection, const ConnectionCallbackPtr& callback) : - _connection(connection), - _callback(callback) - { - } + CallbackWorkItem(const ConnectionIPtr& connection, const ICE_CLOSE_CALLBACK& callback) : + _connection(connection), + _callback(callback) + { + } - virtual void run() - { - _connection->closeCallback(_callback); - } + virtual void run() + { + _connection->closeCallback(_callback); + } - private: + private: - const ConnectionIPtr _connection; - const ConnectionCallbackPtr _callback; - }; - _threadPool->dispatch(new CallbackWorkItem(shared_from_this(), callback)); - } - } - else - { - _callback = callback; + const ConnectionIPtr _connection; + const ICE_CLOSE_CALLBACK _callback; + }; + _threadPool->dispatch(new CallbackWorkItem(shared_from_this(), callback)); } } + else + { + _closeCallback = callback; + } } void -Ice::ConnectionI::closeCallback(const ConnectionCallbackPtr& callback) +Ice::ConnectionI::closeCallback(const ICE_CLOSE_CALLBACK& callback) { try { +#ifdef ICE_CPP11_MAPPING + callback(shared_from_this()); +#else callback->closed(shared_from_this()); +#endif } catch(const std::exception& ex) { @@ -1537,7 +1552,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) ServantManagerPtr servantManager; ObjectAdapterPtr adapter; OutgoingAsyncBasePtr outAsync; - ConnectionCallbackPtr heartbeatCallback; + ICE_HEARTBEAT_CALLBACK heartbeatCallback; int dispatchCount = 0; ThreadPoolMessage<ConnectionI> msg(current, *this); @@ -1807,7 +1822,7 @@ void ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncBasePtr& outAsync, - const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) + const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, BasicStream& stream) { int dispatchedCount = 0; @@ -1862,7 +1877,11 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess { try { +#ifdef ICE_CPP11_MAPPING + heartbeatCallback(shared_from_this()); +#else heartbeatCallback->heartbeat(shared_from_this()); +#endif } catch(const std::exception& ex) { @@ -1941,7 +1960,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close) // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // - if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback) + if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_closeCallback && !_heartbeatCallback) { finish(close); return; @@ -2080,12 +2099,14 @@ Ice::ConnectionI::finish(bool close) _readStream.clear(); _readStream.b.clear(); - if(_callback) + if(_closeCallback) { - closeCallback(_callback); - _callback = 0; + closeCallback(_closeCallback); + _closeCallback = 0; } + _heartbeatCallback = 0; + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -2264,7 +2285,8 @@ Ice::ConnectionI::create(const CommunicatorPtr& communicator, Ice::ConnectionI::~ConnectionI() { assert(!_startCallback); - assert(!_callback); + assert(!_closeCallback); + assert(!_heartbeatCallback); assert(_state == StateFinished); assert(_dispatchCount == 0); assert(_sendStreams.empty()); @@ -3206,7 +3228,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncBasePtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + OutgoingAsyncBasePtr& outAsync, ICE_HEARTBEAT_CALLBACK& heartbeatCallback, int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3433,9 +3455,9 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request case validateConnectionMsg: { traceRecv(stream, _logger, _traceLevels); - if(_callback) + if(_heartbeatCallback) { - heartbeatCallback = _callback; + heartbeatCallback = _heartbeatCallback; ++dispatchCount; } break; diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index e920b894a62..6911b4fea1c 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -204,7 +204,13 @@ public: virtual void end_flushBatchRequests(const AsyncResultPtr&); #endif - virtual void setCallback(const ConnectionCallbackPtr&); +#ifdef ICE_CPP11_MAPPING + virtual void setCloseCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)>); + virtual void setHeartbeatCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)>); +#else + virtual void setCloseCallback(const Ice::CloseCallbackPtr&); + virtual void setHeartbeatCallback(const Ice::HeartbeatCallbackPtr&); +#endif virtual void setACM(const IceUtil::Optional<int>&, const IceUtil::Optional<ACMClose>&, const IceUtil::Optional<ACMHeartbeat>&); @@ -252,11 +258,10 @@ public: void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncBasePtr&, - const ConnectionCallbackPtr&, IceInternal::BasicStream&); + const ICE_HEARTBEAT_CALLBACK&, IceInternal::BasicStream&); void finish(bool); - void closeCallback(const ConnectionCallbackPtr&); - + void closeCallback(const ICE_CLOSE_CALLBACK&); virtual ~ConnectionI(); @@ -304,7 +309,7 @@ private: IceInternal::SocketOperation parseMessage(IceInternal::BasicStream&, Int&, Int&, Byte&, IceInternal::ServantManagerPtr&, ObjectAdapterPtr&, - IceInternal::OutgoingAsyncBasePtr&, ConnectionCallbackPtr&, int&); + IceInternal::OutgoingAsyncBasePtr&, ICE_HEARTBEAT_CALLBACK&, int&); void invokeAll(IceInternal::BasicStream&, Int, Int, Byte, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&); @@ -384,7 +389,9 @@ private: bool _initialized; bool _validated; - Ice::ConnectionCallbackPtr _callback; + ICE_CLOSE_CALLBACK _closeCallback; + ICE_HEARTBEAT_CALLBACK _heartbeatCallback; + }; } diff --git a/cpp/src/IceGrid/ReapThread.cpp b/cpp/src/IceGrid/ReapThread.cpp index 07cc86a0512..94f89b95df3 100644 --- a/cpp/src/IceGrid/ReapThread.cpp +++ b/cpp/src/IceGrid/ReapThread.cpp @@ -16,28 +16,41 @@ using namespace IceGrid; namespace { -class ConnectionCallbackI : public Ice::ConnectionCallback +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) + CloseCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) { } - + virtual void - heartbeat(const Ice::ConnectionPtr& con) + closed(const Ice::ConnectionPtr& con) + { + _reaper->connectionClosed(con); + } + +private: + + const ReapThreadPtr _reaper; +}; + +class HeartbeatCallbackI : public Ice::HeartbeatCallback +{ +public: + + HeartbeatCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) { - _reaper->connectionHeartbeat(con); } virtual void - closed(const Ice::ConnectionPtr& con) + heartbeat(const Ice::ConnectionPtr& con) { - _reaper->connectionClosed(con); + _reaper->connectionHeartbeat(con); } private: - + const ReapThreadPtr _reaper; }; @@ -45,7 +58,8 @@ private: ReapThread::ReapThread() : IceUtil::Thread("Icegrid reaper thread"), - _callback(new ConnectionCallbackI(this)), + _closeCallback(new CloseCallbackI(this)), + _heartbeatCallback(new HeartbeatCallbackI(this)), _terminated(false) { } @@ -76,7 +90,7 @@ ReapThread::run() { timedWait(_wakeInterval); } - + if(_terminated) { break; @@ -118,7 +132,8 @@ ReapThread::run() q->second.erase(p->item); if(q->second.empty()) { - p->connection->setCallback(0); + p->connection->setCloseCallback(0); + p->connection->setHeartbeatCallback(0); _connections.erase(q); } } @@ -152,10 +167,12 @@ ReapThread::terminate() for(map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.begin(); p != _connections.end(); ++p) { - p->first->setCallback(0); + p->first->setCloseCallback(0); + p->first->setHeartbeatCallback(0); } _connections.clear(); - _callback = 0; + _closeCallback = 0; + _heartbeatCallback = 0; } for(list<ReapableItem>::iterator p = reap.begin(); p != reap.end(); ++p) @@ -198,7 +215,9 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP if(p == _connections.end()) { p = _connections.insert(make_pair(connection, set<ReapablePtr>())).first; - connection->setCallback(_callback); + connection->setCloseCallback(_closeCallback); + connection->setHeartbeatCallback(_heartbeatCallback); + } p->second.insert(reapable); } @@ -213,7 +232,7 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP { notify(); } - + // // Since we just added a new session with a non null timeout there // must be a non-zero wakeInterval. @@ -222,14 +241,15 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP } } -void +void ReapThread::connectionHeartbeat(const Ice::ConnectionPtr& con) { Lock sync(*this); map<Ice::ConnectionPtr, set<ReapablePtr> >::const_iterator p = _connections.find(con); if(p == _connections.end()) { - con->setCallback(0); + con->setCloseCallback(0); + con->setHeartbeatCallback(0); return; } @@ -246,7 +266,8 @@ ReapThread::connectionClosed(const Ice::ConnectionPtr& con) map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(con); if(p == _connections.end()) { - con->setCallback(0); + con->setCloseCallback(0); + con->setHeartbeatCallback(0); return; } @@ -258,7 +279,7 @@ ReapThread::connectionClosed(const Ice::ConnectionPtr& con) } // -// Returns true if the calculated wake interval is less than the current wake +// Returns true if the calculated wake interval is less than the current wake // interval (or if the original wake interval was "forever"). // bool diff --git a/cpp/src/IceGrid/ReapThread.h b/cpp/src/IceGrid/ReapThread.h index 94c4dd9d1e7..b5ac79643de 100644 --- a/cpp/src/IceGrid/ReapThread.h +++ b/cpp/src/IceGrid/ReapThread.h @@ -42,10 +42,10 @@ template<class T> class SessionReapable : public Reapable { typedef IceUtil::Handle<T> TPtr; - + public: - - SessionReapable(const Ice::LoggerPtr& logger, const TPtr& session) : + + SessionReapable(const Ice::LoggerPtr& logger, const TPtr& session) : _logger(logger), _session(session) { } @@ -53,7 +53,7 @@ public: virtual ~SessionReapable() { } - + virtual IceUtil::Time timestamp() const { @@ -94,15 +94,15 @@ template<class T> class SessionReapableWithHeartbeat : public SessionReapable<T> { typedef IceUtil::Handle<T> TPtr; - + public: - SessionReapableWithHeartbeat(const Ice::LoggerPtr& logger, const TPtr& session) : + SessionReapableWithHeartbeat(const Ice::LoggerPtr& logger, const TPtr& session) : SessionReapable<T>(logger, session) { } - virtual void + virtual void heartbeat() const { try @@ -121,7 +121,7 @@ class ReapThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mute public: ReapThread(); - + virtual void run(); void terminate(); void add(const ReapablePtr&, int, const Ice::ConnectionPtr& = Ice::ConnectionPtr()); @@ -132,8 +132,9 @@ public: private: bool calcWakeInterval(); - - Ice::ConnectionCallbackPtr _callback; + + Ice::CloseCallbackPtr _closeCallback; + Ice::HeartbeatCallbackPtr _heartbeatCallback; IceUtil::Time _wakeInterval; bool _terminated; struct ReapableItem diff --git a/cpp/src/Slice/ObjCUtil.cpp b/cpp/src/Slice/ObjCUtil.cpp index 5655b34139e..77a4a9af86c 100644 --- a/cpp/src/Slice/ObjCUtil.cpp +++ b/cpp/src/Slice/ObjCUtil.cpp @@ -488,7 +488,7 @@ Slice::ObjCGenerator::mapsToPointerType(const TypePtr& type) ClassDeclPtr cl = ClassDeclPtr::dynamicCast(type); if(cl && cl->isInterface()) { - if(cl->isLocal()) + if(cl->isLocal() || (cl->definition() && cl->definition()->isDelegate())) { return false; } diff --git a/cpp/src/slice2objc/Gen.cpp b/cpp/src/slice2objc/Gen.cpp index eae190d4a5f..f31146e05ab 100644 --- a/cpp/src/slice2objc/Gen.cpp +++ b/cpp/src/slice2objc/Gen.cpp @@ -450,6 +450,32 @@ Slice::ObjCVisitor::getParams(const OperationPtr& op) const } string +Slice::ObjCVisitor::getBlockParams(const OperationPtr& op) const +{ + string result; + ParamDeclList paramList = op->parameters(); + for(ParamDeclList::const_iterator q = paramList.begin(); q != paramList.end(); ++q) + { + TypePtr type = (*q)->type(); + string typeString; + if((*q)->isOutParam()) + { + typeString = outTypeToString(type, (*q)->optional(), false, true); + } + else + { + typeString = inTypeToString(type, (*q)->optional()); + } + if(q != paramList.begin()) + { + result += " " + getParamId(*q); + } + result += "(" + typeString + ")"; + } + return result; +} + +string Slice::ObjCVisitor::getMarshalParams(const OperationPtr& op) const { ParamDeclList paramList = op->parameters(); @@ -868,6 +894,11 @@ Slice::Gen::ObjectDeclVisitor::ObjectDeclVisitor(Output& H, Output& M, const str void Slice::Gen::ObjectDeclVisitor::visitClassDecl(const ClassDeclPtr& p) { + if(p->definition() && p->definition()->isDelegate()) + { + return; + } + _H << sp; if(!p->isLocal() || !p->isInterface()) { @@ -925,7 +956,16 @@ Slice::Gen::TypesVisitor::visitClassDefStart(const ClassDefPtr& p) string name = fixName(p); ClassList bases = p->bases(); + if(p->isDelegate()) + { + OperationPtr o = p->allOperations().front(); + _H << sp << nl << "typedef " << typeToString(o->returnType()); + _H << " (^" << name << ")" << getBlockParams(o) << ";"; + return false; + } + _H << sp << nl << _dllExport << "@protocol " << name; + if(!bases.empty()) { _H << " <"; diff --git a/cpp/src/slice2objc/Gen.h b/cpp/src/slice2objc/Gen.h index 70a42ad37f7..ec9657d21c5 100644 --- a/cpp/src/slice2objc/Gen.h +++ b/cpp/src/slice2objc/Gen.h @@ -30,6 +30,7 @@ protected: virtual std::string getName(const OperationPtr&) const; virtual std::string getSelector(const OperationPtr&) const; virtual std::string getParams(const OperationPtr&) const; + virtual std::string getBlockParams(const OperationPtr&) const; virtual std::string getMarshalParams(const OperationPtr&) const; virtual std::string getUnmarshalParams(const OperationPtr&) const; virtual std::string getServerParams(const OperationPtr&) const; |