summaryrefslogtreecommitdiff
path: root/cpp/src/Ice
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2017-01-30 13:45:21 -0800
committerMark Spruiell <mes@zeroc.com>2017-01-30 13:45:21 -0800
commit61270a10f980933cf582edb766f10c8ac6d86e8a (patch)
tree45ab4a7c2986954054fce613bc3c8f7967e7951e /cpp/src/Ice
parentFix slice2cpp build failure (diff)
downloadice-61270a10f980933cf582edb766f10c8ac6d86e8a.tar.bz2
ice-61270a10f980933cf582edb766f10c8ac6d86e8a.tar.xz
ice-61270a10f980933cf582edb766f10c8ac6d86e8a.zip
merging IceBridge into master
Diffstat (limited to 'cpp/src/Ice')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp211
-rw-r--r--cpp/src/Ice/ConnectionI.h20
-rw-r--r--cpp/src/Ice/Exception.cpp8
-rw-r--r--cpp/src/Ice/ProxyFactory.cpp7
4 files changed, 213 insertions, 33 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 7845259855c..23388d399c6 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -409,29 +409,31 @@ Ice::ConnectionI::destroy(DestructionReason reason)
}
void
-Ice::ConnectionI::close(bool force)
+Ice::ConnectionI::close(ConnectionClose mode)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(force)
+ if(mode == CloseForcefully)
{
- setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
+ setState(StateClosed, ConnectionManuallyClosedException(__FILE__, __LINE__, false));
+ }
+ else if(mode == CloseGracefully)
+ {
+ setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true));
}
else
{
+ assert(mode == CloseGracefullyAndWait);
+
//
- // If we do a graceful shutdown, then we wait until all
- // outstanding requests have been completed. Otherwise, the
- // CloseConnectionException will cause all outstanding
- // requests to be retried, regardless of whether the server
- // has processed them or not.
+ // Wait until all outstanding requests have been completed.
//
while(!_asyncRequests.empty())
{
wait();
}
- setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+ setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true));
}
}
@@ -550,13 +552,13 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
//
// We send a heartbeat if there was no activity in the last
// (timeout / 4) period. Sending a heartbeat sooner than really
- // needed is safer to ensure that the receiver will receive in
- // time the heartbeat. Sending the heartbeat if there was no
+ // needed is safer to ensure that the receiver will receive the
+ // heartbeat in time. Sending the heartbeat if there was no
// activity in the last (timeout / 2) period isn't enough since
// monitor() is called only every (timeout / 2) period.
//
// Note that this doesn't imply that we are sending 4 heartbeats
- // per timeout period because the monitor() method is sill only
+ // per timeout period because the monitor() method is still only
// called every (timeout / 2) period.
//
if(acm.heartbeat == HeartbeatAlways ||
@@ -564,7 +566,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
{
if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0)
{
- heartbeat();
+ sendHeartbeatNow();
}
}
@@ -802,6 +804,174 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
}
#endif
+namespace
+{
+
+const ::std::string __heartbeat_name = "heartbeat";
+
+class HeartbeatAsync : public OutgoingAsyncBase
+{
+public:
+
+ HeartbeatAsync(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance) :
+ OutgoingAsyncBase(instance),
+ _communicator(communicator),
+ _connection(connection)
+ {
+ }
+
+ virtual CommunicatorPtr getCommunicator() const
+ {
+ return _communicator;
+ }
+
+ virtual ConnectionPtr getConnection() const
+ {
+ return _connection;
+ }
+
+ virtual const string& getOperation() const
+ {
+ return __heartbeat_name;
+ }
+
+ void invoke()
+ {
+ _observer.attach(_instance.get(), __heartbeat_name);
+ try
+ {
+ _os.write(magic[0]);
+ _os.write(magic[1]);
+ _os.write(magic[2]);
+ _os.write(magic[3]);
+ _os.write(currentProtocol);
+ _os.write(currentProtocolEncoding);
+ _os.write(validateConnectionMsg);
+ _os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
+ _os.write(headerSize); // Message size.
+ _os.i = _os.b.begin();
+
+ AsyncStatus status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, 0);
+ if(status & AsyncStatusSent)
+ {
+ _sentSynchronously = true;
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSent();
+ }
+ }
+ }
+ catch(const RetryException& ex)
+ {
+ if(exception(*ex.get()))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ catch(const Exception& ex)
+ {
+ if(exception(ex))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ }
+
+private:
+
+ CommunicatorPtr _communicator;
+ ConnectionIPtr _connection;
+};
+typedef IceUtil::Handle<HeartbeatAsync> HeartbeatAsyncPtr;
+
+}
+
+#ifdef ICE_CPP11_MAPPING
+void
+Ice::ConnectionI::heartbeat()
+{
+ Connection::heartbeatAsync().get();
+}
+
+std::function<void()>
+Ice::ConnectionI::heartbeatAsync(::std::function<void(::std::exception_ptr)> ex, ::std::function<void(bool)> sent)
+{
+ class HeartbeatLambda : public HeartbeatAsync, public LambdaInvoke
+ {
+ public:
+
+ HeartbeatLambda(std::shared_ptr<Ice::ConnectionI>&& connection,
+ std::shared_ptr<Ice::Communicator>& communicator,
+ const InstancePtr& instance,
+ std::function<void(std::exception_ptr)> ex,
+ std::function<void(bool)> sent) :
+ HeartbeatAsync(connection, communicator, instance), LambdaInvoke(std::move(ex), std::move(sent))
+ {
+ }
+ };
+ auto outAsync = make_shared<HeartbeatLambda>(ICE_SHARED_FROM_THIS, _communicator, _instance, ex, sent);
+ outAsync->invoke();
+ return [outAsync]() { outAsync->cancel(); };
+}
+#else
+void
+Ice::ConnectionI::heartbeat()
+{
+ end_heartbeat(begin_heartbeat());
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_heartbeat()
+{
+ return _iceI_begin_heartbeat(dummyCallback, 0);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_heartbeat(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+{
+ return _iceI_begin_heartbeat(cb, cookie);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_heartbeat(const Callback_Connection_heartbeatPtr& cb, const LocalObjectPtr& cookie)
+{
+ return _iceI_begin_heartbeat(cb, cookie);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::_iceI_begin_heartbeat(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+{
+ class HeartbeatCallback : public HeartbeatAsync, public CallbackCompletion
+ {
+ public:
+
+ HeartbeatCallback(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const CallbackBasePtr& callback,
+ const LocalObjectPtr& cookie) :
+ HeartbeatAsync(connection, communicator, instance),
+ CallbackCompletion(callback, cookie)
+ {
+ _cookie = cookie;
+ }
+ };
+
+ HeartbeatAsyncPtr result = new HeartbeatCallback(this, _communicator, _instance, cb, cookie);
+ result->invoke();
+ return result;
+}
+
+void
+Ice::ConnectionI::end_heartbeat(const AsyncResultPtr& r)
+{
+ AsyncResult::check(r, this, __heartbeat_name);
+ r->waitForResponse();
+}
+#endif
+
void
Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback)
{
@@ -1755,7 +1925,7 @@ Ice::ConnectionI::finish(bool close)
out << "closed " << _endpoint->protocol() << " connection\n" << toString();
if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) ||
dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get())))
@@ -2076,7 +2246,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
// Don't warn about certain expected exceptions.
//
if(!(dynamic_cast<const CloseConnectionException*>(&ex) ||
- dynamic_cast<const ForcedCloseConnectionException*>(&ex) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(&ex) ||
dynamic_cast<const ConnectionTimeoutException*>(&ex) ||
dynamic_cast<const CommunicatorDestroyedException*>(&ex) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) ||
@@ -2255,7 +2425,7 @@ Ice::ConnectionI::setState(State state)
if(_observer && state == StateClosed && _exception)
{
if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) ||
dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
@@ -2285,8 +2455,7 @@ Ice::ConnectionI::setState(State state)
void
Ice::ConnectionI::initiateShutdown()
{
- assert(_state == StateClosing);
- assert(_dispatchCount == 0);
+ assert(_state == StateClosing && _dispatchCount == 0);
if(_shutdownInitiated)
{
@@ -2316,7 +2485,7 @@ Ice::ConnectionI::initiateShutdown()
setState(StateClosingPending);
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the transceiver of the graceful connection closure.
//
SocketOperation op = _transceiver->closing(true, *_exception);
if(op)
@@ -2329,7 +2498,7 @@ Ice::ConnectionI::initiateShutdown()
}
void
-Ice::ConnectionI::heartbeat()
+Ice::ConnectionI::sendHeartbeatNow()
{
assert(_state == StateActive);
@@ -3016,7 +3185,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request
setState(StateClosingPending, CloseConnectionException(__FILE__, __LINE__));
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the transceiver of the graceful connection closure.
//
SocketOperation op = _transceiver->closing(false, *_exception);
if(op)
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 72652518b94..7fb3781e880 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -157,12 +157,12 @@ public:
void activate();
void hold();
void destroy(DestructionReason);
- virtual void close(bool); // From Connection.
+ virtual void close(ConnectionClose); // From Connection.
bool isActiveOrHolding() const;
bool isFinished() const;
- void throwException() const; // Throws the connection exception if destroyed.
+ virtual void throwException() const; // From Connection. Throws the connection exception if destroyed.
void waitUntilHolding() const;
void waitUntilFinished(); // Not const, as this might close the connection upon timeout.
@@ -193,6 +193,19 @@ public:
virtual void setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK));
virtual void setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK));
+ virtual void heartbeat();
+
+#ifdef ICE_CPP11_MAPPING
+ virtual std::function<void()>
+ heartbeatAsync(::std::function<void(::std::exception_ptr)>, ::std::function<void(bool)> = nullptr);
+#else
+ virtual AsyncResultPtr begin_heartbeat();
+ virtual AsyncResultPtr begin_heartbeat(const CallbackPtr&, const LocalObjectPtr& = 0);
+ virtual AsyncResultPtr begin_heartbeat(const Callback_Connection_heartbeatPtr&, const LocalObjectPtr& = 0);
+
+ virtual void end_heartbeat(const AsyncResultPtr&);
+#endif
+
virtual void setACM(const IceUtil::Optional<int>&,
const IceUtil::Optional<ACMClose>&,
const IceUtil::Optional<ACMHeartbeat>&);
@@ -276,7 +289,7 @@ private:
void setState(State);
void initiateShutdown();
- void heartbeat();
+ void sendHeartbeatNow();
bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
@@ -308,6 +321,7 @@ private:
#ifndef ICE_CPP11_MAPPING
AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
+ AsyncResultPtr _iceI_begin_heartbeat(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
#endif
Ice::CommunicatorPtr _communicator;
diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp
index 4ac7d66cdd3..a20c564101e 100644
--- a/cpp/src/Ice/Exception.cpp
+++ b/cpp/src/Ice/Exception.cpp
@@ -632,14 +632,10 @@ Ice::CloseConnectionException::ice_print(ostream& out) const
}
void
-Ice::ForcedCloseConnectionException::ice_print(ostream& out) const
+Ice::ConnectionManuallyClosedException::ice_print(ostream& out) const
{
Exception::ice_print(out);
- out << ":\nprotocol error: connection forcefully closed";
- if(!reason.empty())
- {
- out << ":\n" << reason;
- }
+ out << ":\nprotocol error: connection manually closed (" << (graceful ? "gracefully" : "forcefully") << ")";
}
void
diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp
index eba949508b6..34503ec961b 100644
--- a/cpp/src/Ice/ProxyFactory.cpp
+++ b/cpp/src/Ice/ProxyFactory.cpp
@@ -201,11 +201,12 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, co
}
//
- // Don't retry if the communicator is destroyed or object adapter
- // deactivated.
+ // Don't retry if the communicator is destroyed, object adapter is deactivated,
+ // or connection is manually closed.
//
if(dynamic_cast<const CommunicatorDestroyedException*>(&ex) ||
- dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex))
+ dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(&ex))
{
ex.ice_throw();
}