summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp104
1 files changed, 63 insertions, 41 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index afd26ebdb32..f86bf0e6b58 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -47,7 +47,6 @@ namespace
const ::std::string __flushBatchRequests_name = "flushBatchRequests";
-
class TimeoutCallback : public IceUtil::TimerTask
{
public:
@@ -74,7 +73,7 @@ public:
DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId,
Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
- const OutgoingAsyncBasePtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback,
+ const OutgoingAsyncBasePtr& outAsync, const ICE_HEARTBEAT_CALLBACK& heartbeatCallback,
BasicStream& stream) :
DispatchWorkItem(connection),
_connection(connection),
@@ -110,7 +109,7 @@ private:
const ServantManagerPtr _servantManager;
const ObjectAdapterPtr _adapter;
const OutgoingAsyncBasePtr _outAsync;
- const ConnectionCallbackPtr _heartbeatCallback;
+ const ICE_HEARTBEAT_CALLBACK _heartbeatCallback;
BasicStream _stream;
};
@@ -540,7 +539,7 @@ Ice::ConnectionI::updateObserver()
}
assert(_instance->initializationData().observer);
-
+
ConnectionObserverPtr o = _instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
_endpoint,
toConnectionState(_state),
@@ -959,50 +958,66 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
#endif
void
-Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback)
+#ifdef ICE_CPP11_MAPPING
+Ice::ConnectionI::setHeartbeatCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> callback)
+#else
+Ice::ConnectionI::setHeartbeatCallback(const Ice::HeartbeatCallbackPtr& callback)
+#endif
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ _heartbeatCallback = callback;
+}
+void
+#ifdef ICE_CPP11_MAPPING
+Ice::ConnectionI::setCloseCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> callback)
+#else
+Ice::ConnectionI::setCloseCallback(const Ice::CloseCallbackPtr& callback)
+#endif
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state >= StateClosed)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_state >= StateClosed)
+ if(callback)
{
- if(callback)
+ class CallbackWorkItem : public DispatchWorkItem
{
- class CallbackWorkItem : public DispatchWorkItem
- {
- public:
+ public:
- CallbackWorkItem(const ConnectionIPtr& connection, const ConnectionCallbackPtr& callback) :
- _connection(connection),
- _callback(callback)
- {
- }
+ CallbackWorkItem(const ConnectionIPtr& connection, const ICE_CLOSE_CALLBACK& callback) :
+ _connection(connection),
+ _callback(callback)
+ {
+ }
- virtual void run()
- {
- _connection->closeCallback(_callback);
- }
+ virtual void run()
+ {
+ _connection->closeCallback(_callback);
+ }
- private:
+ private:
- const ConnectionIPtr _connection;
- const ConnectionCallbackPtr _callback;
- };
- _threadPool->dispatch(new CallbackWorkItem(shared_from_this(), callback));
- }
- }
- else
- {
- _callback = callback;
+ const ConnectionIPtr _connection;
+ const ICE_CLOSE_CALLBACK _callback;
+ };
+ _threadPool->dispatch(new CallbackWorkItem(shared_from_this(), callback));
}
}
+ else
+ {
+ _closeCallback = callback;
+ }
}
void
-Ice::ConnectionI::closeCallback(const ConnectionCallbackPtr& callback)
+Ice::ConnectionI::closeCallback(const ICE_CLOSE_CALLBACK& callback)
{
try
{
+#ifdef ICE_CPP11_MAPPING
+ callback(shared_from_this());
+#else
callback->closed(shared_from_this());
+#endif
}
catch(const std::exception& ex)
{
@@ -1537,7 +1552,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
ServantManagerPtr servantManager;
ObjectAdapterPtr adapter;
OutgoingAsyncBasePtr outAsync;
- ConnectionCallbackPtr heartbeatCallback;
+ ICE_HEARTBEAT_CALLBACK heartbeatCallback;
int dispatchCount = 0;
ThreadPoolMessage<ConnectionI> msg(current, *this);
@@ -1807,7 +1822,7 @@ void
ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs,
Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
const ObjectAdapterPtr& adapter, const OutgoingAsyncBasePtr& outAsync,
- const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream)
+ const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, BasicStream& stream)
{
int dispatchedCount = 0;
@@ -1862,7 +1877,11 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
{
try
{
+#ifdef ICE_CPP11_MAPPING
+ heartbeatCallback(shared_from_this());
+#else
heartbeatCallback->heartbeat(shared_from_this());
+#endif
}
catch(const std::exception& ex)
{
@@ -1941,7 +1960,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close)
// to call code that will potentially block (this avoids promoting a new leader and
// unecessary thread creation, especially if this is called on shutdown).
//
- if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback)
+ if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_closeCallback && !_heartbeatCallback)
{
finish(close);
return;
@@ -2080,12 +2099,14 @@ Ice::ConnectionI::finish(bool close)
_readStream.clear();
_readStream.b.clear();
- if(_callback)
+ if(_closeCallback)
{
- closeCallback(_callback);
- _callback = 0;
+ closeCallback(_closeCallback);
+ _closeCallback = 0;
}
+ _heartbeatCallback = 0;
+
//
// This must be done last as this will cause waitUntilFinished() to return (and communicator
// objects such as the timer might be destroyed too).
@@ -2264,7 +2285,8 @@ Ice::ConnectionI::create(const CommunicatorPtr& communicator,
Ice::ConnectionI::~ConnectionI()
{
assert(!_startCallback);
- assert(!_callback);
+ assert(!_closeCallback);
+ assert(!_heartbeatCallback);
assert(_state == StateFinished);
assert(_dispatchCount == 0);
assert(_sendStreams.empty());
@@ -3206,7 +3228,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
SocketOperation
Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
- OutgoingAsyncBasePtr& outAsync, ConnectionCallbackPtr& heartbeatCallback,
+ OutgoingAsyncBasePtr& outAsync, ICE_HEARTBEAT_CALLBACK& heartbeatCallback,
int& dispatchCount)
{
assert(_state > StateNotValidated && _state < StateClosed);
@@ -3433,9 +3455,9 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
case validateConnectionMsg:
{
traceRecv(stream, _logger, _traceLevels);
- if(_callback)
+ if(_heartbeatCallback)
{
- heartbeatCallback = _callback;
+ heartbeatCallback = _heartbeatCallback;
++dispatchCount;
}
break;