diff options
author | Mark Spruiell <mes@zeroc.com> | 2017-01-30 13:45:21 -0800 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2017-01-30 13:45:21 -0800 |
commit | 61270a10f980933cf582edb766f10c8ac6d86e8a (patch) | |
tree | 45ab4a7c2986954054fce613bc3c8f7967e7951e /cpp/src | |
parent | Fix slice2cpp build failure (diff) | |
download | ice-61270a10f980933cf582edb766f10c8ac6d86e8a.tar.bz2 ice-61270a10f980933cf582edb766f10c8ac6d86e8a.tar.xz ice-61270a10f980933cf582edb766f10c8ac6d86e8a.zip |
merging IceBridge into master
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Glacier2/RoutingTable.cpp | 2 | ||||
-rw-r--r-- | cpp/src/Glacier2/SessionRouterI.cpp | 10 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 211 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 20 | ||||
-rw-r--r-- | cpp/src/Ice/Exception.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/ProxyFactory.cpp | 7 | ||||
-rw-r--r-- | cpp/src/IceBridge/IceBridge.cpp | 837 | ||||
-rw-r--r-- | cpp/src/IceBridge/Makefile.mk | 15 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionManager.cpp | 2 |
9 files changed, 1072 insertions, 40 deletions
diff --git a/cpp/src/Glacier2/RoutingTable.cpp b/cpp/src/Glacier2/RoutingTable.cpp index fde938fba00..e6b090de4cb 100644 --- a/cpp/src/Glacier2/RoutingTable.cpp +++ b/cpp/src/Glacier2/RoutingTable.cpp @@ -65,7 +65,7 @@ Glacier2::RoutingTable::add(const ObjectProxySeq& unfiltered, const Current& cur if(!_verifier->verify(*prx)) { - current.con->close(true); + current.con->close(CloseForcefully); throw ObjectNotExistException(__FILE__, __LINE__); } ObjectPrx proxy = (*prx)->ice_twoway()->ice_secure(false)->ice_facet(""); // We add proxies in default form. diff --git a/cpp/src/Glacier2/SessionRouterI.cpp b/cpp/src/Glacier2/SessionRouterI.cpp index 9f774b79a8e..512d371611f 100644 --- a/cpp/src/Glacier2/SessionRouterI.cpp +++ b/cpp/src/Glacier2/SessionRouterI.cpp @@ -80,7 +80,7 @@ public: // Close the connection otherwise the peer has no way to know that // the session has gone. // - _connection->close(true); + _connection->close(CloseForcefully); _router->destroySession(_connection); } } @@ -922,7 +922,7 @@ SessionRouterI::refreshSession(const Ice::ConnectionPtr& con) // Close the connection otherwise the peer has no way to know that the // session has gone. // - con->close(true); + con->close(CloseForcefully); throw SessionNotExistException(); } } @@ -1149,10 +1149,10 @@ SessionRouterI::getRouterImpl(const ConnectionPtr& connection, const Ice::Identi if(_rejectTraceLevel >= 1) { Trace out(_instance->logger(), "Glacier2"); - out << "rejecting request. no session is associated with the connection.\n"; - out << "identity: " << _instance->communicator()->identityToString(id); + out << "rejecting request, no session is associated with the connection.\n"; + out << "identity: " << identityToString(id); } - connection->close(true); + connection->close(CloseForcefully); throw ObjectNotExistException(__FILE__, __LINE__); } return 0; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 7845259855c..23388d399c6 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -409,29 +409,31 @@ Ice::ConnectionI::destroy(DestructionReason reason) } void -Ice::ConnectionI::close(bool force) +Ice::ConnectionI::close(ConnectionClose mode) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(force) + if(mode == CloseForcefully) { - setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); + setState(StateClosed, ConnectionManuallyClosedException(__FILE__, __LINE__, false)); + } + else if(mode == CloseGracefully) + { + setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true)); } else { + assert(mode == CloseGracefullyAndWait); + // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, the - // CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the server - // has processed them or not. + // Wait until all outstanding requests have been completed. // while(!_asyncRequests.empty()) { wait(); } - setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); + setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true)); } } @@ -550,13 +552,13 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) // // We send a heartbeat if there was no activity in the last // (timeout / 4) period. Sending a heartbeat sooner than really - // needed is safer to ensure that the receiver will receive in - // time the heartbeat. Sending the heartbeat if there was no + // needed is safer to ensure that the receiver will receive the + // heartbeat in time. Sending the heartbeat if there was no // activity in the last (timeout / 2) period isn't enough since // monitor() is called only every (timeout / 2) period. // // Note that this doesn't imply that we are sending 4 heartbeats - // per timeout period because the monitor() method is sill only + // per timeout period because the monitor() method is still only // called every (timeout / 2) period. // if(acm.heartbeat == HeartbeatAlways || @@ -564,7 +566,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) { if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0) { - heartbeat(); + sendHeartbeatNow(); } } @@ -802,6 +804,174 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) } #endif +namespace +{ + +const ::std::string __heartbeat_name = "heartbeat"; + +class HeartbeatAsync : public OutgoingAsyncBase +{ +public: + + HeartbeatAsync(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance) : + OutgoingAsyncBase(instance), + _communicator(communicator), + _connection(connection) + { + } + + virtual CommunicatorPtr getCommunicator() const + { + return _communicator; + } + + virtual ConnectionPtr getConnection() const + { + return _connection; + } + + virtual const string& getOperation() const + { + return __heartbeat_name; + } + + void invoke() + { + _observer.attach(_instance.get(), __heartbeat_name); + try + { + _os.write(magic[0]); + _os.write(magic[1]); + _os.write(magic[2]); + _os.write(magic[3]); + _os.write(currentProtocol); + _os.write(currentProtocolEncoding); + _os.write(validateConnectionMsg); + _os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). + _os.write(headerSize); // Message size. + _os.i = _os.b.begin(); + + AsyncStatus status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, 0); + if(status & AsyncStatusSent) + { + _sentSynchronously = true; + if(status & AsyncStatusInvokeSentCallback) + { + invokeSent(); + } + } + } + catch(const RetryException& ex) + { + if(exception(*ex.get())) + { + invokeExceptionAsync(); + } + } + catch(const Exception& ex) + { + if(exception(ex)) + { + invokeExceptionAsync(); + } + } + } + +private: + + CommunicatorPtr _communicator; + ConnectionIPtr _connection; +}; +typedef IceUtil::Handle<HeartbeatAsync> HeartbeatAsyncPtr; + +} + +#ifdef ICE_CPP11_MAPPING +void +Ice::ConnectionI::heartbeat() +{ + Connection::heartbeatAsync().get(); +} + +std::function<void()> +Ice::ConnectionI::heartbeatAsync(::std::function<void(::std::exception_ptr)> ex, ::std::function<void(bool)> sent) +{ + class HeartbeatLambda : public HeartbeatAsync, public LambdaInvoke + { + public: + + HeartbeatLambda(std::shared_ptr<Ice::ConnectionI>&& connection, + std::shared_ptr<Ice::Communicator>& communicator, + const InstancePtr& instance, + std::function<void(std::exception_ptr)> ex, + std::function<void(bool)> sent) : + HeartbeatAsync(connection, communicator, instance), LambdaInvoke(std::move(ex), std::move(sent)) + { + } + }; + auto outAsync = make_shared<HeartbeatLambda>(ICE_SHARED_FROM_THIS, _communicator, _instance, ex, sent); + outAsync->invoke(); + return [outAsync]() { outAsync->cancel(); }; +} +#else +void +Ice::ConnectionI::heartbeat() +{ + end_heartbeat(begin_heartbeat()); +} + +AsyncResultPtr +Ice::ConnectionI::begin_heartbeat() +{ + return _iceI_begin_heartbeat(dummyCallback, 0); +} + +AsyncResultPtr +Ice::ConnectionI::begin_heartbeat(const CallbackPtr& cb, const LocalObjectPtr& cookie) +{ + return _iceI_begin_heartbeat(cb, cookie); +} + +AsyncResultPtr +Ice::ConnectionI::begin_heartbeat(const Callback_Connection_heartbeatPtr& cb, const LocalObjectPtr& cookie) +{ + return _iceI_begin_heartbeat(cb, cookie); +} + +AsyncResultPtr +Ice::ConnectionI::_iceI_begin_heartbeat(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) +{ + class HeartbeatCallback : public HeartbeatAsync, public CallbackCompletion + { + public: + + HeartbeatCallback(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance, + const CallbackBasePtr& callback, + const LocalObjectPtr& cookie) : + HeartbeatAsync(connection, communicator, instance), + CallbackCompletion(callback, cookie) + { + _cookie = cookie; + } + }; + + HeartbeatAsyncPtr result = new HeartbeatCallback(this, _communicator, _instance, cb, cookie); + result->invoke(); + return result; +} + +void +Ice::ConnectionI::end_heartbeat(const AsyncResultPtr& r) +{ + AsyncResult::check(r, this, __heartbeat_name); + r->waitForResponse(); +} +#endif + void Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback) { @@ -1755,7 +1925,7 @@ Ice::ConnectionI::finish(bool close) out << "closed " << _endpoint->protocol() << " connection\n" << toString(); if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) || dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()))) @@ -2076,7 +2246,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) // Don't warn about certain expected exceptions. // if(!(dynamic_cast<const CloseConnectionException*>(&ex) || - dynamic_cast<const ForcedCloseConnectionException*>(&ex) || + dynamic_cast<const ConnectionManuallyClosedException*>(&ex) || dynamic_cast<const ConnectionTimeoutException*>(&ex) || dynamic_cast<const CommunicatorDestroyedException*>(&ex) || dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) || @@ -2255,7 +2425,7 @@ Ice::ConnectionI::setState(State state) if(_observer && state == StateClosed && _exception) { if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) || dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || @@ -2285,8 +2455,7 @@ Ice::ConnectionI::setState(State state) void Ice::ConnectionI::initiateShutdown() { - assert(_state == StateClosing); - assert(_dispatchCount == 0); + assert(_state == StateClosing && _dispatchCount == 0); if(_shutdownInitiated) { @@ -2316,7 +2485,7 @@ Ice::ConnectionI::initiateShutdown() setState(StateClosingPending); // - // Notify the the transceiver of the graceful connection closure. + // Notify the transceiver of the graceful connection closure. // SocketOperation op = _transceiver->closing(true, *_exception); if(op) @@ -2329,7 +2498,7 @@ Ice::ConnectionI::initiateShutdown() } void -Ice::ConnectionI::heartbeat() +Ice::ConnectionI::sendHeartbeatNow() { assert(_state == StateActive); @@ -3016,7 +3185,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request setState(StateClosingPending, CloseConnectionException(__FILE__, __LINE__)); // - // Notify the the transceiver of the graceful connection closure. + // Notify the transceiver of the graceful connection closure. // SocketOperation op = _transceiver->closing(false, *_exception); if(op) diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 72652518b94..7fb3781e880 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -157,12 +157,12 @@ public: void activate(); void hold(); void destroy(DestructionReason); - virtual void close(bool); // From Connection. + virtual void close(ConnectionClose); // From Connection. bool isActiveOrHolding() const; bool isFinished() const; - void throwException() const; // Throws the connection exception if destroyed. + virtual void throwException() const; // From Connection. Throws the connection exception if destroyed. void waitUntilHolding() const; void waitUntilFinished(); // Not const, as this might close the connection upon timeout. @@ -193,6 +193,19 @@ public: virtual void setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK)); virtual void setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK)); + virtual void heartbeat(); + +#ifdef ICE_CPP11_MAPPING + virtual std::function<void()> + heartbeatAsync(::std::function<void(::std::exception_ptr)>, ::std::function<void(bool)> = nullptr); +#else + virtual AsyncResultPtr begin_heartbeat(); + virtual AsyncResultPtr begin_heartbeat(const CallbackPtr&, const LocalObjectPtr& = 0); + virtual AsyncResultPtr begin_heartbeat(const Callback_Connection_heartbeatPtr&, const LocalObjectPtr& = 0); + + virtual void end_heartbeat(const AsyncResultPtr&); +#endif + virtual void setACM(const IceUtil::Optional<int>&, const IceUtil::Optional<ACMClose>&, const IceUtil::Optional<ACMHeartbeat>&); @@ -276,7 +289,7 @@ private: void setState(State); void initiateShutdown(); - void heartbeat(); + void sendHeartbeatNow(); bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone); bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone); @@ -308,6 +321,7 @@ private: #ifndef ICE_CPP11_MAPPING AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); + AsyncResultPtr _iceI_begin_heartbeat(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); #endif Ice::CommunicatorPtr _communicator; diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp index 4ac7d66cdd3..a20c564101e 100644 --- a/cpp/src/Ice/Exception.cpp +++ b/cpp/src/Ice/Exception.cpp @@ -632,14 +632,10 @@ Ice::CloseConnectionException::ice_print(ostream& out) const } void -Ice::ForcedCloseConnectionException::ice_print(ostream& out) const +Ice::ConnectionManuallyClosedException::ice_print(ostream& out) const { Exception::ice_print(out); - out << ":\nprotocol error: connection forcefully closed"; - if(!reason.empty()) - { - out << ":\n" << reason; - } + out << ":\nprotocol error: connection manually closed (" << (graceful ? "gracefully" : "forcefully") << ")"; } void diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp index eba949508b6..34503ec961b 100644 --- a/cpp/src/Ice/ProxyFactory.cpp +++ b/cpp/src/Ice/ProxyFactory.cpp @@ -201,11 +201,12 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, co } // - // Don't retry if the communicator is destroyed or object adapter - // deactivated. + // Don't retry if the communicator is destroyed, object adapter is deactivated, + // or connection is manually closed. // if(dynamic_cast<const CommunicatorDestroyedException*>(&ex) || - dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex)) + dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) || + dynamic_cast<const ConnectionManuallyClosedException*>(&ex)) { ex.ice_throw(); } diff --git a/cpp/src/IceBridge/IceBridge.cpp b/cpp/src/IceBridge/IceBridge.cpp new file mode 100644 index 00000000000..31ec58aae6c --- /dev/null +++ b/cpp/src/IceBridge/IceBridge.cpp @@ -0,0 +1,837 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/Connection.h> +#include <Ice/ObjectAdapter.h> +#include <Ice/Service.h> +#include <Ice/UUID.h> +#include <IceUtil/Options.h> + +using namespace std; +using namespace Ice; + +namespace +{ + +// +// Represents a pending twoway invocation. +// +class Invocation : public IceUtil::Shared +{ +public: + + Invocation(const AMD_Object_ice_invokePtr& cb) : + _cb(cb) + { + } + + void success(bool ok, const pair<const Byte*, const Byte*>& results) + { + _cb->ice_response(ok, results); + } + + void exception(const Exception& ex) + { + _cb->ice_exception(ex); + } + +private: + + AMD_Object_ice_invokePtr _cb; +}; +typedef IceUtil::Handle<Invocation> InvocationPtr; + +// +// Represents a pending oneway invocation. +// +class OnewayInvocation : public IceUtil::Shared +{ +public: + + OnewayInvocation(const AMD_Object_ice_invokePtr& cb) : + _cb(cb) + { + } + + void success(bool, const pair<const Byte*, const Byte*>&) + { + assert(false); + } + + void exception(const Exception& ex) + { + _cb->ice_exception(ex); + } + + void sent(bool sentSynchronously) + { + _cb->ice_response(true, vector<Byte>()); + } + +private: + + AMD_Object_ice_invokePtr _cb; +}; +typedef IceUtil::Handle<OnewayInvocation> OnewayInvocationPtr; + +// +// Holds information about an incoming invocation that's been queued until an outgoing connection has +// been established. +// +struct QueuedInvocation : public IceUtil::Shared +{ + QueuedInvocation(const AMD_Object_ice_invokePtr& c, const pair<const Byte*, const Byte*>& p, const Current& curr) : + cb(c), current(curr) + { + if(p.first) + { + // + // The pointers in paramData refer to the Ice marshaling buffer and won't remain valid after + // ice_invoke_async completes, so we have to make a copy of the parameter data. + // + vector<Byte> tmp(p.first, p.second); + paramData.swap(tmp); + } + } + + const AMD_Object_ice_invokePtr cb; + vector<Byte> paramData; + const Current current; +}; +typedef IceUtil::Handle<QueuedInvocation> QueuedInvocationPtr; +typedef vector<QueuedInvocationPtr> InvocationList; + +// +// Relays heartbeat messages. +// +class HeartbeatCallbackI : public HeartbeatCallback +{ +public: + + HeartbeatCallbackI(const ConnectionPtr& con) : + _connection(con) + { + } + + virtual void heartbeat(const ConnectionPtr&) + { + // + // When a connection receives a heartbeat message, we send one over its corresponding connection. + // + try + { + _connection->begin_heartbeat(); + } + catch(...) + { + } + } + +private: + + const ConnectionPtr _connection; +}; + +// +// Represents a pair of bridged connections. +// +class BridgeConnection : public IceUtil::Shared +{ +public: + + BridgeConnection(const ObjectAdapterPtr& adapter, const ObjectPrx& target, const ConnectionPtr& incoming) : + _adapter(adapter), _target(target), _incoming(incoming), _closed(false) + { + } + + void setOutgoing(const ConnectionPtr& outgoing) + { + if(outgoing) + { + InvocationList queuedInvocations; + + { + IceUtil::Mutex::Lock lock(_lock); + + assert(!_outgoing); + + if(_closed) + { + // + // The incoming connection is already closed. There's no point in leaving the outgoing + // connection open. + // + outgoing->close(CloseGracefully); + } + else + { + _outgoing = outgoing; + + // + // Save the queued invocations. + // + queuedInvocations.swap(_queue); + + // + // Register hearbeat callbacks on both connections. + // + _incoming->setHeartbeatCallback(new HeartbeatCallbackI(_outgoing)); + _outgoing->setHeartbeatCallback(new HeartbeatCallbackI(_incoming)); + + // + // Configure the outgoing connection for bidirectional requests. + // + _outgoing->setAdapter(_adapter); + } + } + + // + // Flush any queued invocations. + // + flush(outgoing, queuedInvocations); + } + else + { + IceUtil::Mutex::Lock lock(_lock); + + // + // The outgoing connection failed so we close the incoming connection. closed() will eventually + // be called for it. + // + if(_incoming) + { + _incoming->close(CloseGracefully); + } + } + } + + void closed(const ConnectionPtr& con) + { + InvocationList queuedInvocations; + ConnectionPtr toBeClosed; + + { + IceUtil::Mutex::Lock lock(_lock); + + if(con == _incoming) + { + // + // The incoming connection was closed. + // + toBeClosed = _outgoing; + queuedInvocations.swap(_queue); + _closed = true; + _incoming = 0; + } + else + { + assert(con == _outgoing); + + // + // An outgoing connection was closed. + // + toBeClosed = _incoming; + _outgoing = 0; + } + } + + // + // Close the corresponding connection. + // + if(toBeClosed) + { + // + // Examine the exception that caused the connection to be closed. A CloseConnectionException + // indicates the connection was closed gracefully and we do the same. We also look for + // ConnectionManuallyClosedException, which indicates the connection was closed locally. + // We close forcefully for any other exception. + // + try + { + con->throwException(); + } + catch(const Ice::CloseConnectionException&) + { + toBeClosed->close(CloseGracefully); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + // + // Connection was manually closed by the bridge. + // + toBeClosed->close(ex.graceful ? CloseGracefully : CloseForcefully); + } + catch(...) + { + toBeClosed->close(CloseForcefully); + } + } + + // + // Even though the connection is already closed, we still need to "complete" the pending invocations so + // that the connection's dispatch count is updated correctly. + // + for(InvocationList::iterator p = queuedInvocations.begin(); p != queuedInvocations.end(); ++p) + { + (*p)->cb->ice_exception(ConnectionLostException(__FILE__, __LINE__)); + } + } + + void dispatch(const AMD_Object_ice_invokePtr& cb, const pair<const Byte*, const Byte*>& paramData, + const Current& current) + { + // + // We've received an invocations, either from the client via the incoming connection, or from + // the server via the outgoing (bidirectional) connection. The current.con member tells us + // the connection over which the request arrived. + // + + ConnectionPtr dest; + + { + IceUtil::Mutex::Lock lock(_lock); + + if(_closed) + { + // + // If the incoming connection has already closed, we're done. + // + cb->ice_exception(ConnectionLostException(__FILE__, __LINE__)); + } + else if(!_outgoing) + { + // + // Queue the invocation until the outgoing connection is established. + // + _queue.push_back(new QueuedInvocation(cb, paramData, current)); + } + else if(current.con == _incoming) + { + // + // Forward to the outgoing connection. + // + dest = _outgoing; + } + else + { + // + // Forward to the incoming connection. + // + assert(current.con == _outgoing); + dest = _incoming; + } + } + + if(dest) + { + send(dest, cb, paramData, current); + } + } + +private: + + void flush(const ConnectionPtr& outgoing, const InvocationList& invocations) + { + for(InvocationList::const_iterator p = invocations.begin(); p != invocations.end(); ++p) + { + QueuedInvocationPtr i = *p; + + pair<const Byte*, const Byte*> paramData; + if(i->paramData.empty()) + { + paramData.first = paramData.second = 0; + } + else + { + paramData.first = &i->paramData[0]; + paramData.second = paramData.first + i->paramData.size(); + } + + send(outgoing, i->cb, paramData, i->current); + } + } + + void send(const ConnectionPtr& dest, const AMD_Object_ice_invokePtr& cb, + const pair<const Byte*, const Byte*>& paramData, const Current& current) + { + try + { + // + // Create a proxy having the same identity as the request. + // + ObjectPrx prx = dest->createProxy(current.id); + + // + // Examine the proxy and the request to determine whether it should be forwarded as a oneway or a twoway. + // + if(!prx->ice_isTwoway() || !current.requestId) + { + if(prx->ice_isTwoway()) + { + prx = prx->ice_oneway(); + } + OnewayInvocationPtr i = new OnewayInvocation(cb); + Callback_Object_ice_invokePtr d = + newCallback_Object_ice_invoke(i, &OnewayInvocation::success, &OnewayInvocation::exception, + &OnewayInvocation::sent); + prx->begin_ice_invoke(current.operation, current.mode, paramData, current.ctx, d); + } + else + { + InvocationPtr i = new Invocation(cb); + Callback_Object_ice_invokePtr d = + newCallback_Object_ice_invoke(i, &Invocation::success, &Invocation::exception); + prx->begin_ice_invoke(current.operation, current.mode, paramData, current.ctx, d); + } + } + catch(const std::exception& ex) + { + cb->ice_exception(ex); + } + } + + const ObjectAdapterPtr _adapter; + const ObjectPrx _target; + ConnectionPtr _incoming; + + IceUtil::Mutex _lock; + + bool _closed; + ConnectionPtr _outgoing; + + // + // We maintain our own queue for invocations that arrive on the incoming connection before the outgoing + // connection has been established. We don't want to forward these to proxies and let the proxies handle + // the queuing because then the invocations could be sent out of order (e.g., when invocations are split + // among twoway/oneway/datagram proxies). + // + InvocationList _queue; +}; +typedef IceUtil::Handle<BridgeConnection> BridgeConnectionPtr; + +// +// Allows the bridge to be used as an Ice router. +// +class RouterI : public Router +{ +public: + + virtual ObjectPrx getClientProxy(const Current&) const + { + return 0; + } + + virtual ObjectPrx getServerProxy(const Current&) const + { + return 0; + } + + virtual ObjectProxySeq addProxies(const ObjectProxySeq&, const Current&) + { + return ObjectProxySeq(); + } +}; + +class FinderI : public RouterFinder +{ +public: + + FinderI(const RouterPrx& router) : + _router(router) + { + } + + virtual RouterPrx getRouter(const Current&) + { + return _router; + } + +private: + + const RouterPrx _router; +}; + +class BridgeI; +typedef IceUtil::Handle<BridgeI> BridgeIPtr; + +class CloseCallbackI : public CloseCallback +{ +public: + + CloseCallbackI(const BridgeIPtr& bridge) : + _bridge(bridge) + { + } + + virtual void closed(const ConnectionPtr&); + +private: + + const BridgeIPtr _bridge; +}; + +class GetConnectionCallback : public IceUtil::Shared +{ +public: + + GetConnectionCallback(const BridgeIPtr& bridge, const BridgeConnectionPtr& bc) : + _bridge(bridge), _bc(bc) + { + } + + void success(const ConnectionPtr&); + void exception(const Exception&); + +private: + + const BridgeIPtr _bridge; + const BridgeConnectionPtr _bc; +}; +typedef IceUtil::Handle<GetConnectionCallback> GetConnectionCallbackPtr; + +// +// The main bridge servant. +// +class BridgeI : public Ice::BlobjectArrayAsync +{ +public: + + BridgeI(const ObjectAdapterPtr& adapter, const ObjectPrx& target) : + _adapter(adapter), _target(target) + { + } + + virtual void ice_invoke_async(const AMD_Object_ice_invokePtr& cb, + const std::pair<const Byte*, const Byte*>& paramData, + const Current& current) + { + BridgeConnectionPtr bc; + + { + IceUtil::Mutex::Lock lock(_lock); + + ConnectionMap::iterator p = _connections.find(current.con); + if(p == _connections.end()) + { + // + // The connection is unknown to us, it must be a new incoming connection. + // + + EndpointInfoPtr info = current.con->getEndpoint()->getInfo(); + + // + // Create a target proxy that matches the configuration of the incoming connection. + // + ObjectPrx target; + if(info->datagram()) + { + target = _target->ice_datagram(); + } + else if(info->secure()) + { + target = _target->ice_secure(true); + } + else + { + target = _target; + } + + // + // Force the proxy to establish a new connection by using a unique connection ID. + // + target = target->ice_connectionId(Ice::generateUUID()); + + bc = new BridgeConnection(_adapter, target, current.con); + _connections.insert(make_pair(current.con, bc)); + current.con->setCloseCallback(new CloseCallbackI(this)); + + // + // Try to establish the outgoing connection. + // + try + { + // + // Begin the connection establishment process asynchronously. This can take a while to complete, + // especially when using Bluetooth. + // + GetConnectionCallbackPtr gc = new GetConnectionCallback(this, bc); + Callback_Object_ice_getConnectionPtr d = + newCallback_Object_ice_getConnection(gc, &GetConnectionCallback::success, + &GetConnectionCallback::exception); + target->begin_ice_getConnection(d); + } + catch(const Exception& ex) + { + cb->ice_exception(ex); + return; + } + } + else + { + bc = p->second; + } + } + + // + // Delegate the invocation to the BridgeConnection object. + // + bc->dispatch(cb, paramData, current); + } + + void closed(const ConnectionPtr& con) + { + // + // Notify the BridgeConnection that a connection has closed. We also need to remove it from our map. + // + + BridgeConnectionPtr bc; + + { + IceUtil::Mutex::Lock lock(_lock); + + ConnectionMap::iterator p = _connections.find(con); + assert(p != _connections.end()); + bc = p->second; + _connections.erase(p); + } + + bc->closed(con); + } + + void outgoingSuccess(const BridgeConnectionPtr& bc, const ConnectionPtr& outgoing) + { + // + // An outgoing connection was established. Notify the BridgeConnection object. + // + IceUtil::Mutex::Lock lock(_lock); + _connections.insert(make_pair(outgoing, bc)); + outgoing->setCloseCallback(new CloseCallbackI(this)); + bc->setOutgoing(outgoing); + } + + void outgoingException(const BridgeConnectionPtr& bc, const Exception& ex) + { + // + // An outgoing connection attempt failed. Notify the BridgeConnection object. + // + bc->setOutgoing(0); + } + +private: + + const ObjectAdapterPtr _adapter; + const ObjectPrx _target; + + IceUtil::Mutex _lock; + + typedef map<ConnectionPtr, BridgeConnectionPtr> ConnectionMap; + ConnectionMap _connections; +}; + +void +CloseCallbackI::closed(const ConnectionPtr& con) +{ + _bridge->closed(con); +} + +void +GetConnectionCallback::success(const ConnectionPtr& outgoing) +{ + _bridge->outgoingSuccess(_bc, outgoing); +} + +void +GetConnectionCallback::exception(const Exception& ex) +{ + _bridge->outgoingException(_bc, ex); +} + +class BridgeService : public Service +{ +public: + + BridgeService(); + +protected: + + virtual bool start(int, char*[], int&); + virtual bool stop(); + virtual CommunicatorPtr initializeCommunicator(int&, char*[], const InitializationData&); + +private: + + void usage(const std::string&); +}; + +} + +BridgeService::BridgeService() +{ +} + +bool +BridgeService::start(int argc, char* argv[], int& status) +{ + IceUtilInternal::Options opts; + opts.addOpt("h", "help"); + opts.addOpt("v", "version"); + + vector<string> args; + try + { + args = opts.parse(argc, const_cast<const char**>(argv)); + } + catch(const IceUtilInternal::BadOptException& e) + { + error(e.reason); + usage(argv[0]); + return false; + } + + if(opts.isSet("help")) + { + usage(argv[0]); + status = EXIT_SUCCESS; + return false; + } + if(opts.isSet("version")) + { + print(ICE_STRING_VERSION); + status = EXIT_SUCCESS; + return false; + } + + if(!args.empty()) + { + cerr << argv[0] << ": too many arguments" << endl; + usage(argv[0]); + return false; + } + + PropertiesPtr properties = communicator()->getProperties(); + + const string targetProperty = "IceBridge.Target.Endpoints"; + const string targetEndpoints = properties->getProperty(targetProperty); + if(targetEndpoints.empty()) + { + error("property '" + targetProperty + "' is not set"); + return false; + } + + Ice::ObjectPrx target; + + try + { + target = communicator()->stringToProxy("dummy:" + targetEndpoints); + } + catch(const std::exception& ex) + { + ServiceError err(this); + err << "setting for target endpoints '" << targetEndpoints << "' is invalid:\n" << ex; + return false; + } + + // + // Initialize the object adapter. + // + const string endpointsProperty = "IceBridge.Source.Endpoints"; + if(properties->getProperty(endpointsProperty).empty()) + { + error("property '" + endpointsProperty + "' is not set"); + return false; + } + + ObjectAdapterPtr adapter = communicator()->createObjectAdapter("IceBridge.Source"); + + adapter->addDefaultServant(new BridgeI(adapter, target), ""); + + if(properties->getPropertyAsIntWithDefault("IceBridge.Router", 0) > 0) + { + RouterPrx router = RouterPrx::uncheckedCast(adapter->add(new RouterI, stringToIdentity("IceBridge/router"))); + adapter->add(new FinderI(router), stringToIdentity("Ice/RouterFinder")); + } + + try + { + adapter->activate(); + } + catch(const std::exception& ex) + { + { + ServiceError err(this); + err << "caught exception activating object adapter\n" << ex; + } + + stop(); + return false; + } + + return true; +} + +bool +BridgeService::stop() +{ + return true; +} + +CommunicatorPtr +BridgeService::initializeCommunicator(int& argc, char* argv[], const InitializationData& initializationData) +{ + InitializationData initData = initializationData; + initData.properties = createProperties(argc, argv, initializationData.properties); + + StringSeq args = argsToStringSeq(argc, argv); + args = initData.properties->parseCommandLineOptions("IceBridge", args); + stringSeqToArgs(args, argc, argv); + + // + // Disable automatic retry by default. + // + if(initData.properties->getProperty("Ice.RetryIntervals").empty()) + { + initData.properties->setProperty("Ice.RetryIntervals", "-1"); + } + + return Service::initializeCommunicator(argc, argv, initData); +} + +void +BridgeService::usage(const string& appName) +{ + string options = + "Options:\n" + "-h, --help Show this message.\n" + "-v, --version Display the Ice version.\n"; +#ifndef _WIN32 + options.append( + "--daemon Run as a daemon.\n" + "--pidfile FILE Write process ID into FILE.\n" + "--noclose Do not close open file descriptors.\n" + "--nochdir Do not change the current working directory.\n" + ); +#endif + print("Usage: " + appName + " [options]\n" + options); +} + +#ifdef _WIN32 + +int +wmain(int argc, wchar_t* argv[]) + +#else + +int +main(int argc, char* argv[]) + +#endif +{ + BridgeService svc; + return svc.main(argc, argv); +} diff --git a/cpp/src/IceBridge/Makefile.mk b/cpp/src/IceBridge/Makefile.mk new file mode 100644 index 00000000000..b3c282b0c39 --- /dev/null +++ b/cpp/src/IceBridge/Makefile.mk @@ -0,0 +1,15 @@ +# ********************************************************************** +# +# Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. +# +# This copy of Ice is licensed to you under the terms described in the +# ICE_LICENSE file included in this distribution. +# +# ********************************************************************** + +$(project)_programs := icebridge +$(project)_dependencies := Ice + +icebridge_targetdir := $(bindir) + +projects += $(project) diff --git a/cpp/src/IceGrid/SessionManager.cpp b/cpp/src/IceGrid/SessionManager.cpp index 34caf1f9bd2..57ded43015f 100644 --- a/cpp/src/IceGrid/SessionManager.cpp +++ b/cpp/src/IceGrid/SessionManager.cpp @@ -58,7 +58,7 @@ SessionManager::findAllQueryObjects(bool cached) { try { - connection->close(false); + connection->close(Ice::CloseGracefullyAndWait); } catch(const Ice::LocalException&) { |