summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Incoming.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Incoming.cpp')
-rw-r--r--cpp/src/Ice/Incoming.cpp269
1 files changed, 149 insertions, 120 deletions
diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp
index 648ef8de4a3..387e2ac30e3 100644
--- a/cpp/src/Ice/Incoming.cpp
+++ b/cpp/src/Ice/Incoming.cpp
@@ -39,7 +39,7 @@ IceInternal::IncomingBase::IncomingBase(Instance* instance, ConnectionI* connect
bool response, Byte compress, Int requestId) :
_response(response),
_compress(compress),
- _os(instance),
+ _os(instance, Ice::currentProtocolEncoding),
_connection(connection)
{
_current.adapter = adapter;
@@ -49,14 +49,14 @@ IceInternal::IncomingBase::IncomingBase(Instance* instance, ConnectionI* connect
IceInternal::IncomingBase::IncomingBase(IncomingBase& in) :
_current(in._current), // copy
- _os(in._os.instance()),
+ _os(in._os.instance(), Ice::currentProtocolEncoding),
_interceptorAsyncCallbackQueue(in._interceptorAsyncCallbackQueue) // copy
{
- adopt(in); // adopt everything else
+ __adopt(in); // adopt everything else
}
void
-IceInternal::IncomingBase::adopt(IncomingBase& other)
+IceInternal::IncomingBase::__adopt(IncomingBase& other)
{
_servant = other._servant;
other._servant = 0;
@@ -79,6 +79,68 @@ IceInternal::IncomingBase::adopt(IncomingBase& other)
other._connection = 0;
}
+BasicStream*
+IncomingBase::__startWriteParams()
+{
+ if(_response)
+ {
+ assert(_os.b.size() == headerSize + 4); // Reply status position.
+ assert(_current.encoding >= Ice::Encoding_1_0); // Encoding for reply is known.
+ _os.write(static_cast<Ice::Byte>(0));
+ _os.startWriteEncaps(_current.encoding);
+ }
+
+ //
+ // We still return the stream even if no response is expected. The
+ // servant code might still write some out parameters if for
+ // example a method with out parameters somehow and erroneously
+ // invoked as oneway (or if the invocation is invoked on a
+ // blobject and the blobject erroneously writes a response).
+ //
+ return &_os;
+}
+
+void
+IncomingBase::__endWriteParams(bool ok)
+{
+ if(_response)
+ {
+ *(_os.b.begin() + headerSize + 4) = ok ? replyOK : replyUserException; // Reply status position.
+ _os.endWriteEncaps();
+ }
+}
+
+void
+IncomingBase::__writeEmptyParams()
+{
+ if(_response)
+ {
+ assert(_os.b.size() == headerSize + 4); // Reply status position.
+ assert(_current.encoding >= Ice::Encoding_1_0); // Encoding for reply is known.
+ _os.write(replyOK);
+ _os.writeEmptyEncaps(_current.encoding);
+ }
+}
+
+void
+IncomingBase::__writeParamEncaps(const Byte* v, Ice::Int sz, bool ok)
+{
+ if(_response)
+ {
+ assert(_os.b.size() == headerSize + 4); // Reply status position.
+ assert(_current.encoding >= Ice::Encoding_1_0); // Encoding for reply is known.
+ _os.write(ok ? replyOK : replyUserException);
+ if(sz == 0)
+ {
+ _os.writeEmptyEncaps(_current.encoding);
+ }
+ else
+ {
+ _os.writeEncaps(v, sz);
+ }
+ }
+}
+
void
IceInternal::IncomingBase::__warning(const Exception& ex) const
{
@@ -139,10 +201,9 @@ IceInternal::IncomingBase::__servantLocatorFinished()
//
if(_response)
{
- _os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Reply status position.
_os.write(replyUserException);
- _os.startWriteEncaps();
+ _os.startWriteEncaps(_current.encoding);
_os.write(ex);
_os.endWriteEncaps();
_connection->sendResponse(&_os, _compress);
@@ -197,7 +258,6 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
if(_response)
{
- _os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Reply status position.
if(dynamic_cast<ObjectNotExistException*>(rfe))
{
@@ -249,7 +309,6 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
if(_response)
{
- _os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Reply status position.
if(const UnknownLocalException* ule = dynamic_cast<const UnknownLocalException*>(&exc))
{
@@ -315,7 +374,6 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
if(_response)
{
- _os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Reply status position.
_os.write(replyUnknownException);
ostringstream str;
@@ -344,7 +402,6 @@ IceInternal::IncomingBase::__handleException()
if(_response)
{
- _os.endWriteEncaps();
_os.b.resize(headerSize + 4); // Reply status position.
_os.write(replyUnknownException);
string reason = "unknown c++ exception";
@@ -360,13 +417,23 @@ IceInternal::IncomingBase::__handleException()
}
-IceInternal::Incoming::Incoming(Instance* instance, ConnectionI* connection,
- const ObjectAdapterPtr& adapter,
+IceInternal::Incoming::Incoming(Instance* instance, ConnectionI* connection, const ObjectAdapterPtr& adapter,
bool response, Byte compress, Int requestId) :
IncomingBase(instance, connection, adapter, response, compress, requestId),
- _is(instance),
_inParamPos(0)
{
+ //
+ // Prepare the response if necessary.
+ //
+ if(response)
+ {
+ _os.writeBlob(replyHdr, sizeof(replyHdr));
+
+ //
+ // Add the request ID.
+ //
+ _os.write(requestId);
+ }
}
@@ -390,7 +457,7 @@ IceInternal::Incoming::startOver()
//
// That's the first startOver, so almost nothing to do
//
- _inParamPos = _is.i;
+ _inParamPos = _is->i;
}
else
{
@@ -399,15 +466,8 @@ IceInternal::Incoming::startOver()
//
// Let's rewind _is and clean-up _os
//
- _is.i = _inParamPos;
-
- if(_response)
- {
- _os.endWriteEncaps();
- _os.b.resize(headerSize + 4);
- _os.write(static_cast<Byte>(0));
- _os.startWriteEncaps();
- }
+ _is->i = _inParamPos;
+ _os.b.resize(headerSize + 4); // Reply status position.
}
}
@@ -435,12 +495,14 @@ IceInternal::Incoming::setActive(IncomingAsync& cb)
}
void
-IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
+IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStream* stream)
{
+ _is = stream;
+
//
// Read the current.
//
- _current.id.__read(&_is);
+ _current.id.__read(_is);
//
// For compatibility with the old FacetPath.
@@ -448,7 +510,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
string facet;
{
vector<string> facetPath;
- _is.read(facetPath);
+ _is->read(facetPath);
if(!facetPath.empty())
{
if(facetPath.size() > 1)
@@ -460,34 +522,22 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
}
_current.facet.swap(facet);
- _is.read(_current.operation, false);
+ _is->read(_current.operation, false);
Byte b;
- _is.read(b);
+ _is->read(b);
_current.mode = static_cast<OperationMode>(b);
Int sz;
- _is.readSize(sz);
+ _is->readSize(sz);
while(sz--)
{
pair<const string, string> pr;
- _is.read(const_cast<string&>(pr.first));
- _is.read(pr.second);
+ _is->read(const_cast<string&>(pr.first));
+ _is->read(pr.second);
_current.ctx.insert(_current.ctx.end(), pr);
}
- if(_response)
- {
- assert(_os.b.size() == headerSize + 4); // Reply status position.
- _os.write(static_cast<Byte>(0));
- _os.startWriteEncaps();
- }
-
- // Initialize status to some value, to keep the compiler happy.
- Ice::Byte replyStatus = replyOK;
-
- DispatchStatus dispatchStatus = DispatchOK;
-
//
// Don't put the code above into the try block below. Exceptions
// in the code above are considered fatal, and must propagate to
@@ -513,16 +563,33 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
}
catch(const UserException& ex)
{
- _os.write(ex);
- replyStatus = replyUserException;
+ Ice::EncodingVersion encoding = _is->skipEncaps(); // Required for batch requests.
+
+ if(_response)
+ {
+ _os.write(replyUserException);
+ _os.startWriteEncaps(encoding);
+ _os.write(ex);
+ _os.endWriteEncaps();
+ _connection->sendResponse(&_os, _compress);
+ }
+ else
+ {
+ _connection->sendNoResponse();
+ }
+
+ _connection = 0;
+ return;
}
catch(const std::exception& ex)
{
+ _is->skipEncaps(); // Required for batch requests.
__handleException(ex);
return;
}
catch(...)
{
+ _is->skipEncaps(); // Required for batch requests.
__handleException();
return;
}
@@ -530,54 +597,62 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
}
}
- if(_servant)
+ try
{
- try
+ if(_servant)
{
- assert(replyStatus == replyOK);
- dispatchStatus = _servant->__dispatch(*this, _current);
- if(dispatchStatus == DispatchUserException)
- {
- replyStatus = replyUserException;
+ //
+ // DispatchAsync is a "pseudo dispatch status", used internally only
+ // to indicate async dispatch.
+ //
+ if(_servant->__dispatch(*this, _current) == DispatchAsync)
+ {
+ //
+ // If this was an asynchronous dispatch, we're done here.
+ //
+ return;
}
- if(dispatchStatus != DispatchAsync)
- {
- if(_locator && !__servantLocatorFinished())
- {
- return;
- }
- }
- }
- catch(const std::exception& ex)
- {
if(_locator && !__servantLocatorFinished())
{
return;
}
- __handleException(ex);
- return;
}
- catch(...)
+ else
{
- if(_locator && !__servantLocatorFinished())
+ //
+ // Skip the input parameters, this is required for reading
+ // the next batch request if dispatching batch requests.
+ //
+ _is->skipEncaps();
+
+ if(servantManager && servantManager->hasServant(_current.id))
{
- return;
+ throw FacetNotExistException(__FILE__, __LINE__, _current.id, _current.facet, _current.operation);
+ }
+ else
+ {
+ throw ObjectNotExistException(__FILE__, __LINE__, _current.id, _current.facet, _current.operation);
}
- __handleException();
- return;
}
}
- else if(replyStatus == replyOK)
+ catch(const std::exception& ex)
{
- if(servantManager && servantManager->hasServant(_current.id))
+ if(_servant && _locator && !__servantLocatorFinished())
{
- replyStatus = replyFacetNotExist;
+ return;
}
- else
+ __handleException(ex);
+ return;
+ }
+ catch(...)
+ {
+ if(_servant && _locator && !__servantLocatorFinished())
{
- replyStatus = replyObjectNotExist;
+ return;
}
+ __handleException();
+ return;
}
//
@@ -586,53 +661,8 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
// the caller of this operation.
//
- //
- // DispatchAsync is "pseudo dispatch status", used internally only
- // to indicate async dispatch.
- //
- if(dispatchStatus == DispatchAsync)
- {
- //
- // If this was an asynchronous dispatch, we're done here.
- //
- return;
- }
-
- assert(_connection);
-
if(_response)
{
- _os.endWriteEncaps();
-
- if(replyStatus != replyOK && replyStatus != replyUserException)
- {
- assert(replyStatus == replyObjectNotExist ||
- replyStatus == replyFacetNotExist);
-
- _os.b.resize(headerSize + 4); // Reply status position.
- _os.write(replyStatus);
-
- _current.id.__write(&_os);
-
- //
- // For compatibility with the old FacetPath.
- //
- if(_current.facet.empty())
- {
- _os.write(static_cast<string*>(0), static_cast<string*>(0));
- }
- else
- {
- _os.write(&_current.facet, &_current.facet + 1);
- }
-
- _os.write(_current.operation, false);
- }
- else
- {
- *(_os.b.begin() + headerSize + 4) = replyStatus; // Reply status position.
- }
-
_connection->sendResponse(&_os, _compress);
}
else
@@ -643,7 +673,6 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager)
_connection = 0;
}
-
bool
IceInternal::IncomingRequest::isCollocated()
{