Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--   cpp/src/Ice/ConnectionI.cpp   1202
1 file changed, 310 insertions, 892 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 39226868d6c..c896d190569 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -16,6 +16,7 @@ #include <Ice/DefaultsAndOverrides.h> #include <Ice/Transceiver.h> #include <Ice/ThreadPool.h> +#include <Ice/SelectorThread.h> #include <Ice/ConnectionMonitor.h> #include <Ice/ObjectAdapterI.h> // For getThreadPool() and getServantManager(). #include <Ice/EndpointI.h> @@ -31,33 +32,33 @@ using namespace std; using namespace Ice; using namespace IceInternal; -namespace +Ice::LocalObject* IceInternal::upCast(ConnectionI* p) { return p; } + +namespace IceInternal { -class CallFinished : public ThreadPoolWorkItem +class FlushSentCallbacks : public ThreadPoolWorkItem { public: - CallFinished(const Ice::ConnectionIPtr& connection) : _connection(connection) + FlushSentCallbacks(const Ice::ConnectionIPtr& connection) : _connection(connection) { } - virtual void + void execute(const ThreadPoolPtr& threadPool) { - _connection->finished(threadPool); + threadPool->promoteFollower(); + _connection->flushSentCallbacks(); } private: - ConnectionIPtr _connection; + const Ice::ConnectionIPtr _connection; }; } - -Ice::LocalObject* IceInternal::upCast(ConnectionI* p) { return p; } - void Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) { @@ -118,7 +119,7 @@ Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex) if(!response) { // - // Only notify oneway requests. The connection keeps track of twoway + // Only notify oneway requests. The connection keeps track of twoway // requests in the _requests/_asyncRequests maps and will notify them // of the connection exceptions. // @@ -144,60 +145,21 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) { try { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state == StateClosed) // The connection might already be closed if the communicator was destroyed. { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _startCallback = callback; - - // - // The connection might already be closed if the communicator was destroyed. - // - if(_state == StateClosed) - { - assert(_exception.get()); - _exception->ice_throw(); - } + assert(_exception.get()); + _exception->ice_throw(); } - if(_threadPerConnection) + SocketStatus status = initialize(); + if(status == Finished) { - // - // In thread per connection mode, we create the thread for the connection. The - // intialization and validation of the connection is taken care of by the thread - // per connection. - // - try - { - _thread = new ThreadPerConnection(this); - _thread->start(_threadPerConnectionStackSize); - } - catch(const IceUtil::Exception& ex) - { - { - Error out(_logger); - out << "cannot create thread for connection:\n" << ex; - } - - // - // Clean up. - // - _thread = 0; - ex.ice_throw(); - } + status = validate(); } - else - { - SocketStatus status = initialize(0); - if(status == Finished) - { - status = validate(0); - } - - if(status == Finished) - { - finishStart(); - return; // We're done! 
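The FlushSentCallbacks work item added above exists so that AMI "sent" callbacks queued while the connection lock is held can be invoked later from a thread-pool thread, outside that lock. A minimal sketch of the pattern, with a trivial Pool type standing in for Ice's ThreadPool and all names below being illustrative rather than the Ice API:

    #include <memory>

    struct WorkItem
    {
        virtual ~WorkItem() {}
        virtual void execute() = 0;
    };

    // Trivial stand-in for Ice's ThreadPool: the real execute() queues the item for a
    // pool thread; running it immediately is enough to show the shape of the pattern.
    struct Pool
    {
        void execute(const std::shared_ptr<WorkItem>& item) { item->execute(); }
    };

    class Connection;

    struct FlushSentCallbacksItem : WorkItem
    {
        explicit FlushSentCallbacksItem(Connection& c) : _connection(c) {}
        void execute();          // defined below, once Connection is complete
        Connection& _connection;
    };

    class Connection
    {
    public:
        explicit Connection(Pool& pool) : _pool(pool) {}

        // Called with the connection lock held: user callbacks must not run here,
        // so a work item is handed to the pool, which calls back in later.
        void scheduleFlush() { _pool.execute(std::shared_ptr<WorkItem>(new FlushSentCallbacksItem(*this))); }

        // Runs from a pool thread, outside the connection lock.
        void flushSentCallbacks() { /* invoke the queued AMI "sent" callbacks */ }

    private:
        Pool& _pool;
    };

    void FlushSentCallbacksItem::execute() { _connection.flushSentCallbacks(); }

Running the callbacks from a deferred work item avoids re-entering application code while the connection's monitor is held; holding the connection through a const ConnectionIPtr, as the real work item does, also keeps it alive until the item runs.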
- } + if(status != Finished) + { int timeout; DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); if(defaultsAndOverrides->overrideConnectTimeout) @@ -208,25 +170,24 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) { timeout = _endpoint->timeout(); } - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state == StateClosed) - { - assert(_exception.get()); - _exception->ice_throw(); - } + _sendInProgress = true; _selectorThread->_register(_transceiver->fd(), this, status, timeout); - } - - if(!callback) // Wait for the connection to be validated. - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(callback) + { + _startCallback = callback; + return; + } + + // + // Wait for the connection to be validated. + // while(_state <= StateNotValidated) { wait(); } - + if(_state >= StateClosing) { assert(_exception.get()); @@ -236,16 +197,22 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) } catch(const Ice::LocalException& ex) { + exception(ex); + if(callback) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - if(callback) - { - return; - } + callback->connectionStartFailed(this, *_exception.get()); + return; } - waitUntilFinished(); - throw; + else + { + waitUntilFinished(); + throw; + } + } + + if(callback) + { + callback->connectionStartCompleted(this); } } @@ -282,7 +249,7 @@ void Ice::ConnectionI::destroy(DestructionReason reason) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + switch(reason) { case ObjectAdapterDeactivated: @@ -290,7 +257,7 @@ Ice::ConnectionI::destroy(DestructionReason reason) setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); break; } - + case CommunicatorDestroyed: { setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); @@ -303,7 +270,7 @@ void Ice::ConnectionI::close(bool force) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + if(force) { setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); @@ -321,7 +288,7 @@ Ice::ConnectionI::close(bool force) { wait(); } - + setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } } @@ -342,42 +309,24 @@ Ice::ConnectionI::isActiveOrHolding() const bool Ice::ConnectionI::isFinished() const { - IceUtil::ThreadPtr threadPerConnection; + // + // We can use trylock here, because as long as there are still + // threads operating in this connection object, connection + // destruction is considered as not yet finished. + // + IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); + if(!sync.acquired()) { - // - // We can use trylock here, because as long as there are still - // threads operating in this connection object, connection - // destruction is considered as not yet finished. 
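With thread-per-connection removed, start() above attempts initialize() and validate() without blocking and only registers the socket with the selector thread when one of them reports that it would block; the caller is then notified through the StartCallback, or waits on the monitor when no callback was supplied. A rough sketch of that control flow with stubbed helpers (the names are illustrative, not the Ice API):

    #include <functional>

    enum SocketStatus { Finished, NeedWrite, NeedRead };

    class ConnectionSketch
    {
    public:
        void start(std::function<void()> onStarted)
        {
            SocketStatus status = initialize();      // non-blocking connect / transport handshake
            if(status == Finished)
            {
                status = validate();                 // exchange the validate-connection message
            }
            if(status != Finished)
            {
                // Would block: remember the callback and let the selector thread
                // finish the job from socketReady().
                _onStarted = onStarted;
                registerWithSelector(status);
                return;
            }
            onStarted();                             // completed synchronously
        }

    private:
        SocketStatus initialize() { return Finished; }   // stub
        SocketStatus validate()   { return Finished; }   // stub
        void registerWithSelector(SocketStatus) {}       // stub for _selectorThread->_register()
        std::function<void()> _onStarted;
    };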
- // - IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); - - if(!sync.acquired()) - { - return false; - } - - if(_transceiver || _dispatchCount != 0) - { - return false; - } - - if(_thread && _thread->isAlive()) - { - return false; - } - - assert(_state == StateClosed); - - threadPerConnection = _thread; - _thread = 0; + return false; } - if(threadPerConnection) + if(_transceiver || _dispatchCount != 0) { - threadPerConnection->getThreadControl().join(); + return false; } + assert(_state == StateClosed); return true; } @@ -407,80 +356,68 @@ Ice::ConnectionI::waitUntilHolding() const void Ice::ConnectionI::waitUntilFinished() { - IceUtil::ThreadPtr threadPerConnection; + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // + // We wait indefinitely until connection closing has been + // initiated. We also wait indefinitely until all outstanding + // requests are completed. Otherwise we couldn't guarantee + // that there are no outstanding calls when deactivate() is + // called on the servant locators. + // + while(_state < StateClosing || _dispatchCount > 0) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We wait indefinitely until connection closing has been - // initiated. We also wait indefinitely until all outstanding - // requests are completed. Otherwise we couldn't guarantee - // that there are no outstanding calls when deactivate() is - // called on the servant locators. - // - while(_state < StateClosing || _dispatchCount > 0) - { - wait(); - } - - // - // Now we must wait until close() has been called on the - // transceiver. - // - while(_transceiver) + wait(); + } + + // + // Now we must wait until close() has been called on the + // transceiver. + // + while(_transceiver) + { + if(_state != StateClosed && _endpoint->timeout() >= 0) { - if(_state != StateClosed && _endpoint->timeout() >= 0) + IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); + IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(IceUtil::Time::Monotonic); + + if(waitTime > IceUtil::Time()) { - IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); - IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(IceUtil::Time::Monotonic); - - if(waitTime > IceUtil::Time()) - { - // - // We must wait a bit longer until we close this - // connection. - // - if(!timedWait(waitTime)) - { - setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); - } - } - else + // + // We must wait a bit longer until we close this + // connection. + // + if(!timedWait(waitTime)) { - // - // We already waited long enough, so let's close this - // connection! - // setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); } - - // - // No return here, we must still wait until close() is - // called on the _transceiver. - // } else { - wait(); + // + // We already waited long enough, so let's close this + // connection! + // + setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); } - } - - assert(_state == StateClosed); - - threadPerConnection = _thread; - _thread = 0; - // - // Clear the OA. See bug 1673 for the details of why this is necessary. - // - _adapter = 0; + // + // No return here, we must still wait until close() is + // called on the _transceiver. + // + } + else + { + wait(); + } } - if(threadPerConnection) - { - threadPerConnection->getThreadControl().join(); - } + assert(_state == StateClosed); + + // + // Clear the OA. See bug 1673 for the details of why this is necessary. 
+ // + _adapter = 0; } void @@ -491,12 +428,12 @@ Ice::ConnectionI::monitor() { return; } - + if(_state != StateActive) { return; } - + // // Active connection management for idle connections. // @@ -507,7 +444,7 @@ Ice::ConnectionI::monitor() { return; } - + if(IceUtil::Time::now(IceUtil::Time::Monotonic) >= _acmAbsoluteTimeout) { setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); @@ -532,7 +469,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) assert(_state > StateNotValidated); assert(_state < StateClosing); - + Int requestId; if(response) { @@ -545,7 +482,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) _nextRequestId = 1; requestId = _nextRequestId++; } - + // // Fill in the request ID. // @@ -560,7 +497,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) // // Send the message. If it can't be sent without blocking the message is added // to _sendStreams and it will be sent by the selector thread. - // + // bool sent = false; try { @@ -585,7 +522,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) return sent; } -void +bool Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response) { BasicStream* os = out->__getOs(); @@ -616,7 +553,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b _nextRequestId = 1; requestId = _nextRequestId++; } - + // // Fill in the request ID. // @@ -628,10 +565,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b #endif } + bool sent; try { OutgoingMessage message(out, os, compress, response); - sendMessage(message); + sent = sendMessage(message); } catch(const LocalException& ex) { @@ -648,6 +586,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), pair<const Int, OutgoingAsyncPtr>(requestId, out)); } + return sent; } void @@ -705,17 +644,17 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) // Get the batch stream back. // _batchStream.swap(*os); - + if(_exception.get()) { _exception->ice_throw(); } - + bool flush = false; if(_batchAutoFlush) { // - // Throw memory limit exception if the first message added causes us to + // Throw memory limit exception if the first message added causes us to // go over limit. Otherwise put aside the marshalled message that caused // limit to be exceeded and rollback stream to the marker. // @@ -743,7 +682,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) // vector<Ice::Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()); _batchStream.b.resize(_batchMarker); - + // // Send the batch stream without the last request. // @@ -785,19 +724,19 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) { throw MemoryLimitException(__FILE__, __LINE__); } - + // // Start a new batch with the last message that caused us to go over the limit. // _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); - _batchStream.writeBlob(&lastRequest[0], lastRequest.size()); + _batchStream.writeBlob(&lastRequest[0], lastRequest.size()); } // // Increment the number of requests in the batch. // ++_batchRequestNum; - + // // We compress the whole batch if there is at least one compressed // message. 
@@ -806,7 +745,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) { _batchRequestCompress = true; } - + // // Notify about the batch stream not being in use anymore. // @@ -852,7 +791,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) { wait(); } - + if(_exception.get()) { _exception->ice_throw(); @@ -902,7 +841,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) return sent; } -void +bool Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -919,7 +858,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) if(_batchRequestNum == 0) { outAsync->__sent(this); - return; + return true; } // @@ -936,10 +875,11 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) // // Send the batch stream. // + bool sent; try { OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, false); - sendMessage(message); + sent = sendMessage(message); } catch(const Ice::LocalException& ex) { @@ -956,6 +896,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; + return sent; } void @@ -976,7 +917,7 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag) assert(_exception.get()); _exception->ice_throw(); } - + OutgoingMessage message(os, compressFlag > 0); sendMessage(message); @@ -987,7 +928,7 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag) if(_acmTimeout > 0) { - _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + + _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); } } @@ -1002,14 +943,14 @@ Ice::ConnectionI::sendNoResponse() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); - + try { if(--_dispatchCount == 0) { notifyAll(); } - + if(_state == StateClosed) { assert(_exception.get()); @@ -1020,10 +961,10 @@ Ice::ConnectionI::sendNoResponse() { initiateShutdown(); } - + if(_acmTimeout > 0) { - _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + + _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); } } @@ -1039,12 +980,6 @@ Ice::ConnectionI::endpoint() const return _endpoint; // No mutex protection necessary, _endpoint is immutable. } -bool -Ice::ConnectionI::threadPerConnection() const -{ - return _threadPerConnection; // No mutex protection necessary, _threadPerConnection is immutable. -} - void Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter) { @@ -1102,23 +1037,19 @@ Ice::ConnectionI::createProxy(const Identity& ident) const bool Ice::ConnectionI::datagram() const { - assert(!_threadPerConnection); // Only for use with a thread pool. return _endpoint->datagram(); // No mutex protection necessary, _endpoint is immutable. } bool Ice::ConnectionI::readable() const { - assert(!_threadPerConnection); // Only for use with a thread pool. return true; } bool Ice::ConnectionI::read(BasicStream& stream) { - assert(!_threadPerConnection); // Only for use with a thread pool. 
- - return _transceiver->read(stream, 0); + return _transceiver->read(stream); // // Updating _acmAbsoluteTimeout is too expensive here, because we @@ -1130,8 +1061,6 @@ Ice::ConnectionI::read(BasicStream& stream) void Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - Byte compress = 0; Int requestId = 0; Int invokeNum = 0; @@ -1147,7 +1076,7 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool) // could be various race conditions with close connection // messages and other messages. // - threadPool->promoteFollower(); + threadPool->promoteFollower(this); if(_state != StateClosed) { @@ -1184,22 +1113,14 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool) void Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - threadPool->promoteFollower(); auto_ptr<LocalException> localEx; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - --_finishedCount; - assert(threadPool.get() == _threadPool.get()); - if(_finishedCount > 0 || _state != StateClosed || _sendInProgress) - { - return; - } + assert(threadPool.get() == _threadPool.get() && _state == StateClosed && !_sendInProgress); _threadPool->decFdsInUse(); _selectorThread->decFdsInUse(); @@ -1212,19 +1133,25 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) { localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone())); } - + _transceiver = 0; notifyAll(); + + _flushSentCallbacks = 0; // Clear cyclic reference count. } - finishStart(*_exception.get()); + if(_startCallback) + { + _startCallback->connectionStartFailed(this, *_exception.get()); + _startCallback = 0; + } // Note: the streams must be cleared first because they expect the Outgoing objects to still be valid. - for(deque<OutgoingMessage>::iterator o = _queuedStreams.begin(); o != _queuedStreams.end(); ++o) + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { o->finished(*_exception.get()); } - _queuedStreams.clear(); + _sendStreams.clear(); for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) { @@ -1237,11 +1164,11 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) q->second->__finished(*_exception.get()); // The exception is immutable at this point. } _asyncRequests.clear(); - + if(localEx.get()) { localEx->ice_throw(); - } + } } void @@ -1294,12 +1221,21 @@ Ice::ConnectionI::toString() const // // Operations from SocketReadyCallback -// -SocketStatus -Ice::ConnectionI::socketReady(bool finished) +// +SocketStatus +Ice::ConnectionI::socketReady() { - if(!finished) + StartCallbackPtr callback; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_sendInProgress); + + if(_state == StateClosed) + { + return Finished; + } + try { // @@ -1308,7 +1244,7 @@ Ice::ConnectionI::socketReady(bool finished) // if(!_sendStreams.empty()) { - if(!send(0)) + if(!send()) { return NeedWrite; } @@ -1316,100 +1252,57 @@ Ice::ConnectionI::socketReady(bool finished) } else { - // - // If there's nothing to send, we're still validating the connection. 
- // - int state; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(_state == StateClosed || _state <= StateNotValidated); - - state = _state; - - if(_state == StateClosed) - { - assert(_exception.get()); - _exception->ice_throw(); - } - } - - if(state == StateNotInitialized) + assert(_state == StateClosed || _state <= StateNotValidated); + if(_state == StateNotInitialized) { - SocketStatus status = initialize(0); + SocketStatus status = initialize(); if(status != Finished) { return status; } } - - if(state <= StateNotValidated) + + if(_state <= StateNotValidated) { - SocketStatus status = validate(0); + SocketStatus status = validate(); if(status != Finished) { return status; } } - - finishStart(); + + swap(_startCallback, callback); } } catch(const Ice::LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); + return Finished; } - } - // - // If there's no more data to send or if connection validation is finished, we checkout - // the connection state to figure out whether or not it's time to unregister with the - // selector thread. - // - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(_sendInProgress); - if(_state == StateClosed) - { - assert(!_startCallback || (!_threadPerConnection && !_registeredWithPool)); - - _queuedStreams.insert(_queuedStreams.begin(), _sendStreams.begin(), _sendStreams.end()); - _sendStreams.clear(); - _sendInProgress = false; - - if(_threadPerConnection) - { - _transceiver->shutdownReadWrite(); - } - else - { - if(!_registeredWithPool) - { - _threadPool->execute(new CallFinished(this)); - ++_finishedCount; // For each unregistration, finished() is called once. - } - else - { - unregisterWithPool(); - } - } - notifyAll(); - return Finished; - } - else if(_queuedStreams.empty()) - { + assert(_sendStreams.empty()); + _selectorThread->unregister(this); _sendInProgress = false; if(_acmTimeout > 0) { _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); } - return Finished; } - else + + if(callback) { - _sendStreams.swap(_queuedStreams); - return NeedWrite; // We're not finished yet, there's more data to send! + callback->connectionStartCompleted(this); } + return Finished; +} + +void +Ice::ConnectionI::socketFinished() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_sendInProgress && _state == StateClosed); + _sendInProgress = false; + _threadPool->finish(this); } void @@ -1441,12 +1334,8 @@ Ice::ConnectionI::getTransceiver() const Ice::ConnectionI::ConnectionI(const InstancePtr& instance, const TransceiverPtr& transceiver, const EndpointIPtr& endpoint, - const ObjectAdapterPtr& adapter, - bool threadPerConnection, - size_t threadPerConnectionStackSize) : - EventHandler(instance), - _threadPerConnection(threadPerConnection), - _threadPerConnectionStackSize(threadPerConnectionStackSize), + const ObjectAdapterPtr& adapter) : + EventHandler(instance, transceiver->fd()), _transceiver(transceiver), _desc(transceiver->toString()), _type(transceiver->type()), @@ -1454,8 +1343,6 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, _adapter(adapter), _logger(_instance->initializationData().logger), // Cached for better performance. _traceLevels(_instance->traceLevels()), // Cached for better performance. 
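socketReady() and socketFinished() above define the contract with the selector thread: drain the queued outgoing messages without blocking, return NeedWrite to stay registered, and Finished once the queue is empty or the connection is closed, at which point the selector unregisters the socket. A reduced sketch of that drain loop; Transceiver and the other names are stand-ins:

    #include <deque>
    #include <vector>

    enum SocketStatus { Finished, NeedWrite };

    typedef std::vector<unsigned char> Buffer;

    struct Transceiver
    {
        // Returns true when the whole buffer has been written, false if the
        // write would block.
        bool write(Buffer&) { return true; }    // stub
    };

    class SendQueue
    {
    public:
        // Called by the selector thread whenever the socket becomes writable.
        SocketStatus socketReady()
        {
            while(!_queue.empty())
            {
                if(!_transceiver.write(_queue.front()))
                {
                    return NeedWrite;            // stay registered, retry when writable
                }
                _queue.pop_front();              // message fully sent
            }
            return Finished;                     // drained: the selector unregisters the socket
        }

        void enqueue(const Buffer& b) { _queue.push_back(b); }

    private:
        Transceiver _transceiver;
        std::deque<Buffer> _queue;
    };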
- _registeredWithPool(false), - _finishedCount(0), _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0), _acmTimeout(0), _compressionLevel(1), @@ -1512,30 +1399,20 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, __setNoDelete(true); try { - if(!threadPerConnection) + if(adapterImpl) { - // - // Only set _threadPool if we really need it, i.e., if we are - // not in thread per connection mode. Thread pools have lazy - // initialization in Instance, and we don't want them to be - // created if they are not needed. - // - if(adapterImpl) - { - const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool(); - } - else - { - const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); - } - _threadPool->incFdsInUse(); + const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool(); } + else + { + const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); + } + _threadPool->incFdsInUse(); - // - // Only set selector thread if we really need it. - // const_cast<SelectorThreadPtr&>(_selectorThread) = _instance->selectorThread(); _selectorThread->incFdsInUse(); + + _flushSentCallbacks = new FlushSentCallbacks(this); } catch(const IceUtil::Exception&) { @@ -1551,8 +1428,6 @@ Ice::ConnectionI::~ConnectionI() assert(_state == StateClosed); assert(!_transceiver); assert(_dispatchCount == 0); - assert(!_thread); - assert(_queuedStreams.empty()); assert(_requests.empty()); assert(_asyncRequests.empty()); } @@ -1636,7 +1511,7 @@ Ice::ConnectionI::setState(State state) { return; } - + switch(state) { case StateNotInitialized: @@ -1644,7 +1519,7 @@ Ice::ConnectionI::setState(State state) assert(false); break; } - + case StateNotValidated: { if(_state != StateNotInitialized) @@ -1665,13 +1540,10 @@ Ice::ConnectionI::setState(State state) { return; } - if(!_threadPerConnection) - { - registerWithPool(); - } + _threadPool->_register(this); break; } - + case StateHolding: { // @@ -1682,10 +1554,7 @@ Ice::ConnectionI::setState(State state) { return; } - if(!_threadPerConnection) - { - unregisterWithPool(); - } + _threadPool->unregister(this); break; } @@ -1698,13 +1567,10 @@ Ice::ConnectionI::setState(State state) { return; } - if(!_threadPerConnection) - { - registerWithPool(); // We need to continue to read in closing state. - } + _threadPool->_register(this); // We need to continue to read in closing state. break; } - + case StateClosed: { if(_sendInProgress) @@ -1713,39 +1579,15 @@ Ice::ConnectionI::setState(State state) // Unregister with both the pool and the selector thread. We unregister with // the pool to ensure that it stops reading on the socket (otherwise, if the // socket is closed the thread pool would spin always reading 0 from the FD). - // The selector thread will register again the FD with the pool once it's + // The selector thread will register again the FD with the pool once it's // done. // - _selectorThread->unregister(_transceiver->fd()); - if(!_threadPerConnection) - { - unregisterWithPool(); - } - - _transceiver->shutdownWrite(); // Prevent further writes. - } - else if(_threadPerConnection) - { - // - // If we are in thread per connection mode or we're initializing - // the connection in blocking mode, we shutdown both for reading - // and writing. This will unblock and read call with an exception. - // The thread per connection then closes the transceiver. 
- // - _transceiver->shutdownReadWrite(); + _selectorThread->finish(this); + _threadPool->unregister(this); } else { - if(!_registeredWithPool) - { - _threadPool->execute(new CallFinished(this)); - ++_finishedCount; // For each unregistration, finished() is called once. - } - else - { - unregisterWithPool(); - } - _transceiver->shutdownWrite(); // Prevent further writes. + _threadPool->finish(this); } break; } @@ -1828,46 +1670,24 @@ Ice::ConnectionI::initiateShutdown() } SocketStatus -Ice::ConnectionI::initialize(int timeout) +Ice::ConnectionI::initialize() { - try + SocketStatus status = _transceiver->initialize(); + if(status != Finished) { - SocketStatus status = _transceiver->initialize(timeout); - if(status != Finished) - { - if(timeout != 0) - { - throw TimeoutException(__FILE__, __LINE__); - } - return status; - } + return status; } - catch(const TimeoutException&) - { - throw ConnectTimeoutException(__FILE__, __LINE__); - } - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state == StateClosed) - { - assert(_exception.get()); - _exception->ice_throw(); - } - - // - // Update the connection description once the transceiver is initialized. - // - const_cast<string&>(_desc) = _transceiver->toString(); - setState(StateNotValidated); - } - + // + // Update the connection description once the transceiver is initialized. + // + const_cast<string&>(_desc) = _transceiver->toString(); + setState(StateNotValidated); return Finished; } SocketStatus -Ice::ConnectionI::validate(int timeout) +Ice::ConnectionI::validate() { if(!_endpoint->datagram()) // Datagram connections are always implicitly validated. { @@ -1890,26 +1710,10 @@ Ice::ConnectionI::validate(int timeout) os.i = os.b.begin(); traceSend(os, _logger, _traceLevels); } - else - { - // The stream can only be non-empty if we're doing a non-blocking connection validation. - assert(!_threadPerConnection); - } - try - { - if(!_transceiver->write(os, timeout)) - { - if(timeout != 0) - { - throw TimeoutException(__FILE__, __LINE__); - } - return NeedWrite; - } - } - catch(const TimeoutException&) + if(!_transceiver->write(os)) { - throw ConnectTimeoutException(__FILE__, __LINE__); + return NeedWrite; } } else // The client side has the passive role for connection validation. @@ -1920,26 +1724,10 @@ Ice::ConnectionI::validate(int timeout) is.b.resize(headerSize); is.i = is.b.begin(); } - else - { - // The stream can only be non-empty if we're doing a non-blocking connection validation. - assert(!_threadPerConnection); - } - try - { - if(!_transceiver->read(is, timeout)) - { - if(timeout != 0) - { - throw TimeoutException(__FILE__, __LINE__); - } - return NeedRead; - } - } - catch(const TimeoutException&) + if(!_transceiver->read(is)) { - throw ConnectTimeoutException(__FILE__, __LINE__); + return NeedRead; } assert(is.i == is.b.end()); @@ -1999,32 +1787,24 @@ Ice::ConnectionI::validate(int timeout) } } - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _stream.resize(0); - _stream.i = _stream.b.begin(); - - if(_state == StateClosed) - { - assert(_exception.get()); - _exception->ice_throw(); - } - - // - // We start out in holding state. - // - setState(StateHolding); - } + _stream.resize(0); + _stream.i = _stream.b.begin(); + // + // We start out in holding state. 
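setState() above now registers and unregisters the connection with the thread pool directly on each transition, instead of going through the removed registerWithPool()/unregisterWithPool() helpers and the _registeredWithPool/_finishedCount bookkeeping. Roughly, assuming a hypothetical Pool interface:

    struct Pool
    {
        void registerHandler()   {}
        void unregisterHandler() {}
        void finish()            {}
    };

    enum State { StateNotValidated, StateActive, StateHolding, StateClosing, StateClosed };

    void applyState(State s, Pool& pool)
    {
        switch(s)
        {
        case StateActive:
        case StateClosing:               // keep reading while closing to see the peer's close message
            pool.registerHandler();
            break;
        case StateHolding:
            pool.unregisterHandler();    // stop dispatching while the adapter is held
            break;
        case StateClosed:
            pool.finish();               // the pool runs finished() and the transceiver is closed there
            break;
        default:
            break;
        }
    }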
+ // + setState(StateHolding); return Finished; } bool -Ice::ConnectionI::send(int timeout) +Ice::ConnectionI::send() { assert(_transceiver); assert(!_sendStreams.empty()); - + + bool flushSentCallbacks = _sentCallbacks.empty(); + while(!_sendStreams.empty()) { OutgoingMessage* message = &_sendStreams.front(); @@ -2041,7 +1821,7 @@ Ice::ConnectionI::send(int timeout) // Message compressed. Request compressed response, if any. // message->stream->b[9] = 2; - + // // Do compression. // @@ -2069,7 +1849,7 @@ Ice::ConnectionI::send(int timeout) // message->stream->b[9] = 1; } - + // // No compression, just fill in the message size. // @@ -2097,22 +1877,48 @@ Ice::ConnectionI::send(int timeout) // Send the first message. // assert(message->stream->i); - if(!_transceiver->write(*message->stream, timeout)) + if(!_transceiver->write(*message->stream)) { - assert(timeout == 0); + if(flushSentCallbacks && !_sentCallbacks.empty()) + { + _threadPool->execute(_flushSentCallbacks); + } return false; } // // Notify the message that it was sent. // - message->sent(this, timeout == 0); // timeout == 0 indicates that this is called by the selector thread. + message->sent(this, true); + if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get())) + { + _sentCallbacks.push_back(message->outAsync); + } _sendStreams.pop_front(); } + if(flushSentCallbacks && !_sentCallbacks.empty()) + { + _threadPool->execute(_flushSentCallbacks); + } return true; } +void +Ice::ConnectionI::flushSentCallbacks() +{ + vector<OutgoingAsyncMessageCallbackPtr> callbacks; + { + Lock sync(*this); + assert(!_sentCallbacks.empty()); + _sentCallbacks.swap(callbacks); + } + for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p) + { + (*p)->__sent(_instance); + } +} + bool Ice::ConnectionI::sendMessage(OutgoingMessage& message) { @@ -2120,35 +1926,29 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) message.stream->i = 0; // Reset the message stream iterator before starting sending the message. - // - // If another thread is currently sending messages, we queue the - // message in _queuedStreams. It will be picked up eventually by - // the selector thread once the messages from _sendStreams are all - // sent. - // if(_sendInProgress) - { - _queuedStreams.push_back(message); - _queuedStreams.back().adopt(0); + { + _sendStreams.push_back(message); + _sendStreams.back().adopt(0); return false; } assert(!_sendInProgress); // - // Attempt to send the message without blocking. If the send blocks, we register + // Attempt to send the message without blocking. If the send blocks, we register // the connection with the selector thread. // message.stream->i = message.stream->b.begin(); - + if(message.compress && message.stream->b.size() >= 100) // Only compress messages larger than 100 bytes. { // // Message compressed. Request compressed response, if any. // message.stream->b[9] = 2; - + // // Do compression. // @@ -2168,12 +1968,12 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // // Send the message without blocking. // - if(_transceiver->write(stream, 0)) + if(_transceiver->write(stream)) { message.sent(this, false); if(_acmTimeout > 0) { - _acmAbsoluteTimeout = + _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); } return true; @@ -2191,7 +1991,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // message.stream->b[9] = 1; } - + // // No compression, just fill in the message size. 
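flushSentCallbacks() above uses the usual swap-under-lock idiom: the queued callbacks are moved out of the member vector while the monitor is held and are invoked only after it is released, so application code never runs inside the connection lock. The same idiom with standard-library types standing in for IceUtil and the AMI callback classes:

    #include <cstddef>
    #include <functional>
    #include <mutex>
    #include <vector>

    class SentCallbackQueue
    {
    public:
        void push(const std::function<void()>& cb)
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _callbacks.push_back(cb);
        }

        void flush()
        {
            std::vector<std::function<void()> > callbacks;
            {
                std::lock_guard<std::mutex> lock(_mutex);
                _callbacks.swap(callbacks);      // take the whole batch under the lock
            }
            for(std::size_t i = 0; i < callbacks.size(); ++i)
            {
                callbacks[i]();                  // invoke outside the lock
            }
        }

    private:
        std::mutex _mutex;
        std::vector<std::function<void()> > _callbacks;
    };

Note how send() above only schedules the flush work item when the callback list was empty on entry and non-empty on exit, which is what its flushSentCallbacks boolean checks; that keeps at most one pending flush per burst of sends.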
// @@ -2203,7 +2003,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) copy(p, p + sizeof(Int), message.stream->b.begin() + 10); #endif message.stream->i = message.stream->b.begin(); - + if(message.outAsync) { trace("sending asynchronous request", *message.stream, _logger, _traceLevels); @@ -2216,12 +2016,12 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // // Send the message without blocking. // - if(_transceiver->write(*message.stream, 0)) + if(_transceiver->write(*message.stream)) { message.sent(this, false); if(_acmTimeout > 0) { - _acmAbsoluteTimeout = + _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); } return true; @@ -2236,71 +2036,6 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) return false; } -void -Ice::ConnectionI::finishStart() -{ - // - // We set _startCallback to null to break potential cyclic reference count - // and because the destructor checks for it to ensure that we always invoke - // on the callback. - // - - StartCallbackPtr callback; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - swap(callback, _startCallback); - } - if(callback) - { - callback->connectionStartCompleted(this); - } -} - -void -Ice::ConnectionI::finishStart(const Ice::LocalException& ex) -{ - // - // We set _startCallback to null to break potential cyclic reference count - // and because the destructor checks for it to ensure that we always invoke - // on the callback. - // - - StartCallbackPtr callback; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - swap(callback, _startCallback); - } - if(callback) - { - callback->connectionStartFailed(this, ex); - } -} - -void -Ice::ConnectionI::registerWithPool() -{ - assert(!_threadPerConnection); // Only for use with a thread pool. - - if(!_registeredWithPool) - { - _threadPool->_register(_transceiver->fd(), this); - _registeredWithPool = true; - } -} - -void -Ice::ConnectionI::unregisterWithPool() -{ - assert(!_threadPerConnection); // Only for use with a thread pool. - - if(_registeredWithPool) - { - _threadPool->unregister(_transceiver->fd()); - _registeredWithPool = false; - ++_finishedCount; // For each unregistration, finished() is called once. - } -} - static string getBZ2Error(int bzError) { @@ -2385,7 +2120,7 @@ Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed) throw ex; } compressed.b.resize(headerSize + sizeof(Int) + compressedLen); - + // // Write the size of the compressed stream into the header of the // uncompressed stream. Since the header will be copied, this size @@ -2410,7 +2145,7 @@ Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed) #else copy(p, p + sizeof(Int), compressed.b.begin() + headerSize); #endif - + // // Copy the header from the uncompressed stream to the compressed one. // @@ -2452,18 +2187,18 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request OutgoingAsyncPtr& outAsync) { assert(_state > StateNotValidated && _state < StateClosed); - + if(_acmTimeout > 0) { _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); } - + try { // // We don't need to check magic and version here. This has - // already been done by the ThreadPool or the - // ThreadPerConnection, which provides us with the stream. + // already been done by the ThreadPool, which provides us + // with the stream. 
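doCompress() and getBZ2Error() above compress only the message body with libbz2, leave the protocol header uncompressed, and patch the size fields afterwards. A hedged sketch of the core call: the output buffer sizing follows the bzip2 documentation (one percent plus 600 bytes of slack), while header copying and size patching are elided.

    #include <bzlib.h>
    #include <cstddef>
    #include <vector>

    bool compressBody(const std::vector<char>& uncompressed, std::size_t headerSize,
                      int compressionLevel, std::vector<char>& compressed)
    {
        unsigned int srcLen = static_cast<unsigned int>(uncompressed.size() - headerSize);
        unsigned int dstLen = static_cast<unsigned int>(srcLen * 1.01 + 600);
        compressed.resize(headerSize + dstLen);

        // The real doCompress() also reserves four bytes after the header for the
        // uncompressed size before the compressed body starts; omitted here.
        int rc = BZ2_bzBuffToBuffCompress(&compressed[headerSize], &dstLen,
                                          const_cast<char*>(&uncompressed[headerSize]), srcLen,
                                          compressionLevel, 0, 0);
        if(rc != BZ_OK)
        {
            return false;                           // the real code maps rc through getBZ2Error()
        }
        compressed.resize(headerSize + dstLen);     // shrink to the actual compressed size
        return true;
    }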
// assert(stream.i == stream.b.end()); stream.i = stream.b.begin() + 8; @@ -2477,7 +2212,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request stream.b.swap(ustream.b); } stream.i = stream.b.begin() + headerSize; - + switch(messageType) { case closeConnectionMsg: @@ -2497,7 +2232,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } break; } - + case requestMsg: { if(_state == StateClosing) @@ -2516,12 +2251,12 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } break; } - + case requestBatchMsg: { if(_state == StateClosing) { - trace("received batch request during closing\n(ignored by server, client will retry)", stream, + trace("received batch request during closing\n(ignored by server, client will retry)", stream, _logger, _traceLevels); } else @@ -2539,16 +2274,16 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } break; } - + case replyMsg: { traceRecv(stream, _logger, _traceLevels); - + stream.read(requestId); - + map<Int, Outgoing*>::iterator p = _requests.end(); map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.end(); - + if(_requestsHint != _requests.end()) { if(_requestsHint->first == requestId) @@ -2556,7 +2291,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request p = _requestsHint; } } - + if(p == _requests.end()) { if(_asyncRequestsHint != _asyncRequests.end()) @@ -2567,26 +2302,26 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } } } - + if(p == _requests.end() && q == _asyncRequests.end()) { p = _requests.find(requestId); } - + if(p == _requests.end() && q == _asyncRequests.end()) { q = _asyncRequests.find(requestId); } - + if(p == _requests.end() && q == _asyncRequests.end()) { throw UnknownRequestIdException(__FILE__, __LINE__); } - + if(p != _requests.end()) { p->second->finished(stream); - + if(p == _requestsHint) { _requests.erase(p++); @@ -2600,9 +2335,9 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request else { assert(q != _asyncRequests.end()); - + outAsync = q->second; - + if(q == _asyncRequestsHint) { _asyncRequests.erase(q++); @@ -2613,10 +2348,10 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request _asyncRequests.erase(q); } } - + break; } - + case validateConnectionMsg: { traceRecv(stream, _logger, _traceLevels); @@ -2627,7 +2362,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } break; } - + default: { trace("received unknown message\n(invalid, closing connection)", stream, _logger, _traceLevels); @@ -2678,7 +2413,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B BasicStream* is = in.is(); stream.swap(*is); BasicStream* os = in.os(); - + // // Prepare the response if necessary. // @@ -2686,7 +2421,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B { assert(invokeNum == 1); // No further invocations if a response is expected. os->writeBlob(replyHdr, sizeof(replyHdr)); - + // // Add the request ID. // @@ -2694,7 +2429,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B } in.invoke(servantManager); - + // // If there are more invocations, we need the stream back. 
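The reply dispatch above keeps a "hint" iterator into the _requests and _asyncRequests maps: replies usually arrive in the order the requests were sent, so the hinted entry is checked first, find() is only the fallback, and the hint is advanced when its entry is erased. A simplified version of that lookup-and-erase pattern:

    #include <map>
    #include <string>

    typedef std::map<int, std::string> RequestMap;

    // Returns the payload registered for requestId and removes the entry, keeping
    // the hint iterator usable for the next reply. std::string stands in for the
    // Outgoing/OutgoingAsync objects of the real maps.
    std::string takeReply(RequestMap& requests, RequestMap::iterator& hint, int requestId)
    {
        RequestMap::iterator p = requests.end();
        if(hint != requests.end() && hint->first == requestId)
        {
            p = hint;                       // common case: the hinted entry matches
        }
        if(p == requests.end())
        {
            p = requests.find(requestId);   // fallback: regular lookup
        }
        if(p == requests.end())
        {
            return std::string();           // unknown request id (the real code throws)
        }
        std::string reply = p->second;
        if(p == hint)
        {
            requests.erase(p++);            // erase and advance so the hint stays valid
            hint = p;
        }
        else
        {
            requests.erase(p);
        }
        return reply;
    }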
// @@ -2710,320 +2445,3 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B } } -void -Ice::ConnectionI::run() -{ - try - { - // - // Initialize the connection transceiver and validate the connection using - // blocking operations. - // - SocketStatus status; - - int timeout; - DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); - if(defaultsAndOverrides->overrideConnectTimeout) - { - timeout = defaultsAndOverrides->overrideConnectTimeoutValue; - } - else - { - timeout = _endpoint->timeout(); - } - - status = initialize(timeout); - assert(status == Finished); - - status = validate(timeout); - assert(status == Finished); - } - catch(const LocalException& ex) - { - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - - if(_transceiver) - { - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - _transceiver = 0; - } - notifyAll(); - } - - finishStart(ex); - return; - } - - finishStart(); - - const bool warnUdp = _instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0; - - bool closed = false; - - while(!closed) - { - // - // We must accept new connections outside the thread - // synchronization, because we use blocking accept. - // - - BasicStream stream(_instance.get()); - - try - { - stream.b.resize(headerSize); - stream.i = stream.b.begin(); - _transceiver->read(stream, -1); - - ptrdiff_t pos = stream.i - stream.b.begin(); - if(pos < headerSize) - { - // - // This situation is possible for small UDP packets. - // - throw IllegalMessageSizeException(__FILE__, __LINE__); - } - stream.i = stream.b.begin(); - const Byte* header; - stream.readBlob(header, headerSize); - if(header[0] != magic[0] || header[1] != magic[1] || header[2] != magic[2] || header[3] != magic[3]) - { - BadMagicException ex(__FILE__, __LINE__); - ex.badMagic = Ice::ByteSeq(&header[0], &header[0] + sizeof(magic)); - throw ex; - } - if(header[4] != protocolMajor) - { - UnsupportedProtocolException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(header[4]); - ex.badMinor = static_cast<unsigned char>(header[5]); - ex.major = static_cast<unsigned char>(protocolMajor); - ex.minor = static_cast<unsigned char>(protocolMinor); - throw ex; - } - if(header[6] != encodingMajor) - { - UnsupportedEncodingException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(header[6]); - ex.badMinor = static_cast<unsigned char>(header[7]); - ex.major = static_cast<unsigned char>(encodingMajor); - ex.minor = static_cast<unsigned char>(encodingMinor); - throw ex; - } - - Int size; - stream.i -= sizeof(Int); - stream.read(size); - if(size < headerSize) - { - throw IllegalMessageSizeException(__FILE__, __LINE__); - } - if(size > static_cast<Int>(_instance->messageSizeMax())) - { - throw MemoryLimitException(__FILE__, __LINE__); - } - if(size > static_cast<Int>(stream.b.size())) - { - stream.b.resize(size); - } - stream.i = stream.b.begin() + pos; - - if(stream.i != stream.b.end()) - { - if(_endpoint->datagram()) - { - if(warnUdp) - { - Warning out(_logger); - out << "DatagramLimitException: maximum size of " << pos << " exceeded"; - } - throw DatagramLimitException(__FILE__, __LINE__); - } - else - { - _transceiver->read(stream, -1); - assert(stream.i == stream.b.end()); - } - } - } - catch(const DatagramLimitException&) // Expected. 
- { - continue; - } - catch(const SocketException& ex) - { - exception(ex); - } - catch(const LocalException& ex) - { - if(_endpoint->datagram()) - { - if(_warn) - { - Warning out(_logger); - out << "datagram connection exception:\n" << ex << '\n' << _desc; - } - continue; - } - else - { - exception(ex); - } - } - - Byte compress = 0; - Int requestId = 0; - Int invokeNum = 0; - ServantManagerPtr servantManager; - ObjectAdapterPtr adapter; - OutgoingAsyncPtr outAsync; - - auto_ptr<LocalException> localEx; - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(_state == StateHolding) - { - wait(); - } - - if(_state != StateClosed) - { - parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync); - } - - // - // parseMessage() can close the connection, so we must - // check for closed state again. - // - if(_state == StateClosed) - { - if(_sendInProgress) - { - _selectorThread->unregister(_transceiver->fd()); - } - - // - // Prevent further writes. - // - _transceiver->shutdownWrite(); - - // - // We must make sure that nobody is sending before closing the transceiver. - // - while(_sendInProgress) - { - wait(); - } - - try - { - _transceiver->close(); - } - catch(const LocalException& ex) - { - localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - } - _transceiver = 0; - notifyAll(); - - // - // We cannot simply return here. We have to make sure that all requests (regular and - // async) are notified about the closed connection below. - // - closed = true; - } - } - - // - // Asynchronous replies must be handled outside the thread - // synchronization, so that nested calls are possible. - // - if(outAsync) - { - outAsync->__finished(stream); - } - - // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. - // - invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); - - if(closed) - { - // Note: the streams must be cleared first because they expect the Outgoing objects to still be valid. - for(deque<OutgoingMessage>::iterator o = _queuedStreams.begin(); o != _queuedStreams.end(); ++o) - { - o->finished(*_exception.get()); - } - _queuedStreams.clear(); - - for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) - { - p->second->finished(*_exception.get()); // The exception is immutable at this point. - } - _requests.clear(); - - for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) - { - q->second->__finished(*_exception.get()); // The exception is immutable at this point. - } - _asyncRequests.clear(); - } - - if(localEx.get()) - { - assert(closed); - localEx->ice_throw(); - } - } -} - -Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection) : - _connection(connection) -{ -} - -void -Ice::ConnectionI::ThreadPerConnection::run() -{ - if(_connection->_instance->initializationData().threadHook) - { - _connection->_instance->initializationData().threadHook->start(); - } - - try - { - _connection->run(); - } - catch(const std::exception& ex) - { - Error out(_connection->_logger); - out << "exception in thread per connection:\n" << _connection->toString() << ex.what(); - } - catch(...) 
- { - Error out(_connection->_logger); - out << "unknown exception in thread per connection:\n" << _connection->toString(); - } - - if(_connection->_instance->initializationData().threadHook) - { - _connection->_instance->initializationData().threadHook->stop(); - } - - _connection = 0; // Resolve cyclic dependency. -} |
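The removed thread-per-connection run() loop above performed the protocol header checks inline before dispatching each message: magic bytes, protocol and encoding version, and the message size against messageSizeMax; with this change those checks live in the thread pool's read path instead. A compact sketch of the checks themselves; the concrete constants below are written from the Ice protocol as an assumption, not taken from this diff, and the size is read in the protocol's little-endian byte order:

    #include <cstring>
    #include <stdexcept>

    const unsigned char iceMagic[4] = { 0x49, 0x63, 0x65, 0x50 };   // 'I', 'c', 'e', 'P' (assumed values)
    const int headerSize = 14;

    // Validates a received header and returns the total message size (which
    // includes the header itself).
    int checkHeader(const unsigned char* header, int messageSizeMax)
    {
        if(std::memcmp(header, iceMagic, sizeof(iceMagic)) != 0)
        {
            throw std::runtime_error("bad magic");                       // BadMagicException in Ice
        }
        if(header[4] != 1 || header[6] != 1)                             // protocol 1.x, encoding 1.x
        {
            throw std::runtime_error("unsupported protocol or encoding");
        }
        unsigned int size = static_cast<unsigned int>(header[10])
                          | (static_cast<unsigned int>(header[11]) << 8)
                          | (static_cast<unsigned int>(header[12]) << 16)
                          | (static_cast<unsigned int>(header[13]) << 24);
        if(static_cast<int>(size) < headerSize || static_cast<int>(size) > messageSizeMax)
        {
            throw std::runtime_error("illegal message size");            // IllegalMessageSize/MemoryLimit in Ice
        }
        return static_cast<int>(size);
    }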