diff options
Diffstat (limited to 'cpp/src/Glacier2/Blobject.cpp')
-rw-r--r-- | cpp/src/Glacier2/Blobject.cpp | 169 |
1 files changed, 84 insertions, 85 deletions
diff --git a/cpp/src/Glacier2/Blobject.cpp b/cpp/src/Glacier2/Blobject.cpp index bc5995fb547..6ae716c5cfe 100644 --- a/cpp/src/Glacier2/Blobject.cpp +++ b/cpp/src/Glacier2/Blobject.cpp @@ -15,73 +15,77 @@ using namespace Glacier2; static const string serverForwardContext = "Glacier2.Server.ForwardContext"; static const string clientForwardContext = "Glacier2.Client.ForwardContext"; -static const string serverBuffered = "Glacier2.Server.Buffered"; -static const string clientBuffered = "Glacier2.Client.Buffered"; static const string serverAlwaysBatch = "Glacier2.Server.AlwaysBatch"; static const string clientAlwaysBatch = "Glacier2.Client.AlwaysBatch"; static const string serverTraceRequest = "Glacier2.Server.Trace.Request"; static const string clientTraceRequest = "Glacier2.Client.Trace.Request"; static const string serverTraceOverride = "Glacier2.Server.Trace.Override"; static const string clientTraceOverride = "Glacier2.Client.Trace.Override"; -static const string serverSleepTime = "Glacier2.Server.SleepTime"; -static const string clientSleepTime = "Glacier2.Client.SleepTime"; -Glacier2::Blobject::Blobject(const CommunicatorPtr& communicator, bool reverse, const Ice::Context& sslContext) : - _communicator(communicator), - _properties(_communicator->getProperties()), - _logger(_communicator->getLogger()), +namespace +{ + +class AMI_Array_Object_ice_invokeI : public AMI_Array_Object_ice_invoke +{ +public: + + AMI_Array_Object_ice_invokeI(const AMD_Array_Object_ice_invokePtr& amdCB) : + _amdCB(amdCB) + { + } + + virtual void + ice_response(bool ok, const pair<const Byte*, const Byte*>& outParams) + { + if(_amdCB) + { + _amdCB->ice_response(ok, outParams); + } + } + + virtual void + ice_exception(const Exception& ex) + { + if(_amdCB) + { + _amdCB->ice_exception(ex); + } + } + +private: + + const AMD_Array_Object_ice_invokePtr _amdCB; +}; + +} + +Glacier2::Blobject::Blobject(const InstancePtr& instance, bool reverse, const Ice::Context& sslContext) : + _instance(instance), _reverse(reverse), _forwardContext(_reverse ? - _properties->getPropertyAsInt(serverForwardContext) > 0 : - _properties->getPropertyAsInt(clientForwardContext) > 0), - _buffered(_reverse ? - _properties->getPropertyAsIntWithDefault(serverBuffered, 1) > 0 : - _properties->getPropertyAsIntWithDefault(clientBuffered, 1) > 0), + _instance->properties()->getPropertyAsInt(serverForwardContext) > 0 : + _instance->properties()->getPropertyAsInt(clientForwardContext) > 0), _alwaysBatch(_reverse ? - _properties->getPropertyAsInt(serverAlwaysBatch) > 0 : - _properties->getPropertyAsInt(clientAlwaysBatch) > 0), + _instance->properties()->getPropertyAsInt(serverAlwaysBatch) > 0 : + _instance->properties()->getPropertyAsInt(clientAlwaysBatch) > 0), _requestTraceLevel(_reverse ? - _properties->getPropertyAsInt(serverTraceRequest) : - _properties->getPropertyAsInt(clientTraceRequest)), + _instance->properties()->getPropertyAsInt(serverTraceRequest) : + _instance->properties()->getPropertyAsInt(clientTraceRequest)), _overrideTraceLevel(reverse ? - _properties->getPropertyAsInt(serverTraceOverride) : - _properties->getPropertyAsInt(clientTraceOverride)), + _instance->properties()->getPropertyAsInt(serverTraceOverride) : + _instance->properties()->getPropertyAsInt(clientTraceOverride)), _sslContext(sslContext) { - - if(_buffered) + RequestQueueThreadPtr t = _reverse ? _instance->serverRequestQueueThread() : _instance->clientRequestQueueThread(); + if(t) { - try + if(reverse) { - IceUtil::Time sleepTime = _reverse ? - IceUtil::Time::milliSeconds(_properties->getPropertyAsInt(serverSleepTime)) : - IceUtil::Time::milliSeconds(_properties->getPropertyAsInt(clientSleepTime)); - - const_cast<RequestQueuePtr&>(_requestQueue) = new RequestQueue(sleepTime); - - Int threadStackSize = _properties->getPropertyAsInt("Ice.ThreadPerConnection.StackSize"); - - _requestQueue->start(static_cast<size_t>(threadStackSize)); - - // - // See the comment in Glacier2::RequestQueue::destroy() - // for why we detach the thread. - // - _requestQueue->getThreadControl().detach(); + const_cast<RequestQueuePtr&>(_requestQueue) = new RequestQueue(t); } - catch(const IceUtil::Exception& ex) + else { - { - Error out(_logger); - out << "cannot create thread for request queue:\n" << ex; - } - - if(_requestQueue) - { - _requestQueue->destroy(); - } - - throw; + const_cast<RequestQueuePtr&>(_requestQueue) = new RequestQueue(t); } } } @@ -91,12 +95,6 @@ Glacier2::Blobject::~Blobject() } void -Glacier2::Blobject::destroy() -{ - _requestQueue->destroy(); -} - -void Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePtr& amdCB, const std::pair<const Ice::Byte*, const Ice::Byte*>& inParams, const Current& current) { @@ -114,7 +112,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt // if(current.requestId == 0) { - if(_alwaysBatch && _buffered) + if(_alwaysBatch && _requestQueue) { proxy = proxy->ice_batchOneway(); } @@ -147,7 +145,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt case 'o': { - if(_alwaysBatch && _buffered) + if(_alwaysBatch && _requestQueue) { proxy = proxy->ice_batchOneway(); } @@ -160,7 +158,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt case 'd': { - if(_alwaysBatch && _buffered) + if(_alwaysBatch && _requestQueue) { proxy = proxy->ice_batchDatagram(); } @@ -173,7 +171,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt case 'O': { - if(_buffered) + if(_requestQueue) { proxy = proxy->ice_batchOneway(); } @@ -186,7 +184,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt case 'D': { - if(_buffered) + if(_requestQueue) { proxy = proxy->ice_batchDatagram(); } @@ -211,7 +209,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt default: { - Warning out(_logger); + Warning out(_instance->logger()); out << "unknown forward option `" << option << "'"; break; } @@ -221,13 +219,13 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt if(_requestTraceLevel >= 1) { - Trace out(_logger, "Glacier2"); + Trace out(_instance->logger(), "Glacier2"); if(_reverse) { out << "reverse "; } out << "routing"; - if(_buffered) + if(_requestQueue) { out << " (buffered)"; } @@ -237,11 +235,11 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt } if(_reverse) { - out << "\nidentity = " << _communicator->identityToString(proxy->ice_getIdentity()); + out << "\nidentity = " << _instance->communicator()->identityToString(proxy->ice_getIdentity()); } else { - out << "\nproxy = " << _communicator->proxyToString(proxy); + out << "\nproxy = " << _instance->communicator()->proxyToString(proxy); } out << "\noperation = " << current.operation; out << "\ncontext = "; @@ -256,7 +254,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt } } - if(_buffered) + if(_requestQueue) { // // If we are in buffered mode, we create a new request and add @@ -278,7 +276,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt if(override && _overrideTraceLevel >= 1) { - Trace out(_logger, "Glacier2"); + Trace out(_instance->logger(), "Glacier2"); if(_reverse) { out << "reverse "; @@ -286,11 +284,11 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt out << "routing override"; if(_reverse) { - out << "\nidentity = " << _communicator->identityToString(proxy->ice_getIdentity()); + out << "\nidentity = " << _instance->communicator()->identityToString(proxy->ice_getIdentity()); } else { - out << "\nproxy = " << _communicator->proxyToString(proxy); + out << "\nproxy = " << _instance->communicator()->proxyToString(proxy); } out << "\noperation = " << current.operation; out << "\ncontext = "; @@ -311,48 +309,49 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Array_Object_ice_invokePt // If we are in not in buffered mode, we send the request // directly. // - - bool ok; - ByteSeq outParams; + assert(!proxy->ice_isBatchOneway() && !proxy->ice_isBatchDatagram()); try { + AMI_Array_Object_ice_invokePtr amiCB; + if(proxy->ice_isTwoway()) + { + amiCB = new AMI_Array_Object_ice_invokeI(amdCB); + } + else + { + amiCB = new AMI_Array_Object_ice_invokeI(0); + } + if(_forwardContext) { if(_sslContext.size() > 0) { Ice::Context ctx = current.ctx; ctx.insert(_sslContext.begin(), _sslContext.end()); - ok = proxy->ice_invoke(current.operation, current.mode, inParams, outParams, ctx); + proxy->ice_invoke_async(amiCB, current.operation, current.mode, inParams); } else { - ok = proxy->ice_invoke(current.operation, current.mode, inParams, outParams, current.ctx); + proxy->ice_invoke_async(amiCB, current.operation, current.mode, inParams, current.ctx); } } else { if(_sslContext.size() > 0) { - ok = proxy->ice_invoke(current.operation, current.mode, inParams, outParams, _sslContext); + proxy->ice_invoke_async(amiCB, current.operation, current.mode, inParams, _sslContext); } else { - ok = proxy->ice_invoke(current.operation, current.mode, inParams, outParams); + proxy->ice_invoke_async(amiCB, current.operation, current.mode, inParams); } } - pair<const Byte*, const Byte*> outPair; - if(outParams.size() == 0) - { - outPair.first = outPair.second = 0; - } - else + if(!proxy->ice_isTwoway()) { - outPair.first = &outParams[0]; - outPair.second = outPair.first + outParams.size(); + amdCB->ice_response(true, pair<const Byte*, const Byte*>(0, 0)); } - amdCB->ice_response(ok, outPair); } catch(const LocalException& ex) { |