summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp307
1 files changed, 105 insertions, 202 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 2c8688b7a00..48542bf5aae 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -296,9 +296,9 @@ Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex)
}
else if(outAsync)
{
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompleted();
+ outAsync->invokeException();
}
}
@@ -317,7 +317,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed.
{
- assert(ICE_EXCEPTION_GET(_exception));
+ assert(ICE_EXCEPTION_ISSET(_exception));
ICE_RETHROW_EXCEPTION(_exception);
}
@@ -339,7 +339,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
if(_state >= StateClosing)
{
- assert(ICE_EXCEPTION_GET(_exception));
+ assert(ICE_EXCEPTION_ISSET(_exception));
ICE_RETHROW_EXCEPTION(_exception);
}
}
@@ -487,7 +487,7 @@ Ice::ConnectionI::throwException() const
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(ICE_EXCEPTION_GET(_exception))
+ if(ICE_EXCEPTION_ISSET(_exception))
{
assert(_state >= StateClosing);
ICE_RETHROW_EXCEPTION(_exception);
@@ -622,7 +622,7 @@ Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, i
// to send our request, we always try to send the request
// again.
//
- if(ICE_EXCEPTION_GET(_exception))
+ if(ICE_EXCEPTION_ISSET(_exception))
{
#ifdef ICE_CPP11_MAPPING
throw RetryException(_exception);
@@ -688,7 +688,7 @@ Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, i
catch(const LocalException& ex)
{
setState(StateClosed, ex);
- assert(ICE_EXCEPTION_GET(_exception));
+ assert(ICE_EXCEPTION_ISSET(_exception));
ICE_RETHROW_EXCEPTION(_exception);
}
@@ -714,7 +714,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres
// to send our request, we always try to send the request
// again.
//
- if(ICE_EXCEPTION_GET(_exception))
+ if(ICE_EXCEPTION_ISSET(_exception))
{
#ifdef ICE_CPP11_MAPPING
throw RetryException(_exception);
@@ -780,7 +780,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres
catch(const LocalException& ex)
{
setState(StateClosed, ex);
- assert(ICE_EXCEPTION_GET(_exception));
+ assert(ICE_EXCEPTION_ISSET(_exception));
ICE_RETHROW_EXCEPTION(_exception);
}
@@ -801,87 +801,40 @@ Ice::ConnectionI::getBatchRequestQueue() const
return _batchRequestQueue;
}
+#ifdef ICE_CPP11_MAPPING
void
Ice::ConnectionI::flushBatchRequests()
{
- ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name);
- out.invoke();
+ Connection::flushBatchRequests_async().get();
}
-#ifdef ICE_CPP11_MAPPING
-function<void ()>
-Ice::ConnectionI::flushBatchRequests_async(function<void (exception_ptr)> exception,
- function<void (bool)> sent)
+std::function<void ()>
+Ice::ConnectionI::flushBatchRequests_async(::std::function<void (::std::exception_ptr)> ex,
+ ::std::function<void (bool)> sent)
{
- class FlushBatchRequestsCallback : public CallbackBase
+ class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke
{
public:
- FlushBatchRequestsCallback(function<void (exception_ptr)> exception,
- function<void (bool)> sent,
- shared_ptr<Connection> connection) :
- _exception(move(exception)),
- _sent(move(sent)),
- _connection(move(connection))
+ ConnectionFlushBatchLambda(std::shared_ptr<Ice::ConnectionI>&& connection,
+ const InstancePtr& instance,
+ std::function<void (std::exception_ptr)> ex,
+ std::function<void (bool)> sent) :
+ ConnectionFlushBatchAsync(connection, instance), LambdaInvoke(std::move(ex), std::move(sent))
{
}
-
- virtual void sent(const AsyncResultPtr& result) const
- {
- try
- {
- AsyncResult::__check(result, _connection.get(), __flushBatchRequests_name);
- result->__wait();
- }
- catch(const ::Ice::Exception&)
- {
- _exception(current_exception());
- }
-
- if(_sent)
- {
- _sent(result->sentSynchronously());
- }
- }
-
- virtual bool hasSentCallback() const
- {
- return true;
- }
-
-
- virtual void
- completed(const ::Ice::AsyncResultPtr& result) const
- {
- try
- {
- AsyncResult::__check(result, _connection.get(), __flushBatchRequests_name);
- result->__wait();
- }
- catch(const ::Ice::Exception&)
- {
- _exception(current_exception());
- }
- }
-
- private:
-
- function<void (exception_ptr)> _exception;
- function<void (bool)> _sent;
- shared_ptr<Connection> _connection;
};
-
- auto self = dynamic_pointer_cast<ConnectionI>(shared_from_this());
-
- auto result = make_shared<ConnectionFlushBatchAsync>(self, _communicator, _instance, __flushBatchRequests_name,
- make_shared<FlushBatchRequestsCallback>(move(exception), move(sent), self));
- result->invoke();
- return [result]()
- {
- result->cancel();
- };
+ auto outAsync = make_shared<ConnectionFlushBatchLambda>(shared_from_this(), _instance, ex, sent);
+ outAsync->invoke(__flushBatchRequests_name);
+ return [outAsync]() { outAsync->cancel(); };
}
#else
+void
+Ice::ConnectionI::flushBatchRequests()
+{
+ ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name);
+ out.invoke();
+}
AsyncResultPtr
Ice::ConnectionI::begin_flushBatchRequests()
@@ -905,14 +858,47 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR
AsyncResultPtr
Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
{
- ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync(
- shared_from_this(),
- _communicator,
- _instance,
- __flushBatchRequests_name,
- cb,
- cookie);
- result->invoke();
+ class ConnectionFlushBatchAsyncWithCallback : public ConnectionFlushBatchAsync, public CallbackCompletion
+ {
+ public:
+
+ ConnectionFlushBatchAsyncWithCallback(const Ice::ConnectionIPtr& connection,
+ const Ice::CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const CallbackBasePtr& callback,
+ const Ice::LocalObjectPtr& cookie) :
+ ConnectionFlushBatchAsync(connection, instance),
+ CallbackCompletion(callback, cookie),
+ _communicator(communicator),
+ _connection(connection)
+ {
+ _cookie = cookie;
+ }
+
+ virtual Ice::CommunicatorPtr getCommunicator() const
+ {
+ return _communicator;
+ }
+
+ virtual Ice::ConnectionPtr getConnection() const
+ {
+ return _connection;
+ }
+
+ virtual const std::string&
+ getOperation() const
+ {
+ return __flushBatchRequests_name;
+ }
+
+ private:
+
+ Ice::CommunicatorPtr _communicator;
+ Ice::ConnectionPtr _connection;
+ };
+
+ ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, _communicator, _instance, cb, cookie);
+ result->invoke(__flushBatchRequests_name);
return result;
}
@@ -925,21 +911,14 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
#endif
void
-#ifdef ICE_CPP11_MAPPING
-Ice::ConnectionI::setHeartbeatCallback(std::function<void (std::shared_ptr<::Ice::Connection>)> callback)
-#else
-Ice::ConnectionI::setHeartbeatCallback(const Ice::HeartbeatCallbackPtr& callback)
-#endif
+Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
_heartbeatCallback = callback;
}
+
void
-#ifdef ICE_CPP11_MAPPING
-Ice::ConnectionI::setCloseCallback(std::function<void (std::shared_ptr<::Ice::Connection>)> callback)
-#else
-Ice::ConnectionI::setCloseCallback(const Ice::CloseCallbackPtr& callback)
-#endif
+Ice::ConnectionI::setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK) callback)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_state >= StateClosed)
@@ -949,15 +928,12 @@ Ice::ConnectionI::setCloseCallback(const Ice::CloseCallbackPtr& callback)
class CallbackWorkItem : public DispatchWorkItem
{
public:
-#ifdef ICE_CPP11_MAPPING
- CallbackWorkItem(const ConnectionIPtr& connection,
- std::function<void (std::shared_ptr<Ice::Connection>)> callback) :
+
+ CallbackWorkItem(const ConnectionIPtr& connection, ICE_IN(ICE_CLOSE_CALLBACK) callback) :
_connection(connection),
+#ifdef ICE_CPP11_MAPPING
_callback(move(callback))
#else
- CallbackWorkItem(const ConnectionIPtr& connection,
- const Ice::CloseCallbackPtr& callback) :
- _connection(connection),
_callback(callback)
#endif
{
@@ -1192,9 +1168,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
o->canceled(false);
_sendStreams.erase(o);
}
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
}
return;
@@ -1215,9 +1191,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
{
_asyncRequests.erase(_asyncRequestsHint);
_asyncRequestsHint = _asyncRequests.end();
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
}
return;
@@ -1236,9 +1212,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
{
assert(p != _asyncRequestsHint);
_asyncRequests.erase(p);
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
}
return;
@@ -1266,7 +1242,7 @@ Ice::ConnectionI::sendResponse(Int, OutputStream* os, Byte compressFlag, bool /*
if(_state >= StateClosed)
{
- assert(ICE_EXCEPTION_GET(_exception));
+ assert(ICE_EXCEPTION_ISSET(_exception));
ICE_RETHROW_EXCEPTION(_exception);
}
@@ -1305,7 +1281,7 @@ Ice::ConnectionI::sendNoResponse()
if(_state >= StateClosed)
{
- assert(ICE_EXCEPTION_GET(_exception));
+ assert(ICE_EXCEPTION_ISSET(_exception));
ICE_RETHROW_EXCEPTION(_exception);
}
@@ -1828,10 +1804,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
}
if(p->receivedReply)
{
- OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, p->outAsync);
- if(outAsync->completed())
+ OutgoingAsyncPtr o = ICE_DYNAMIC_CAST(OutgoingAsync, p->outAsync);
+ if(o->response())
{
- outAsync->invokeCompleted();
+ o->invokeResponse();
}
}
#else
@@ -1847,7 +1823,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
if(outAsync)
{
- outAsync->invokeCompleted();
+ outAsync->invokeResponse();
++dispatchedCount;
}
@@ -1964,20 +1940,15 @@ Ice::ConnectionI::finish(bool close)
{
string verb = _connector ? "establish" : "accept";
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
-#ifdef ICE_CPP11_MAPPING
try
{
- rethrow_exception(_exception);
+ ICE_RETHROW_EXCEPTION(_exception);
}
catch(const Ice::Exception& ex)
{
out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString()
<< "\n" << ex;
}
-#else
- out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString()
- << "\n" << *_exception.get();
-#endif
}
}
else
@@ -1990,10 +1961,9 @@ Ice::ConnectionI::finish(bool close)
//
// Trace the cause of unexpected connection closures
//
-#ifdef ICE_CPP11_MAPPING
try
{
- rethrow_exception(_exception);
+ ICE_RETHROW_EXCEPTION(_exception);
}
catch(const Ice::LocalException& ex)
{
@@ -2006,16 +1976,6 @@ Ice::ConnectionI::finish(bool close)
out << "\n" << ex;
}
}
-#else
- if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
- dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
- dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get())))
- {
- out << "\n" << *_exception.get();
- }
-#endif
}
}
@@ -2026,18 +1986,14 @@ Ice::ConnectionI::finish(bool close)
if(_startCallback)
{
-#ifdef ICE_CPP11_MAPPING
try
{
- rethrow_exception(_exception);
+ ICE_RETHROW_EXCEPTION(_exception);
}
catch(const LocalException& ex)
{
_startCallback->connectionStartFailed(shared_from_this(), ex);
}
-#else
- _startCallback->connectionStartFailed(shared_from_this(), *_exception.get());
-#endif
_startCallback = 0;
}
@@ -2067,9 +2023,9 @@ Ice::ConnectionI::finish(bool close)
if(message->receivedReply)
{
OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, message->outAsync);
- if(outAsync->completed())
+ if(outAsync->response())
{
- outAsync->invokeCompleted();
+ outAsync->invokeResponse();
}
}
_sendStreams.pop_front();
@@ -2077,10 +2033,9 @@ Ice::ConnectionI::finish(bool close)
#endif
}
-#ifdef ICE_CPP11_MAPPING
try
{
- rethrow_exception(_exception);
+ ICE_RETHROW_EXCEPTION(_exception);
}
catch(const Ice::LocalException& ex)
{
@@ -2100,30 +2055,12 @@ Ice::ConnectionI::finish(bool close)
}
}
}
-#else
- for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
- {
- o->completed(*_exception.get());
- if(o->requestId) // Make sure finished isn't called twice.
- {
- if(o->out)
- {
- _requests.erase(o->requestId);
- }
- else
- {
- _asyncRequests.erase(o->requestId);
- }
- }
- }
-#endif
_sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
}
-#ifdef ICE_CPP11_MAPPING
try
{
- rethrow_exception(_exception);
+ ICE_RETHROW_EXCEPTION(_exception);
}
catch(const Ice::LocalException& ex)
{
@@ -2136,28 +2073,12 @@ Ice::ConnectionI::finish(bool close)
for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
- if(q->second->completed(ex))
+ if(q->second->exception(ex))
{
- q->second->invokeCompleted();
+ q->second->invokeException();
}
}
}
-#else
- for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- p->second->completed(*_exception.get());
- }
-
- _requests.clear();
-
- for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
- {
- if(q->second->completed(*_exception.get()))
- {
- q->second->invokeCompleted();
- }
- }
-#endif
_asyncRequests.clear();
@@ -2378,7 +2299,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
return;
}
- if(!ICE_EXCEPTION_GET(_exception))
+ if(!ICE_EXCEPTION_ISSET(_exception))
{
//
// If we are in closed state, an exception must be set.
@@ -2510,18 +2431,15 @@ Ice::ConnectionI::setState(State state)
return;
}
-#ifdef ICE_CPP11_MAPPING
try
{
- rethrow_exception(_exception);
+ ICE_RETHROW_EXCEPTION(_exception);
}
catch(const Ice::LocalException& ex)
{
_batchRequestQueue->destroy(ex);
}
-#else
- _batchRequestQueue->destroy(*_exception.get());
-#endif
+
//
// Don't need to close now for connections so only close the transceiver
// if the selector request it.
@@ -2580,12 +2498,11 @@ Ice::ConnectionI::setState(State state)
newState,
_observer.get()));
}
-#ifdef ICE_CPP11_MAPPING
- if(_observer && state == StateClosed && _exception)
+ if(_observer && state == StateClosed && ICE_EXCEPTION_ISSET(_exception))
{
try
{
- rethrow_exception(_exception);
+ ICE_RETHROW_EXCEPTION(_exception);
}
catch(const Ice::LocalException& ex)
{
@@ -2600,20 +2517,6 @@ Ice::ConnectionI::setState(State state)
}
}
}
-#else
- if(_observer && state == StateClosed && _exception.get())
- {
- if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
- dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
- dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
- (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing)))
- {
- _observer->failed(_exception->ice_id());
- }
- }
-#endif
}
_state = state;
@@ -2708,7 +2611,7 @@ Ice::ConnectionI::heartbeat()
catch(const LocalException& ex)
{
setState(StateClosed, ex);
- assert(ICE_EXCEPTION_GET(_exception));
+ assert(ICE_EXCEPTION_ISSET(_exception));
}
}
}
@@ -3540,7 +3443,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request
message->receivedReply = true;
outAsync = 0;
}
- else if(outAsync->completed())
+ else if(outAsync->response())
{
++dispatchCount;
}
@@ -3549,7 +3452,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request
outAsync = 0;
}
#else
- if(outAsync->completed())
+ if(outAsync->response())
{
++dispatchCount;
}