summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-02-17 15:18:50 +0000
committerMarc Laukien <marc@zeroc.com>2004-02-17 15:18:50 +0000
commit188cf85f56f900ee7eef607e3d4d4dcd271a59d3 (patch)
treed1cde4fff235f3e0269eeaa5a4d6026b2f3c4204 /cpp/src/Ice/Connection.cpp
parentmore ami (diff)
downloadice-188cf85f56f900ee7eef607e3d4d4dcd271a59d3.tar.bz2
ice-188cf85f56f900ee7eef607e3d4d4dcd271a59d3.tar.xz
ice-188cf85f56f900ee7eef607e3d4d4dcd271a59d3.zip
more ami
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp260
1 files changed, 122 insertions, 138 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 78e29294a75..7fc15aa0cd9 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -58,7 +58,6 @@ IceInternal::Connection::validate()
if(_adapter)
{
IceUtil::Mutex::Lock sendSync(_sendMutex);
- assert(_transceiver); // The transceiver cannot be closed already.
//
// Incoming connections play the active role with respect to
@@ -290,8 +289,8 @@ IceInternal::Connection::waitUntilFinished()
}
//
- // Now we must wait for connection closure. If there is a timeout,
- // we force the connection closure.
+ // Now we must wait until close() has been called on the
+ // transceiver.
//
while(_transceiver)
{
@@ -321,8 +320,8 @@ IceInternal::Connection::waitUntilFinished()
}
//
- // No return here, we must still wait until _transceiver
- // becomes null.
+ // No return here, we must still wait until close() is
+ // called on the _transceiver.
//
}
else
@@ -337,8 +336,8 @@ IceInternal::Connection::waitUntilFinished()
void
IceInternal::Connection::monitor()
{
-
IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
+
if(!sync.acquired())
{
return;
@@ -364,10 +363,7 @@ IceInternal::Connection::monitor()
//
// Active connection management for idle connections.
//
- // TODO: Hack: ACM for incoming connections doesn't work right
- // with AMI.
- //
- if(_acmTimeout > 0 && closingOK() && !_adapter)
+ if(_acmTimeout > 0 && closingOK())
{
if(IceUtil::Time::now() >= _acmAbsoluteTimeout)
{
@@ -421,14 +417,38 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out)
assert(_state > StateNotValidated);
assert(_state < StateClosing);
-
- requestId = _nextRequestId++;
- if(requestId <= 0)
+
+ //
+ // Only add to the request map if this is a twoway call.
+ //
+ if(out)
{
- _nextRequestId = 1;
+ //
+ // Create a new unique request ID.
+ //
requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Fill in the request ID.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#endif
+
+ //
+ // Add to the requests map.
+ //
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
}
-
+
if(_acmTimeout > 0)
{
_acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
@@ -445,19 +465,6 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out)
_exception->ice_throw();
}
- //
- // Fill in the request ID.
- //
- if(!_endpoint->datagram() && out)
- {
- const Byte* p = reinterpret_cast<const Byte*>(&requestId);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#endif
- }
-
bool compress;
if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
@@ -515,39 +522,42 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
- _exception->ice_throw();
- }
- //
- // Only add to the request map if this was a twoway call, and if
- // there was no exception above.
- //
- if(out)
- {
- auto_ptr<LocalException> exception;
-
+ if(out)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
//
- // If there is already an exception set, we call
- // finished() directly (below, outside the thread
- // synchronization), because it's possible that finished()
- // has already been called on the request map.
+ // If the request has already been removed from the
+ // request map, we are out of luck. It would mean that
+ // finished() has been called already, and therefore the
+ // exception has been set using the Outgoing::finished()
+ // callback. In this case, we cannot throw the exception
+ // here, because we must not both raise an exception and
+ // have Outgoing::finished() called with an
+ // exception. This means that in some rare cases, a
+ // request will not be retried even though it could. But I
+ // honestly don't know how I could avoid this, without a
+ // very elaborate and complex design, which would be bad
+ // for performance.
//
- if(_exception.get())
- {
- exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(_exception->ice_clone()));
- }
- else
+ map<Int, Outgoing*>::iterator p = _requests.find(requestId);
+ if(p != _requests.end())
{
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+ if(p == _requestsHint)
+ {
+ _requests.erase(p++);
+ _requestsHint = p;
+ }
+ else
+ {
+ _requests.erase(p);
+ }
+
+ _exception->ice_throw();
}
}
-
- if(exception.get())
+ else
{
- out->finished(*exception.get());
+ _exception->ice_throw();
}
}
}
@@ -570,6 +580,9 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ //
+ // Create a new unique request ID.
+ //
requestId = _nextRequestId++;
if(requestId <= 0)
{
@@ -577,6 +590,22 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
requestId = _nextRequestId++;
}
+ //
+ // Fill in the request ID.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#endif
+
+ //
+ // Add to the async requests map.
+ //
+ _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
+ pair<const Int, OutgoingAsyncPtr>(requestId, out));
+
if(_acmTimeout > 0)
{
_acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
@@ -593,16 +622,6 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
_exception->ice_throw();
}
- //
- // Fill in the request ID.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&requestId);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#endif
-
bool compress;
if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
@@ -640,7 +659,7 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
// No compression, just fill in the message size.
//
Int sz = static_cast<Int>(os->b.size());
- p = reinterpret_cast<const Byte*>(&sz);
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
#else
@@ -660,38 +679,37 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
- _exception->ice_throw();
- }
- //
- // Only add to the request map if there was no exception above.
- //
- auto_ptr<LocalException> exception;
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
//
- // If there is already an exception set, we call __finished()
- // directly (below, outside the thread synchronization),
- // because it's possible that __finished() has already been
- // called on the request map.
+ // If the request has already been removed from the async
+ // request map, we are out of luck. It would mean that
+ // finished() has been called already, and therefore the
+ // exception has been set using the
+ // OutgoingAsync::__finished() callback. In this case, we
+ // cannot throw the exception here, because we must not both
+ // raise an exception and have OutgoingAsync::__finished()
+ // called with an exception. This means that in some rare
+ // cases, a request will not be retried even though it
+ // could. But I honestly don't know how I could avoid this,
+ // without a very elaborate and complex design, which would be
+ // bad for performance.
//
- if(_exception.get())
- {
- exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(_exception->ice_clone()));
- }
- else
+ map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.find(requestId);
+ if(p != _asyncRequests.end())
{
- _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
- pair<const Int, OutgoingAsyncPtr>(requestId, out));
+ if(p == _asyncRequestsHint)
+ {
+ _asyncRequests.erase(p++);
+ _asyncRequestsHint = p;
+ }
+ else
+ {
+ _asyncRequests.erase(p);
+ }
+
+ _exception->ice_throw();
}
}
-
- if(exception.get())
- {
- out->__finished(*exception.get());
- }
}
void
@@ -866,7 +884,7 @@ IceInternal::Connection::flushBatchRequest()
// No compression, just fill in the message size.
//
Int sz = static_cast<Int>(_batchStream.b.size());
- p = reinterpret_cast<const Byte*>(&sz);
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
#else
@@ -1174,8 +1192,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
if(_warn)
{
Warning out(_logger);
- out << "ignoring close connection message for datagram connection:\n"
- << _transceiver->toString();
+ out << "ignoring close connection message for datagram connection:\n" << _desc;
}
}
else
@@ -1347,8 +1364,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
if(_warn)
{
Warning out(_logger);
- out << "ignoring unexpected validate connection message:\n"
- << _transceiver->toString();
+ out << "ignoring unexpected validate connection message:\n" << _desc;
}
break;
}
@@ -1440,7 +1456,7 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
{
threadPool->promoteFollower();
- auto_ptr<LocalException> closeException;
+ auto_ptr<LocalException> exception;
map<Int, Outgoing*> requests;
map<Int, OutgoingAsyncPtr> asyncRequests;
@@ -1466,13 +1482,11 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
}
catch(const LocalException& ex)
{
- closeException = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
+ exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
}
assert(_transceiver);
_transceiver = 0;
- assert(_threadPool);
- _threadPool = 0;
notifyAll();
}
@@ -1488,17 +1502,17 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
{
- p->second->finished(*_exception.get()); // Exception is immutable at this point.
+ p->second->finished(*_exception.get()); // The exception is immutable at this point.
}
for(map<Int, OutgoingAsyncPtr>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
{
- q->second->__finished(*_exception.get()); // Exception is immutable at this point.
+ q->second->__finished(*_exception.get()); // The exception is immutable at this point.
}
- if(closeException.get())
+ if(exception.get())
{
- closeException->ice_throw();
+ exception->ice_throw();
}
}
@@ -1512,7 +1526,7 @@ IceInternal::Connection::exception(const LocalException& ex)
string
IceInternal::Connection::toString() const
{
- return _transceiverToString;
+ return _desc;
}
bool
@@ -1539,7 +1553,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
const ObjectAdapterPtr& adapter) :
EventHandler(instance),
_transceiver(transceiver),
- _transceiverToString(transceiver->toString()),
+ _desc(transceiver->toString()),
_endpoint(endpoint),
_adapter(adapter),
_logger(_instance->logger()), // Cached for better performance.
@@ -1563,12 +1577,12 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
{
if(_adapter)
{
- _threadPool = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool();
+ const_cast<ThreadPoolPtr&>(_threadPool) = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool();
_servantManager = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getServantManager();
}
else
{
- _threadPool = _instance->clientThreadPool();
+ const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
_servantManager = 0;
}
@@ -1613,7 +1627,6 @@ IceInternal::Connection::~Connection()
{
assert(_state == StateClosed);
assert(!_transceiver);
- assert(!_threadPool);
assert(_dispatchCount == 0);
assert(_proxyCount == 0);
}
@@ -1653,7 +1666,7 @@ IceInternal::Connection::setState(State state, const LocalException& ex)
(dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
{
Warning out(_logger);
- out << "connection exception:\n" << *_exception.get() << '\n' << _transceiver->toString();
+ out << "connection exception:\n" << *_exception.get() << '\n' << _desc;
}
}
}
@@ -1664,30 +1677,6 @@ IceInternal::Connection::setState(State state, const LocalException& ex)
// that is not yet marked as closed or closing.
//
setState(state);
-
- //
- // We can't call __finished() on async requests here, because
- // it's possible that the callback will trigger another call,
- // which then might block on this connection. Calling
- // finished() at this place works for non-async calls, but in
- // order to keep the logic of sending exceptions to requests
- // in one place, we don't do this here either. Instead, we
- // move all the code into the finished() operation.
- /*
- for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- p->second->finished(*_exception.get());
- }
- _requests.clear();
- _requestsHint = _requests.end();
-
- for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
- {
- q->second->__finished(*_exception.get());
- }
- _asyncRequests.clear();
- _asyncRequestsHint = _asyncRequests.end();
- */
}
void
@@ -1785,8 +1774,6 @@ IceInternal::Connection::setState(State state)
assert(_transceiver);
_transceiver = 0;
- assert(_threadPool);
- _threadPool = 0;
//notifyAll(); // We notify already below.
}
else
@@ -1825,7 +1812,6 @@ IceInternal::Connection::initiateShutdown() const
if(!_endpoint->datagram())
{
IceUtil::Mutex::Lock sendSync(_sendMutex);
- assert(_transceiver); // The transceiver cannot be closed already.
//
// Before we shut down, we send a close connection message.
@@ -1855,7 +1841,6 @@ IceInternal::Connection::registerWithPool()
{
if(!_registeredWithPool)
{
- assert(_threadPool);
_threadPool->_register(_transceiver->fd(), this);
_registeredWithPool = true;
@@ -1872,7 +1857,6 @@ IceInternal::Connection::unregisterWithPool()
{
if(_registeredWithPool)
{
- assert(_threadPool);
_threadPool->unregister(_transceiver->fd());
_registeredWithPool = false;