summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Collector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Collector.cpp')
-rw-r--r--cpp/src/Ice/Collector.cpp92
1 files changed, 59 insertions, 33 deletions
diff --git a/cpp/src/Ice/Collector.cpp b/cpp/src/Ice/Collector.cpp
index 482abaef791..bc40066723f 100644
--- a/cpp/src/Ice/Collector.cpp
+++ b/cpp/src/Ice/Collector.cpp
@@ -19,6 +19,7 @@
#include <Ice/Endpoint.h>
#include <Ice/Incoming.h>
#include <Ice/LocalException.h>
+#include <Ice/Protocol.h>
#include <Ice/Functional.h>
using namespace std;
@@ -83,6 +84,7 @@ IceInternal::Collector::message(Stream& stream)
Incoming in(_instance, _adapter);
Stream* os = in.os();
bool invoke = false;
+ bool batch = false;
bool response = false;
{
@@ -101,17 +103,17 @@ IceInternal::Collector::message(Stream& stream)
stream.i = stream.b.begin() + 2;
Byte messageType;
stream.read(messageType);
- stream.i = stream.b.begin() + 7;
+ stream.i = stream.b.begin() + headerSize;
//
// Write partial message header
//
- os->write(Byte(0)); // Protocol version
- os->write(Byte(0)); // Encoding version
+ os->write(protocolVersion);
+ os->write(encodingVersion);
switch (messageType)
{
- case 0: // Request
+ case requestMsg:
{
if (_state == StateClosing)
{
@@ -126,20 +128,37 @@ IceInternal::Collector::message(Stream& stream)
invoke = true;
Int requestId;
stream.read(requestId);
- if (!_endpoint->oneway() &&
- requestId != 0) // 0 means oneway
+ if (!_endpoint->oneway() && requestId != 0) // 0 means oneway
{
response = true;
++_responseCount;
- os->write(Byte(1)); // Message type (reply)
+ os->write(replyMsg);
os->write(Int(0)); // Message size (placeholder)
- os->write(requestId); // Request id
+ os->write(requestId);
}
}
break;
}
- case 1: // Reply
+ case requestBatchMsg:
+ {
+ if (_state == StateClosing)
+ {
+ traceRequest("received batch request during closing\n"
+ "(ignored by server, client will retry)",
+ stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceRequest("received batch request",
+ stream, _logger, _traceLevels);
+ invoke = true;
+ batch = true;
+ }
+ break;
+ }
+
+ case replyMsg:
{
traceReply("received reply on server side\n"
"(invalid, closing connection)",
@@ -148,7 +167,7 @@ IceInternal::Collector::message(Stream& stream)
break;
}
- case 2: // CloseConnection
+ case closeConnectionMsg:
{
traceHeader("received close connection on server side\n"
"(invalid, closing connection)",
@@ -182,27 +201,34 @@ IceInternal::Collector::message(Stream& stream)
if (invoke)
{
- try
- {
- in.invoke(stream);
- }
- catch(const LocalException& ex)
+ do
{
- JTCSyncT<JTCRecursiveMutex> sync(*this);
- warning(ex);
- setState(StateClosed);
- return;
- }
- catch(...)
- {
- JTCSyncT<JTCRecursiveMutex> sync(*this);
- string s("server exception:\n");
- s += "unknown exception (no further information available)\n";
- s += _transceiver->toString();
- _logger->warning(s);
- setState(StateClosed);
- return;
+ try
+ {
+ in.invoke(stream);
+
+ if (batch) // If we're in batch mode, we need the input stream back
+ {
+ stream.swap(*in.is());
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ JTCSyncT<JTCRecursiveMutex> sync(*this);
+ warning(ex);
+ setState(StateClosed);
+ }
+ catch(...)
+ {
+ JTCSyncT<JTCRecursiveMutex> sync(*this);
+ string s("server exception:\n");
+ s += "unknown exception (no further information available)\n";
+ s += _transceiver->toString();
+ _logger->warning(s);
+ setState(StateClosed);
+ }
}
+ while (batch && stream.i < stream.b.end());
}
if (response)
@@ -405,10 +431,10 @@ void
IceInternal::Collector::closeConnection()
{
Stream os(_instance);
- os.write(Byte(0)); // Protocol version
- os.write(Byte(0)); // Encoding version
- os.write(Byte(2)); // Message type = CloseConnection
- os.write(Int(7)); // Message size
+ os.write(protocolVersion);
+ os.write(encodingVersion);
+ os.write(closeConnectionMsg);
+ os.write(headerSize); // Message size
os.i = os.b.begin();
traceHeader("sending close connection", os, _logger, _traceLevels);
_transceiver->write(os, _endpoint->timeout());