summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-05-23 11:59:44 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-05-23 11:59:44 +0200
commitd81701ca8182942b7936f9fd84a019b695e9c890 (patch)
treedc036c9d701fbbe1afad67782bd78572c0f61974 /cpp/src/Ice/ConnectionI.cpp
parentFixed bug ICE-5543: stringToIdentity bug with escaped escapes (diff)
downloadice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.bz2
ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.xz
ice-d81701ca8182942b7936f9fd84a019b695e9c890.zip
Added support for invocation timeouts and ACM heartbeats
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp613
1 files changed, 455 insertions, 158 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 9618b1fe781..9dc3ff4efa5 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -16,7 +16,7 @@
#include <Ice/DefaultsAndOverrides.h>
#include <Ice/Transceiver.h>
#include <Ice/ThreadPool.h>
-#include <Ice/ConnectionMonitor.h>
+#include <Ice/ACM.h>
#include <Ice/ObjectAdapterI.h> // For getThreadPool() and getServantManager().
#include <Ice/EndpointI.h>
#include <Ice/Outgoing.h>
@@ -66,9 +66,10 @@ class DispatchDispatcherCall : public DispatcherCall
public:
DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
- const vector<ConnectionI::SentCallback>& sentCBs, Byte compress, Int requestId,
+ const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId,
Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
- const OutgoingAsyncPtr& outAsync, BasicStream& stream) :
+ const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback,
+ BasicStream& stream) :
_connection(connection),
_startCB(startCB),
_sentCBs(sentCBs),
@@ -78,6 +79,7 @@ public:
_servantManager(servantManager),
_adapter(adapter),
_outAsync(outAsync),
+ _heartbeatCallback(heartbeatCallback),
_stream(stream.instance(), currentProtocolEncoding)
{
_stream.swap(stream);
@@ -87,20 +89,21 @@ public:
run()
{
_connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter,
- _outAsync, _stream);
+ _outAsync, _heartbeatCallback, _stream);
}
private:
ConnectionIPtr _connection;
ConnectionI::StartCallbackPtr _startCB;
- vector<ConnectionI::SentCallback> _sentCBs;
+ vector<ConnectionI::OutgoingMessage> _sentCBs;
Byte _compress;
Int _requestId;
Int _invokeNum;
ServantManagerPtr _servantManager;
ObjectAdapterPtr _adapter;
OutgoingAsyncPtr _outAsync;
+ ConnectionCallbackPtr _heartbeatCallback;
BasicStream _stream;
};
@@ -137,24 +140,6 @@ ConnectionState connectionStateMap[] = {
}
-void
-IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection)
-{
- Lock sync(*this);
- _connections.push_back(connection);
- if(connection->_observer)
- {
- connection->_observer.detach();
- }
-}
-
-void
-IceInternal::ConnectionReaper::swapConnections(vector<ConnectionIPtr>& connections)
-{
- Lock sync(*this);
- _connections.swap(connections);
-}
-
Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0)
{
}
@@ -254,25 +239,39 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
adopted = true;
}
+void
+Ice::ConnectionI::OutgoingMessage::timedOut()
+{
+ assert((out || outAsync) && !isSent); // Only requests can timeout.
+ out = 0;
+ outAsync = 0;
+ adopt(0); // Adopt the request stream
+}
+
bool
-Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify)
+Ice::ConnectionI::OutgoingMessage::sent()
{
isSent = true; // The message is sent.
if(adopted)
{
delete stream;
- stream = 0;
}
+ stream = 0;
if(out)
{
- out->sent(notify); // true = notify the waiting thread that the request was sent.
+ out->sent();
return false;
}
else if(outAsync)
{
- return outAsync->__sent(connection);
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ invokeSentCallback = outAsync->__sent();
+ return invokeSentCallback || receivedReply;
+#else
+ return outAsync->__sent();
+#endif
}
else
{
@@ -295,8 +294,8 @@ Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex)
if(adopted)
{
delete stream;
- stream = 0;
}
+ stream = 0;
}
void
@@ -368,12 +367,10 @@ Ice::ConnectionI::activate()
{
return;
}
-
- if(_acmTimeout > 0)
+ if(_acmLastActivity != IceUtil::Time())
{
- _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
}
-
setState(StateActive);
}
@@ -538,32 +535,67 @@ Ice::ConnectionI::updateObserver()
}
void
-Ice::ConnectionI::monitor(const IceUtil::Time& now)
+Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
{
- IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
- if(!sync.acquired())
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state != StateActive)
{
return;
}
+ assert(acm.timeout != IceUtil::Time());
- if(_state != StateActive)
+ if(static_cast<Int>(_readStream.b.size()) > headerSize || !_writeStream.b.empty())
{
+ //
+ // If writing or reading, nothing to do, the connection
+ // timeout will kick-in if writes or reads don't progress.
+ // This check is necessary because the actitivy timer is
+ // only set when a message is fully read/written.
+ //
return;
}
//
- // Active connection management for idle connections.
+ // We send a heartbeat if there was no activity in the last
+ // (timeout / 4) period. Sending a heartbeat sooner than really
+ // needed is safer to ensure that the receiver will receive in
+ // time the heartbeat. Sending the heartbeat if there was no
+ // activity in the last (timeout / 2) period isn't enough since
+ // monitor() is called only every (timeout / 2) period.
//
- if(_acmTimeout <= 0 ||
- !_requests.empty() || !_asyncRequests.empty() || _dispatchCount > 0 ||
- static_cast<Int>(_readStream.b.size()) > headerSize || !_writeStream.b.empty() || !_batchStream.b.empty())
+ // Note that this doesn't imply that we are sending 4 heartbeats
+ // per timeout period because the monitor() method is sill only
+ // called every (timeout / 2) period.
+ //
+
+ if(acm.heartbeat == HeartbeatAlways ||
+ (acm.heartbeat != HeartbeatOff && now >= (_acmLastActivity + acm.timeout / 4)))
{
- return;
+ if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0)
+ {
+ heartbeat();
+ }
}
-
- if(now >= _acmAbsoluteTimeout)
+
+ if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout))
{
- setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
+ if(acm.close == CloseOnIdleForceful ||
+ (acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty())))
+ {
+ //
+ // Close the connection if we didn't receive a heartbeat in
+ // the last period.
+ //
+ setState(StateClosed, ConnectionTimeoutException(__FILE__, __LINE__));
+ }
+ else if(acm.close != CloseOnInvocation &&
+ _dispatchCount == 0 && _batchStream.b.empty() && _requests.empty() && _asyncRequests.empty())
+ {
+ //
+ // The connection is idle, close it.
+ //
+ setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
+ }
}
}
@@ -957,11 +989,11 @@ Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const Lo
new ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb, cookie);
try
{
- result->__send();
+ result->__invoke();
}
catch(const LocalException& __ex)
{
- result->__exceptionAsync(__ex);
+ result->__invokeExceptionAsync(__ex);
}
return result;
}
@@ -989,7 +1021,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
if(_batchRequestNum == 0)
{
- out->sent(false);
+ out->sent();
return true;
}
@@ -1052,7 +1084,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
if(_batchRequestNum == 0)
{
AsyncStatus status = AsyncStatusSent;
- if(outAsync->__sent(this))
+ if(outAsync->__sent())
{
status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
}
@@ -1101,6 +1133,194 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
return status;
}
+void
+Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state > StateClosing)
+ {
+ return;
+ }
+ _callback = callback;
+}
+
+void
+Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout,
+ const IceUtil::Optional<Ice::ACMClose>& close,
+ const IceUtil::Optional<Ice::ACMHeartbeat>& heartbeat)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_monitor)
+ {
+ if(_state == StateActive)
+ {
+ _monitor->remove(this);
+ }
+ _monitor = _monitor->acm(timeout, close, heartbeat);
+ if(_state == StateActive)
+ {
+ _monitor->add(this);
+ }
+
+ if(_monitor->getACM().timeout <= 0)
+ {
+ _acmLastActivity = IceUtil::Time(); // Disable the recording of last activity.
+ }
+ else if(_acmLastActivity == IceUtil::Time() && _state == StateActive)
+ {
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ }
+ }
+}
+
+ACM
+Ice::ConnectionI::getACM()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ ACM acm;
+ acm.timeout = 0;
+ acm.close = CloseOff;
+ acm.heartbeat = HeartbeatOff;
+ return _monitor ? _monitor->getACM() : acm;
+}
+
+void
+Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
+ {
+ if(o->out == out)
+ {
+ if(o->requestId)
+ {
+ if(_requestsHint != _requests.end() && _requestsHint->second == dynamic_cast<Outgoing*>(out))
+ {
+ _requests.erase(_requestsHint);
+ _requestsHint = _requests.end();
+ }
+ else
+ {
+ _requests.erase(o->requestId);
+ }
+ }
+
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ if(o == _sendStreams.begin())
+ {
+ o->timedOut();
+ }
+ else
+ {
+ _sendStreams.erase(o);
+ }
+
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ o->finished(ex);
+ return;
+ }
+ }
+
+ Outgoing* o = dynamic_cast<Outgoing*>(out);
+ if(o)
+ {
+ if(_requestsHint != _requests.end() && _requestsHint->second == o)
+ {
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ o->finished(ex, true);
+ _requests.erase(_requestsHint);
+ _requestsHint = _requests.end();
+ }
+ else
+ {
+ for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ if(p->second == o)
+ {
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ o->finished(ex, true);
+ assert(p != _requestsHint);
+ _requests.erase(p);
+ return; // We're done.
+ }
+ }
+ }
+ }
+}
+
+void
+Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
+ {
+ if(o->outAsync.get() == outAsync.get())
+ {
+ if(o->requestId)
+ {
+ if(_asyncRequestsHint != _asyncRequests.end() &&
+ _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync))
+ {
+ _asyncRequests.erase(_asyncRequestsHint);
+ _asyncRequestsHint = _asyncRequests.end();
+ }
+ else
+ {
+ _asyncRequests.erase(o->requestId);
+ }
+ }
+
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ if(o == _sendStreams.begin())
+ {
+ o->timedOut();
+ }
+ else
+ {
+ _sendStreams.erase(o);
+ }
+
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ o->finished(ex);
+ return; // We're done.
+ }
+ }
+
+ OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
+ if(o)
+ {
+ if(_asyncRequestsHint != _asyncRequests.end())
+ {
+ if(_asyncRequestsHint->second == o)
+ {
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ o->__finished(ex, true);
+ _asyncRequests.erase(_asyncRequestsHint);
+ _asyncRequestsHint = _asyncRequests.end();
+ }
+ }
+
+ for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
+ {
+ if(p->second.get() == o.get())
+ {
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ o->__finished(ex, true);
+ assert(p != _asyncRequestsHint);
+ _asyncRequests.erase(p);
+ return; // We're done.
+ }
+ }
+ }
+}
+
void
Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
{
@@ -1113,7 +1333,7 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
{
if(_state == StateFinished)
{
- _reaper->add(this);
+ reap();
}
notifyAll();
}
@@ -1152,7 +1372,7 @@ Ice::ConnectionI::sendNoResponse()
{
if(_state == StateFinished)
{
- _reaper->add(this);
+ reap();
}
notifyAll();
}
@@ -1317,13 +1537,14 @@ void
Ice::ConnectionI::message(ThreadPoolCurrent& current)
{
StartCallbackPtr startCB;
- vector<SentCallback> sentCBs;
+ vector<OutgoingMessage> sentCBs;
Byte compress = 0;
Int requestId = 0;
Int invokeNum = 0;
ServantManagerPtr servantManager;
ObjectAdapterPtr adapter;
OutgoingAsyncPtr outAsync;
+ ConnectionCallbackPtr heartbeatCallback;
ThreadPoolMessage<ConnectionI> msg(current, *this);
@@ -1341,11 +1562,11 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
return;
}
+ SocketOperation readyOp = current.operation;
try
{
unscheduleTimeout(current.operation);
- SocketOperation readyOp = current.operation;
SocketOperation writeOp = SocketOperationNone;
SocketOperation readOp = SocketOperationNone;
if(readyOp & SocketOperationWrite)
@@ -1478,7 +1699,14 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
// We start out in holding state.
//
setState(StateHolding);
- swap(_startCallback, startCB);
+ if(_startCallback)
+ {
+ swap(_startCallback, startCB);
+ if(startCB)
+ {
+ ++_dispatchCount;
+ }
+ }
}
else
{
@@ -1496,33 +1724,29 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
compress,
servantManager,
adapter,
- outAsync));
+ outAsync,
+ heartbeatCallback));
}
if(readyOp & SocketOperationWrite)
{
newOp = static_cast<SocketOperation>(newOp | sendNextMessage(sentCBs));
+ if(!sentCBs.empty())
+ {
+ ++_dispatchCount;
+ }
}
if(_state < StateClosed)
{
scheduleTimeout(newOp);
_threadPool->update(this, current.operation, newOp);
- }
+ }
if(!readyOp)
{
return;
}
-
- //
- // We increment the dispatch count to prevent the
- // communicator destruction during the callback.
- //
- if(!sentCBs.empty() || outAsync)
- {
- ++_dispatchCount;
- }
}
}
catch(const DatagramLimitException&) // Expected.
@@ -1561,12 +1785,10 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
return;
}
-
- if(_acmTimeout > 0)
+ if(_acmLastActivity != IceUtil::Time())
{
- _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
}
-
io.completed();
}
@@ -1575,7 +1797,8 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
try
{
_dispatcher->dispatch(new DispatchDispatcherCall(this, startCB, sentCBs, compress, requestId, invokeNum,
- servantManager, adapter, outAsync, current.stream), this);
+ servantManager, adapter, outAsync, heartbeatCallback,
+ current.stream), this);
}
catch(const std::exception& ex)
{
@@ -1596,15 +1819,19 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
else
{
- dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, current.stream);
+ dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, heartbeatCallback,
+ current.stream);
}
}
void
-ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback>& sentCBs,
+ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs,
Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
- const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream)
+ const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync,
+ const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream)
{
+ int count = 0;
+
//
// Notify the factory that the connection establishment and
// validation has completed.
@@ -1612,25 +1839,30 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback
if(startCB)
{
startCB->connectionStartCompleted(this);
+ ++count;
}
//
// Notify AMI calls that the message was sent.
//
- for(vector<SentCallback>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
+ if(!sentCBs.empty())
{
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- if(p->outAsync)
- {
- p->outAsync->__sent();
- }
- if(p->replyOutAsync)
+ for(vector<OutgoingMessage>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
{
- p->replyOutAsync->__finished();
- }
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ if(p->invokeSentCallback)
+ {
+ p->outAsync->__invokeSent();
+ }
+ if(p->receivedReply)
+ {
+ OutgoingAsyncPtr::dynamicCast(p->outAsync)->__finished();
+ }
#else
- p->outAsync->__sent();
+ p->outAsync->__invokeSent();
#endif
+ }
+ ++count;
}
//
@@ -1640,6 +1872,26 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback
if(outAsync)
{
outAsync->__finished();
+ ++count;
+ }
+
+ if(heartbeatCallback)
+ {
+ try
+ {
+ heartbeatCallback->heartbeat(this);
+ }
+ catch(const std::exception& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "connection callback exception:\n" << ex << '\n' << _desc;
+ }
+ catch(...)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc;
+ }
+ ++count;
}
//
@@ -1650,15 +1902,21 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback
if(invokeNum)
{
invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
+
+ //
+ // Don't increase count, the dispatch count is
+ // decreased when the incoming reply is sent.
+ //
}
//
// Decrease dispatch count.
//
- if(!sentCBs.empty() || outAsync)
+ if(count > 0)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(--_dispatchCount == 0)
+ _dispatchCount -= count;
+ if(_dispatchCount == 0)
{
//
// Only initiate shutdown if not already done. It might
@@ -1679,7 +1937,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<SentCallback
}
else if(_state == StateFinished)
{
- _reaper->add(this);
+ reap();
}
notifyAll();
}
@@ -1700,7 +1958,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
// to call code that will potentially block (this avoids promoting a new leader and
// unecessary thread creation, especially if this is called on shutdown).
//
- if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty())
+ if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback)
{
finish();
return;
@@ -1762,15 +2020,13 @@ Ice::ConnectionI::finish()
// the response has been received in the meantime, we remove the message from
// _sendStreams to not call finished on a message which is already done.
//
- if(message->requestId > 0 &&
- ((message->out && _requests.find(message->requestId) == _requests.end()) ||
- (message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end())))
+ if(message->receivedReply)
{
- if(message->sent(this, true))
+ if(message->sent() && message->invokeSentCallback)
{
- assert(message->outAsync);
- message->outAsync->__sent();
+ message->outAsync->__invokeSent();
}
+ OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished();
_sendStreams.pop_front();
}
#endif
@@ -1806,6 +2062,25 @@ Ice::ConnectionI::finish()
}
_asyncRequests.clear();
+ if(_callback)
+ {
+ try
+ {
+ _callback->closed(this);
+ }
+ catch(const std::exception& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "connection callback exception:\n" << ex << '\n' << _desc;
+ }
+ catch(...)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc;
+ }
+ _callback = 0;
+ }
+
//
// This must be done last as this will cause waitUntilFinished() to return (and communicator
// objects such as the timer might be destroyed too).
@@ -1815,7 +2090,7 @@ Ice::ConnectionI::finish()
setState(StateFinished);
if(_dispatchCount == 0)
{
- _reaper->add(this);
+ reap();
}
}
}
@@ -1893,14 +2168,13 @@ Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum)
if(invokeNum > 0)
{
- assert(_dispatchCount > 0);
+ assert(_dispatchCount >= invokeNum);
_dispatchCount -= invokeNum;
- assert(_dispatchCount >= 0);
if(_dispatchCount == 0)
{
if(_state == StateFinished)
{
- _reaper->add(this);
+ reap();
}
notifyAll();
}
@@ -1909,14 +2183,14 @@ Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum)
Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
const InstancePtr& instance,
- const ConnectionReaperPtr& reaper,
+ const ACMMonitorPtr& monitor,
const TransceiverPtr& transceiver,
const ConnectorPtr& connector,
const EndpointIPtr& endpoint,
const ObjectAdapterPtr& adapter) :
_communicator(communicator),
_instance(instance),
- _reaper(reaper),
+ _monitor(monitor),
_transceiver(transceiver),
_desc(transceiver->toString()),
_type(transceiver->protocol()),
@@ -1933,7 +2207,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_readTimeoutScheduled(false),
_warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0),
_warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0),
- _acmTimeout(0),
_compressionLevel(1),
_nextRequestId(1),
_requestsHint(_requests.end()),
@@ -1971,21 +2244,9 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_servantManager = adapterImpl->getServantManager();
}
- Int& acmTimeout = const_cast<Int&>(_acmTimeout);
- if(_endpoint->datagram())
+ if(_monitor && _monitor->getACM().timeout > 0)
{
- acmTimeout = 0;
- }
- else
- {
- if(adapterImpl)
- {
- acmTimeout = adapterImpl->getACM();
- }
- else
- {
- acmTimeout = _instance->clientACM();
- }
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
}
__setNoDelete(true);
@@ -2012,6 +2273,7 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
Ice::ConnectionI::~ConnectionI()
{
assert(!_startCallback);
+ assert(!_callback);
assert(_state == StateFinished);
assert(_dispatchCount == 0);
assert(_sendStreams.empty());
@@ -2197,15 +2459,19 @@ Ice::ConnectionI::setState(State state)
// monitor, but only if we were registered before, i.e., if our
// old state was StateActive.
//
- if(_acmTimeout > 0)
+ if(_monitor)
{
if(state == StateActive)
{
- _instance->connectionMonitor()->add(this);
+ _monitor->add(this);
+ if(_acmLastActivity != IceUtil::Time())
+ {
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ }
}
else if(_state == StateActive)
{
- _instance->connectionMonitor()->remove(this);
+ _monitor->remove(this);
}
}
@@ -2296,6 +2562,37 @@ Ice::ConnectionI::initiateShutdown()
}
}
+void
+Ice::ConnectionI::heartbeat()
+{
+ assert(_state == StateActive);
+
+ if(!_endpoint->datagram())
+ {
+ BasicStream os(_instance.get(), Ice::currentProtocolEncoding);
+ os.write(magic[0]);
+ os.write(magic[1]);
+ os.write(magic[2]);
+ os.write(magic[3]);
+ os.write(currentProtocol);
+ os.write(currentProtocolEncoding);
+ os.write(validateConnectionMsg);
+ os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
+ os.write(headerSize); // Message size.
+ os.i = os.b.begin();
+ try
+ {
+ OutgoingMessage message(&os, false);
+ sendMessage(message);
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ }
+ }
+}
+
bool
Ice::ConnectionI::initialize(SocketOperation operation)
{
@@ -2438,7 +2735,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
}
SocketOperation
-Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks)
+Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
{
if(_sendStreams.empty())
{
@@ -2461,26 +2758,14 @@ Ice::ConnectionI::sendNextMessage(vector<SentCallback>& callbacks)
// Notify the message that it was sent.
//
OutgoingMessage* message = &_sendStreams.front();
- _writeStream.swap(*message->stream);
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- bool sentCB = message->sent(this, true);
- if(sentCB || message->replyOutAsync)
+ if(message->stream)
{
- if(sentCB)
+ _writeStream.swap(*message->stream);
+ if(message->sent())
{
- callbacks.push_back(SentCallback(message->outAsync, message->replyOutAsync));
- }
- else
- {
- callbacks.push_back(SentCallback(0, message->replyOutAsync));
+ callbacks.push_back(*message);
}
}
-#else
- if(message->sent(this, true))
- {
- callbacks.push_back(SentCallback(message->outAsync));
- }
-#endif
_sendStreams.pop_front();
//
@@ -2658,7 +2943,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
{
traceSend(*message.stream, _logger, _traceLevels);
}
-
+
//
// Send the message without blocking.
//
@@ -2675,14 +2960,13 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
}
AsyncStatus status = AsyncStatusSent;
- if(message.sent(this, false))
+ if(message.sent())
{
status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
}
- if(_acmTimeout > 0)
+ if(_acmLastActivity != IceUtil::Time())
{
- _acmAbsoluteTimeout =
- IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
}
return status;
}
@@ -2737,14 +3021,13 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
_observer.finishWrite(*message.stream);
}
AsyncStatus status = AsyncStatusSent;
- if(message.sent(this, false))
+ if(message.sent())
{
status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
}
- if(_acmTimeout > 0)
+ if(_acmLastActivity != IceUtil::Time())
{
- _acmAbsoluteTimeout =
- IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
}
return status;
}
@@ -2911,7 +3194,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
SocketOperation
Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
- OutgoingAsyncPtr& outAsync)
+ OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback)
{
assert(_state > StateNotValidated && _state < StateClosed);
@@ -3062,11 +3345,6 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
q = _asyncRequests.find(requestId);
}
- if(p == _requests.end() && q == _asyncRequests.end())
- {
- throw UnknownRequestIdException(__FILE__, __LINE__);
- }
-
if(p != _requests.end())
{
p->second->finished(stream);
@@ -3080,11 +3358,10 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
_requests.erase(p);
}
+ notifyAll(); // Notify threads blocked in close(false)
}
- else
+ else if(q != _asyncRequests.end())
{
- assert(q != _asyncRequests.end());
-
outAsync = q->second;
if(q == _asyncRequestsHint)
@@ -3108,22 +3385,29 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
OutgoingMessage* message = _sendStreams.empty() ? 0 : &_sendStreams.front();
if(message && message->outAsync.get() == outAsync.get())
{
- swap(message->replyOutAsync, outAsync);
+ message->receivedReply = true;
+ outAsync = 0;
}
-#endif
-
+ else
+ {
+ ++_dispatchCount;
+ }
+#else
+ ++_dispatchCount;
+#endif
+ notifyAll(); // Notify threads blocked in close(false)
}
- notifyAll(); // Notify threads blocked in close(false)
+
break;
}
case validateConnectionMsg:
{
traceRecv(stream, _logger, _traceLevels);
- if(_warn)
+ if(_callback)
{
- Warning out(_logger);
- out << "ignoring unexpected validate connection message:\n" << _desc;
+ heartbeatCallback = _callback;
+ ++_dispatchCount;
}
break;
}
@@ -3301,3 +3585,16 @@ ConnectionI::toConnectionState(State state) const
{
return connectionStateMap[static_cast<int>(state)];
}
+
+void
+ConnectionI::reap()
+{
+ if(_monitor)
+ {
+ _monitor->reap(this);
+ }
+ if(_observer)
+ {
+ _observer.detach();
+ }
+}