summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp406
1 files changed, 238 insertions, 168 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 7cd4aae0847..1128835a881 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -36,17 +36,83 @@ using namespace IceInternal;
void IceInternal::incRef(Connection* p) { p->__incRef(); }
void IceInternal::decRef(Connection* p) { p->__decRef(); }
+void
+IceInternal::Connection::activate()
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ setState(StateActive);
+}
+
+void
+IceInternal::Connection::hold()
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ setState(StateHolding);
+}
+
+void
+IceInternal::Connection::destroy(DestructionReason reason)
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ switch(reason)
+ {
+ case ObjectAdapterDeactivated:
+ {
+ setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__));
+ break;
+ }
+
+ case CommunicatorDestroyed:
+ {
+ setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__));
+ break;
+ }
+ }
+}
+
bool
-IceInternal::Connection::destroyed() const
+IceInternal::Connection::isDestroyed() const
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
return _state >= StateClosing;
}
+bool
+IceInternal::Connection::isFinished() const
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ return _transceiver == 0;
+}
+
+void
+IceInternal::Connection::waitUntilHolding() const
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ while(_state < StateHolding || _dispatchCount > 0)
+ {
+ wait();
+ }
+}
+
+void
+IceInternal::Connection::waitUntilFinished() const
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ while(_transceiver)
+ {
+ wait();
+ }
+}
+
void
IceInternal::Connection::validate()
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
if(_endpoint->datagram())
{
@@ -122,34 +188,20 @@ IceInternal::Connection::validate()
}
void
-IceInternal::Connection::hold()
+IceInternal::Connection::incProxyCount()
{
- IceUtil::RecMutex::Lock sync(*this);
- setState(StateHolding);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ assert(_proxyCount >= 0);
+ ++_proxyCount;
}
void
-IceInternal::Connection::activate()
+IceInternal::Connection::decProxyCount()
{
- IceUtil::RecMutex::Lock sync(*this);
- setState(StateActive);
-}
-
-void
-IceInternal::Connection::incUsageCount()
-{
- IceUtil::RecMutex::Lock sync(*this);
- assert(_usageCount >= 0);
- ++_usageCount;
-}
-
-void
-IceInternal::Connection::decUsageCount()
-{
- IceUtil::RecMutex::Lock sync(*this);
- assert(_usageCount > 0);
- --_usageCount;
- if(_usageCount == 0 && !_adapter)
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ assert(_proxyCount > 0);
+ --_proxyCount;
+ if(_proxyCount == 0 && !_adapter)
{
assert(_requests.empty());
assert(_asyncRequests.empty());
@@ -164,9 +216,9 @@ IceInternal::Connection::prepareRequest(BasicStream* os)
}
void
-IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp)
+IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool compress)
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
if(_exception.get())
{
@@ -198,17 +250,17 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp)
if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
- comp = false;
+ compress = false;
}
else
{
if(_defaultsAndOverrides->overrideCompress)
{
- comp = _defaultsAndOverrides->overrideCompressValue;
+ compress = _defaultsAndOverrides->overrideCompressValue;
}
}
- if(comp)
+ if(compress)
{
//
// Change message type.
@@ -219,7 +271,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp)
// Do compression.
//
BasicStream cstream(_instance);
- compress(*os, cstream);
+ doCompress(*os, cstream);
//
// Send the request.
@@ -265,9 +317,9 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp)
}
void
-IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp)
+IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress)
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
if(_exception.get())
{
@@ -296,17 +348,17 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp
if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
- comp = false;
+ compress = false;
}
else
{
if(_defaultsAndOverrides->overrideCompress)
{
- comp = _defaultsAndOverrides->overrideCompressValue;
+ compress = _defaultsAndOverrides->overrideCompressValue;
}
}
- if(comp)
+ if(compress)
{
//
// Change message type.
@@ -317,7 +369,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp
// Do compression.
//
BasicStream cstream(_instance);
- compress(*os, cstream);
+ doCompress(*os, cstream);
//
// Send the request.
@@ -398,6 +450,7 @@ IceInternal::Connection::finishBatchRequest(BasicStream* os)
assert(_state < StateClosing);
_batchStream.swap(*os); // Get the batch stream back.
+ ++_batchRequestNum; // Increment the number of requests in the batch.
unlock(); // Give the Connection back.
}
@@ -409,9 +462,9 @@ IceInternal::Connection::abortBatchRequest()
}
void
-IceInternal::Connection::flushBatchRequest(bool comp)
+IceInternal::Connection::flushBatchRequest(bool compress)
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
if(_exception.get())
{
@@ -428,19 +481,26 @@ IceInternal::Connection::flushBatchRequest(bool comp)
_batchStream.i = _batchStream.b.begin();
+ //
+ // Fill in the number of requests in the batch.
+ //
+ const Byte* p;
+ p = reinterpret_cast<const Byte*>(&_batchRequestNum);
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+
if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
- comp = false;
+ compress = false;
}
else
{
if(_defaultsAndOverrides->overrideCompress)
{
- comp = _defaultsAndOverrides->overrideCompressValue;
+ compress = _defaultsAndOverrides->overrideCompressValue;
}
}
- if(comp)
+ if(compress)
{
//
// Change message type.
@@ -451,7 +511,7 @@ IceInternal::Connection::flushBatchRequest(bool comp)
// Do compression.
//
BasicStream cstream(_instance);
- compress(_batchStream, cstream);
+ doCompress(_batchStream, cstream);
//
// Send the batch request.
@@ -480,11 +540,13 @@ IceInternal::Connection::flushBatchRequest(bool comp)
}
//
- // Reset _batchStream so that new batch messages can be sent.
+ // Reset _batchStream and _batchRequestNum, so that new batch
+ // messages can be sent.
//
BasicStream dummy(_instance);
_batchStream.swap(dummy);
assert(_batchStream.b.empty());
+ _batchRequestNum = 0;
}
catch(const LocalException& ex)
{
@@ -495,9 +557,9 @@ IceInternal::Connection::flushBatchRequest(bool comp)
}
void
-IceInternal::Connection::sendResponse(BasicStream* os, bool comp)
+IceInternal::Connection::sendResponse(BasicStream* os, bool compress)
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
try
{
@@ -508,17 +570,17 @@ IceInternal::Connection::sendResponse(BasicStream* os, bool comp)
if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
- comp = false;
+ compress = false;
}
else
{
if(_defaultsAndOverrides->overrideCompress)
{
- comp = _defaultsAndOverrides->overrideCompressValue;
+ compress = _defaultsAndOverrides->overrideCompressValue;
}
}
- if(comp)
+ if(compress)
{
//
// Change message type.
@@ -529,7 +591,7 @@ IceInternal::Connection::sendResponse(BasicStream* os, bool comp)
// Do compression.
//
BasicStream cstream(_instance);
- compress(*os, cstream);
+ doCompress(*os, cstream);
//
// Send the reply.
@@ -557,17 +619,47 @@ IceInternal::Connection::sendResponse(BasicStream* os, bool comp)
_transceiver->write(*os, _endpoint->timeout());
}
- --_responseCount;
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
- if(_state == StateClosing && _responseCount == 0 && !_endpoint->datagram())
+ if(_state == StateClosing && _dispatchCount == 0)
{
- closeConnection();
+ initiateShutdown();
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+}
+
+void
+IceInternal::Connection::sendNoResponse()
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ try
+ {
+ if(_state == StateClosed)
+ {
+ return;
+ }
+
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
}
}
catch(const LocalException& ex)
{
setState(StateClosed, ex);
- return;
}
}
@@ -588,7 +680,7 @@ IceInternal::Connection::endpoint() const
void
IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter)
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
//
// We are registered with a thread pool in active and closing
@@ -620,7 +712,7 @@ IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter)
ObjectAdapterPtr
IceInternal::Connection::getAdapter() const
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
return _adapter;
}
@@ -640,12 +732,13 @@ void
IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
{
OutgoingAsyncPtr outAsync;
- bool invoke = false;
- bool comp = false;
- bool batch = false;
+
+ Int invoke = 0;
+ Int requestId = 0;
+ bool compress = false;
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
threadPool->promoteFollower();
@@ -671,9 +764,9 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
messageType == compressedReplyMsg)
{
BasicStream ustream(_instance);
- uncompress(stream, ustream);
+ doUncompress(stream, ustream);
stream.b.swap(ustream.b);
- comp = true;
+ compress = true;
}
stream.i = stream.b.begin() + headerSize;
@@ -691,7 +784,9 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
else
{
traceRequest("received request", stream, _logger, _traceLevels);
- invoke = true;
+ stream.read(requestId);
+ invoke = 1;
+ ++_dispatchCount;
}
break;
}
@@ -707,7 +802,9 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
else
{
traceRequest("received compressed request", stream, _logger, _traceLevels);
- invoke = true;
+ stream.read(requestId);
+ invoke = 1;
+ ++_dispatchCount;
}
break;
}
@@ -723,8 +820,12 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
else
{
traceBatchRequest("received batch request", stream, _logger, _traceLevels);
- invoke = true;
- batch = true;
+ stream.read(invoke);
+ if(invoke < 0)
+ {
+ throw NegativeSizeException(__FILE__, __LINE__);
+ }
+ _dispatchCount += invoke;
}
break;
}
@@ -740,8 +841,12 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
else
{
traceBatchRequest("received compressed batch request", stream, _logger, _traceLevels);
- invoke = true;
- batch = true;
+ stream.read(invoke);
+ if(invoke < 0)
+ {
+ throw NegativeSizeException(__FILE__, __LINE__);
+ }
+ _dispatchCount += invoke;
}
break;
}
@@ -758,7 +863,6 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
traceReply("received reply", stream, _logger, _traceLevels);
}
- Int requestId;
stream.read(requestId);
map<Int, Outgoing*>::iterator p = _requests.end();
@@ -893,22 +997,13 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
// Method invocation must be done outside the thread
// synchronization, so that nested calls are possible.
//
- if(invoke)
+ if(invoke > 0)
{
//
- // If this is not a batch request, get the request id.
- //
- Int requestId = 0;
- if(!batch)
- {
- stream.read(requestId);
- }
- bool response = !_endpoint->datagram() && requestId != 0;
-
- //
// Prepare the invocation.
//
- Incoming in(_instance, _adapter, response ? this : 0, comp);
+ bool response = !_endpoint->datagram() && requestId != 0;
+ Incoming in(_instance, _adapter, this, response, compress);
BasicStream* is = in.is();
stream.swap(*is);
BasicStream* os = in.os();
@@ -920,53 +1015,39 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
//
if(response)
{
- ++_responseCount;
- os->write(protocolVersion);
- os->write(encodingVersion);
- os->write(replyMsg);
- os->write(Int(0)); // Message size (placeholder).
- os->write(requestId);
+ assert(invoke == 1);
+ os->writeBlob(_replyHdr);
+
+ //
+ // Fill in the request ID.
+ //
+ const Byte* p;
+ p = reinterpret_cast<const Byte*>(&requestId);
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
}
//
// Do the invocation, or multiple invocations for batch
// messages.
//
- do
+ while(invoke-- > 0)
{
- if(in.invoke())
- {
- //
- // If invoke() returned true, the operation was
- // dispatched asynchronously, meaning that we
- // don't send a response below.
- //
- response = false;
- }
+ in.invoke();
}
- while(batch && is->i < is->b.end());
}
catch(const LocalException& ex)
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
setState(StateClosed, ex);
return;
}
-
- //
- // Send a response if necessary.
- //
- if(response)
- {
- sendResponse(os, comp);
- }
}
}
void
IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
threadPool->promoteFollower();
@@ -977,13 +1058,15 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
else if(_state == StateClosed)
{
_transceiver->close();
+ _transceiver = 0;
+ notifyAll();
}
}
void
IceInternal::Connection::exception(const LocalException& ex)
{
- IceUtil::RecMutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
setState(StateClosed, ex);
}
@@ -1004,62 +1087,42 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
_logger(_instance->logger()),
_traceLevels(_instance->traceLevels()),
_defaultsAndOverrides(_instance->defaultsAndOverrides()),
+ _registeredWithPool(false),
_warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0),
+ _requestHdr(headerSize + 4, 0),
+ _requestBatchHdr(headerSize + 4, 0),
+ _replyHdr(headerSize + 4, 0),
_nextRequestId(1),
_requestsHint(_requests.end()),
_asyncRequestsHint(_asyncRequests.end()),
_batchStream(_instance),
- _responseCount(0),
- _usageCount(0),
- _state(StateHolding),
- _registeredWithPool(false)
+ _batchRequestNum(0),
+ _dispatchCount(0),
+ _proxyCount(0),
+ _state(StateHolding)
{
vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr);
- requestHdr.reserve(headerSize + 4);
- requestHdr.push_back(protocolVersion);
- requestHdr.push_back(encodingVersion);
- requestHdr.push_back(requestMsg);
- requestHdr.push_back(0); // Message size (placeholder).
- requestHdr.push_back(0); // Message size (placeholder).
- requestHdr.push_back(0); // Message size (placeholder).
- requestHdr.push_back(0); // Message size (placeholder).
- requestHdr.push_back(0); // Request ID (placeholder).
- requestHdr.push_back(0); // Request ID (placeholder).
- requestHdr.push_back(0); // Request ID (placeholder).
- requestHdr.push_back(0); // Request ID (placeholder).
- assert(_requestHdr.size() == headerSize + 4);
+ requestHdr[0] = protocolVersion;
+ requestHdr[1] = encodingVersion;
+ requestHdr[2] = requestMsg;
vector<Byte>& requestBatchHdr = const_cast<vector<Byte>&>(_requestBatchHdr);
- requestBatchHdr.resize(_requestHdr.size() - 4);
- copy(_requestHdr.begin(), _requestHdr.end() - 4, requestBatchHdr.begin());
+ requestBatchHdr[0] = protocolVersion;
+ requestBatchHdr[1] = encodingVersion;
requestBatchHdr[2] = requestBatchMsg;
+
+ vector<Byte>& replyHdr = const_cast<vector<Byte>&>(_replyHdr);
+ replyHdr[0] = protocolVersion;
+ replyHdr[1] = encodingVersion;
+ replyHdr[2] = replyMsg;
}
IceInternal::Connection::~Connection()
{
- assert(_usageCount == 0);
assert(_state == StateClosed);
-}
-
-void
-IceInternal::Connection::destroy(DestructionReason reason)
-{
- RecMutex::Lock sync(*this);
-
- switch(reason)
- {
- case ObjectAdapterDeactivated:
- {
- setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__));
- break;
- }
-
- case CommunicatorDestroyed:
- {
- setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__));
- break;
- }
- }
+ assert(!_transceiver);
+ assert(_dispatchCount == 0);
+ assert(_proxyCount == 0);
}
void
@@ -1174,17 +1237,19 @@ IceInternal::Connection::setState(State state)
registerWithPool();
}
unregisterWithPool();
+ _dispatchCount = 0;
break;
}
}
_state = state;
+ notifyAll();
- if(_state == StateClosing && _responseCount == 0 && !_endpoint->datagram())
+ if(_state == StateClosing && _dispatchCount == 0)
{
try
{
- closeConnection();
+ initiateShutdown();
}
catch(const LocalException& ex)
{
@@ -1194,21 +1259,26 @@ IceInternal::Connection::setState(State state)
}
void
-IceInternal::Connection::closeConnection() const
+IceInternal::Connection::initiateShutdown() const
{
- BasicStream os(_instance);
- os.write(protocolVersion);
- os.write(encodingVersion);
- os.write(closeConnectionMsg);
- os.write(headerSize); // Message size.
- os.i = os.b.begin();
- traceHeader("sending close connection", os, _logger, _traceLevels);
- _transceiver->write(os, _endpoint->timeout());
+ assert(_state == StateClosing);
+ assert(_dispatchCount == 0);
- //
- // A close connection is always followed by a connection shutdown.
- //
- _transceiver->shutdown();
+ if(!_endpoint->datagram())
+ {
+ //
+ // Before we shut down, we send a close connection message.
+ //
+ BasicStream os(_instance);
+ os.write(protocolVersion);
+ os.write(encodingVersion);
+ os.write(closeConnectionMsg);
+ os.write(headerSize); // Message size.
+ os.i = os.b.begin();
+ traceHeader("sending close connection", os, _logger, _traceLevels);
+ _transceiver->write(os, _endpoint->timeout());
+ _transceiver->shutdown();
+ }
}
void
@@ -1321,7 +1391,7 @@ getBZ2Error(int bzError)
}
void
-IceInternal::Connection::compress(BasicStream& uncompressed, BasicStream& compressed)
+IceInternal::Connection::doCompress(BasicStream& uncompressed, BasicStream& compressed)
{
const Byte* p;
@@ -1366,7 +1436,7 @@ IceInternal::Connection::compress(BasicStream& uncompressed, BasicStream& compre
}
void
-IceInternal::Connection::uncompress(BasicStream& compressed, BasicStream& uncompressed)
+IceInternal::Connection::doUncompress(BasicStream& compressed, BasicStream& uncompressed)
{
Int uncompressedSize;
compressed.i = compressed.b.begin() + headerSize;