summaryrefslogtreecommitdiff
path: root/cppe/src/IceE/Connection.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-02-24 12:45:59 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-02-24 12:45:59 +0000
commit9d229c0dc974a718a6ccd84651afbbd44dacce01 (patch)
tree951361ad4842a5f597b3ba65c7aed5a64cdb2fe6 /cppe/src/IceE/Connection.cpp
parentFixed typo in Config.h wrt define to use select (diff)
downloadice-9d229c0dc974a718a6ccd84651afbbd44dacce01.tar.bz2
ice-9d229c0dc974a718a6ccd84651afbbd44dacce01.tar.xz
ice-9d229c0dc974a718a6ccd84651afbbd44dacce01.zip
Refactored Connection class.
Diffstat (limited to 'cppe/src/IceE/Connection.cpp')
-rwxr-xr-xcppe/src/IceE/Connection.cpp1332
1 files changed, 574 insertions, 758 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp
index 7e493dbfbda..fd34a1308eb 100755
--- a/cppe/src/IceE/Connection.cpp
+++ b/cppe/src/IceE/Connection.cpp
@@ -22,6 +22,7 @@
#include <IceE/ReferenceFactory.h> // For createProxy().
#include <IceE/ProxyFactory.h> // For createProxy().
#include <IceE/BasicStream.h>
+#include <IceE/TraceLevels.h>
#ifndef ICEE_PURE_CLIENT
# include <IceE/Incoming.h>
@@ -69,7 +70,6 @@ Ice::Connection::waitForValidation()
}
}
-#ifndef ICEE_PURE_CLIENT
void
Ice::Connection::activate()
{
@@ -77,6 +77,7 @@ Ice::Connection::activate()
setState(StateActive);
}
+#ifndef ICEE_PURE_CLIENT
void
Ice::Connection::hold()
{
@@ -134,6 +135,11 @@ Ice::Connection::close(bool force)
#endif
setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+
+ //
+ // TODO: If blocking model we should call readStream() to wait for
+ // the connection closure from the server?
+ //
}
}
@@ -292,302 +298,209 @@ Ice::Connection::waitUntilFinished()
#endif
}
-
-Int
-Ice::Connection::fillRequestId(BasicStream* os)
+void
+Ice::Connection::sendRequest(BasicStream* os, Outgoing* out)
{
- //
- // Create a new unique request ID.
- //
- Int requestId = _nextRequestId++;
- if(requestId <= 0)
+ bool requestSent = false;
+ try
{
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
+ Lock sendSync(_sendMonitor);
+ if(!_transceiver)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
- //
- // Fill in the request ID.
- //
- Byte* dest = &(os->b[0]) + headerSize;
+ Int requestId;
+ if(out)
+ {
+ //
+ // Create a new unique request ID.
+ //
+ requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Fill in the request ID.
+ //
+ Byte* dest = &(os->b[0]) + headerSize;
#ifdef ICE_BIG_ENDIAN
- const Byte* src = reinterpret_cast<const Byte*>(&requestId) + sizeof(Ice::Int) - 1;
- *dest++ = *src--;
- *dest++ = *src--;
- *dest++ = *src--;
- *dest = *src;
+ const Byte* src = reinterpret_cast<const Byte*>(&requestId) + sizeof(Ice::Int) - 1;
+ *dest++ = *src--;
+ *dest++ = *src--;
+ *dest++ = *src--;
+ *dest = *src;
#else
- const Byte* src = reinterpret_cast<const Byte*>(&requestId);
- *dest++ = *src++;
- *dest++ = *src++;
- *dest++ = *src++;
- *dest = *src;
+ const Byte* src = reinterpret_cast<const Byte*>(&requestId);
+ *dest++ = *src++;
+ *dest++ = *src++;
+ *dest++ = *src++;
+ *dest = *src;
#endif
- return requestId;
-}
-
-void
-Ice::Connection::sendRequest(BasicStream* os)
-{
- if(!_transceiver) // Has the transceiver already been closed?
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
+#ifndef ICEE_PURE_BLOCKING_CLIENT
+#ifdef ICEE_BLOCKING_CLIENT
+ if(!_blocking)
+ {
+#endif
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+#ifdef ICEE_BLOCKING_CLIENT
+ }
+#endif
+#endif
+ }
- const Int sz = static_cast<Int>(os->b.size());
- Byte* dest = &(os->b[0]) + 10;
+ const Int sz = static_cast<Int>(os->b.size());
+ Byte* dest = &(os->b[0]) + 10;
#ifdef ICE_BIG_ENDIAN
- const Byte* src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1;
- *dest++ = *src--;
- *dest++ = *src--;
- *dest++ = *src--;
- *dest = *src;
+ const Byte* src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1;
+ *dest++ = *src--;
+ *dest++ = *src--;
+ *dest++ = *src--;
+ *dest = *src;
#else
- const Byte* src = reinterpret_cast<const Byte*>(&sz);
- *dest++ = *src++;
- *dest++ = *src++;
- *dest++ = *src++;
- *dest = *src;
+ const Byte* src = reinterpret_cast<const Byte*>(&sz);
+ *dest++ = *src++;
+ *dest++ = *src++;
+ *dest++ = *src++;
+ *dest = *src;
#endif
- //
- // Send the request.
- //
- os->i = os->b.begin();
- traceRequest("sending request", *os, _logger, _traceLevels);
- _transceiver->write(*os);
-}
-
-#ifdef ICEE_BLOCKING_CLIENT
-
-void
-Ice::Connection::sendBlockingRequest(BasicStream* os, Outgoing* out)
-{
- Int requestId;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(_exception.get())
- {
- _exception->ice_throw();
- }
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- //
- // Fill in request id if it is a twoway call.
- //
- if(out)
- {
- requestId = fillRequestId(os);
- }
- }
-
- try
- {
- {
- IceUtil::Mutex::Lock sendSync(_sendMutex);
- sendRequest(os);
-
- if(out)
- {
- os->reset();
- readStream(*os);
- }
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ if(_traceLevels->protocol >= 1)
+ {
+ traceRequest("sending request", *os, _logger, _traceLevels);
}
+ _transceiver->write(*os);
+ requestSent = true;
- if(out)
+ if(!out)
+ {
+ return;
+ }
+
+#ifdef ICEE_BLOCKING_CLIENT
+#ifndef ICEE_PURE_BLOCKING_CLIENT
+ if(_blocking)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(_state != StateClosed)
- {
-#ifndef ICEE_PURE_CLIENT
- Int invokeNum = 0;
- parseMessage(*os, requestId, invokeNum);
-#else
- parseMessage(*os, requestId);
#endif
- }
-
//
- // parseMessage() can close the connection, so we must
- // check for closed state again.
+ // Re-use the stream for reading the reply.
//
- if(_state == StateClosed)
+ os->reset();
+
+ Int receivedRequestId = 0;
+#ifndef ICEE_PURE_CLIENT
+ Int invokeNum = 0;
+ readStreamAndParseMessage(*os, receivedRequestId, invokeNum);
+ if(invokeNum > 0)
{
- try
- {
- _transceiver->close();
- }
- catch(const LocalException&)
- {
- }
-
- _transceiver = 0;
- _exception->ice_throw();
+ throw UnknownMessageException(__FILE__, __LINE__);
+ }
+ else if(requestId != receivedRequestId)
+ {
+ throw UnknownRequestIdException(__FILE__, __LINE__);
+ }
+#else
+ readStreamAndParseMessage(*os, receivedRequestId);
+ if(requestId != receivedRequestId)
+ {
+ throw UnknownRequestIdException(__FILE__, __LINE__);
}
- }
- }
- catch(const LocalException& ex)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- assert(_exception.get());
- _exception->ice_throw();
- }
-}
-
#endif
-
+ out->finished(*os);
#ifndef ICEE_PURE_BLOCKING_CLIENT
-
-void
-Ice::Connection::sendRequest(BasicStream* os, Outgoing* out)
-{
- Int requestId;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(_exception.get())
- {
- _exception->ice_throw();
- }
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- //
- // Only add to the request map if this is a twoway call.
- //
- if(out)
- {
- requestId = fillRequestId(os);
-
- //
- // Add to the requests map.
- //
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
- }
- }
-
- bool timedOut = false;
- try
- {
- {
- IceUtil::Mutex::Lock sendSync(_sendMutex);
- sendRequest(os);
}
-
- if(out)
+ else
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // Wait until the request has completed, or until the
- // request times out.
- //
- Int tout = timeout();
+#endif
+#endif
+#ifndef ICEE_PURE_BLOCKING_CLIENT
+ //
+ // Wait until the request has completed, or until the request times out.
+ //
+ Int tout = timeout();
IceUtil::Time expireTime;
if(tout > 0)
{
- expireTime = IceUtil::Time::now() + IceUtil::Time::milliSeconds(tout);
+ expireTime = IceUtil::Time::now() + IceUtil::Time::milliSeconds(tout);
}
- while(out->state() == Outgoing::StateInProgress && !timedOut)
- {
- if(tout > 0)
- {
+
+ while(out->state() == Outgoing::StateInProgress)
+ {
+ if(tout > 0)
+ {
IceUtil::Time now = IceUtil::Time::now();
if(now < expireTime)
{
- timedWait(expireTime - now);
+ _sendMonitor.timedWait(expireTime - now);
}
-
- //
+
+ //
// Make sure we woke up because of timeout and not another response.
//
- if(out->state() == Outgoing::StateInProgress && IceUtil::Time::now() > expireTime)
- {
- timedOut = true;
- }
- }
- else
- {
- wait();
- }
- }
+ if(out->state() == Outgoing::StateInProgress && IceUtil::Time::now() > expireTime)
+ {
+ break;
+ }
+ }
+ else
+ {
+ _sendMonitor.wait();
+ }
+ }
+
+ //
+ // If the outgoing is still not finished, there was a timeout
+ // so we close the connection and wait until the outgoing gets
+ // notified of the connection closure.
+ //
+ if(out->state() == Outgoing::StateInProgress)
+ {
+ setState(StateClosed, TimeoutException(__FILE__, __LINE__));
+ while(out->state() == Outgoing::StateInProgress)
+ {
+ _sendMonitor.wait();
+ }
+ }
+#endif
+#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT)
}
+#endif
}
catch(const LocalException& ex)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
-
- if(out)
+ if(requestSent)
{
//
- // If the request has already been removed from the
- // request map, we are out of luck. It would mean that
- // finished() has been called already, and therefore the
- // exception has been set using the Outgoing::finished()
- // callback. In this case, we cannot throw the exception
- // here, because we must not both raise an exception and
- // have Outgoing::finished() called with an
- // exception. This means that in some rare cases, a
- // request will not be retried even though it could. But I
- // honestly don't know how I could avoid this, without a
- // very elaborate and complex design, which would be bad
- // for performance.
+ // If the request has been sent we don't throw but instead
+ // notify the outgoing of the connection. Throwing
+ // directly would cause the client to retry and would
+ // violate the "at-most-once" semantics.
//
- map<Int, Outgoing*>::iterator p = _requests.find(requestId);
- if(p != _requests.end())
- {
- if(p == _requestsHint)
- {
- _requests.erase(p++);
- _requestsHint = p;
- }
- else
- {
- _requests.erase(p);
- }
-
- _exception->ice_throw();
- }
+ out->finished(*_exception.get());
}
else
{
+ //
+ // The request wasn't sent, we can safely retry the invocation
+ // without violating "at-most-once".
+ //
_exception->ice_throw();
}
}
-
- if(timedOut)
- {
- //
- // Must be called outside the synchronization of this
- // object.
- //
- exception(TimeoutException(__FILE__, __LINE__));
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // We must wait until the exception has propagted
- // back to the Outgoing object.
- //
- while(out->state() == Outgoing::StateInProgress)
- {
- wait();
- }
- }
}
-#endif
-
#ifdef ICEE_HAS_BATCH
void
@@ -711,7 +624,7 @@ Ice::Connection::flushBatchRequests()
try
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
+ Lock sendSync(_sendMonitor);
if(!_transceiver) // Has the transceiver already been closed?
{
@@ -757,7 +670,10 @@ Ice::Connection::flushBatchRequests()
// Send the batch request.
//
_batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ if(_traceLevels->protocol >= 1)
+ {
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ }
_transceiver->write(_batchStream);
}
catch(const LocalException& ex)
@@ -796,7 +712,7 @@ Ice::Connection::sendResponse(BasicStream* os)
{
try
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
+ Lock sendSync(_sendMonitor);
if(!_transceiver) // Has the transceiver already been closed?
{
@@ -825,7 +741,10 @@ Ice::Connection::sendResponse(BasicStream* os)
// Send the reply.
//
os->i = os->b.begin();
- traceReply("sending reply", *os, _logger, _traceLevels);
+ if(_traceLevels->protocol >= 1)
+ {
+ traceReply("sending reply", *os, _logger, _traceLevels);
+ }
_transceiver->write(*os);
}
catch(const LocalException& ex)
@@ -841,6 +760,7 @@ Ice::Connection::sendResponse(BasicStream* os)
try
{
+ assert(_dispatchCount > 0);
if(--_dispatchCount == 0)
{
notifyAll();
@@ -867,6 +787,7 @@ Ice::Connection::sendNoResponse()
try
{
+ assert(_dispatchCount > 0);
if(--_dispatchCount == 0)
{
notifyAll();
@@ -891,14 +812,6 @@ Ice::Connection::endpoint() const
return _endpoint; // No mutex protection necessary, _endpoint is immutable.
}
-#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT)
-bool
-Ice::Connection::blocking() const
-{
- return _blocking;
-}
-#endif
-
#ifndef ICEE_PURE_CLIENT
void
@@ -937,8 +850,6 @@ Ice::Connection::getAdapter() const
return _in.getAdapter();
}
-#endif
-
ObjectPrx
Ice::Connection::createProxy(const Identity& ident) const
{
@@ -953,12 +864,7 @@ Ice::Connection::createProxy(const Identity& ident) const
return _instance->proxyFactory()->referenceToProxy(ref);
}
-void
-Ice::Connection::exception(const LocalException& ex)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
-}
+#endif
string
Ice::Connection::type() const
@@ -984,40 +890,40 @@ Ice::Connection::Connection(const InstancePtr& instance,
const EndpointPtr& endpoint,
const ObjectAdapterPtr& adapter) :
#else
-Ice::Connection::Connection(const InstancePtr& instance,
- const TransceiverPtr& transceiver,
- const EndpointPtr& endpoint) :
+ Ice::Connection::Connection(const InstancePtr& instance,
+ const TransceiverPtr& transceiver,
+ const EndpointPtr& endpoint) :
#endif
- _instance(instance),
- _transceiver(transceiver),
- _desc(transceiver->toString()),
- _type(transceiver->type()),
- _endpoint(endpoint),
- _logger(_instance->logger()), // Cached for better performance.
- _traceLevels(_instance->traceLevels()), // Cached for better performance.
- _warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0),
- _requestHdr(headerSize + sizeof(Int), 0),
+ _instance(instance),
+ _transceiver(transceiver),
+ _desc(transceiver->toString()),
+ _type(transceiver->type()),
+ _endpoint(endpoint),
+ _logger(_instance->logger()), // Cached for better performance.
+ _traceLevels(_instance->traceLevels()), // Cached for better performance.
+ _warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0),
+ _requestHdr(headerSize + sizeof(Int), 0),
#ifndef ICEE_PURE_CLIENT
- _replyHdr(headerSize, 0),
+ _replyHdr(headerSize, 0),
+ _in(_instance.get(), this, _stream, adapter),
#endif
#ifndef ICEE_PURE_BLOCKING_CLIENT
- _nextRequestId(1),
- _requestsHint(_requests.end()),
- _stream(_instance.get(), _instance->messageSizeMax()),
-#endif
-#ifndef ICEE_PURE_CLIENT
- _in(_instance.get(), this, _stream, adapter),
+ _stream(_instance.get(), _instance->messageSizeMax()),
#endif
#ifdef ICEE_HAS_BATCH
- _requestBatchHdr(headerSize + sizeof(Int), 0),
- _batchStream(_instance.get(), _instance->messageSizeMax()),
- _batchStreamInUse(false),
- _batchRequestNum(0),
+ _requestBatchHdr(headerSize + sizeof(Int), 0),
+ _batchStream(_instance.get(), _instance->messageSizeMax()),
+ _batchStreamInUse(false),
+ _batchRequestNum(0),
+#endif
+ _dispatchCount(0),
+ _state(StateNotValidated),
+ _stateTime(IceUtil::Time::now()),
+ _nextRequestId(1)
+#ifndef ICEE_PURE_BLOCKING_CLIENT
+ , _requestsHint(_requests.end())
#endif
- _dispatchCount(0),
- _state(StateNotValidated),
- _stateTime(IceUtil::Time::now())
{
#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT)
# ifdef ICEE_PURE_CLIENT
@@ -1131,8 +1037,8 @@ Ice::Connection::~Connection()
assert(_state == StateClosed);
assert(!_transceiver);
-#ifndef ICEE_PURE_BLOCKING_CLIENT
assert(_dispatchCount == 0);
+#ifndef ICEE_PURE_BLOCKING_CLIENT
assert(!_threadPerConnection);
#endif
}
@@ -1196,7 +1102,10 @@ Ice::Connection::validate()
os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
os.write(headerSize); // Message size.
os.i = os.b.begin();
- traceHeader("sending validate connection", os, _logger, _traceLevels);
+ if(_traceLevels->protocol >= 1)
+ {
+ traceHeader("sending validate connection", os, _logger, _traceLevels);
+ }
try
{
_transceiver->writeWithTimeout(os, timeout);
@@ -1273,7 +1182,10 @@ Ice::Connection::validate()
{
throw IllegalMessageSizeException(__FILE__, __LINE__);
}
- traceHeader("received validate connection", is, _logger, _traceLevels);
+ if(_traceLevels->protocol >= 1)
+ {
+ traceHeader("received validate connection", is, _logger, _traceLevels);
+ }
}
}
catch(const LocalException& ex)
@@ -1285,23 +1197,9 @@ Ice::Connection::validate()
}
#ifdef ICEE_PURE_CLIENT
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // We start out in active state.
- //
- setState(StateActive);
- }
+ activate();
#else
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // We start out in holding state.
- //
- setState(StateHolding);
- }
+ hold();
#endif
}
@@ -1371,70 +1269,91 @@ Ice::Connection::setState(State state)
switch(state)
{
- case StateNotValidated:
- {
- assert(false);
- break;
- }
+ case StateNotValidated:
+ {
+ assert(false);
+ break;
+ }
- case StateActive:
- {
- //
- // Can only switch from holding or not validated to
- // active.
- //
+ case StateActive:
+ {
+ //
+ // Can only switch from holding or not validated to
+ // active.
+ //
#ifdef ICEE_PURE_CLIENT
- if(_state != StateNotValidated)
- {
- return;
- }
+ if(_state != StateNotValidated)
+ {
+ return;
+ }
#else
- if(_state != StateHolding && _state != StateNotValidated)
- {
- return;
- }
-#endif
- break;
+ if(_state != StateHolding && _state != StateNotValidated)
+ {
+ return;
}
+#endif
+ break;
+ }
#ifndef ICEE_PURE_CLIENT
- case StateHolding:
+ case StateHolding:
+ {
+ //
+ // Can only switch from active or not validated to
+ // holding.
+ //
+ if(_state != StateActive && _state != StateNotValidated)
{
- //
- // Can only switch from active or not validated to
- // holding.
- //
- if(_state != StateActive && _state != StateNotValidated)
- {
- return;
- }
- break;
+ return;
}
+ break;
+ }
#endif
- case StateClosing:
+ case StateClosing:
+ {
+ //
+ // Can't change back from closed.
+ //
+ if(_state == StateClosed)
{
- //
- // Can't change back from closed.
- //
- if(_state == StateClosed)
- {
- return;
- }
- break;
+ return;
}
+ break;
+ }
- case StateClosed:
+ case StateClosed:
+ {
+ //
+ // We shutdown both for reading and writing. This will
+ // unblock and read call with an exception. The thread
+ // per connection then closes the transceiver.
+ //
+ _transceiver->shutdownReadWrite();
+
+ //
+ // In blocking mode, we close the transceiver now.
+ //
+#ifdef ICEE_BLOCKING_CLIENT
+# ifndef ICEE_PURE_BLOCKING_CLIENT
+ if(_blocking)
{
- //
- // If we are in thread per connection mode, we
- // shutdown both for reading and writing. This will
- // unblock and read call with an exception. The thread
- // per connection then closes the transceiver.
- //
- _transceiver->shutdownReadWrite();
- break;
+# endif
+ Lock sync(_sendMonitor);
+ try
+ {
+ _transceiver->close();
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ _transceiver = 0;
+# ifndef ICEE_PURE_BLOCKING_CLIENT
}
+# endif
+#endif
+ break;
+ }
}
_state = state;
@@ -1447,30 +1366,20 @@ Ice::Connection::setState(State state)
try
{
initiateShutdown();
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- }
#ifdef ICEE_BLOCKING_CLIENT
- if(_state != StateClosed
# ifndef ICEE_PURE_BLOCKING_CLIENT
- && _blocking
+ if(_blocking)
# endif
- )
- {
- try
- {
- _transceiver->close();
- }
- catch(const LocalException&)
{
+ setState(StateClosed);
}
- _transceiver = 0;
- _state = StateClosed;
- }
#endif
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
}
}
@@ -1480,7 +1389,7 @@ Ice::Connection::initiateShutdown() const
assert(_state == StateClosing);
assert(_dispatchCount == 0);
- IceUtil::Mutex::Lock sendSync(_sendMutex);
+ Lock sendSync(_sendMonitor);
//
// Before we shut down, we send a close connection message.
@@ -1502,7 +1411,10 @@ Ice::Connection::initiateShutdown() const
// Send the message.
//
os.i = os.b.begin();
- traceHeader("sending close connection", os, _logger, _traceLevels);
+ if(_traceLevels->protocol >= 1)
+ {
+ traceHeader("sending close connection", os, _logger, _traceLevels);
+ }
_transceiver->write(os);
//
@@ -1517,361 +1429,151 @@ Ice::Connection::initiateShutdown() const
}
void
-Ice::Connection::parseMessage(BasicStream& stream, Int& requestId
#ifndef ICEE_PURE_CLIENT
- ,Int& invokeNum
+Ice::Connection::readStreamAndParseMessage(IceInternal::BasicStream& stream, Int& requestId, Int& invokeNum)
+#else
+Ice::Connection::readStreamAndParseMessage(IceInternal::BasicStream& stream, Int& requestId)
#endif
-)
{
- assert(_state > StateNotValidated && _state < StateClosed);
+ stream.b.resize(headerSize);
+ stream.i = stream.b.begin();
+ _transceiver->read(stream);
- try
+ ptrdiff_t pos = stream.i - stream.b.begin();
+ assert(pos >= headerSize);
+ stream.i = stream.b.begin();
+ const Ice::Byte* header;
+ stream.readBlob(header, headerSize);
+ if(header[0] != magic[0] || header[1] != magic[1] || header[2] != magic[2] || header[3] != magic[3])
{
- //
- // We don't need to check magic and version here. This has
- // already been done by the ThreadPerConnection,
- // which provides us with the stream.
- //
- assert(stream.i == stream.b.end());
- stream.i = stream.b.begin() + 8;
- Byte messageType;
- stream.read(messageType);
-
- Byte compress;
- stream.read(compress);
- if(compress == 2)
- {
- FeatureNotSupportedException ex(__FILE__, __LINE__);
- ex.unsupportedFeature = "compression";
- throw ex;
- }
- stream.i = stream.b.begin() + headerSize;
-
-#ifdef ICEE_BLOCKING_CLIENT
-# ifndef ICEE_PURE_BLOCKING_CLIENT
- if(_blocking)
- {
-# endif
- switch(messageType)
- {
- case closeConnectionMsg:
- {
- traceHeader("received close connection", stream, _logger, _traceLevels);
- setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
- break;
- }
-
- case replyMsg:
- {
- traceReply("received reply", stream, _logger, _traceLevels);
-
- Int reqId;
- stream.read(reqId);
- if(reqId != requestId)
- {
- throw UnknownRequestIdException(__FILE__, __LINE__);
- }
- break;
- }
-
-
- default:
- {
- traceHeader("received unexpected message\n"
- "(invalid, closing connection)",
- stream, _logger, _traceLevels);
- throw UnknownMessageException(__FILE__, __LINE__);
- break;
- }
- }
-# ifndef ICEE_PURE_BLOCKING_CLIENT
- }
- else
-# endif
-#endif
-#ifndef ICEE_PURE_BLOCKING_CLIENT
- {
- switch(messageType)
- {
- case closeConnectionMsg:
- {
- traceHeader("received close connection", stream, _logger, _traceLevels);
- setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
- break;
- }
-
-#ifndef ICEE_PURE_CLIENT
- case requestMsg:
- {
- if(_state == StateClosing)
- {
- traceRequest("received request during closing\n"
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
- }
- else
- {
- traceRequest("received request", stream, _logger, _traceLevels);
- stream.read(requestId);
- invokeNum = 1;
- ++_dispatchCount;
- }
- break;
- }
-
- case requestBatchMsg:
- {
- if(_state == StateClosing)
- {
- traceBatchRequest("received batch request during closing\n"
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
- }
- else
- {
- traceBatchRequest("received batch request", stream, _logger, _traceLevels);
- stream.read(invokeNum);
- if(invokeNum < 0)
- {
- invokeNum = 0;
- throw NegativeSizeException(__FILE__, __LINE__);
- }
- _dispatchCount += invokeNum;
- }
- break;
- }
-#endif
-
- case replyMsg:
- {
- traceReply("received reply", stream, _logger, _traceLevels);
-
- stream.read(requestId);
-
- map<Int, Outgoing*>::iterator p = _requests.end();
-
- if(_requestsHint != _requests.end())
- {
- if(_requestsHint->first == requestId)
- {
- p = _requestsHint;
- }
- }
-
- if(p == _requests.end())
- {
- p = _requests.find(requestId);
- }
-
- if(p == _requests.end())
- {
- throw UnknownRequestIdException(__FILE__, __LINE__);
- }
-
- if(p != _requests.end())
- {
- p->second->finished(stream);
-
- if(p == _requestsHint)
- {
- _requests.erase(p++);
- _requestsHint = p;
- }
- else
- {
- _requests.erase(p);
- }
- notifyAll(); // Wake up threads waiting in sendRequest()
- }
-
- break;
- }
-
- case validateConnectionMsg:
- {
- traceHeader("received validate connection", stream, _logger, _traceLevels);
- if(_warn)
- {
- Warning out(_logger);
- out << "ignoring unexpected validate connection message:\n" << _desc;
- }
- break;
- }
-
- default:
- {
- traceHeader("received unknown message\n"
- "(invalid, closing connection)",
- stream, _logger, _traceLevels);
- throw UnknownMessageException(__FILE__, __LINE__);
- break;
- }
- }
- }
-#endif
+ BadMagicException ex(__FILE__, __LINE__);
+ ex.badMagic = Ice::ByteSeq(&header[0], &header[0] + sizeof(magic));
+ throw ex;
}
- catch(const LocalException& ex)
+ if(header[4] != protocolMajor)
{
- setState(StateClosed, ex);
+ UnsupportedProtocolException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(header[4]);
+ ex.badMinor = static_cast<unsigned char>(header[5]);
+ ex.major = static_cast<unsigned char>(protocolMajor);
+ ex.minor = static_cast<unsigned char>(protocolMinor);
+ throw ex;
+ }
+ if(header[6] != encodingMajor)
+ {
+ UnsupportedEncodingException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(header[6]);
+ ex.badMinor = static_cast<unsigned char>(header[7]);
+ ex.major = static_cast<unsigned char>(encodingMajor);
+ ex.minor = static_cast<unsigned char>(encodingMinor);
+ throw ex;
+ }
+ const Byte messageType = header[8];
+ if(header[9] == 2)
+ {
+ FeatureNotSupportedException ex(__FILE__, __LINE__);
+ ex.unsupportedFeature = "compression";
+ throw ex;
}
-}
-
-#ifndef ICEE_PURE_CLIENT
-void
-Ice::Connection::invokeAll(Int invokeNum, Int requestId)
-{
- //
- // Note: In contrast to other private or protected methods, this
- // operation must be called *without* the mutex locked.
- //
-
- try
+
+ Int size;
+ stream.i -= sizeof(Int);
+ stream.read(size);
+ if(size < headerSize)
{
- while(invokeNum > 0)
- {
- //
- // Prepare the invocation.
- //
- bool response = requestId != 0;
-
- //
- // Prepare the response if necessary.
- //
- if(response)
- {
- assert(invokeNum == 1); // No further invocations if a response is expected.
-
- BasicStream* os = _in.os();
- os->writeBlob(&_replyHdr[0], headerSize);
-
- //
- // Add the request ID.
- //
- os->write(requestId);
- }
-
- _in.invoke(response);
-
- --invokeNum;
- }
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
}
- catch(const LocalException& ex)
+ if(size > static_cast<Int>(_instance->messageSizeMax()))
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
+ throw MemoryLimitException(__FILE__, __LINE__);
}
- catch(const std::exception& ex)
+ if(size > static_cast<Int>(stream.b.size()))
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- UnknownException uex(__FILE__, __LINE__);
- uex.unknown = string("std::exception: ") + ex.what();
- setState(StateClosed, uex);
+ stream.b.resize(size);
}
- catch(...)
+ stream.i = stream.b.begin() + pos;
+
+ if(stream.i != stream.b.end())
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- UnknownException uex(__FILE__, __LINE__);
- uex.unknown = "unknown c++ exception";
- setState(StateClosed, uex);
+ _transceiver->read(stream);
}
- //
- // If invoke() above raised an exception, and therefore neither
- // sendResponse() nor sendNoResponse() has been called, then we
- // must decrement _dispatchCount here.
- //
- if(invokeNum > 0)
+ assert(stream.i == stream.b.end());
+ stream.i = stream.b.begin() + headerSize;
+
+ switch(messageType)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(_dispatchCount > 0);
- _dispatchCount -= invokeNum;
- assert(_dispatchCount >= 0);
- if(_dispatchCount == 0)
+ case closeConnectionMsg:
+ {
+ if(_traceLevels->protocol >= 1)
{
- notifyAll();
+ traceHeader("received close connection", stream, _logger, _traceLevels);
}
+ throw CloseConnectionException(__FILE__, __LINE__);
+ break;
}
-}
-#endif
-
-void
-Ice::Connection::readStream(IceInternal::BasicStream& stream)
-{
- try
+
+ case replyMsg:
{
- stream.b.resize(headerSize);
- stream.i = stream.b.begin();
- _transceiver->read(stream);
-
- ptrdiff_t pos = stream.i - stream.b.begin();
- assert(pos >= headerSize);
- stream.i = stream.b.begin();
- Ice::Byte m[4];
- stream.read(m[0]);
- stream.read(m[1]);
- stream.read(m[2]);
- stream.read(m[3]);
- if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
+ if(_traceLevels->protocol >= 1)
{
- BadMagicException ex(__FILE__, __LINE__);
- ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(m));
- throw ex;
+ traceReply("received reply", stream, _logger, _traceLevels);
}
- Byte pMajor;
- Byte pMinor;
- stream.read(pMajor);
- stream.read(pMinor);
- if(pMajor != protocolMajor)
+ stream.read(requestId);
+ break;
+ }
+
+#ifndef ICEE_PURE_CLIENT
+ case requestMsg:
+ {
+ if(_traceLevels->protocol >= 1)
{
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(pMajor);
- ex.badMinor = static_cast<unsigned char>(pMinor);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
+ traceRequest("received request", stream, _logger, _traceLevels);
}
- Byte eMajor;
- Byte eMinor;
- stream.read(eMajor);
- stream.read(eMinor);
- if(eMajor != encodingMajor)
+ stream.read(requestId);
+ invokeNum = 1;
+ break;
+ }
+
+ case requestBatchMsg:
+ {
+ if(_traceLevels->protocol >= 1)
{
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(eMajor);
- ex.badMinor = static_cast<unsigned char>(eMinor);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
+ traceBatchRequest("received batch request", stream, _logger, _traceLevels);
}
- Byte messageType;
- stream.read(messageType);
- Byte compress;
- stream.read(compress);
- Int size;
- stream.read(size);
- if(size < headerSize)
+ stream.read(invokeNum);
+ if(invokeNum < 0)
{
- throw IllegalMessageSizeException(__FILE__, __LINE__);
+ invokeNum = 0;
+ throw NegativeSizeException(__FILE__, __LINE__);
}
- if(size > static_cast<Int>(_instance->messageSizeMax()))
+ break;
+ }
+#endif
+
+ case validateConnectionMsg:
+ {
+ if(_traceLevels->protocol >= 1)
{
- throw MemoryLimitException(__FILE__, __LINE__);
+ traceHeader("received validate connection", stream, _logger, _traceLevels);
}
- if(size > static_cast<Int>(stream.b.size()))
+ if(_warn)
{
- stream.b.resize(size);
+ Warning out(_logger);
+ out << "ignoring unexpected validate connection message:\n" << _desc;
}
- stream.i = stream.b.begin() + pos;
-
- if(stream.i != stream.b.end())
+ break;
+ }
+
+ default:
+ {
+ if(_traceLevels->protocol >= 1)
{
- _transceiver->read(stream);
- assert(stream.i == stream.b.end());
+ traceHeader("received unknown message\n(invalid, closing connection)", stream, _logger, _traceLevels);
}
+ throw UnknownMessageException(__FILE__, __LINE__);
+ break;
}
- catch(const Ice::LocalException& ex)
- {
- exception(ex);
}
}
@@ -1891,16 +1593,10 @@ Ice::Connection::run()
}
catch(const LocalException&)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
+ Lock sync(*this);
assert(_state == StateClosed);
-
- //
- // We must make sure that nobody is sending when we close
- // the transceiver.
- //
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
+
+ Lock sendSync(_sendMonitor);
try
{
_transceiver->close();
@@ -1914,11 +1610,8 @@ Ice::Connection::run()
notifyAll();
return;
}
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateActive);
- }
+
+ activate();
bool closed = false;
@@ -1927,17 +1620,114 @@ Ice::Connection::run()
Int requestId = 0;
#ifndef ICEE_PURE_CLIENT
Int invokeNum = 0;
- _in.os()->resize(0);
- _in.is()->resize(0);
+ _in.os()->reset();
+ _in.is()->reset();
#endif
- readStream(_stream);
-
- auto_ptr<LocalException> exception;
- map<Int, Outgoing*> requests;
+ //
+ // Read and parse the next message. We don't need to lock the
+ // send monitor here as we have the guarantee that
+ // _transceiver won't be set to 0 by another thread, the
+ // thread per connection is the only thread that can set
+ // _transceiver to 0.
+ //
+ try
+ {
+#ifndef ICEE_PURE_CLIENT
+ readStreamAndParseMessage(_stream, requestId, invokeNum);
+#else
+ readStreamAndParseMessage(_stream, requestId);
+#endif
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ }
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state != StateClosed)
+ {
+#ifndef ICEE_PURE_CLIENT
+ if(invokeNum > 0) // We received a request or a batch request
+ {
+ if(_state < StateClosing)
+ {
+ _dispatchCount += invokeNum;
+ }
+ else if(invokeNum == 1)
+ {
+ invokeNum = 0;
+ if(_traceLevels->protocol >= 1)
+ {
+ traceRequest("received request during closing\n"
+ "(ignored by server, client will retry)",
+ _stream, _logger, _traceLevels);
+ }
+ }
+ else if(invokeNum > 1)
+ {
+ invokeNum = 0;
+ if(_traceLevels->protocol >= 1)
+ {
+ traceBatchRequest("received batch request during closing\n"
+ "(ignored by server, client will retry)",
+ _stream, _logger, _traceLevels);
+ }
+ }
+ }
+ else
+#endif
+ if(requestId > 0)
+ {
+ //
+ // The message is a reply, we search the Outgoing object waiting
+ // for this reply and pass it the stream before to notify the
+ // send monitor to wake up threads waiting for replies.
+ //
+ try
+ {
+ Lock sync(_sendMonitor);
+
+ map<Int, Outgoing*>::iterator p = _requests.end();
+ if(p != _requestsHint)
+ {
+ if(_requestsHint->first == requestId)
+ {
+ p = _requestsHint;
+ }
+ }
+
+ if(p == _requests.end())
+ {
+ p = _requests.find(requestId);
+ }
+
+ if(p == _requests.end())
+ {
+ throw UnknownRequestIdException(__FILE__, __LINE__);
+ }
+
+ p->second->finished(_stream);
+
+ if(p == _requestsHint)
+ {
+ _requests.erase(p++);
+ _requestsHint = p;
+ }
+ else
+ {
+ _requests.erase(p);
+ }
+ _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest()
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+ }
+ }
#ifndef ICEE_PURE_CLIENT
while(_state == StateHolding)
@@ -1945,51 +1735,32 @@ Ice::Connection::run()
wait();
}
#endif
- if(_state != StateClosed)
- {
-#ifndef ICEE_PURE_CLIENT
- parseMessage(_stream, requestId, invokeNum);
-#else
- parseMessage(_stream, requestId);
-#endif
- }
- //
- // parseMessage() can close the connection, so we must
- // check for closed state again.
- //
if(_state == StateClosed)
{
- //
- // We must make sure that nobody is sending when we close
- // the transceiver.
- //
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
+ Lock sync(_sendMonitor);
try
{
_transceiver->close();
}
- catch(const LocalException& ex)
+ catch(const LocalException&)
{
- exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
}
-
_transceiver = 0;
notifyAll();
-
- //
- // We cannot simply return here. We have to make sure
- // that all requests are notified about the closed
- // connection below.
- //
- closed = true;
+ closed = true;
}
if(_state == StateClosed || _state == StateClosing)
{
- requests.swap(_requests);
- _requestsHint = _requests.end();
+ Lock sync(_sendMonitor);
+ assert(_exception.get());
+ for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ }
+ _requests.clear();
+ _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest()
}
}
@@ -1999,24 +1770,69 @@ Ice::Connection::run()
// so that nested calls are possible.
//
#ifndef ICEE_PURE_CLIENT
- invokeAll(invokeNum, requestId);
-#endif
- if(requests.size() != 0)
+ try
+ {
+ for(;invokeNum > 0; --invokeNum)
+ {
+ //
+ // Prepare the response if necessary.
+ //
+ const bool response = requestId != 0;
+ if(response)
+ {
+ assert(invokeNum == 1); // No further invocations if a response is expected.
+
+ //
+ // Add the reply header and request id.
+ //
+ BasicStream* os = _in.os();
+ os->writeBlob(&_replyHdr[0], headerSize);
+ os->write(requestId);
+ }
+
+ //
+ // Dispatch the incoming request.
+ //
+ _in.invoke(response);
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ }
+ catch(const std::exception& ex)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ UnknownException uex(__FILE__, __LINE__);
+ uex.unknown = string("std::exception: ") + ex.what();
+ setState(StateClosed, uex);
+ }
+ catch(...)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ UnknownException uex(__FILE__, __LINE__);
+ uex.unknown = "unknown c++ exception";
+ setState(StateClosed, uex);
+ }
- for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
+ //
+ // If invoke() above raised an exception, and therefore neither
+ // sendResponse() nor sendNoResponse() has been called, then we
+ // must decrement _dispatchCount here.
+ //
+ if(invokeNum > 0)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_dispatchCount > 0);
+ _dispatchCount -= invokeNum;
+ assert(_dispatchCount >= 0);
+ if(_dispatchCount == 0)
{
- p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ notifyAll();
}
- notifyAll(); // Wake up threads waiting in sendRequest()
}
-
- if(exception.get())
- {
- assert(closed);
- exception->ice_throw();
- }
+#endif
}
}