diff options
author | Benoit Foucher <benoit@zeroc.com> | 2012-04-18 14:33:16 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2012-04-18 14:33:16 +0200 |
commit | 2ba4d4e0ca7aeade69ee00ab17d5fa1b69372ffc (patch) | |
tree | 0a381f9b284eb7ca5acc9bca5a97659d19874f9d /cpp/src/Ice/Incoming.cpp | |
parent | ICE-4828 - Buffer.h undeclared ptrdiff_t issue on OpenSUSE (diff) | |
download | ice-2ba4d4e0ca7aeade69ee00ab17d5fa1b69372ffc.tar.bz2 ice-2ba4d4e0ca7aeade69ee00ab17d5fa1b69372ffc.tar.xz ice-2ba4d4e0ca7aeade69ee00ab17d5fa1b69372ffc.zip |
Added support for encoding versioning
Diffstat (limited to 'cpp/src/Ice/Incoming.cpp')
-rw-r--r-- | cpp/src/Ice/Incoming.cpp | 269 |
1 files changed, 149 insertions, 120 deletions
diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp index 648ef8de4a3..387e2ac30e3 100644 --- a/cpp/src/Ice/Incoming.cpp +++ b/cpp/src/Ice/Incoming.cpp @@ -39,7 +39,7 @@ IceInternal::IncomingBase::IncomingBase(Instance* instance, ConnectionI* connect bool response, Byte compress, Int requestId) : _response(response), _compress(compress), - _os(instance), + _os(instance, Ice::currentProtocolEncoding), _connection(connection) { _current.adapter = adapter; @@ -49,14 +49,14 @@ IceInternal::IncomingBase::IncomingBase(Instance* instance, ConnectionI* connect IceInternal::IncomingBase::IncomingBase(IncomingBase& in) : _current(in._current), // copy - _os(in._os.instance()), + _os(in._os.instance(), Ice::currentProtocolEncoding), _interceptorAsyncCallbackQueue(in._interceptorAsyncCallbackQueue) // copy { - adopt(in); // adopt everything else + __adopt(in); // adopt everything else } void -IceInternal::IncomingBase::adopt(IncomingBase& other) +IceInternal::IncomingBase::__adopt(IncomingBase& other) { _servant = other._servant; other._servant = 0; @@ -79,6 +79,68 @@ IceInternal::IncomingBase::adopt(IncomingBase& other) other._connection = 0; } +BasicStream* +IncomingBase::__startWriteParams() +{ + if(_response) + { + assert(_os.b.size() == headerSize + 4); // Reply status position. + assert(_current.encoding >= Ice::Encoding_1_0); // Encoding for reply is known. + _os.write(static_cast<Ice::Byte>(0)); + _os.startWriteEncaps(_current.encoding); + } + + // + // We still return the stream even if no response is expected. The + // servant code might still write some out parameters if for + // example a method with out parameters somehow and erroneously + // invoked as oneway (or if the invocation is invoked on a + // blobject and the blobject erroneously writes a response). + // + return &_os; +} + +void +IncomingBase::__endWriteParams(bool ok) +{ + if(_response) + { + *(_os.b.begin() + headerSize + 4) = ok ? replyOK : replyUserException; // Reply status position. + _os.endWriteEncaps(); + } +} + +void +IncomingBase::__writeEmptyParams() +{ + if(_response) + { + assert(_os.b.size() == headerSize + 4); // Reply status position. + assert(_current.encoding >= Ice::Encoding_1_0); // Encoding for reply is known. + _os.write(replyOK); + _os.writeEmptyEncaps(_current.encoding); + } +} + +void +IncomingBase::__writeParamEncaps(const Byte* v, Ice::Int sz, bool ok) +{ + if(_response) + { + assert(_os.b.size() == headerSize + 4); // Reply status position. + assert(_current.encoding >= Ice::Encoding_1_0); // Encoding for reply is known. + _os.write(ok ? replyOK : replyUserException); + if(sz == 0) + { + _os.writeEmptyEncaps(_current.encoding); + } + else + { + _os.writeEncaps(v, sz); + } + } +} + void IceInternal::IncomingBase::__warning(const Exception& ex) const { @@ -139,10 +201,9 @@ IceInternal::IncomingBase::__servantLocatorFinished() // if(_response) { - _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Reply status position. _os.write(replyUserException); - _os.startWriteEncaps(); + _os.startWriteEncaps(_current.encoding); _os.write(ex); _os.endWriteEncaps(); _connection->sendResponse(&_os, _compress); @@ -197,7 +258,6 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) if(_response) { - _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Reply status position. if(dynamic_cast<ObjectNotExistException*>(rfe)) { @@ -249,7 +309,6 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) if(_response) { - _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Reply status position. if(const UnknownLocalException* ule = dynamic_cast<const UnknownLocalException*>(&exc)) { @@ -315,7 +374,6 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) if(_response) { - _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Reply status position. _os.write(replyUnknownException); ostringstream str; @@ -344,7 +402,6 @@ IceInternal::IncomingBase::__handleException() if(_response) { - _os.endWriteEncaps(); _os.b.resize(headerSize + 4); // Reply status position. _os.write(replyUnknownException); string reason = "unknown c++ exception"; @@ -360,13 +417,23 @@ IceInternal::IncomingBase::__handleException() } -IceInternal::Incoming::Incoming(Instance* instance, ConnectionI* connection, - const ObjectAdapterPtr& adapter, +IceInternal::Incoming::Incoming(Instance* instance, ConnectionI* connection, const ObjectAdapterPtr& adapter, bool response, Byte compress, Int requestId) : IncomingBase(instance, connection, adapter, response, compress, requestId), - _is(instance), _inParamPos(0) { + // + // Prepare the response if necessary. + // + if(response) + { + _os.writeBlob(replyHdr, sizeof(replyHdr)); + + // + // Add the request ID. + // + _os.write(requestId); + } } @@ -390,7 +457,7 @@ IceInternal::Incoming::startOver() // // That's the first startOver, so almost nothing to do // - _inParamPos = _is.i; + _inParamPos = _is->i; } else { @@ -399,15 +466,8 @@ IceInternal::Incoming::startOver() // // Let's rewind _is and clean-up _os // - _is.i = _inParamPos; - - if(_response) - { - _os.endWriteEncaps(); - _os.b.resize(headerSize + 4); - _os.write(static_cast<Byte>(0)); - _os.startWriteEncaps(); - } + _is->i = _inParamPos; + _os.b.resize(headerSize + 4); // Reply status position. } } @@ -435,12 +495,14 @@ IceInternal::Incoming::setActive(IncomingAsync& cb) } void -IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) +IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStream* stream) { + _is = stream; + // // Read the current. // - _current.id.__read(&_is); + _current.id.__read(_is); // // For compatibility with the old FacetPath. @@ -448,7 +510,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) string facet; { vector<string> facetPath; - _is.read(facetPath); + _is->read(facetPath); if(!facetPath.empty()) { if(facetPath.size() > 1) @@ -460,34 +522,22 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) } _current.facet.swap(facet); - _is.read(_current.operation, false); + _is->read(_current.operation, false); Byte b; - _is.read(b); + _is->read(b); _current.mode = static_cast<OperationMode>(b); Int sz; - _is.readSize(sz); + _is->readSize(sz); while(sz--) { pair<const string, string> pr; - _is.read(const_cast<string&>(pr.first)); - _is.read(pr.second); + _is->read(const_cast<string&>(pr.first)); + _is->read(pr.second); _current.ctx.insert(_current.ctx.end(), pr); } - if(_response) - { - assert(_os.b.size() == headerSize + 4); // Reply status position. - _os.write(static_cast<Byte>(0)); - _os.startWriteEncaps(); - } - - // Initialize status to some value, to keep the compiler happy. - Ice::Byte replyStatus = replyOK; - - DispatchStatus dispatchStatus = DispatchOK; - // // Don't put the code above into the try block below. Exceptions // in the code above are considered fatal, and must propagate to @@ -513,16 +563,33 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) } catch(const UserException& ex) { - _os.write(ex); - replyStatus = replyUserException; + Ice::EncodingVersion encoding = _is->skipEncaps(); // Required for batch requests. + + if(_response) + { + _os.write(replyUserException); + _os.startWriteEncaps(encoding); + _os.write(ex); + _os.endWriteEncaps(); + _connection->sendResponse(&_os, _compress); + } + else + { + _connection->sendNoResponse(); + } + + _connection = 0; + return; } catch(const std::exception& ex) { + _is->skipEncaps(); // Required for batch requests. __handleException(ex); return; } catch(...) { + _is->skipEncaps(); // Required for batch requests. __handleException(); return; } @@ -530,54 +597,62 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) } } - if(_servant) + try { - try + if(_servant) { - assert(replyStatus == replyOK); - dispatchStatus = _servant->__dispatch(*this, _current); - if(dispatchStatus == DispatchUserException) - { - replyStatus = replyUserException; + // + // DispatchAsync is a "pseudo dispatch status", used internally only + // to indicate async dispatch. + // + if(_servant->__dispatch(*this, _current) == DispatchAsync) + { + // + // If this was an asynchronous dispatch, we're done here. + // + return; } - if(dispatchStatus != DispatchAsync) - { - if(_locator && !__servantLocatorFinished()) - { - return; - } - } - } - catch(const std::exception& ex) - { if(_locator && !__servantLocatorFinished()) { return; } - __handleException(ex); - return; } - catch(...) + else { - if(_locator && !__servantLocatorFinished()) + // + // Skip the input parameters, this is required for reading + // the next batch request if dispatching batch requests. + // + _is->skipEncaps(); + + if(servantManager && servantManager->hasServant(_current.id)) { - return; + throw FacetNotExistException(__FILE__, __LINE__, _current.id, _current.facet, _current.operation); + } + else + { + throw ObjectNotExistException(__FILE__, __LINE__, _current.id, _current.facet, _current.operation); } - __handleException(); - return; } } - else if(replyStatus == replyOK) + catch(const std::exception& ex) { - if(servantManager && servantManager->hasServant(_current.id)) + if(_servant && _locator && !__servantLocatorFinished()) { - replyStatus = replyFacetNotExist; + return; } - else + __handleException(ex); + return; + } + catch(...) + { + if(_servant && _locator && !__servantLocatorFinished()) { - replyStatus = replyObjectNotExist; + return; } + __handleException(); + return; } // @@ -586,53 +661,8 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) // the caller of this operation. // - // - // DispatchAsync is "pseudo dispatch status", used internally only - // to indicate async dispatch. - // - if(dispatchStatus == DispatchAsync) - { - // - // If this was an asynchronous dispatch, we're done here. - // - return; - } - - assert(_connection); - if(_response) { - _os.endWriteEncaps(); - - if(replyStatus != replyOK && replyStatus != replyUserException) - { - assert(replyStatus == replyObjectNotExist || - replyStatus == replyFacetNotExist); - - _os.b.resize(headerSize + 4); // Reply status position. - _os.write(replyStatus); - - _current.id.__write(&_os); - - // - // For compatibility with the old FacetPath. - // - if(_current.facet.empty()) - { - _os.write(static_cast<string*>(0), static_cast<string*>(0)); - } - else - { - _os.write(&_current.facet, &_current.facet + 1); - } - - _os.write(_current.operation, false); - } - else - { - *(_os.b.begin() + headerSize + 4) = replyStatus; // Reply status position. - } - _connection->sendResponse(&_os, _compress); } else @@ -643,7 +673,6 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager) _connection = 0; } - bool IceInternal::IncomingRequest::isCollocated() { |