summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-12-01 14:02:05 +0100
committerBenoit Foucher <benoit@zeroc.com>2009-12-01 14:02:05 +0100
commitcd63c3e37dde04051fb2f4631f788bed7b48937b (patch)
treeae2c581bd228ec42c6eb46bf83a88f1174623bac /cpp/src/Ice/ConnectionI.cpp
parentSLES RPM fixes (diff)
downloadice-cd63c3e37dde04051fb2f4631f788bed7b48937b.tar.bz2
ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.tar.xz
ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.zip
Added support for Ice::Dispatcher
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp139
1 files changed, 136 insertions, 3 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 6218e86c91d..c0ad3585e5f 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -55,6 +55,69 @@ private:
Ice::ConnectionI* _connection;
};
+class DispatchDispatcherCall : public DispatcherCall
+{
+public:
+
+ DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
+ const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, Byte compress, Int requestId,
+ Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
+ const OutgoingAsyncPtr& outAsync, BasicStream& stream) :
+ _connection(connection),
+ _startCB(startCB),
+ _sentCBs(sentCBs),
+ _compress(compress),
+ _requestId(requestId),
+ _invokeNum(invokeNum),
+ _servantManager(servantManager),
+ _adapter(adapter),
+ _outAsync(outAsync),
+ _stream(stream.instance())
+ {
+ _stream.swap(stream);
+ }
+
+ virtual void
+ run()
+ {
+ _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter,
+ _outAsync, _stream);
+ }
+
+private:
+
+ ConnectionIPtr _connection;
+ ConnectionI::StartCallbackPtr _startCB;
+ vector<OutgoingAsyncMessageCallbackPtr> _sentCBs;
+ Byte _compress;
+ Int _requestId;
+ Int _invokeNum;
+ ServantManagerPtr _servantManager;
+ ObjectAdapterPtr _adapter;
+ OutgoingAsyncPtr _outAsync;
+ BasicStream _stream;
+};
+
+class FinishDispatcherCall : public DispatcherCall
+{
+public:
+
+ FinishDispatcherCall(const Ice::ConnectionIPtr& connection) :
+ _connection(connection)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _connection->finish();
+ }
+
+private:
+
+ ConnectionIPtr _connection;
+};
+
}
void
@@ -1292,6 +1355,41 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
io.completed();
}
+ if(_dispatcher)
+ {
+ try
+ {
+ _dispatcher->dispatch(new DispatchDispatcherCall(this, startCB, sentCBs, compress, requestId, invokeNum,
+ servantManager, adapter, outAsync, current.stream), this);
+ }
+ catch(const std::exception& ex)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\n" << ex << '\n' << _desc;
+ }
+ }
+ catch(...)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc;
+ }
+ }
+ }
+ else
+ {
+ dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, current.stream);
+ }
+}
+
+void
+ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs,
+ Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
+ const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream)
+{
//
// Notify the factory that the connection establishment and
// validation has completed.
@@ -1315,7 +1413,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
//
if(outAsync)
{
- outAsync->__finished(current.stream);
+ outAsync->__finished(stream);
}
//
@@ -1325,7 +1423,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
//
if(invokeNum)
{
- invokeAll(current.stream, invokeNum, requestId, compress, servantManager, adapter);
+ invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
}
}
@@ -1343,11 +1441,45 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
// to call code that will potentially block (this avoids promoting a new leader and
// unecessary thread creation, especially if this is called on shutdown).
//
- if(_startCallback || !_sendStreams.empty() || !_asyncRequests.empty())
+ if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty())
+ {
+ finish();
+ return;
+ }
+
+ if(!_dispatcher)
{
current.ioCompleted();
+ finish();
}
+ else
+ {
+ try
+ {
+ _dispatcher->dispatch(new FinishDispatcherCall(this), this);
+ }
+ catch(const std::exception& ex)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\n" << ex << '\n' << _desc;
+ }
+ }
+ catch(...)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc;
+ }
+ }
+ }
+}
+void
+Ice::ConnectionI::finish()
+{
if(_startCallback)
{
_startCallback->connectionStartFailed(this, *_exception.get());
@@ -1527,6 +1659,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
_connector(connector),
_endpoint(endpoint),
_adapter(adapter),
+ _dispatcher(_instance->initializationData().dispatcher), // Cached for better performance.
_logger(_instance->initializationData().logger), // Cached for better performance.
_traceLevels(_instance->traceLevels()), // Cached for better performance.
_timer(_instance->timer()), // Cached for better performance.