summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp215
1 files changed, 188 insertions, 27 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 640dcbef0c0..62901ee72b0 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -23,6 +23,7 @@
#include <Ice/ObjectAdapter.h>
#include <Ice/Endpoint.h>
#include <Ice/Outgoing.h>
+#include <Ice/OutgoingAsync.h>
#include <Ice/Incoming.h>
#include <Ice/LocalException.h>
#include <Ice/Protocol.h>
@@ -151,19 +152,15 @@ IceInternal::Connection::decUsageCount()
if(_usageCount == 0 && !_adapter)
{
assert(_requests.empty());
+ assert(_asyncRequests.empty());
setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
}
}
void
-IceInternal::Connection::prepareRequest(Outgoing* out)
+IceInternal::Connection::prepareRequest(BasicStream* os)
{
- BasicStream* os = out->os();
- os->write(protocolVersion);
- os->write(encodingVersion);
- os->write(requestMsg);
- os->write(Int(0)); // Message size (placeholder).
- os->write(Int(0)); // Request ID (placeholder).
+ os->writeBlob(_requestHdr);
}
void
@@ -268,7 +265,101 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp)
}
void
-IceInternal::Connection::prepareBatchRequest(Outgoing* out)
+IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp)
+{
+ IceUtil::RecMutex::Lock sync(*this);
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+ assert(_state < StateClosing);
+
+ Int requestId;
+
+ try
+ {
+ BasicStream* os = out->__os();
+
+ //
+ // Fill in the request ID.
+ //
+ requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+ const Byte* p;
+ p = reinterpret_cast<const Byte*>(&requestId);
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+
+ if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
+ {
+ comp = false;
+ }
+ else
+ {
+ if(_defaultsAndOverrides->overrideComppress)
+ {
+ comp = _defaultsAndOverrides->overrideComppressValue;
+ }
+ }
+
+ if(comp)
+ {
+ //
+ // Change message type.
+ //
+ os->b[2] = compressedRequestMsg;
+
+ //
+ // Do compression.
+ //
+ BasicStream cstream(_instance);
+ compress(*os, cstream);
+
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ traceRequest("sending compressed request", *os, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ //
+ // No compression, just fill in the message size.
+ //
+ const Byte* p;
+ Int sz = os->b.size();
+ p = reinterpret_cast<const Byte*>(&sz);
+ copy(p, p + sizeof(Int), os->b.begin() + 3);
+
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ traceRequest("sending request", *os, _logger, _traceLevels);
+ _transceiver->write(*os, _endpoint->timeout());
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ //
+ // Only add to the request map if there was no exception.
+ //
+ _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), make_pair(requestId, out));
+}
+
+void
+IceInternal::Connection::prepareBatchRequest(BasicStream* os)
{
lock();
@@ -280,27 +371,24 @@ IceInternal::Connection::prepareBatchRequest(Outgoing* out)
assert(_state < StateClosing);
//
- // The Connection now belongs to `out', until finishBatchRequest()
- // is called.
+ // The Connection now belongs to the caller, until
+ // finishBatchRequest() is called.
//
if(_batchStream.b.empty())
{
- _batchStream.write(protocolVersion);
- _batchStream.write(encodingVersion);
- _batchStream.write(requestBatchMsg);
- _batchStream.write(Int(0)); // Message size (placeholder).
+ _batchStream.writeBlob(_requestBatchHdr);
}
//
- // Give the batch stream to `out', until finishBatchRequest() is
- // called.
+ // Give the batch stream to the caller, until finishBatchRequest()
+ // is called.
//
- _batchStream.swap(*out->os());
+ _batchStream.swap(*os);
}
void
-IceInternal::Connection::finishBatchRequest(Outgoing* out)
+IceInternal::Connection::finishBatchRequest(BasicStream* os)
{
if(_exception.get())
{
@@ -309,7 +397,7 @@ IceInternal::Connection::finishBatchRequest(Outgoing* out)
}
assert(_state < StateClosing);
- _batchStream.swap(*out->os()); // Get the batch stream back.
+ _batchStream.swap(*os); // Get the batch stream back.
unlock(); // Give the Connection back.
}
@@ -474,6 +562,7 @@ IceInternal::Connection::read(BasicStream& stream)
void
IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
{
+ OutgoingAsyncPtr outAsync;
bool invoke = false;
bool comp = false;
bool batch = false;
@@ -596,6 +685,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
stream.read(requestId);
map<Int, Outgoing*>::iterator p = _requests.end();
+ map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.end();
if(_requestsHint != _requests.end())
{
@@ -607,24 +697,59 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
if(p == _requests.end())
{
+ if(_asyncRequestsHint != _asyncRequests.end())
+ {
+ if(_asyncRequestsHint->first == requestId)
+ {
+ q = _asyncRequestsHint;
+ }
+ }
+ }
+
+ if(p == _requests.end() && q == _asyncRequests.end())
+ {
p = _requests.find(requestId);
}
+
+ if(p == _requests.end() && q == _asyncRequests.end())
+ {
+ q = _asyncRequests.find(requestId);
+ }
- if(p == _requests.end())
+ if(p == _requests.end() && q == _asyncRequests.end())
{
throw UnknownRequestIdException(__FILE__, __LINE__);
}
- p->second->finished(stream);
-
- if(p == _requestsHint)
+ if(p != _requests.end())
{
- _requests.erase(p++);
- _requestsHint = p;
+ p->second->finished(stream);
+
+ if(p == _requestsHint)
+ {
+ _requests.erase(p++);
+ _requestsHint = p;
+ }
+ else
+ {
+ _requests.erase(p);
+ }
}
else
{
- _requests.erase(p);
+ assert(q != _asyncRequests.end());
+
+ outAsync = q->second;
+
+ if(q == _asyncRequestsHint)
+ {
+ _asyncRequests.erase(q++);
+ _asyncRequestsHint = q;
+ }
+ else
+ {
+ _asyncRequests.erase(q);
+ }
}
break;
@@ -679,8 +804,17 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
}
//
+ // Asynchronous replies must be handled outside the thread
+ // synchronization, so that nested calls are possible.
+ //
+ if(outAsync)
+ {
+ outAsync->__finished(stream);
+ }
+
+ //
// Method invocation must be done outside the thread
- // synchronization, so that nested callbacks are possible.
+ // synchronization, so that nested calls are possible.
//
if(invoke)
{
@@ -855,12 +989,32 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
_warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0),
_nextRequestId(1),
_requestsHint(_requests.end()),
+ _asyncRequestsHint(_asyncRequests.end()),
_batchStream(_instance),
_responseCount(0),
_usageCount(0),
_state(StateHolding),
_registeredWithPool(false)
{
+ vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr);
+ requestHdr.reserve(headerSize + 4);
+ requestHdr.push_back(protocolVersion);
+ requestHdr.push_back(encodingVersion);
+ requestHdr.push_back(requestMsg);
+ requestHdr.push_back(0); // Message size (placeholder).
+ requestHdr.push_back(0); // Message size (placeholder).
+ requestHdr.push_back(0); // Message size (placeholder).
+ requestHdr.push_back(0); // Message size (placeholder).
+ requestHdr.push_back(0); // Request ID (placeholder).
+ requestHdr.push_back(0); // Request ID (placeholder).
+ requestHdr.push_back(0); // Request ID (placeholder).
+ requestHdr.push_back(0); // Request ID (placeholder).
+ assert(_requestHdr.size() == headerSize + 4);
+
+ vector<Byte>& requestBatchHdr = const_cast<vector<Byte>&>(_requestBatchHdr);
+ requestBatchHdr = _requestHdr;
+ requestBatchHdr[2] = requestBatchMsg;
+ assert(_requestBatchHdr.size() == headerSize + 4);
}
IceInternal::Connection::~Connection()
@@ -925,6 +1079,13 @@ IceInternal::Connection::setState(State state, const LocalException& ex)
_requests.clear();
_requestsHint = _requests.end();
+ for(std::map< ::Ice::Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
+ {
+ q->second->__finished(*_exception.get());
+ }
+ _asyncRequests.clear();
+ _asyncRequestsHint = _asyncRequests.end();
+
setState(state);
}