diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 2569 |
1 files changed, 1578 insertions, 991 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index cdf3dce6b80..ccc6d5236e7 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -34,195 +34,226 @@ using namespace IceInternal; Ice::LocalObject* IceInternal::upCast(ConnectionI* p) { return p; } void -Ice::ConnectionI::validate() +Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) { - bool active = false; + if(adopted) + { + if(str) + { + delete stream; + stream = 0; + adopted = false; + } + else + { + return; // Stream is already adopted. + } + } + else if(!str) + { + if(out || outAsync) + { + return; // Adopting request stream is not necessary. + } + else + { + str = stream; // Adopt this stream + stream = 0; + } + } - if(!_endpoint->datagram()) // Datagram connections are always implicitly validated. + assert(str); + stream = new BasicStream(str->instance()); + stream->swap(*str); + adopted = true; +} + +void +Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify) +{ + if(out) + { + out->sent(notify); // true = notify the waiting thread that the request was sent. + } + else if(outAsync) + { + outAsync->__sent(connection); + } + + if(adopted) + { + delete stream; + stream = 0; + } +} + +void +Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex) +{ + if(!response) + { + // + // Only notify oneway requests. The connection keeps track of twoway + // requests in the _requests/_asyncRequests maps and will notify them + // of the connection exceptions. + // + if(out) + { + out->finished(ex); + } + else if(outAsync) + { + outAsync->__finished(ex); + } + } + + if(adopted) + { + delete stream; + stream = 0; + } +} + +void +Ice::ConnectionI::start(const StartCallbackPtr& callback) +{ + try { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(_thread && _thread->getThreadControl() != IceUtil::ThreadControl()) - { - // - // In thread per connection mode, this connection's thread - // will take care of connection validation. Therefore all we - // have to do here is to wait until this thread has completed - // validation. - // - while(_state == StateNotValidated) - { - wait(); - } - - if(_state >= StateClosing) - { - assert(_exception.get()); - _exception->ice_throw(); - } - - return; - } - + _startCallback = callback; + // - // The connection might already be closed (e.g.: the communicator - // was destroyed or object adapter deactivated.) + // The connection might already be closed if the communicator was destroyed. // - assert(_state == StateNotValidated || _state == StateClosed); if(_state == StateClosed) { assert(_exception.get()); _exception->ice_throw(); } - - if(_adapter) - { - active = true; // The server side has the active role for connection validation. - } - else - { - active = false; // The client side has the passive role for connection validation. - } - } - - try - { - Int timeout; - if(_instance->defaultsAndOverrides()->overrideConnectTimeout) - { - timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; - } - else - { - timeout = _endpoint->timeout(); - } - - if(active) - { - IceUtil::Mutex::Lock sendSync(_sendMutex); - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } - - BasicStream os(_instance.get()); - os.write(magic[0]); - os.write(magic[1]); - os.write(magic[2]); - os.write(magic[3]); - os.write(protocolMajor); - os.write(protocolMinor); - os.write(encodingMajor); - os.write(encodingMinor); - os.write(validateConnectionMsg); - os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). - os.write(headerSize); // Message size. - os.i = os.b.begin(); - traceHeader("sending validate connection", os, _logger, _traceLevels); - try - { - _transceiver->initialize(timeout); - _transceiver->write(os, timeout); - } - catch(const TimeoutException&) - { - throw ConnectTimeoutException(__FILE__, __LINE__); - } - } - else + // + // In thread per connection mode, we create the thread for the connection. The + // intialization and validation of the connection is taken care of by the thread + // per connection. If a callback is given, no need to wait, the thread will notify + // the callback, otherwise wait until the connection is validated. + // + if(_threadPerConnection) { - BasicStream is(_instance.get()); - is.b.resize(headerSize); - is.i = is.b.begin(); try { - _transceiver->initialize(timeout); - _transceiver->read(is, timeout); + _thread = new ThreadPerConnection(this); + _thread->start(_threadPerConnectionStackSize); } - catch(const TimeoutException&) + catch(const IceUtil::Exception& ex) { - throw ConnectTimeoutException(__FILE__, __LINE__); - } - assert(is.i == is.b.end()); - is.i = is.b.begin(); - Byte m[4]; - is.read(m[0]); - is.read(m[1]); - is.read(m[2]); - is.read(m[3]); - if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) - { - BadMagicException ex(__FILE__, __LINE__); - ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic)); - throw ex; - } - Byte pMajor; - Byte pMinor; - is.read(pMajor); - is.read(pMinor); - if(pMajor != protocolMajor) - { - UnsupportedProtocolException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(pMajor); - ex.badMinor = static_cast<unsigned char>(pMinor); - ex.major = static_cast<unsigned char>(protocolMajor); - ex.minor = static_cast<unsigned char>(protocolMinor); - throw ex; - } - Byte eMajor; - Byte eMinor; - is.read(eMajor); - is.read(eMinor); - if(eMajor != encodingMajor) - { - UnsupportedEncodingException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(eMajor); - ex.badMinor = static_cast<unsigned char>(eMinor); - ex.major = static_cast<unsigned char>(encodingMajor); - ex.minor = static_cast<unsigned char>(encodingMinor); - throw ex; - } - Byte messageType; - is.read(messageType); - if(messageType != validateConnectionMsg) - { - throw ConnectionNotValidatedException(__FILE__, __LINE__); + { + Error out(_logger); + out << "cannot create thread for connection:\n" << ex; + } + + // + // Clean up. + // + _thread = 0; + _state = StateClosed; + + ex.ice_throw(); } - Byte compress; - is.read(compress); // Ignore compression status for validate connection. - Int size; - is.read(size); - if(size != headerSize) + + if(!callback) // Wait for the connection to be validated. { - throw IllegalMessageSizeException(__FILE__, __LINE__); + while(_state <= StateNotValidated) + { + wait(); + } + + if(_state >= StateClosing) + { + assert(_exception.get()); + _exception->ice_throw(); + } } - traceHeader("received validate connection", is, _logger, _traceLevels); + return; // We're done. } } - catch(const LocalException& ex) + + SocketStatus status = initialize(); + if(status == Finished) + { + status = validate(); + } + + if(status == Finished) + { + finishStart(); + return; // We're done! + } + + assert(callback); + + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state == StateClosed) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); assert(_exception.get()); _exception->ice_throw(); } - } - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + int timeout; + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + if(defaultsAndOverrides->overrideConnectTimeout) + { + timeout = defaultsAndOverrides->overrideConnectTimeoutValue; + } + else + { + timeout = _endpoint->timeout(); + } - if(_acmTimeout > 0) + _sendInProgress = true; + _selectorThread->_register(_transceiver->fd(), this, status, timeout); + return; + } + catch(const Ice::LocalException& ex) + { { - _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + + // + // If start is called with a callback, the callback is notified either by the + // thread per conncetion or the thread pool. + // + if(callback) + { + if(!_threadPerConnection) + { + registerWithPool(); + unregisterWithPool(); // Let finished do the close. + } + return; + } + + // + // Close the transceiver if there's no thread per connection, otherwise, the + // thread per connection takes care of it. + // + if(!_thread && _transceiver) + { + try + { + _transceiver->close(); + } + catch(const Ice::LocalException&) + { + // Here we ignore any exceptions in close(). + } + _transceiver = 0; + } } - // - // We start out in holding state. - // - setState(StateHolding); + waitUntilFinished(); + throw; } } @@ -230,10 +261,14 @@ void Ice::ConnectionI::activate() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state <= StateNotValidated) + { + return; + } - while(_state == StateNotValidated) + if(_acmTimeout > 0) { - wait(); + _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); } setState(StateActive); @@ -243,10 +278,9 @@ void Ice::ConnectionI::hold() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(_state == StateNotValidated) + if(_state <= StateNotValidated) { - wait(); + return; } setState(StateHolding); @@ -255,20 +289,35 @@ Ice::ConnectionI::hold() void Ice::ConnectionI::destroy(DestructionReason reason) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - switch(reason) + bool send = false; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + switch(reason) + { + case ObjectAdapterDeactivated: + { + send = setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); + break; + } + + case CommunicatorDestroyed: + { + send = setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); + break; + } + } + } + + if(send) // Send the close connection message { - case ObjectAdapterDeactivated: + try { - setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); - break; + finishSendMessage(); } - - case CommunicatorDestroyed: + catch(const Ice::LocalException&) { - setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); - break; + // Ignore. } } } @@ -276,32 +325,47 @@ Ice::ConnectionI::destroy(DestructionReason reason) void Ice::ConnectionI::close(bool force) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(force) + bool send = false; { - setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(force) + { + setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); + } + else + { + // + // If we do a graceful shutdown, then we wait until all + // outstanding requests have been completed. Otherwise, the + // CloseConnectionException will cause all outstanding + // requests to be retried, regardless of whether the server + // has processed them or not. + // + while(!_requests.empty() || !_asyncRequests.empty()) + { + wait(); + } + + send = setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); + } } - else + + if(send) // Send the close connection message { - // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, the - // CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the server - // has processed them or not. - // - while(!_requests.empty() || !_asyncRequests.empty()) + try { - wait(); + finishSendMessage(); + } + catch(const Ice::LocalException&) + { + // Ignore. } - - setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } } bool -Ice::ConnectionI::isDestroyed() const +Ice::ConnectionI::isActiveOrHolding() const { // // We can not use trylock here, otherwise the outgoing connection @@ -310,7 +374,7 @@ Ice::ConnectionI::isDestroyed() const // IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - return _state >= StateClosing; + return _state > StateNotValidated && _state < StateClosing; } bool @@ -460,56 +524,55 @@ Ice::ConnectionI::waitUntilFinished() void Ice::ConnectionI::monitor() { - IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); - - if(!sync.acquired()) + bool send = false; { - return; - } + IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); + if(!sync.acquired()) + { + return; + } - if(_state != StateActive) - { - return; - } + if(_state != StateActive) + { + return; + } - // - // Check for timed out async requests. - // - for(map<Int, AsyncRequest>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) - { - if(p->second.t > IceUtil::Time() && p->second.t <= IceUtil::Time::now(IceUtil::Time::Monotonic)) + // + // Active connection management for idle connections. + // + if(_acmTimeout <= 0 || + !_requests.empty() || !_asyncRequests.empty() || + _batchStreamInUse || !_batchStream.b.empty() || + _sendInProgress || _dispatchCount > 0) { - setState(StateClosed, TimeoutException(__FILE__, __LINE__)); return; } + + if(IceUtil::Time::now(IceUtil::Time::Monotonic) >= _acmAbsoluteTimeout) + { + send = setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); + } } - - // - // Active connection management for idle connections. - // - if(_acmTimeout > 0 && - _requests.empty() && _asyncRequests.empty() && - !_batchStreamInUse && _batchStream.b.empty() && - _dispatchCount == 0) + + if(send) { - if(IceUtil::Time::now(IceUtil::Time::Monotonic) >= _acmAbsoluteTimeout) + try + { + finishSendMessage(); + } + catch(const Ice::LocalException&) { - setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); - return; } } } -void -Ice::ConnectionI::sendRequest(BasicStream* os, Outgoing* out, bool compress) +bool +Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) { - Int requestId; - + BasicStream* os = out->os(); + bool send = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - assert(!(out && _endpoint->datagram())); // Twoway requests cannot be datagrams. - if(_exception.get()) { // @@ -523,10 +586,8 @@ Ice::ConnectionI::sendRequest(BasicStream* os, Outgoing* out, bool compress) assert(_state > StateNotValidated); assert(_state < StateClosing); - // - // Only add to the request map if this is a twoway call. - // - if(out) + Int requestId; + if(response) { // // Create a new unique request ID. @@ -547,134 +608,65 @@ Ice::ConnectionI::sendRequest(BasicStream* os, Outgoing* out, bool compress) #else copy(p, p + sizeof(Int), os->b.begin() + headerSize); #endif - - // - // Add to the requests map. - // - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); } - - if(_acmTimeout > 0) + + // + // Send the message. If it can't be sent without blocking the message is added + // to _sendStreams and it will be sent by the selector thread or by this thread + // if flush is true. + // + try { - _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + OutgoingMessage message(out, os, compress, response); + send = sendMessage(message); } - } - - try - { - IceUtil::Mutex::Lock sendSync(_sendMutex); - - if(!_transceiver) // Has the transceiver already been closed? + catch(const LocalException& ex) { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. + setState(StateClosed, ex); + assert(_exception.get()); + _exception->ice_throw(); } - - if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes. + + if(response) { // - // Message compressed. Request compressed response, if any. - // - os->b[9] = 2; - - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(*os, cstream); - - // - // Send the request. + // Add to the requests map. // - os->i = os->b.begin(); - traceRequest("sending request", *os, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); } - else - { - if(compress) - { - // - // Message not compressed. Request compressed response, if any. - // - os->b[9] = 1; - } - // - // No compression, just fill in the message size. - // - Int sz = static_cast<Int>(os->b.size()); - const Byte* p = reinterpret_cast<const Byte*>(&sz); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); -#else - copy(p, p + sizeof(Int), os->b.begin() + 10); -#endif - - // - // Send the request. - // - os->i = os->b.begin(); - traceRequest("sending request", *os, _logger, _traceLevels); - _transceiver->write(*os, _endpoint->timeout()); + if(!send) + { + return !_sendInProgress && _queuedStreams.empty(); // The request was sent if it's not queued! } } - catch(const LocalException& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); - if(out) + if(send) + { + try { - // - // If the request has already been removed from the - // request map, we are out of luck. It would mean that - // finished() has been called already, and therefore the - // exception has been set using the Outgoing::finished() - // callback. In this case, we cannot throw the exception - // here, because we must not both raise an exception and - // have Outgoing::finished() called with an - // exception. This means that in some rare cases, a - // request will not be retried even though it could. But I - // honestly don't know how I could avoid this, without a - // very elaborate and complex design, which would be bad - // for performance. - // - map<Int, Outgoing*>::iterator p = _requests.find(requestId); - if(p != _requests.end()) - { - if(p == _requestsHint) - { - _requests.erase(p++); - _requestsHint = p; - } - else - { - _requests.erase(p); - } - - _exception->ice_throw(); - } + finishSendMessage(); } - else + catch(const Ice::LocalException&) { - _exception->ice_throw(); + assert(_exception.get()); + if(!response) // Twoway calls are notified through finished() + { + throw; + } } } + return true; // The request was sent. } void -Ice::ConnectionI::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPtr& out, bool compress) +Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response) { - Int requestId; + BasicStream* os = out->__getOs(); + bool send = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - assert(!_endpoint->datagram()); // Twoway requests cannot be datagrams, and async implies twoway. - if(_exception.get()) { // @@ -687,140 +679,66 @@ Ice::ConnectionI::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPtr& out, assert(_state > StateNotValidated); assert(_state < StateClosing); - - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) + + Int requestId; + if(response) { - _nextRequestId = 1; + // + // Create a new unique request ID. + // requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - const Byte* p = reinterpret_cast<const Byte*>(&requestId); + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Fill in the request ID. + // + const Byte* p = reinterpret_cast<const Byte*>(&requestId); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); + reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); #else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); + copy(p, p + sizeof(Int), os->b.begin() + headerSize); #endif - - // - // Add to the async requests map. - // - struct AsyncRequest asyncRequest; - asyncRequest.p = out; - if(_endpoint->timeout() > 0) - { - asyncRequest.t = - IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::milliSeconds(_endpoint->timeout()); } - _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), - pair<const Int, AsyncRequest>(requestId, asyncRequest)); - - if(_acmTimeout > 0) + + try { - _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + OutgoingMessage message(out, os, compress, response); + send = sendMessage(message); } - } - - try - { - IceUtil::Mutex::Lock sendSync(_sendMutex); - - if(!_transceiver) // Has the transceiver already been closed? + catch(const LocalException& ex) { + setState(StateClosed, ex); assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. + _exception->ice_throw(); } - if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes. - { - // - // Message compressed. Request compressed response, if any. - // - os->b[9] = 2; - - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(*os, cstream); - - // - // Send the request. - // - os->i = os->b.begin(); - traceRequest("sending asynchronous request", *os, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); - } - else + if(response) { - if(compress) - { - // - // Message not compressed. Request compressed response, if any. - // - os->b[9] = 1; - } - - // - // No compression, just fill in the message size. - // - Int sz = static_cast<Int>(os->b.size()); - const Byte* p = reinterpret_cast<const Byte*>(&sz); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); -#else - copy(p, p + sizeof(Int), os->b.begin() + 10); -#endif - // - // Send the request. + // Add to the async requests map. // - os->i = os->b.begin(); - traceRequest("sending asynchronous request", *os, _logger, _traceLevels); - _transceiver->write(*os, _endpoint->timeout()); + _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), + pair<const Int, OutgoingAsyncPtr>(requestId, out)); } } - catch(const LocalException& ex) + + if(send) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); - - // - // If the request has already been removed from the async - // request map, we are out of luck. It would mean that - // finished() has been called already, and therefore the - // exception has been set using the - // OutgoingAsync::__finished() callback. In this case, we - // cannot throw the exception here, because we must not both - // raise an exception and have OutgoingAsync::__finished() - // called with an exception. This means that in some rare - // cases, a request will not be retried even though it - // could. But I honestly don't know how I could avoid this, - // without a very elaborate and complex design, which would be - // bad for performance. - // - map<Int, AsyncRequest>::iterator p = _asyncRequests.find(requestId); - if(p != _asyncRequests.end()) + try { - if(p == _asyncRequestsHint) - { - _asyncRequests.erase(p++); - _asyncRequestsHint = p; - } - else + finishSendMessage(); + } + catch(const Ice::LocalException&) + { + assert(_exception.get()); + if(!response) // Twoway calls are notified through finished(). { - _asyncRequests.erase(p); - } - - _exception->ice_throw(); + throw; + } } } } @@ -872,9 +790,8 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os) void Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) { - bool autoflush = false; - vector<Ice::Byte> lastRequest; - + bool send = false; + try { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -882,16 +799,15 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) // Get the batch stream back. // _batchStream.swap(*os); - + + if(_exception.get()) + { + _exception->ice_throw(); + } + + bool flush = false; if(_batchAutoFlush) { - IceUtil::Mutex::Lock sendSync(_sendMutex); - if(!_transceiver) - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } - // // Throw memory limit exception if the first message added causes us to // go over limit. Otherwise put aside the marshalled message that caused @@ -903,122 +819,152 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) } catch(const Ice::Exception&) { - if(_batchRequestNum == 0) + if(_batchRequestNum > 0) + { + flush = true; + } + else { - resetBatch(true); throw; } - vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest); - _batchStream.b.resize(_batchMarker); - autoflush = true; } } - if(!autoflush) + if(flush) { // - // Increment the number of requests in the batch. + // Temporarily save the last request. + // + vector<Ice::Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()); + _batchStream.b.resize(_batchMarker); + + // + // Send the batch stream without the last request. + // + try + { + // + // Fill in the number of requests in the batch. + // + const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); +#else + copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); +#endif + + OutgoingMessage message(&_batchStream, _batchRequestCompress); + send = sendMessage(message); + if(send) + { + // + // If the request can't be sent immediately and this is a foreground send, + // we adopt the stream to be able to re-use _batchStream immediately. + // + assert(!_sendStreams.empty()); + _sendStreams.back().adopt(0); + } + } + catch(const Ice::LocalException& ex) + { + setState(StateClosed, ex); + assert(_exception.get()); + _exception->ice_throw(); + } + + // + // Reset the batch. // - ++_batchRequestNum; + BasicStream dummy(_instance.get(), _batchAutoFlush); + _batchStream.swap(dummy); + _batchRequestNum = 0; + _batchRequestCompress = false; + _batchMarker = 0; // - // We compress the whole batch if there is at least one compressed - // message. + // Check again if the last request doesn't exceed what we can send with the auto flush // - if(compress) + if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax()) { - _batchRequestCompress = true; + throw MemoryLimitException(__FILE__, __LINE__); } - + // - // Notify about the batch stream not being in use anymore. + // Start a new batch with the last message that caused us to go over the limit. // - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); + _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); + _batchStream.writeBlob(&lastRequest[0], lastRequest.size()); } - } - - if(autoflush) - { - // - // We have to keep _batchStreamInUse set until after we insert the - // saved marshalled data into a new stream. - // - flushBatchRequestsInternal(true); - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); // - // Throw memory limit exception if the message that caused us to go over - // limit causes us to exceed the limit by itself. + // Increment the number of requests in the batch. // - if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax()) - { - resetBatch(true); - throw MemoryLimitException(__FILE__, __LINE__); - } - + ++_batchRequestNum; + // - // Start a new batch with the last message that caused us to - // go over the limit. + // We compress the whole batch if there is at least one compressed + // message. // - try - { - _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); - _batchStream.writeBlob(&lastRequest[0], lastRequest.size()); - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - ex.ice_throw(); - } - if(compress) { _batchRequestCompress = true; } - + // - // Notify that the batch stream not in use anymore. + // Notify about the batch stream not being in use anymore. // - ++_batchRequestNum; + assert(_batchStreamInUse); _batchStreamInUse = false; notifyAll(); } + catch(const Ice::LocalException&) + { + abortBatchRequest(); + if(send) + { + finishSendMessage(); // Let exceptions go through to report auto-flush failures to the caller. + } + throw; + } + + if(send) + { + finishSendMessage(); // Let exceptions go through to report auto-flush failures to the caller. + } } void Ice::ConnectionI::abortBatchRequest() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // Reset the batch stream. We cannot save old requests - // in the batch stream, as they might be corrupted due to - // incomplete marshaling. - // - resetBatch(true); + + BasicStream dummy(_instance.get(), _batchAutoFlush); + _batchStream.swap(dummy); + _batchRequestNum = 0; + _batchRequestCompress = false; + _batchMarker = 0; + + assert(_batchStreamInUse); + _batchStreamInUse = false; + notifyAll(); } void Ice::ConnectionI::flushBatchRequests() { - flushBatchRequestsInternal(false); + BatchOutgoing out(this, _instance.get()); + out.invoke(); } -void -Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse) +bool +Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) { + bool send = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(!ignoreInUse) + while(_batchStreamInUse && !_exception.get()) { - while(_batchStreamInUse && !_exception.get()) - { - wait(); - } + wait(); } if(_exception.get()) @@ -1026,36 +972,78 @@ Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse) _exception->ice_throw(); } - if(_batchStream.b.empty()) + if(_batchRequestNum == 0) { - return; // Nothing to do. + return true; } - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - _batchStream.i = _batchStream.b.begin(); + // + // Fill in the number of requests in the batch. + // + const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); +#else + copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); +#endif + _batchStream.swap(*out->os()); - if(_acmTimeout > 0) + // + // Send the batch stream. + // + try { - _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + OutgoingMessage message(out, out->os(), _batchRequestCompress, false); + send = sendMessage(message); + } + catch(const Ice::LocalException& ex) + { + setState(StateClosed, ex); + assert(_exception.get()); + _exception->ice_throw(); } // - // Prevent that new batch requests are added while we are - // flushing. + // Reset the batch stream. // - _batchStreamInUse = true; + BasicStream dummy(_instance.get(), _batchAutoFlush); + _batchStream.swap(dummy); + _batchRequestNum = 0; + _batchRequestCompress = false; + _batchMarker = 0; + + if(!send) + { + return !_sendInProgress && _queuedStreams.empty(); // The request was sent if it's not queued! + } } - - try + + if(send) + { + finishSendMessage(); + } + return true; +} + +void +Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) +{ + bool send = false; { - IceUtil::Mutex::Lock sendSync(_sendMutex); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + while(_batchStreamInUse && !_exception.get()) + { + wait(); + } + + if(_exception.get()) + { + _exception->ice_throw(); + } - if(!_transceiver) // Has the transceiver already been closed? + if(_batchRequestNum == 0) { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. + return; } // @@ -1067,195 +1055,123 @@ Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse) #else copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif - - if(_batchRequestCompress && _batchStream.b.size() >= 100) // Only compress messages larger than 100 bytes. + _batchStream.swap(*outAsync->__getOs()); + + // + // Send the batch stream. + // + try { - // - // Message compressed. Request compressed response, if any. - // - _batchStream.b[9] = 2; - - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(_batchStream, cstream); - - // - // Send the batch request. - // - _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); + OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, false); + send = sendMessage(message); } - else + catch(const Ice::LocalException& ex) { - if(_batchRequestCompress) - { - // - // Message not compressed. Request compressed response, if any. - // - _batchStream.b[9] = 1; - } - - // - // No compression, just fill in the message size. - // - Int sz = static_cast<Int>(_batchStream.b.size()); - const Byte* q = reinterpret_cast<const Byte*>(&sz); -#ifdef ICE_BIG_ENDIAN - reverse_copy(q, q + sizeof(Int), _batchStream.b.begin() + 10); -#else - copy(q, q + sizeof(Int), _batchStream.b.begin() + 10); -#endif - - // - // Send the batch request. - // - _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - _transceiver->write(_batchStream, _endpoint->timeout()); + setState(StateClosed, ex); + assert(_exception.get()); + _exception->ice_throw(); } - } - catch(const LocalException& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); // - // Since batch requests are all oneways (or datagrams), we - // must report the exception to the caller. + // Reset the batch stream. // - _exception->ice_throw(); + BasicStream dummy(_instance.get(), _batchAutoFlush); + _batchStream.swap(dummy); + _batchRequestNum = 0; + _batchRequestCompress = false; + _batchMarker = 0; } + if(send) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // Reset the batch stream, and notify that flushing is over. - // - resetBatch(!ignoreInUse); - } -} - -void -Ice::ConnectionI::resetBatch(bool resetInUse) -{ - BasicStream dummy(_instance.get(), _batchAutoFlush); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - - // - // Notify about the batch stream not being in use - // anymore. - // - if(resetInUse) - { - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); + finishSendMessage(); } } void Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag) { - try + bool send = false; { - IceUtil::Mutex::Lock sendSync(_sendMutex); - - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_state > StateNotValidated); - // - // Only compress if compression was requested by the client, - // and if the message is larger than 100 bytes. - // - if(compressFlag > 0 && os->b.size() >= 100) + try { - // - // Message compressed. Request compressed response, if any. - // - os->b[9] = 2; + if(--_dispatchCount == 0) + { + notifyAll(); + } - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(*os, cstream); + if(_state == StateClosed) + { + assert(_exception.get()); + _exception->ice_throw(); + } - // - // Send the reply. - // - os->i = os->b.begin(); - traceReply("sending reply", *os, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); - } - else - { - if(compressFlag > 0) + OutgoingMessage message(os, compressFlag > 0); + send = sendMessage(message); + + if(_state == StateClosing && _dispatchCount == 0) { - // - // Message not compressed. Request compressed response, if any. - // - os->b[9] = 1; + send = initiateShutdown(send); } - // - // No compression, just fill in the message size. - // - Int sz = static_cast<Int>(os->b.size()); - const Byte* p = reinterpret_cast<const Byte*>(&sz); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); -#else - copy(p, p + sizeof(Int), os->b.begin() + 10); -#endif - - // - // Send the reply. - // - os->i = os->b.begin(); - traceReply("sending reply", *os, _logger, _traceLevels); - _transceiver->write(*os, _endpoint->timeout()); + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + + IceUtil::Time::seconds(_acmTimeout); + } + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); } } - catch(const LocalException& ex) + + if(send) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); + try + { + finishSendMessage(); + } + catch(Ice::LocalException&) + { + // Ignore. + } } +} +void +Ice::ConnectionI::sendNoResponse() +{ + bool send = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(_state > StateNotValidated); - + try { if(--_dispatchCount == 0) { notifyAll(); } - + + if(_state == StateClosed) + { + assert(_exception.get()); + _exception->ice_throw(); + } + if(_state == StateClosing && _dispatchCount == 0) { - initiateShutdown(); + send = initiateShutdown(false); } - + if(_acmTimeout > 0) { - _acmAbsoluteTimeout = - IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + + IceUtil::Time::seconds(_acmTimeout); } } catch(const LocalException& ex) @@ -1263,31 +1179,18 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag) setState(StateClosed, ex); } } -} -void -Ice::ConnectionI::sendNoResponse() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - assert(_state > StateNotValidated); - - try + if(send) { - if(--_dispatchCount == 0) + try { - notifyAll(); + finishSendMessage(); } - - if(_state == StateClosing && _dispatchCount == 0) + catch(Ice::LocalException&) { - initiateShutdown(); + // Ignore. } } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - } } EndpointIPtr @@ -1307,12 +1210,15 @@ Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_exception.get()) + if(_state == StateClosing || _state == StateClosed) { + assert(_exception.get()); _exception->ice_throw(); } - - assert(_state < StateClosing); + else if(_state <= StateNotValidated) + { + return; + } _adapter = adapter; @@ -1370,12 +1276,12 @@ Ice::ConnectionI::readable() const return true; } -void +bool Ice::ConnectionI::read(BasicStream& stream) { assert(!_threadPerConnection); // Only for use with a thread pool. - _transceiver->read(stream, 0); + return _transceiver->read(stream, 0); // // Updating _acmAbsoluteTimeout is too expensive here, because we @@ -1446,9 +1352,6 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) threadPool->promoteFollower(); auto_ptr<LocalException> localEx; - - map<Int, Outgoing*> requests; - map<Int, AsyncRequest> asyncRequests; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1456,49 +1359,48 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) --_finishedCount; assert(threadPool.get() == _threadPool.get()); - if(_finishedCount == 0 && _state == StateClosed) + if(_finishedCount > 0 || _state != StateClosed || _sendInProgress) { - _threadPool->decFdsInUse(); + return; + } - // - // We must make sure that nobody is sending when we close - // the transceiver. - // - IceUtil::Mutex::Lock sendSync(_sendMutex); + _threadPool->decFdsInUse(); + _selectorThread->decFdsInUse(); - try - { - _transceiver->close(); - } - catch(const LocalException& ex) - { - localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - } - - _transceiver = 0; - notifyAll(); + try + { + _transceiver->close(); } - - if(_state == StateClosed || _state == StateClosing) + catch(const LocalException& ex) { - requests.swap(_requests); - _requestsHint = _requests.end(); - - asyncRequests.swap(_asyncRequests); - _asyncRequestsHint = _asyncRequests.end(); + localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone())); } + + _transceiver = 0; + notifyAll(); } - for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p) + finishStart(*_exception.get()); + + // Note: the streams must be cleared first because they expect the Outgoing objects to still be valid. + for(deque<OutgoingMessage>::iterator o = _queuedStreams.begin(); o != _queuedStreams.end(); ++o) { - p->second->finished(*_exception.get()); // The exception is immutable at this point. + o->finished(*_exception.get()); } + _queuedStreams.clear(); - for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q) + for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) { - q->second.p->__finished(*_exception.get()); // The exception is immutable at this point. + p->second->finished(*_exception.get()); // The exception is immutable at this point. } + _requests.clear(); + for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) + { + q->second->__finished(*_exception.get()); // The exception is immutable at this point. + } + _asyncRequests.clear(); + if(localEx.get()) { localEx->ice_throw(); @@ -1554,6 +1456,139 @@ Ice::ConnectionI::toString() const } // +// Operations from SocketReadyCallback +// +SocketStatus +Ice::ConnectionI::socketReady(bool finished) +{ + if(!finished) + { + try + { + // + // First, we check if there's something to send. If that's the case, the connection + // must be active and the only thing to do is send the queued streams. + // + if(!_sendStreams.empty()) + { + if(!send(0)) + { + return NeedWrite; + } + assert(_sendStreams.empty()); + } + else + { + // + // If there's nothing to send, we're still validating the connection. + // + int state; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_state == StateClosed || _state <= StateNotValidated); + + state = _state; + + if(_state == StateClosed) + { + assert(_exception.get()); + _exception->ice_throw(); + } + } + + if(state == StateNotInitialized) + { + SocketStatus status = initialize(); + if(status != Finished) + { + return status; + } + } + + if(state <= StateNotValidated) + { + SocketStatus status = validate(); + if(status != Finished) + { + return status; + } + } + + finishStart(); + } + } + catch(const Ice::LocalException& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + } + } + + // + // If there's no more data to send or if connection validation is finished, we checkout + // the connection state to figure out whether or not it's time to unregister with the + // selector thread. + // + + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_sendInProgress); + if(_state == StateClosed) + { + assert(!_startCallback || (!_threadPerConnection && !_registeredWithPool)); + + _queuedStreams.insert(_queuedStreams.begin(), _sendStreams.begin(), _sendStreams.end()); + _sendStreams.clear(); + _sendInProgress = false; + + if(_threadPerConnection) + { + _transceiver->shutdownReadWrite(); + } + else + { + registerWithPool(); + unregisterWithPool(); // Let finished() do the close. + } + notifyAll(); + return Finished; + } + else if(_waitingForSend > 0) // If there's synchronous calls waiting to be sent, unregister. + { + _sendInProgress = false; + notifyAll(); + return Finished; + } + else if(_queuedStreams.empty()) + { + _sendInProgress = false; + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + } + return Finished; + } + else + { + _sendStreams.swap(_queuedStreams); + return NeedWrite; // We're not finished yet, there's more data to send! + } +} + +void +Ice::ConnectionI::socketTimeout() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state <= StateNotValidated) + { + setState(StateClosed, ConnectTimeoutException(__FILE__, __LINE__)); + } + else if(_state <= StateClosing) + { + setState(StateClosed, TimeoutException(__FILE__, __LINE__)); + } +} + +// // Only used by the SSL plug-in. // // The external party has to synchronize the connection, since the @@ -1596,8 +1631,10 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, _batchRequestNum(0), _batchRequestCompress(false), _batchMarker(0), + _sendInProgress(false), + _waitingForSend(0), _dispatchCount(0), - _state(StateNotValidated), + _state(StateNotInitialized), _stateTime(IceUtil::Time::now(IceUtil::Time::Monotonic)) { Int& acmTimeout = const_cast<Int&>(_acmTimeout); @@ -1635,17 +1672,17 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, _servantManager = adapterImpl->getServantManager(); } - if(!threadPerConnection) + __setNoDelete(true); + try { - // - // Only set _threadPool if we really need it, i.e., if we are - // not in thread per connection mode. Thread pools have lazy - // initialization in Instance, and we don't want them to be - // created if they are not needed. - // - __setNoDelete(true); - try + if(!threadPerConnection) { + // + // Only set _threadPool if we really need it, i.e., if we are + // not in thread per connection mode. Thread pools have lazy + // initialization in Instance, and we don't want them to be + // created if they are not needed. + // if(adapterImpl) { const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool(); @@ -1656,76 +1693,34 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, } _threadPool->incFdsInUse(); } - catch(const IceUtil::Exception& ex) - { - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - __setNoDelete(false); - ex.ice_throw(); - } + // + // Only set selector thread if we really need it. + // + const_cast<SelectorThreadPtr&>(_selectorThread) = _instance->selectorThread(); + _selectorThread->incFdsInUse(); + } + catch(const IceUtil::Exception&) + { __setNoDelete(false); + throw; } + __setNoDelete(false); } Ice::ConnectionI::~ConnectionI() { + assert(!_startCallback); assert(_state == StateClosed); assert(!_transceiver); assert(_dispatchCount == 0); assert(!_thread); + assert(_queuedStreams.empty()); + assert(_requests.empty()); + assert(_asyncRequests.empty()); } -void -Ice::ConnectionI::start() -{ - // - // If we are in thread per connection mode, create the thread for this connection. - // We can't start the thread in the constructor because it can cause a race condition - // (see bug 1718). - // - if(_threadPerConnection) - { - try - { - _thread = new ThreadPerConnection(this); - _thread->start(_threadPerConnectionStackSize); - } - catch(const IceUtil::Exception& ex) - { - { - Error out(_logger); - out << "cannot create thread for connection:\n" << ex; - } - - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - // - // Clean up. - // - _transceiver = 0; - _thread = 0; - _state = StateClosed; - - ex.ice_throw(); - } - } -} - -void +bool Ice::ConnectionI::setState(State state, const LocalException& ex) { // @@ -1736,7 +1731,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) if(_state == state) // Don't switch twice. { - return; + return false; } if(!_exception.get()) @@ -1777,10 +1772,10 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) // exceptions. Otherwise new requests may retry on a connection // that is not yet marked as closed or closing. // - setState(state); + return setState(state); } -void +bool Ice::ConnectionI::setState(State state) { // @@ -1795,135 +1790,122 @@ Ice::ConnectionI::setState(State state) // // Skip graceful shutdown if we are destroyed before validation. // - if(_state == StateNotValidated && state == StateClosing) + if(_state <= StateNotValidated && state == StateClosing) { state = StateClosed; } if(_state == state) // Don't switch twice. { - return; + return false; } switch(state) { - case StateNotValidated: + case StateNotInitialized: + { + assert(false); + break; + } + + case StateNotValidated: + { + if(_state != StateNotInitialized) { - assert(false); - break; + assert(_state == StateClosed); + return false; } + break; + } - case StateActive: + case StateActive: + { + // + // Can only switch from holding or not validated to + // active. + // + if(_state != StateHolding && _state != StateNotValidated) { - // - // Can only switch from holding or not validated to - // active. - // - if(_state != StateHolding && _state != StateNotValidated) - { - return; - } - if(!_threadPerConnection) - { - registerWithPool(); - } - break; + return false; + } + if(!_threadPerConnection) + { + registerWithPool(); + } + break; + } + + case StateHolding: + { + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) + { + return false; + } + if(!_threadPerConnection) + { + unregisterWithPool(); + } + break; + } + + case StateClosing: + { + // + // Can't change back from closed. + // + if(_state == StateClosed) + { + return false; } + if(!_threadPerConnection) + { + registerWithPool(); // We need to continue to read in closing state. + } + break; + } - case StateHolding: + case StateClosed: + { + if(_sendInProgress) { // - // Can only switch from active or not validated to - // holding. + // Unregister with both the pool and the selector thread. We unregister with + // the pool to ensure that it stops reading on the socket (otherwise, if the + // socket is closed the thread pool would spin always reading 0 from the FD). + // The selector thread will register again the FD with the pool once it's + // done. // - if(_state != StateActive && _state != StateNotValidated) - { - return; - } + _selectorThread->unregister(_transceiver->fd()); if(!_threadPerConnection) { unregisterWithPool(); } - break; - } - case StateClosing: + _transceiver->shutdownWrite(); // Prevent further writes. + } + else if(_state <= StateNotValidated || _threadPerConnection) { // - // Can't change back from closed. + // If we are in thread per connection mode or we're initializing + // the connection in blocking mode, we shutdown both for reading + // and writing. This will unblock and read call with an exception. + // The thread per connection then closes the transceiver. // - if(_state == StateClosed) - { - return; - } - if(!_threadPerConnection) - { - registerWithPool(); // We need to continue to read in closing state. - } - break; + _transceiver->shutdownReadWrite(); } - - case StateClosed: + else { - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we - // shutdown both for reading and writing. This will - // unblock and read call with an exception. The thread - // per connection then closes the transceiver. - // - _transceiver->shutdownReadWrite(); - } - else if(_state == StateNotValidated) - { - // - // If we change from not validated we can close right - // away. - // - assert(!_registeredWithPool); - - _threadPool->decFdsInUse(); - - // - // We must make sure that nobody is sending when we - // close the transceiver. - // - IceUtil::Mutex::Lock sendSync(_sendMutex); - - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - _transceiver = 0; - //notifyAll(); // We notify already below. - } - else - { - // - // Otherwise we first must make sure that we are - // registered, then we unregister, and let finished() - // do the close. - // - registerWithPool(); - unregisterWithPool(); + registerWithPool(); + unregisterWithPool(); // Let finished() do the close. - // - // We must prevent any further writes when _state == StateClosed. - // However, functions such as sendResponse cannot acquire the main - // mutex in order to check _state. Therefore we shut down the write - // end of the transceiver, which causes subsequent write attempts - // to fail with an exception. - // - _transceiver->shutdownWrite(); - } - break; + _transceiver->shutdownWrite(); // Prevent further writes. } + break; + } } // @@ -1954,25 +1936,25 @@ Ice::ConnectionI::setState(State state) { try { - initiateShutdown(); + return initiateShutdown(false); } catch(const LocalException& ex) { setState(StateClosed, ex); } } + + return false; } -void -Ice::ConnectionI::initiateShutdown() const +bool +Ice::ConnectionI::initiateShutdown(bool queue) { assert(_state == StateClosing); assert(_dispatchCount == 0); if(!_endpoint->datagram()) { - IceUtil::Mutex::Lock sendSync(_sendMutex); - // // Before we shut down, we send a close connection message. // @@ -1989,12 +1971,9 @@ Ice::ConnectionI::initiateShutdown() const os.write((Byte)1); // Compression status: compression supported but not used. os.write(headerSize); // Message size. - // - // Send the message. - // - os.i = os.b.begin(); - traceHeader("sending close connection", os, _logger, _traceLevels); - _transceiver->write(os, _endpoint->timeout()); + OutgoingMessage message(&os, false); + return sendMessage(message, queue); + // // The CloseConnection message should be sufficient. Closing the write // end of the socket is probably an artifact of how things were done @@ -2005,6 +1984,610 @@ Ice::ConnectionI::initiateShutdown() const // //_transceiver->shutdownWrite(); } + + return false; +} + +SocketStatus +Ice::ConnectionI::initialize() +{ + int timeout = 0; + if(!_startCallback || _threadPerConnection) + { + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + if(defaultsAndOverrides->overrideConnectTimeout) + { + timeout = defaultsAndOverrides->overrideConnectTimeoutValue; + } + else + { + timeout = _endpoint->timeout(); + } + } + + try + { + SocketStatus status = _transceiver->initialize(timeout); + if(status != Finished) + { + if(!_startCallback || _threadPerConnection) + { + throw TimeoutException(__FILE__, __LINE__); + } + return status; + } + } + catch(const TimeoutException&) + { + throw ConnectTimeoutException(__FILE__, __LINE__); + } + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state == StateClosed) + { + assert(_exception.get()); + _exception->ice_throw(); + } + + // + // Update the connection description once the transceiver is initialized. + // + const_cast<string&>(_desc) = _transceiver->toString(); + + setState(StateNotValidated); + } + + return Finished; +} + +SocketStatus +Ice::ConnectionI::validate() +{ + if(!_endpoint->datagram()) // Datagram connections are always implicitly validated. + { + Int timeout = 0; + if(!_startCallback || _threadPerConnection) + { + if(_instance->defaultsAndOverrides()->overrideConnectTimeout) + { + timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; + } + else + { + timeout = _endpoint->timeout(); + } + } + + if(_adapter) // The server side has the active role for connection validation. + { + BasicStream& os = _stream; + if(os.b.empty()) + { + os.write(magic[0]); + os.write(magic[1]); + os.write(magic[2]); + os.write(magic[3]); + os.write(protocolMajor); + os.write(protocolMinor); + os.write(encodingMajor); + os.write(encodingMinor); + os.write(validateConnectionMsg); + os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). + os.write(headerSize); // Message size. + os.i = os.b.begin(); + traceSend(os, _logger, _traceLevels); + } + else + { + // The stream can only be non-empty if we're doing a non-blocking connection validation. + assert(_startCallback && !_threadPerConnection); + } + + try + { + if(!_transceiver->write(os, timeout)) + { + if(!_startCallback || _threadPerConnection) + { + throw TimeoutException(__FILE__, __LINE__); + } + return NeedWrite; + } + } + catch(const TimeoutException&) + { + throw ConnectTimeoutException(__FILE__, __LINE__); + } + } + else // The client side has the passive role for connection validation. + { + BasicStream& is = _stream; + if(is.b.empty()) + { + is.b.resize(headerSize); + is.i = is.b.begin(); + } + else + { + // The stream can only be non-empty if we're doing a non-blocking connection validation. + assert(_startCallback && !_threadPerConnection); + } + + try + { + if(!_transceiver->read(is, timeout)) + { + if(!_startCallback || _threadPerConnection) + { + throw TimeoutException(__FILE__, __LINE__); + } + return NeedRead; + } + } + catch(const TimeoutException&) + { + throw ConnectTimeoutException(__FILE__, __LINE__); + } + + assert(is.i == is.b.end()); + is.i = is.b.begin(); + Byte m[4]; + is.read(m[0]); + is.read(m[1]); + is.read(m[2]); + is.read(m[3]); + if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) + { + BadMagicException ex(__FILE__, __LINE__); + ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic)); + throw ex; + } + Byte pMajor; + Byte pMinor; + is.read(pMajor); + is.read(pMinor); + if(pMajor != protocolMajor) + { + UnsupportedProtocolException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(pMajor); + ex.badMinor = static_cast<unsigned char>(pMinor); + ex.major = static_cast<unsigned char>(protocolMajor); + ex.minor = static_cast<unsigned char>(protocolMinor); + throw ex; + } + Byte eMajor; + Byte eMinor; + is.read(eMajor); + is.read(eMinor); + if(eMajor != encodingMajor) + { + UnsupportedEncodingException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(eMajor); + ex.badMinor = static_cast<unsigned char>(eMinor); + ex.major = static_cast<unsigned char>(encodingMajor); + ex.minor = static_cast<unsigned char>(encodingMinor); + throw ex; + } + Byte messageType; + is.read(messageType); + if(messageType != validateConnectionMsg) + { + throw ConnectionNotValidatedException(__FILE__, __LINE__); + } + Byte compress; + is.read(compress); // Ignore compression status for validate connection. + Int size; + is.read(size); + if(size != headerSize) + { + throw IllegalMessageSizeException(__FILE__, __LINE__); + } + traceRecv(is, _logger, _traceLevels); + } + } + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _stream.resize(0); + _stream.i = _stream.b.begin(); + + if(_state == StateClosed) + { + assert(_exception.get()); + _exception->ice_throw(); + } + + // + // We start out in holding state. + // + setState(StateHolding); + } + + return Finished; +} + +bool +Ice::ConnectionI::send(int timeout) +{ + assert(_transceiver); + assert(!_sendStreams.empty()); + + while(!_sendStreams.empty()) + { + OutgoingMessage* message = &_sendStreams.front(); + + // + // Prepare the message stream for writing if necessary. + // + if(!message->stream->i) + { + message->stream->i = message->stream->b.begin(); + if(message->compress && message->stream->b.size() >= 100) // Only compress messages larger than 100 bytes. + { + // + // Message compressed. Request compressed response, if any. + // + message->stream->b[9] = 2; + + // + // Do compression. + // + BasicStream stream(_instance.get()); + doCompress(*message->stream, stream); + + if(message->outAsync) + { + trace("sending asynchronous request", *message->stream, _logger, _traceLevels); + } + else + { + traceSend(*message->stream, _logger, _traceLevels); + } + + message->adopt(&stream); // Adopt the compressed stream. + message->stream->i = message->stream->b.begin(); + } + else + { + if(message->compress) + { + // + // Message not compressed. Request compressed response, if any. + // + message->stream->b[9] = 1; + } + + // + // No compression, just fill in the message size. + // + Int sz = static_cast<Int>(message->stream->b.size()); + const Byte* p = reinterpret_cast<const Byte*>(&sz); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), message->stream->b.begin() + 10); +#else + copy(p, p + sizeof(Int), message->stream->b.begin() + 10); +#endif + message->stream->i = message->stream->b.begin(); + + if(message->outAsync) + { + trace("sending asynchronous request", *message->stream, _logger, _traceLevels); + } + else + { + traceSend(*message->stream, _logger, _traceLevels); + } + } + } + + // + // Send the first message. + // + assert(message->stream->i); + if(!_transceiver->write(*message->stream, timeout)) + { + assert(timeout == 0); + return false; + } + + // + // Notify the message that it was sent. + // + message->sent(this, timeout == 0); // timeout == 0 indicates that this is called by the selector thread. + _sendStreams.pop_front(); + } + + return true; +} + +bool +Ice::ConnectionI::sendMessage(OutgoingMessage& message, bool queue) +{ + assert(_state != StateClosed); + + // + // TODO: Remove support for foreground send? If set to true, messages are sent + // by the calling thread. Foreground send might still be useful for transports + // that don't support non-blocking send. + // + bool foreground = false; + + message.stream->i = 0; // Reset the message stream iterator before starting sending the message. + + // + // If another thread is currently sending messages, we queue the message in _queuedStreams + // if we're not required to send the message in the foreground. If we're required to send + // the request in the foreground we wait until no more threads send messages. + // + if(_sendInProgress) + { + if(!foreground) + { + _queuedStreams.push_back(message); + _queuedStreams.back().adopt(0); + return false; + } + else if(queue) + { + // + // Add the message to _sendStreams if requested, this is useful for sendResponse() to + // send the close connection message after sending the response. + // + _sendStreams.push_back(message); + return true; // The calling thread must send the messages by calling finishSendMessage() + } + else + { + ++_waitingForSend; + while(_sendInProgress) + { + wait(); + } + --_waitingForSend; + + if(_state == StateClosed) + { + assert(_exception.get()); + _exception->ice_throw(); + } + } + } + + assert(!_sendInProgress); + + // + // Attempt to send the message without blocking. If the send blocks, we register + // the connection with the selector thread or we request the caller to call + // finishSendMessage() outside the synchronization. + // + + message.stream->i = message.stream->b.begin(); + + if(message.compress && message.stream->b.size() >= 100) // Only compress messages larger than 100 bytes. + { + // + // Message compressed. Request compressed response, if any. + // + message.stream->b[9] = 2; + + // + // Do compression. + // + BasicStream stream(_instance.get()); + doCompress(*message.stream, stream); + stream.i = stream.b.begin(); + + if(message.outAsync) + { + trace("sending asynchronous request", *message.stream, _logger, _traceLevels); + } + else + { + traceSend(*message.stream, _logger, _traceLevels); + } + + // + // Send the message without blocking. + // + if(!foreground && _transceiver->write(stream, 0)) + { + message.sent(this, false); + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = + IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + } + return false; + } + + _sendStreams.push_back(message); + _sendStreams.back().adopt(&stream); + } + else + { + if(message.compress) + { + // + // Message not compressed. Request compressed response, if any. + // + message.stream->b[9] = 1; + } + + // + // No compression, just fill in the message size. + // + Int sz = static_cast<Int>(message.stream->b.size()); + const Byte* p = reinterpret_cast<const Byte*>(&sz); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), message.stream->b.begin() + 10); +#else + copy(p, p + sizeof(Int), message.stream->b.begin() + 10); +#endif + message.stream->i = message.stream->b.begin(); + + if(message.outAsync) + { + trace("sending asynchronous request", *message.stream, _logger, _traceLevels); + } + else + { + traceSend(*message.stream, _logger, _traceLevels); + } + + // + // Send the message without blocking. + // + if(!foreground && _transceiver->write(*message.stream, 0)) + { + message.sent(this, false); + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = + IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + } + return false; + } + + _sendStreams.push_back(message); + if(!foreground) + { + _sendStreams.back().adopt(0); + } + } + + _sendInProgress = true; + if(!foreground) + { + _selectorThread->_register(_transceiver->fd(), this, NeedWrite, _endpoint->timeout()); + return false; // The selector thread will send the message. + } + else + { + return true; // The calling thread must send the message by calling finishSendMessage() + } +} + +void +Ice::ConnectionI::finishSendMessage() +{ + try + { + // + // Send the send messages with a blocking write(). + // + send(_endpoint->timeout()); + } + catch(const Ice::LocalException& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + + for(deque<OutgoingMessage>::const_iterator p = _sendStreams.begin(); p != _sendStreams.end(); ++p) + { + if(p->adopted) + { + delete p->stream; + } + } + _sendStreams.clear(); + _sendInProgress = false; + + if(_threadPerConnection) + { + _transceiver->shutdownReadWrite(); + } + else + { + registerWithPool(); + unregisterWithPool(); // Let finished() do the close. + } + + notifyAll(); + + assert(_exception.get()); + _exception->ice_throw(); + } + + + // + // Clear the _sendInProgress flag and notify waiting threads that we're not + // sending anymore data. + // + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_sendStreams.empty()); + + if(_state == StateClosed) + { + _sendInProgress = false; + if(_threadPerConnection) + { + _transceiver->shutdownReadWrite(); + } + else + { + registerWithPool(); + unregisterWithPool(); // Let finished() do the close. + } + notifyAll(); + } + else if(_waitingForSend > 0) + { + _sendInProgress = false; + notifyAll(); + } + else if(_queuedStreams.empty()) + { + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + } + _sendInProgress = false; + } + else + { + _selectorThread->_register(_transceiver->fd(), this, NeedWrite, _endpoint->timeout()); + } +} + +void +Ice::ConnectionI::finishStart() +{ + // + // We set _startCallback to null to break potential cyclic reference count + // and because the destructor checks for it to ensure that we always invoke + // on the callback. + // + + StartCallbackPtr callback; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + swap(callback, _startCallback); + } + if(callback) + { + callback->connectionStartCompleted(this); + } +} + +void +Ice::ConnectionI::finishStart(const Ice::LocalException& ex) +{ + // + // We set _startCallback to null to break potential cyclic reference count + // and because the destructor checks for it to ensure that we always invoke + // on the callback. + // + + StartCallbackPtr callback; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + swap(callback, _startCallback); + } + if(callback) + { + callback->connectionStartFailed(this, ex); + } } void @@ -2213,7 +2796,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { case closeConnectionMsg: { - traceHeader("received close connection", stream, _logger, _traceLevels); + traceRecv(stream, _logger, _traceLevels); if(_endpoint->datagram()) { if(_warn) @@ -2233,13 +2816,12 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { if(_state == StateClosing) { - traceRequest("received request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); + trace("received request during closing\n(ignored by server, client will retry)", stream, _logger, + _traceLevels); } else { - traceRequest("received request", stream, _logger, _traceLevels); + traceRecv(stream, _logger, _traceLevels); stream.read(requestId); invokeNum = 1; servantManager = _servantManager; @@ -2253,13 +2835,12 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { if(_state == StateClosing) { - traceBatchRequest("received batch request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); + trace("received batch request during closing\n(ignored by server, client will retry)", stream, + _logger, _traceLevels); } else { - traceBatchRequest("received batch request", stream, _logger, _traceLevels); + traceRecv(stream, _logger, _traceLevels); stream.read(invokeNum); if(invokeNum < 0) { @@ -2275,12 +2856,12 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request case replyMsg: { - traceReply("received reply", stream, _logger, _traceLevels); + traceRecv(stream, _logger, _traceLevels); stream.read(requestId); map<Int, Outgoing*>::iterator p = _requests.end(); - map<Int, AsyncRequest>::iterator q = _asyncRequests.end(); + map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.end(); if(_requestsHint != _requests.end()) { @@ -2334,7 +2915,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { assert(q != _asyncRequests.end()); - outAsync = q->second.p; + outAsync = q->second; if(q == _asyncRequestsHint) { @@ -2352,7 +2933,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request case validateConnectionMsg: { - traceHeader("received validate connection", stream, _logger, _traceLevels); + traceRecv(stream, _logger, _traceLevels); if(_warn) { Warning out(_logger); @@ -2363,9 +2944,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request default: { - traceHeader("received unknown message\n" - "(invalid, closing connection)", - stream, _logger, _traceLevels); + trace("received unknown message\n(invalid, closing connection)", stream, _logger, _traceLevels); throw UnknownMessageException(__FILE__, __LINE__); break; } @@ -2448,30 +3027,26 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B void Ice::ConnectionI::run() { - // - // For non-datagram connections, the thread-per-connection must - // validate and activate this connection, and not in the - // connection factory. Please see the comments in the connection - // factory for details. - // - if(!_endpoint->datagram()) + try + { + // + // Initialize the connection transceiver and validate the connection using + // blocking operations. + // + SocketStatus status; + + status = initialize(); + assert(status == Finished); + + status = validate(); + assert(status == Finished); + } + catch(const LocalException& ex) { - try - { - validate(); - } - catch(const LocalException&) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - assert(_state == StateClosed); - - // - // We must make sure that nobody is sending when we close - // the transceiver. - // - IceUtil::Mutex::Lock sendSync(_sendMutex); - + setState(StateClosed, ex); + if(_transceiver) { try @@ -2482,15 +3057,17 @@ Ice::ConnectionI::run() { // Here we ignore any exceptions in close(). } - + _transceiver = 0; } notifyAll(); - return; } - - activate(); + + finishStart(ex); + return; } + + finishStart(); const bool warnUdp = _instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0; @@ -2615,9 +3192,6 @@ Ice::ConnectionI::run() OutgoingAsyncPtr outAsync; auto_ptr<LocalException> localEx; - - map<Int, Outgoing*> requests; - map<Int, AsyncRequest> asyncRequests; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -2638,12 +3212,24 @@ Ice::ConnectionI::run() // if(_state == StateClosed) { + if(_sendInProgress) + { + _selectorThread->unregister(_transceiver->fd()); + } + // - // We must make sure that nobody is sending when we close - // the transceiver. + // Prevent further writes. // - IceUtil::Mutex::Lock sendSync(_sendMutex); - + _transceiver->shutdownWrite(); + + // + // We must make sure that nobody is sending before closing the transceiver. + // + while(_sendInProgress) + { + wait(); + } + try { _transceiver->close(); @@ -2652,26 +3238,15 @@ Ice::ConnectionI::run() { localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone())); } - _transceiver = 0; notifyAll(); // - // We cannot simply return here. We have to make sure - // that all requests (regular and async) are notified - // about the closed connection below. + // We cannot simply return here. We have to make sure that all requests (regular and + // async) are notified about the closed connection below. // closed = true; } - - if(_state == StateClosed || _state == StateClosing) - { - requests.swap(_requests); - _requestsHint = _requests.end(); - - asyncRequests.swap(_asyncRequests); - _asyncRequestsHint = _asyncRequests.end(); - } } // @@ -2690,21 +3265,33 @@ Ice::ConnectionI::run() // invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); - for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p) - { - p->second->finished(*_exception.get()); // The exception is immutable at this point. - } - - for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q) + if(closed) { - q->second.p->__finished(*_exception.get()); // The exception is immutable at this point. - } + // Note: the streams must be cleared first because they expect the Outgoing objects to still be valid. + for(deque<OutgoingMessage>::iterator o = _queuedStreams.begin(); o != _queuedStreams.end(); ++o) + { + o->finished(*_exception.get()); + } + _queuedStreams.clear(); + + for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) + { + p->second->finished(*_exception.get()); // The exception is immutable at this point. + } + _requests.clear(); + for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) + { + q->second->__finished(*_exception.get()); // The exception is immutable at this point. + } + _asyncRequests.clear(); + } + if(localEx.get()) { assert(closed); localEx->ice_throw(); - } + } } } |