Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--  cpp/src/Ice/Connection.cpp  461
1 file changed, 244 insertions(+), 217 deletions(-)
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 683237d9466..f684f3525b2 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -70,6 +70,8 @@ IceInternal::Connection::validate()
os.write(validateConnectionMsg);
os.write((Byte)1); // Compression status.
os.write(headerSize); // Message size.
+
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
os.i = os.b.begin();
traceHeader("sending validate connection", os, _logger, _traceLevels);
_transceiver->write(os, _endpoint->timeout());
@@ -240,25 +242,22 @@ IceInternal::Connection::destroy(DestructionReason reason)
bool
IceInternal::Connection::isValidated() const
{
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock sync(_queryMutex);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
return _state > StateNotValidated;
}
bool
IceInternal::Connection::isDestroyed() const
{
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock sync(_queryMutex);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
return _state >= StateClosing;
}
bool
IceInternal::Connection::isFinished() const
{
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock sync(_queryMutex);
- return _transceiver == 0 && _dispatchCount == 0;
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ return _threadPool == 0 && _dispatchCount == 0;
}
void
@@ -293,7 +292,7 @@ IceInternal::Connection::waitUntilFinished()
// Now we must wait for connection closure. If there is a timeout,
// we force the connection closure.
//
- while(_transceiver)
+ while(_threadPool)
{
if(_state != StateClosed && _endpoint->timeout() >= 0)
{
@@ -407,16 +406,52 @@ IceInternal::Connection::prepareRequest(BasicStream* os)
void
IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ Int requestId;
- if(_exception.get())
{
- _exception->ice_throw();
- }
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
- Int requestId;
+ if(_exception.get())
+ {
+ //
+ // Only raise an exception if this is a datagram or oneway
+ // call. For twoway calls, the exception will be provided
+ // using finished().
+ //
+ if(_endpoint->datagram() || oneway)
+ {
+ _exception->ice_throw();
+ }
+ else
+ {
+ out->finished(*_exception.get());
+ return;
+ }
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Only add to the request map if this is a twoway call.
+ //
+ if(!_endpoint->datagram() && !oneway)
+ {
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
+ }
try
{
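
The request ID handling that the hunk above moves under the connection monitor restarts at 1 when the counter wraps into non-positive values, and only twoway invocations are recorded in the reply map. A small standalone sketch of that bookkeeping, using std::mutex and std::map in place of the Ice types (Outgoing, allocateRequest and stateMutex are illustrative names, not the Ice API):

#include <map>
#include <mutex>
#include <utility>

struct Outgoing;                              // stand-in for the Ice Outgoing request object
static std::mutex stateMutex;                 // models the connection monitor
static int nextRequestId = 1;                 // models _nextRequestId
static std::map<int, Outgoing*> requests;     // models _requests, keyed by request ID

int allocateRequest(Outgoing* out, bool oneway, bool datagram)
{
    std::lock_guard<std::mutex> lock(stateMutex);

    int requestId = nextRequestId++;
    if(requestId <= 0)                        // counter wrapped around: restart at 1
    {
        nextRequestId = 1;
        requestId = nextRequestId++;
    }

    if(!datagram && !oneway)                  // only twoway calls wait for a reply
    {
        requests.insert(std::make_pair(requestId, out));
    }
    return requestId;
}
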
@@ -427,12 +462,6 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
//
if(!_endpoint->datagram() && !oneway)
{
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
const Byte* p = reinterpret_cast<const Byte*>(&requestId);
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
@@ -467,6 +496,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
//
// Send the request.
//
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceRequest("sending request", *os, _logger, _traceLevels);
cstream.i = cstream.b.begin();
@@ -488,6 +518,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
//
// Send the request.
//
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceRequest("sending request", *os, _logger, _traceLevels);
_transceiver->write(*os, _endpoint->timeout());
@@ -495,39 +526,56 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
}
catch(const LocalException& ex)
{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
- _exception->ice_throw();
- }
-
- //
- // Only add to the request map if there was no exception, and if
- // the operation is not oneway.
- //
- if(!_endpoint->datagram() && !oneway)
- {
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
- }
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ //
+ // Only raise an exception if this is a datagram or oneway
+ // call. For twoway calls, the exception will be provided
+ // using the finished() callbacks.
+ //
+ if(_endpoint->datagram() || oneway)
+ {
+ _exception->ice_throw();
+ }
}
}
void
IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ assert(!_endpoint->datagram()); // Async requests are always twoway.
+
+ Int requestId;
- if(_exception.get())
{
- _exception->ice_throw();
- }
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
- Int requestId;
+ if(_exception.get())
+ {
+ out->__finished(*_exception.get());
+ return;
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
+ pair<const Int, OutgoingAsyncPtr>(requestId, out));
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
+ }
try
{
@@ -536,12 +584,6 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
//
// Fill in the request ID.
//
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
const Byte* p = reinterpret_cast<const Byte*>(&requestId);
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
@@ -575,6 +617,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
//
// Send the request.
//
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
cstream.i = cstream.b.begin();
@@ -596,6 +639,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
//
// Send the request.
//
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
_transceiver->write(*os, _endpoint->timeout());
@@ -603,20 +647,9 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
}
catch(const LocalException& ex)
{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
setState(StateClosed, ex);
assert(_exception.get());
- _exception->ice_throw();
- }
-
- //
- // Only add to the request map if there was no exception.
- //
- _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
- pair<const Int, OutgoingAsyncPtr>(requestId, out));
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
}
}
@@ -681,134 +714,140 @@ IceInternal::Connection::abortBatchRequest()
void
IceInternal::Connection::flushBatchRequest()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ BasicStream batchStream(_instance.get());
- if(_exception.get())
{
- _exception->ice_throw();
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ if(_exception.get())
+ {
+ //
+ // Since batch requests are all oneways (or datagrams), we
+ // must report the exception to the caller.
+ //
+ _exception->ice_throw();
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ if(_batchStream.b.empty())
+ {
+ return; // Nothing to do.
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
+
+ //
+ // Reset _batchStream and _batchRequestNum, so that new batch
+ // messages can be sent.
+ //
+ _batchStream.swap(batchStream);
+ assert(_batchStream.b.empty());
+ _batchRequestNum = 0;
}
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
- if(!_batchStream.b.empty())
+ assert(!batchStream.b.empty());
+
+ try
{
- try
+ batchStream.i = batchStream.b.begin();
+
+ //
+ // Fill in the number of requests in the batch.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize);
+#endif
+
+ bool compress;
+ if(batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
+ {
+ compress = false;
+ }
+ else
+ {
+ compress = _endpoint->compress();
+ }
+
+ if(compress)
{
- _batchStream.i = _batchStream.b.begin();
+ //
+ // Set compression status.
+ //
+ batchStream.b[9] = 2; // Message is compressed.
//
- // Fill in the number of requests in the batch.
+ // Do compression.
//
- const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#endif
-
- bool compress;
- if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
- {
- compress = false;
- }
- else
- {
- compress = _endpoint->compress();
- }
+ BasicStream cstream(_instance.get());
+ doCompress(batchStream, cstream);
- if(compress)
- {
- //
- // Set compression status.
- //
- _batchStream.b[9] = 2; // Message is compressed.
-
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(_batchStream, cstream);
-
- //
- // Send the batch request.
- //
- _batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
- }
- else
- {
- //
- // No compression, just fill in the message size.
- //
- Int sz = static_cast<Int>(_batchStream.b.size());
- p = reinterpret_cast<const Byte*>(&sz);
+ //
+ // Send the batch request.
+ //
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ batchStream.i = batchStream.b.begin();
+ traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(batchStream.b.size());
+ p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
+ reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + 10);
#else
- copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
+ copy(p, p + sizeof(Int), batchStream.b.begin() + 10);
#endif
-
- //
- // Send the batch request.
- //
- _batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- _transceiver->write(_batchStream, _endpoint->timeout());
- }
//
- // Reset _batchStream and _batchRequestNum, so that new batch
- // messages can be sent.
+ // Send the batch request.
//
- BasicStream dummy(_instance.get());
- _batchStream.swap(dummy);
- assert(_batchStream.b.empty());
- _batchRequestNum = 0;
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ batchStream.i = batchStream.b.begin();
+ traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels);
+ _transceiver->write(batchStream, _endpoint->timeout());
}
}
-
- if(_proxyCount == 0 && !_adapter && closingOK())
+ catch(const LocalException& ex)
{
- setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ assert(_exception.get());
+
+ //
+ // Since batch requests are all oneways (or datagrams), we
+ // must report the exception to the caller.
+ //
+ _exception->ice_throw();
+ }
+
+ {
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ if(_proxyCount == 0 && !_adapter && closingOK())
+ {
+ setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+ }
}
}
void
IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
-
try
{
- {
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock s(_queryMutex);
- --_dispatchCount;
- }
-
- if(_dispatchCount == 0)
- {
- notifyAll();
- }
-
- if(_state == StateClosed)
- {
- return;
- }
-
bool compress;
if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
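
The reworked flushBatchRequest() above swaps the accumulated batch into a local stream while the monitor is held, so new batch requests can start accumulating right away, and performs the wire write afterwards under _sendMutex only. A standalone sketch of that swap-then-send shape, with std::vector buffers standing in for BasicStream (flushBatch, batchBuffer and writeOnWire are illustrative names):

#include <mutex>
#include <vector>

static std::mutex stateMutex;                 // models the connection monitor
static std::mutex sendMutex;                  // models _sendMutex
static std::vector<char> batchBuffer;         // models _batchStream

static void writeOnWire(const std::vector<char>&)
{
    // Stands in for _transceiver->write(stream, _endpoint->timeout()).
}

void flushBatch()
{
    std::vector<char> pending;
    {
        std::lock_guard<std::mutex> lock(stateMutex);
        if(batchBuffer.empty())
        {
            return;                           // nothing to flush
        }
        pending.swap(batchBuffer);            // new batches can be queued immediately
    }

    std::lock_guard<std::mutex> send(sendMutex);
    writeOnWire(pending);                     // the write never holds the monitor
}
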
@@ -825,7 +864,7 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
// Set compression status.
//
os->b[9] = 2; // Message is compressed.
-
+
//
// Do compression.
//
@@ -835,6 +874,7 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
//
// Send the reply.
//
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceReply("sending reply", *os, _logger, _traceLevels);
cstream.i = cstream.b.begin();
@@ -856,24 +896,42 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
//
// Send the reply.
//
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceReply("sending reply", *os, _logger, _traceLevels);
_transceiver->write(*os, _endpoint->timeout());
}
-
- if(_state == StateClosing && _dispatchCount == 0)
- {
- initiateShutdown();
- }
}
catch(const LocalException& ex)
{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
setState(StateClosed, ex);
}
- if(_acmTimeout > 0)
{
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ try
+ {
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
}
}
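
With the change above, sendResponse() performs the write first and only afterwards re-enters the connection monitor to drop the dispatch count, wake threads blocked in waitUntilFinished(), and start shutdown once a closing connection has no dispatches left. A condensed standalone model of that post-send bookkeeping, with std::condition_variable standing in for the IceUtil monitor (afterResponseSent and the member names are illustrative):

#include <condition_variable>
#include <mutex>

static std::mutex monitorMutex;               // models the connection monitor
static std::condition_variable monitorCond;
static int dispatchCount = 1;                 // models _dispatchCount
static bool closing = false;                  // models _state == StateClosing

static void initiateShutdown()
{
    // Would send the close connection message, as in the last hunk below.
}

void afterResponseSent()
{
    std::lock_guard<std::mutex> lock(monitorMutex);

    if(--dispatchCount == 0)
    {
        monitorCond.notify_all();             // wakes waitUntilFinished()
    }
    if(closing && dispatchCount == 0)
    {
        initiateShutdown();                   // last dispatch on a closing connection
    }
}
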
@@ -884,22 +942,11 @@ IceInternal::Connection::sendNoResponse()
try
{
- {
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock s(_queryMutex);
- --_dispatchCount;
- }
-
- if(_dispatchCount == 0)
+ if(--_dispatchCount == 0)
{
notifyAll();
}
- if(_state == StateClosed)
- {
- return;
- }
-
if(_state == StateClosing && _dispatchCount == 0)
{
initiateShutdown();
@@ -968,10 +1015,7 @@ IceInternal::Connection::readable() const
void
IceInternal::Connection::read(BasicStream& stream)
{
- if(_transceiver)
- {
- _transceiver->read(stream, 0);
- }
+ _transceiver->read(stream, 0);
//
// Updating _acmAbsoluteTimeout is too expensive here, because we
@@ -1102,11 +1146,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
traceRequest("received request", stream, _logger, _traceLevels);
stream.read(requestId);
invoke = 1;
- {
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock s(_queryMutex);
- ++_dispatchCount;
- }
+ ++_dispatchCount;
}
break;
}
@@ -1127,11 +1167,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
{
throw NegativeSizeException(__FILE__, __LINE__);
}
- {
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock s(_queryMutex);
- _dispatchCount += invoke;
- }
+ _dispatchCount += invoke;
}
break;
}
@@ -1340,12 +1376,6 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
closeException = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
}
- {
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock s(_queryMutex);
- _transceiver = 0;
- }
-
_threadPool = 0; // We don't need the thread pool anymore.
notifyAll();
}
@@ -1387,7 +1417,6 @@ string
IceInternal::Connection::toString() const
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
- assert(_transceiver);
return _transceiver->toString();
}
@@ -1485,10 +1514,8 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
IceInternal::Connection::~Connection()
{
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock sync(_queryMutex);
assert(_state == StateClosed);
- assert(!_transceiver);
+ assert(!_threadPool);
assert(_dispatchCount == 0);
assert(_proxyCount == 0);
}
@@ -1652,12 +1679,6 @@ IceInternal::Connection::setState(State state)
// Here we ignore any exceptions in close().
}
- {
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock sync(_queryMutex);
- _transceiver = 0;
- }
-
_threadPool = 0; // We don't need the thread pool anymore.
}
else
@@ -1669,12 +1690,8 @@ IceInternal::Connection::setState(State state)
}
}
- {
- // See _queryMutex comment in header file.
- IceUtil::Mutex::Lock sync(_queryMutex);
- _state = state;
- _stateTime = IceUtil::Time::now();
- }
+ _state = state;
+ _stateTime = IceUtil::Time::now();
notifyAll();
@@ -1682,6 +1699,11 @@ IceInternal::Connection::setState(State state)
{
try
{
+ //
+ // Locking of _sendMutex is not necessary here, because if
+ // we are in closing state, there are no sending threads
+ // anymore.
+ //
initiateShutdown();
}
catch(const LocalException& ex)
@@ -1711,6 +1733,11 @@ IceInternal::Connection::initiateShutdown() const
os.write(closeConnectionMsg);
os.write((Byte)1); // Compression status: compression supported but not used.
os.write(headerSize); // Message size.
+
+ //
+ // Send the message.
+ //
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
os.i = os.b.begin();
traceHeader("sending close connection", os, _logger, _traceLevels);
_transceiver->write(os, _endpoint->timeout());
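
Taken together, the patch splits the connection's synchronization in two: the recursive monitor keeps guarding connection state (request maps, dispatch count, ACM timeout, _state), which also lets the query methods drop the former _queryMutex, while the new _sendMutex serializes only the transceiver writes so a slow write no longer blocks state queries or incoming dispatch. A minimal standalone model of the resulting scheme, using standard-library primitives rather than the IceUtil classes (ConnectionModel and its members are illustrative names):

#include <mutex>
#include <string>

class ConnectionModel
{
public:
    void send(const std::string& message)
    {
        {
            std::lock_guard<std::recursive_mutex> stateLock(_monitor);
            ++_pendingRequests;               // state changes only under the monitor
        }
        std::lock_guard<std::mutex> sendLock(_sendMutex);
        write(message);                       // the wire write never holds the monitor
    }

    bool finished() const
    {
        std::lock_guard<std::recursive_mutex> stateLock(_monitor);
        return _pendingRequests == 0;         // replaces the former _queryMutex reads
    }

private:
    void write(const std::string&)
    {
        // Stands in for _transceiver->write(stream, _endpoint->timeout()).
    }

    mutable std::recursive_mutex _monitor;    // models IceUtil::Monitor<IceUtil::RecMutex>
    std::mutex _sendMutex;                    // models the new _sendMutex
    int _pendingRequests = 0;
};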