diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-12-01 14:02:05 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-12-01 14:02:05 +0100 |
commit | cd63c3e37dde04051fb2f4631f788bed7b48937b (patch) | |
tree | ae2c581bd228ec42c6eb46bf83a88f1174623bac /cpp/src/Ice/ConnectionI.cpp | |
parent | SLES RPM fixes (diff) | |
download | ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.tar.bz2 ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.tar.xz ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.zip |
Added support for Ice::Dispatcher
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 139 |
1 files changed, 136 insertions, 3 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 6218e86c91d..c0ad3585e5f 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -55,6 +55,69 @@ private: Ice::ConnectionI* _connection; }; +class DispatchDispatcherCall : public DispatcherCall +{ +public: + + DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, + const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, Byte compress, Int requestId, + Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, + const OutgoingAsyncPtr& outAsync, BasicStream& stream) : + _connection(connection), + _startCB(startCB), + _sentCBs(sentCBs), + _compress(compress), + _requestId(requestId), + _invokeNum(invokeNum), + _servantManager(servantManager), + _adapter(adapter), + _outAsync(outAsync), + _stream(stream.instance()) + { + _stream.swap(stream); + } + + virtual void + run() + { + _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter, + _outAsync, _stream); + } + +private: + + ConnectionIPtr _connection; + ConnectionI::StartCallbackPtr _startCB; + vector<OutgoingAsyncMessageCallbackPtr> _sentCBs; + Byte _compress; + Int _requestId; + Int _invokeNum; + ServantManagerPtr _servantManager; + ObjectAdapterPtr _adapter; + OutgoingAsyncPtr _outAsync; + BasicStream _stream; +}; + +class FinishDispatcherCall : public DispatcherCall +{ +public: + + FinishDispatcherCall(const Ice::ConnectionIPtr& connection) : + _connection(connection) + { + } + + virtual void + run() + { + _connection->finish(); + } + +private: + + ConnectionIPtr _connection; +}; + } void @@ -1292,6 +1355,41 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) io.completed(); } + if(_dispatcher) + { + try + { + _dispatcher->dispatch(new DispatchDispatcherCall(this, startCB, sentCBs, compress, requestId, invokeNum, + servantManager, adapter, outAsync, current.stream), this); + } + catch(const std::exception& ex) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\n" << ex << '\n' << _desc; + } + } + catch(...) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc; + } + } + } + else + { + dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, current.stream); + } +} + +void +ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, + Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, + const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream) +{ // // Notify the factory that the connection establishment and // validation has completed. @@ -1315,7 +1413,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // if(outAsync) { - outAsync->__finished(current.stream); + outAsync->__finished(stream); } // @@ -1325,7 +1423,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // if(invokeNum) { - invokeAll(current.stream, invokeNum, requestId, compress, servantManager, adapter); + invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); } } @@ -1343,11 +1441,45 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // - if(_startCallback || !_sendStreams.empty() || !_asyncRequests.empty()) + if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty()) + { + finish(); + return; + } + + if(!_dispatcher) { current.ioCompleted(); + finish(); } + else + { + try + { + _dispatcher->dispatch(new FinishDispatcherCall(this), this); + } + catch(const std::exception& ex) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\n" << ex << '\n' << _desc; + } + } + catch(...) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc; + } + } + } +} +void +Ice::ConnectionI::finish() +{ if(_startCallback) { _startCallback->connectionStartFailed(this, *_exception.get()); @@ -1527,6 +1659,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, _connector(connector), _endpoint(endpoint), _adapter(adapter), + _dispatcher(_instance->initializationData().dispatcher), // Cached for better performance. _logger(_instance->initializationData().logger), // Cached for better performance. _traceLevels(_instance->traceLevels()), // Cached for better performance. _timer(_instance->timer()), // Cached for better performance. |