diff options
author | Joe George <joe@zeroc.com> | 2015-12-23 14:48:40 -0500 |
---|---|---|
committer | Joe George <joe@zeroc.com> | 2015-12-24 10:01:11 -0500 |
commit | e84da5f580821cae8dab292e19cc1296c07a8ed5 (patch) | |
tree | 1e8783c55c7dccd5adda2b87b47b8a7c118a1147 | |
parent | Fixes related to EnableSharedFromThis (diff) | |
download | ice-e84da5f580821cae8dab292e19cc1296c07a8ed5.tar.bz2 ice-e84da5f580821cae8dab292e19cc1296c07a8ed5.tar.xz ice-e84da5f580821cae8dab292e19cc1296c07a8ed5.zip |
ICE-6898 - "Delegate" functions for ACM callbacks
- Add delegate local interfaces CloseCallback and HeartbeatCallback and
remove ConnectionCallback.
- Replace setCallback by setCloseCallback and setHeartbeatCallback
38 files changed, 731 insertions, 502 deletions
diff --git a/CHANGELOG-3.7.md b/CHANGELOG-3.7.md index b260555c29d..8c22fe45811 100644 --- a/CHANGELOG-3.7.md +++ b/CHANGELOG-3.7.md @@ -44,3 +44,7 @@ These are the changes since Ice 3.6.1. `findValueFactory`. - Renamed local interface metadata `async` to `async-oneway`. + +- Replaced `ConnectionCallback` by delegates `CloseCallback` and `HeartbeatCallback`. + Also replaced `setCallback` by `setCloseCallback` and `setHeartbeatCallback` on + the `Connection` interface. diff --git a/cpp/include/IceUtil/Config.h b/cpp/include/IceUtil/Config.h index d204bd8f071..492839920bf 100644 --- a/cpp/include/IceUtil/Config.h +++ b/cpp/include/IceUtil/Config.h @@ -267,6 +267,8 @@ typedef long long Int64; # define ICE_CHECKED_CAST(T,V) Ice::checkedCast<T>(V) # define ICE_UNCHECKED_CAST(T,V) Ice::uncheckedCast<T>(V) # define ICE_VALUE_FACTORY ::std::function<::Ice::ValuePtr (const std::string& type)> +# define ICE_CLOSE_CALLBACK ::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> +# define ICE_HEARTBEAT_CALLBACK ::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> #else // C++98 mapping # define ICE_HANDLE ::IceUtil::Handle # define ICE_INTERNAL_HANDLE ::IceInternal::Handle @@ -280,6 +282,8 @@ typedef long long Int64; # define ICE_CHECKED_CAST(T,V) T::checkedCast(V) # define ICE_UNCHECKED_CAST(T,V) T::uncheckedCast(V) # define ICE_VALUE_FACTORY ::Ice::ValueFactoryPtr +# define ICE_CLOSE_CALLBACK ::Ice::CloseCallbackPtr +# define ICE_HEARTBEAT_CALLBACK ::Ice::HeartbeatCallbackPtr #endif #endif diff --git a/cpp/src/Glacier2/SessionRouterI.cpp b/cpp/src/Glacier2/SessionRouterI.cpp index 47f18f7e8d7..d2c05f27e16 100644 --- a/cpp/src/Glacier2/SessionRouterI.cpp +++ b/cpp/src/Glacier2/SessionRouterI.cpp @@ -36,7 +36,7 @@ public: void finished(const Ice::AsyncResultPtr& r) - { + { Ice::ObjectPrx o = r->getProxy(); try { @@ -68,7 +68,7 @@ public: void finished(const Ice::AsyncResultPtr& r) - { + { Ice::ObjectPrx o = r->getProxy(); try { @@ -84,7 +84,7 @@ public: _router->destroySession(_connection); } } - + private: const SessionRouterIPtr _router; @@ -99,7 +99,7 @@ class SessionControlI : public SessionControl { public: - SessionControlI(const SessionRouterIPtr& sessionRouter, const ConnectionPtr& connection, + SessionControlI(const SessionRouterIPtr& sessionRouter, const ConnectionPtr& connection, const FilterManagerPtr& filterManager) : _sessionRouter(sessionRouter), _connection(connection), @@ -122,7 +122,7 @@ public: virtual IdentitySetPrx identities(const Current&) { - return _filters->identitiesPrx(); + return _filters->identitiesPrx(); } virtual int @@ -130,7 +130,7 @@ public: { return static_cast<int>(_sessionRouter->getSessionTimeout(current)); } - + virtual void destroy(const Current&) { @@ -153,7 +153,7 @@ public: _sessionRouter(sessionRouter) { } - + virtual ObjectPtr locate(const Current& current, LocalObjectPtr&) { @@ -183,7 +183,7 @@ public: _sessionRouter(sessionRouter) { } - + virtual ObjectPtr locate(const Current& current, LocalObjectPtr&) { @@ -209,14 +209,14 @@ class UserPasswordCreateSession : public CreateSession { public: - UserPasswordCreateSession(const AMD_Router_createSessionPtr& amdCB, const string& user, const string& password, + UserPasswordCreateSession(const AMD_Router_createSessionPtr& amdCB, const string& user, const string& password, const Ice::Current& current, const SessionRouterIPtr& sessionRouter) : CreateSession(sessionRouter, user, current), - _amdCB(amdCB), + _amdCB(amdCB), _password(password) { } - + void checkPermissionsResponse(bool ok, const string& reason) @@ -251,7 +251,7 @@ public: Ice::Context ctx = _current.ctx; ctx.insert(_context.begin(), _context.end()); - + _sessionRouter->_verifier->begin_checkPermissions(_user, _password, ctx, newCallback_PermissionsVerifier_checkPermissions(this, &UserPasswordCreateSession::checkPermissionsResponse, @@ -272,10 +272,10 @@ public: _sessionRouter->_sessionManager->begin_create(_user, _control, ctx, newCallback_SessionManager_create( static_cast<CreateSession*>(this), - &CreateSession::sessionCreated, + &CreateSession::sessionCreated, &CreateSession::createException)); } - + virtual void finished(const SessionPrx& session) { @@ -298,14 +298,14 @@ class SSLCreateSession : public CreateSession { public: - SSLCreateSession(const AMD_Router_createSessionFromSecureConnectionPtr& amdCB, const string& user, + SSLCreateSession(const AMD_Router_createSessionFromSecureConnectionPtr& amdCB, const string& user, const SSLInfo& sslInfo, const Ice::Current& current, const SessionRouterIPtr& sessionRouter) : CreateSession(sessionRouter, user, current), - _amdCB(amdCB), + _amdCB(amdCB), _sslInfo(sslInfo) { } - + void authorizeResponse(bool ok, const string& reason) { @@ -339,9 +339,9 @@ public: Ice::Context ctx = _current.ctx; ctx.insert(_context.begin(), _context.end()); - _sessionRouter->_sslVerifier->begin_authorize(_sslInfo, ctx, + _sessionRouter->_sslVerifier->begin_authorize(_sslInfo, ctx, newCallback_SSLPermissionsVerifier_authorize(this, - &SSLCreateSession::authorizeResponse, + &SSLCreateSession::authorizeResponse, &SSLCreateSession::authorizeException)); } @@ -359,10 +359,10 @@ public: _sessionRouter->_sslSessionManager->begin_create(_sslInfo, _control, ctx, newCallback_SSLSessionManager_create( static_cast<CreateSession*>(this), - &CreateSession::sessionCreated, + &CreateSession::sessionCreated, &CreateSession::createException)); } - + virtual void finished(const SessionPrx& session) { @@ -381,32 +381,45 @@ private: const SSLInfo _sslInfo; }; -class ConnectionCallbackI : public Ice::ConnectionCallback +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(const SessionRouterIPtr& sessionRouter) : _sessionRouter(sessionRouter) + CloseCallbackI(const SessionRouterIPtr& sessionRouter) : _sessionRouter(sessionRouter) { } - + virtual void - heartbeat(const Ice::ConnectionPtr& connection) + closed(const Ice::ConnectionPtr& connection) { try { - _sessionRouter->refreshSession(connection); + _sessionRouter->destroySession(connection); } catch(const Ice::Exception&) { } } +private: + + const SessionRouterIPtr _sessionRouter; +}; + +class HeartbeatCallbackI : public Ice::HeartbeatCallback +{ +public: + + HeartbeatCallbackI(const SessionRouterIPtr& sessionRouter) : _sessionRouter(sessionRouter) + { + } + virtual void - closed(const Ice::ConnectionPtr& connection) + heartbeat(const Ice::ConnectionPtr& connection) { try { - _sessionRouter->destroySession(connection); + _sessionRouter->refreshSession(connection); } catch(const Ice::Exception&) { @@ -571,7 +584,7 @@ CreateSession::sessionCreated(const SessionPrx& session) // Notify the router that the creation is finished. // try - { + { _sessionRouter->finishCreateSession(_current.con, router); finished(session); } @@ -601,13 +614,13 @@ void CreateSession::exception(const Ice::Exception& ex) { try - { + { _sessionRouter->finishCreateSession(_current.con, 0); } catch(const Ice::Exception&) { } - + finished(ex); if(_control) @@ -640,7 +653,8 @@ SessionRouterI::SessionRouterI(const InstancePtr& instance, _sslVerifier(sslVerifier), _sslSessionManager(sslSessionManager), _sessionTimeout(IceUtil::Time::seconds(_instance->properties()->getPropertyAsInt("Glacier2.SessionTimeout"))), - _connectionCallback(new ConnectionCallbackI(this)), + _closeCallback(new CloseCallbackI(this)), + _heartbeatCallback(new HeartbeatCallbackI(this)), _sessionThread(_sessionTimeout > IceUtil::Time() ? new SessionThread(this, _sessionTimeout) : 0), _routersByConnectionHint(_routersByConnection.end()), _routersByCategoryHint(_routersByCategory.end()), @@ -671,7 +685,7 @@ SessionRouterI::SessionRouterI(const InstancePtr& instance, // a router servant based on connection information. // _instance->clientObjectAdapter()->addServantLocator(new ClientLocator(this), ""); - + // // If there is a server object adapter, all calls on this adapter // are dispatched to a router servant based on the category field @@ -709,21 +723,22 @@ SessionRouterI::destroy() Callback_Session_destroyPtr destroyCallback; { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + assert(!_destroy); _destroy = true; notify(); - + _routersByConnection.swap(routers); _routersByConnectionHint = _routersByConnection.end(); - + _routersByCategory.clear(); _routersByCategoryHint = _routersByCategory.end(); - + sessionThread = _sessionThread; _sessionThread = 0; - _connectionCallback = 0; + _closeCallback = 0; + _heartbeatCallback = 0; swap(destroyCallback, _sessionDestroyCallback); // Break cyclic reference count. } @@ -780,7 +795,7 @@ SessionRouterI::getCategoryForClient(const Ice::Current& current) const } void -SessionRouterI::createSession_async(const AMD_Router_createSessionPtr& amdCB, const std::string& userId, +SessionRouterI::createSession_async(const AMD_Router_createSessionPtr& amdCB, const std::string& userId, const std::string& password, const Current& current) { if(!_verifier) @@ -913,14 +928,14 @@ SessionRouterI::destroySession(const ConnectionPtr& connection) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + if(_destroy) { throw ObjectNotExistException(__FILE__, __LINE__); } - - map<ConnectionPtr, RouterIPtr>::iterator p; - + + map<ConnectionPtr, RouterIPtr>::iterator p; + if(_routersByConnectionHint != _routersByConnection.end() && _routersByConnectionHint->first == connection) { p = _routersByConnectionHint; @@ -929,17 +944,17 @@ SessionRouterI::destroySession(const ConnectionPtr& connection) { p = _routersByConnection.find(connection); } - + if(p == _routersByConnection.end()) { throw SessionNotExistException(); } - + router = p->second; _routersByConnection.erase(p++); _routersByConnectionHint = p; - + if(_instance->serverObjectAdapter()) { string category = router->getServerProxy(Current())->ice_getIdentity().category; @@ -973,7 +988,7 @@ SessionRouterI::getACMTimeout(const Ice::Current& current) const return current.con->getACM().timeout; } -void +void SessionRouterI::updateSessionObservers() { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); @@ -984,7 +999,7 @@ SessionRouterI::updateSessionObservers() for(map<ConnectionPtr, RouterIPtr>::iterator p = _routersByConnection.begin(); p != _routersByConnection.end(); ++p) { p->second->updateObserver(observer); - } + } } RouterIPtr @@ -1017,7 +1032,7 @@ SessionRouterI::getServerBlobject(const string& category) const { return _routersByCategoryHint->second->getServerBlobject(); } - + map<string, RouterIPtr>::iterator p = routers.find(category); if(p != routers.end()) @@ -1035,30 +1050,30 @@ void SessionRouterI::expireSessions() { vector<RouterIPtr> routers; - + { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + if(_destroy) { return; } - + assert(_sessionTimeout > IceUtil::Time()); IceUtil::Time minTimestamp = IceUtil::Time::now(IceUtil::Time::Monotonic) - _sessionTimeout; - + map<ConnectionPtr, RouterIPtr>::iterator p = _routersByConnection.begin(); - + while(p != _routersByConnection.end()) { if(p->second->getTimestamp() < minTimestamp) { RouterIPtr router = p->second; routers.push_back(router); - + _routersByConnection.erase(p++); _routersByConnectionHint = p; - + if(_instance->serverObjectAdapter()) { string category = router->getServerProxy(Current())->ice_getIdentity().category; @@ -1073,7 +1088,7 @@ SessionRouterI::expireSessions() } } } - + // // We destroy the expired routers outside the thread // synchronization, to avoid deadlocks. @@ -1104,7 +1119,7 @@ SessionRouterI::getRouterImpl(const ConnectionPtr& connection, const Ice::Identi _routersByConnectionHint->second->updateTimestamp(); return _routersByConnectionHint->second; } - + map<ConnectionPtr, RouterIPtr>::iterator p = routers.find(connection); if(p != routers.end()) @@ -1141,7 +1156,7 @@ bool SessionRouterI::startCreateSession(const CreateSessionPtr& cb, const ConnectionPtr& connection) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + if(_destroy) { CannotCreateSessionException exc; @@ -1153,7 +1168,7 @@ SessionRouterI::startCreateSession(const CreateSessionPtr& cb, const ConnectionP // Check whether a session already exists for the connection. // { - map<ConnectionPtr, RouterIPtr>::iterator p; + map<ConnectionPtr, RouterIPtr>::iterator p; if(_routersByConnectionHint != _routersByConnection.end() && _routersByConnectionHint->first == connection) { @@ -1163,7 +1178,7 @@ SessionRouterI::startCreateSession(const CreateSessionPtr& cb, const ConnectionP { p = _routersByConnection.find(connection); } - + if(p != _routersByConnection.end()) { CannotCreateSessionException exc; @@ -1198,14 +1213,14 @@ void SessionRouterI::finishCreateSession(const ConnectionPtr& connection, const RouterIPtr& router) { IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - + // // Signal other threads that we are done with trying to // establish a session for our connection; // _pending.erase(connection); notify(); - + if(!router) { return; @@ -1219,21 +1234,22 @@ SessionRouterI::finishCreateSession(const ConnectionPtr& connection, const Route exc.reason = "router is shutting down"; throw exc; } - + _routersByConnectionHint = _routersByConnection.insert( _routersByConnectionHint, pair<const ConnectionPtr, RouterIPtr>(connection, router)); - + if(_instance->serverObjectAdapter()) { string category = router->getServerProxy()->ice_getIdentity().category; assert(!category.empty()); - pair<map<string, RouterIPtr>::iterator, bool> rc = + pair<map<string, RouterIPtr>::iterator, bool> rc = _routersByCategory.insert(pair<const string, RouterIPtr>(category, router)); assert(rc.second); _routersByCategoryHint = rc.first; } - connection->setCallback(_connectionCallback); + connection->setCloseCallback(_closeCallback); + connection->setHeartbeatCallback(_heartbeatCallback); if(_sessionTraceLevel >= 1) { @@ -1277,7 +1293,7 @@ SessionRouterI::SessionThread::run() { return; } - + assert(_sessionTimeout > IceUtil::Time()); timedWait(_sessionTimeout / 4); diff --git a/cpp/src/Glacier2/SessionRouterI.h b/cpp/src/Glacier2/SessionRouterI.h index 10b784fdadb..252bfcc0c4e 100644 --- a/cpp/src/Glacier2/SessionRouterI.h +++ b/cpp/src/Glacier2/SessionRouterI.h @@ -148,7 +148,8 @@ private: const SSLSessionManagerPrx _sslSessionManager; IceUtil::Time _sessionTimeout; - Ice::ConnectionCallbackPtr _connectionCallback; + Ice::CloseCallbackPtr _closeCallback; + Ice::HeartbeatCallbackPtr _heartbeatCallback; class SessionThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> { diff --git a/cpp/src/Glacier2Lib/Application.cpp b/cpp/src/Glacier2Lib/Application.cpp index 729f90aa21f..dbd21240c8f 100644 --- a/cpp/src/Glacier2Lib/Application.cpp +++ b/cpp/src/Glacier2Lib/Application.cpp @@ -13,7 +13,7 @@ using namespace std; using namespace Ice; - + Ice::ObjectAdapterPtr Glacier2::Application::_adapter; Glacier2::RouterPrxPtr Glacier2::Application::_router; Glacier2::SessionPrxPtr Glacier2::Application::_session; @@ -22,31 +22,26 @@ string Glacier2::Application::_category; namespace { - -class ConnectionCallbackI : public Ice::ConnectionCallback +#ifndef ICE_CPP11_MAPPING // C++98 +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(Glacier2::Application* app) : _app(app) + CloseCallbackI(Glacier2::Application* app) : _app(app) { } virtual void - heartbeat(const Ice::ConnectionPtr&) - { - } - - virtual void closed(const Ice::ConnectionPtr&) { _app->sessionDestroyed(); } - + private: Glacier2::Application* _app; }; - +#endif } string @@ -154,7 +149,7 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat { IceInternal::Application::_communicator = Ice::initialize(args, initData); _router = ICE_UNCHECKED_CAST(Glacier2::RouterPrx, communicator()->getDefaultRouter()); - + if(!_router) { Error out(getProcessLogger()); @@ -188,7 +183,7 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat { Ice::Int acmTimeout = 0; try - { + { acmTimeout = _router->getACMTimeout(); } catch(const Ice::OperationNotExistException&) @@ -204,7 +199,14 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat Ice::ConnectionPtr connection = _router->ice_getCachedConnection(); assert(connection); connection->setACM(acmTimeout, IceUtil::None, Ice::HeartbeatAlways); - connection->setCallback(ICE_MAKE_SHARED(ConnectionCallbackI, this)); +#ifdef ICE_CPP11_MAPPING + connection->setCloseCallback([this](const Ice::ConnectionPtr&) + { + this->sessionDestroyed(); + }); +#else + connection->setCloseCallback(ICE_MAKE_SHARED(CloseCallbackI, this)); +#endif } _category = _router->getCategoryForClient(); diff --git a/cpp/src/Glacier2Lib/SessionHelper.cpp b/cpp/src/Glacier2Lib/SessionHelper.cpp index 541267fb51f..5ef4a88db88 100644 --- a/cpp/src/Glacier2Lib/SessionHelper.cpp +++ b/cpp/src/Glacier2Lib/SessionHelper.cpp @@ -707,16 +707,12 @@ private: const Glacier2::SessionHelperPtr _session; }; -class ConnectionCallbackI : public Ice::ConnectionCallback +#ifndef ICE_CPP11_MAPPING // C++98 +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(const SessionHelperIPtr& sessionHelper) : _sessionHelper(sessionHelper) - { - } - - virtual void - heartbeat(const Ice::ConnectionPtr&) + CloseCallbackI(const SessionHelperIPtr& sessionHelper) : _sessionHelper(sessionHelper) { } @@ -730,6 +726,7 @@ private: SessionHelperIPtr _sessionHelper; }; +#endif } @@ -792,7 +789,15 @@ SessionHelperI::connected(const Glacier2::RouterPrxPtr& router, const Glacier2:: Ice::ConnectionPtr connection = _router->ice_getCachedConnection(); assert(connection); connection->setACM(acmTimeout, IceUtil::None, Ice::HeartbeatAlways); - connection->setCallback(ICE_MAKE_SHARED(ConnectionCallbackI, shared_from_this())); +#ifdef ICE_CPP11_MAPPING + auto self(shared_from_this()); + connection->setCloseCallback([self](const Ice::ConnectionPtr&) + { + self->destroy(); + }); +#else + connection->setCloseCallback(ICE_MAKE_SHARED(CloseCallbackI, shared_from_this())); +#endif } } } @@ -1115,7 +1120,7 @@ Glacier2::SessionFactoryHelper::connect() map<string, string> context; { IceUtil::Mutex::Lock sync(_mutex); - session = ICE_MAKE_SHARED(SessionHelperI, + session = ICE_MAKE_SHARED(SessionHelperI, ICE_MAKE_SHARED(SessionThreadCallback, shared_from_this()), _callback, createInitData(), diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index afd26ebdb32..f86bf0e6b58 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -47,7 +47,6 @@ namespace const ::std::string __flushBatchRequests_name = "flushBatchRequests"; - class TimeoutCallback : public IceUtil::TimerTask { public: @@ -74,7 +73,7 @@ public: DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, - const OutgoingAsyncBasePtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, + const OutgoingAsyncBasePtr& outAsync, const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, BasicStream& stream) : DispatchWorkItem(connection), _connection(connection), @@ -110,7 +109,7 @@ private: const ServantManagerPtr _servantManager; const ObjectAdapterPtr _adapter; const OutgoingAsyncBasePtr _outAsync; - const ConnectionCallbackPtr _heartbeatCallback; + const ICE_HEARTBEAT_CALLBACK _heartbeatCallback; BasicStream _stream; }; @@ -540,7 +539,7 @@ Ice::ConnectionI::updateObserver() } assert(_instance->initializationData().observer); - + ConnectionObserverPtr o = _instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), _endpoint, toConnectionState(_state), @@ -959,50 +958,66 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) #endif void -Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) +#ifdef ICE_CPP11_MAPPING +Ice::ConnectionI::setHeartbeatCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> callback) +#else +Ice::ConnectionI::setHeartbeatCallback(const Ice::HeartbeatCallbackPtr& callback) +#endif { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _heartbeatCallback = callback; +} +void +#ifdef ICE_CPP11_MAPPING +Ice::ConnectionI::setCloseCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> callback) +#else +Ice::ConnectionI::setCloseCallback(const Ice::CloseCallbackPtr& callback) +#endif +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state >= StateClosed) + if(callback) { - if(callback) + class CallbackWorkItem : public DispatchWorkItem { - class CallbackWorkItem : public DispatchWorkItem - { - public: + public: - CallbackWorkItem(const ConnectionIPtr& connection, const ConnectionCallbackPtr& callback) : - _connection(connection), - _callback(callback) - { - } + CallbackWorkItem(const ConnectionIPtr& connection, const ICE_CLOSE_CALLBACK& callback) : + _connection(connection), + _callback(callback) + { + } - virtual void run() - { - _connection->closeCallback(_callback); - } + virtual void run() + { + _connection->closeCallback(_callback); + } - private: + private: - const ConnectionIPtr _connection; - const ConnectionCallbackPtr _callback; - }; - _threadPool->dispatch(new CallbackWorkItem(shared_from_this(), callback)); - } - } - else - { - _callback = callback; + const ConnectionIPtr _connection; + const ICE_CLOSE_CALLBACK _callback; + }; + _threadPool->dispatch(new CallbackWorkItem(shared_from_this(), callback)); } } + else + { + _closeCallback = callback; + } } void -Ice::ConnectionI::closeCallback(const ConnectionCallbackPtr& callback) +Ice::ConnectionI::closeCallback(const ICE_CLOSE_CALLBACK& callback) { try { +#ifdef ICE_CPP11_MAPPING + callback(shared_from_this()); +#else callback->closed(shared_from_this()); +#endif } catch(const std::exception& ex) { @@ -1537,7 +1552,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) ServantManagerPtr servantManager; ObjectAdapterPtr adapter; OutgoingAsyncBasePtr outAsync; - ConnectionCallbackPtr heartbeatCallback; + ICE_HEARTBEAT_CALLBACK heartbeatCallback; int dispatchCount = 0; ThreadPoolMessage<ConnectionI> msg(current, *this); @@ -1807,7 +1822,7 @@ void ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncBasePtr& outAsync, - const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) + const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, BasicStream& stream) { int dispatchedCount = 0; @@ -1862,7 +1877,11 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess { try { +#ifdef ICE_CPP11_MAPPING + heartbeatCallback(shared_from_this()); +#else heartbeatCallback->heartbeat(shared_from_this()); +#endif } catch(const std::exception& ex) { @@ -1941,7 +1960,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close) // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // - if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback) + if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_closeCallback && !_heartbeatCallback) { finish(close); return; @@ -2080,12 +2099,14 @@ Ice::ConnectionI::finish(bool close) _readStream.clear(); _readStream.b.clear(); - if(_callback) + if(_closeCallback) { - closeCallback(_callback); - _callback = 0; + closeCallback(_closeCallback); + _closeCallback = 0; } + _heartbeatCallback = 0; + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -2264,7 +2285,8 @@ Ice::ConnectionI::create(const CommunicatorPtr& communicator, Ice::ConnectionI::~ConnectionI() { assert(!_startCallback); - assert(!_callback); + assert(!_closeCallback); + assert(!_heartbeatCallback); assert(_state == StateFinished); assert(_dispatchCount == 0); assert(_sendStreams.empty()); @@ -3206,7 +3228,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncBasePtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + OutgoingAsyncBasePtr& outAsync, ICE_HEARTBEAT_CALLBACK& heartbeatCallback, int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3433,9 +3455,9 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request case validateConnectionMsg: { traceRecv(stream, _logger, _traceLevels); - if(_callback) + if(_heartbeatCallback) { - heartbeatCallback = _callback; + heartbeatCallback = _heartbeatCallback; ++dispatchCount; } break; diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index e920b894a62..6911b4fea1c 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -204,7 +204,13 @@ public: virtual void end_flushBatchRequests(const AsyncResultPtr&); #endif - virtual void setCallback(const ConnectionCallbackPtr&); +#ifdef ICE_CPP11_MAPPING + virtual void setCloseCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)>); + virtual void setHeartbeatCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)>); +#else + virtual void setCloseCallback(const Ice::CloseCallbackPtr&); + virtual void setHeartbeatCallback(const Ice::HeartbeatCallbackPtr&); +#endif virtual void setACM(const IceUtil::Optional<int>&, const IceUtil::Optional<ACMClose>&, const IceUtil::Optional<ACMHeartbeat>&); @@ -252,11 +258,10 @@ public: void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncBasePtr&, - const ConnectionCallbackPtr&, IceInternal::BasicStream&); + const ICE_HEARTBEAT_CALLBACK&, IceInternal::BasicStream&); void finish(bool); - void closeCallback(const ConnectionCallbackPtr&); - + void closeCallback(const ICE_CLOSE_CALLBACK&); virtual ~ConnectionI(); @@ -304,7 +309,7 @@ private: IceInternal::SocketOperation parseMessage(IceInternal::BasicStream&, Int&, Int&, Byte&, IceInternal::ServantManagerPtr&, ObjectAdapterPtr&, - IceInternal::OutgoingAsyncBasePtr&, ConnectionCallbackPtr&, int&); + IceInternal::OutgoingAsyncBasePtr&, ICE_HEARTBEAT_CALLBACK&, int&); void invokeAll(IceInternal::BasicStream&, Int, Int, Byte, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&); @@ -384,7 +389,9 @@ private: bool _initialized; bool _validated; - Ice::ConnectionCallbackPtr _callback; + ICE_CLOSE_CALLBACK _closeCallback; + ICE_HEARTBEAT_CALLBACK _heartbeatCallback; + }; } diff --git a/cpp/src/IceGrid/ReapThread.cpp b/cpp/src/IceGrid/ReapThread.cpp index 07cc86a0512..94f89b95df3 100644 --- a/cpp/src/IceGrid/ReapThread.cpp +++ b/cpp/src/IceGrid/ReapThread.cpp @@ -16,28 +16,41 @@ using namespace IceGrid; namespace { -class ConnectionCallbackI : public Ice::ConnectionCallback +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) + CloseCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) { } - + virtual void - heartbeat(const Ice::ConnectionPtr& con) + closed(const Ice::ConnectionPtr& con) + { + _reaper->connectionClosed(con); + } + +private: + + const ReapThreadPtr _reaper; +}; + +class HeartbeatCallbackI : public Ice::HeartbeatCallback +{ +public: + + HeartbeatCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) { - _reaper->connectionHeartbeat(con); } virtual void - closed(const Ice::ConnectionPtr& con) + heartbeat(const Ice::ConnectionPtr& con) { - _reaper->connectionClosed(con); + _reaper->connectionHeartbeat(con); } private: - + const ReapThreadPtr _reaper; }; @@ -45,7 +58,8 @@ private: ReapThread::ReapThread() : IceUtil::Thread("Icegrid reaper thread"), - _callback(new ConnectionCallbackI(this)), + _closeCallback(new CloseCallbackI(this)), + _heartbeatCallback(new HeartbeatCallbackI(this)), _terminated(false) { } @@ -76,7 +90,7 @@ ReapThread::run() { timedWait(_wakeInterval); } - + if(_terminated) { break; @@ -118,7 +132,8 @@ ReapThread::run() q->second.erase(p->item); if(q->second.empty()) { - p->connection->setCallback(0); + p->connection->setCloseCallback(0); + p->connection->setHeartbeatCallback(0); _connections.erase(q); } } @@ -152,10 +167,12 @@ ReapThread::terminate() for(map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.begin(); p != _connections.end(); ++p) { - p->first->setCallback(0); + p->first->setCloseCallback(0); + p->first->setHeartbeatCallback(0); } _connections.clear(); - _callback = 0; + _closeCallback = 0; + _heartbeatCallback = 0; } for(list<ReapableItem>::iterator p = reap.begin(); p != reap.end(); ++p) @@ -198,7 +215,9 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP if(p == _connections.end()) { p = _connections.insert(make_pair(connection, set<ReapablePtr>())).first; - connection->setCallback(_callback); + connection->setCloseCallback(_closeCallback); + connection->setHeartbeatCallback(_heartbeatCallback); + } p->second.insert(reapable); } @@ -213,7 +232,7 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP { notify(); } - + // // Since we just added a new session with a non null timeout there // must be a non-zero wakeInterval. @@ -222,14 +241,15 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP } } -void +void ReapThread::connectionHeartbeat(const Ice::ConnectionPtr& con) { Lock sync(*this); map<Ice::ConnectionPtr, set<ReapablePtr> >::const_iterator p = _connections.find(con); if(p == _connections.end()) { - con->setCallback(0); + con->setCloseCallback(0); + con->setHeartbeatCallback(0); return; } @@ -246,7 +266,8 @@ ReapThread::connectionClosed(const Ice::ConnectionPtr& con) map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(con); if(p == _connections.end()) { - con->setCallback(0); + con->setCloseCallback(0); + con->setHeartbeatCallback(0); return; } @@ -258,7 +279,7 @@ ReapThread::connectionClosed(const Ice::ConnectionPtr& con) } // -// Returns true if the calculated wake interval is less than the current wake +// Returns true if the calculated wake interval is less than the current wake // interval (or if the original wake interval was "forever"). // bool diff --git a/cpp/src/IceGrid/ReapThread.h b/cpp/src/IceGrid/ReapThread.h index 94c4dd9d1e7..b5ac79643de 100644 --- a/cpp/src/IceGrid/ReapThread.h +++ b/cpp/src/IceGrid/ReapThread.h @@ -42,10 +42,10 @@ template<class T> class SessionReapable : public Reapable { typedef IceUtil::Handle<T> TPtr; - + public: - - SessionReapable(const Ice::LoggerPtr& logger, const TPtr& session) : + + SessionReapable(const Ice::LoggerPtr& logger, const TPtr& session) : _logger(logger), _session(session) { } @@ -53,7 +53,7 @@ public: virtual ~SessionReapable() { } - + virtual IceUtil::Time timestamp() const { @@ -94,15 +94,15 @@ template<class T> class SessionReapableWithHeartbeat : public SessionReapable<T> { typedef IceUtil::Handle<T> TPtr; - + public: - SessionReapableWithHeartbeat(const Ice::LoggerPtr& logger, const TPtr& session) : + SessionReapableWithHeartbeat(const Ice::LoggerPtr& logger, const TPtr& session) : SessionReapable<T>(logger, session) { } - virtual void + virtual void heartbeat() const { try @@ -121,7 +121,7 @@ class ReapThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mute public: ReapThread(); - + virtual void run(); void terminate(); void add(const ReapablePtr&, int, const Ice::ConnectionPtr& = Ice::ConnectionPtr()); @@ -132,8 +132,9 @@ public: private: bool calcWakeInterval(); - - Ice::ConnectionCallbackPtr _callback; + + Ice::CloseCallbackPtr _closeCallback; + Ice::HeartbeatCallbackPtr _heartbeatCallback; IceUtil::Time _wakeInterval; bool _terminated; struct ReapableItem diff --git a/cpp/src/Slice/ObjCUtil.cpp b/cpp/src/Slice/ObjCUtil.cpp index 5655b34139e..77a4a9af86c 100644 --- a/cpp/src/Slice/ObjCUtil.cpp +++ b/cpp/src/Slice/ObjCUtil.cpp @@ -488,7 +488,7 @@ Slice::ObjCGenerator::mapsToPointerType(const TypePtr& type) ClassDeclPtr cl = ClassDeclPtr::dynamicCast(type); if(cl && cl->isInterface()) { - if(cl->isLocal()) + if(cl->isLocal() || (cl->definition() && cl->definition()->isDelegate())) { return false; } diff --git a/cpp/src/slice2objc/Gen.cpp b/cpp/src/slice2objc/Gen.cpp index eae190d4a5f..f31146e05ab 100644 --- a/cpp/src/slice2objc/Gen.cpp +++ b/cpp/src/slice2objc/Gen.cpp @@ -450,6 +450,32 @@ Slice::ObjCVisitor::getParams(const OperationPtr& op) const } string +Slice::ObjCVisitor::getBlockParams(const OperationPtr& op) const +{ + string result; + ParamDeclList paramList = op->parameters(); + for(ParamDeclList::const_iterator q = paramList.begin(); q != paramList.end(); ++q) + { + TypePtr type = (*q)->type(); + string typeString; + if((*q)->isOutParam()) + { + typeString = outTypeToString(type, (*q)->optional(), false, true); + } + else + { + typeString = inTypeToString(type, (*q)->optional()); + } + if(q != paramList.begin()) + { + result += " " + getParamId(*q); + } + result += "(" + typeString + ")"; + } + return result; +} + +string Slice::ObjCVisitor::getMarshalParams(const OperationPtr& op) const { ParamDeclList paramList = op->parameters(); @@ -868,6 +894,11 @@ Slice::Gen::ObjectDeclVisitor::ObjectDeclVisitor(Output& H, Output& M, const str void Slice::Gen::ObjectDeclVisitor::visitClassDecl(const ClassDeclPtr& p) { + if(p->definition() && p->definition()->isDelegate()) + { + return; + } + _H << sp; if(!p->isLocal() || !p->isInterface()) { @@ -925,7 +956,16 @@ Slice::Gen::TypesVisitor::visitClassDefStart(const ClassDefPtr& p) string name = fixName(p); ClassList bases = p->bases(); + if(p->isDelegate()) + { + OperationPtr o = p->allOperations().front(); + _H << sp << nl << "typedef " << typeToString(o->returnType()); + _H << " (^" << name << ")" << getBlockParams(o) << ";"; + return false; + } + _H << sp << nl << _dllExport << "@protocol " << name; + if(!bases.empty()) { _H << " <"; diff --git a/cpp/src/slice2objc/Gen.h b/cpp/src/slice2objc/Gen.h index 70a42ad37f7..ec9657d21c5 100644 --- a/cpp/src/slice2objc/Gen.h +++ b/cpp/src/slice2objc/Gen.h @@ -30,6 +30,7 @@ protected: virtual std::string getName(const OperationPtr&) const; virtual std::string getSelector(const OperationPtr&) const; virtual std::string getParams(const OperationPtr&) const; + virtual std::string getBlockParams(const OperationPtr&) const; virtual std::string getMarshalParams(const OperationPtr&) const; virtual std::string getUnmarshalParams(const OperationPtr&) const; virtual std::string getServerParams(const OperationPtr&) const; diff --git a/cpp/test/Ice/acm/AllTests.cpp b/cpp/test/Ice/acm/AllTests.cpp index 7a1982df020..bbebe5183ae 100644 --- a/cpp/test/Ice/acm/AllTests.cpp +++ b/cpp/test/Ice/acm/AllTests.cpp @@ -25,7 +25,7 @@ toString(int value) } class LoggerI : public Ice::EnableSharedFromThis<LoggerI>, - public Ice::Logger, + public Ice::Logger, private IceUtil::Mutex { public: @@ -115,13 +115,13 @@ private: }; ICE_DEFINE_PTR(LoggerIPtr, LoggerI); -class TestCase : +class TestCase : #ifdef ICE_CPP11_MAPPING - public enable_shared_from_this<Ice::ConnectionCallback>, + public enable_shared_from_this<TestCase>, #else public IceUtil::Thread, + public Ice::CloseCallback, public Ice::HeartbeatCallback, #endif - public Ice::ConnectionCallback, protected IceUtil::Monitor<IceUtil::Mutex> { public: @@ -198,7 +198,20 @@ public: _adapter->getTestIntf()->ice_toString())); try { - proxy->ice_getConnection()->setCallback(ICE_SHARED_FROM_THIS); +#ifdef ICE_CPP11_MAPPING + auto self(shared_from_this()); + proxy->ice_getConnection()->setCloseCallback([this, self](const Ice::ConnectionPtr& connection) + { + closed(connection); + }); + proxy->ice_getConnection()->setHeartbeatCallback([this, self](const Ice::ConnectionPtr& connection) + { + heartbeat(connection); + }); +#else + proxy->ice_getConnection()->setCloseCallback(ICE_SHARED_FROM_THIS); + proxy->ice_getConnection()->setHeartbeatCallback(ICE_SHARED_FROM_THIS); +#endif runTestCase(_adapter, proxy); } catch(const std::exception& ex) @@ -579,7 +592,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { (*p)->init(); } - + #ifdef ICE_CPP11_MAPPING vector<pair<thread, TestCasePtr>> threads; for(auto p = tests.begin(); p != tests.end(); ++p) diff --git a/cpp/test/Ice/acm/TestI.cpp b/cpp/test/Ice/acm/TestI.cpp index aa0ce6dd9af..a0e45d50d12 100644 --- a/cpp/test/Ice/acm/TestI.cpp +++ b/cpp/test/Ice/acm/TestI.cpp @@ -26,9 +26,11 @@ toString(int value) return os.str(); } -class ConnectionCallbackI : public Ice::ConnectionCallback, +class HeartbeatCallbackI : #ifdef ICE_CPP11_MAPPING - public enable_shared_from_this<Ice::ConnectionCallback>, + public enable_shared_from_this<HeartbeatCallbackI>, +#else + public Ice::HeartbeatCallback, #endif private IceUtil::Monitor<IceUtil::Mutex> { @@ -45,8 +47,6 @@ public: } } -private: - virtual void heartbeat(const Ice::ConnectionPtr&) { @@ -55,14 +55,11 @@ private: notifyAll(); } - virtual void - closed(const Ice::ConnectionPtr&) - { - } +private: int _count; }; -ICE_DEFINE_PTR(ConnectionCallbackIPtr, ConnectionCallbackI); +ICE_DEFINE_PTR(HeartbeatCallbackIPtr, HeartbeatCallbackI); } @@ -93,7 +90,7 @@ RemoteCommunicatorI::createObjectAdapter(int timeout, int close, int heartbeat, } properties->setProperty(name + ".ThreadPool.Size", "2"); ObjectAdapterPtr adapter = com->createObjectAdapterWithEndpoints(name, protocol + opts); - + return ICE_UNCHECKED_CAST(RemoteObjectAdapterPrx, current.adapter->addWithUUID( ICE_MAKE_SHARED(RemoteObjectAdapterI, adapter))); } @@ -167,7 +164,14 @@ TestI::interruptSleep(const Ice::Current& current) void TestI::waitForHeartbeat(int count, const Ice::Current& current) { - ConnectionCallbackIPtr callback = ICE_MAKE_SHARED(ConnectionCallbackI); - current.con->setCallback(callback); + HeartbeatCallbackIPtr callback = ICE_MAKE_SHARED(HeartbeatCallbackI); +#ifdef ICE_CPP11_MAPPING + current.con->setHeartbeatCallback([callback](const Ice::ConnectionPtr& connection) + { + callback->heartbeat(connection); + }); +#else + current.con->setHeartbeatCallback(callback); +#endif callback->waitForCount(count); } diff --git a/csharp/src/Glacier2/Application.cs b/csharp/src/Glacier2/Application.cs index b5d4aa604c2..403aa13018d 100644 --- a/csharp/src/Glacier2/Application.cs +++ b/csharp/src/Glacier2/Application.cs @@ -205,26 +205,6 @@ public abstract class Application : Ice.Application return _adapter; } - private class ConnectionCallbackI : Ice.ConnectionCallback - { - internal ConnectionCallbackI(Application application) - { - _application = application; - } - - public void heartbeat(Ice.Connection con) - { - - } - - public void closed(Ice.Connection con) - { - _application.sessionDestroyed(); - } - - private readonly Application _application; - } - protected override int doMain(string[] originArgs, Ice.InitializationData initData) { @@ -319,7 +299,7 @@ public abstract class Application : Ice.Application Ice.Connection connection = _router.ice_getCachedConnection(); Debug.Assert(connection != null); connection.setACM((int)acmTimeout, Ice.Util.None, Ice.ACMHeartbeat.HeartbeatAlways); - connection.setCallback(new ConnectionCallbackI(this)); + connection.setCloseCallback(_ => sessionDestroyed()); } _category = _router.getCategoryForClient(); status = runWithSession(args); diff --git a/csharp/src/Glacier2/SessionHelper.cs b/csharp/src/Glacier2/SessionHelper.cs index ec7b026e2af..d4e55e26fe1 100644 --- a/csharp/src/Glacier2/SessionHelper.cs +++ b/csharp/src/Glacier2/SessionHelper.cs @@ -20,26 +20,6 @@ namespace Glacier2 /// </summary> public class SessionHelper { - private class ConnectionCallbackI : Ice.ConnectionCallback - { - internal ConnectionCallbackI(SessionHelper sessionHelper) - { - _sessionHelper = sessionHelper; - } - - public void heartbeat(Ice.Connection con) - { - - } - - public void closed(Ice.Connection con) - { - _sessionHelper.destroy(); - } - - private readonly SessionHelper _sessionHelper; - } - /// <summary> /// Creates a Glacier2 session. /// </summary> @@ -320,7 +300,7 @@ public class SessionHelper Ice.Connection connection = _router.ice_getCachedConnection(); Debug.Assert(connection != null); connection.setACM(acmTimeout, Ice.Util.None, Ice.ACMHeartbeat.HeartbeatAlways); - connection.setCallback(new ConnectionCallbackI(this)); + connection.setCloseCallback(_ => destroy()); } } diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs index e8d355a08a1..5a32202f437 100644 --- a/csharp/src/Ice/ConnectionI.cs +++ b/csharp/src/Ice/ConnectionI.cs @@ -507,7 +507,7 @@ namespace Ice return result; } - public void setCallback(ConnectionCallback callback) + public void setCloseCallback(CloseCallback callback) { lock(this) { @@ -519,7 +519,7 @@ namespace Ice { try { - callback.closed(this); + callback(this); } catch(System.Exception ex) { @@ -530,11 +530,19 @@ namespace Ice } else { - _callback = callback; + _closeCallback = callback; } } } + public void setHeartbeatCallback(HeartbeatCallback callback) + { + lock(this) + { + _heartbeatCallback = callback; + } + } + public void setACM(Optional<int> timeout, Optional<ACMClose> close, Optional<ACMHeartbeat> heartbeat) { lock(this) @@ -1249,7 +1257,7 @@ namespace Ice { try { - info.heartbeatCallback.heartbeat(this); + info.heartbeatCallback(this); } catch(System.Exception ex) { @@ -1324,7 +1332,8 @@ namespace Ice // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // - if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && _callback == null) + if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && + _closeCallback == null && _heartbeatCallback == null) { finish(); return; @@ -1453,19 +1462,21 @@ namespace Ice _readStream.getBuffer().clear(); _incomingCache = null; - if(_callback != null) + if(_closeCallback != null) { try { - _callback.closed(this); + _closeCallback(this); } catch(System.Exception ex) { _logger.error("connection callback exception:\n" + ex + '\n' + _desc); } - _callback = null; + _closeCallback = null; } + _heartbeatCallback = null; + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -2353,7 +2364,7 @@ namespace Ice public ObjectAdapter adapter; public IceInternal.OutgoingAsyncBase outAsync; public Ice.AsyncCallback completedCallback; - public ConnectionCallback heartbeatCallback; + public HeartbeatCallback heartbeatCallback; public int messageDispatchCount; } @@ -2511,9 +2522,9 @@ namespace Ice case IceInternal.Protocol.validateConnectionMsg: { IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); - if(_callback != null) + if(_heartbeatCallback != null) { - info.heartbeatCallback = _callback; + info.heartbeatCallback = _heartbeatCallback; ++info.messageDispatchCount; } break; @@ -2986,7 +2997,8 @@ namespace Ice private Ice.ConnectionInfo _info; - private Ice.ConnectionCallback _callback; + private Ice.CloseCallback _closeCallback; + private Ice.HeartbeatCallback _heartbeatCallback; private static ConnectionState[] connectionStateMap = new ConnectionState[] { ConnectionState.ConnectionStateValidating, // StateNotInitialized diff --git a/csharp/test/Ice/acm/AllTests.cs b/csharp/test/Ice/acm/AllTests.cs index ccce971e443..160c396d2ca 100644 --- a/csharp/test/Ice/acm/AllTests.cs +++ b/csharp/test/Ice/acm/AllTests.cs @@ -107,7 +107,7 @@ class LoggerI : Ice.Logger private List<string> _messages = new List<string>(); }; -abstract class TestCase : Ice.ConnectionCallback +abstract class TestCase { public TestCase(string name, RemoteCommunicatorPrx com) { @@ -187,7 +187,23 @@ abstract class TestCase : Ice.ConnectionCallback _adapter.getTestIntf().ToString())); try { - proxy.ice_getConnection().setCallback(this); + proxy.ice_getConnection().setCloseCallback(_=> + { + lock(this) + { + _closed = true; + Monitor.Pulse(this); + } + }); + + proxy.ice_getConnection().setHeartbeatCallback(_=> + { + lock(this) + { + ++_heartbeat; + } + }); + runTestCase(_adapter, proxy); } catch(Exception ex) @@ -196,23 +212,6 @@ abstract class TestCase : Ice.ConnectionCallback } } - public void heartbeat(Ice.Connection con) - { - lock(this) - { - ++_heartbeat; - } - } - - public void closed(Ice.Connection con) - { - lock(this) - { - _closed = true; - Monitor.Pulse(this); - } - } - public void waitForClosed() { lock(this) diff --git a/csharp/test/Ice/acm/TestI.cs b/csharp/test/Ice/acm/TestI.cs index c67bc7736e0..fbcf0b1c53b 100644 --- a/csharp/test/Ice/acm/TestI.cs +++ b/csharp/test/Ice/acm/TestI.cs @@ -111,7 +111,7 @@ public class TestI : TestIntfDisp_ } } - class ConnectionCallbackI : Ice.ConnectionCallback + class HeartbeatCallbackI { public void heartbeat(Ice.Connection c) { @@ -122,10 +122,6 @@ public class TestI : TestIntfDisp_ } } - public void closed(Ice.Connection c) - { - } - public void waitForCount(int count) { lock(this) @@ -145,8 +141,8 @@ public class TestI : TestIntfDisp_ { - ConnectionCallbackI callback = new ConnectionCallbackI(); - current.con.setCallback(callback); + HeartbeatCallbackI callback = new HeartbeatCallbackI(); + current.con.setHeartbeatCallback(callback.heartbeat); callback.waitForCount(count); } }; diff --git a/csharp/test/Ice/background/AllTests.cs b/csharp/test/Ice/background/AllTests.cs index 37c287d9b9c..27db77ebf64 100644 --- a/csharp/test/Ice/background/AllTests.cs +++ b/csharp/test/Ice/background/AllTests.cs @@ -603,11 +603,8 @@ public class AllTests thread2.Join(); } - private sealed class CloseCallback : Callback, Ice.ConnectionCallback + private sealed class CloseCallback : Callback { - public void heartbeat(Ice.Connection con) - { - } public void closed(Ice.Connection con) { @@ -621,7 +618,7 @@ public class AllTests private static void closeConnection(Ice.ObjectPrx prx) { CloseCallback cb = new CloseCallback(); - prx.ice_getConnection().setCallback(cb); + prx.ice_getConnection().setCloseCallback(cb.closed); prx.ice_getConnection().close(false); cb.check(); } diff --git a/java/src/Glacier2/src/main/java/Glacier2/Application.java b/java/src/Glacier2/src/main/java/Glacier2/Application.java index 400b57b28b6..b26028bee01 100644 --- a/java/src/Glacier2/src/main/java/Glacier2/Application.java +++ b/java/src/Glacier2/src/main/java/Glacier2/Application.java @@ -233,15 +233,9 @@ public abstract class Application extends Ice.Application return _adapter; } - private class ConnectionCallbackI implements Ice.ConnectionCallback + private class CloseCallbackI implements Ice.CloseCallback { @Override - public void heartbeat(Ice.Connection con) - { - - } - - @Override public void closed(Ice.Connection con) { sessionDestroyed(); @@ -346,7 +340,7 @@ public abstract class Application extends Ice.Application connection.setACM(new Ice.IntOptional(acmTimeout), null, new Ice.Optional<Ice.ACMHeartbeat>(Ice.ACMHeartbeat.HeartbeatAlways)); - connection.setCallback(new ConnectionCallbackI()); + connection.setCloseCallback(new CloseCallbackI()); } _category = _router.getCategoryForClient(); status.value = runWithSession(argHolder.value); diff --git a/java/src/Glacier2/src/main/java/Glacier2/SessionHelper.java b/java/src/Glacier2/src/main/java/Glacier2/SessionHelper.java index 83adf1fc6e1..1d936f45b31 100644 --- a/java/src/Glacier2/src/main/java/Glacier2/SessionHelper.java +++ b/java/src/Glacier2/src/main/java/Glacier2/SessionHelper.java @@ -327,19 +327,14 @@ public class SessionHelper connection.setACM(new Ice.IntOptional(acmTimeout), null, new Ice.Optional<Ice.ACMHeartbeat>(Ice.ACMHeartbeat.HeartbeatAlways)); - connection.setCallback(new Ice.ConnectionCallback() - { - @Override - public void heartbeat(Ice.Connection con) - { - } - - @Override - public void closed(Ice.Connection con) - { - destroy(); - } - }); + connection.setCloseCallback(new Ice.CloseCallback() + { + @Override + public void closed(Ice.Connection con) + { + destroy(); + } + }); } _shutdownHook = new Thread("Shutdown hook") diff --git a/java/src/Ice/src/main/java/Ice/CommunicatorI.java b/java/src/Ice/src/main/java/Ice/CommunicatorI.java index de854848e34..18ec854f19d 100644 --- a/java/src/Ice/src/main/java/Ice/CommunicatorI.java +++ b/java/src/Ice/src/main/java/Ice/CommunicatorI.java @@ -122,11 +122,11 @@ public final class CommunicatorI implements Communicator return _instance.objectAdapterFactory().createObjectAdapter(name, router); } - @Override + @Override @SuppressWarnings("deprecation") public void addObjectFactory(ObjectFactory factory, String id) { _instance.servantFactoryManager().add(factory, id); } - @Override + @Override @SuppressWarnings("deprecation") public ObjectFactory findObjectFactory(String id) { return _instance.servantFactoryManager().findObjectFactory(id); } @@ -271,9 +271,9 @@ public final class CommunicatorI implements Communicator // This callback object receives the results of all invocations // of Connection.begin_flushBatchRequests. // - IceInternal.CommunicatorFlushBatch result = new IceInternal.CommunicatorFlushBatch(this, - _instance, - __flushBatchRequests_name, + IceInternal.CommunicatorFlushBatch result = new IceInternal.CommunicatorFlushBatch(this, + _instance, + __flushBatchRequests_name, cb); connectionFactory.flushAsyncBatchRequests(result); @@ -332,7 +332,7 @@ public final class CommunicatorI implements Communicator { return _instance.findAdminFacet(facet); } - + @Override public java.util.Map<String, Ice.Object> findAllAdminFacets() diff --git a/java/src/Ice/src/main/java/Ice/ConnectionI.java b/java/src/Ice/src/main/java/Ice/ConnectionI.java index d33ae1d3c20..b126bf2a858 100644 --- a/java/src/Ice/src/main/java/Ice/ConnectionI.java +++ b/java/src/Ice/src/main/java/Ice/ConnectionI.java @@ -482,36 +482,39 @@ public final class ConnectionI extends IceInternal.EventHandler } @Override - synchronized public void setCallback(final ConnectionCallback callback) + synchronized public void setCloseCallback(final CloseCallback callback) { - synchronized(this) + if(_state >= StateClosed) { - if(_state >= StateClosed) + if(callback != null) { - if(callback != null) + _threadPool.dispatch(new IceInternal.DispatchWorkItem(this) { - _threadPool.dispatch(new IceInternal.DispatchWorkItem(this) + @Override + public void run() { - @Override - public void run() + try { - try - { - callback.closed(ConnectionI.this); - } - catch(Exception ex) - { - _logger.error("connection callback exception:\n" + ex + '\n' + _desc); - } + callback.closed(ConnectionI.this); } - }); - } - } - else - { - _callback = callback; + catch(Exception ex) + { + _logger.error("connection callback exception:\n" + ex + '\n' + _desc); + } + } + }); } } + else + { + _closeCallback = callback; + } + } + + @Override + synchronized public void setHeartbeatCallback(final HeartbeatCallback callback) + { + _heartbeatCallback = callback; } @Override @@ -1243,7 +1246,8 @@ public final class ConnectionI extends IceInternal.EventHandler // promoting a new leader and unecessary thread creation, especially if // this is called on shutdown). // - if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && _callback == null) + if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && + _closeCallback == null && _heartbeatCallback == null) { finish(close); return; @@ -1378,19 +1382,21 @@ public final class ConnectionI extends IceInternal.EventHandler _readStream.clear(); _readStream.getBuffer().clear(); - if(_callback != null) + if(_closeCallback != null) { try { - _callback.closed(this); + _closeCallback.closed(this); } catch(Exception ex) { _logger.error("connection callback exception:\n" + ex + '\n' + _desc); } - _callback = null; + _closeCallback = null; } + _heartbeatCallback = null; + // // This must be done last as this will cause waitUntilFinished() to // return (and communicator objects such as the timer might be destroyed @@ -2342,7 +2348,7 @@ public final class ConnectionI extends IceInternal.EventHandler IceInternal.ServantManager servantManager; ObjectAdapter adapter; IceInternal.OutgoingAsyncBase outAsync; - ConnectionCallback heartbeatCallback; + HeartbeatCallback heartbeatCallback; int messageDispatchCount; } @@ -2482,9 +2488,9 @@ public final class ConnectionI extends IceInternal.EventHandler case IceInternal.Protocol.validateConnectionMsg: { IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); - if(_callback != null) + if(_heartbeatCallback != null) { - info.heartbeatCallback = _callback; + info.heartbeatCallback = _heartbeatCallback; ++info.messageDispatchCount; } break; @@ -3001,7 +3007,8 @@ public final class ConnectionI extends IceInternal.EventHandler private Ice.ConnectionInfo _info; - private ConnectionCallback _callback; + private CloseCallback _closeCallback; + private HeartbeatCallback _heartbeatCallback; private static Ice.Instrumentation.ConnectionState connectionStateMap[] = { Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized diff --git a/java/src/Ice/src/main/java/IceInternal/ValueFactoryManager.java b/java/src/Ice/src/main/java/IceInternal/ValueFactoryManager.java index f685d088b9a..c0bf96148c9 100644 --- a/java/src/Ice/src/main/java/IceInternal/ValueFactoryManager.java +++ b/java/src/Ice/src/main/java/IceInternal/ValueFactoryManager.java @@ -9,6 +9,7 @@ package IceInternal; +@SuppressWarnings("deprecation") public final class ValueFactoryManager { public synchronized void diff --git a/java/src/IceGridGUI/src/main/java/IceGridGUI/SessionKeeper.java b/java/src/IceGridGUI/src/main/java/IceGridGUI/SessionKeeper.java index abbeae1f98c..83120e71740 100644 --- a/java/src/IceGridGUI/src/main/java/IceGridGUI/SessionKeeper.java +++ b/java/src/IceGridGUI/src/main/java/IceGridGUI/SessionKeeper.java @@ -224,17 +224,11 @@ public class SessionKeeper null, new Ice.Optional<Ice.ACMHeartbeat>(Ice.ACMHeartbeat.HeartbeatAlways)); - _session.ice_getConnection().setCallback( - new Ice.ConnectionCallback() + _session.ice_getConnection().setCloseCallback( + new Ice.CloseCallback() { @Override public void - heartbeat(Ice.Connection con) - { - } - - @Override - public void closed(Ice.Connection con) { try @@ -437,7 +431,7 @@ public class SessionKeeper _adapter = null; } - _session.ice_getConnection().setCallback(null); + _session.ice_getConnection().setCloseCallback(null); if(destroySession) { @@ -1190,16 +1184,16 @@ public class SessionKeeper { Ice.LocatorPrx prx = Ice.LocatorPrxHelper.uncheckedCast( communicator.stringToProxy( - communicator.identityToString(locator.ice_getIdentity()) + + communicator.identityToString(locator.ice_getIdentity()) + ":" + e.toString())); - + if(_directDiscoveryEndpointModel.indexOf(prx) == -1) { _directDiscoveryEndpointModel.addElement(prx); } } - if(_directDiscoveryEndpointModel.size() > 0 && + if(_directDiscoveryEndpointModel.size() > 0 && _directDiscoveryEndpointList.getSelectedIndex() == -1) { _directDiscoveryEndpointList.setSelectedIndex(0); @@ -1231,7 +1225,7 @@ public class SessionKeeper final String intf = properties.getProperty("IceGridAdmin.Discovery.Interface"); String lookupEndpoints = properties.getProperty("IceGridAdmin.Discovery.Lookup"); String address; - if(properties.getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0 && + if(properties.getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0 && properties.getPropertyAsInt("Ice.PreferIPv6Address") <= 0) { address = "239.255.0.1"; @@ -1257,9 +1251,9 @@ public class SessionKeeper try { final LookupPrx lookupPrx = LookupPrxHelper.uncheckedCast( - communicator.stringToProxy("IceLocatorDiscovery/Lookup -d:" + + communicator.stringToProxy("IceLocatorDiscovery/Lookup -d:" + lookupEndpoints).ice_collocationOptimized(false).ice_router(null)); - + new Thread(new Runnable() { @Override @@ -1268,15 +1262,15 @@ public class SessionKeeper synchronized(SessionKeeper.this) { // - // If search is in progress when refresh is hit, cancel the + // If search is in progress when refresh is hit, cancel the // finish task we will schedule a new one with this new // search. - // + // if(_discoveryFinishTask != null) { _discoveryFinishTask.cancel(); } - + if(properties.getProperty("IceGridAdmin.Discovery.Reply.Endpoints").isEmpty()) { StringBuilder s = new StringBuilder(); @@ -1287,7 +1281,7 @@ public class SessionKeeper } properties.setProperty("IceGridAdmin.Discovery.Reply.Endpoints", s.toString()); } - + try { if(_discoveryAdapter == null) @@ -1295,11 +1289,11 @@ public class SessionKeeper _discoveryAdapter = communicator.createObjectAdapter( "IceGridAdmin.Discovery.Reply"); _discoveryAdapter.activate(); - _discoveryReplyPrx = + _discoveryReplyPrx = LookupReplyPrxHelper.uncheckedCast( _discoveryAdapter.addWithUUID(_discoveryLookupReply).ice_datagram()); } - + lookupPrx.findLocator("", _discoveryReplyPrx); } catch(final Ice.LocalException ex) @@ -1318,7 +1312,7 @@ public class SessionKeeper } }); } - + // // We schedule a timer task to destroy the discovery adapter after 2 // seconds, the user doesn't need to wait, discovered proxies are @@ -1360,7 +1354,7 @@ public class SessionKeeper _directInstanceName = new JLabel(); _routedInstanceName = new JLabel(); - + // Connection type panel { FormLayout layout = new FormLayout("pref", "pref"); @@ -1420,7 +1414,7 @@ public class SessionKeeper } } }); - + _directDiscoveryEndpointList.addListSelectionListener(new ListSelectionListener() { @Override @@ -1429,7 +1423,7 @@ public class SessionKeeper validatePanel(); } }); - + ButtonGroup group = new ButtonGroup(); _directDiscoveryDiscoveredEndpoint = new JRadioButton(new AbstractAction("Discovered Endpoints") { @@ -1465,7 +1459,7 @@ public class SessionKeeper builder.append(_discoveryStatus, _discoveryRefresh); discoveryStatus = builder.getPanel(); } - + _directDiscoveryManualEndpoint = new JRadioButton(new AbstractAction("Manual Endpoint") { @Override @@ -1479,7 +1473,7 @@ public class SessionKeeper } }); group.add(_directDiscoveryManualEndpoint); - + { FormLayout layout = new FormLayout("pref:grow", "pref"); DefaultFormBuilder builder = new DefaultFormBuilder(layout); @@ -2166,7 +2160,7 @@ public class SessionKeeper } break; } - + case DirectMasterStep: { _cardLayout.show(_cardPanel, WizardStep.DirectDiscoveryChooseStep.toString()); @@ -2184,20 +2178,20 @@ public class SessionKeeper { _cardLayout.show(_cardPanel, WizardStep.DirectEndpointStep.toString()); _wizardSteps.push(WizardStep.DirectEndpointStep); - } - else + } + else { Ice.LocatorPrx locator = _directDiscoveryEndpointList.getSelectedValue(); _directInstanceName.setText(locator.ice_getIdentity().category); _directCustomEndpointValue.setText(locator.ice_getEndpoints()[0].toString()); _directCustomEndpoints.setSelected(true); - + _cardLayout.show(_cardPanel, WizardStep.DirectCustomEnpointStep.toString()); _wizardSteps.push(WizardStep.DirectCustomEnpointStep); } break; } - + case DirectEndpointStep: { if(_directDefaultEndpoints.isSelected()) @@ -2237,7 +2231,7 @@ public class SessionKeeper } else { - _cardLayout.show(_cardPanel, + _cardLayout.show(_cardPanel, WizardStep.DirectUsernamePasswordCredentialsStep.toString()); _wizardSteps.push(WizardStep.DirectUsernamePasswordCredentialsStep); } @@ -2256,7 +2250,7 @@ public class SessionKeeper } break; } - + case RoutedDefaultEndpointStep: { if(_routedDefaultEndpointSSL.isSelected()) @@ -2266,7 +2260,7 @@ public class SessionKeeper } else { - _cardLayout.show(_cardPanel, + _cardLayout.show(_cardPanel, WizardStep.RoutedUsernamePasswordCredentialsStep.toString()); _wizardSteps.push(WizardStep.RoutedUsernamePasswordCredentialsStep); } @@ -2285,7 +2279,7 @@ public class SessionKeeper } break; } - + case DirectCustomEnpointStep: { try @@ -2305,7 +2299,7 @@ public class SessionKeeper } else { - _cardLayout.show(_cardPanel, + _cardLayout.show(_cardPanel, WizardStep.DirectUsernamePasswordCredentialsStep.toString()); _wizardSteps.push(WizardStep.DirectUsernamePasswordCredentialsStep); } @@ -2362,7 +2356,7 @@ public class SessionKeeper } else { - _cardLayout.show(_cardPanel, + _cardLayout.show(_cardPanel, WizardStep.RoutedUsernamePasswordCredentialsStep.toString()); _wizardSteps.push(WizardStep.RoutedUsernamePasswordCredentialsStep); } @@ -2400,7 +2394,7 @@ public class SessionKeeper } break; } - + case X509CertificateStep: { if(_x509CertificateYesButton.isSelected()) @@ -2461,7 +2455,7 @@ public class SessionKeeper } break; } - + default: { break; @@ -2691,7 +2685,7 @@ public class SessionKeeper { if(_directDiscoveryManualEndpoint.isSelected()) { - _directDiscoveryManualEndpoint.requestFocusInWindow(); + _directDiscoveryManualEndpoint.requestFocusInWindow(); } else { @@ -2699,7 +2693,7 @@ public class SessionKeeper } break; } - + case DirectEndpointStep: { if(_directDefaultEndpoints.isSelected()) @@ -2791,7 +2785,7 @@ public class SessionKeeper _routedUsername.requestFocusInWindow(); break; } - + default: { break; @@ -2923,7 +2917,7 @@ public class SessionKeeper validated = _routedUsername.getText() != null && _routedUsername.getText().length() > 0; break; } - + case DirectMasterStep: case RoutedEndpointStep: case DirectEndpointStep: @@ -3242,7 +3236,7 @@ public class SessionKeeper private JRadioButton _directDiscoveryDiscoveredEndpoint; private JLabel _discoveryStatus; private JButton _discoveryRefresh; - + private java.util.TimerTask _discoveryFinishTask; private Ice.ObjectAdapter _discoveryAdapter; private LookupReplyPrx _discoveryReplyPrx; @@ -5594,7 +5588,7 @@ public class SessionKeeper private static AuthDialog _authDialog; private final Coordinator _coordinator; - + private Session _session; private boolean _connectedToMaster = false; private String _replicaName = ""; diff --git a/java/test/src/main/java/test/Ice/acm/AllTests.java b/java/test/src/main/java/test/Ice/acm/AllTests.java index 3a496ec0de5..b0f19a192f6 100644 --- a/java/test/src/main/java/test/Ice/acm/AllTests.java +++ b/java/test/src/main/java/test/Ice/acm/AllTests.java @@ -118,7 +118,7 @@ public class AllTests private java.io.PrintWriter _out; }; - static abstract class TestCase implements Ice.ConnectionCallback + static abstract class TestCase { public TestCase(Application app, String name, RemoteCommunicatorPrx com, PrintWriter out) { @@ -215,7 +215,25 @@ public class AllTests _adapter.getTestIntf().toString())); try { - proxy.ice_getConnection().setCallback(this); + proxy.ice_getConnection().setCloseCallback(new Ice.CloseCallback() + { + @Override + synchronized public void closed(Ice.Connection con) + { + _closed = true; + notify(); + } + }); + + proxy.ice_getConnection().setHeartbeatCallback(new Ice.HeartbeatCallback() + { + @Override + synchronized public void heartbeat(Ice.Connection con) + { + ++_heartbeat; + } + }); + runTestCase(_adapter, proxy); } catch(Exception ex) @@ -224,17 +242,6 @@ public class AllTests } } - synchronized public void heartbeat(Ice.Connection con) - { - ++_heartbeat; - } - - synchronized public void closed(Ice.Connection con) - { - _closed = true; - notify(); - } - public synchronized void waitForClosed() { while(!_closed) diff --git a/java/test/src/main/java/test/Ice/acm/TestI.java b/java/test/src/main/java/test/Ice/acm/TestI.java index 4d06e74876b..3ca29c4b5f1 100644 --- a/java/test/src/main/java/test/Ice/acm/TestI.java +++ b/java/test/src/main/java/test/Ice/acm/TestI.java @@ -52,7 +52,7 @@ public class TestI extends _TestIntfDisp public void waitForHeartbeat(int count, Ice.Current current) { final Ice.Holder<Integer> c = new Ice.Holder<Integer>(count); - Ice.ConnectionCallback callback = new Ice.ConnectionCallback() + Ice.HeartbeatCallback callback = new Ice.HeartbeatCallback() { synchronized public void heartbeat(Ice.Connection connection) { @@ -60,11 +60,8 @@ public class TestI extends _TestIntfDisp notifyAll(); } - public void closed(Ice.Connection connection) - { - } }; - current.con.setCallback(callback); + current.con.setHeartbeatCallback(callback); synchronized(callback) { diff --git a/js/src/Ice/ConnectionI.js b/js/src/Ice/ConnectionI.js index 0768a89ff15..2c1711f0edc 100644 --- a/js/src/Ice/ConnectionI.js +++ b/js/src/Ice/ConnectionI.js @@ -140,7 +140,8 @@ var ConnectionI = Class({ { this._servantManager = null; } - this._callback = null; + this._closeCallback = null; + this._heartbeatCallback = null; }, start: function() { @@ -465,7 +466,7 @@ var ConnectionI = Class({ result.__invoke(); return result; }, - setCallback: function(callback) + setCloseCallback: function(callback) { if(this._state >= StateClosed) { @@ -475,7 +476,7 @@ var ConnectionI = Class({ Timer.setImmediate(function() { try { - callback.closed(this); + callback(this); } catch(ex) { @@ -486,9 +487,13 @@ var ConnectionI = Class({ } else { - this._callback = callback; + this._closeCallback = callback; } }, + setHeartbeatCallback: function(callback) + { + this._heartbeatCallback = callback; + }, setACM: function(timeout, close, heartbeat) { if(this._monitor === null || this._state >= StateClosed) @@ -920,7 +925,7 @@ var ConnectionI = Class({ { try { - info.heartbeatCallback.heartbeat(this); + info.heartbeatCallback(this); } catch(ex) { @@ -1063,19 +1068,21 @@ var ConnectionI = Class({ this._writeStream.clear(); this._writeStream.buffer.clear(); - if(this._callback !== null) + if(this._closeCallback !== null) { try { - this._callback.closed(this); + this._closeCallback(this); } catch(ex) { this._logger.error("connection callback exception:\n" + ex + '\n' + this._desc); } - this._callback = null; + this._closeCallback = null; } + _heartbeatCallback = null; + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -1843,9 +1850,9 @@ var ConnectionI = Class({ case Protocol.validateConnectionMsg: { TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels); - if(this._callback !== null) + if(this._heartbeatCallback !== null) { - info.heartbeatCallback = this._callback; + info.heartbeatCallback = this._heartbeatCallback; ++this._dispatchCount; } break; diff --git a/js/test/Ice/acm/Client.js b/js/test/Ice/acm/Client.js index 81379ad2507..8a94c51f313 100644 --- a/js/test/Ice/acm/Client.js +++ b/js/test/Ice/acm/Client.js @@ -141,7 +141,14 @@ ).then( function(con) { - con.setCallback(self); + con.setCloseCallback(function(connection) + { + self._closed = true; + }); + con.setHeartbeatCallback(function(connection) + { + ++self._heartbeat; + }); return self.runTestCase(self._adapter, proxy); } ).exception( @@ -151,14 +158,6 @@ } ); }, - heartbeat: function(con) - { - ++this._heartbeat; - }, - closed: function(con) - { - this._closed = true; - }, runTestCase: function(adapter, proxy) { test(false); // Abstract diff --git a/objective-c/src/Ice/ConnectionI.mm b/objective-c/src/Ice/ConnectionI.mm index 0b937452ad5..d84b9685e76 100644 --- a/objective-c/src/Ice/ConnectionI.mm +++ b/objective-c/src/Ice/ConnectionI.mm @@ -92,32 +92,31 @@ namespace { -class ConnectionCallbackI : public Ice::ConnectionCallback +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(id<ICEConnection> connection, id<ICEConnectionCallback> callback) : - _connection(connection), _callback(callback) + CloseCallbackI(id<ICEConnection> connection, ICECloseCallback callback) : + _connection(connection), _callback(Block_copy(callback)) { [_connection retain]; - [_callback retain]; } - ~ConnectionCallbackI() + ~CloseCallbackI() { [_connection release]; [_callback release]; } void - heartbeat(const Ice::ConnectionPtr& connection) + closed(const Ice::ConnectionPtr& connection) { NSException* ex = nil; @autoreleasepool { @try { - [_callback heartbeat:_connection]; + _callback(_connection); } @catch(id e) { @@ -130,15 +129,37 @@ public: } } +private: + + id<ICEConnection> _connection; + ICECloseCallback _callback; +}; + +class HeartbeatCallbackI : public Ice::HeartbeatCallback +{ +public: + + HeartbeatCallbackI(id<ICEConnection> connection, ICEHeartbeatCallback callback) : + _connection(connection), _callback(Block_copy(callback)) + { + [_connection retain]; + } + + ~HeartbeatCallbackI() + { + [_connection release]; + [_callback release]; + } + void - closed(const Ice::ConnectionPtr& connection) + heartbeat(const Ice::ConnectionPtr& connection) { NSException* ex = nil; @autoreleasepool { @try { - [_callback closed:_connection]; + _callback(_connection); } @catch(id e) { @@ -154,7 +175,7 @@ public: private: id<ICEConnection> _connection; - id<ICEConnectionCallback> _callback; + ICEHeartbeatCallback _callback; }; } @@ -267,9 +288,13 @@ private: CONNECTION->end_flushBatchRequests(r); }, result); } --(void) setCallback:(id<ICEConnectionCallback>)callback +-(void) setCloseCallback:(ICECloseCallback)callback; +{ + CONNECTION->setCloseCallback(new CloseCallbackI(self, callback)); +} +-(void) setHeartbeatCallback:(ICEHeartbeatCallback)callback { - CONNECTION->setCallback(new ConnectionCallbackI(self, callback)); + CONNECTION->setHeartbeatCallback(new HeartbeatCallbackI(self, callback)); } -(void) setACM:(id)timeout close:(id)close heartbeat:(id)heartbeat { diff --git a/objective-c/test/Ice/acm/AllTests.m b/objective-c/test/Ice/acm/AllTests.m index 4499108ab4b..c0008c3dc47 100644 --- a/objective-c/test/Ice/acm/AllTests.m +++ b/objective-c/test/Ice/acm/AllTests.m @@ -109,7 +109,7 @@ @class TestCaseThread; -@interface TestCase : NSObject<ICEConnectionCallback> +@interface TestCase : ICELocalObject { NSCondition* _cond; @@ -262,7 +262,20 @@ @try { - [[proxy ice_getConnection] setCallback:self]; + [[proxy ice_getConnection] setCloseCallback:^(id<ICEConnection> connection) + { + [_cond lock]; + _closed = YES; + [_cond signal]; + [_cond unlock]; + }]; + [[proxy ice_getConnection] setHeartbeatCallback:^(id<ICEConnection> connection) + { + [_cond lock]; + ++_heartbeat; + [_cond unlock]; + }]; + [self runTestCase:_adapter proxy:proxy]; } @catch(ICEException* ex) @@ -282,20 +295,6 @@ NSAssert(NO, @"Subclasses need to overwrite this method"); } --(void)closed:(id<ICEConnection>)connection -{ - [_cond lock]; - _closed = YES; - [_cond signal]; - [_cond unlock]; -} --(void)heartbeat:(id<ICEConnection>)connection -{ - [_cond lock]; - ++_heartbeat; - [_cond unlock]; -} - -(void) waitForClosed { [_cond lock]; diff --git a/objective-c/test/Ice/acm/TestI.m b/objective-c/test/Ice/acm/TestI.m index 155bbd154c1..b1e159e2ae4 100644 --- a/objective-c/test/Ice/acm/TestI.m +++ b/objective-c/test/Ice/acm/TestI.m @@ -9,7 +9,7 @@ #import <acm/TestI.h> -@interface ConnectionCallbackI : NSObject<ICEConnectionCallback> +@interface ConnectionCallbackI : NSObject { NSCondition* _cond; int _count; @@ -37,9 +37,6 @@ [_cond signal]; [_cond unlock]; } --(void) closed:(id<ICEConnection>)c -{ -} -(void) waitForCount:(int)count { [_cond lock]; @@ -198,7 +195,11 @@ -(void) waitForHeartbeat:(int)count current:(ICECurrent*)current { ConnectionCallbackI* callback = [ConnectionCallbackI new]; - [current.con setCallback:callback]; + + [current.con setHeartbeatCallback:^(id<ICEConnection> c) + { + [callback heartbeat:c]; + }]; [callback waitForCount:count]; ICE_RELEASE(callback); } diff --git a/python/modules/IcePy/Connection.cpp b/python/modules/IcePy/Connection.cpp index c761419993a..2aa28f893df 100644 --- a/python/modules/IcePy/Connection.cpp +++ b/python/modules/IcePy/Connection.cpp @@ -37,18 +37,18 @@ struct ConnectionObject Ice::CommunicatorPtr* communicator; }; -class ConnectionCallbackI : public Ice::ConnectionCallback +class CloseCallbackI : public Ice::CloseCallback { public: - ConnectionCallbackI(PyObject* cb, PyObject* con) : + CloseCallbackI(PyObject* cb, PyObject* con) : _cb(cb), _con(con) { Py_INCREF(cb); Py_INCREF(con); } - virtual ~ConnectionCallbackI() + virtual ~CloseCallbackI() { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. @@ -56,52 +56,94 @@ public: Py_DECREF(_con); } - virtual void heartbeat(const Ice::ConnectionPtr& con) - { - invoke("heartbeat", con); - } - virtual void closed(const Ice::ConnectionPtr& con) { - invoke("closed", con); + invoke(con); } private: - void invoke(const string& methodName, const Ice::ConnectionPtr& con) + void invoke(const Ice::ConnectionPtr& con) { AdoptThread adoptThread; // Ensure the current thread is able to call into Python. #ifndef NDEBUG ConnectionObject* c = reinterpret_cast<ConnectionObject*>(_con); assert(con == *(c->connection)); #endif - if(!PyObject_HasAttrString(_cb, STRCAST(methodName.c_str()))) + + PyObjectHandle args = Py_BuildValue(STRCAST("(O)"), _con); + assert(_cb); + PyObjectHandle tmp = PyObject_Call(_cb, args.get(), 0); + if(PyErr_Occurred()) { - ostringstream ostr; - ostr << "connection callback object does not define " << methodName << "()"; - string str = ostr.str(); - PyErr_Warn(PyExc_RuntimeWarning, const_cast<char*>(str.c_str())); + PyException ex; // Retrieve it before another Python API call clears it. + + // + // A callback that calls sys.exit() will raise the SystemExit exception. + // This is normally caught by the interpreter, causing it to exit. + // However, we have no way to pass this exception to the interpreter, + // so we act on it directly. + // + ex.checkSystemExit(); + + ex.raise(); } - else + } + + PyObject* _cb; + PyObject* _con; +}; + +class HeartbeatCallbackI : public Ice::HeartbeatCallback +{ +public: + + HeartbeatCallbackI(PyObject* cb, PyObject* con) : + _cb(cb), _con(con) + { + Py_INCREF(cb); + Py_INCREF(con); + } + + virtual ~HeartbeatCallbackI() + { + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. + + Py_DECREF(_cb); + Py_DECREF(_con); + } + + virtual void heartbeat(const Ice::ConnectionPtr& con) + { + invoke(con); + } + +private: + + void invoke(const Ice::ConnectionPtr& con) + { + AdoptThread adoptThread; // Ensure the current thread is able to call into Python. +#ifndef NDEBUG + ConnectionObject* c = reinterpret_cast<ConnectionObject*>(_con); + assert(con == *(c->connection)); +#endif + + PyObjectHandle args = Py_BuildValue(STRCAST("(O)"), _con); + assert(_cb); + PyObjectHandle tmp = PyObject_Call(_cb, args.get(), 0); + if(PyErr_Occurred()) { - PyObjectHandle args = Py_BuildValue(STRCAST("(O)"), _con); - PyObjectHandle method = PyObject_GetAttrString(_cb, STRCAST(methodName.c_str())); - assert(method.get()); - PyObjectHandle tmp = PyObject_Call(method.get(), args.get(), 0); - if(PyErr_Occurred()) - { - PyException ex; // Retrieve it before another Python API call clears it. - - // - // A callback that calls sys.exit() will raise the SystemExit exception. - // This is normally caught by the interpreter, causing it to exit. - // However, we have no way to pass this exception to the interpreter, - // so we act on it directly. - // - ex.checkSystemExit(); - - ex.raise(); - } + PyException ex; // Retrieve it before another Python API call clears it. + + // + // A callback that calls sys.exit() will raise the SystemExit exception. + // This is normally caught by the interpreter, causing it to exit. + // However, we have no way to pass this exception to the interpreter, + // so we act on it directly. + // + ex.checkSystemExit(); + + ex.raise(); } } @@ -437,22 +479,53 @@ connectionEndFlushBatchRequests(ConnectionObject* self, PyObject* args) extern "C" #endif static PyObject* -connectionSetCallback(ConnectionObject* self, PyObject* args) +connectionSetCloseCallback(ConnectionObject* self, PyObject* args) +{ + assert(self->connection); + + PyObject* callbackType = lookupType("types.FunctionType"); + PyObject* cb; + if(!PyArg_ParseTuple(args, STRCAST("O!"), callbackType, &cb)) + { + return 0; + } + + Ice::CloseCallbackPtr wrapper = new CloseCallbackI(cb, reinterpret_cast<PyObject*>(self)); + try + { + AllowThreads allowThreads; // Release Python's global interpreter lock during blocking invocations. + (*self->connection)->setCloseCallback(wrapper); + } + catch(const Ice::Exception& ex) + { + setPythonException(ex); + return 0; + } + + Py_INCREF(Py_None); + return Py_None; +} + +#ifdef WIN32 +extern "C" +#endif +static PyObject* +connectionSetHeartbeatCallback(ConnectionObject* self, PyObject* args) { assert(self->connection); - PyObject* callbackType = lookupType("Ice.ConnectionCallback"); + PyObject* callbackType = lookupType("types.FunctionType"); PyObject* cb; if(!PyArg_ParseTuple(args, STRCAST("O!"), callbackType, &cb)) { return 0; } - Ice::ConnectionCallbackPtr wrapper = new ConnectionCallbackI(cb, reinterpret_cast<PyObject*>(self)); + Ice::HeartbeatCallbackPtr wrapper = new HeartbeatCallbackI(cb, reinterpret_cast<PyObject*>(self)); try { AllowThreads allowThreads; // Release Python's global interpreter lock during blocking invocations. - (*self->connection)->setCallback(wrapper); + (*self->connection)->setHeartbeatCallback(wrapper); } catch(const Ice::Exception& ex) { @@ -754,8 +827,10 @@ static PyMethodDef ConnectionMethods[] = METH_VARARGS | METH_KEYWORDS, PyDoc_STR(STRCAST("begin_flushBatchRequests([_ex][, _sent]) -> Ice.AsyncResult")) }, { STRCAST("end_flushBatchRequests"), reinterpret_cast<PyCFunction>(connectionEndFlushBatchRequests), METH_VARARGS, PyDoc_STR(STRCAST("end_flushBatchRequests(Ice.AsyncResult) -> None")) }, - { STRCAST("setCallback"), reinterpret_cast<PyCFunction>(connectionSetCallback), METH_VARARGS, - PyDoc_STR(STRCAST("setCallback(Ice.ConnectionCallback) -> None")) }, + { STRCAST("setCloseCallback"), reinterpret_cast<PyCFunction>(connectionSetCloseCallback), METH_VARARGS, + PyDoc_STR(STRCAST("setCloseCallback(Ice.CloseCallback) -> None")) }, + { STRCAST("setHeartbeatCallback"), reinterpret_cast<PyCFunction>(connectionSetHeartbeatCallback), METH_VARARGS, + PyDoc_STR(STRCAST("setHeartbeatCallback(Ice.HeartbeatCallback) -> None")) }, { STRCAST("setACM"), reinterpret_cast<PyCFunction>(connectionSetACM), METH_VARARGS, PyDoc_STR(STRCAST("setACM(int, Ice.ACMClose, Ice.ACMHeartbeat) -> None")) }, { STRCAST("getACM"), reinterpret_cast<PyCFunction>(connectionGetACM), METH_NOARGS, diff --git a/python/test/Ice/acm/AllTests.py b/python/test/Ice/acm/AllTests.py index 60844870090..816c9b94ab4 100644 --- a/python/test/Ice/acm/AllTests.py +++ b/python/test/Ice/acm/AllTests.py @@ -74,7 +74,7 @@ class LoggerI(Ice.Logger): print(p) self._messages = [] -class TestCase(threading.Thread, Ice.ConnectionCallback): +class TestCase(threading.Thread): def __init__(self, name, com): threading.Thread.__init__(self) self._name = name @@ -128,7 +128,9 @@ class TestCase(threading.Thread, Ice.ConnectionCallback): proxy = Test.TestIntfPrx.uncheckedCast(self._communicator.stringToProxy( self._adapter.getTestIntf().ice_toString())) try: - proxy.ice_getConnection().setCallback(self) + proxy.ice_getConnection().setCloseCallback(lambda conn: self.closed(conn)) + proxy.ice_getConnection().setHeartbeatCallback(lambda conn: self.heartbeat(conn)) + self.runTestCase(self._adapter, proxy) except Exception as ex: self._msg = "unexpected exception:\n" + traceback.format_exc() diff --git a/python/test/Ice/acm/TestI.py b/python/test/Ice/acm/TestI.py index 024899b8640..ee0625dc057 100644 --- a/python/test/Ice/acm/TestI.py +++ b/python/test/Ice/acm/TestI.py @@ -79,7 +79,7 @@ class TestIntfI(Test.TestIntf): def waitForHeartbeat(self, count, current=None): - class ConnectionCallbackI(Ice.ConnectionCallback): + class ConnectionCallbackI(): def __init__(self): self.m = threading.Condition() @@ -93,9 +93,6 @@ class TestIntfI(Test.TestIntf): finally: self.m.release() - def closed(self, con): - pass - def waitForCount(self, count): self.m.acquire() self.count = count @@ -106,6 +103,6 @@ class TestIntfI(Test.TestIntf): self.m.release() callback = ConnectionCallbackI() - current.con.setCallback(callback) + current.con.setHeartbeatCallback(lambda con: callback.heartbeat(con)) callback.waitForCount(2) diff --git a/slice/Ice/Connection.ice b/slice/Ice/Connection.ice index 3e032d08fa1..6931b093d1b 100644 --- a/slice/Ice/Connection.ice +++ b/slice/Ice/Connection.ice @@ -68,28 +68,41 @@ local interface Connection; /** * * An application can implement this interface to receive notifications when - * a connection closes or receives a heartbeat message. + * a connection closes. * - * @see Connection#setCallback + * @see Connection#setCloseCallback * **/ -local interface ConnectionCallback +["delegate"] +local interface CloseCallback { /** * - * This method is called by the the connection when a heartbeat is - * received from the peer. + * This method is called by the the connection when the connection + * is closed. * **/ - void heartbeat(Connection con); + void closed(Connection con); +}; +/** + * + * An application can implement this interface to receive notifications when + * a connection receives a heartbeat message. + * + * @see Connection#setHeartbeatCallback + * + **/ +["delegate"] +local interface HeartbeatCallback +{ /** * - * This method is called by the the connection when the connection - * is closed. + * This method is called by the the connection when a heartbeat is + * received from the peer. * **/ - void closed(Connection con); + void heartbeat(Connection con); }; ["cpp:unscoped"] @@ -215,10 +228,21 @@ local interface Connection * connection when it's closed. The callback is called from the * Ice thread pool associated with the connection. * - * @param callback The connection callback object. + * @param callback The closed callback object. + * + **/ + void setCloseCallback(CloseCallback callback); + + /** + * + * Set callback on the connection. The callback is called by the + * connection when a heartbeat is received. The callback is called + * from the Ice thread pool associated with the connection. + * + * @param callback The heartbeat callback object. * **/ - void setCallback(ConnectionCallback callback); + void setHeartbeatCallback(HeartbeatCallback callback); /** * |