diff options
Diffstat (limited to 'cpp/src/Ice/Collector.cpp')
-rw-r--r-- | cpp/src/Ice/Collector.cpp | 92 |
1 files changed, 59 insertions, 33 deletions
diff --git a/cpp/src/Ice/Collector.cpp b/cpp/src/Ice/Collector.cpp index 482abaef791..bc40066723f 100644 --- a/cpp/src/Ice/Collector.cpp +++ b/cpp/src/Ice/Collector.cpp @@ -19,6 +19,7 @@ #include <Ice/Endpoint.h> #include <Ice/Incoming.h> #include <Ice/LocalException.h> +#include <Ice/Protocol.h> #include <Ice/Functional.h> using namespace std; @@ -83,6 +84,7 @@ IceInternal::Collector::message(Stream& stream) Incoming in(_instance, _adapter); Stream* os = in.os(); bool invoke = false; + bool batch = false; bool response = false; { @@ -101,17 +103,17 @@ IceInternal::Collector::message(Stream& stream) stream.i = stream.b.begin() + 2; Byte messageType; stream.read(messageType); - stream.i = stream.b.begin() + 7; + stream.i = stream.b.begin() + headerSize; // // Write partial message header // - os->write(Byte(0)); // Protocol version - os->write(Byte(0)); // Encoding version + os->write(protocolVersion); + os->write(encodingVersion); switch (messageType) { - case 0: // Request + case requestMsg: { if (_state == StateClosing) { @@ -126,20 +128,37 @@ IceInternal::Collector::message(Stream& stream) invoke = true; Int requestId; stream.read(requestId); - if (!_endpoint->oneway() && - requestId != 0) // 0 means oneway + if (!_endpoint->oneway() && requestId != 0) // 0 means oneway { response = true; ++_responseCount; - os->write(Byte(1)); // Message type (reply) + os->write(replyMsg); os->write(Int(0)); // Message size (placeholder) - os->write(requestId); // Request id + os->write(requestId); } } break; } - case 1: // Reply + case requestBatchMsg: + { + if (_state == StateClosing) + { + traceRequest("received batch request during closing\n" + "(ignored by server, client will retry)", + stream, _logger, _traceLevels); + } + else + { + traceRequest("received batch request", + stream, _logger, _traceLevels); + invoke = true; + batch = true; + } + break; + } + + case replyMsg: { traceReply("received reply on server side\n" "(invalid, closing connection)", @@ -148,7 +167,7 @@ IceInternal::Collector::message(Stream& stream) break; } - case 2: // CloseConnection + case closeConnectionMsg: { traceHeader("received close connection on server side\n" "(invalid, closing connection)", @@ -182,27 +201,34 @@ IceInternal::Collector::message(Stream& stream) if (invoke) { - try - { - in.invoke(stream); - } - catch(const LocalException& ex) + do { - JTCSyncT<JTCRecursiveMutex> sync(*this); - warning(ex); - setState(StateClosed); - return; - } - catch(...) - { - JTCSyncT<JTCRecursiveMutex> sync(*this); - string s("server exception:\n"); - s += "unknown exception (no further information available)\n"; - s += _transceiver->toString(); - _logger->warning(s); - setState(StateClosed); - return; + try + { + in.invoke(stream); + + if (batch) // If we're in batch mode, we need the input stream back + { + stream.swap(*in.is()); + } + } + catch(const LocalException& ex) + { + JTCSyncT<JTCRecursiveMutex> sync(*this); + warning(ex); + setState(StateClosed); + } + catch(...) + { + JTCSyncT<JTCRecursiveMutex> sync(*this); + string s("server exception:\n"); + s += "unknown exception (no further information available)\n"; + s += _transceiver->toString(); + _logger->warning(s); + setState(StateClosed); + } } + while (batch && stream.i < stream.b.end()); } if (response) @@ -405,10 +431,10 @@ void IceInternal::Collector::closeConnection() { Stream os(_instance); - os.write(Byte(0)); // Protocol version - os.write(Byte(0)); // Encoding version - os.write(Byte(2)); // Message type = CloseConnection - os.write(Int(7)); // Message size + os.write(protocolVersion); + os.write(encodingVersion); + os.write(closeConnectionMsg); + os.write(headerSize); // Message size os.i = os.b.begin(); traceHeader("sending close connection", os, _logger, _traceLevels); _transceiver->write(os, _endpoint->timeout()); |