diff options
Diffstat (limited to 'cppe/src')
-rwxr-xr-x | cppe/src/IceE/Connection.cpp | 80 | ||||
-rw-r--r-- | cppe/src/IceE/Incoming.cpp | 66 |
2 files changed, 64 insertions, 82 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp index 4c6c2c5cfc7..c4e57f4644a 100755 --- a/cppe/src/IceE/Connection.cpp +++ b/cppe/src/IceE/Connection.cpp @@ -401,8 +401,8 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing* { #ifndef ICEE_PURE_CLIENT Int invokeNum = 0; - ServantManagerPtr servantManager; - ObjectAdapterPtr adapter; + ServantManager* servantManager; + ObjectAdapter* adapter; parseMessage(*is, requestId, invokeNum, servantManager, adapter); #else @@ -817,6 +817,20 @@ Ice::Connection::setAdapter(const ObjectAdapterPtr& adapter) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // + // TODO: Add support for blocking mode here! + // + + // + // Wait for all the incoming to be dispatched. We can't modify the + // _adapter and _servantManager if there's incoming because the + // Incoming object is using plain pointers for these objects. + // + while(_dispatchCount > 0) + { + wait(); + } + if(_exception.get()) { _exception->ice_throw(); @@ -834,11 +848,6 @@ Ice::Connection::setAdapter(const ObjectAdapterPtr& adapter) { _servantManager = 0; } - - // - // We never change the thread pool with which we were initially - // registered, even if we add or remove an object adapter. - // } ObjectAdapterPtr @@ -1433,7 +1442,7 @@ Ice::Connection::initiateShutdown() const void Ice::Connection::parseMessage(BasicStream& stream, Int& requestId #ifndef ICEE_PURE_CLIENT - ,Int& invokeNum, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter + ,Int& invokeNum, ServantManager*& servantManager, ObjectAdapter*& adapter #endif ) { @@ -1528,8 +1537,8 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId traceRequest("received request", stream, _logger, _traceLevels); stream.read(requestId); invokeNum = 1; - servantManager = _servantManager; - adapter = _adapter; + servantManager = _servantManager.get(); + adapter = _adapter.get(); ++_dispatchCount; } break; @@ -1552,8 +1561,8 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId invokeNum = 0; throw NegativeSizeException(__FILE__, __LINE__); } - servantManager = _servantManager; - adapter = _adapter; + servantManager = _servantManager.get(); + adapter = _adapter.get(); _dispatchCount += invokeNum; } break; @@ -1635,8 +1644,8 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId #ifndef ICEE_PURE_CLIENT void -Ice::Connection::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, - const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter) +Ice::Connection::invokeAll(Incoming& in, Int invokeNum, Int requestId, ServantManager* servantManager, + ObjectAdapter* adapter) { // // Note: In contrast to other private or protected methods, this @@ -1651,11 +1660,8 @@ Ice::Connection::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, // Prepare the invocation. // bool response = requestId != 0; - Incoming in(_instance.get(), this, adapter, response); - BasicStream* is = in.is(); - stream.swap(*is); BasicStream* os = in.os(); - + // // Prepare the response if necessary. // @@ -1670,15 +1676,9 @@ Ice::Connection::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, os->write(requestId); } - in.invoke(servantManager); - - // - // If there are more invocations, we need the stream back. - // - if(--invokeNum > 0) - { - stream.swap(*is); - } + in.invoke(response, adapter, servantManager); + + --invokeNum; } } catch(const LocalException& ex) @@ -1848,25 +1848,25 @@ Ice::Connection::run() bool closed = false; + BasicStream stream(_instance.get()); +#ifndef ICEE_PURE_CLIENT + Incoming in(_instance.get(), this, stream); +#endif + while(!closed) { - // - // We must accept new connections outside the thread - // synchronization, because we use blocking accept. - // - - BasicStream stream(_instance.get()); - readStream(stream); - Int requestId = 0; #ifndef ICEE_PURE_CLIENT Int invokeNum = 0; - ServantManagerPtr servantManager; - ObjectAdapterPtr adapter; + ServantManager* servantManager; + ObjectAdapter* adapter; + in.os()->resize(0); + in.is()->resize(0); #endif + + readStream(stream); - auto_ptr<LocalException> exception; - + auto_ptr<LocalException> exception; map<Int, OutgoingM*> requests; { @@ -1932,7 +1932,7 @@ Ice::Connection::run() // so that nested calls are possible. // #ifndef ICEE_PURE_CLIENT - invokeAll(stream, invokeNum, requestId, servantManager, adapter); + invokeAll(in, invokeNum, requestId, servantManager, adapter); #endif for(map<Int, OutgoingM*>::iterator p = requests.begin(); p != requests.end(); ++p) { diff --git a/cppe/src/IceE/Incoming.cpp b/cppe/src/IceE/Incoming.cpp index ea0a0fbba57..a9ecb60edd2 100644 --- a/cppe/src/IceE/Incoming.cpp +++ b/cppe/src/IceE/Incoming.cpp @@ -23,36 +23,22 @@ using namespace std; using namespace Ice; using namespace IceInternal; -IceInternal::IncomingBase::IncomingBase(Instance* instance, Connection* connection, - const ObjectAdapterPtr& adapter, - bool response) : - _response(response), +IceInternal::Incoming::Incoming(Instance* instance, Connection* connection, BasicStream& is) : _os(instance), + _is(is), _connection(connection) { - _current.adapter = adapter; - _current.con = _connection; -} - -IceInternal::IncomingBase::IncomingBase(IncomingBase& in) : - _current(in._current), - _servant(in._servant), - _cookie(in._cookie), - _response(in._response), - _os(in._os.instance()), - _connection(in._connection) -{ - _os.swap(in._os); + _current.con = connection; } void -IceInternal::IncomingBase::__warning(const Exception& ex) const +IceInternal::Incoming::__warning(const Exception& ex) const { __warning(ex.toString()); } void -IceInternal::IncomingBase::__warning(const string& msg) const +IceInternal::Incoming::__warning(const string& msg) const { Warning out(_os.instance()->logger()); out << "dispatch exception: " << msg; @@ -61,17 +47,12 @@ IceInternal::IncomingBase::__warning(const string& msg) const out << "\noperation: " << _current.operation; } -IceInternal::Incoming::Incoming(Instance* instance, Connection* connection, - const ObjectAdapterPtr& adapter, - bool response) : - IncomingBase(instance, connection, adapter, response), - _is(instance) -{ -} - void -IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) +IceInternal::Incoming::invoke(bool response, ObjectAdapter* adapter, ServantManager* servantManager) { + _current.adapter = adapter; + _current.ctx.clear(); + // // Read the current. // @@ -111,7 +92,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) _is.startReadEncaps(); - if(_response) + if(response) { assert(_os.b.size() == headerSize + 4); // Dispatch status position. _os.write(static_cast<Byte>(0)); @@ -129,12 +110,13 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) try { + Ice::ObjectPtr servant; if(servantManager) { - _servant = servantManager->findServant(_current.id, _current.facet); + servant = servantManager->findServant(_current.id, _current.facet); } - if(!_servant) + if(!servant) { if(servantManager && servantManager->hasServant(_current.id)) { @@ -147,7 +129,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) } else { - status = _servant->__dispatch(*this, _current); + status = servant->__dispatch(*this, _current); } } catch(RequestFailedException& ex) @@ -174,7 +156,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) __warning(ex); } - if(_response) + if(response) { _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Dispatch status position. @@ -229,7 +211,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) __warning(ex); } - if(_response) + if(response) { _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Dispatch status position. @@ -253,7 +235,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) __warning(ex); } - if(_response) + if(response) { _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Dispatch status position. @@ -277,7 +259,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) __warning(ex); } - if(_response) + if(response) { _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Dispatch status position. @@ -301,7 +283,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) __warning(ex); } - if(_response) + if(response) { _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Dispatch status position. @@ -325,7 +307,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) __warning(ex); } - if(_response) + if(response) { _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Dispatch status position. @@ -349,7 +331,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) __warning(ex); } - if(_response) + if(response) { _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Dispatch status position. @@ -373,7 +355,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) __warning(string("std::exception: ") + ex.what()); } - if(_response) + if(response) { _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Dispatch status position. @@ -398,7 +380,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) __warning("unknown c++ exception"); } - if(_response) + if(response) { _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Dispatch status position. @@ -422,7 +404,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) _is.endReadEncaps(); - if(_response) + if(response) { _os.endWriteEncaps(); |