summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Emitter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Emitter.cpp')
-rw-r--r--cpp/src/Ice/Emitter.cpp135
1 files changed, 122 insertions, 13 deletions
diff --git a/cpp/src/Ice/Emitter.cpp b/cpp/src/Ice/Emitter.cpp
index 4b69bf41a9d..5a5c4537893 100644
--- a/cpp/src/Ice/Emitter.cpp
+++ b/cpp/src/Ice/Emitter.cpp
@@ -19,6 +19,7 @@
#include <Ice/Endpoint.h>
#include <Ice/Outgoing.h>
#include <Ice/LocalException.h>
+#include <Ice/Protocol.h>
#include <Ice/Functional.h>
#include <sstream>
@@ -49,9 +50,9 @@ void
IceInternal::Emitter::prepareRequest(Outgoing* out)
{
Stream* os = out->os();
- os->write(Byte(0)); // Protocol version
- os->write(Byte(0)); // Encoding version
- os->write(Byte(0)); // Message type = Request
+ os->write(protocolVersion);
+ os->write(encodingVersion);
+ os->write(requestMsg);
os->write(Int(0)); // Message size (placeholder)
os->write(Int(0)); // Request ID (placeholder)
}
@@ -65,9 +66,8 @@ IceInternal::Emitter::sendRequest(Outgoing* out, bool oneway)
{
_exception->raise();
}
-
assert(_state == StateActive);
-
+
Int requestId;
try
@@ -85,12 +85,13 @@ IceInternal::Emitter::sendRequest(Outgoing* out, bool oneway)
if (!_endpoint->oneway() && !oneway)
{
requestId = _nextRequestId++;
- if (requestId == 0) // 0 means oneway
+ if (requestId <= 0)
{
+ _nextRequestId = 1;
requestId = _nextRequestId++;
}
p = reinterpret_cast<Byte*>(&requestId);
- copy(p, p + sizeof(Int), os->i + 7);
+ copy(p, p + sizeof(Int), os->i + headerSize);
}
traceRequest("sending request", *os, _logger, _traceLevels);
_transceiver->write(*os, _endpoint->timeout());
@@ -100,10 +101,10 @@ IceInternal::Emitter::sendRequest(Outgoing* out, bool oneway)
setState(StateClosed, ex);
ex.raise();
}
-
+
//
// Only add to the request map if there was no exception, and if
- // the operation is twoway.
+ // the operation is not oneway.
//
if (!_endpoint->oneway() && !oneway)
{
@@ -111,6 +112,104 @@ IceInternal::Emitter::sendRequest(Outgoing* out, bool oneway)
}
}
+void
+IceInternal::Emitter::prepareBatchRequest(Outgoing* out)
+{
+ lock();
+
+ if (_exception.get())
+ {
+ unlock();
+ _exception->raise();
+ }
+ assert(_state == StateActive);
+
+ //
+ // The Emitter now belongs to `out', until finishBatchRequest() is
+ // called.
+ //
+
+ if (_batchStream.b.empty())
+ {
+ _batchStream.write(protocolVersion);
+ _batchStream.write(encodingVersion);
+ _batchStream.write(requestBatchMsg);
+ _batchStream.write(Int(0)); // Message size (placeholder)
+ }
+
+ //
+ // Give the batch stream to `out', until finishBatchRequest() is
+ // called.
+ //
+ _batchStream.swap(*out->os());
+}
+
+void
+IceInternal::Emitter::finishBatchRequest(Outgoing* out)
+{
+ if (_exception.get())
+ {
+ unlock();
+ _exception->raise();
+ }
+ assert(_state == StateActive);
+
+ _batchStream.swap(*out->os()); // Get the batch stream back
+ unlock(); // Give the Emitter back
+}
+
+void
+IceInternal::Emitter::abortBatchRequest()
+{
+ setState(StateClosed, AbortBatchRequestException(__FILE__, __LINE__));
+ unlock(); // Give the Emitter back
+}
+
+void
+IceInternal::Emitter::flushBatchRequest()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (_exception.get())
+ {
+ _exception->raise();
+ }
+ assert(_state == StateActive);
+
+ try
+ {
+ if(_batchStream.b.empty())
+ {
+ return; // Nothing to send
+ }
+
+ _batchStream.i = _batchStream.b.begin();
+
+ //
+ // Fill in the message size
+ //
+ const Byte* p;
+ Int sz = _batchStream.b.size();
+ p = reinterpret_cast<Byte*>(&sz);
+ copy(p, p + sizeof(Int), _batchStream.i + 3);
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ _transceiver->write(_batchStream, _endpoint->timeout());
+
+ //
+ // Reset _batchStream and _batchRequestId, so that new batch
+ // messages can be sent.
+ //
+ Stream dummy(_instance);
+ _batchStream.swap(dummy);
+ assert(_batchStream.b.empty());
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ ex.raise();
+ }
+}
+
int
IceInternal::Emitter::timeout() const
{
@@ -153,11 +252,11 @@ IceInternal::Emitter::message(Stream& stream)
stream.i = stream.b.begin() + 2;
Byte messageType;
stream.read(messageType);
- stream.i = stream.b.begin() + 7;
+ stream.i = stream.b.begin() + headerSize;
switch (messageType)
{
- case 0: // Request
+ case requestMsg:
{
traceRequest("received request on the client side\n"
"(invalid, closing connection)",
@@ -166,7 +265,16 @@ IceInternal::Emitter::message(Stream& stream)
break;
}
- case 1: // Reply
+ case requestBatchMsg:
+ {
+ traceRequest("received request batch on the client side\n"
+ "(invalid, closing connection)",
+ stream, _logger, _traceLevels);
+ throw InvalidMessageException(__FILE__, __LINE__);
+ break;
+ }
+
+ case replyMsg:
{
traceReply("received reply", stream, _logger, _traceLevels);
Int requestId;
@@ -181,7 +289,7 @@ IceInternal::Emitter::message(Stream& stream)
break;
}
- case 2: // CloseConnection
+ case closeConnectionMsg:
{
traceHeader("received close connection",
stream, _logger, _traceLevels);
@@ -227,6 +335,7 @@ IceInternal::Emitter::Emitter(const InstancePtr& instance,
_transceiver(transceiver),
_endpoint(endpoint),
_nextRequestId(1),
+ _batchStream(instance),
_state(StateActive)
{
#ifndef ICE_NO_TRACE