diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 433 |
1 files changed, 274 insertions, 159 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index fbf9b2ea5f6..9618b1fe781 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1,6 +1,6 @@ // ********************************************************************** // -// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. @@ -130,6 +130,7 @@ ConnectionState connectionStateMap[] = { ConnectionStateActive, // StateActive ConnectionStateHolding, // StateHolding ConnectionStateClosing, // StateClosing + ConnectionStateClosing, // StateClosingPending ConnectionStateClosed, // StateClosed ConnectionStateClosed, // StateFinished }; @@ -159,46 +160,50 @@ Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0) } void -Ice::ConnectionI::Observer::startRead(Ice::Byte* i) +Ice::ConnectionI::Observer::startRead(const Buffer& buf) { if(_readStreamPos) { - _observer->receivedBytes(static_cast<int>(i - _readStreamPos)); + assert(!buf.b.empty()); + _observer->receivedBytes(static_cast<int>(buf.i - _readStreamPos)); } - _readStreamPos = i; + _readStreamPos = buf.b.empty() ? 0 : buf.i; } void -Ice::ConnectionI::Observer::finishRead(Ice::Byte* i) +Ice::ConnectionI::Observer::finishRead(const Buffer& buf) { if(_readStreamPos == 0) { return; } - assert(i >= _readStreamPos); - _observer->receivedBytes(static_cast<int>(i - _readStreamPos)); + assert(buf.i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(buf.i - _readStreamPos)); _readStreamPos = 0; } void -Ice::ConnectionI::Observer::startWrite(Ice::Byte* i) +Ice::ConnectionI::Observer::startWrite(const Buffer& buf) { if(_writeStreamPos) { - _observer->sentBytes(static_cast<int>(i - _writeStreamPos)); + assert(!buf.b.empty()); + _observer->sentBytes(static_cast<int>(buf.i - _writeStreamPos)); } - _writeStreamPos = i; + _writeStreamPos = buf.b.empty() ? 0 : buf.i; } void -Ice::ConnectionI::Observer::finishWrite(Ice::Byte* i) +Ice::ConnectionI::Observer::finishWrite(const Buffer& buf) { if(_writeStreamPos == 0) { return; } - assert(i >= _writeStreamPos); - _observer->sentBytes(static_cast<int>(i - _writeStreamPos)); + if(buf.i > _writeStreamPos) + { + _observer->sentBytes(static_cast<int>(buf.i - _writeStreamPos)); + } _writeStreamPos = 0; } @@ -1251,7 +1256,7 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(_observer) { - _observer.startWrite(_writeStream.i); + _observer.startWrite(_writeStream); } if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) @@ -1264,7 +1269,7 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(_observer && !_readHeader) { - _observer.startRead(_readStream.i); + _observer.startRead(_readStream); } _transceiver->startRead(_readStream); @@ -1288,7 +1293,7 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) _transceiver->finishWrite(_writeStream); if(_observer) { - _observer.finishWrite(_writeStream.i); + _observer.finishWrite(_writeStream); } } else if(operation & SocketOperationRead) @@ -1296,7 +1301,7 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) _transceiver->finishRead(_readStream); if(_observer && !_readHeader) { - _observer.finishRead(_readStream.i); + _observer.finishRead(_readStream); } } } @@ -1339,38 +1344,43 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) try { unscheduleTimeout(current.operation); - if(current.operation & SocketOperationWrite && !_writeStream.b.empty()) + + SocketOperation readyOp = current.operation; + SocketOperation writeOp = SocketOperationNone; + SocketOperation readOp = SocketOperationNone; + if(readyOp & SocketOperationWrite) { - if(_writeStream.i != _writeStream.b.end()) + if(_observer) { - if(_observer) - { - _observer.startWrite(_writeStream.i); - } - - if(!_transceiver->write(_writeStream)) - { - assert(!_writeStream.b.empty()); - scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); - return; - } - - if(_observer) - { - _observer.finishWrite(_writeStream.i); - } + _observer.startWrite(_writeStream); + } + writeOp = _transceiver->write(_writeStream); + if(_observer && !(writeOp & SocketOperationWrite)) + { + _observer.finishWrite(_writeStream); } - assert(_writeStream.i == _writeStream.b.end()); } - if(current.operation & SocketOperationRead && !_readStream.b.empty()) + + while(readyOp & SocketOperationRead) { - if(_readHeader) // Read header if necessary. + if(_observer && !_readHeader) + { + _observer.startRead(_readStream); + } + + readOp = _transceiver->read(_readStream, _hasMoreData); + if(readOp & SocketOperationRead) + { + break; + } + if(_observer && !_readHeader) { - if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) - { - return; - } assert(_readStream.i == _readStream.b.end()); + _observer.finishRead(_readStream); + } + + if(_readHeader) // Read header if necessary. + { _readHeader = false; if(_observer) @@ -1386,7 +1396,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // throw IllegalMessageSizeException(__FILE__, __LINE__); } - + _readStream.i = _readStream.b.begin(); const Byte* m; _readStream.readBlob(m, static_cast<Int>(sizeof(magic))); @@ -1402,7 +1412,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) EncodingVersion ev; _readStream.read(ev); checkSupportedProtocolEncoding(ev); - + Byte messageType; _readStream.read(messageType); Byte compress; @@ -1423,39 +1433,35 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } _readStream.i = _readStream.b.begin() + pos; } - + if(_readStream.i != _readStream.b.end()) { if(_endpoint->datagram()) { throw DatagramLimitException(__FILE__, __LINE__); // The message was truncated. } - else - { - if(_observer) - { - _observer.startRead(_readStream.i); - } - - if(!_transceiver->read(_readStream)) - { - assert(!_readStream.b.empty()); - scheduleTimeout(SocketOperationRead, _endpoint->timeout()); - return; - } - - if(_observer) - { - _observer.finishRead(_readStream.i); - } - assert(_readStream.i == _readStream.b.end()); - } + continue; } - + break; } - + + SocketOperation newOp = static_cast<SocketOperation>(readOp | writeOp); + readyOp = static_cast<SocketOperation>(readyOp & ~newOp); + assert(readyOp || newOp); + if(_state <= StateNotValidated) { + if(newOp) + { + // + // Wait for all the transceiver conditions to be + // satisfied before continuing. + // + scheduleTimeout(newOp); + _threadPool->update(this, current.operation, newOp); + return; + } + if(_state == StateNotInitialized && !initialize(current.operation)) { return; @@ -1476,22 +1482,39 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } else { - assert(_state <= StateClosing); + assert(_state <= StateClosingPending); // // We parse messages first, if we receive a close // connection message we won't send more messages. // - if(current.operation & SocketOperationRead) + if(readyOp & SocketOperationRead) { - parseMessage(current.stream, invokeNum, requestId, compress, servantManager, adapter, outAsync); + newOp = static_cast<SocketOperation>(newOp | parseMessage(current.stream, + invokeNum, + requestId, + compress, + servantManager, + adapter, + outAsync)); } - if(current.operation & SocketOperationWrite) + if(readyOp & SocketOperationWrite) { - sendNextMessage(sentCBs); + newOp = static_cast<SocketOperation>(newOp | sendNextMessage(sentCBs)); } - + + if(_state < StateClosed) + { + scheduleTimeout(newOp); + _threadPool->update(this, current.operation, newOp); + } + + if(!readyOp) + { + return; + } + // // We increment the dispatch count to prevent the // communicator destruction during the callback. @@ -1643,7 +1666,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback // callback was dispatched when the connection was already // in the closing state. // - if(_state == StateClosing && !_shutdownInitiated) + if(_state == StateClosing) { try { @@ -1821,7 +1844,7 @@ Ice::ConnectionI::timedOut() { setState(StateClosed, TimeoutException(__FILE__, __LINE__)); } - else if(_state == StateClosing) + else if(_state < StateClosed) { setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); } @@ -1896,7 +1919,7 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _reaper(reaper), _transceiver(transceiver), _desc(transceiver->toString()), - _type(transceiver->type()), + _type(transceiver->protocol()), _connector(connector), _endpoint(endpoint), _adapter(adapter), @@ -2032,7 +2055,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || - (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing))) + (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing))) { Warning out(_logger); out << "connection exception:\n" << *_exception.get() << '\n' << _desc; @@ -2125,18 +2148,15 @@ Ice::ConnectionI::setState(State state) } case StateClosing: + case StateClosingPending: { // - // Can't change back from closed. + // Can't change back from closing pending. // - if(_state >= StateClosed) + if(_state >= StateClosingPending) { return; } - if(_state == StateHolding) - { - _threadPool->_register(this, SocketOperationRead); // We need to continue to read in closing state. - } break; } @@ -2146,6 +2166,7 @@ Ice::ConnectionI::setState(State state) { return; } + _threadPool->finish(this); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) _transceiver->close(); @@ -2206,7 +2227,7 @@ Ice::ConnectionI::setState(State state) dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || - (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing))) + (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing))) { _observer->failed(_exception->ice_name()); } @@ -2234,8 +2255,11 @@ Ice::ConnectionI::initiateShutdown() { assert(_state == StateClosing); assert(_dispatchCount == 0); - assert(!_shutdownInitiated); + if(_shutdownInitiated) + { + return; + } _shutdownInitiated = true; if(!_endpoint->datagram()) @@ -2257,33 +2281,28 @@ Ice::ConnectionI::initiateShutdown() OutgoingMessage message(&os, false); if(sendMessage(message) & AsyncStatusSent) { + setState(StateClosingPending); + // - // Schedule the close timeout to wait for the peer to close the connection. If - // the message was queued for sending, sendNextMessage will schedule the timeout - // once all messages were sent. - // - scheduleTimeout(SocketOperationWrite, closeTimeout()); + // Notify the the transceiver of the graceful connection closure. + // + SocketOperation op = _transceiver->closing(true, *_exception.get()); + if(op) + { + scheduleTimeout(op); + _threadPool->_register(this, op); + } } - - // - // The CloseConnection message should be sufficient. Closing the write - // end of the socket is probably an artifact of how things were done - // in IIOP. In fact, shutting down the write end of the socket causes - // problems on Windows by preventing the peer from using the socket. - // For example, the peer is no longer able to continue writing a large - // message after the socket is shutdown. - // - //_transceiver->shutdownWrite(); } } bool Ice::ConnectionI::initialize(SocketOperation operation) { - SocketOperation s = _transceiver->initialize(_readStream, _writeStream); + SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData); if(s != SocketOperationNone) { - scheduleTimeout(s, connectTimeout()); + scheduleTimeout(s); _threadPool->update(this, operation, s); return false; } @@ -2321,19 +2340,23 @@ Ice::ConnectionI::validate(SocketOperation operation) if(_observer) { - _observer.startWrite(_writeStream.i); + _observer.startWrite(_writeStream); } - if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) + if(_writeStream.i != _writeStream.b.end()) { - scheduleTimeout(SocketOperationWrite, connectTimeout()); - _threadPool->update(this, operation, SocketOperationWrite); - return false; + SocketOperation op = _transceiver->write(_writeStream); + if(op) + { + scheduleTimeout(op); + _threadPool->update(this, operation, op); + return false; + } } if(_observer) { - _observer.finishWrite(_writeStream.i); + _observer.finishWrite(_writeStream); } } else // The client side has the passive role for connection validation. @@ -2346,19 +2369,23 @@ Ice::ConnectionI::validate(SocketOperation operation) if(_observer) { - _observer.startRead(_readStream.i); + _observer.startRead(_readStream); } - if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) + if(_readStream.i != _readStream.b.end()) { - scheduleTimeout(SocketOperationRead, connectTimeout()); - _threadPool->update(this, operation, SocketOperationRead); - return false; + SocketOperation op = _transceiver->read(_readStream, _hasMoreData); + if(op) + { + scheduleTimeout(op); + _threadPool->update(this, operation, op); + return false; + } } if(_observer) { - _observer.finishRead(_readStream.i); + _observer.finishRead(_readStream); } assert(_readStream.i == _readStream.b.end()); @@ -2410,10 +2437,21 @@ Ice::ConnectionI::validate(SocketOperation operation) return true; } -void +SocketOperation Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks) { - assert(!_sendStreams.empty()); + if(_sendStreams.empty()) + { + return SocketOperationNone; + } + else if(_state == StateClosingPending && _writeStream.i == _writeStream.b.begin()) + { + // Message wasn't sent, empty the _writeStream, we're not going to send more data. + OutgoingMessage* message = &_sendStreams.front(); + _writeStream.swap(*message->stream); + return SocketOperationNone; + } + assert(!_writeStream.b.empty() && _writeStream.i == _writeStream.b.end()); try { @@ -2454,15 +2492,15 @@ Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks) } // - // If we are in the closed state, don't continue sending. + // If we are in the closed state or if the close is + // pending, don't continue sending. // - // The connection can be in the closed state if parseMessage - // (called before sendNextMessage by message()) closes the - // connection. + // This can occur if parseMessage (called before + // sendNextMessage by message()) closes the connection. // - if(_state >= StateClosed) + if(_state >= StateClosingPending) { - return; + return SocketOperationNone; } // @@ -2536,38 +2574,44 @@ Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks) // if(_observer) { - _observer.startWrite(_writeStream.i); + _observer.startWrite(_writeStream); } assert(_writeStream.i); - if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) + if(_writeStream.i != _writeStream.b.end()) { - assert(!_writeStream.b.empty()); - scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); - return; + SocketOperation op = _transceiver->write(_writeStream); + if(op) + { + return op; + } } if(_observer) { - _observer.finishWrite(_writeStream.i); + _observer.finishWrite(_writeStream); } } } catch(const Ice::LocalException& ex) { setState(StateClosed, ex); - return; + return SocketOperationNone; } - assert(_writeStream.b.empty()); - _threadPool->unregister(this, SocketOperationWrite); - // // If all the messages were sent and we are in the closing state, we schedule // the close timeout to wait for the peer to close the connection. // if(_state == StateClosing) { - scheduleTimeout(SocketOperationWrite, closeTimeout()); + setState(StateClosingPending); + SocketOperation op = _transceiver->closing(true, *_exception.get()); + if(op) + { + return op; + } } + + return SocketOperationNone; } AsyncStatus @@ -2590,6 +2634,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // message.stream->i = message.stream->b.begin(); + SocketOperation op; #ifndef ICE_OS_WINRT if(message.compress && message.stream->b.size() >= 100) // Only compress messages larger than 100 bytes. { @@ -2619,13 +2664,14 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // if(_observer) { - _observer.startWrite(stream.i); + _observer.startWrite(stream); } - if(_transceiver->write(stream)) + op = _transceiver->write(stream); + if(!op) { if(_observer) { - _observer.finishWrite(stream.i); + _observer.finishWrite(stream); } AsyncStatus status = AsyncStatusSent; @@ -2681,13 +2727,14 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // if(_observer) { - _observer.startWrite(message.stream->i); + _observer.startWrite(*message.stream); } - if(_transceiver->write(*message.stream)) + op = _transceiver->write(*message.stream); + if(!op) { if(_observer) { - _observer.finishWrite(message.stream->i); + _observer.finishWrite(*message.stream); } AsyncStatus status = AsyncStatusSent; if(message.sent(this, false)) @@ -2709,8 +2756,8 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) #endif _writeStream.swap(*_sendStreams.back().stream); - scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); - _threadPool->_register(this, SocketOperationWrite); + scheduleTimeout(op); + _threadPool->_register(this, op); return AsyncStatusQueued; } @@ -2861,7 +2908,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse } #endif -void +SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, OutgoingAsyncPtr& outAsync) @@ -2921,14 +2968,23 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } else { - setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); + setState(StateClosingPending, CloseConnectionException(__FILE__, __LINE__)); + + // + // Notify the the transceiver of the graceful connection closure. + // + SocketOperation op = _transceiver->closing(false, *_exception.get()); + if(op) + { + return op; + } + setState(StateClosed); } - break; } case requestMsg: { - if(_state == StateClosing) + if(_state >= StateClosing) { trace("received request during closing\n(ignored by server, client will retry)", stream, _logger, _traceLevels); @@ -2947,7 +3003,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request case requestBatchMsg: { - if(_state == StateClosing) + if(_state >= StateClosing) { trace("received batch request during closing\n(ignored by server, client will retry)", stream, _logger, _traceLevels); @@ -3095,6 +3151,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request setState(StateClosed, ex); } } + + return _state == StateHolding ? SocketOperationNone : SocketOperationRead; } void @@ -3134,31 +3192,88 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B } } -int -Ice::ConnectionI::connectTimeout() +void +Ice::ConnectionI::scheduleTimeout(SocketOperation status) { - DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); - if(defaultsAndOverrides->overrideConnectTimeout) + int timeout; + if(_state < StateActive) + { + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + if(defaultsAndOverrides->overrideConnectTimeout) + { + timeout = defaultsAndOverrides->overrideConnectTimeoutValue; + } + else + { + timeout = _endpoint->timeout(); + } + } + else if(_state < StateClosingPending) { - return defaultsAndOverrides->overrideConnectTimeoutValue; + if(_readHeader) // No timeout for reading the header. + { + status = static_cast<SocketOperation>(status & ~SocketOperationRead); + } + timeout = _endpoint->timeout(); } else { - return _endpoint->timeout(); + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + if(defaultsAndOverrides->overrideCloseTimeout) + { + timeout = defaultsAndOverrides->overrideCloseTimeoutValue; + } + else + { + timeout = _endpoint->timeout(); + } + } + + if(timeout < 0) + { + return; + } + + try + { + if(status & IceInternal::SocketOperationRead) + { + if(_readTimeoutScheduled) + { + _timer->cancel(_readTimeout); + } + _timer->schedule(_readTimeout, IceUtil::Time::milliSeconds(timeout)); + _readTimeoutScheduled = true; + } + if(status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect)) + { + if(_writeTimeoutScheduled) + { + _timer->cancel(_writeTimeout); + } + _timer->schedule(_writeTimeout, IceUtil::Time::milliSeconds(timeout)); + _writeTimeoutScheduled = true; + } + } + catch(const IceUtil::Exception&) + { + assert(false); } } -int -Ice::ConnectionI::closeTimeout() +void +Ice::ConnectionI::unscheduleTimeout(SocketOperation status) { - DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); - if(defaultsAndOverrides->overrideCloseTimeout) + if((status & IceInternal::SocketOperationRead) && _readTimeoutScheduled) { - return defaultsAndOverrides->overrideCloseTimeoutValue; + _timer->cancel(_readTimeout); + _readTimeoutScheduled = false; } - else + if((status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect)) && + _writeTimeoutScheduled) { - return _endpoint->timeout(); + _timer->cancel(_writeTimeout); + _writeTimeoutScheduled = false; } } |