summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2017-01-30 13:45:21 -0800
committerMark Spruiell <mes@zeroc.com>2017-01-30 13:45:21 -0800
commit61270a10f980933cf582edb766f10c8ac6d86e8a (patch)
tree45ab4a7c2986954054fce613bc3c8f7967e7951e /cpp/src
parentFix slice2cpp build failure (diff)
downloadice-61270a10f980933cf582edb766f10c8ac6d86e8a.tar.bz2
ice-61270a10f980933cf582edb766f10c8ac6d86e8a.tar.xz
ice-61270a10f980933cf582edb766f10c8ac6d86e8a.zip
merging IceBridge into master
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Glacier2/RoutingTable.cpp2
-rw-r--r--cpp/src/Glacier2/SessionRouterI.cpp10
-rw-r--r--cpp/src/Ice/ConnectionI.cpp211
-rw-r--r--cpp/src/Ice/ConnectionI.h20
-rw-r--r--cpp/src/Ice/Exception.cpp8
-rw-r--r--cpp/src/Ice/ProxyFactory.cpp7
-rw-r--r--cpp/src/IceBridge/IceBridge.cpp837
-rw-r--r--cpp/src/IceBridge/Makefile.mk15
-rw-r--r--cpp/src/IceGrid/SessionManager.cpp2
9 files changed, 1072 insertions, 40 deletions
diff --git a/cpp/src/Glacier2/RoutingTable.cpp b/cpp/src/Glacier2/RoutingTable.cpp
index fde938fba00..e6b090de4cb 100644
--- a/cpp/src/Glacier2/RoutingTable.cpp
+++ b/cpp/src/Glacier2/RoutingTable.cpp
@@ -65,7 +65,7 @@ Glacier2::RoutingTable::add(const ObjectProxySeq& unfiltered, const Current& cur
if(!_verifier->verify(*prx))
{
- current.con->close(true);
+ current.con->close(CloseForcefully);
throw ObjectNotExistException(__FILE__, __LINE__);
}
ObjectPrx proxy = (*prx)->ice_twoway()->ice_secure(false)->ice_facet(""); // We add proxies in default form.
diff --git a/cpp/src/Glacier2/SessionRouterI.cpp b/cpp/src/Glacier2/SessionRouterI.cpp
index 9f774b79a8e..512d371611f 100644
--- a/cpp/src/Glacier2/SessionRouterI.cpp
+++ b/cpp/src/Glacier2/SessionRouterI.cpp
@@ -80,7 +80,7 @@ public:
// Close the connection otherwise the peer has no way to know that
// the session has gone.
//
- _connection->close(true);
+ _connection->close(CloseForcefully);
_router->destroySession(_connection);
}
}
@@ -922,7 +922,7 @@ SessionRouterI::refreshSession(const Ice::ConnectionPtr& con)
// Close the connection otherwise the peer has no way to know that the
// session has gone.
//
- con->close(true);
+ con->close(CloseForcefully);
throw SessionNotExistException();
}
}
@@ -1149,10 +1149,10 @@ SessionRouterI::getRouterImpl(const ConnectionPtr& connection, const Ice::Identi
if(_rejectTraceLevel >= 1)
{
Trace out(_instance->logger(), "Glacier2");
- out << "rejecting request. no session is associated with the connection.\n";
- out << "identity: " << _instance->communicator()->identityToString(id);
+ out << "rejecting request, no session is associated with the connection.\n";
+ out << "identity: " << identityToString(id);
}
- connection->close(true);
+ connection->close(CloseForcefully);
throw ObjectNotExistException(__FILE__, __LINE__);
}
return 0;
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 7845259855c..23388d399c6 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -409,29 +409,31 @@ Ice::ConnectionI::destroy(DestructionReason reason)
}
void
-Ice::ConnectionI::close(bool force)
+Ice::ConnectionI::close(ConnectionClose mode)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(force)
+ if(mode == CloseForcefully)
{
- setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
+ setState(StateClosed, ConnectionManuallyClosedException(__FILE__, __LINE__, false));
+ }
+ else if(mode == CloseGracefully)
+ {
+ setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true));
}
else
{
+ assert(mode == CloseGracefullyAndWait);
+
//
- // If we do a graceful shutdown, then we wait until all
- // outstanding requests have been completed. Otherwise, the
- // CloseConnectionException will cause all outstanding
- // requests to be retried, regardless of whether the server
- // has processed them or not.
+ // Wait until all outstanding requests have been completed.
//
while(!_asyncRequests.empty())
{
wait();
}
- setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+ setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true));
}
}
@@ -550,13 +552,13 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
//
// We send a heartbeat if there was no activity in the last
// (timeout / 4) period. Sending a heartbeat sooner than really
- // needed is safer to ensure that the receiver will receive in
- // time the heartbeat. Sending the heartbeat if there was no
+ // needed is safer to ensure that the receiver will receive the
+ // heartbeat in time. Sending the heartbeat if there was no
// activity in the last (timeout / 2) period isn't enough since
// monitor() is called only every (timeout / 2) period.
//
// Note that this doesn't imply that we are sending 4 heartbeats
- // per timeout period because the monitor() method is sill only
+ // per timeout period because the monitor() method is still only
// called every (timeout / 2) period.
//
if(acm.heartbeat == HeartbeatAlways ||
@@ -564,7 +566,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
{
if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0)
{
- heartbeat();
+ sendHeartbeatNow();
}
}
@@ -802,6 +804,174 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
}
#endif
+namespace
+{
+
+const ::std::string __heartbeat_name = "heartbeat";
+
+class HeartbeatAsync : public OutgoingAsyncBase
+{
+public:
+
+ HeartbeatAsync(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance) :
+ OutgoingAsyncBase(instance),
+ _communicator(communicator),
+ _connection(connection)
+ {
+ }
+
+ virtual CommunicatorPtr getCommunicator() const
+ {
+ return _communicator;
+ }
+
+ virtual ConnectionPtr getConnection() const
+ {
+ return _connection;
+ }
+
+ virtual const string& getOperation() const
+ {
+ return __heartbeat_name;
+ }
+
+ void invoke()
+ {
+ _observer.attach(_instance.get(), __heartbeat_name);
+ try
+ {
+ _os.write(magic[0]);
+ _os.write(magic[1]);
+ _os.write(magic[2]);
+ _os.write(magic[3]);
+ _os.write(currentProtocol);
+ _os.write(currentProtocolEncoding);
+ _os.write(validateConnectionMsg);
+ _os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
+ _os.write(headerSize); // Message size.
+ _os.i = _os.b.begin();
+
+ AsyncStatus status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, 0);
+ if(status & AsyncStatusSent)
+ {
+ _sentSynchronously = true;
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSent();
+ }
+ }
+ }
+ catch(const RetryException& ex)
+ {
+ if(exception(*ex.get()))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ catch(const Exception& ex)
+ {
+ if(exception(ex))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ }
+
+private:
+
+ CommunicatorPtr _communicator;
+ ConnectionIPtr _connection;
+};
+typedef IceUtil::Handle<HeartbeatAsync> HeartbeatAsyncPtr;
+
+}
+
+#ifdef ICE_CPP11_MAPPING
+void
+Ice::ConnectionI::heartbeat()
+{
+ Connection::heartbeatAsync().get();
+}
+
+std::function<void()>
+Ice::ConnectionI::heartbeatAsync(::std::function<void(::std::exception_ptr)> ex, ::std::function<void(bool)> sent)
+{
+ class HeartbeatLambda : public HeartbeatAsync, public LambdaInvoke
+ {
+ public:
+
+ HeartbeatLambda(std::shared_ptr<Ice::ConnectionI>&& connection,
+ std::shared_ptr<Ice::Communicator>& communicator,
+ const InstancePtr& instance,
+ std::function<void(std::exception_ptr)> ex,
+ std::function<void(bool)> sent) :
+ HeartbeatAsync(connection, communicator, instance), LambdaInvoke(std::move(ex), std::move(sent))
+ {
+ }
+ };
+ auto outAsync = make_shared<HeartbeatLambda>(ICE_SHARED_FROM_THIS, _communicator, _instance, ex, sent);
+ outAsync->invoke();
+ return [outAsync]() { outAsync->cancel(); };
+}
+#else
+void
+Ice::ConnectionI::heartbeat()
+{
+ end_heartbeat(begin_heartbeat());
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_heartbeat()
+{
+ return _iceI_begin_heartbeat(dummyCallback, 0);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_heartbeat(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+{
+ return _iceI_begin_heartbeat(cb, cookie);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_heartbeat(const Callback_Connection_heartbeatPtr& cb, const LocalObjectPtr& cookie)
+{
+ return _iceI_begin_heartbeat(cb, cookie);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::_iceI_begin_heartbeat(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+{
+ class HeartbeatCallback : public HeartbeatAsync, public CallbackCompletion
+ {
+ public:
+
+ HeartbeatCallback(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const CallbackBasePtr& callback,
+ const LocalObjectPtr& cookie) :
+ HeartbeatAsync(connection, communicator, instance),
+ CallbackCompletion(callback, cookie)
+ {
+ _cookie = cookie;
+ }
+ };
+
+ HeartbeatAsyncPtr result = new HeartbeatCallback(this, _communicator, _instance, cb, cookie);
+ result->invoke();
+ return result;
+}
+
+void
+Ice::ConnectionI::end_heartbeat(const AsyncResultPtr& r)
+{
+ AsyncResult::check(r, this, __heartbeat_name);
+ r->waitForResponse();
+}
+#endif
+
void
Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback)
{
@@ -1755,7 +1925,7 @@ Ice::ConnectionI::finish(bool close)
out << "closed " << _endpoint->protocol() << " connection\n" << toString();
if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) ||
dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get())))
@@ -2076,7 +2246,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
// Don't warn about certain expected exceptions.
//
if(!(dynamic_cast<const CloseConnectionException*>(&ex) ||
- dynamic_cast<const ForcedCloseConnectionException*>(&ex) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(&ex) ||
dynamic_cast<const ConnectionTimeoutException*>(&ex) ||
dynamic_cast<const CommunicatorDestroyedException*>(&ex) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) ||
@@ -2255,7 +2425,7 @@ Ice::ConnectionI::setState(State state)
if(_observer && state == StateClosed && _exception)
{
if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) ||
dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
@@ -2285,8 +2455,7 @@ Ice::ConnectionI::setState(State state)
void
Ice::ConnectionI::initiateShutdown()
{
- assert(_state == StateClosing);
- assert(_dispatchCount == 0);
+ assert(_state == StateClosing && _dispatchCount == 0);
if(_shutdownInitiated)
{
@@ -2316,7 +2485,7 @@ Ice::ConnectionI::initiateShutdown()
setState(StateClosingPending);
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the transceiver of the graceful connection closure.
//
SocketOperation op = _transceiver->closing(true, *_exception);
if(op)
@@ -2329,7 +2498,7 @@ Ice::ConnectionI::initiateShutdown()
}
void
-Ice::ConnectionI::heartbeat()
+Ice::ConnectionI::sendHeartbeatNow()
{
assert(_state == StateActive);
@@ -3016,7 +3185,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request
setState(StateClosingPending, CloseConnectionException(__FILE__, __LINE__));
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the transceiver of the graceful connection closure.
//
SocketOperation op = _transceiver->closing(false, *_exception);
if(op)
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 72652518b94..7fb3781e880 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -157,12 +157,12 @@ public:
void activate();
void hold();
void destroy(DestructionReason);
- virtual void close(bool); // From Connection.
+ virtual void close(ConnectionClose); // From Connection.
bool isActiveOrHolding() const;
bool isFinished() const;
- void throwException() const; // Throws the connection exception if destroyed.
+ virtual void throwException() const; // From Connection. Throws the connection exception if destroyed.
void waitUntilHolding() const;
void waitUntilFinished(); // Not const, as this might close the connection upon timeout.
@@ -193,6 +193,19 @@ public:
virtual void setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK));
virtual void setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK));
+ virtual void heartbeat();
+
+#ifdef ICE_CPP11_MAPPING
+ virtual std::function<void()>
+ heartbeatAsync(::std::function<void(::std::exception_ptr)>, ::std::function<void(bool)> = nullptr);
+#else
+ virtual AsyncResultPtr begin_heartbeat();
+ virtual AsyncResultPtr begin_heartbeat(const CallbackPtr&, const LocalObjectPtr& = 0);
+ virtual AsyncResultPtr begin_heartbeat(const Callback_Connection_heartbeatPtr&, const LocalObjectPtr& = 0);
+
+ virtual void end_heartbeat(const AsyncResultPtr&);
+#endif
+
virtual void setACM(const IceUtil::Optional<int>&,
const IceUtil::Optional<ACMClose>&,
const IceUtil::Optional<ACMHeartbeat>&);
@@ -276,7 +289,7 @@ private:
void setState(State);
void initiateShutdown();
- void heartbeat();
+ void sendHeartbeatNow();
bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
@@ -308,6 +321,7 @@ private:
#ifndef ICE_CPP11_MAPPING
AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
+ AsyncResultPtr _iceI_begin_heartbeat(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
#endif
Ice::CommunicatorPtr _communicator;
diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp
index 4ac7d66cdd3..a20c564101e 100644
--- a/cpp/src/Ice/Exception.cpp
+++ b/cpp/src/Ice/Exception.cpp
@@ -632,14 +632,10 @@ Ice::CloseConnectionException::ice_print(ostream& out) const
}
void
-Ice::ForcedCloseConnectionException::ice_print(ostream& out) const
+Ice::ConnectionManuallyClosedException::ice_print(ostream& out) const
{
Exception::ice_print(out);
- out << ":\nprotocol error: connection forcefully closed";
- if(!reason.empty())
- {
- out << ":\n" << reason;
- }
+ out << ":\nprotocol error: connection manually closed (" << (graceful ? "gracefully" : "forcefully") << ")";
}
void
diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp
index eba949508b6..34503ec961b 100644
--- a/cpp/src/Ice/ProxyFactory.cpp
+++ b/cpp/src/Ice/ProxyFactory.cpp
@@ -201,11 +201,12 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, co
}
//
- // Don't retry if the communicator is destroyed or object adapter
- // deactivated.
+ // Don't retry if the communicator is destroyed, object adapter is deactivated,
+ // or connection is manually closed.
//
if(dynamic_cast<const CommunicatorDestroyedException*>(&ex) ||
- dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex))
+ dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(&ex))
{
ex.ice_throw();
}
diff --git a/cpp/src/IceBridge/IceBridge.cpp b/cpp/src/IceBridge/IceBridge.cpp
new file mode 100644
index 00000000000..31ec58aae6c
--- /dev/null
+++ b/cpp/src/IceBridge/IceBridge.cpp
@@ -0,0 +1,837 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Connection.h>
+#include <Ice/ObjectAdapter.h>
+#include <Ice/Service.h>
+#include <Ice/UUID.h>
+#include <IceUtil/Options.h>
+
+using namespace std;
+using namespace Ice;
+
+namespace
+{
+
+//
+// Represents a pending twoway invocation.
+//
+class Invocation : public IceUtil::Shared
+{
+public:
+
+ Invocation(const AMD_Object_ice_invokePtr& cb) :
+ _cb(cb)
+ {
+ }
+
+ void success(bool ok, const pair<const Byte*, const Byte*>& results)
+ {
+ _cb->ice_response(ok, results);
+ }
+
+ void exception(const Exception& ex)
+ {
+ _cb->ice_exception(ex);
+ }
+
+private:
+
+ AMD_Object_ice_invokePtr _cb;
+};
+typedef IceUtil::Handle<Invocation> InvocationPtr;
+
+//
+// Represents a pending oneway invocation.
+//
+class OnewayInvocation : public IceUtil::Shared
+{
+public:
+
+ OnewayInvocation(const AMD_Object_ice_invokePtr& cb) :
+ _cb(cb)
+ {
+ }
+
+ void success(bool, const pair<const Byte*, const Byte*>&)
+ {
+ assert(false);
+ }
+
+ void exception(const Exception& ex)
+ {
+ _cb->ice_exception(ex);
+ }
+
+ void sent(bool sentSynchronously)
+ {
+ _cb->ice_response(true, vector<Byte>());
+ }
+
+private:
+
+ AMD_Object_ice_invokePtr _cb;
+};
+typedef IceUtil::Handle<OnewayInvocation> OnewayInvocationPtr;
+
+//
+// Holds information about an incoming invocation that's been queued until an outgoing connection has
+// been established.
+//
+struct QueuedInvocation : public IceUtil::Shared
+{
+ QueuedInvocation(const AMD_Object_ice_invokePtr& c, const pair<const Byte*, const Byte*>& p, const Current& curr) :
+ cb(c), current(curr)
+ {
+ if(p.first)
+ {
+ //
+ // The pointers in paramData refer to the Ice marshaling buffer and won't remain valid after
+ // ice_invoke_async completes, so we have to make a copy of the parameter data.
+ //
+ vector<Byte> tmp(p.first, p.second);
+ paramData.swap(tmp);
+ }
+ }
+
+ const AMD_Object_ice_invokePtr cb;
+ vector<Byte> paramData;
+ const Current current;
+};
+typedef IceUtil::Handle<QueuedInvocation> QueuedInvocationPtr;
+typedef vector<QueuedInvocationPtr> InvocationList;
+
+//
+// Relays heartbeat messages.
+//
+class HeartbeatCallbackI : public HeartbeatCallback
+{
+public:
+
+ HeartbeatCallbackI(const ConnectionPtr& con) :
+ _connection(con)
+ {
+ }
+
+ virtual void heartbeat(const ConnectionPtr&)
+ {
+ //
+ // When a connection receives a heartbeat message, we send one over its corresponding connection.
+ //
+ try
+ {
+ _connection->begin_heartbeat();
+ }
+ catch(...)
+ {
+ }
+ }
+
+private:
+
+ const ConnectionPtr _connection;
+};
+
+//
+// Represents a pair of bridged connections.
+//
+class BridgeConnection : public IceUtil::Shared
+{
+public:
+
+ BridgeConnection(const ObjectAdapterPtr& adapter, const ObjectPrx& target, const ConnectionPtr& incoming) :
+ _adapter(adapter), _target(target), _incoming(incoming), _closed(false)
+ {
+ }
+
+ void setOutgoing(const ConnectionPtr& outgoing)
+ {
+ if(outgoing)
+ {
+ InvocationList queuedInvocations;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ assert(!_outgoing);
+
+ if(_closed)
+ {
+ //
+ // The incoming connection is already closed. There's no point in leaving the outgoing
+ // connection open.
+ //
+ outgoing->close(CloseGracefully);
+ }
+ else
+ {
+ _outgoing = outgoing;
+
+ //
+ // Save the queued invocations.
+ //
+ queuedInvocations.swap(_queue);
+
+ //
+ // Register hearbeat callbacks on both connections.
+ //
+ _incoming->setHeartbeatCallback(new HeartbeatCallbackI(_outgoing));
+ _outgoing->setHeartbeatCallback(new HeartbeatCallbackI(_incoming));
+
+ //
+ // Configure the outgoing connection for bidirectional requests.
+ //
+ _outgoing->setAdapter(_adapter);
+ }
+ }
+
+ //
+ // Flush any queued invocations.
+ //
+ flush(outgoing, queuedInvocations);
+ }
+ else
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ //
+ // The outgoing connection failed so we close the incoming connection. closed() will eventually
+ // be called for it.
+ //
+ if(_incoming)
+ {
+ _incoming->close(CloseGracefully);
+ }
+ }
+ }
+
+ void closed(const ConnectionPtr& con)
+ {
+ InvocationList queuedInvocations;
+ ConnectionPtr toBeClosed;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ if(con == _incoming)
+ {
+ //
+ // The incoming connection was closed.
+ //
+ toBeClosed = _outgoing;
+ queuedInvocations.swap(_queue);
+ _closed = true;
+ _incoming = 0;
+ }
+ else
+ {
+ assert(con == _outgoing);
+
+ //
+ // An outgoing connection was closed.
+ //
+ toBeClosed = _incoming;
+ _outgoing = 0;
+ }
+ }
+
+ //
+ // Close the corresponding connection.
+ //
+ if(toBeClosed)
+ {
+ //
+ // Examine the exception that caused the connection to be closed. A CloseConnectionException
+ // indicates the connection was closed gracefully and we do the same. We also look for
+ // ConnectionManuallyClosedException, which indicates the connection was closed locally.
+ // We close forcefully for any other exception.
+ //
+ try
+ {
+ con->throwException();
+ }
+ catch(const Ice::CloseConnectionException&)
+ {
+ toBeClosed->close(CloseGracefully);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ //
+ // Connection was manually closed by the bridge.
+ //
+ toBeClosed->close(ex.graceful ? CloseGracefully : CloseForcefully);
+ }
+ catch(...)
+ {
+ toBeClosed->close(CloseForcefully);
+ }
+ }
+
+ //
+ // Even though the connection is already closed, we still need to "complete" the pending invocations so
+ // that the connection's dispatch count is updated correctly.
+ //
+ for(InvocationList::iterator p = queuedInvocations.begin(); p != queuedInvocations.end(); ++p)
+ {
+ (*p)->cb->ice_exception(ConnectionLostException(__FILE__, __LINE__));
+ }
+ }
+
+ void dispatch(const AMD_Object_ice_invokePtr& cb, const pair<const Byte*, const Byte*>& paramData,
+ const Current& current)
+ {
+ //
+ // We've received an invocations, either from the client via the incoming connection, or from
+ // the server via the outgoing (bidirectional) connection. The current.con member tells us
+ // the connection over which the request arrived.
+ //
+
+ ConnectionPtr dest;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ if(_closed)
+ {
+ //
+ // If the incoming connection has already closed, we're done.
+ //
+ cb->ice_exception(ConnectionLostException(__FILE__, __LINE__));
+ }
+ else if(!_outgoing)
+ {
+ //
+ // Queue the invocation until the outgoing connection is established.
+ //
+ _queue.push_back(new QueuedInvocation(cb, paramData, current));
+ }
+ else if(current.con == _incoming)
+ {
+ //
+ // Forward to the outgoing connection.
+ //
+ dest = _outgoing;
+ }
+ else
+ {
+ //
+ // Forward to the incoming connection.
+ //
+ assert(current.con == _outgoing);
+ dest = _incoming;
+ }
+ }
+
+ if(dest)
+ {
+ send(dest, cb, paramData, current);
+ }
+ }
+
+private:
+
+ void flush(const ConnectionPtr& outgoing, const InvocationList& invocations)
+ {
+ for(InvocationList::const_iterator p = invocations.begin(); p != invocations.end(); ++p)
+ {
+ QueuedInvocationPtr i = *p;
+
+ pair<const Byte*, const Byte*> paramData;
+ if(i->paramData.empty())
+ {
+ paramData.first = paramData.second = 0;
+ }
+ else
+ {
+ paramData.first = &i->paramData[0];
+ paramData.second = paramData.first + i->paramData.size();
+ }
+
+ send(outgoing, i->cb, paramData, i->current);
+ }
+ }
+
+ void send(const ConnectionPtr& dest, const AMD_Object_ice_invokePtr& cb,
+ const pair<const Byte*, const Byte*>& paramData, const Current& current)
+ {
+ try
+ {
+ //
+ // Create a proxy having the same identity as the request.
+ //
+ ObjectPrx prx = dest->createProxy(current.id);
+
+ //
+ // Examine the proxy and the request to determine whether it should be forwarded as a oneway or a twoway.
+ //
+ if(!prx->ice_isTwoway() || !current.requestId)
+ {
+ if(prx->ice_isTwoway())
+ {
+ prx = prx->ice_oneway();
+ }
+ OnewayInvocationPtr i = new OnewayInvocation(cb);
+ Callback_Object_ice_invokePtr d =
+ newCallback_Object_ice_invoke(i, &OnewayInvocation::success, &OnewayInvocation::exception,
+ &OnewayInvocation::sent);
+ prx->begin_ice_invoke(current.operation, current.mode, paramData, current.ctx, d);
+ }
+ else
+ {
+ InvocationPtr i = new Invocation(cb);
+ Callback_Object_ice_invokePtr d =
+ newCallback_Object_ice_invoke(i, &Invocation::success, &Invocation::exception);
+ prx->begin_ice_invoke(current.operation, current.mode, paramData, current.ctx, d);
+ }
+ }
+ catch(const std::exception& ex)
+ {
+ cb->ice_exception(ex);
+ }
+ }
+
+ const ObjectAdapterPtr _adapter;
+ const ObjectPrx _target;
+ ConnectionPtr _incoming;
+
+ IceUtil::Mutex _lock;
+
+ bool _closed;
+ ConnectionPtr _outgoing;
+
+ //
+ // We maintain our own queue for invocations that arrive on the incoming connection before the outgoing
+ // connection has been established. We don't want to forward these to proxies and let the proxies handle
+ // the queuing because then the invocations could be sent out of order (e.g., when invocations are split
+ // among twoway/oneway/datagram proxies).
+ //
+ InvocationList _queue;
+};
+typedef IceUtil::Handle<BridgeConnection> BridgeConnectionPtr;
+
+//
+// Allows the bridge to be used as an Ice router.
+//
+class RouterI : public Router
+{
+public:
+
+ virtual ObjectPrx getClientProxy(const Current&) const
+ {
+ return 0;
+ }
+
+ virtual ObjectPrx getServerProxy(const Current&) const
+ {
+ return 0;
+ }
+
+ virtual ObjectProxySeq addProxies(const ObjectProxySeq&, const Current&)
+ {
+ return ObjectProxySeq();
+ }
+};
+
+class FinderI : public RouterFinder
+{
+public:
+
+ FinderI(const RouterPrx& router) :
+ _router(router)
+ {
+ }
+
+ virtual RouterPrx getRouter(const Current&)
+ {
+ return _router;
+ }
+
+private:
+
+ const RouterPrx _router;
+};
+
+class BridgeI;
+typedef IceUtil::Handle<BridgeI> BridgeIPtr;
+
+class CloseCallbackI : public CloseCallback
+{
+public:
+
+ CloseCallbackI(const BridgeIPtr& bridge) :
+ _bridge(bridge)
+ {
+ }
+
+ virtual void closed(const ConnectionPtr&);
+
+private:
+
+ const BridgeIPtr _bridge;
+};
+
+class GetConnectionCallback : public IceUtil::Shared
+{
+public:
+
+ GetConnectionCallback(const BridgeIPtr& bridge, const BridgeConnectionPtr& bc) :
+ _bridge(bridge), _bc(bc)
+ {
+ }
+
+ void success(const ConnectionPtr&);
+ void exception(const Exception&);
+
+private:
+
+ const BridgeIPtr _bridge;
+ const BridgeConnectionPtr _bc;
+};
+typedef IceUtil::Handle<GetConnectionCallback> GetConnectionCallbackPtr;
+
+//
+// The main bridge servant.
+//
+class BridgeI : public Ice::BlobjectArrayAsync
+{
+public:
+
+ BridgeI(const ObjectAdapterPtr& adapter, const ObjectPrx& target) :
+ _adapter(adapter), _target(target)
+ {
+ }
+
+ virtual void ice_invoke_async(const AMD_Object_ice_invokePtr& cb,
+ const std::pair<const Byte*, const Byte*>& paramData,
+ const Current& current)
+ {
+ BridgeConnectionPtr bc;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ ConnectionMap::iterator p = _connections.find(current.con);
+ if(p == _connections.end())
+ {
+ //
+ // The connection is unknown to us, it must be a new incoming connection.
+ //
+
+ EndpointInfoPtr info = current.con->getEndpoint()->getInfo();
+
+ //
+ // Create a target proxy that matches the configuration of the incoming connection.
+ //
+ ObjectPrx target;
+ if(info->datagram())
+ {
+ target = _target->ice_datagram();
+ }
+ else if(info->secure())
+ {
+ target = _target->ice_secure(true);
+ }
+ else
+ {
+ target = _target;
+ }
+
+ //
+ // Force the proxy to establish a new connection by using a unique connection ID.
+ //
+ target = target->ice_connectionId(Ice::generateUUID());
+
+ bc = new BridgeConnection(_adapter, target, current.con);
+ _connections.insert(make_pair(current.con, bc));
+ current.con->setCloseCallback(new CloseCallbackI(this));
+
+ //
+ // Try to establish the outgoing connection.
+ //
+ try
+ {
+ //
+ // Begin the connection establishment process asynchronously. This can take a while to complete,
+ // especially when using Bluetooth.
+ //
+ GetConnectionCallbackPtr gc = new GetConnectionCallback(this, bc);
+ Callback_Object_ice_getConnectionPtr d =
+ newCallback_Object_ice_getConnection(gc, &GetConnectionCallback::success,
+ &GetConnectionCallback::exception);
+ target->begin_ice_getConnection(d);
+ }
+ catch(const Exception& ex)
+ {
+ cb->ice_exception(ex);
+ return;
+ }
+ }
+ else
+ {
+ bc = p->second;
+ }
+ }
+
+ //
+ // Delegate the invocation to the BridgeConnection object.
+ //
+ bc->dispatch(cb, paramData, current);
+ }
+
+ void closed(const ConnectionPtr& con)
+ {
+ //
+ // Notify the BridgeConnection that a connection has closed. We also need to remove it from our map.
+ //
+
+ BridgeConnectionPtr bc;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ ConnectionMap::iterator p = _connections.find(con);
+ assert(p != _connections.end());
+ bc = p->second;
+ _connections.erase(p);
+ }
+
+ bc->closed(con);
+ }
+
+ void outgoingSuccess(const BridgeConnectionPtr& bc, const ConnectionPtr& outgoing)
+ {
+ //
+ // An outgoing connection was established. Notify the BridgeConnection object.
+ //
+ IceUtil::Mutex::Lock lock(_lock);
+ _connections.insert(make_pair(outgoing, bc));
+ outgoing->setCloseCallback(new CloseCallbackI(this));
+ bc->setOutgoing(outgoing);
+ }
+
+ void outgoingException(const BridgeConnectionPtr& bc, const Exception& ex)
+ {
+ //
+ // An outgoing connection attempt failed. Notify the BridgeConnection object.
+ //
+ bc->setOutgoing(0);
+ }
+
+private:
+
+ const ObjectAdapterPtr _adapter;
+ const ObjectPrx _target;
+
+ IceUtil::Mutex _lock;
+
+ typedef map<ConnectionPtr, BridgeConnectionPtr> ConnectionMap;
+ ConnectionMap _connections;
+};
+
+void
+CloseCallbackI::closed(const ConnectionPtr& con)
+{
+ _bridge->closed(con);
+}
+
+void
+GetConnectionCallback::success(const ConnectionPtr& outgoing)
+{
+ _bridge->outgoingSuccess(_bc, outgoing);
+}
+
+void
+GetConnectionCallback::exception(const Exception& ex)
+{
+ _bridge->outgoingException(_bc, ex);
+}
+
+class BridgeService : public Service
+{
+public:
+
+ BridgeService();
+
+protected:
+
+ virtual bool start(int, char*[], int&);
+ virtual bool stop();
+ virtual CommunicatorPtr initializeCommunicator(int&, char*[], const InitializationData&);
+
+private:
+
+ void usage(const std::string&);
+};
+
+}
+
+BridgeService::BridgeService()
+{
+}
+
+bool
+BridgeService::start(int argc, char* argv[], int& status)
+{
+ IceUtilInternal::Options opts;
+ opts.addOpt("h", "help");
+ opts.addOpt("v", "version");
+
+ vector<string> args;
+ try
+ {
+ args = opts.parse(argc, const_cast<const char**>(argv));
+ }
+ catch(const IceUtilInternal::BadOptException& e)
+ {
+ error(e.reason);
+ usage(argv[0]);
+ return false;
+ }
+
+ if(opts.isSet("help"))
+ {
+ usage(argv[0]);
+ status = EXIT_SUCCESS;
+ return false;
+ }
+ if(opts.isSet("version"))
+ {
+ print(ICE_STRING_VERSION);
+ status = EXIT_SUCCESS;
+ return false;
+ }
+
+ if(!args.empty())
+ {
+ cerr << argv[0] << ": too many arguments" << endl;
+ usage(argv[0]);
+ return false;
+ }
+
+ PropertiesPtr properties = communicator()->getProperties();
+
+ const string targetProperty = "IceBridge.Target.Endpoints";
+ const string targetEndpoints = properties->getProperty(targetProperty);
+ if(targetEndpoints.empty())
+ {
+ error("property '" + targetProperty + "' is not set");
+ return false;
+ }
+
+ Ice::ObjectPrx target;
+
+ try
+ {
+ target = communicator()->stringToProxy("dummy:" + targetEndpoints);
+ }
+ catch(const std::exception& ex)
+ {
+ ServiceError err(this);
+ err << "setting for target endpoints '" << targetEndpoints << "' is invalid:\n" << ex;
+ return false;
+ }
+
+ //
+ // Initialize the object adapter.
+ //
+ const string endpointsProperty = "IceBridge.Source.Endpoints";
+ if(properties->getProperty(endpointsProperty).empty())
+ {
+ error("property '" + endpointsProperty + "' is not set");
+ return false;
+ }
+
+ ObjectAdapterPtr adapter = communicator()->createObjectAdapter("IceBridge.Source");
+
+ adapter->addDefaultServant(new BridgeI(adapter, target), "");
+
+ if(properties->getPropertyAsIntWithDefault("IceBridge.Router", 0) > 0)
+ {
+ RouterPrx router = RouterPrx::uncheckedCast(adapter->add(new RouterI, stringToIdentity("IceBridge/router")));
+ adapter->add(new FinderI(router), stringToIdentity("Ice/RouterFinder"));
+ }
+
+ try
+ {
+ adapter->activate();
+ }
+ catch(const std::exception& ex)
+ {
+ {
+ ServiceError err(this);
+ err << "caught exception activating object adapter\n" << ex;
+ }
+
+ stop();
+ return false;
+ }
+
+ return true;
+}
+
+bool
+BridgeService::stop()
+{
+ return true;
+}
+
+CommunicatorPtr
+BridgeService::initializeCommunicator(int& argc, char* argv[], const InitializationData& initializationData)
+{
+ InitializationData initData = initializationData;
+ initData.properties = createProperties(argc, argv, initializationData.properties);
+
+ StringSeq args = argsToStringSeq(argc, argv);
+ args = initData.properties->parseCommandLineOptions("IceBridge", args);
+ stringSeqToArgs(args, argc, argv);
+
+ //
+ // Disable automatic retry by default.
+ //
+ if(initData.properties->getProperty("Ice.RetryIntervals").empty())
+ {
+ initData.properties->setProperty("Ice.RetryIntervals", "-1");
+ }
+
+ return Service::initializeCommunicator(argc, argv, initData);
+}
+
+void
+BridgeService::usage(const string& appName)
+{
+ string options =
+ "Options:\n"
+ "-h, --help Show this message.\n"
+ "-v, --version Display the Ice version.\n";
+#ifndef _WIN32
+ options.append(
+ "--daemon Run as a daemon.\n"
+ "--pidfile FILE Write process ID into FILE.\n"
+ "--noclose Do not close open file descriptors.\n"
+ "--nochdir Do not change the current working directory.\n"
+ );
+#endif
+ print("Usage: " + appName + " [options]\n" + options);
+}
+
+#ifdef _WIN32
+
+int
+wmain(int argc, wchar_t* argv[])
+
+#else
+
+int
+main(int argc, char* argv[])
+
+#endif
+{
+ BridgeService svc;
+ return svc.main(argc, argv);
+}
diff --git a/cpp/src/IceBridge/Makefile.mk b/cpp/src/IceBridge/Makefile.mk
new file mode 100644
index 00000000000..b3c282b0c39
--- /dev/null
+++ b/cpp/src/IceBridge/Makefile.mk
@@ -0,0 +1,15 @@
+# **********************************************************************
+#
+# Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
+#
+# This copy of Ice is licensed to you under the terms described in the
+# ICE_LICENSE file included in this distribution.
+#
+# **********************************************************************
+
+$(project)_programs := icebridge
+$(project)_dependencies := Ice
+
+icebridge_targetdir := $(bindir)
+
+projects += $(project)
diff --git a/cpp/src/IceGrid/SessionManager.cpp b/cpp/src/IceGrid/SessionManager.cpp
index 34caf1f9bd2..57ded43015f 100644
--- a/cpp/src/IceGrid/SessionManager.cpp
+++ b/cpp/src/IceGrid/SessionManager.cpp
@@ -58,7 +58,7 @@ SessionManager::findAllQueryObjects(bool cached)
{
try
{
- connection->close(false);
+ connection->close(Ice::CloseGracefullyAndWait);
}
catch(const Ice::LocalException&)
{