diff options
Diffstat (limited to 'cpp/src/Ice/Emitter.cpp')
-rw-r--r-- | cpp/src/Ice/Emitter.cpp | 135 |
1 files changed, 122 insertions, 13 deletions
diff --git a/cpp/src/Ice/Emitter.cpp b/cpp/src/Ice/Emitter.cpp index 4b69bf41a9d..5a5c4537893 100644 --- a/cpp/src/Ice/Emitter.cpp +++ b/cpp/src/Ice/Emitter.cpp @@ -19,6 +19,7 @@ #include <Ice/Endpoint.h> #include <Ice/Outgoing.h> #include <Ice/LocalException.h> +#include <Ice/Protocol.h> #include <Ice/Functional.h> #include <sstream> @@ -49,9 +50,9 @@ void IceInternal::Emitter::prepareRequest(Outgoing* out) { Stream* os = out->os(); - os->write(Byte(0)); // Protocol version - os->write(Byte(0)); // Encoding version - os->write(Byte(0)); // Message type = Request + os->write(protocolVersion); + os->write(encodingVersion); + os->write(requestMsg); os->write(Int(0)); // Message size (placeholder) os->write(Int(0)); // Request ID (placeholder) } @@ -65,9 +66,8 @@ IceInternal::Emitter::sendRequest(Outgoing* out, bool oneway) { _exception->raise(); } - assert(_state == StateActive); - + Int requestId; try @@ -85,12 +85,13 @@ IceInternal::Emitter::sendRequest(Outgoing* out, bool oneway) if (!_endpoint->oneway() && !oneway) { requestId = _nextRequestId++; - if (requestId == 0) // 0 means oneway + if (requestId <= 0) { + _nextRequestId = 1; requestId = _nextRequestId++; } p = reinterpret_cast<Byte*>(&requestId); - copy(p, p + sizeof(Int), os->i + 7); + copy(p, p + sizeof(Int), os->i + headerSize); } traceRequest("sending request", *os, _logger, _traceLevels); _transceiver->write(*os, _endpoint->timeout()); @@ -100,10 +101,10 @@ IceInternal::Emitter::sendRequest(Outgoing* out, bool oneway) setState(StateClosed, ex); ex.raise(); } - + // // Only add to the request map if there was no exception, and if - // the operation is twoway. + // the operation is not oneway. // if (!_endpoint->oneway() && !oneway) { @@ -111,6 +112,104 @@ IceInternal::Emitter::sendRequest(Outgoing* out, bool oneway) } } +void +IceInternal::Emitter::prepareBatchRequest(Outgoing* out) +{ + lock(); + + if (_exception.get()) + { + unlock(); + _exception->raise(); + } + assert(_state == StateActive); + + // + // The Emitter now belongs to `out', until finishBatchRequest() is + // called. + // + + if (_batchStream.b.empty()) + { + _batchStream.write(protocolVersion); + _batchStream.write(encodingVersion); + _batchStream.write(requestBatchMsg); + _batchStream.write(Int(0)); // Message size (placeholder) + } + + // + // Give the batch stream to `out', until finishBatchRequest() is + // called. + // + _batchStream.swap(*out->os()); +} + +void +IceInternal::Emitter::finishBatchRequest(Outgoing* out) +{ + if (_exception.get()) + { + unlock(); + _exception->raise(); + } + assert(_state == StateActive); + + _batchStream.swap(*out->os()); // Get the batch stream back + unlock(); // Give the Emitter back +} + +void +IceInternal::Emitter::abortBatchRequest() +{ + setState(StateClosed, AbortBatchRequestException(__FILE__, __LINE__)); + unlock(); // Give the Emitter back +} + +void +IceInternal::Emitter::flushBatchRequest() +{ + JTCSyncT<JTCMutex> sync(*this); + + if (_exception.get()) + { + _exception->raise(); + } + assert(_state == StateActive); + + try + { + if(_batchStream.b.empty()) + { + return; // Nothing to send + } + + _batchStream.i = _batchStream.b.begin(); + + // + // Fill in the message size + // + const Byte* p; + Int sz = _batchStream.b.size(); + p = reinterpret_cast<Byte*>(&sz); + copy(p, p + sizeof(Int), _batchStream.i + 3); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + _transceiver->write(_batchStream, _endpoint->timeout()); + + // + // Reset _batchStream and _batchRequestId, so that new batch + // messages can be sent. + // + Stream dummy(_instance); + _batchStream.swap(dummy); + assert(_batchStream.b.empty()); + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + ex.raise(); + } +} + int IceInternal::Emitter::timeout() const { @@ -153,11 +252,11 @@ IceInternal::Emitter::message(Stream& stream) stream.i = stream.b.begin() + 2; Byte messageType; stream.read(messageType); - stream.i = stream.b.begin() + 7; + stream.i = stream.b.begin() + headerSize; switch (messageType) { - case 0: // Request + case requestMsg: { traceRequest("received request on the client side\n" "(invalid, closing connection)", @@ -166,7 +265,16 @@ IceInternal::Emitter::message(Stream& stream) break; } - case 1: // Reply + case requestBatchMsg: + { + traceRequest("received request batch on the client side\n" + "(invalid, closing connection)", + stream, _logger, _traceLevels); + throw InvalidMessageException(__FILE__, __LINE__); + break; + } + + case replyMsg: { traceReply("received reply", stream, _logger, _traceLevels); Int requestId; @@ -181,7 +289,7 @@ IceInternal::Emitter::message(Stream& stream) break; } - case 2: // CloseConnection + case closeConnectionMsg: { traceHeader("received close connection", stream, _logger, _traceLevels); @@ -227,6 +335,7 @@ IceInternal::Emitter::Emitter(const InstancePtr& instance, _transceiver(transceiver), _endpoint(endpoint), _nextRequestId(1), + _batchStream(instance), _state(StateActive) { #ifndef ICE_NO_TRACE |