summaryrefslogtreecommitdiff
path: root/cppe
diff options
context:
space:
mode:
Diffstat (limited to 'cppe')
-rwxr-xr-xcppe/include/IceE/Connection.h5
-rwxr-xr-xcppe/include/IceE/Current.h4
-rw-r--r--cppe/include/IceE/Incoming.h37
-rwxr-xr-xcppe/src/IceE/Connection.cpp80
-rw-r--r--cppe/src/IceE/Incoming.cpp66
5 files changed, 81 insertions, 111 deletions
diff --git a/cppe/include/IceE/Connection.h b/cppe/include/IceE/Connection.h
index 1ad42e6804b..40fa1553f2b 100755
--- a/cppe/include/IceE/Connection.h
+++ b/cppe/include/IceE/Connection.h
@@ -40,6 +40,7 @@ namespace IceInternal
class Outgoing;
#ifndef ICEE_PURE_BLOCKING_CLIENT
class OutgoingM;
+class Incoming;
#endif
class BasicStream;
@@ -153,8 +154,8 @@ private:
void initiateShutdown() const;
#ifndef ICEE_PURE_CLIENT
- void parseMessage(IceInternal::BasicStream&, Int&, Int&, IceInternal::ServantManagerPtr&, ObjectAdapterPtr&);
- void invokeAll(IceInternal::BasicStream&, Int, Int, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&);
+ void parseMessage(IceInternal::BasicStream&, Int&, Int&, IceInternal::ServantManager*&, ObjectAdapter*&);
+ void invokeAll(IceInternal::Incoming&, Int, Int, IceInternal::ServantManager*, ObjectAdapter*);
#else
void parseMessage(IceInternal::BasicStream&, Int&);
#endif
diff --git a/cppe/include/IceE/Current.h b/cppe/include/IceE/Current.h
index 9ff51eccb71..d62d294ea24 100755
--- a/cppe/include/IceE/Current.h
+++ b/cppe/include/IceE/Current.h
@@ -20,8 +20,8 @@ namespace Ice
struct Current
{
- ::Ice::ObjectAdapterPtr adapter;
- ::Ice::ConnectionPtr con;
+ ::Ice::ObjectAdapter* adapter;
+ ::Ice::Connection* con;
::Ice::Identity id;
::std::string facet;
::std::string operation;
diff --git a/cppe/include/IceE/Incoming.h b/cppe/include/IceE/Incoming.h
index f0e94e520b0..fc20704b5fc 100644
--- a/cppe/include/IceE/Incoming.h
+++ b/cppe/include/IceE/Incoming.h
@@ -19,23 +19,27 @@
namespace IceInternal
{
-class ICE_API IncomingBase : private IceUtil::noncopyable
+class ICE_API Incoming : private IceUtil::noncopyable
{
-protected:
+public:
+
+ Incoming(Instance*, Ice::Connection*, BasicStream&);
+
+ void invoke(bool, Ice::ObjectAdapter*, ServantManager*);
+
+ // Inlined for speed optimization.
+ BasicStream* os() { return &_os; }
+ BasicStream* is() { return &_is; }
- IncomingBase(Instance*, Ice::Connection*, const Ice::ObjectAdapterPtr&, bool);
- IncomingBase(IncomingBase& in); // Adopts the argument. It must not be used afterwards.
+protected:
void __warning(const Ice::Exception&) const;
void __warning(const std::string&) const;
Ice::Current _current;
- Ice::ObjectPtr _servant;
- Ice::LocalObjectPtr _cookie;
-
bool _response;
-
BasicStream _os;
+ BasicStream& _is;
//
// Optimization. The connection may not be deleted while a
@@ -44,23 +48,6 @@ protected:
Ice::Connection* _connection;
};
-class ICE_API Incoming : public IncomingBase
-{
-public:
-
- Incoming(Instance*, Ice::Connection*, const Ice::ObjectAdapterPtr&, bool);
-
- void invoke(const ServantManagerPtr&);
-
- // Inlined for speed optimization.
- BasicStream* is() { return &_is; }
- BasicStream* os() { return &_os; }
-
-private:
-
- BasicStream _is;
-};
-
}
#endif
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp
index 4c6c2c5cfc7..c4e57f4644a 100755
--- a/cppe/src/IceE/Connection.cpp
+++ b/cppe/src/IceE/Connection.cpp
@@ -401,8 +401,8 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing*
{
#ifndef ICEE_PURE_CLIENT
Int invokeNum = 0;
- ServantManagerPtr servantManager;
- ObjectAdapterPtr adapter;
+ ServantManager* servantManager;
+ ObjectAdapter* adapter;
parseMessage(*is, requestId, invokeNum, servantManager, adapter);
#else
@@ -817,6 +817,20 @@ Ice::Connection::setAdapter(const ObjectAdapterPtr& adapter)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ //
+ // TODO: Add support for blocking mode here!
+ //
+
+ //
+ // Wait for all the incoming to be dispatched. We can't modify the
+ // _adapter and _servantManager if there's incoming because the
+ // Incoming object is using plain pointers for these objects.
+ //
+ while(_dispatchCount > 0)
+ {
+ wait();
+ }
+
if(_exception.get())
{
_exception->ice_throw();
@@ -834,11 +848,6 @@ Ice::Connection::setAdapter(const ObjectAdapterPtr& adapter)
{
_servantManager = 0;
}
-
- //
- // We never change the thread pool with which we were initially
- // registered, even if we add or remove an object adapter.
- //
}
ObjectAdapterPtr
@@ -1433,7 +1442,7 @@ Ice::Connection::initiateShutdown() const
void
Ice::Connection::parseMessage(BasicStream& stream, Int& requestId
#ifndef ICEE_PURE_CLIENT
- ,Int& invokeNum, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter
+ ,Int& invokeNum, ServantManager*& servantManager, ObjectAdapter*& adapter
#endif
)
{
@@ -1528,8 +1537,8 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId
traceRequest("received request", stream, _logger, _traceLevels);
stream.read(requestId);
invokeNum = 1;
- servantManager = _servantManager;
- adapter = _adapter;
+ servantManager = _servantManager.get();
+ adapter = _adapter.get();
++_dispatchCount;
}
break;
@@ -1552,8 +1561,8 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId
invokeNum = 0;
throw NegativeSizeException(__FILE__, __LINE__);
}
- servantManager = _servantManager;
- adapter = _adapter;
+ servantManager = _servantManager.get();
+ adapter = _adapter.get();
_dispatchCount += invokeNum;
}
break;
@@ -1635,8 +1644,8 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId
#ifndef ICEE_PURE_CLIENT
void
-Ice::Connection::invokeAll(BasicStream& stream, Int invokeNum, Int requestId,
- const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter)
+Ice::Connection::invokeAll(Incoming& in, Int invokeNum, Int requestId, ServantManager* servantManager,
+ ObjectAdapter* adapter)
{
//
// Note: In contrast to other private or protected methods, this
@@ -1651,11 +1660,8 @@ Ice::Connection::invokeAll(BasicStream& stream, Int invokeNum, Int requestId,
// Prepare the invocation.
//
bool response = requestId != 0;
- Incoming in(_instance.get(), this, adapter, response);
- BasicStream* is = in.is();
- stream.swap(*is);
BasicStream* os = in.os();
-
+
//
// Prepare the response if necessary.
//
@@ -1670,15 +1676,9 @@ Ice::Connection::invokeAll(BasicStream& stream, Int invokeNum, Int requestId,
os->write(requestId);
}
- in.invoke(servantManager);
-
- //
- // If there are more invocations, we need the stream back.
- //
- if(--invokeNum > 0)
- {
- stream.swap(*is);
- }
+ in.invoke(response, adapter, servantManager);
+
+ --invokeNum;
}
}
catch(const LocalException& ex)
@@ -1848,25 +1848,25 @@ Ice::Connection::run()
bool closed = false;
+ BasicStream stream(_instance.get());
+#ifndef ICEE_PURE_CLIENT
+ Incoming in(_instance.get(), this, stream);
+#endif
+
while(!closed)
{
- //
- // We must accept new connections outside the thread
- // synchronization, because we use blocking accept.
- //
-
- BasicStream stream(_instance.get());
- readStream(stream);
-
Int requestId = 0;
#ifndef ICEE_PURE_CLIENT
Int invokeNum = 0;
- ServantManagerPtr servantManager;
- ObjectAdapterPtr adapter;
+ ServantManager* servantManager;
+ ObjectAdapter* adapter;
+ in.os()->resize(0);
+ in.is()->resize(0);
#endif
+
+ readStream(stream);
- auto_ptr<LocalException> exception;
-
+ auto_ptr<LocalException> exception;
map<Int, OutgoingM*> requests;
{
@@ -1932,7 +1932,7 @@ Ice::Connection::run()
// so that nested calls are possible.
//
#ifndef ICEE_PURE_CLIENT
- invokeAll(stream, invokeNum, requestId, servantManager, adapter);
+ invokeAll(in, invokeNum, requestId, servantManager, adapter);
#endif
for(map<Int, OutgoingM*>::iterator p = requests.begin(); p != requests.end(); ++p)
{
diff --git a/cppe/src/IceE/Incoming.cpp b/cppe/src/IceE/Incoming.cpp
index ea0a0fbba57..a9ecb60edd2 100644
--- a/cppe/src/IceE/Incoming.cpp
+++ b/cppe/src/IceE/Incoming.cpp
@@ -23,36 +23,22 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
-IceInternal::IncomingBase::IncomingBase(Instance* instance, Connection* connection,
- const ObjectAdapterPtr& adapter,
- bool response) :
- _response(response),
+IceInternal::Incoming::Incoming(Instance* instance, Connection* connection, BasicStream& is) :
_os(instance),
+ _is(is),
_connection(connection)
{
- _current.adapter = adapter;
- _current.con = _connection;
-}
-
-IceInternal::IncomingBase::IncomingBase(IncomingBase& in) :
- _current(in._current),
- _servant(in._servant),
- _cookie(in._cookie),
- _response(in._response),
- _os(in._os.instance()),
- _connection(in._connection)
-{
- _os.swap(in._os);
+ _current.con = connection;
}
void
-IceInternal::IncomingBase::__warning(const Exception& ex) const
+IceInternal::Incoming::__warning(const Exception& ex) const
{
__warning(ex.toString());
}
void
-IceInternal::IncomingBase::__warning(const string& msg) const
+IceInternal::Incoming::__warning(const string& msg) const
{
Warning out(_os.instance()->logger());
out << "dispatch exception: " << msg;
@@ -61,17 +47,12 @@ IceInternal::IncomingBase::__warning(const string& msg) const
out << "\noperation: " << _current.operation;
}
-IceInternal::Incoming::Incoming(Instance* instance, Connection* connection,
- const ObjectAdapterPtr& adapter,
- bool response) :
- IncomingBase(instance, connection, adapter, response),
- _is(instance)
-{
-}
-
void
-IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
+IceInternal::Incoming::invoke(bool response, ObjectAdapter* adapter, ServantManager* servantManager)
{
+ _current.adapter = adapter;
+ _current.ctx.clear();
+
//
// Read the current.
//
@@ -111,7 +92,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
_is.startReadEncaps();
- if(_response)
+ if(response)
{
assert(_os.b.size() == headerSize + 4); // Dispatch status position.
_os.write(static_cast<Byte>(0));
@@ -129,12 +110,13 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
try
{
+ Ice::ObjectPtr servant;
if(servantManager)
{
- _servant = servantManager->findServant(_current.id, _current.facet);
+ servant = servantManager->findServant(_current.id, _current.facet);
}
- if(!_servant)
+ if(!servant)
{
if(servantManager && servantManager->hasServant(_current.id))
{
@@ -147,7 +129,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
}
else
{
- status = _servant->__dispatch(*this, _current);
+ status = servant->__dispatch(*this, _current);
}
}
catch(RequestFailedException& ex)
@@ -174,7 +156,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
__warning(ex);
}
- if(_response)
+ if(response)
{
_os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Dispatch status position.
@@ -229,7 +211,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
__warning(ex);
}
- if(_response)
+ if(response)
{
_os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Dispatch status position.
@@ -253,7 +235,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
__warning(ex);
}
- if(_response)
+ if(response)
{
_os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Dispatch status position.
@@ -277,7 +259,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
__warning(ex);
}
- if(_response)
+ if(response)
{
_os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Dispatch status position.
@@ -301,7 +283,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
__warning(ex);
}
- if(_response)
+ if(response)
{
_os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Dispatch status position.
@@ -325,7 +307,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
__warning(ex);
}
- if(_response)
+ if(response)
{
_os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Dispatch status position.
@@ -349,7 +331,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
__warning(ex);
}
- if(_response)
+ if(response)
{
_os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Dispatch status position.
@@ -373,7 +355,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
__warning(string("std::exception: ") + ex.what());
}
- if(_response)
+ if(response)
{
_os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Dispatch status position.
@@ -398,7 +380,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
__warning("unknown c++ exception");
}
- if(_response)
+ if(response)
{
_os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Dispatch status position.
@@ -422,7 +404,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
_is.endReadEncaps();
- if(_response)
+ if(response)
{
_os.endWriteEncaps();