summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp1202
1 files changed, 310 insertions, 892 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 39226868d6c..c896d190569 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -16,6 +16,7 @@
#include <Ice/DefaultsAndOverrides.h>
#include <Ice/Transceiver.h>
#include <Ice/ThreadPool.h>
+#include <Ice/SelectorThread.h>
#include <Ice/ConnectionMonitor.h>
#include <Ice/ObjectAdapterI.h> // For getThreadPool() and getServantManager().
#include <Ice/EndpointI.h>
@@ -31,33 +32,33 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
-namespace
+Ice::LocalObject* IceInternal::upCast(ConnectionI* p) { return p; }
+
+namespace IceInternal
{
-class CallFinished : public ThreadPoolWorkItem
+class FlushSentCallbacks : public ThreadPoolWorkItem
{
public:
- CallFinished(const Ice::ConnectionIPtr& connection) : _connection(connection)
+ FlushSentCallbacks(const Ice::ConnectionIPtr& connection) : _connection(connection)
{
}
- virtual void
+ void
execute(const ThreadPoolPtr& threadPool)
{
- _connection->finished(threadPool);
+ threadPool->promoteFollower();
+ _connection->flushSentCallbacks();
}
private:
- ConnectionIPtr _connection;
+ const Ice::ConnectionIPtr _connection;
};
}
-
-Ice::LocalObject* IceInternal::upCast(ConnectionI* p) { return p; }
-
void
Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
{
@@ -118,7 +119,7 @@ Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex)
if(!response)
{
//
- // Only notify oneway requests. The connection keeps track of twoway
+ // Only notify oneway requests. The connection keeps track of twoway
// requests in the _requests/_asyncRequests maps and will notify them
// of the connection exceptions.
//
@@ -144,60 +145,21 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
{
try
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state == StateClosed) // The connection might already be closed if the communicator was destroyed.
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- _startCallback = callback;
-
- //
- // The connection might already be closed if the communicator was destroyed.
- //
- if(_state == StateClosed)
- {
- assert(_exception.get());
- _exception->ice_throw();
- }
+ assert(_exception.get());
+ _exception->ice_throw();
}
- if(_threadPerConnection)
+ SocketStatus status = initialize();
+ if(status == Finished)
{
- //
- // In thread per connection mode, we create the thread for the connection. The
- // intialization and validation of the connection is taken care of by the thread
- // per connection.
- //
- try
- {
- _thread = new ThreadPerConnection(this);
- _thread->start(_threadPerConnectionStackSize);
- }
- catch(const IceUtil::Exception& ex)
- {
- {
- Error out(_logger);
- out << "cannot create thread for connection:\n" << ex;
- }
-
- //
- // Clean up.
- //
- _thread = 0;
- ex.ice_throw();
- }
+ status = validate();
}
- else
- {
- SocketStatus status = initialize(0);
- if(status == Finished)
- {
- status = validate(0);
- }
-
- if(status == Finished)
- {
- finishStart();
- return; // We're done!
- }
+ if(status != Finished)
+ {
int timeout;
DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
if(defaultsAndOverrides->overrideConnectTimeout)
@@ -208,25 +170,24 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
{
timeout = _endpoint->timeout();
}
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_state == StateClosed)
- {
- assert(_exception.get());
- _exception->ice_throw();
- }
+
_sendInProgress = true;
_selectorThread->_register(_transceiver->fd(), this, status, timeout);
- }
-
- if(!callback) // Wait for the connection to be validated.
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(callback)
+ {
+ _startCallback = callback;
+ return;
+ }
+
+ //
+ // Wait for the connection to be validated.
+ //
while(_state <= StateNotValidated)
{
wait();
}
-
+
if(_state >= StateClosing)
{
assert(_exception.get());
@@ -236,16 +197,22 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
}
catch(const Ice::LocalException& ex)
{
+ exception(ex);
+ if(callback)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- if(callback)
- {
- return;
- }
+ callback->connectionStartFailed(this, *_exception.get());
+ return;
}
- waitUntilFinished();
- throw;
+ else
+ {
+ waitUntilFinished();
+ throw;
+ }
+ }
+
+ if(callback)
+ {
+ callback->connectionStartCompleted(this);
}
}
@@ -282,7 +249,7 @@ void
Ice::ConnectionI::destroy(DestructionReason reason)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
+
switch(reason)
{
case ObjectAdapterDeactivated:
@@ -290,7 +257,7 @@ Ice::ConnectionI::destroy(DestructionReason reason)
setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__));
break;
}
-
+
case CommunicatorDestroyed:
{
setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__));
@@ -303,7 +270,7 @@ void
Ice::ConnectionI::close(bool force)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
+
if(force)
{
setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
@@ -321,7 +288,7 @@ Ice::ConnectionI::close(bool force)
{
wait();
}
-
+
setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
}
}
@@ -342,42 +309,24 @@ Ice::ConnectionI::isActiveOrHolding() const
bool
Ice::ConnectionI::isFinished() const
{
- IceUtil::ThreadPtr threadPerConnection;
+ //
+ // We can use trylock here, because as long as there are still
+ // threads operating in this connection object, connection
+ // destruction is considered as not yet finished.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
+ if(!sync.acquired())
{
- //
- // We can use trylock here, because as long as there are still
- // threads operating in this connection object, connection
- // destruction is considered as not yet finished.
- //
- IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
-
- if(!sync.acquired())
- {
- return false;
- }
-
- if(_transceiver || _dispatchCount != 0)
- {
- return false;
- }
-
- if(_thread && _thread->isAlive())
- {
- return false;
- }
-
- assert(_state == StateClosed);
-
- threadPerConnection = _thread;
- _thread = 0;
+ return false;
}
- if(threadPerConnection)
+ if(_transceiver || _dispatchCount != 0)
{
- threadPerConnection->getThreadControl().join();
+ return false;
}
+ assert(_state == StateClosed);
return true;
}
@@ -407,80 +356,68 @@ Ice::ConnectionI::waitUntilHolding() const
void
Ice::ConnectionI::waitUntilFinished()
{
- IceUtil::ThreadPtr threadPerConnection;
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ //
+ // We wait indefinitely until connection closing has been
+ // initiated. We also wait indefinitely until all outstanding
+ // requests are completed. Otherwise we couldn't guarantee
+ // that there are no outstanding calls when deactivate() is
+ // called on the servant locators.
+ //
+ while(_state < StateClosing || _dispatchCount > 0)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // We wait indefinitely until connection closing has been
- // initiated. We also wait indefinitely until all outstanding
- // requests are completed. Otherwise we couldn't guarantee
- // that there are no outstanding calls when deactivate() is
- // called on the servant locators.
- //
- while(_state < StateClosing || _dispatchCount > 0)
- {
- wait();
- }
-
- //
- // Now we must wait until close() has been called on the
- // transceiver.
- //
- while(_transceiver)
+ wait();
+ }
+
+ //
+ // Now we must wait until close() has been called on the
+ // transceiver.
+ //
+ while(_transceiver)
+ {
+ if(_state != StateClosed && _endpoint->timeout() >= 0)
{
- if(_state != StateClosed && _endpoint->timeout() >= 0)
+ IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout());
+ IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(IceUtil::Time::Monotonic);
+
+ if(waitTime > IceUtil::Time())
{
- IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout());
- IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(IceUtil::Time::Monotonic);
-
- if(waitTime > IceUtil::Time())
- {
- //
- // We must wait a bit longer until we close this
- // connection.
- //
- if(!timedWait(waitTime))
- {
- setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
- }
- }
- else
+ //
+ // We must wait a bit longer until we close this
+ // connection.
+ //
+ if(!timedWait(waitTime))
{
- //
- // We already waited long enough, so let's close this
- // connection!
- //
setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
}
-
- //
- // No return here, we must still wait until close() is
- // called on the _transceiver.
- //
}
else
{
- wait();
+ //
+ // We already waited long enough, so let's close this
+ // connection!
+ //
+ setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
}
- }
-
- assert(_state == StateClosed);
-
- threadPerConnection = _thread;
- _thread = 0;
- //
- // Clear the OA. See bug 1673 for the details of why this is necessary.
- //
- _adapter = 0;
+ //
+ // No return here, we must still wait until close() is
+ // called on the _transceiver.
+ //
+ }
+ else
+ {
+ wait();
+ }
}
- if(threadPerConnection)
- {
- threadPerConnection->getThreadControl().join();
- }
+ assert(_state == StateClosed);
+
+ //
+ // Clear the OA. See bug 1673 for the details of why this is necessary.
+ //
+ _adapter = 0;
}
void
@@ -491,12 +428,12 @@ Ice::ConnectionI::monitor()
{
return;
}
-
+
if(_state != StateActive)
{
return;
}
-
+
//
// Active connection management for idle connections.
//
@@ -507,7 +444,7 @@ Ice::ConnectionI::monitor()
{
return;
}
-
+
if(IceUtil::Time::now(IceUtil::Time::Monotonic) >= _acmAbsoluteTimeout)
{
setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
@@ -532,7 +469,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
assert(_state > StateNotValidated);
assert(_state < StateClosing);
-
+
Int requestId;
if(response)
{
@@ -545,7 +482,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
_nextRequestId = 1;
requestId = _nextRequestId++;
}
-
+
//
// Fill in the request ID.
//
@@ -560,7 +497,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
//
// Send the message. If it can't be sent without blocking the message is added
// to _sendStreams and it will be sent by the selector thread.
- //
+ //
bool sent = false;
try
{
@@ -585,7 +522,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
return sent;
}
-void
+bool
Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response)
{
BasicStream* os = out->__getOs();
@@ -616,7 +553,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
_nextRequestId = 1;
requestId = _nextRequestId++;
}
-
+
//
// Fill in the request ID.
//
@@ -628,10 +565,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
#endif
}
+ bool sent;
try
{
OutgoingMessage message(out, os, compress, response);
- sendMessage(message);
+ sent = sendMessage(message);
}
catch(const LocalException& ex)
{
@@ -648,6 +586,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
_asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
pair<const Int, OutgoingAsyncPtr>(requestId, out));
}
+ return sent;
}
void
@@ -705,17 +644,17 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
// Get the batch stream back.
//
_batchStream.swap(*os);
-
+
if(_exception.get())
{
_exception->ice_throw();
}
-
+
bool flush = false;
if(_batchAutoFlush)
{
//
- // Throw memory limit exception if the first message added causes us to
+ // Throw memory limit exception if the first message added causes us to
// go over limit. Otherwise put aside the marshalled message that caused
// limit to be exceeded and rollback stream to the marker.
//
@@ -743,7 +682,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
//
vector<Ice::Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end());
_batchStream.b.resize(_batchMarker);
-
+
//
// Send the batch stream without the last request.
//
@@ -785,19 +724,19 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
{
throw MemoryLimitException(__FILE__, __LINE__);
}
-
+
//
// Start a new batch with the last message that caused us to go over the limit.
//
_batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
- _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
+ _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
}
//
// Increment the number of requests in the batch.
//
++_batchRequestNum;
-
+
//
// We compress the whole batch if there is at least one compressed
// message.
@@ -806,7 +745,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
{
_batchRequestCompress = true;
}
-
+
//
// Notify about the batch stream not being in use anymore.
//
@@ -852,7 +791,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
{
wait();
}
-
+
if(_exception.get())
{
_exception->ice_throw();
@@ -902,7 +841,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
return sent;
}
-void
+bool
Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -919,7 +858,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
if(_batchRequestNum == 0)
{
outAsync->__sent(this);
- return;
+ return true;
}
//
@@ -936,10 +875,11 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
//
// Send the batch stream.
//
+ bool sent;
try
{
OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, false);
- sendMessage(message);
+ sent = sendMessage(message);
}
catch(const Ice::LocalException& ex)
{
@@ -956,6 +896,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
_batchRequestNum = 0;
_batchRequestCompress = false;
_batchMarker = 0;
+ return sent;
}
void
@@ -976,7 +917,7 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
assert(_exception.get());
_exception->ice_throw();
}
-
+
OutgoingMessage message(os, compressFlag > 0);
sendMessage(message);
@@ -987,7 +928,7 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
if(_acmTimeout > 0)
{
- _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) +
+ _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) +
IceUtil::Time::seconds(_acmTimeout);
}
}
@@ -1002,14 +943,14 @@ Ice::ConnectionI::sendNoResponse()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
-
+
try
{
if(--_dispatchCount == 0)
{
notifyAll();
}
-
+
if(_state == StateClosed)
{
assert(_exception.get());
@@ -1020,10 +961,10 @@ Ice::ConnectionI::sendNoResponse()
{
initiateShutdown();
}
-
+
if(_acmTimeout > 0)
{
- _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) +
+ _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) +
IceUtil::Time::seconds(_acmTimeout);
}
}
@@ -1039,12 +980,6 @@ Ice::ConnectionI::endpoint() const
return _endpoint; // No mutex protection necessary, _endpoint is immutable.
}
-bool
-Ice::ConnectionI::threadPerConnection() const
-{
- return _threadPerConnection; // No mutex protection necessary, _threadPerConnection is immutable.
-}
-
void
Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter)
{
@@ -1102,23 +1037,19 @@ Ice::ConnectionI::createProxy(const Identity& ident) const
bool
Ice::ConnectionI::datagram() const
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
return _endpoint->datagram(); // No mutex protection necessary, _endpoint is immutable.
}
bool
Ice::ConnectionI::readable() const
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
return true;
}
bool
Ice::ConnectionI::read(BasicStream& stream)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
- return _transceiver->read(stream, 0);
+ return _transceiver->read(stream);
//
// Updating _acmAbsoluteTimeout is too expensive here, because we
@@ -1130,8 +1061,6 @@ Ice::ConnectionI::read(BasicStream& stream)
void
Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
Byte compress = 0;
Int requestId = 0;
Int invokeNum = 0;
@@ -1147,7 +1076,7 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
// could be various race conditions with close connection
// messages and other messages.
//
- threadPool->promoteFollower();
+ threadPool->promoteFollower(this);
if(_state != StateClosed)
{
@@ -1184,22 +1113,14 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
void
Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
threadPool->promoteFollower();
auto_ptr<LocalException> localEx;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- --_finishedCount;
- assert(threadPool.get() == _threadPool.get());
- if(_finishedCount > 0 || _state != StateClosed || _sendInProgress)
- {
- return;
- }
+ assert(threadPool.get() == _threadPool.get() && _state == StateClosed && !_sendInProgress);
_threadPool->decFdsInUse();
_selectorThread->decFdsInUse();
@@ -1212,19 +1133,25 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool)
{
localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
}
-
+
_transceiver = 0;
notifyAll();
+
+ _flushSentCallbacks = 0; // Clear cyclic reference count.
}
- finishStart(*_exception.get());
+ if(_startCallback)
+ {
+ _startCallback->connectionStartFailed(this, *_exception.get());
+ _startCallback = 0;
+ }
// Note: the streams must be cleared first because they expect the Outgoing objects to still be valid.
- for(deque<OutgoingMessage>::iterator o = _queuedStreams.begin(); o != _queuedStreams.end(); ++o)
+ for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
o->finished(*_exception.get());
}
- _queuedStreams.clear();
+ _sendStreams.clear();
for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
{
@@ -1237,11 +1164,11 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool)
q->second->__finished(*_exception.get()); // The exception is immutable at this point.
}
_asyncRequests.clear();
-
+
if(localEx.get())
{
localEx->ice_throw();
- }
+ }
}
void
@@ -1294,12 +1221,21 @@ Ice::ConnectionI::toString() const
//
// Operations from SocketReadyCallback
-//
-SocketStatus
-Ice::ConnectionI::socketReady(bool finished)
+//
+SocketStatus
+Ice::ConnectionI::socketReady()
{
- if(!finished)
+ StartCallbackPtr callback;
+
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_sendInProgress);
+
+ if(_state == StateClosed)
+ {
+ return Finished;
+ }
+
try
{
//
@@ -1308,7 +1244,7 @@ Ice::ConnectionI::socketReady(bool finished)
//
if(!_sendStreams.empty())
{
- if(!send(0))
+ if(!send())
{
return NeedWrite;
}
@@ -1316,100 +1252,57 @@ Ice::ConnectionI::socketReady(bool finished)
}
else
{
- //
- // If there's nothing to send, we're still validating the connection.
- //
- int state;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(_state == StateClosed || _state <= StateNotValidated);
-
- state = _state;
-
- if(_state == StateClosed)
- {
- assert(_exception.get());
- _exception->ice_throw();
- }
- }
-
- if(state == StateNotInitialized)
+ assert(_state == StateClosed || _state <= StateNotValidated);
+ if(_state == StateNotInitialized)
{
- SocketStatus status = initialize(0);
+ SocketStatus status = initialize();
if(status != Finished)
{
return status;
}
}
-
- if(state <= StateNotValidated)
+
+ if(_state <= StateNotValidated)
{
- SocketStatus status = validate(0);
+ SocketStatus status = validate();
if(status != Finished)
{
return status;
}
}
-
- finishStart();
+
+ swap(_startCallback, callback);
}
}
catch(const Ice::LocalException& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateClosed, ex);
+ return Finished;
}
- }
- //
- // If there's no more data to send or if connection validation is finished, we checkout
- // the connection state to figure out whether or not it's time to unregister with the
- // selector thread.
- //
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(_sendInProgress);
- if(_state == StateClosed)
- {
- assert(!_startCallback || (!_threadPerConnection && !_registeredWithPool));
-
- _queuedStreams.insert(_queuedStreams.begin(), _sendStreams.begin(), _sendStreams.end());
- _sendStreams.clear();
- _sendInProgress = false;
-
- if(_threadPerConnection)
- {
- _transceiver->shutdownReadWrite();
- }
- else
- {
- if(!_registeredWithPool)
- {
- _threadPool->execute(new CallFinished(this));
- ++_finishedCount; // For each unregistration, finished() is called once.
- }
- else
- {
- unregisterWithPool();
- }
- }
- notifyAll();
- return Finished;
- }
- else if(_queuedStreams.empty())
- {
+ assert(_sendStreams.empty());
+ _selectorThread->unregister(this);
_sendInProgress = false;
if(_acmTimeout > 0)
{
_acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
}
- return Finished;
}
- else
+
+ if(callback)
{
- _sendStreams.swap(_queuedStreams);
- return NeedWrite; // We're not finished yet, there's more data to send!
+ callback->connectionStartCompleted(this);
}
+ return Finished;
+}
+
+void
+Ice::ConnectionI::socketFinished()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_sendInProgress && _state == StateClosed);
+ _sendInProgress = false;
+ _threadPool->finish(this);
}
void
@@ -1441,12 +1334,8 @@ Ice::ConnectionI::getTransceiver() const
Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
const TransceiverPtr& transceiver,
const EndpointIPtr& endpoint,
- const ObjectAdapterPtr& adapter,
- bool threadPerConnection,
- size_t threadPerConnectionStackSize) :
- EventHandler(instance),
- _threadPerConnection(threadPerConnection),
- _threadPerConnectionStackSize(threadPerConnectionStackSize),
+ const ObjectAdapterPtr& adapter) :
+ EventHandler(instance, transceiver->fd()),
_transceiver(transceiver),
_desc(transceiver->toString()),
_type(transceiver->type()),
@@ -1454,8 +1343,6 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
_adapter(adapter),
_logger(_instance->initializationData().logger), // Cached for better performance.
_traceLevels(_instance->traceLevels()), // Cached for better performance.
- _registeredWithPool(false),
- _finishedCount(0),
_warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0),
_acmTimeout(0),
_compressionLevel(1),
@@ -1512,30 +1399,20 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
__setNoDelete(true);
try
{
- if(!threadPerConnection)
+ if(adapterImpl)
{
- //
- // Only set _threadPool if we really need it, i.e., if we are
- // not in thread per connection mode. Thread pools have lazy
- // initialization in Instance, and we don't want them to be
- // created if they are not needed.
- //
- if(adapterImpl)
- {
- const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool();
- }
- else
- {
- const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
- }
- _threadPool->incFdsInUse();
+ const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool();
}
+ else
+ {
+ const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
+ }
+ _threadPool->incFdsInUse();
- //
- // Only set selector thread if we really need it.
- //
const_cast<SelectorThreadPtr&>(_selectorThread) = _instance->selectorThread();
_selectorThread->incFdsInUse();
+
+ _flushSentCallbacks = new FlushSentCallbacks(this);
}
catch(const IceUtil::Exception&)
{
@@ -1551,8 +1428,6 @@ Ice::ConnectionI::~ConnectionI()
assert(_state == StateClosed);
assert(!_transceiver);
assert(_dispatchCount == 0);
- assert(!_thread);
- assert(_queuedStreams.empty());
assert(_requests.empty());
assert(_asyncRequests.empty());
}
@@ -1636,7 +1511,7 @@ Ice::ConnectionI::setState(State state)
{
return;
}
-
+
switch(state)
{
case StateNotInitialized:
@@ -1644,7 +1519,7 @@ Ice::ConnectionI::setState(State state)
assert(false);
break;
}
-
+
case StateNotValidated:
{
if(_state != StateNotInitialized)
@@ -1665,13 +1540,10 @@ Ice::ConnectionI::setState(State state)
{
return;
}
- if(!_threadPerConnection)
- {
- registerWithPool();
- }
+ _threadPool->_register(this);
break;
}
-
+
case StateHolding:
{
//
@@ -1682,10 +1554,7 @@ Ice::ConnectionI::setState(State state)
{
return;
}
- if(!_threadPerConnection)
- {
- unregisterWithPool();
- }
+ _threadPool->unregister(this);
break;
}
@@ -1698,13 +1567,10 @@ Ice::ConnectionI::setState(State state)
{
return;
}
- if(!_threadPerConnection)
- {
- registerWithPool(); // We need to continue to read in closing state.
- }
+ _threadPool->_register(this); // We need to continue to read in closing state.
break;
}
-
+
case StateClosed:
{
if(_sendInProgress)
@@ -1713,39 +1579,15 @@ Ice::ConnectionI::setState(State state)
// Unregister with both the pool and the selector thread. We unregister with
// the pool to ensure that it stops reading on the socket (otherwise, if the
// socket is closed the thread pool would spin always reading 0 from the FD).
- // The selector thread will register again the FD with the pool once it's
+ // The selector thread will register again the FD with the pool once it's
// done.
//
- _selectorThread->unregister(_transceiver->fd());
- if(!_threadPerConnection)
- {
- unregisterWithPool();
- }
-
- _transceiver->shutdownWrite(); // Prevent further writes.
- }
- else if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode or we're initializing
- // the connection in blocking mode, we shutdown both for reading
- // and writing. This will unblock and read call with an exception.
- // The thread per connection then closes the transceiver.
- //
- _transceiver->shutdownReadWrite();
+ _selectorThread->finish(this);
+ _threadPool->unregister(this);
}
else
{
- if(!_registeredWithPool)
- {
- _threadPool->execute(new CallFinished(this));
- ++_finishedCount; // For each unregistration, finished() is called once.
- }
- else
- {
- unregisterWithPool();
- }
- _transceiver->shutdownWrite(); // Prevent further writes.
+ _threadPool->finish(this);
}
break;
}
@@ -1828,46 +1670,24 @@ Ice::ConnectionI::initiateShutdown()
}
SocketStatus
-Ice::ConnectionI::initialize(int timeout)
+Ice::ConnectionI::initialize()
{
- try
+ SocketStatus status = _transceiver->initialize();
+ if(status != Finished)
{
- SocketStatus status = _transceiver->initialize(timeout);
- if(status != Finished)
- {
- if(timeout != 0)
- {
- throw TimeoutException(__FILE__, __LINE__);
- }
- return status;
- }
+ return status;
}
- catch(const TimeoutException&)
- {
- throw ConnectTimeoutException(__FILE__, __LINE__);
- }
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_state == StateClosed)
- {
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- //
- // Update the connection description once the transceiver is initialized.
- //
- const_cast<string&>(_desc) = _transceiver->toString();
- setState(StateNotValidated);
- }
-
+ //
+ // Update the connection description once the transceiver is initialized.
+ //
+ const_cast<string&>(_desc) = _transceiver->toString();
+ setState(StateNotValidated);
return Finished;
}
SocketStatus
-Ice::ConnectionI::validate(int timeout)
+Ice::ConnectionI::validate()
{
if(!_endpoint->datagram()) // Datagram connections are always implicitly validated.
{
@@ -1890,26 +1710,10 @@ Ice::ConnectionI::validate(int timeout)
os.i = os.b.begin();
traceSend(os, _logger, _traceLevels);
}
- else
- {
- // The stream can only be non-empty if we're doing a non-blocking connection validation.
- assert(!_threadPerConnection);
- }
- try
- {
- if(!_transceiver->write(os, timeout))
- {
- if(timeout != 0)
- {
- throw TimeoutException(__FILE__, __LINE__);
- }
- return NeedWrite;
- }
- }
- catch(const TimeoutException&)
+ if(!_transceiver->write(os))
{
- throw ConnectTimeoutException(__FILE__, __LINE__);
+ return NeedWrite;
}
}
else // The client side has the passive role for connection validation.
@@ -1920,26 +1724,10 @@ Ice::ConnectionI::validate(int timeout)
is.b.resize(headerSize);
is.i = is.b.begin();
}
- else
- {
- // The stream can only be non-empty if we're doing a non-blocking connection validation.
- assert(!_threadPerConnection);
- }
- try
- {
- if(!_transceiver->read(is, timeout))
- {
- if(timeout != 0)
- {
- throw TimeoutException(__FILE__, __LINE__);
- }
- return NeedRead;
- }
- }
- catch(const TimeoutException&)
+ if(!_transceiver->read(is))
{
- throw ConnectTimeoutException(__FILE__, __LINE__);
+ return NeedRead;
}
assert(is.i == is.b.end());
@@ -1999,32 +1787,24 @@ Ice::ConnectionI::validate(int timeout)
}
}
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- _stream.resize(0);
- _stream.i = _stream.b.begin();
-
- if(_state == StateClosed)
- {
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- //
- // We start out in holding state.
- //
- setState(StateHolding);
- }
+ _stream.resize(0);
+ _stream.i = _stream.b.begin();
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
return Finished;
}
bool
-Ice::ConnectionI::send(int timeout)
+Ice::ConnectionI::send()
{
assert(_transceiver);
assert(!_sendStreams.empty());
-
+
+ bool flushSentCallbacks = _sentCallbacks.empty();
+
while(!_sendStreams.empty())
{
OutgoingMessage* message = &_sendStreams.front();
@@ -2041,7 +1821,7 @@ Ice::ConnectionI::send(int timeout)
// Message compressed. Request compressed response, if any.
//
message->stream->b[9] = 2;
-
+
//
// Do compression.
//
@@ -2069,7 +1849,7 @@ Ice::ConnectionI::send(int timeout)
//
message->stream->b[9] = 1;
}
-
+
//
// No compression, just fill in the message size.
//
@@ -2097,22 +1877,48 @@ Ice::ConnectionI::send(int timeout)
// Send the first message.
//
assert(message->stream->i);
- if(!_transceiver->write(*message->stream, timeout))
+ if(!_transceiver->write(*message->stream))
{
- assert(timeout == 0);
+ if(flushSentCallbacks && !_sentCallbacks.empty())
+ {
+ _threadPool->execute(_flushSentCallbacks);
+ }
return false;
}
//
// Notify the message that it was sent.
//
- message->sent(this, timeout == 0); // timeout == 0 indicates that this is called by the selector thread.
+ message->sent(this, true);
+ if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get()))
+ {
+ _sentCallbacks.push_back(message->outAsync);
+ }
_sendStreams.pop_front();
}
+ if(flushSentCallbacks && !_sentCallbacks.empty())
+ {
+ _threadPool->execute(_flushSentCallbacks);
+ }
return true;
}
+void
+Ice::ConnectionI::flushSentCallbacks()
+{
+ vector<OutgoingAsyncMessageCallbackPtr> callbacks;
+ {
+ Lock sync(*this);
+ assert(!_sentCallbacks.empty());
+ _sentCallbacks.swap(callbacks);
+ }
+ for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p)
+ {
+ (*p)->__sent(_instance);
+ }
+}
+
bool
Ice::ConnectionI::sendMessage(OutgoingMessage& message)
{
@@ -2120,35 +1926,29 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
message.stream->i = 0; // Reset the message stream iterator before starting sending the message.
- //
- // If another thread is currently sending messages, we queue the
- // message in _queuedStreams. It will be picked up eventually by
- // the selector thread once the messages from _sendStreams are all
- // sent.
- //
if(_sendInProgress)
- {
- _queuedStreams.push_back(message);
- _queuedStreams.back().adopt(0);
+ {
+ _sendStreams.push_back(message);
+ _sendStreams.back().adopt(0);
return false;
}
assert(!_sendInProgress);
//
- // Attempt to send the message without blocking. If the send blocks, we register
+ // Attempt to send the message without blocking. If the send blocks, we register
// the connection with the selector thread.
//
message.stream->i = message.stream->b.begin();
-
+
if(message.compress && message.stream->b.size() >= 100) // Only compress messages larger than 100 bytes.
{
//
// Message compressed. Request compressed response, if any.
//
message.stream->b[9] = 2;
-
+
//
// Do compression.
//
@@ -2168,12 +1968,12 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
// Send the message without blocking.
//
- if(_transceiver->write(stream, 0))
+ if(_transceiver->write(stream))
{
message.sent(this, false);
if(_acmTimeout > 0)
{
- _acmAbsoluteTimeout =
+ _acmAbsoluteTimeout =
IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
}
return true;
@@ -2191,7 +1991,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
message.stream->b[9] = 1;
}
-
+
//
// No compression, just fill in the message size.
//
@@ -2203,7 +2003,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
copy(p, p + sizeof(Int), message.stream->b.begin() + 10);
#endif
message.stream->i = message.stream->b.begin();
-
+
if(message.outAsync)
{
trace("sending asynchronous request", *message.stream, _logger, _traceLevels);
@@ -2216,12 +2016,12 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
// Send the message without blocking.
//
- if(_transceiver->write(*message.stream, 0))
+ if(_transceiver->write(*message.stream))
{
message.sent(this, false);
if(_acmTimeout > 0)
{
- _acmAbsoluteTimeout =
+ _acmAbsoluteTimeout =
IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
}
return true;
@@ -2236,71 +2036,6 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
return false;
}
-void
-Ice::ConnectionI::finishStart()
-{
- //
- // We set _startCallback to null to break potential cyclic reference count
- // and because the destructor checks for it to ensure that we always invoke
- // on the callback.
- //
-
- StartCallbackPtr callback;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- swap(callback, _startCallback);
- }
- if(callback)
- {
- callback->connectionStartCompleted(this);
- }
-}
-
-void
-Ice::ConnectionI::finishStart(const Ice::LocalException& ex)
-{
- //
- // We set _startCallback to null to break potential cyclic reference count
- // and because the destructor checks for it to ensure that we always invoke
- // on the callback.
- //
-
- StartCallbackPtr callback;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- swap(callback, _startCallback);
- }
- if(callback)
- {
- callback->connectionStartFailed(this, ex);
- }
-}
-
-void
-Ice::ConnectionI::registerWithPool()
-{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
- if(!_registeredWithPool)
- {
- _threadPool->_register(_transceiver->fd(), this);
- _registeredWithPool = true;
- }
-}
-
-void
-Ice::ConnectionI::unregisterWithPool()
-{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
- if(_registeredWithPool)
- {
- _threadPool->unregister(_transceiver->fd());
- _registeredWithPool = false;
- ++_finishedCount; // For each unregistration, finished() is called once.
- }
-}
-
static string
getBZ2Error(int bzError)
{
@@ -2385,7 +2120,7 @@ Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed)
throw ex;
}
compressed.b.resize(headerSize + sizeof(Int) + compressedLen);
-
+
//
// Write the size of the compressed stream into the header of the
// uncompressed stream. Since the header will be copied, this size
@@ -2410,7 +2145,7 @@ Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed)
#else
copy(p, p + sizeof(Int), compressed.b.begin() + headerSize);
#endif
-
+
//
// Copy the header from the uncompressed stream to the compressed one.
//
@@ -2452,18 +2187,18 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
OutgoingAsyncPtr& outAsync)
{
assert(_state > StateNotValidated && _state < StateClosed);
-
+
if(_acmTimeout > 0)
{
_acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
}
-
+
try
{
//
// We don't need to check magic and version here. This has
- // already been done by the ThreadPool or the
- // ThreadPerConnection, which provides us with the stream.
+ // already been done by the ThreadPool, which provides us
+ // with the stream.
//
assert(stream.i == stream.b.end());
stream.i = stream.b.begin() + 8;
@@ -2477,7 +2212,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
stream.b.swap(ustream.b);
}
stream.i = stream.b.begin() + headerSize;
-
+
switch(messageType)
{
case closeConnectionMsg:
@@ -2497,7 +2232,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
break;
}
-
+
case requestMsg:
{
if(_state == StateClosing)
@@ -2516,12 +2251,12 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
break;
}
-
+
case requestBatchMsg:
{
if(_state == StateClosing)
{
- trace("received batch request during closing\n(ignored by server, client will retry)", stream,
+ trace("received batch request during closing\n(ignored by server, client will retry)", stream,
_logger, _traceLevels);
}
else
@@ -2539,16 +2274,16 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
break;
}
-
+
case replyMsg:
{
traceRecv(stream, _logger, _traceLevels);
-
+
stream.read(requestId);
-
+
map<Int, Outgoing*>::iterator p = _requests.end();
map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.end();
-
+
if(_requestsHint != _requests.end())
{
if(_requestsHint->first == requestId)
@@ -2556,7 +2291,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
p = _requestsHint;
}
}
-
+
if(p == _requests.end())
{
if(_asyncRequestsHint != _asyncRequests.end())
@@ -2567,26 +2302,26 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
}
}
-
+
if(p == _requests.end() && q == _asyncRequests.end())
{
p = _requests.find(requestId);
}
-
+
if(p == _requests.end() && q == _asyncRequests.end())
{
q = _asyncRequests.find(requestId);
}
-
+
if(p == _requests.end() && q == _asyncRequests.end())
{
throw UnknownRequestIdException(__FILE__, __LINE__);
}
-
+
if(p != _requests.end())
{
p->second->finished(stream);
-
+
if(p == _requestsHint)
{
_requests.erase(p++);
@@ -2600,9 +2335,9 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
else
{
assert(q != _asyncRequests.end());
-
+
outAsync = q->second;
-
+
if(q == _asyncRequestsHint)
{
_asyncRequests.erase(q++);
@@ -2613,10 +2348,10 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
_asyncRequests.erase(q);
}
}
-
+
break;
}
-
+
case validateConnectionMsg:
{
traceRecv(stream, _logger, _traceLevels);
@@ -2627,7 +2362,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
break;
}
-
+
default:
{
trace("received unknown message\n(invalid, closing connection)", stream, _logger, _traceLevels);
@@ -2678,7 +2413,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
BasicStream* is = in.is();
stream.swap(*is);
BasicStream* os = in.os();
-
+
//
// Prepare the response if necessary.
//
@@ -2686,7 +2421,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
{
assert(invokeNum == 1); // No further invocations if a response is expected.
os->writeBlob(replyHdr, sizeof(replyHdr));
-
+
//
// Add the request ID.
//
@@ -2694,7 +2429,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
}
in.invoke(servantManager);
-
+
//
// If there are more invocations, we need the stream back.
//
@@ -2710,320 +2445,3 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
}
}
-void
-Ice::ConnectionI::run()
-{
- try
- {
- //
- // Initialize the connection transceiver and validate the connection using
- // blocking operations.
- //
- SocketStatus status;
-
- int timeout;
- DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
- if(defaultsAndOverrides->overrideConnectTimeout)
- {
- timeout = defaultsAndOverrides->overrideConnectTimeoutValue;
- }
- else
- {
- timeout = _endpoint->timeout();
- }
-
- status = initialize(timeout);
- assert(status == Finished);
-
- status = validate(timeout);
- assert(status == Finished);
- }
- catch(const LocalException& ex)
- {
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
-
- if(_transceiver)
- {
- try
- {
- _transceiver->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
-
- _transceiver = 0;
- }
- notifyAll();
- }
-
- finishStart(ex);
- return;
- }
-
- finishStart();
-
- const bool warnUdp = _instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0;
-
- bool closed = false;
-
- while(!closed)
- {
- //
- // We must accept new connections outside the thread
- // synchronization, because we use blocking accept.
- //
-
- BasicStream stream(_instance.get());
-
- try
- {
- stream.b.resize(headerSize);
- stream.i = stream.b.begin();
- _transceiver->read(stream, -1);
-
- ptrdiff_t pos = stream.i - stream.b.begin();
- if(pos < headerSize)
- {
- //
- // This situation is possible for small UDP packets.
- //
- throw IllegalMessageSizeException(__FILE__, __LINE__);
- }
- stream.i = stream.b.begin();
- const Byte* header;
- stream.readBlob(header, headerSize);
- if(header[0] != magic[0] || header[1] != magic[1] || header[2] != magic[2] || header[3] != magic[3])
- {
- BadMagicException ex(__FILE__, __LINE__);
- ex.badMagic = Ice::ByteSeq(&header[0], &header[0] + sizeof(magic));
- throw ex;
- }
- if(header[4] != protocolMajor)
- {
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(header[4]);
- ex.badMinor = static_cast<unsigned char>(header[5]);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
- }
- if(header[6] != encodingMajor)
- {
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(header[6]);
- ex.badMinor = static_cast<unsigned char>(header[7]);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
- }
-
- Int size;
- stream.i -= sizeof(Int);
- stream.read(size);
- if(size < headerSize)
- {
- throw IllegalMessageSizeException(__FILE__, __LINE__);
- }
- if(size > static_cast<Int>(_instance->messageSizeMax()))
- {
- throw MemoryLimitException(__FILE__, __LINE__);
- }
- if(size > static_cast<Int>(stream.b.size()))
- {
- stream.b.resize(size);
- }
- stream.i = stream.b.begin() + pos;
-
- if(stream.i != stream.b.end())
- {
- if(_endpoint->datagram())
- {
- if(warnUdp)
- {
- Warning out(_logger);
- out << "DatagramLimitException: maximum size of " << pos << " exceeded";
- }
- throw DatagramLimitException(__FILE__, __LINE__);
- }
- else
- {
- _transceiver->read(stream, -1);
- assert(stream.i == stream.b.end());
- }
- }
- }
- catch(const DatagramLimitException&) // Expected.
- {
- continue;
- }
- catch(const SocketException& ex)
- {
- exception(ex);
- }
- catch(const LocalException& ex)
- {
- if(_endpoint->datagram())
- {
- if(_warn)
- {
- Warning out(_logger);
- out << "datagram connection exception:\n" << ex << '\n' << _desc;
- }
- continue;
- }
- else
- {
- exception(ex);
- }
- }
-
- Byte compress = 0;
- Int requestId = 0;
- Int invokeNum = 0;
- ServantManagerPtr servantManager;
- ObjectAdapterPtr adapter;
- OutgoingAsyncPtr outAsync;
-
- auto_ptr<LocalException> localEx;
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while(_state == StateHolding)
- {
- wait();
- }
-
- if(_state != StateClosed)
- {
- parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync);
- }
-
- //
- // parseMessage() can close the connection, so we must
- // check for closed state again.
- //
- if(_state == StateClosed)
- {
- if(_sendInProgress)
- {
- _selectorThread->unregister(_transceiver->fd());
- }
-
- //
- // Prevent further writes.
- //
- _transceiver->shutdownWrite();
-
- //
- // We must make sure that nobody is sending before closing the transceiver.
- //
- while(_sendInProgress)
- {
- wait();
- }
-
- try
- {
- _transceiver->close();
- }
- catch(const LocalException& ex)
- {
- localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
- }
- _transceiver = 0;
- notifyAll();
-
- //
- // We cannot simply return here. We have to make sure that all requests (regular and
- // async) are notified about the closed connection below.
- //
- closed = true;
- }
- }
-
- //
- // Asynchronous replies must be handled outside the thread
- // synchronization, so that nested calls are possible.
- //
- if(outAsync)
- {
- outAsync->__finished(stream);
- }
-
- //
- // Method invocation (or multiple invocations for batch messages)
- // must be done outside the thread synchronization, so that nested
- // calls are possible.
- //
- invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
-
- if(closed)
- {
- // Note: the streams must be cleared first because they expect the Outgoing objects to still be valid.
- for(deque<OutgoingMessage>::iterator o = _queuedStreams.begin(); o != _queuedStreams.end(); ++o)
- {
- o->finished(*_exception.get());
- }
- _queuedStreams.clear();
-
- for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- p->second->finished(*_exception.get()); // The exception is immutable at this point.
- }
- _requests.clear();
-
- for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
- {
- q->second->__finished(*_exception.get()); // The exception is immutable at this point.
- }
- _asyncRequests.clear();
- }
-
- if(localEx.get())
- {
- assert(closed);
- localEx->ice_throw();
- }
- }
-}
-
-Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection) :
- _connection(connection)
-{
-}
-
-void
-Ice::ConnectionI::ThreadPerConnection::run()
-{
- if(_connection->_instance->initializationData().threadHook)
- {
- _connection->_instance->initializationData().threadHook->start();
- }
-
- try
- {
- _connection->run();
- }
- catch(const std::exception& ex)
- {
- Error out(_connection->_logger);
- out << "exception in thread per connection:\n" << _connection->toString() << ex.what();
- }
- catch(...)
- {
- Error out(_connection->_logger);
- out << "unknown exception in thread per connection:\n" << _connection->toString();
- }
-
- if(_connection->_instance->initializationData().threadHook)
- {
- _connection->_instance->initializationData().threadHook->stop();
- }
-
- _connection = 0; // Resolve cyclic dependency.
-}