From d81701ca8182942b7936f9fd84a019b695e9c890 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Fri, 23 May 2014 11:59:44 +0200 Subject: Added support for invocation timeouts and ACM heartbeats --- cpp/src/Ice/ConnectionI.cpp | 613 ++++++++++++++++++++++++++++++++------------ 1 file changed, 455 insertions(+), 158 deletions(-) (limited to 'cpp/src/Ice/ConnectionI.cpp') diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 9618b1fe781..9dc3ff4efa5 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -16,7 +16,7 @@ #include #include #include -#include +#include #include // For getThreadPool() and getServantManager(). #include #include @@ -66,9 +66,10 @@ class DispatchDispatcherCall : public DispatcherCall public: DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, - const vector& sentCBs, Byte compress, Int requestId, + const vector& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, - const OutgoingAsyncPtr& outAsync, BasicStream& stream) : + const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, + BasicStream& stream) : _connection(connection), _startCB(startCB), _sentCBs(sentCBs), @@ -78,6 +79,7 @@ public: _servantManager(servantManager), _adapter(adapter), _outAsync(outAsync), + _heartbeatCallback(heartbeatCallback), _stream(stream.instance(), currentProtocolEncoding) { _stream.swap(stream); @@ -87,20 +89,21 @@ public: run() { _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter, - _outAsync, _stream); + _outAsync, _heartbeatCallback, _stream); } private: ConnectionIPtr _connection; ConnectionI::StartCallbackPtr _startCB; - vector _sentCBs; + vector _sentCBs; Byte _compress; Int _requestId; Int _invokeNum; ServantManagerPtr _servantManager; ObjectAdapterPtr _adapter; OutgoingAsyncPtr _outAsync; + ConnectionCallbackPtr _heartbeatCallback; BasicStream _stream; }; @@ -137,24 +140,6 @@ ConnectionState connectionStateMap[] = { } -void -IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection) -{ - Lock sync(*this); - _connections.push_back(connection); - if(connection->_observer) - { - connection->_observer.detach(); - } -} - -void -IceInternal::ConnectionReaper::swapConnections(vector& connections) -{ - Lock sync(*this); - _connections.swap(connections); -} - Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0) { } @@ -254,25 +239,39 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) adopted = true; } +void +Ice::ConnectionI::OutgoingMessage::timedOut() +{ + assert((out || outAsync) && !isSent); // Only requests can timeout. + out = 0; + outAsync = 0; + adopt(0); // Adopt the request stream +} + bool -Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify) +Ice::ConnectionI::OutgoingMessage::sent() { isSent = true; // The message is sent. if(adopted) { delete stream; - stream = 0; } + stream = 0; if(out) { - out->sent(notify); // true = notify the waiting thread that the request was sent. + out->sent(); return false; } else if(outAsync) { - return outAsync->__sent(connection); +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + invokeSentCallback = outAsync->__sent(); + return invokeSentCallback || receivedReply; +#else + return outAsync->__sent(); +#endif } else { @@ -295,8 +294,8 @@ Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex) if(adopted) { delete stream; - stream = 0; } + stream = 0; } void @@ -368,12 +367,10 @@ Ice::ConnectionI::activate() { return; } - - if(_acmTimeout > 0) + if(_acmLastActivity != IceUtil::Time()) { - _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); } - setState(StateActive); } @@ -538,32 +535,67 @@ Ice::ConnectionI::updateObserver() } void -Ice::ConnectionI::monitor(const IceUtil::Time& now) +Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) { - IceUtil::Monitor::TryLock sync(*this); - if(!sync.acquired()) + IceUtil::Monitor::Lock sync(*this); + if(_state != StateActive) { return; } + assert(acm.timeout != IceUtil::Time()); - if(_state != StateActive) + if(static_cast(_readStream.b.size()) > headerSize || !_writeStream.b.empty()) { + // + // If writing or reading, nothing to do, the connection + // timeout will kick-in if writes or reads don't progress. + // This check is necessary because the actitivy timer is + // only set when a message is fully read/written. + // return; } // - // Active connection management for idle connections. + // We send a heartbeat if there was no activity in the last + // (timeout / 4) period. Sending a heartbeat sooner than really + // needed is safer to ensure that the receiver will receive in + // time the heartbeat. Sending the heartbeat if there was no + // activity in the last (timeout / 2) period isn't enough since + // monitor() is called only every (timeout / 2) period. // - if(_acmTimeout <= 0 || - !_requests.empty() || !_asyncRequests.empty() || _dispatchCount > 0 || - static_cast(_readStream.b.size()) > headerSize || !_writeStream.b.empty() || !_batchStream.b.empty()) + // Note that this doesn't imply that we are sending 4 heartbeats + // per timeout period because the monitor() method is sill only + // called every (timeout / 2) period. + // + + if(acm.heartbeat == HeartbeatAlways || + (acm.heartbeat != HeartbeatOff && now >= (_acmLastActivity + acm.timeout / 4))) { - return; + if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0) + { + heartbeat(); + } } - - if(now >= _acmAbsoluteTimeout) + + if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout)) { - setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); + if(acm.close == CloseOnIdleForceful || + (acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty()))) + { + // + // Close the connection if we didn't receive a heartbeat in + // the last period. + // + setState(StateClosed, ConnectionTimeoutException(__FILE__, __LINE__)); + } + else if(acm.close != CloseOnInvocation && + _dispatchCount == 0 && _batchStream.b.empty() && _requests.empty() && _asyncRequests.empty()) + { + // + // The connection is idle, close it. + // + setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); + } } } @@ -957,11 +989,11 @@ Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const Lo new ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb, cookie); try { - result->__send(); + result->__invoke(); } catch(const LocalException& __ex) { - result->__exceptionAsync(__ex); + result->__invokeExceptionAsync(__ex); } return result; } @@ -989,7 +1021,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) if(_batchRequestNum == 0) { - out->sent(false); + out->sent(); return true; } @@ -1052,7 +1084,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) if(_batchRequestNum == 0) { AsyncStatus status = AsyncStatusSent; - if(outAsync->__sent(this)) + if(outAsync->__sent()) { status = static_cast(status | AsyncStatusInvokeSentCallback); } @@ -1101,6 +1133,194 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) return status; } +void +Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) +{ + IceUtil::Monitor::Lock sync(*this); + if(_state > StateClosing) + { + return; + } + _callback = callback; +} + +void +Ice::ConnectionI::setACM(const IceUtil::Optional& timeout, + const IceUtil::Optional& close, + const IceUtil::Optional& heartbeat) +{ + IceUtil::Monitor::Lock sync(*this); + if(_monitor) + { + if(_state == StateActive) + { + _monitor->remove(this); + } + _monitor = _monitor->acm(timeout, close, heartbeat); + if(_state == StateActive) + { + _monitor->add(this); + } + + if(_monitor->getACM().timeout <= 0) + { + _acmLastActivity = IceUtil::Time(); // Disable the recording of last activity. + } + else if(_acmLastActivity == IceUtil::Time() && _state == StateActive) + { + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); + } + } +} + +ACM +Ice::ConnectionI::getACM() +{ + IceUtil::Monitor::Lock sync(*this); + ACM acm; + acm.timeout = 0; + acm.close = CloseOff; + acm.heartbeat = HeartbeatOff; + return _monitor ? _monitor->getACM() : acm; +} + +void +Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) +{ + IceUtil::Monitor::Lock sync(*this); + for(deque::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) + { + if(o->out == out) + { + if(o->requestId) + { + if(_requestsHint != _requests.end() && _requestsHint->second == dynamic_cast(out)) + { + _requests.erase(_requestsHint); + _requestsHint = _requests.end(); + } + else + { + _requests.erase(o->requestId); + } + } + + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.begin()) + { + o->timedOut(); + } + else + { + _sendStreams.erase(o); + } + + InvocationTimeoutException ex(__FILE__, __LINE__); + o->finished(ex); + return; + } + } + + Outgoing* o = dynamic_cast(out); + if(o) + { + if(_requestsHint != _requests.end() && _requestsHint->second == o) + { + InvocationTimeoutException ex(__FILE__, __LINE__); + o->finished(ex, true); + _requests.erase(_requestsHint); + _requestsHint = _requests.end(); + } + else + { + for(map::iterator p = _requests.begin(); p != _requests.end(); ++p) + { + if(p->second == o) + { + InvocationTimeoutException ex(__FILE__, __LINE__); + o->finished(ex, true); + assert(p != _requestsHint); + _requests.erase(p); + return; // We're done. + } + } + } + } +} + +void +Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +{ + IceUtil::Monitor::Lock sync(*this); + + for(deque::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) + { + if(o->outAsync.get() == outAsync.get()) + { + if(o->requestId) + { + if(_asyncRequestsHint != _asyncRequests.end() && + _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync)) + { + _asyncRequests.erase(_asyncRequestsHint); + _asyncRequestsHint = _asyncRequests.end(); + } + else + { + _asyncRequests.erase(o->requestId); + } + } + + // + // If the request is being sent, don't remove it from the send streams, + // it will be removed once the sending is finished. + // + if(o == _sendStreams.begin()) + { + o->timedOut(); + } + else + { + _sendStreams.erase(o); + } + + InvocationTimeoutException ex(__FILE__, __LINE__); + o->finished(ex); + return; // We're done. + } + } + + OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); + if(o) + { + if(_asyncRequestsHint != _asyncRequests.end()) + { + if(_asyncRequestsHint->second == o) + { + InvocationTimeoutException ex(__FILE__, __LINE__); + o->__finished(ex, true); + _asyncRequests.erase(_asyncRequestsHint); + _asyncRequestsHint = _asyncRequests.end(); + } + } + + for(map::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) + { + if(p->second.get() == o.get()) + { + InvocationTimeoutException ex(__FILE__, __LINE__); + o->__finished(ex, true); + assert(p != _asyncRequestsHint); + _asyncRequests.erase(p); + return; // We're done. + } + } + } +} + void Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag) { @@ -1113,7 +1333,7 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag) { if(_state == StateFinished) { - _reaper->add(this); + reap(); } notifyAll(); } @@ -1152,7 +1372,7 @@ Ice::ConnectionI::sendNoResponse() { if(_state == StateFinished) { - _reaper->add(this); + reap(); } notifyAll(); } @@ -1317,13 +1537,14 @@ void Ice::ConnectionI::message(ThreadPoolCurrent& current) { StartCallbackPtr startCB; - vector sentCBs; + vector sentCBs; Byte compress = 0; Int requestId = 0; Int invokeNum = 0; ServantManagerPtr servantManager; ObjectAdapterPtr adapter; OutgoingAsyncPtr outAsync; + ConnectionCallbackPtr heartbeatCallback; ThreadPoolMessage msg(current, *this); @@ -1341,11 +1562,11 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) return; } + SocketOperation readyOp = current.operation; try { unscheduleTimeout(current.operation); - SocketOperation readyOp = current.operation; SocketOperation writeOp = SocketOperationNone; SocketOperation readOp = SocketOperationNone; if(readyOp & SocketOperationWrite) @@ -1478,7 +1699,14 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // We start out in holding state. // setState(StateHolding); - swap(_startCallback, startCB); + if(_startCallback) + { + swap(_startCallback, startCB); + if(startCB) + { + ++_dispatchCount; + } + } } else { @@ -1496,33 +1724,29 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) compress, servantManager, adapter, - outAsync)); + outAsync, + heartbeatCallback)); } if(readyOp & SocketOperationWrite) { newOp = static_cast(newOp | sendNextMessage(sentCBs)); + if(!sentCBs.empty()) + { + ++_dispatchCount; + } } if(_state < StateClosed) { scheduleTimeout(newOp); _threadPool->update(this, current.operation, newOp); - } + } if(!readyOp) { return; } - - // - // We increment the dispatch count to prevent the - // communicator destruction during the callback. - // - if(!sentCBs.empty() || outAsync) - { - ++_dispatchCount; - } } } catch(const DatagramLimitException&) // Expected. @@ -1561,12 +1785,10 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } return; } - - if(_acmTimeout > 0) + if(_acmLastActivity != IceUtil::Time()) { - _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); } - io.completed(); } @@ -1575,7 +1797,8 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) try { _dispatcher->dispatch(new DispatchDispatcherCall(this, startCB, sentCBs, compress, requestId, invokeNum, - servantManager, adapter, outAsync, current.stream), this); + servantManager, adapter, outAsync, heartbeatCallback, + current.stream), this); } catch(const std::exception& ex) { @@ -1596,15 +1819,19 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } else { - dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, current.stream); + dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, heartbeatCallback, + current.stream); } } void -ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector& sentCBs, +ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, - const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream) + const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, + const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) { + int count = 0; + // // Notify the factory that the connection establishment and // validation has completed. @@ -1612,25 +1839,30 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vectorconnectionStartCompleted(this); + ++count; } // // Notify AMI calls that the message was sent. // - for(vector::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p) + if(!sentCBs.empty()) { -#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - if(p->outAsync) - { - p->outAsync->__sent(); - } - if(p->replyOutAsync) + for(vector::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p) { - p->replyOutAsync->__finished(); - } +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) + if(p->invokeSentCallback) + { + p->outAsync->__invokeSent(); + } + if(p->receivedReply) + { + OutgoingAsyncPtr::dynamicCast(p->outAsync)->__finished(); + } #else - p->outAsync->__sent(); + p->outAsync->__invokeSent(); #endif + } + ++count; } // @@ -1640,6 +1872,26 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector__finished(); + ++count; + } + + if(heartbeatCallback) + { + try + { + heartbeatCallback->heartbeat(this); + } + catch(const std::exception& ex) + { + Error out(_instance->initializationData().logger); + out << "connection callback exception:\n" << ex << '\n' << _desc; + } + catch(...) + { + Error out(_instance->initializationData().logger); + out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc; + } + ++count; } // @@ -1650,15 +1902,21 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector 0) { IceUtil::Monitor::Lock sync(*this); - if(--_dispatchCount == 0) + _dispatchCount -= count; + if(_dispatchCount == 0) { // // Only initiate shutdown if not already done. It might @@ -1679,7 +1937,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vectoradd(this); + reap(); } notifyAll(); } @@ -1700,7 +1958,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // - if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty()) + if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback) { finish(); return; @@ -1762,15 +2020,13 @@ Ice::ConnectionI::finish() // the response has been received in the meantime, we remove the message from // _sendStreams to not call finished on a message which is already done. // - if(message->requestId > 0 && - ((message->out && _requests.find(message->requestId) == _requests.end()) || - (message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end()))) + if(message->receivedReply) { - if(message->sent(this, true)) + if(message->sent() && message->invokeSentCallback) { - assert(message->outAsync); - message->outAsync->__sent(); + message->outAsync->__invokeSent(); } + OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished(); _sendStreams.pop_front(); } #endif @@ -1806,6 +2062,25 @@ Ice::ConnectionI::finish() } _asyncRequests.clear(); + if(_callback) + { + try + { + _callback->closed(this); + } + catch(const std::exception& ex) + { + Error out(_instance->initializationData().logger); + out << "connection callback exception:\n" << ex << '\n' << _desc; + } + catch(...) + { + Error out(_instance->initializationData().logger); + out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc; + } + _callback = 0; + } + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -1815,7 +2090,7 @@ Ice::ConnectionI::finish() setState(StateFinished); if(_dispatchCount == 0) { - _reaper->add(this); + reap(); } } } @@ -1893,14 +2168,13 @@ Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum) if(invokeNum > 0) { - assert(_dispatchCount > 0); + assert(_dispatchCount >= invokeNum); _dispatchCount -= invokeNum; - assert(_dispatchCount >= 0); if(_dispatchCount == 0) { if(_state == StateFinished) { - _reaper->add(this); + reap(); } notifyAll(); } @@ -1909,14 +2183,14 @@ Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum) Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, const InstancePtr& instance, - const ConnectionReaperPtr& reaper, + const ACMMonitorPtr& monitor, const TransceiverPtr& transceiver, const ConnectorPtr& connector, const EndpointIPtr& endpoint, const ObjectAdapterPtr& adapter) : _communicator(communicator), _instance(instance), - _reaper(reaper), + _monitor(monitor), _transceiver(transceiver), _desc(transceiver->toString()), _type(transceiver->protocol()), @@ -1933,7 +2207,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _readTimeoutScheduled(false), _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0), _warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0), - _acmTimeout(0), _compressionLevel(1), _nextRequestId(1), _requestsHint(_requests.end()), @@ -1971,21 +2244,9 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _servantManager = adapterImpl->getServantManager(); } - Int& acmTimeout = const_cast(_acmTimeout); - if(_endpoint->datagram()) + if(_monitor && _monitor->getACM().timeout > 0) { - acmTimeout = 0; - } - else - { - if(adapterImpl) - { - acmTimeout = adapterImpl->getACM(); - } - else - { - acmTimeout = _instance->clientACM(); - } + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); } __setNoDelete(true); @@ -2012,6 +2273,7 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, Ice::ConnectionI::~ConnectionI() { assert(!_startCallback); + assert(!_callback); assert(_state == StateFinished); assert(_dispatchCount == 0); assert(_sendStreams.empty()); @@ -2197,15 +2459,19 @@ Ice::ConnectionI::setState(State state) // monitor, but only if we were registered before, i.e., if our // old state was StateActive. // - if(_acmTimeout > 0) + if(_monitor) { if(state == StateActive) { - _instance->connectionMonitor()->add(this); + _monitor->add(this); + if(_acmLastActivity != IceUtil::Time()) + { + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); + } } else if(_state == StateActive) { - _instance->connectionMonitor()->remove(this); + _monitor->remove(this); } } @@ -2296,6 +2562,37 @@ Ice::ConnectionI::initiateShutdown() } } +void +Ice::ConnectionI::heartbeat() +{ + assert(_state == StateActive); + + if(!_endpoint->datagram()) + { + BasicStream os(_instance.get(), Ice::currentProtocolEncoding); + os.write(magic[0]); + os.write(magic[1]); + os.write(magic[2]); + os.write(magic[3]); + os.write(currentProtocol); + os.write(currentProtocolEncoding); + os.write(validateConnectionMsg); + os.write(static_cast(0)); // Compression status (always zero for validate connection). + os.write(headerSize); // Message size. + os.i = os.b.begin(); + try + { + OutgoingMessage message(&os, false); + sendMessage(message); + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + assert(_exception.get()); + } + } +} + bool Ice::ConnectionI::initialize(SocketOperation operation) { @@ -2438,7 +2735,7 @@ Ice::ConnectionI::validate(SocketOperation operation) } SocketOperation -Ice::ConnectionI::sendNextMessage(vector& callbacks) +Ice::ConnectionI::sendNextMessage(vector& callbacks) { if(_sendStreams.empty()) { @@ -2461,26 +2758,14 @@ Ice::ConnectionI::sendNextMessage(vector& callbacks) // Notify the message that it was sent. // OutgoingMessage* message = &_sendStreams.front(); - _writeStream.swap(*message->stream); -#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - bool sentCB = message->sent(this, true); - if(sentCB || message->replyOutAsync) + if(message->stream) { - if(sentCB) + _writeStream.swap(*message->stream); + if(message->sent()) { - callbacks.push_back(SentCallback(message->outAsync, message->replyOutAsync)); - } - else - { - callbacks.push_back(SentCallback(0, message->replyOutAsync)); + callbacks.push_back(*message); } } -#else - if(message->sent(this, true)) - { - callbacks.push_back(SentCallback(message->outAsync)); - } -#endif _sendStreams.pop_front(); // @@ -2658,7 +2943,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) { traceSend(*message.stream, _logger, _traceLevels); } - + // // Send the message without blocking. // @@ -2675,14 +2960,13 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) } AsyncStatus status = AsyncStatusSent; - if(message.sent(this, false)) + if(message.sent()) { status = static_cast(status | AsyncStatusInvokeSentCallback); } - if(_acmTimeout > 0) + if(_acmLastActivity != IceUtil::Time()) { - _acmAbsoluteTimeout = - IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); } return status; } @@ -2737,14 +3021,13 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) _observer.finishWrite(*message.stream); } AsyncStatus status = AsyncStatusSent; - if(message.sent(this, false)) + if(message.sent()) { status = static_cast(status | AsyncStatusInvokeSentCallback); } - if(_acmTimeout > 0) + if(_acmLastActivity != IceUtil::Time()) { - _acmAbsoluteTimeout = - IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); } return status; } @@ -2911,7 +3194,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncPtr& outAsync) + OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3062,11 +3345,6 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request q = _asyncRequests.find(requestId); } - if(p == _requests.end() && q == _asyncRequests.end()) - { - throw UnknownRequestIdException(__FILE__, __LINE__); - } - if(p != _requests.end()) { p->second->finished(stream); @@ -3080,11 +3358,10 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { _requests.erase(p); } + notifyAll(); // Notify threads blocked in close(false) } - else + else if(q != _asyncRequests.end()) { - assert(q != _asyncRequests.end()); - outAsync = q->second; if(q == _asyncRequestsHint) @@ -3108,22 +3385,29 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request OutgoingMessage* message = _sendStreams.empty() ? 0 : &_sendStreams.front(); if(message && message->outAsync.get() == outAsync.get()) { - swap(message->replyOutAsync, outAsync); + message->receivedReply = true; + outAsync = 0; } -#endif - + else + { + ++_dispatchCount; + } +#else + ++_dispatchCount; +#endif + notifyAll(); // Notify threads blocked in close(false) } - notifyAll(); // Notify threads blocked in close(false) + break; } case validateConnectionMsg: { traceRecv(stream, _logger, _traceLevels); - if(_warn) + if(_callback) { - Warning out(_logger); - out << "ignoring unexpected validate connection message:\n" << _desc; + heartbeatCallback = _callback; + ++_dispatchCount; } break; } @@ -3301,3 +3585,16 @@ ConnectionI::toConnectionState(State state) const { return connectionStateMap[static_cast(state)]; } + +void +ConnectionI::reap() +{ + if(_monitor) + { + _monitor->reap(this); + } + if(_observer) + { + _observer.detach(); + } +} -- cgit v1.2.3