summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp433
1 files changed, 274 insertions, 159 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index fbf9b2ea5f6..9618b1fe781 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1,6 +1,6 @@
// **********************************************************************
//
-// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
@@ -130,6 +130,7 @@ ConnectionState connectionStateMap[] = {
ConnectionStateActive, // StateActive
ConnectionStateHolding, // StateHolding
ConnectionStateClosing, // StateClosing
+ ConnectionStateClosing, // StateClosingPending
ConnectionStateClosed, // StateClosed
ConnectionStateClosed, // StateFinished
};
@@ -159,46 +160,50 @@ Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0)
}
void
-Ice::ConnectionI::Observer::startRead(Ice::Byte* i)
+Ice::ConnectionI::Observer::startRead(const Buffer& buf)
{
if(_readStreamPos)
{
- _observer->receivedBytes(static_cast<int>(i - _readStreamPos));
+ assert(!buf.b.empty());
+ _observer->receivedBytes(static_cast<int>(buf.i - _readStreamPos));
}
- _readStreamPos = i;
+ _readStreamPos = buf.b.empty() ? 0 : buf.i;
}
void
-Ice::ConnectionI::Observer::finishRead(Ice::Byte* i)
+Ice::ConnectionI::Observer::finishRead(const Buffer& buf)
{
if(_readStreamPos == 0)
{
return;
}
- assert(i >= _readStreamPos);
- _observer->receivedBytes(static_cast<int>(i - _readStreamPos));
+ assert(buf.i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(buf.i - _readStreamPos));
_readStreamPos = 0;
}
void
-Ice::ConnectionI::Observer::startWrite(Ice::Byte* i)
+Ice::ConnectionI::Observer::startWrite(const Buffer& buf)
{
if(_writeStreamPos)
{
- _observer->sentBytes(static_cast<int>(i - _writeStreamPos));
+ assert(!buf.b.empty());
+ _observer->sentBytes(static_cast<int>(buf.i - _writeStreamPos));
}
- _writeStreamPos = i;
+ _writeStreamPos = buf.b.empty() ? 0 : buf.i;
}
void
-Ice::ConnectionI::Observer::finishWrite(Ice::Byte* i)
+Ice::ConnectionI::Observer::finishWrite(const Buffer& buf)
{
if(_writeStreamPos == 0)
{
return;
}
- assert(i >= _writeStreamPos);
- _observer->sentBytes(static_cast<int>(i - _writeStreamPos));
+ if(buf.i > _writeStreamPos)
+ {
+ _observer->sentBytes(static_cast<int>(buf.i - _writeStreamPos));
+ }
_writeStreamPos = 0;
}
@@ -1251,7 +1256,7 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
{
if(_observer)
{
- _observer.startWrite(_writeStream.i);
+ _observer.startWrite(_writeStream);
}
if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty())
@@ -1264,7 +1269,7 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
{
if(_observer && !_readHeader)
{
- _observer.startRead(_readStream.i);
+ _observer.startRead(_readStream);
}
_transceiver->startRead(_readStream);
@@ -1288,7 +1293,7 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
_transceiver->finishWrite(_writeStream);
if(_observer)
{
- _observer.finishWrite(_writeStream.i);
+ _observer.finishWrite(_writeStream);
}
}
else if(operation & SocketOperationRead)
@@ -1296,7 +1301,7 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
_transceiver->finishRead(_readStream);
if(_observer && !_readHeader)
{
- _observer.finishRead(_readStream.i);
+ _observer.finishRead(_readStream);
}
}
}
@@ -1339,38 +1344,43 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
try
{
unscheduleTimeout(current.operation);
- if(current.operation & SocketOperationWrite && !_writeStream.b.empty())
+
+ SocketOperation readyOp = current.operation;
+ SocketOperation writeOp = SocketOperationNone;
+ SocketOperation readOp = SocketOperationNone;
+ if(readyOp & SocketOperationWrite)
{
- if(_writeStream.i != _writeStream.b.end())
+ if(_observer)
{
- if(_observer)
- {
- _observer.startWrite(_writeStream.i);
- }
-
- if(!_transceiver->write(_writeStream))
- {
- assert(!_writeStream.b.empty());
- scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
- return;
- }
-
- if(_observer)
- {
- _observer.finishWrite(_writeStream.i);
- }
+ _observer.startWrite(_writeStream);
+ }
+ writeOp = _transceiver->write(_writeStream);
+ if(_observer && !(writeOp & SocketOperationWrite))
+ {
+ _observer.finishWrite(_writeStream);
}
- assert(_writeStream.i == _writeStream.b.end());
}
- if(current.operation & SocketOperationRead && !_readStream.b.empty())
+
+ while(readyOp & SocketOperationRead)
{
- if(_readHeader) // Read header if necessary.
+ if(_observer && !_readHeader)
+ {
+ _observer.startRead(_readStream);
+ }
+
+ readOp = _transceiver->read(_readStream, _hasMoreData);
+ if(readOp & SocketOperationRead)
+ {
+ break;
+ }
+ if(_observer && !_readHeader)
{
- if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
- {
- return;
- }
assert(_readStream.i == _readStream.b.end());
+ _observer.finishRead(_readStream);
+ }
+
+ if(_readHeader) // Read header if necessary.
+ {
_readHeader = false;
if(_observer)
@@ -1386,7 +1396,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
//
throw IllegalMessageSizeException(__FILE__, __LINE__);
}
-
+
_readStream.i = _readStream.b.begin();
const Byte* m;
_readStream.readBlob(m, static_cast<Int>(sizeof(magic)));
@@ -1402,7 +1412,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
EncodingVersion ev;
_readStream.read(ev);
checkSupportedProtocolEncoding(ev);
-
+
Byte messageType;
_readStream.read(messageType);
Byte compress;
@@ -1423,39 +1433,35 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
_readStream.i = _readStream.b.begin() + pos;
}
-
+
if(_readStream.i != _readStream.b.end())
{
if(_endpoint->datagram())
{
throw DatagramLimitException(__FILE__, __LINE__); // The message was truncated.
}
- else
- {
- if(_observer)
- {
- _observer.startRead(_readStream.i);
- }
-
- if(!_transceiver->read(_readStream))
- {
- assert(!_readStream.b.empty());
- scheduleTimeout(SocketOperationRead, _endpoint->timeout());
- return;
- }
-
- if(_observer)
- {
- _observer.finishRead(_readStream.i);
- }
- assert(_readStream.i == _readStream.b.end());
- }
+ continue;
}
-
+ break;
}
-
+
+ SocketOperation newOp = static_cast<SocketOperation>(readOp | writeOp);
+ readyOp = static_cast<SocketOperation>(readyOp & ~newOp);
+ assert(readyOp || newOp);
+
if(_state <= StateNotValidated)
{
+ if(newOp)
+ {
+ //
+ // Wait for all the transceiver conditions to be
+ // satisfied before continuing.
+ //
+ scheduleTimeout(newOp);
+ _threadPool->update(this, current.operation, newOp);
+ return;
+ }
+
if(_state == StateNotInitialized && !initialize(current.operation))
{
return;
@@ -1476,22 +1482,39 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
else
{
- assert(_state <= StateClosing);
+ assert(_state <= StateClosingPending);
//
// We parse messages first, if we receive a close
// connection message we won't send more messages.
//
- if(current.operation & SocketOperationRead)
+ if(readyOp & SocketOperationRead)
{
- parseMessage(current.stream, invokeNum, requestId, compress, servantManager, adapter, outAsync);
+ newOp = static_cast<SocketOperation>(newOp | parseMessage(current.stream,
+ invokeNum,
+ requestId,
+ compress,
+ servantManager,
+ adapter,
+ outAsync));
}
- if(current.operation & SocketOperationWrite)
+ if(readyOp & SocketOperationWrite)
{
- sendNextMessage(sentCBs);
+ newOp = static_cast<SocketOperation>(newOp | sendNextMessage(sentCBs));
}
-
+
+ if(_state < StateClosed)
+ {
+ scheduleTimeout(newOp);
+ _threadPool->update(this, current.operation, newOp);
+ }
+
+ if(!readyOp)
+ {
+ return;
+ }
+
//
// We increment the dispatch count to prevent the
// communicator destruction during the callback.
@@ -1643,7 +1666,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback
// callback was dispatched when the connection was already
// in the closing state.
//
- if(_state == StateClosing && !_shutdownInitiated)
+ if(_state == StateClosing)
{
try
{
@@ -1821,7 +1844,7 @@ Ice::ConnectionI::timedOut()
{
setState(StateClosed, TimeoutException(__FILE__, __LINE__));
}
- else if(_state == StateClosing)
+ else if(_state < StateClosed)
{
setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
}
@@ -1896,7 +1919,7 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_reaper(reaper),
_transceiver(transceiver),
_desc(transceiver->toString()),
- _type(transceiver->type()),
+ _type(transceiver->protocol()),
_connector(connector),
_endpoint(endpoint),
_adapter(adapter),
@@ -2032,7 +2055,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
- (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
+ (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing)))
{
Warning out(_logger);
out << "connection exception:\n" << *_exception.get() << '\n' << _desc;
@@ -2125,18 +2148,15 @@ Ice::ConnectionI::setState(State state)
}
case StateClosing:
+ case StateClosingPending:
{
//
- // Can't change back from closed.
+ // Can't change back from closing pending.
//
- if(_state >= StateClosed)
+ if(_state >= StateClosingPending)
{
return;
}
- if(_state == StateHolding)
- {
- _threadPool->_register(this, SocketOperationRead); // We need to continue to read in closing state.
- }
break;
}
@@ -2146,6 +2166,7 @@ Ice::ConnectionI::setState(State state)
{
return;
}
+
_threadPool->finish(this);
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
_transceiver->close();
@@ -2206,7 +2227,7 @@ Ice::ConnectionI::setState(State state)
dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
- (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
+ (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing)))
{
_observer->failed(_exception->ice_name());
}
@@ -2234,8 +2255,11 @@ Ice::ConnectionI::initiateShutdown()
{
assert(_state == StateClosing);
assert(_dispatchCount == 0);
- assert(!_shutdownInitiated);
+ if(_shutdownInitiated)
+ {
+ return;
+ }
_shutdownInitiated = true;
if(!_endpoint->datagram())
@@ -2257,33 +2281,28 @@ Ice::ConnectionI::initiateShutdown()
OutgoingMessage message(&os, false);
if(sendMessage(message) & AsyncStatusSent)
{
+ setState(StateClosingPending);
+
//
- // Schedule the close timeout to wait for the peer to close the connection. If
- // the message was queued for sending, sendNextMessage will schedule the timeout
- // once all messages were sent.
- //
- scheduleTimeout(SocketOperationWrite, closeTimeout());
+ // Notify the the transceiver of the graceful connection closure.
+ //
+ SocketOperation op = _transceiver->closing(true, *_exception.get());
+ if(op)
+ {
+ scheduleTimeout(op);
+ _threadPool->_register(this, op);
+ }
}
-
- //
- // The CloseConnection message should be sufficient. Closing the write
- // end of the socket is probably an artifact of how things were done
- // in IIOP. In fact, shutting down the write end of the socket causes
- // problems on Windows by preventing the peer from using the socket.
- // For example, the peer is no longer able to continue writing a large
- // message after the socket is shutdown.
- //
- //_transceiver->shutdownWrite();
}
}
bool
Ice::ConnectionI::initialize(SocketOperation operation)
{
- SocketOperation s = _transceiver->initialize(_readStream, _writeStream);
+ SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData);
if(s != SocketOperationNone)
{
- scheduleTimeout(s, connectTimeout());
+ scheduleTimeout(s);
_threadPool->update(this, operation, s);
return false;
}
@@ -2321,19 +2340,23 @@ Ice::ConnectionI::validate(SocketOperation operation)
if(_observer)
{
- _observer.startWrite(_writeStream.i);
+ _observer.startWrite(_writeStream);
}
- if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
+ if(_writeStream.i != _writeStream.b.end())
{
- scheduleTimeout(SocketOperationWrite, connectTimeout());
- _threadPool->update(this, operation, SocketOperationWrite);
- return false;
+ SocketOperation op = _transceiver->write(_writeStream);
+ if(op)
+ {
+ scheduleTimeout(op);
+ _threadPool->update(this, operation, op);
+ return false;
+ }
}
if(_observer)
{
- _observer.finishWrite(_writeStream.i);
+ _observer.finishWrite(_writeStream);
}
}
else // The client side has the passive role for connection validation.
@@ -2346,19 +2369,23 @@ Ice::ConnectionI::validate(SocketOperation operation)
if(_observer)
{
- _observer.startRead(_readStream.i);
+ _observer.startRead(_readStream);
}
- if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
+ if(_readStream.i != _readStream.b.end())
{
- scheduleTimeout(SocketOperationRead, connectTimeout());
- _threadPool->update(this, operation, SocketOperationRead);
- return false;
+ SocketOperation op = _transceiver->read(_readStream, _hasMoreData);
+ if(op)
+ {
+ scheduleTimeout(op);
+ _threadPool->update(this, operation, op);
+ return false;
+ }
}
if(_observer)
{
- _observer.finishRead(_readStream.i);
+ _observer.finishRead(_readStream);
}
assert(_readStream.i == _readStream.b.end());
@@ -2410,10 +2437,21 @@ Ice::ConnectionI::validate(SocketOperation operation)
return true;
}
-void
+SocketOperation
Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks)
{
- assert(!_sendStreams.empty());
+ if(_sendStreams.empty())
+ {
+ return SocketOperationNone;
+ }
+ else if(_state == StateClosingPending && _writeStream.i == _writeStream.b.begin())
+ {
+ // Message wasn't sent, empty the _writeStream, we're not going to send more data.
+ OutgoingMessage* message = &_sendStreams.front();
+ _writeStream.swap(*message->stream);
+ return SocketOperationNone;
+ }
+
assert(!_writeStream.b.empty() && _writeStream.i == _writeStream.b.end());
try
{
@@ -2454,15 +2492,15 @@ Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks)
}
//
- // If we are in the closed state, don't continue sending.
+ // If we are in the closed state or if the close is
+ // pending, don't continue sending.
//
- // The connection can be in the closed state if parseMessage
- // (called before sendNextMessage by message()) closes the
- // connection.
+ // This can occur if parseMessage (called before
+ // sendNextMessage by message()) closes the connection.
//
- if(_state >= StateClosed)
+ if(_state >= StateClosingPending)
{
- return;
+ return SocketOperationNone;
}
//
@@ -2536,38 +2574,44 @@ Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks)
//
if(_observer)
{
- _observer.startWrite(_writeStream.i);
+ _observer.startWrite(_writeStream);
}
assert(_writeStream.i);
- if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
+ if(_writeStream.i != _writeStream.b.end())
{
- assert(!_writeStream.b.empty());
- scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
- return;
+ SocketOperation op = _transceiver->write(_writeStream);
+ if(op)
+ {
+ return op;
+ }
}
if(_observer)
{
- _observer.finishWrite(_writeStream.i);
+ _observer.finishWrite(_writeStream);
}
}
}
catch(const Ice::LocalException& ex)
{
setState(StateClosed, ex);
- return;
+ return SocketOperationNone;
}
- assert(_writeStream.b.empty());
- _threadPool->unregister(this, SocketOperationWrite);
-
//
// If all the messages were sent and we are in the closing state, we schedule
// the close timeout to wait for the peer to close the connection.
//
if(_state == StateClosing)
{
- scheduleTimeout(SocketOperationWrite, closeTimeout());
+ setState(StateClosingPending);
+ SocketOperation op = _transceiver->closing(true, *_exception.get());
+ if(op)
+ {
+ return op;
+ }
}
+
+ return SocketOperationNone;
}
AsyncStatus
@@ -2590,6 +2634,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
message.stream->i = message.stream->b.begin();
+ SocketOperation op;
#ifndef ICE_OS_WINRT
if(message.compress && message.stream->b.size() >= 100) // Only compress messages larger than 100 bytes.
{
@@ -2619,13 +2664,14 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
if(_observer)
{
- _observer.startWrite(stream.i);
+ _observer.startWrite(stream);
}
- if(_transceiver->write(stream))
+ op = _transceiver->write(stream);
+ if(!op)
{
if(_observer)
{
- _observer.finishWrite(stream.i);
+ _observer.finishWrite(stream);
}
AsyncStatus status = AsyncStatusSent;
@@ -2681,13 +2727,14 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
if(_observer)
{
- _observer.startWrite(message.stream->i);
+ _observer.startWrite(*message.stream);
}
- if(_transceiver->write(*message.stream))
+ op = _transceiver->write(*message.stream);
+ if(!op)
{
if(_observer)
{
- _observer.finishWrite(message.stream->i);
+ _observer.finishWrite(*message.stream);
}
AsyncStatus status = AsyncStatusSent;
if(message.sent(this, false))
@@ -2709,8 +2756,8 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
#endif
_writeStream.swap(*_sendStreams.back().stream);
- scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
- _threadPool->_register(this, SocketOperationWrite);
+ scheduleTimeout(op);
+ _threadPool->_register(this, op);
return AsyncStatusQueued;
}
@@ -2861,7 +2908,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
}
#endif
-void
+SocketOperation
Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
OutgoingAsyncPtr& outAsync)
@@ -2921,14 +2968,23 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
else
{
- setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
+ setState(StateClosingPending, CloseConnectionException(__FILE__, __LINE__));
+
+ //
+ // Notify the the transceiver of the graceful connection closure.
+ //
+ SocketOperation op = _transceiver->closing(false, *_exception.get());
+ if(op)
+ {
+ return op;
+ }
+ setState(StateClosed);
}
- break;
}
case requestMsg:
{
- if(_state == StateClosing)
+ if(_state >= StateClosing)
{
trace("received request during closing\n(ignored by server, client will retry)", stream, _logger,
_traceLevels);
@@ -2947,7 +3003,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
case requestBatchMsg:
{
- if(_state == StateClosing)
+ if(_state >= StateClosing)
{
trace("received batch request during closing\n(ignored by server, client will retry)", stream,
_logger, _traceLevels);
@@ -3095,6 +3151,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
setState(StateClosed, ex);
}
}
+
+ return _state == StateHolding ? SocketOperationNone : SocketOperationRead;
}
void
@@ -3134,31 +3192,88 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
}
}
-int
-Ice::ConnectionI::connectTimeout()
+void
+Ice::ConnectionI::scheduleTimeout(SocketOperation status)
{
- DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
- if(defaultsAndOverrides->overrideConnectTimeout)
+ int timeout;
+ if(_state < StateActive)
+ {
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideConnectTimeout)
+ {
+ timeout = defaultsAndOverrides->overrideConnectTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint->timeout();
+ }
+ }
+ else if(_state < StateClosingPending)
{
- return defaultsAndOverrides->overrideConnectTimeoutValue;
+ if(_readHeader) // No timeout for reading the header.
+ {
+ status = static_cast<SocketOperation>(status & ~SocketOperationRead);
+ }
+ timeout = _endpoint->timeout();
}
else
{
- return _endpoint->timeout();
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideCloseTimeout)
+ {
+ timeout = defaultsAndOverrides->overrideCloseTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint->timeout();
+ }
+ }
+
+ if(timeout < 0)
+ {
+ return;
+ }
+
+ try
+ {
+ if(status & IceInternal::SocketOperationRead)
+ {
+ if(_readTimeoutScheduled)
+ {
+ _timer->cancel(_readTimeout);
+ }
+ _timer->schedule(_readTimeout, IceUtil::Time::milliSeconds(timeout));
+ _readTimeoutScheduled = true;
+ }
+ if(status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect))
+ {
+ if(_writeTimeoutScheduled)
+ {
+ _timer->cancel(_writeTimeout);
+ }
+ _timer->schedule(_writeTimeout, IceUtil::Time::milliSeconds(timeout));
+ _writeTimeoutScheduled = true;
+ }
+ }
+ catch(const IceUtil::Exception&)
+ {
+ assert(false);
}
}
-int
-Ice::ConnectionI::closeTimeout()
+void
+Ice::ConnectionI::unscheduleTimeout(SocketOperation status)
{
- DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
- if(defaultsAndOverrides->overrideCloseTimeout)
+ if((status & IceInternal::SocketOperationRead) && _readTimeoutScheduled)
{
- return defaultsAndOverrides->overrideCloseTimeoutValue;
+ _timer->cancel(_readTimeout);
+ _readTimeoutScheduled = false;
}
- else
+ if((status & (IceInternal::SocketOperationWrite | IceInternal::SocketOperationConnect)) &&
+ _writeTimeoutScheduled)
{
- return _endpoint->timeout();
+ _timer->cancel(_writeTimeout);
+ _writeTimeoutScheduled = false;
}
}