summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2003-01-17 03:58:50 +0000
committerMarc Laukien <marc@zeroc.com>2003-01-17 03:58:50 +0000
commit8880977a608f0b735504f18a624de026cde62da7 (patch)
treea8fe5b082caf761337a165e1350a5f65f57bc95d /cpp/src/Ice/Connection.cpp
parentfixed a bug with findObjectAdapter (diff)
downloadice-8880977a608f0b735504f18a624de026cde62da7.tar.bz2
ice-8880977a608f0b735504f18a624de026cde62da7.tar.xz
ice-8880977a608f0b735504f18a624de026cde62da7.zip
ConnectionMonitor
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp128
1 files changed, 126 insertions, 2 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index d35c4177995..916d2a5e24f 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -20,6 +20,7 @@
#include <Ice/DefaultsAndOverrides.h>
#include <Ice/Transceiver.h>
#include <Ice/ThreadPool.h>
+#include <Ice/ConnectionMonitor.h>
#include <Ice/ObjectAdapter.h>
#include <Ice/Endpoint.h>
#include <Ice/Outgoing.h>
@@ -110,6 +111,51 @@ IceInternal::Connection::waitUntilFinished() const
}
void
+IceInternal::Connection::monitor()
+{
+ try
+ {
+ IceUtil::Monitor<IceUtil::RecMutex>::TryLock sync(*this);
+
+ if(_state != StateActive)
+ {
+ return;
+ }
+
+ //
+ // Check for timed out async requests.
+ //
+ for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
+ {
+ if(p->second->__timedOut())
+ {
+ setState(StateClosed, TimeoutException(__FILE__, __LINE__));
+ return;
+ }
+ }
+
+ //
+ // Active connection management for idle connections.
+ //
+ if(_acmTimeout > 0)
+ {
+ if(_requests.empty() && _asyncRequests.empty() && _batchStream.b.empty() && _dispatchCount == 0)
+ {
+ if(IceUtil::Time::now() >= _acmAbsoluteTimeout)
+ {
+ setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
+ return;
+ }
+ }
+ }
+ }
+ catch(const IceUtil::ThreadLockedException&)
+ {
+ // Ignore.
+ }
+}
+
+void
IceInternal::Connection::validate()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
@@ -185,6 +231,21 @@ IceInternal::Connection::validate()
// We only print warnings after successful connection validation.
//
_warn = _instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0;
+
+ //
+ // We only use active connection management after successful
+ // connection validation. We don't use active connection
+ // management for datagram connections at all, because such
+ // "virtual connections" cannot be reestablished.
+ //
+ if(!_endpoint->datagram())
+ {
+ _acmTimeout = _instance->properties()->getPropertyAsInt("Ice.ConnectionIdleTime");
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
+ }
}
void
@@ -320,6 +381,11 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool compress)
{
_requestsHint = _requests.insert(_requests.end(), make_pair(requestId, out));
}
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
}
void
@@ -414,6 +480,11 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp
// Only add to the request map if there was no exception.
//
_asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), make_pair(requestId, out));
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
}
void
@@ -560,6 +631,11 @@ IceInternal::Connection::flushBatchRequest(bool compress)
assert(_exception.get());
_exception->ice_throw();
}
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
}
void
@@ -639,6 +715,11 @@ IceInternal::Connection::sendResponse(BasicStream* os, bool compress)
{
setState(StateClosed, ex);
}
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
}
void
@@ -727,6 +808,12 @@ void
IceInternal::Connection::read(BasicStream& stream)
{
_transceiver->read(stream, 0);
+
+ //
+ // Updating _acmAbsoluteTimeout is to expensive here, because we
+ // would have to acquire a lock just for this purpose. Instead, we
+ // update _acmAbsoluteTimeout in message().
+ //
}
void
@@ -748,6 +835,11 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
IceUtil::ThreadControl::yield();
return;
}
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
Byte messageType;
@@ -1088,6 +1180,24 @@ IceInternal::Connection::toString() const
return _transceiver->toString();
}
+bool
+IceInternal::Connection::operator==(const Connection& r) const
+{
+ return this == &r;
+}
+
+bool
+IceInternal::Connection::operator!=(const Connection& r) const
+{
+ return this != &r;
+}
+
+bool
+IceInternal::Connection::operator<(const Connection& r) const
+{
+ return this < &r;
+}
+
IceInternal::Connection::Connection(const InstancePtr& instance,
const TransceiverPtr& transceiver,
const EndpointPtr& endpoint,
@@ -1101,6 +1211,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
_defaultsAndOverrides(_instance->defaultsAndOverrides()),
_registeredWithPool(false),
_warn(false),
+ _acmTimeout(0),
_requestHdr(headerSize + 4, 0),
_requestBatchHdr(headerSize + 4, 0),
_replyHdr(headerSize, 0),
@@ -1155,6 +1266,7 @@ IceInternal::Connection::setState(State state, const LocalException& ex)
// Don't warn about certain expected exceptions.
//
if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
(dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
@@ -1165,14 +1277,14 @@ IceInternal::Connection::setState(State state, const LocalException& ex)
}
}
- for(std::map< ::Ice::Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
{
p->second->finished(*_exception.get());
}
_requests.clear();
_requestsHint = _requests.end();
- for(std::map< ::Ice::Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
+ for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
q->second->__finished(*_exception.get());
}
@@ -1317,6 +1429,12 @@ IceInternal::Connection::registerWithPool()
}
_registeredWithPool = true;
+
+ ConnectionMonitorPtr connectionMonitor = _instance->connectionMonitor();
+ if(connectionMonitor)
+ {
+ connectionMonitor->add(this);
+ }
}
}
@@ -1337,6 +1455,12 @@ IceInternal::Connection::unregisterWithPool()
}
_registeredWithPool = false;
+
+ ConnectionMonitorPtr connectionMonitor = _instance->connectionMonitor();
+ if(connectionMonitor)
+ {
+ connectionMonitor->remove(this);
+ }
}
}