diff options
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 215 |
1 files changed, 188 insertions, 27 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 640dcbef0c0..62901ee72b0 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -23,6 +23,7 @@ #include <Ice/ObjectAdapter.h> #include <Ice/Endpoint.h> #include <Ice/Outgoing.h> +#include <Ice/OutgoingAsync.h> #include <Ice/Incoming.h> #include <Ice/LocalException.h> #include <Ice/Protocol.h> @@ -151,19 +152,15 @@ IceInternal::Connection::decUsageCount() if(_usageCount == 0 && !_adapter) { assert(_requests.empty()); + assert(_asyncRequests.empty()); setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } } void -IceInternal::Connection::prepareRequest(Outgoing* out) +IceInternal::Connection::prepareRequest(BasicStream* os) { - BasicStream* os = out->os(); - os->write(protocolVersion); - os->write(encodingVersion); - os->write(requestMsg); - os->write(Int(0)); // Message size (placeholder). - os->write(Int(0)); // Request ID (placeholder). + os->writeBlob(_requestHdr); } void @@ -268,7 +265,101 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway, bool comp) } void -IceInternal::Connection::prepareBatchRequest(Outgoing* out) +IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out, bool comp) +{ + IceUtil::RecMutex::Lock sync(*this); + + if(_exception.get()) + { + _exception->ice_throw(); + } + assert(_state < StateClosing); + + Int requestId; + + try + { + BasicStream* os = out->__os(); + + // + // Fill in the request ID. + // + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + const Byte* p; + p = reinterpret_cast<const Byte*>(&requestId); + copy(p, p + sizeof(Int), os->b.begin() + headerSize); + + if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes. + { + comp = false; + } + else + { + if(_defaultsAndOverrides->overrideComppress) + { + comp = _defaultsAndOverrides->overrideComppressValue; + } + } + + if(comp) + { + // + // Change message type. + // + os->b[2] = compressedRequestMsg; + + // + // Do compression. + // + BasicStream cstream(_instance); + compress(*os, cstream); + + // + // Send the request. + // + os->i = os->b.begin(); + traceRequest("sending compressed request", *os, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + // + // No compression, just fill in the message size. + // + const Byte* p; + Int sz = os->b.size(); + p = reinterpret_cast<const Byte*>(&sz); + copy(p, p + sizeof(Int), os->b.begin() + 3); + + // + // Send the request. + // + os->i = os->b.begin(); + traceRequest("sending request", *os, _logger, _traceLevels); + _transceiver->write(*os, _endpoint->timeout()); + } + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + assert(_exception.get()); + _exception->ice_throw(); + } + + // + // Only add to the request map if there was no exception. + // + _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), make_pair(requestId, out)); +} + +void +IceInternal::Connection::prepareBatchRequest(BasicStream* os) { lock(); @@ -280,27 +371,24 @@ IceInternal::Connection::prepareBatchRequest(Outgoing* out) assert(_state < StateClosing); // - // The Connection now belongs to `out', until finishBatchRequest() - // is called. + // The Connection now belongs to the caller, until + // finishBatchRequest() is called. // if(_batchStream.b.empty()) { - _batchStream.write(protocolVersion); - _batchStream.write(encodingVersion); - _batchStream.write(requestBatchMsg); - _batchStream.write(Int(0)); // Message size (placeholder). + _batchStream.writeBlob(_requestBatchHdr); } // - // Give the batch stream to `out', until finishBatchRequest() is - // called. + // Give the batch stream to the caller, until finishBatchRequest() + // is called. // - _batchStream.swap(*out->os()); + _batchStream.swap(*os); } void -IceInternal::Connection::finishBatchRequest(Outgoing* out) +IceInternal::Connection::finishBatchRequest(BasicStream* os) { if(_exception.get()) { @@ -309,7 +397,7 @@ IceInternal::Connection::finishBatchRequest(Outgoing* out) } assert(_state < StateClosing); - _batchStream.swap(*out->os()); // Get the batch stream back. + _batchStream.swap(*os); // Get the batch stream back. unlock(); // Give the Connection back. } @@ -474,6 +562,7 @@ IceInternal::Connection::read(BasicStream& stream) void IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool) { + OutgoingAsyncPtr outAsync; bool invoke = false; bool comp = false; bool batch = false; @@ -596,6 +685,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa stream.read(requestId); map<Int, Outgoing*>::iterator p = _requests.end(); + map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.end(); if(_requestsHint != _requests.end()) { @@ -607,24 +697,59 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa if(p == _requests.end()) { + if(_asyncRequestsHint != _asyncRequests.end()) + { + if(_asyncRequestsHint->first == requestId) + { + q = _asyncRequestsHint; + } + } + } + + if(p == _requests.end() && q == _asyncRequests.end()) + { p = _requests.find(requestId); } + + if(p == _requests.end() && q == _asyncRequests.end()) + { + q = _asyncRequests.find(requestId); + } - if(p == _requests.end()) + if(p == _requests.end() && q == _asyncRequests.end()) { throw UnknownRequestIdException(__FILE__, __LINE__); } - p->second->finished(stream); - - if(p == _requestsHint) + if(p != _requests.end()) { - _requests.erase(p++); - _requestsHint = p; + p->second->finished(stream); + + if(p == _requestsHint) + { + _requests.erase(p++); + _requestsHint = p; + } + else + { + _requests.erase(p); + } } else { - _requests.erase(p); + assert(q != _asyncRequests.end()); + + outAsync = q->second; + + if(q == _asyncRequestsHint) + { + _asyncRequests.erase(q++); + _asyncRequestsHint = q; + } + else + { + _asyncRequests.erase(q); + } } break; @@ -679,8 +804,17 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa } // + // Asynchronous replies must be handled outside the thread + // synchronization, so that nested calls are possible. + // + if(outAsync) + { + outAsync->__finished(stream); + } + + // // Method invocation must be done outside the thread - // synchronization, so that nested callbacks are possible. + // synchronization, so that nested calls are possible. // if(invoke) { @@ -855,12 +989,32 @@ IceInternal::Connection::Connection(const InstancePtr& instance, _warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0), _nextRequestId(1), _requestsHint(_requests.end()), + _asyncRequestsHint(_asyncRequests.end()), _batchStream(_instance), _responseCount(0), _usageCount(0), _state(StateHolding), _registeredWithPool(false) { + vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr); + requestHdr.reserve(headerSize + 4); + requestHdr.push_back(protocolVersion); + requestHdr.push_back(encodingVersion); + requestHdr.push_back(requestMsg); + requestHdr.push_back(0); // Message size (placeholder). + requestHdr.push_back(0); // Message size (placeholder). + requestHdr.push_back(0); // Message size (placeholder). + requestHdr.push_back(0); // Message size (placeholder). + requestHdr.push_back(0); // Request ID (placeholder). + requestHdr.push_back(0); // Request ID (placeholder). + requestHdr.push_back(0); // Request ID (placeholder). + requestHdr.push_back(0); // Request ID (placeholder). + assert(_requestHdr.size() == headerSize + 4); + + vector<Byte>& requestBatchHdr = const_cast<vector<Byte>&>(_requestBatchHdr); + requestBatchHdr = _requestHdr; + requestBatchHdr[2] = requestBatchMsg; + assert(_requestBatchHdr.size() == headerSize + 4); } IceInternal::Connection::~Connection() @@ -925,6 +1079,13 @@ IceInternal::Connection::setState(State state, const LocalException& ex) _requests.clear(); _requestsHint = _requests.end(); + for(std::map< ::Ice::Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) + { + q->second->__finished(*_exception.get()); + } + _asyncRequests.clear(); + _asyncRequestsHint = _asyncRequests.end(); + setState(state); } |