diff options
author | Marc Laukien <marc@zeroc.com> | 2003-01-17 03:58:50 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2003-01-17 03:58:50 +0000 |
commit | 8880977a608f0b735504f18a624de026cde62da7 (patch) | |
tree | a8fe5b082caf761337a165e1350a5f65f57bc95d /cpp/src/Ice/Connection.cpp | |
parent | fixed a bug with findObjectAdapter (diff) | |
download | ice-8880977a608f0b735504f18a624de026cde62da7.tar.bz2 ice-8880977a608f0b735504f18a624de026cde62da7.tar.xz ice-8880977a608f0b735504f18a624de026cde62da7.zip |
ConnectionMonitor
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 128 |
1 files changed, 126 insertions, 2 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index d35c4177995..916d2a5e24f 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -20,6 +20,7 @@ #include <Ice/DefaultsAndOverrides.h> #include <Ice/Transceiver.h> #include <Ice/ThreadPool.h> +#include <Ice/ConnectionMonitor.h> #include <Ice/ObjectAdapter.h> #include <Ice/Endpoint.h> #include <Ice/Outgoing.h> @@ -110,6 +111,51 @@ IceInternal::Connection::waitUntilFinished() const } void +IceInternal::Connection::monitor() +{ + try + { + IceUtil::Monitor<IceUtil::RecMutex>::TryLock sync(*this); + + if(_state != StateActive) + { + return; + } + + // + // Check for timed out async requests. + // + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) + { + if(p->second->__timedOut()) + { + setState(StateClosed, TimeoutException(__FILE__, __LINE__)); + return; + } + } + + // + // Active connection management for idle connections. + // + if(_acmTimeout > 0) + { + if(_requests.empty() && _asyncRequests.empty() && _batchStream.b.empty() && _dispatchCount == 0) + { + if(IceUtil::Time::now() >= _acmAbsoluteTimeout) + { + setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); + return; + } + } + } + } + catch(const IceUtil::ThreadLockedException&) + { + // Ignore. + } +} + +void IceInternal::Connection::validate() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); @@ -185,6 +231,21 @@ IceInternal::Connection::validate() // We only print warnings after successful connection validation. // _warn = _instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0; + + // + // We only use active connection management after successful + // connection validation. We don't use active connection + // management for datagram connections at all, because such + // "virtual connections" cannot be reestablished. + // + if(!_endpoint->datagram()) + { + _acmTimeout = _instance->properties()->getPropertyAsInt("Ice.ConnectionIdleTime"); + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } + } } void @@ -320,6 +381,11 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool compress) { _requestsHint = _requests.insert(_requests.end(), make_pair(requestId, out)); } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } } void @@ -414,6 +480,11 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp // Only add to the request map if there was no exception. // _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), make_pair(requestId, out)); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } } void @@ -560,6 +631,11 @@ IceInternal::Connection::flushBatchRequest(bool compress) assert(_exception.get()); _exception->ice_throw(); } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } } void @@ -639,6 +715,11 @@ IceInternal::Connection::sendResponse(BasicStream* os, bool compress) { setState(StateClosed, ex); } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } } void @@ -727,6 +808,12 @@ void IceInternal::Connection::read(BasicStream& stream) { _transceiver->read(stream, 0); + + // + // Updating _acmAbsoluteTimeout is to expensive here, because we + // would have to acquire a lock just for this purpose. Instead, we + // update _acmAbsoluteTimeout in message(). + // } void @@ -748,6 +835,11 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa IceUtil::ThreadControl::yield(); return; } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } Byte messageType; @@ -1088,6 +1180,24 @@ IceInternal::Connection::toString() const return _transceiver->toString(); } +bool +IceInternal::Connection::operator==(const Connection& r) const +{ + return this == &r; +} + +bool +IceInternal::Connection::operator!=(const Connection& r) const +{ + return this != &r; +} + +bool +IceInternal::Connection::operator<(const Connection& r) const +{ + return this < &r; +} + IceInternal::Connection::Connection(const InstancePtr& instance, const TransceiverPtr& transceiver, const EndpointPtr& endpoint, @@ -1101,6 +1211,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance, _defaultsAndOverrides(_instance->defaultsAndOverrides()), _registeredWithPool(false), _warn(false), + _acmTimeout(0), _requestHdr(headerSize + 4, 0), _requestBatchHdr(headerSize + 4, 0), _replyHdr(headerSize, 0), @@ -1155,6 +1266,7 @@ IceInternal::Connection::setState(State state, const LocalException& ex) // Don't warn about certain expected exceptions. // if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing))) @@ -1165,14 +1277,14 @@ IceInternal::Connection::setState(State state, const LocalException& ex) } } - for(std::map< ::Ice::Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) + for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) { p->second->finished(*_exception.get()); } _requests.clear(); _requestsHint = _requests.end(); - for(std::map< ::Ice::Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) + for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { q->second->__finished(*_exception.get()); } @@ -1317,6 +1429,12 @@ IceInternal::Connection::registerWithPool() } _registeredWithPool = true; + + ConnectionMonitorPtr connectionMonitor = _instance->connectionMonitor(); + if(connectionMonitor) + { + connectionMonitor->add(this); + } } } @@ -1337,6 +1455,12 @@ IceInternal::Connection::unregisterWithPool() } _registeredWithPool = false; + + ConnectionMonitorPtr connectionMonitor = _instance->connectionMonitor(); + if(connectionMonitor) + { + connectionMonitor->remove(this); + } } } |