diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Glacier2/Blobject.cpp | 164 | ||||
-rw-r--r-- | cpp/src/Glacier2/Blobject.h | 5 | ||||
-rw-r--r-- | cpp/src/Glacier2/ClientBlobject.cpp | 2 | ||||
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.cpp | 151 | ||||
-rw-r--r-- | cpp/src/Glacier2/RequestQueue.h | 23 | ||||
-rw-r--r-- | cpp/src/Glacier2/SessionRouterI.cpp | 16 | ||||
-rw-r--r-- | cpp/src/Glacier2/SessionRouterI.h | 2 |
7 files changed, 220 insertions, 143 deletions
diff --git a/cpp/src/Glacier2/Blobject.cpp b/cpp/src/Glacier2/Blobject.cpp index e845444f1e4..9633e7903c2 100644 --- a/cpp/src/Glacier2/Blobject.cpp +++ b/cpp/src/Glacier2/Blobject.cpp @@ -13,24 +13,44 @@ using namespace std; using namespace Ice; using namespace Glacier2; +static const string serverForwardContext = "Glacier2.Server.ForwardContext"; +static const string clientForwardContext = "Glacier2.Client.ForwardContext"; +static const string serverUnbuffered = "Glacier2.Server.Unbuffered"; +static const string clientUnbuffered = "Glacier2.Client.Unbuffered"; static const string serverAlwaysBatch = "Glacier2.Server.AlwaysBatch"; static const string clientAlwaysBatch = "Glacier2.Client.AlwaysBatch"; static const string serverThreadStackSize = "Glacier2.Server.ThreadStackSize"; static const string clientThreadStackSize = "Glacier2.Client.ThreadStackSize"; +static const string serverTraceRequest = "Glacier2.Server.Trace.Request"; +static const string clientTraceRequest = "Glacier2.Client.Trace.Request"; Glacier2::Blobject::Blobject(const CommunicatorPtr& communicator, bool reverse) : - _properties(communicator->getProperties()), - _logger(communicator->getLogger()), - _alwaysBatch(reverse ? + _communicator(communicator), + _properties(_communicator->getProperties()), + _logger(_communicator->getLogger()), + _reverse(reverse), + _forwardContext(_reverse ? + _properties->getPropertyAsInt(serverForwardContext) > 0 : + _properties->getPropertyAsInt(clientForwardContext) > 0), + _unbuffered(_reverse ? + _properties->getPropertyAsInt(serverUnbuffered) > 0 : + _properties->getPropertyAsInt(clientUnbuffered) > 0), + _alwaysBatch(_reverse ? _properties->getPropertyAsInt(serverAlwaysBatch) > 0 : - _properties->getPropertyAsInt(clientAlwaysBatch) > 0) + _properties->getPropertyAsInt(clientAlwaysBatch) > 0), + _requestTraceLevel(_reverse ? + _properties->getPropertyAsInt(serverTraceRequest) : + _properties->getPropertyAsInt(clientTraceRequest)) { - Int threadStackSize = reverse ? - _properties->getPropertyAsInt(serverThreadStackSize) : - _properties->getPropertyAsInt(clientThreadStackSize); - - _requestQueue = new RequestQueue(communicator, reverse); - _requestQueue->start(static_cast<size_t>(threadStackSize)); + if(!_unbuffered) + { + Int threadStackSize = _reverse ? + _properties->getPropertyAsInt(serverThreadStackSize) : + _properties->getPropertyAsInt(clientThreadStackSize); + + _requestQueue = new RequestQueue(_communicator, _reverse); + _requestQueue->start(static_cast<size_t>(threadStackSize)); + } } Glacier2::Blobject::~Blobject() @@ -41,43 +61,17 @@ Glacier2::Blobject::~Blobject() void Glacier2::Blobject::destroy() { - assert(_requestQueue); // Destroyed? - _requestQueue->destroy(); - _requestQueue = 0; -} - -class Glacier2CB : public AMI_Object_ice_invoke -{ -public: - - Glacier2CB(const AMD_Object_ice_invokePtr& cb) : - _cb(cb) + if(_requestQueue) { + _requestQueue->destroy(); + _requestQueue = 0; } - - virtual void - ice_response(bool ok, const vector<Byte>& outParams) - { - _cb->ice_response(ok, outParams); - } - - virtual void - ice_exception(const Exception& ex) - { - _cb->ice_exception(ex); - } - -private: - - AMD_Object_ice_invokePtr _cb; -}; +} void Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amdCB, const vector<Byte>& inParams, - const Current& current) + const Current& current) { - assert(_requestQueue); // Destroyed? - // // Set the correct facet on the proxy. // @@ -105,7 +99,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amd case 'o': { - if(_alwaysBatch) + if(_alwaysBatch && !_unbuffered) { proxy = proxy->ice_batchOneway(); } @@ -118,7 +112,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amd case 'd': { - if(_alwaysBatch) + if(_alwaysBatch && !_unbuffered) { proxy = proxy->ice_batchDatagram(); } @@ -131,13 +125,27 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amd case 'O': { - proxy = proxy->ice_batchOneway(); + if(!_unbuffered) + { + proxy = proxy->ice_batchOneway(); + } + else + { + proxy = proxy->ice_oneway(); + } break; } case 'D': { - proxy = proxy->ice_batchDatagram(); + if(!_unbuffered) + { + proxy = proxy->ice_batchDatagram(); + } + else + { + proxy = proxy->ice_datagram(); + } break; } @@ -163,18 +171,64 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amd } } - // - // Create a new request and add it to the request queue. - // - if(proxy->ice_isTwoway()) + if(_unbuffered) { - AMI_Object_ice_invokePtr amiCB = new Glacier2CB(amdCB); - _requestQueue->addRequest(new Request(proxy, inParams, current, amiCB)); + // + // If we are in unbuffered mode, we sent the request directly. + // + + if(_requestTraceLevel >= 1) + { + Trace out(_logger, "Glacier2"); + if(_reverse) + { + out << "reverse "; + } + out << "routing (unbuffered)"; + out << "\nproxy = " << _communicator->proxyToString(proxy); + out << "\noperation = " << current.operation; + out << "\ncontext = "; + Context::const_iterator q = current.ctx.begin(); + while(q != current.ctx.end()) + { + out << q->first << '/' << q->second; + if(++q != current.ctx.end()) + { + out << ", "; + } + } + } + + bool ok; + vector<Byte> outParams; + + try + { + if(_forwardContext) + { + ok = proxy->ice_invoke(current.operation, current.mode, inParams, outParams, current.ctx); + } + else + { + ok = proxy->ice_invoke(current.operation, current.mode, inParams, outParams); + } + } + catch(const LocalException& ex) + { + amdCB->ice_exception(ex); + return; + } + + amdCB->ice_response(ok, outParams); } else - { - vector<Byte> dummy; - amdCB->ice_response(true, dummy); - _requestQueue->addRequest(new Request(proxy, inParams, current, 0)); + { + // + // If we are not in unbuffered mode, we create a new request + // and add it to the request queue. If the request is twoway, + // we use AMI. + // + + _requestQueue->addRequest(new Request(proxy, inParams, current, _forwardContext, amdCB)); } } diff --git a/cpp/src/Glacier2/Blobject.h b/cpp/src/Glacier2/Blobject.h index 5895b2fad06..004cfaf1be5 100644 --- a/cpp/src/Glacier2/Blobject.h +++ b/cpp/src/Glacier2/Blobject.h @@ -30,12 +30,17 @@ protected: void invoke(Ice::ObjectPrx&, const Ice::AMD_Object_ice_invokePtr&, const std::vector<Ice::Byte>&, const Ice::Current&); + const Ice::CommunicatorPtr _communicator; const Ice::PropertiesPtr _properties; const Ice::LoggerPtr _logger; private: + const bool _reverse; + const bool _forwardContext; + const bool _unbuffered; const bool _alwaysBatch; + const int _requestTraceLevel; RequestQueuePtr _requestQueue; }; diff --git a/cpp/src/Glacier2/ClientBlobject.cpp b/cpp/src/Glacier2/ClientBlobject.cpp index 30fe28b30f1..67c4a75a563 100644 --- a/cpp/src/Glacier2/ClientBlobject.cpp +++ b/cpp/src/Glacier2/ClientBlobject.cpp @@ -67,7 +67,7 @@ Glacier2::ClientBlobject::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& { if(_rejectTraceLevel >= 1) { - Trace out(current.adapter->getCommunicator()->getLogger(), "Glacier2"); + Trace out(_logger, "Glacier2"); out << "rejecting request\n"; out << "identity: " << identityToString(current.id); } diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp index f7912fa3414..1b2dceac9e2 100644 --- a/cpp/src/Glacier2/RequestQueue.cpp +++ b/cpp/src/Glacier2/RequestQueue.cpp @@ -14,53 +14,84 @@ using namespace std; using namespace Ice; using namespace Glacier2; +class Glacier2CB : public AMI_Object_ice_invoke +{ +public: + + Glacier2CB(const AMD_Object_ice_invokePtr& cb) : + _cb(cb) + { + } + + virtual void + ice_response(bool ok, const vector<Byte>& outParams) + { + _cb->ice_response(ok, outParams); + } + + virtual void + ice_exception(const Exception& ex) + { + _cb->ice_exception(ex); + } + +private: + + AMD_Object_ice_invokePtr _cb; +}; + Glacier2::Request::Request(const ObjectPrx& proxy, const vector<Byte>& inParams, const Current& current, - const AMI_Object_ice_invokePtr& amiCB) : + bool forwardContext, const AMD_Object_ice_invokePtr& amdCB) : _proxy(proxy), _inParams(inParams), _current(current), - _amiCB(amiCB) + _forwardContext(forwardContext), + _amiCB(new Glacier2CB(amdCB)) { Context::const_iterator p = current.ctx.find("_ovrd"); if(p != current.ctx.end()) { - _override = p->second; + const_cast<string&>(_override) = p->second; } } void -Glacier2::Request::invoke(bool forwardContext) +Glacier2::Request::invoke() { if(_proxy->ice_isTwoway()) { - assert(_amiCB); - try + if(_forwardContext) { - if(forwardContext) - { - _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams, _current.ctx); - } - else - { - _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams); - } + _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams, _current.ctx); } - catch(const Ice::Exception& ex) + else { - _amiCB->ice_exception(ex); + _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams); } } else { - vector<Byte> dummy; - if(forwardContext) + bool ok; + vector<Byte> outParams; + + try { - _proxy->ice_invoke(_current.operation, _current.mode, _inParams, dummy, _current.ctx); + if(_forwardContext) + { + ok = _proxy->ice_invoke(_current.operation, _current.mode, _inParams, outParams, _current.ctx); + } + else + { + ok = _proxy->ice_invoke(_current.operation, _current.mode, _inParams, outParams); + } } - else + catch(const LocalException& ex) { - _proxy->ice_invoke(_current.operation, _current.mode, _inParams, dummy); + _amiCB->ice_exception(ex); + return; } + + _amiCB->ice_response(ok, outParams); } } @@ -107,30 +138,25 @@ Glacier2::Request::getCurrent() const return _current; } +static const string serverSleepTime = "Glacier2.Server.SleepTime"; +static const string clientSleepTime = "Glacier2.Client.SleepTime"; static const string serverTraceRequest = "Glacier2.Server.Trace.Request"; static const string clientTraceRequest = "Glacier2.Client.Trace.Request"; static const string serverTraceOverride = "Glacier2.Server.Trace.Override"; static const string clientTraceOverride = "Glacier2.Client.Trace.Override"; -static const string serverForwardContext = "Glacier2.Server.ForwardContext"; -static const string clientForwardContext = "Glacier2.Client.ForwardContext"; -static const string serverSleepTime = "Glacier2.Server.SleepTime"; -static const string clientSleepTime = "Glacier2.Client.SleepTime"; Glacier2::RequestQueue::RequestQueue(const Ice::CommunicatorPtr& communicator, bool reverse) : _logger(communicator->getLogger()), _reverse(reverse), - _traceLevelRequest(reverse ? + _sleepTime(reverse ? + IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(serverSleepTime)) : + IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(clientSleepTime))), + _requestTraceLevel(_reverse ? communicator->getProperties()->getPropertyAsInt(serverTraceRequest) : communicator->getProperties()->getPropertyAsInt(clientTraceRequest)), - _traceLevelOverride(reverse ? + _overrideTraceLevel(reverse ? communicator->getProperties()->getPropertyAsInt(serverTraceOverride) : communicator->getProperties()->getPropertyAsInt(clientTraceOverride)), - _forwardContext(reverse ? - communicator->getProperties()->getPropertyAsInt(serverForwardContext) > 0 : - communicator->getProperties()->getPropertyAsInt(clientForwardContext) > 0), - _sleepTime(reverse ? - IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(serverSleepTime)) : - IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(clientSleepTime))), _destroy(false) { } @@ -179,7 +205,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request) { *p = request; // Replace old request if this is an override. - if(_traceLevelOverride >= 1) + if(_overrideTraceLevel >= 1) { traceRequest(request, "override"); } @@ -225,43 +251,36 @@ Glacier2::RequestQueue::run() // while this is being done. // - try - { - set<ConnectionPtr> flushSet; - - for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p) + set<ConnectionPtr> flushSet; + + for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p) + { + const ObjectPrx& proxy = (*p)->getProxy(); + + if(proxy->ice_batchOneway() || proxy->ice_batchDatagram()) { - const ObjectPrx& proxy = (*p)->getProxy(); - - if(proxy->ice_batchOneway() || proxy->ice_batchDatagram()) - { - flushSet.insert(proxy->ice_connection()); - } - - if(_traceLevelRequest >= 1) - { - traceRequest(*p, ""); - } - - (*p)->invoke(_forwardContext); + flushSet.insert(proxy->ice_connection()); } - - for_each(flushSet.begin(), flushSet.end(), Ice::voidMemFun(&Connection::flushBatchRequests)); + + if(_requestTraceLevel >= 1) + { + traceRequest(*p, ""); + } + + (*p)->invoke(); } - catch(const Ice::Exception& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); - - if(_traceLevelRequest >= 1) + + for(set<ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q) + { + try { - Trace out(_logger, "Glacier2"); - if(_reverse) - { - out << "reverse "; - } - out << "routing exception:\n" << ex; + for_each(flushSet.begin(), flushSet.end(), Ice::voidMemFun(&Connection::flushBatchRequests)); } - } + catch(const LocalException& ex) + { + // Ignore. + } + } // // In order to avoid flooding, we add a delay, if so diff --git a/cpp/src/Glacier2/RequestQueue.h b/cpp/src/Glacier2/RequestQueue.h index c274ad5a9ef..512fbba68ba 100644 --- a/cpp/src/Glacier2/RequestQueue.h +++ b/cpp/src/Glacier2/RequestQueue.h @@ -24,22 +24,22 @@ class Request : virtual public IceUtil::Shared { public: - Request(const Ice::ObjectPrx&, const std::vector<Ice::Byte>&, const Ice::Current&, - const Ice::AMI_Object_ice_invokePtr&); + Request(const Ice::ObjectPrx&, const std::vector<Ice::Byte>&, const Ice::Current&, bool, + const Ice::AMD_Object_ice_invokePtr&); - void invoke(bool); + void invoke(); bool override(const RequestPtr&) const; const Ice::ObjectPrx& getProxy() const; const Ice::Current& getCurrent() const; private: - Ice::ObjectPrx _proxy; - std::vector<Ice::Byte> _inParams; - Ice::Current _current; - bool _forwardContext; - Ice::AMI_Object_ice_invokePtr _amiCB; - std::string _override; + const Ice::ObjectPrx _proxy; + const std::vector<Ice::Byte> _inParams; + const Ice::Current _current; + const bool _forwardContext; + const Ice::AMI_Object_ice_invokePtr _amiCB; + const std::string _override; }; class RequestQueue; @@ -63,10 +63,9 @@ private: const Ice::LoggerPtr _logger; const bool _reverse; - const int _traceLevelRequest; - const int _traceLevelOverride; - const bool _forwardContext; const IceUtil::Time _sleepTime; + const int _requestTraceLevel; + const int _overrideTraceLevel; std::vector<RequestPtr> _requests; diff --git a/cpp/src/Glacier2/SessionRouterI.cpp b/cpp/src/Glacier2/SessionRouterI.cpp index adf196fd9c3..5e344c94510 100644 --- a/cpp/src/Glacier2/SessionRouterI.cpp +++ b/cpp/src/Glacier2/SessionRouterI.cpp @@ -82,7 +82,7 @@ Glacier2::SessionRouterI::SessionRouterI(const ObjectAdapterPtr& clientAdapter, const SessionManagerPrx& sessionManager) : _properties(clientAdapter->getCommunicator()->getProperties()), _logger(clientAdapter->getCommunicator()->getLogger()), - _traceLevel(_properties->getPropertyAsInt("Glacier2.Trace.Session")), + _sessionTraceLevel(_properties->getPropertyAsInt("Glacier2.Trace.Session")), _clientAdapter(clientAdapter), _serverAdapter(serverAdapter), _verifier(verifier), @@ -173,7 +173,7 @@ Glacier2::SessionRouterI::destroy() } catch(const Exception& ex) { - if(_traceLevel >= 1) + if(_sessionTraceLevel >= 1) { Trace out(_logger, "Glacier2"); out << "exception while destroying session\n" << ex; @@ -304,7 +304,7 @@ Glacier2::SessionRouterI::createSession(const std::string& userId, const std::st } catch(const Exception& ex) { - if(_traceLevel >= 1) + if(_sessionTraceLevel >= 1) { Trace out(_logger, "Glacier2"); out << "exception while verifying password\n" << ex; @@ -419,7 +419,7 @@ Glacier2::SessionRouterI::createSession(const std::string& userId, const std::st _routersByCategoryHint, pair<const string, RouterIPtr>(category, router)); } - if(_traceLevel >= 1) + if(_sessionTraceLevel >= 1) { Trace out(_logger, "Glacier2"); out << "created session\n"; @@ -480,7 +480,7 @@ Glacier2::SessionRouterI::destroySession(const Current& current) // try { - if(_traceLevel >= 1) + if(_sessionTraceLevel >= 1) { Trace out(_logger, "Glacier2"); out << "destroying session\n"; @@ -491,7 +491,7 @@ Glacier2::SessionRouterI::destroySession(const Current& current) } catch(const Exception& ex) { - if(_traceLevel >= 1) + if(_sessionTraceLevel >= 1) { Trace out(_logger, "Glacier2"); out << "exception while destroying session\n" << ex; @@ -612,7 +612,7 @@ Glacier2::SessionRouterI::expireSessions() try { - if(_traceLevel >= 1) + if(_sessionTraceLevel >= 1) { Trace out(_logger, "Glacier2"); out << "expiring session\n"; @@ -623,7 +623,7 @@ Glacier2::SessionRouterI::expireSessions() } catch(const Exception& ex) { - if(_traceLevel >= 1) + if(_sessionTraceLevel >= 1) { Trace out(_logger, "Glacier2"); out << "exception while expiring session\n" << ex; diff --git a/cpp/src/Glacier2/SessionRouterI.h b/cpp/src/Glacier2/SessionRouterI.h index daf985cdbe4..f38d711736d 100644 --- a/cpp/src/Glacier2/SessionRouterI.h +++ b/cpp/src/Glacier2/SessionRouterI.h @@ -50,7 +50,7 @@ private: const Ice::PropertiesPtr _properties; const Ice::LoggerPtr _logger; - const int _traceLevel; + const int _sessionTraceLevel; const Ice::ObjectAdapterPtr _clientAdapter; const Ice::ObjectAdapterPtr _serverAdapter; const PermissionsVerifierPrx _verifier; |