diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceBridge/IceBridge.cpp | 124 |
1 files changed, 78 insertions, 46 deletions
diff --git a/cpp/src/IceBridge/IceBridge.cpp b/cpp/src/IceBridge/IceBridge.cpp index 326a73a496b..71f0435eae4 100644 --- a/cpp/src/IceBridge/IceBridge.cpp +++ b/cpp/src/IceBridge/IceBridge.cpp @@ -146,68 +146,98 @@ class BridgeConnection : public IceUtil::Shared public: BridgeConnection(const ObjectAdapterPtr& adapter, const ObjectPrx& target, const ConnectionPtr& incoming) : - _adapter(adapter), _target(target), _incoming(incoming), _closed(false) + _adapter(adapter), _target(target), _incoming(incoming), _state(Queuing) { } - void setOutgoing(const ConnectionPtr& outgoing) + void outgoingSuccess(const ConnectionPtr& outgoing) { - if(outgoing) - { - InvocationList queuedInvocations; + assert(outgoing); - { - IceUtil::Mutex::Lock lock(_lock); + InvocationList queuedInvocations; - assert(!_outgoing); + { + IceUtil::Mutex::Lock lock(_lock); + + assert(!_outgoing); - if(_closed) + if(_state == Closed) + { + // + // The incoming connection is already closed. There's no point in leaving the outgoing + // connection open. + // + outgoing->close(ICE_SCOPED_ENUM(ConnectionClose, Gracefully)); + } + else + { + _outgoing = outgoing; + if(_state == Queuing) { - // - // The incoming connection is already closed. There's no point in leaving the outgoing - // connection open. - // - outgoing->close(ICE_SCOPED_ENUM(ConnectionClose, Gracefully)); + _state = Active; } - else - { - _outgoing = outgoing; - // - // Save the queued invocations. - // - queuedInvocations.swap(_queue); + // + // Save the queued invocations. + // + queuedInvocations.swap(_queue); - // - // Register hearbeat callbacks on both connections. - // - _incoming->setHeartbeatCallback(new HeartbeatCallbackI(_outgoing)); - _outgoing->setHeartbeatCallback(new HeartbeatCallbackI(_incoming)); + // + // Register hearbeat callbacks on both connections. + // + _incoming->setHeartbeatCallback(new HeartbeatCallbackI(_outgoing)); + _outgoing->setHeartbeatCallback(new HeartbeatCallbackI(_incoming)); - // - // Configure the outgoing connection for bidirectional requests. - // - _outgoing->setAdapter(_adapter); - } + // + // Configure the outgoing connection for bidirectional requests. + // + _outgoing->setAdapter(_adapter); } - - // - // Flush any queued invocations. - // - flush(outgoing, queuedInvocations); } - else + + // + // Flush any queued invocations. + // + flush(outgoing, queuedInvocations); + } + + void outgoingException(const Exception& ex) + { + InvocationList queuedInvocations; + { IceUtil::Mutex::Lock lock(_lock); // + // Prevent any more incoming invocations from being queued. + // + _state = Closing; + + // // The outgoing connection failed so we close the incoming connection. closed() will eventually - // be called for it. + // be called for it when the connection's dispatch count reaches zero. // if(_incoming) { _incoming->close(ICE_SCOPED_ENUM(ConnectionClose, Gracefully)); } + + // + // Save the queued invocations. + // + queuedInvocations.swap(_queue); + } + + // + // Complete the queued incoming invocations, otherwise the incoming connection will never + // complete its graceful closure. This is only necessary on the server side. + // + // The client will receive an UnknownLocalException whose reason member contains information + // about the failure. + // + for(InvocationList::iterator p = queuedInvocations.begin(); p != queuedInvocations.end(); ++p) + { + (*p)->cb->ice_exception(ex); } } @@ -226,7 +256,7 @@ public: // toBeClosed = _outgoing; queuedInvocations.swap(_queue); - _closed = true; + _state = Closed; _incoming = 0; } else @@ -288,7 +318,7 @@ public: const Current& current) { // - // We've received an invocations, either from the client via the incoming connection, or from + // We've received an invocation, either from the client via the incoming connection, or from // the server via the outgoing (bidirectional) connection. The current.con member tells us // the connection over which the request arrived. // @@ -298,18 +328,19 @@ public: { IceUtil::Mutex::Lock lock(_lock); - if(_closed) + if(_state == Closing || _state == Closed) { // - // If the incoming connection has already closed, we're done. + // If the incoming connection is closing or closed, we're done. // cb->ice_exception(ConnectionLostException(__FILE__, __LINE__)); } - else if(!_outgoing) + else if(_state == Queuing) { // // Queue the invocation until the outgoing connection is established. // + assert(!_outgoing); _queue.push_back(new QueuedInvocation(cb, paramData, current)); } else if(current.con == _incoming) @@ -403,7 +434,8 @@ private: IceUtil::Mutex _lock; - bool _closed; + enum State { Queuing, Active, Closing, Closed }; + State _state; ConnectionPtr _outgoing; // @@ -614,7 +646,7 @@ public: IceUtil::Mutex::Lock lock(_lock); _connections.insert(make_pair(outgoing, bc)); outgoing->setCloseCallback(new CloseCallbackI(this)); - bc->setOutgoing(outgoing); + bc->outgoingSuccess(outgoing); } void outgoingException(const BridgeConnectionPtr& bc, const Exception& ex) @@ -622,7 +654,7 @@ public: // // An outgoing connection attempt failed. Notify the BridgeConnection object. // - bc->setOutgoing(0); + bc->outgoingException(ex); } private: |