summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/CollocatedRequestHandler.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/CollocatedRequestHandler.cpp
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp694
1 files changed, 694 insertions, 0 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
new file mode 100644
index 00000000000..6db9bae4daf
--- /dev/null
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -0,0 +1,694 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/DisableWarnings.h>
+#include <Ice/CollocatedRequestHandler.h>
+#include <Ice/ObjectAdapterI.h>
+#include <Ice/ThreadPool.h>
+#include <Ice/Reference.h>
+#include <Ice/Instance.h>
+#include <Ice/TraceLevels.h>
+
+#include <Ice/TraceUtil.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceInternal;
+
+namespace
+{
+
+// Thread-pool work item that dispatches a synchronous collocated invocation:
+// it first notifies the outgoing message that it was "sent" and, if the
+// request has not timed out in the meantime, dispatches the request(s).
+class InvokeAll : public DispatchWorkItem
+{
+public:
+
+    InvokeAll(OutgoingMessageCallback* out,
+              BasicStream* os,
+              CollocatedRequestHandler* handler,
+              Int requestId,
+              Int invokeNum,
+              bool batch) :
+        _out(out), _os(os), _handler(handler), _requestId(requestId), _invokeNum(invokeNum), _batch(batch)
+    {
+    }
+
+    virtual void
+    run()
+    {
+        // sent() returns false when the request already timed out; in that
+        // case the invocation must not be dispatched.
+        if(_handler->sent(_out))
+        {
+            _handler->invokeAll(_os, _requestId, _invokeNum, _batch);
+        }
+    }
+
+private:
+
+    // NOTE(review): raw pointers -- assumes the Outgoing and its stream stay
+    // alive until this work item runs (the caller blocks waiting); confirm.
+    OutgoingMessageCallback* _out;
+    BasicStream* _os;
+    CollocatedRequestHandlerPtr _handler; // smart pointer keeps the handler alive
+    Int _requestId;
+    Int _invokeNum;
+    bool _batch;
+};
+
+// Thread-pool work item for asynchronous collocated invocations; the AMI
+// counterpart of InvokeAll. Invokes the "sent" callback and then dispatches,
+// unless the request already timed out.
+class InvokeAllAsync : public DispatchWorkItem
+{
+public:
+
+    InvokeAllAsync(const OutgoingAsyncMessageCallbackPtr& outAsync,
+                   BasicStream* os,
+                   CollocatedRequestHandler* handler,
+                   Int requestId,
+                   Int invokeNum,
+                   bool batch) :
+        _outAsync(outAsync), _os(os), _handler(handler), _requestId(requestId), _invokeNum(invokeNum), _batch(batch)
+    {
+    }
+
+    virtual void
+    run()
+    {
+        // sentAsync() returns false when the request already timed out.
+        if(_handler->sentAsync(_outAsync.get()))
+        {
+            _handler->invokeAll(_os, _requestId, _invokeNum, _batch);
+        }
+    }
+
+private:
+
+    OutgoingAsyncMessageCallbackPtr _outAsync; // keeps the AMI callback alive
+    // NOTE(review): _os is a raw pointer into _outAsync's stream -- valid as
+    // long as _outAsync is held above; confirm.
+    BasicStream* _os;
+    CollocatedRequestHandlerPtr _handler;
+    Int _requestId;
+    Int _invokeNum;
+    bool _batch;
+};
+
+// Thread-pool work item that flushes a full batch stream: it takes ownership
+// of the marshaled batch payload (by swapping, not copying) and dispatches
+// the contained requests. Used by the auto-flush path in finishBatchRequest().
+class InvokeBatchRequests : public DispatchWorkItem
+{
+public:
+
+    InvokeBatchRequests(const CollocatedRequestHandlerPtr& handler,
+                        BasicStream& stream,
+                        Int invokeNum) :
+        _handler(handler),
+        _stream(stream.instance(), currentProtocolEncoding),
+        _invokeNum(invokeNum)
+    {
+        _stream.swap(stream); // Steal the batch payload without copying.
+    }
+
+    virtual void
+    run()
+    {
+        // Batch requests are oneway: there is no request id (0).
+        _handler->invokeAll(&_stream, 0, _invokeNum, true);
+    }
+
+private:
+
+    const CollocatedRequestHandlerPtr _handler;
+    BasicStream _stream;
+    Int _invokeNum;
+};
+
+// Patch a 32-bit integer into the marshaled buffer at the given byte offset.
+// The on-the-wire encoding is little-endian, so on big-endian hosts the
+// bytes of the host-order value are written in reverse.
+void
+fillInValue(BasicStream* os, int pos, Int value)
+{
+    const Byte* first = reinterpret_cast<const Byte*>(&value);
+    const Byte* last = first + sizeof(Int);
+#ifdef ICE_BIG_ENDIAN
+    reverse_copy(first, last, os->b.begin() + pos);
+#else
+    copy(first, last, os->b.begin() + pos);
+#endif
+}
+
+}
+
+// Construct the handler for collocated dispatch through the given adapter.
+// The logger and trace levels are cached from the instance so each dispatch
+// avoids repeated lookups; Ice.BatchAutoFlush (default: enabled) controls
+// whether oversized batch streams are flushed automatically.
+CollocatedRequestHandler::CollocatedRequestHandler(const ReferencePtr& ref, const ObjectAdapterPtr& adapter) :
+    RequestHandler(ref),
+    _adapter(ObjectAdapterIPtr::dynamicCast(adapter)),
+    _logger(_reference->getInstance()->initializationData().logger), // Cached for better performance.
+    _traceLevels(_reference->getInstance()->traceLevels()), // Cached for better performance.
+    _batchAutoFlush(
+        ref->getInstance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0),
+    _requestId(0),
+    _batchStreamInUse(false),
+    _batchRequestNum(0),
+    _batchStream(ref->getInstance().get(), currentProtocolEncoding, _batchAutoFlush)
+{
+}
+
+// Out-of-line destructor; members release their resources themselves.
+CollocatedRequestHandler::~CollocatedRequestHandler()
+{
+}
+
+// Hand the shared batch stream to the caller (via swap into *os) so it can
+// marshal one batch request. Blocks while another thread holds the stream;
+// the batch request header is written the first time the stream is used.
+// Must be paired with finishBatchRequest() or abortBatchRequest().
+void
+CollocatedRequestHandler::prepareBatchRequest(BasicStream* os)
+{
+    Lock sync(*this);
+    while(_batchStreamInUse)
+    {
+        wait();
+    }
+
+    if(_batchStream.b.empty())
+    {
+        try
+        {
+            _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
+        }
+        catch(const LocalException& ex)
+        {
+            ex.ice_throw();
+        }
+    }
+
+    _batchStreamInUse = true;
+    _batchMarker = _batchStream.b.size(); // start offset of the request being added
+    _batchStream.swap(*os);
+}
+
+// Take the batch stream back from the caller after it marshaled one request.
+// If auto-flush is enabled and the stream now exceeds Ice.MessageSizeMax,
+// the previously accumulated requests are flushed on a thread-pool thread
+// and a new batch is started with just the last request. Finally the batch
+// request count is incremented and the stream is released for other threads.
+void
+CollocatedRequestHandler::finishBatchRequest(BasicStream* os)
+{
+    try
+    {
+        Lock sync(*this);
+        _batchStream.swap(*os);
+
+        if(_batchAutoFlush && (_batchStream.b.size() > _reference->getInstance()->messageSizeMax()))
+        {
+            //
+            // Temporarily save the last request.
+            //
+            vector<Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end());
+            _batchStream.b.resize(_batchMarker);
+
+            // Flush everything accumulated before the last request on a
+            // thread-pool thread (the work item swaps the payload out).
+            _adapter->getThreadPool()->execute(new InvokeBatchRequests(this, _batchStream, _batchRequestNum));
+
+            //
+            // Reset the batch.
+            //
+            BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding, _batchAutoFlush);
+            _batchStream.swap(dummy);
+            _batchRequestNum = 0;
+            _batchMarker = 0;
+
+            //
+            // Check again if the last request doesn't exceed what we can send with the auto flush
+            //
+            if(sizeof(requestBatchHdr) + lastRequest.size() > _reference->getInstance()->messageSizeMax())
+            {
+                Ex::throwMemoryLimitException(__FILE__, __LINE__, sizeof(requestBatchHdr) + lastRequest.size(),
+                                              _reference->getInstance()->messageSizeMax());
+            }
+
+            //
+            // Start a new batch with the last message that caused us to go over the limit.
+            //
+            _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
+            _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
+        }
+
+        //
+        // Increment the number of requests in the batch.
+        //
+        assert(_batchStreamInUse);
+        ++_batchRequestNum;
+        _batchStreamInUse = false;
+        notifyAll();
+    }
+    catch(const LocalException&)
+    {
+        // Reset the batch and release the stream before propagating.
+        abortBatchRequest();
+        throw;
+    }
+}
+
+// Discard the batch stream's contents (including previously queued requests),
+// reset the batch state, and release the stream for other waiting threads.
+void
+CollocatedRequestHandler::abortBatchRequest()
+{
+    Lock sync(*this);
+
+    // Swapping with a fresh stream both clears the data and frees the memory.
+    BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding, _batchAutoFlush);
+    _batchStream.swap(dummy);
+    _batchRequestNum = 0;
+    _batchMarker = 0;
+
+    assert(_batchStreamInUse);
+    _batchStreamInUse = false;
+    notifyAll();
+}
+
+// Invoke a synchronous outgoing request in the collocated path. Returns true
+// only for oneway requests with no invocation timeout -- presumably meaning
+// the request is fully completed by the send itself; confirm against the
+// RequestHandler contract.
+bool
+CollocatedRequestHandler::sendRequest(OutgoingMessageCallback* out)
+{
+    out->invokeCollocated(this);
+    return !_response && _reference->getInvocationTimeout() == 0;
+}
+
+// Invoke an asynchronous outgoing request in the collocated path; the
+// callback drives the actual dispatch via invokeAsyncRequest().
+AsyncStatus
+CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& outAsync)
+{
+    return outAsync->__invokeCollocated(this);
+}
+
+// Invocation timeout expired for a synchronous request. Two cases:
+// - the request was queued but not yet dispatched: it is still in
+//   _sendRequests, so it is finished with sent=false;
+// - it was already dispatched and is waiting for its response: it is found
+//   in _requests and finished with sent=true.
+// If the request is in neither map it already completed; nothing to do.
+void
+CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
+{
+    Lock sync(*this);
+
+    map<OutgoingMessageCallback*, Int>::iterator p = _sendRequests.find(out);
+    if(p != _sendRequests.end())
+    {
+        if(p->second > 0)
+        {
+            // Twoway: also drop the response-pending entry.
+            _requests.erase(p->second);
+        }
+        InvocationTimeoutException ex(__FILE__, __LINE__);
+        out->finished(ex, false);
+        _sendRequests.erase(p);
+    }
+    else
+    {
+        Outgoing* o = dynamic_cast<Outgoing*>(out);
+        if(o)
+        {
+            // Linear scan: _requests is keyed by request id, not callback.
+            for(map<Int, Outgoing*>::iterator q = _requests.begin(); q != _requests.end(); ++q)
+            {
+                if(q->second == o)
+                {
+                    InvocationTimeoutException ex(__FILE__, __LINE__);
+                    o->finished(ex, true);
+                    _requests.erase(q);
+                    return; // We're done.
+                }
+            }
+        }
+    }
+}
+
+// Invocation timeout expired for an asynchronous request; mirrors
+// requestTimedOut() but defers the completion callback until after the lock
+// is released, since __finished() may run user code.
+void
+CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
+{
+    OutgoingAsyncMessageCallbackPtr out;
+    bool sent;
+    {
+        Lock sync(*this);
+
+        map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync);
+        if(p != _sendAsyncRequests.end())
+        {
+            // Not dispatched yet.
+            if(p->second > 0)
+            {
+                _asyncRequests.erase(p->second);
+            }
+            out = p->first;
+            sent = false;
+            _sendAsyncRequests.erase(p);
+        }
+        else
+        {
+            // Possibly dispatched and awaiting its response.
+            OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
+            if(o)
+            {
+                for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
+                {
+                    if(q->second.get() == o.get())
+                    {
+                        out = o;
+                        sent = true;
+                        _asyncRequests.erase(q);
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    // Complete outside the lock. 'sent' is always assigned when 'out' is set.
+    if(out)
+    {
+        InvocationTimeoutException ex(__FILE__, __LINE__);
+        out->__finished(ex, sent);
+    }
+}
+
+// Dispatch a synchronous collocated request. Twoway requests are assigned a
+// request id and registered in _requests; when an invocation timeout is set,
+// the request is also tracked in _sendRequests and dispatched from a
+// thread-pool thread so the user thread can time out. Otherwise the
+// invocation runs directly in the user thread.
+void
+CollocatedRequestHandler::invokeRequest(Outgoing* out)
+{
+    int requestId = 0;
+    if(_reference->getInvocationTimeout() > 0 || _response)
+    {
+        Lock sync(*this);
+        if(_response)
+        {
+            requestId = ++_requestId;
+            _requests.insert(make_pair(requestId, out));
+        }
+        if(_reference->getInvocationTimeout() > 0)
+        {
+            _sendRequests.insert(make_pair(out, requestId));
+        }
+    }
+
+    out->attachCollocatedObserver(requestId);
+
+    if(_reference->getInvocationTimeout() > 0)
+    {
+        _adapter->getThreadPool()->execute(new InvokeAll(out, out->os(), this, requestId, 1, false));
+    }
+    else
+    {
+        out->sent();
+        invokeAll(out->os(), requestId, 1, false); // Invoke from the user thread.
+    }
+}
+
+// Dispatch an asynchronous collocated request. Unlike the synchronous path,
+// the dispatch always goes through the thread pool so the user thread never
+// runs servant code; the request is therefore always reported as queued.
+AsyncStatus
+CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
+{
+    int requestId = 0;
+    if(_reference->getInvocationTimeout() > 0 || _response)
+    {
+        Lock sync(*this);
+        if(_response)
+        {
+            requestId = ++_requestId;
+            _asyncRequests.insert(make_pair(requestId, outAsync));
+        }
+        if(_reference->getInvocationTimeout() > 0)
+        {
+            _sendAsyncRequests.insert(make_pair(outAsync, requestId));
+        }
+    }
+
+    outAsync->__attachCollocatedObserver(requestId);
+
+    _adapter->getThreadPool()->execute(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false));
+    return AsyncStatusQueued;
+}
+
+// Flush the accumulated batch requests synchronously. The batch stream is
+// swapped into the BatchOutgoing and the batch state is reset; the requests
+// are then dispatched either from a thread-pool thread (invocation timeout
+// set) or directly from the user thread. An empty batch just reports "sent".
+void
+CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out)
+{
+    Int invokeNum;
+    {
+        Lock sync(*this);
+        while(_batchStreamInUse)
+        {
+            wait();
+        }
+
+        invokeNum = _batchRequestNum;
+
+        if(_batchRequestNum > 0)
+        {
+            if(_reference->getInvocationTimeout() > 0)
+            {
+                // Batch flushes have no request id (0).
+                _sendRequests.insert(make_pair(out, 0));
+            }
+
+            assert(!_batchStream.b.empty());
+            _batchStream.swap(*out->os());
+
+            //
+            // Reset the batch stream.
+            //
+            BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding, _batchAutoFlush);
+            _batchStream.swap(dummy);
+            _batchRequestNum = 0;
+            _batchMarker = 0;
+        }
+    }
+
+    out->attachCollocatedObserver(0);
+
+    if(invokeNum > 0)
+    {
+        if(_reference->getInvocationTimeout() > 0)
+        {
+            _adapter->getThreadPool()->execute(new InvokeAll(out, out->os(), this, 0, invokeNum, true));
+        }
+        else
+        {
+            out->sent();
+            invokeAll(out->os(), 0, invokeNum, true); // Invoke from the user thread.
+        }
+    }
+    else
+    {
+        out->sent();
+    }
+}
+
+// Flush the accumulated batch requests asynchronously; AMI counterpart of
+// invokeBatchRequests(). A non-empty batch is always dispatched from the
+// thread pool (status: queued); an empty batch completes immediately as sent.
+AsyncStatus
+CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync)
+{
+    Int invokeNum;
+    {
+        Lock sync(*this);
+        while(_batchStreamInUse)
+        {
+            wait();
+        }
+
+        invokeNum = _batchRequestNum;
+        if(_batchRequestNum > 0)
+        {
+            if(_reference->getInvocationTimeout() > 0)
+            {
+                _sendAsyncRequests.insert(make_pair(outAsync, 0));
+            }
+
+            assert(!_batchStream.b.empty());
+            _batchStream.swap(*outAsync->__getOs());
+
+            //
+            // Reset the batch stream.
+            //
+            BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding, _batchAutoFlush);
+            _batchStream.swap(dummy);
+            _batchRequestNum = 0;
+            _batchMarker = 0;
+        }
+    }
+
+    outAsync->__attachCollocatedObserver(0);
+
+    if(invokeNum > 0)
+    {
+        _adapter->getThreadPool()->execute(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum, true));
+        return AsyncStatusQueued;
+    }
+    else if(outAsync->__sent())
+    {
+        // Sent synchronously and the sent callback must be invoked.
+        return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
+    }
+    else
+    {
+        return AsyncStatusSent;
+    }
+}
+
+// Deliver a reply marshaled by the servant dispatch back to the matching
+// pending request (synchronous or asynchronous). Async completion runs
+// outside the lock; finally the direct-dispatch count taken in invokeAll()
+// is released.
+void
+CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
+{
+    OutgoingAsyncPtr outAsync;
+    {
+        Lock sync(*this);
+        assert(_response);
+
+        // Position the read cursor just past the reply header + request id.
+        os->i = os->b.begin() + sizeof(replyHdr) + 4;
+
+        if(_traceLevels->protocol >= 1)
+        {
+            // Fill in the message size (offset 10) only for tracing; the
+            // in-process reply is not actually sent over a transport.
+            fillInValue(os, 10, static_cast<Int>(os->b.size()));
+            traceRecv(*os, _logger, _traceLevels);
+        }
+
+        map<int, Outgoing*>::iterator p = _requests.find(requestId);
+        if(p != _requests.end())
+        {
+            p->second->finished(*os);
+            _requests.erase(p);
+        }
+        else
+        {
+            map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId);
+            if(q != _asyncRequests.end())
+            {
+                os->swap(*q->second->__getIs());
+                outAsync = q->second;
+                _asyncRequests.erase(q);
+            }
+        }
+    }
+
+    if(outAsync)
+    {
+        outAsync->__finished();
+    }
+    _adapter->decDirectCount();
+}
+
+// Called for dispatches that produce no reply (oneway): just release the
+// direct-dispatch count taken in invokeAll().
+void
+CollocatedRequestHandler::sendNoResponse()
+{
+    _adapter->decDirectCount();
+}
+
+// Handle a fatal invocation exception: drop the pending request record (if
+// any) and release the direct-dispatch count.
+// NOTE(review): 'ex' and 'invokeNum' are unused here, and the erased
+// Outgoing/OutgoingAsync is not notified of the failure -- a synchronous
+// caller could then wait forever. Confirm the completion is performed
+// elsewhere before relying on this path.
+void
+CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum)
+{
+    if(requestId > 0)
+    {
+        Lock sync(*this);
+        _requests.erase(requestId);
+        _asyncRequests.erase(requestId);
+    }
+    _adapter->decDirectCount();
+}
+
+// Collocated invocations don't use a connection; always returns null.
+ConnectionIPtr
+CollocatedRequestHandler::getConnection(bool /*wait*/)
+{
+    return 0;
+}
+
+// Mark a synchronous outgoing message as sent. Returns false when the
+// request was already removed by a timeout, in which case the caller must
+// not dispatch the invocation.
+bool
+CollocatedRequestHandler::sent(OutgoingMessageCallback* out)
+{
+    if(_reference->getInvocationTimeout() <= 0)
+    {
+        // No timeout configured: nothing to race against, no lock needed.
+        out->sent();
+        return true;
+    }
+
+    Lock sync(*this);
+    if(_sendRequests.erase(out) == 0)
+    {
+        return false; // The request timed-out.
+    }
+    out->sent();
+    return true;
+}
+
+// Mark an asynchronous outgoing message as sent, invoking the AMI sent
+// callback if requested. Returns false when the request already timed out,
+// in which case the caller must not dispatch the invocation.
+bool
+CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync)
+{
+    if(_reference->getInvocationTimeout() > 0)
+    {
+        Lock sync(*this);
+        if(_sendAsyncRequests.erase(outAsync) == 0)
+        {
+            return false; // The request timed-out.
+        }
+    }
+    // __sent() returns true when the user's sent callback must be invoked.
+    if(outAsync->__sent())
+    {
+        outAsync->__invokeSent();
+    }
+    return true;
+}
+
+// Dispatch the invokeNum request(s) marshaled in 'os' to the adapter's
+// servants through the Incoming pipeline. Runs either in the user thread or
+// in a thread-pool thread (see invokeRequest/invokeBatchRequests).
+void
+CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNum, bool batch)
+{
+    // Position the read cursor just past the request (or batch) header.
+    if(batch)
+    {
+        os->i = os->b.begin() + sizeof(requestBatchHdr);
+    }
+    else
+    {
+        os->i = os->b.begin() + sizeof(requestHdr);
+    }
+
+    if(_traceLevels->protocol >= 1)
+    {
+        // The header fields are patched only for tracing: message size at
+        // offset 10, then request id (twoway) or batch count at headerSize.
+        fillInValue(os, 10, static_cast<Int>(os->b.size()));
+        if(requestId > 0)
+        {
+            fillInValue(os, headerSize, requestId);
+        }
+        else if(batch)
+        {
+            fillInValue(os, headerSize, invokeNum);
+        }
+        traceSend(*os, _logger, _traceLevels);
+    }
+
+    ServantManagerPtr servantManager = _adapter->getServantManager();
+    try
+    {
+        while(invokeNum > 0)
+        {
+            // One direct-dispatch count per request; the matching decrement
+            // happens in sendResponse()/sendNoResponse()/invokeException(),
+            // or below on a SystemException.
+            try
+            {
+                _adapter->incDirectCount();
+            }
+            catch(const ObjectAdapterDeactivatedException& ex)
+            {
+                handleException(requestId, ex, false);
+                return;
+            }
+
+            Incoming in(_reference->getInstance().get(), this, 0, _adapter, _response, 0, requestId);
+            try
+            {
+                in.invoke(servantManager, os);
+            }
+            catch(const SystemException& ex)
+            {
+                handleException(requestId, ex, true);
+                _adapter->decDirectCount();
+            }
+            --invokeNum;
+        }
+    }
+    catch(const LocalException& ex)
+    {
+        invokeException(requestId, ex, invokeNum); // Fatal invocation exception
+    }
+}
+
+// Complete the pending request identified by requestId with the given
+// exception; 'sent' indicates whether the request was considered sent.
+// Oneway requests (requestId == 0) have no waiter, so the exception is
+// ignored. Async completion runs outside the lock.
+void
+CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bool sent)
+{
+    if(requestId == 0)
+    {
+        return; // Ignore exception for oneway messages.
+    }
+
+    OutgoingAsyncPtr outAsync;
+    {
+        Lock sync(*this);
+
+        map<int, Outgoing*>::iterator p = _requests.find(requestId);
+        if(p != _requests.end())
+        {
+            p->second->finished(ex, sent);
+            _requests.erase(p);
+        }
+        else
+        {
+            map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId);
+            if(q != _asyncRequests.end())
+            {
+                outAsync = q->second;
+                _asyncRequests.erase(q);
+            }
+        }
+    }
+    if(outAsync)
+    {
+        outAsync->__finished(ex, sent);
+    }
+}