diff options
author | Benoit Foucher <benoit@zeroc.com> | 2016-08-08 14:20:55 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2016-08-08 14:20:55 +0200 |
commit | 6a902b0177960d8bbb412f17f2e5303bf46f1f9a (patch) | |
tree | b50070a76a3fdffece0fdf42c897a3e622609613 /cpp/src | |
parent | Fix build failure (diff) | |
download | ice-6a902b0177960d8bbb412f17f2e5303bf46f1f9a.tar.bz2 ice-6a902b0177960d8bbb412f17f2e5303bf46f1f9a.tar.xz ice-6a902b0177960d8bbb412f17f2e5303bf46f1f9a.zip |
Fixed ICE-7208 - C++98 sync code now depends on async code
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 170 | ||||
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.h | 10 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 94 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.h | 14 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.h | 9 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 271 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 26 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionRequestHandler.cpp | 13 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionRequestHandler.h | 2 | ||||
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 693 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 4 | ||||
-rw-r--r-- | cpp/src/Ice/Proxy.cpp | 303 | ||||
-rw-r--r-- | cpp/src/Ice/RequestHandler.h | 7 | ||||
-rw-r--r-- | cpp/src/Ice/RetryQueue.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/RetryQueue.h | 1 | ||||
-rw-r--r-- | cpp/src/slice2cpp/Gen.cpp | 135 |
16 files changed, 121 insertions, 1637 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 1c061dc2b9c..ee35365b042 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -14,7 +14,6 @@ #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/TraceLevels.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/TraceUtil.h> @@ -26,41 +25,6 @@ using namespace IceInternal; namespace { -class InvokeAll : public DispatchWorkItem -{ -public: - - InvokeAll(OutgoingBase* out, - OutputStream* os, - CollocatedRequestHandler* handler, - Int requestId, - Int batchRequestNum) : - _out(out), - _os(os), - _handler(ICE_GET_SHARED_FROM_THIS(handler)), - _requestId(requestId), - _batchRequestNum(batchRequestNum) - { - } - - virtual void - run() - { - if(_handler->sent(_out)) - { - _handler->invokeAll(_os, _requestId, _batchRequestNum); - } - } - -private: - - OutgoingBase* _out; - OutputStream* _os; - CollocatedRequestHandlerPtr _handler; - Int _requestId; - Int _batchRequestNum; -}; - class InvokeAllAsync : public DispatchWorkItem { public: @@ -125,13 +89,6 @@ CollocatedRequestHandler::update(const RequestHandlerPtr& previousHandler, const return previousHandler.get() == this ? newHandler : ICE_SHARED_FROM_THIS; } -bool -CollocatedRequestHandler::sendRequest(ProxyOutgoingBase* out) -{ - out->invokeCollocated(this); - return !_response && _reference->getInvocationTimeout() == 0; -} - AsyncStatus CollocatedRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& outAsync) { @@ -139,41 +96,6 @@ CollocatedRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& outA } void -CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex) -{ - Lock sync(*this); - - map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out); - if(p != _sendRequests.end()) - { - if(p->second > 0) - { - _requests.erase(p->second); - } - InvocationTimeoutException ex(__FILE__, __LINE__); - out->completed(ex); - _sendRequests.erase(p); - _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count. - return; - } - - Outgoing* o = dynamic_cast<Outgoing*>(out); - if(o) - { - for(map<Int, OutgoingBase*>::iterator q = _requests.begin(); q != _requests.end(); ++q) - { - if(q->second == o) - { - InvocationTimeoutException ex(__FILE__, __LINE__); - q->second->completed(ex); - _requests.erase(q); - return; // We're done. - } - } - } -} - -void CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { Lock sync(*this); @@ -212,46 +134,6 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs } } -void -CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) -{ - // - // Increase the direct count to prevent the thread pool from being destroyed before - // invokeAll is called. This will also throw if the object adapter has been deactivated. - // - _adapter->incDirectCount(); - - int requestId = 0; - { - Lock sync(*this); - if(_response) - { - requestId = ++_requestId; - _requests.insert(make_pair(requestId, out)); - } - - _sendRequests.insert(make_pair(out, requestId)); - } - - out->attachCollocatedObserver(_adapter, requestId); - - if(_reference->getInvocationTimeout() > 0) - { - // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. - _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, batchRequestNum)); - } - else if(_dispatcher) - { - _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId, - batchRequestNum)); - } - else // Optimization: directly call invokeAll if there's no dispatcher. - { - out->sent(); - invokeAll(out->os(), requestId, batchRequestNum); - } -} - AsyncStatus CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum, bool synchronous) { @@ -342,24 +224,15 @@ CollocatedRequestHandler::sendResponse(Int requestId, OutputStream* os, Byte, bo traceRecv(is, _logger, _traceLevels); } - map<int, OutgoingBase*>::iterator p = _requests.find(requestId); - if(p != _requests.end()) - { - p->second->completed(is); - _requests.erase(p); - } - else + map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); + if(q != _asyncRequests.end()) { - map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); - if(q != _asyncRequests.end()) + is.swap(*q->second->getIs()); + if(q->second->response()) { - is.swap(*q->second->getIs()); - if(q->second->response()) - { - outAsync = q->second; - } - _asyncRequests.erase(q); + outAsync = q->second; } + _asyncRequests.erase(q); } } @@ -417,18 +290,6 @@ CollocatedRequestHandler::waitForConnection() } bool -CollocatedRequestHandler::sent(OutgoingBase* out) -{ - Lock sync(*this); - if(_sendRequests.erase(out) == 0) - { - return false; // The request timed-out. - } - out->sent(); - return true; -} - -bool CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync) { { @@ -522,23 +383,14 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo { Lock sync(*this); - map<int, OutgoingBase*>::iterator p = _requests.find(requestId); - if(p != _requests.end()) + map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); + if(q != _asyncRequests.end()) { - p->second->completed(ex); - _requests.erase(p); - } - else - { - map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); - if(q != _asyncRequests.end()) + if(q->second->exception(ex)) { - if(q->second->exception(ex)) - { - outAsync = q->second; - } - _asyncRequests.erase(q); + outAsync = q->second; } + _asyncRequests.erase(q); } } diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h index 5dd33d13eb9..00a559c8ea5 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.h +++ b/cpp/src/Ice/CollocatedRequestHandler.h @@ -31,8 +31,6 @@ ICE_DEFINE_PTR(ObjectAdapterIPtr, ObjectAdapterI); namespace IceInternal { -class OutgoingBase; -class Outgoing; class OutgoingAsyncBase; class OutgoingAsync; @@ -47,10 +45,8 @@ public: virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&); - virtual bool sendRequest(ProxyOutgoingBase*); virtual AsyncStatus sendAsyncRequest(const ProxyOutgoingAsyncBasePtr&); - virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); virtual void sendResponse(Ice::Int, Ice::OutputStream*, Ice::Byte, bool); @@ -63,10 +59,8 @@ public: virtual Ice::ConnectionIPtr getConnection(); virtual Ice::ConnectionIPtr waitForConnection(); - void invokeRequest(OutgoingBase*, int); AsyncStatus invokeAsyncRequest(OutgoingAsyncBase*, int, bool); - bool sent(OutgoingBase*); bool sentAsync(OutgoingAsyncBase*); void invokeAll(Ice::OutputStream*, Ice::Int, Ice::Int); @@ -88,11 +82,7 @@ private: const TraceLevelsPtr _traceLevels; int _requestId; - - std::map<OutgoingBase*, Ice::Int> _sendRequests; std::map<OutgoingAsyncBasePtr, Ice::Int> _sendAsyncRequests; - - std::map<Ice::Int, OutgoingBase*> _requests; std::map<Ice::Int, OutgoingAsyncBasePtr> _asyncRequests; }; ICE_DEFINE_PTR(CollocatedRequestHandlerPtr, CollocatedRequestHandler); diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 55bf01b0e07..ddb4fe7e748 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -14,7 +14,6 @@ #include <Ice/Proxy.h> #include <Ice/ConnectionI.h> #include <Ice/RouterInfo.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/Protocol.h> #include <Ice/Properties.h> @@ -52,22 +51,6 @@ ConnectRequestHandler::update(const RequestHandlerPtr& previousHandler, const Re return previousHandler.get() == this ? newHandler : ICE_SHARED_FROM_THIS; } -bool -ConnectRequestHandler::sendRequest(ProxyOutgoingBase* out) -{ - { - Lock sync(*this); - if(!initialized()) - { - Request req; - req.out = out; - _requests.push_back(req); - return false; // Not sent - } - } - return out->invokeRemote(_connection, _compress, _response) && !_response; // Finished if sent and no response. -} - AsyncStatus ConnectRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out) { @@ -80,9 +63,7 @@ ConnectRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out) if(!initialized()) { - Request req; - req.outAsync = out; - _requests.push_back(req); + _requests.push_back(out); return AsyncStatusQueued; } } @@ -90,33 +71,6 @@ ConnectRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out) } void -ConnectRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) -{ - { - Lock sync(*this); - if(_exception) - { - return; // The request has been notified of a failure already. - } - - if(!initialized()) - { - for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p) - { - if(p->out == out) - { - out->completed(ex); - _requests.erase(p); - return; - } - } - assert(false); // The request has to be queued if it timed out and we're not initialized yet. - } - } - _connection->requestCanceled(out, ex); -} - -void ConnectRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { { @@ -128,9 +82,9 @@ ConnectRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync if(!initialized()) { - for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p) + for(deque<ProxyOutgoingAsyncBasePtr>::iterator p = _requests.begin(); p != _requests.end(); ++p) { - if(p->outAsync.get() == outAsync.get()) + if(p->get() == outAsync.get()) { _requests.erase(p); if(outAsync->exception(ex)) @@ -239,18 +193,11 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) } - for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + for(deque<ProxyOutgoingAsyncBasePtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - if(p->out) - { - p->out->completed(ex); - } - else + if((*p)->exception(ex)) { - if(p->outAsync->exception(ex)) - { - p->outAsync->invokeExceptionAsync(); - } + (*p)->invokeExceptionAsync(); } } @@ -329,16 +276,12 @@ ConnectRequestHandler::flushRequests() #endif while(!_requests.empty()) // _requests is immutable when _flushing = true { - Request& req = _requests.front(); + ProxyOutgoingAsyncBasePtr& req = _requests.front(); try { - if(req.out) + if(req->invokeRemote(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) { - req.out->invokeRemote(_connection, _compress, _response); - } - else if(req.outAsync->invokeRemote(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) - { - req.outAsync->invokeSentAsync(); + req->invokeSentAsync(); } } catch(const RetryException& ex) @@ -348,26 +291,13 @@ ConnectRequestHandler::flushRequests() // Remove the request handler before retrying. _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, ICE_SHARED_FROM_THIS); - if(req.out) - { - req.out->retryException(*exception); - } - else - { - req.outAsync->retryException(*exception); - } + req->retryException(*exception); } catch(const Ice::LocalException& ex) { ICE_SET_EXCEPTION_FROM_CLONE(exception, ex.ice_clone()); - if(req.out) - { - req.out->completed(ex); - } - else if(req.outAsync->exception(ex)) - { - req.outAsync->invokeExceptionAsync(); - } + + req->invokeExceptionAsync(); } _requests.pop_front(); } diff --git a/cpp/src/Ice/ConnectRequestHandler.h b/cpp/src/Ice/ConnectRequestHandler.h index bb00a3c5b9b..1a250e41de4 100644 --- a/cpp/src/Ice/ConnectRequestHandler.h +++ b/cpp/src/Ice/ConnectRequestHandler.h @@ -41,10 +41,8 @@ public: RequestHandlerPtr connect(const Ice::ObjectPrxPtr&); virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&); - virtual bool sendRequest(ProxyOutgoingBase*); virtual AsyncStatus sendAsyncRequest(const ProxyOutgoingAsyncBasePtr&); - virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); virtual Ice::ConnectionIPtr getConnection(); @@ -60,16 +58,6 @@ private: bool initialized(); void flushRequests(); - struct Request - { - Request() : out(0) - { - } - - ProxyOutgoingBase* out; - ProxyOutgoingAsyncBasePtr outAsync; - }; - Ice::ObjectPrxPtr _proxy; std::set<Ice::ObjectPrxPtr> _proxies; @@ -79,7 +67,7 @@ private: bool _initialized; bool _flushing; - std::deque<Request> _requests; + std::deque<ProxyOutgoingAsyncBasePtr> _requests; RequestHandlerPtr _requestHandler; }; diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 92d2d400739..f0c6ed69021 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -63,8 +63,7 @@ public: void waitUntilFinished(); - void create(const std::vector<EndpointIPtr>&, bool, Ice::EndpointSelectionType, - const CreateConnectionCallbackPtr&); + void create(const std::vector<EndpointIPtr>&, bool, Ice::EndpointSelectionType, const CreateConnectionCallbackPtr&); void setRouterInfo(const RouterInfoPtr&); void removeAdapter(const Ice::ObjectAdapterPtr&); void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&); @@ -72,7 +71,7 @@ public: OutgoingConnectionFactory(const Ice::CommunicatorPtr&, const InstancePtr&); virtual ~OutgoingConnectionFactory(); friend class Instance; - + private: struct ConnectorInfo @@ -224,7 +223,6 @@ public: } #endif - private: friend class Ice::ObjectAdapterI; @@ -254,11 +252,8 @@ private: #endif Ice::ObjectAdapterIPtr _adapter; - const bool _warn; - std::set<Ice::ConnectionIPtr> _connections; - State _state; }; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 7af8fad5996..170c7b95d1e 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -20,7 +20,6 @@ #include <Ice/ACM.h> #include <Ice/ObjectAdapterI.h> // For getThreadPool() and getServantManager(). #include <Ice/EndpointI.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/Incoming.h> #include <Ice/LocalException.h> @@ -229,7 +228,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(OutputStream* str) } else if(!str) { - if(out || outAsync) + if(outAsync) { return; // Adopting request stream is not necessary. } @@ -249,8 +248,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(OutputStream* str) void Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream) { - assert((out || outAsync)); // Only requests can timeout. - out = 0; + assert(outAsync); // Only requests can timeout. outAsync = 0; if(adoptStream) { @@ -271,11 +269,7 @@ Ice::ConnectionI::OutgoingMessage::sent() } stream = 0; - if(out) - { - out->sent(); - } - else if(outAsync) + if(outAsync) { #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) invokeSent = outAsync->sent(); @@ -290,11 +284,7 @@ Ice::ConnectionI::OutgoingMessage::sent() void Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex) { - if(out) - { - out->completed(ex); - } - else if(outAsync) + if(outAsync) { if(outAsync->exception(ex)) { @@ -436,7 +426,7 @@ Ice::ConnectionI::close(bool force) // requests to be retried, regardless of whether the server // has processed them or not. // - while(!_requests.empty() || !_asyncRequests.empty()) + while(!_asyncRequests.empty()) { wait(); } @@ -591,8 +581,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout)) { - if(acm.close == CloseOnIdleForceful || - (acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty()))) + if(acm.close == CloseOnIdleForceful || (acm.close != CloseOnIdle && !_asyncRequests.empty())) { // // Close the connection if we didn't receive a heartbeat in @@ -600,8 +589,8 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) // setState(StateClosed, ConnectionTimeoutException(__FILE__, __LINE__)); } - else if(acm.close != CloseOnInvocation && - _dispatchCount == 0 && _batchRequestQueue->isEmpty() && _requests.empty() && _asyncRequests.empty()) + else if(acm.close != CloseOnInvocation && _dispatchCount == 0 && _batchRequestQueue->isEmpty() && + _asyncRequests.empty()) { // // The connection is idle, close it. @@ -611,94 +600,6 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) } } -bool -Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, int batchRequestNum) -{ - OutputStream* os = out->os(); - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - if(_exception) - { - throw RetryException(*_exception); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Ensure the message isn't bigger than what we can send with the - // transport. - // - _transceiver->checkSendSize(*os); - - Int requestId = 0; - if(response) - { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - const Byte* p = reinterpret_cast<const Byte*>(&requestId); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#endif - } - else if(batchRequestNum > 0) - { - const Byte* p = reinterpret_cast<const Byte*>(&batchRequestNum); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#endif - } - - out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); - - // - // Send the message. If it can't be sent without blocking the message is added - // to _sendStreams and it will be sent by the selector thread. - // - bool sent = false; - try - { - OutgoingMessage message(out, os, compress, requestId); - sent = sendMessage(message) & AsyncStatusSent; - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - assert(_exception); - _exception->ice_throw(); - } - - if(response) - { - // - // Add to the requests map. - // - _requestsHint = _requests.insert(_requests.end(), pair<const Int, OutgoingBase*>(requestId, out)); - } - - return sent; -} - AsyncStatus Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compress, bool response, int batchRequestNum) { @@ -824,8 +725,7 @@ Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ void Ice::ConnectionI::flushBatchRequests() { - ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name); - out.invoke(); + end_flushBatchRequests(begin_flushBatchRequests()); } AsyncResultPtr @@ -1021,96 +921,6 @@ Ice::ConnectionI::getACM() } void -Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state >= StateClosed) - { - return; // The request has already been or will be shortly notified of the failure. - } - - for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) - { - if(o->out == out) - { - if(o->requestId) - { - if(_requestsHint != _requests.end() && _requestsHint->second == out) - { - _requests.erase(_requestsHint); - _requestsHint = _requests.end(); - } - else - { - _requests.erase(o->requestId); - } - } - - if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) - { - setState(StateClosed, ex); - } - else - { - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - if(o == _sendStreams.begin()) - { - o->canceled(true); // true = adopt the stream. - } - else - { - o->canceled(false); - _sendStreams.erase(o); - } - out->completed(ex); - } - return; - } - } - - if(dynamic_cast<Outgoing*>(out)) - { - if(_requestsHint != _requests.end() && _requestsHint->second == out) - { - if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) - { - setState(StateClosed, ex); - } - else - { - out->completed(ex); - _requests.erase(_requestsHint); - _requestsHint = _requests.end(); - } - return; - } - else - { - for(map<Int, OutgoingBase*>::iterator p = _requests.begin(); p != _requests.end(); ++p) - { - if(p->second == out) - { - if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) - { - setState(StateClosed, ex); - } - else - { - p->second->completed(ex); - assert(p != _requestsHint); - _requests.erase(p); - } - return; // We're done. - } - } - } - } -} - -void Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { // @@ -2018,28 +1828,13 @@ Ice::ConnectionI::finish(bool close) o->completed(*_exception); if(o->requestId) // Make sure finished isn't called twice. { - if(o->out) - { - _requests.erase(o->requestId); - } - else - { - _asyncRequests.erase(o->requestId); - } + _asyncRequests.erase(o->requestId); } } - _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage + _sendStreams.clear(); } - - for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - p->second->completed(*_exception); - } - - _requests.clear(); - for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { if(q->second->exception(*_exception)) @@ -2181,7 +1976,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0), _compressionLevel(1), _nextRequestId(1), - _requestsHint(_requests.end()), _asyncRequestsHint(_asyncRequests.end()), _messageSizeMax(adapter ? adapter->messageSizeMax() : _instance->messageSizeMax()), _batchRequestQueue(new BatchRequestQueue(instance, endpoint->datagram())), @@ -2249,7 +2043,6 @@ Ice::ConnectionI::~ConnectionI() assert(_state == StateFinished); assert(_dispatchCount == 0); assert(_sendStreams.empty()); - assert(_requests.empty()); assert(_asyncRequests.empty()); } @@ -3310,54 +3103,22 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request stream.read(requestId); - map<Int, OutgoingBase*>::iterator p = _requests.end(); map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.end(); - if(_requestsHint != _requests.end()) + if(_asyncRequestsHint != _asyncRequests.end()) { - if(_requestsHint->first == requestId) + if(_asyncRequestsHint->first == requestId) { - p = _requestsHint; + q = _asyncRequestsHint; } } - if(p == _requests.end()) - { - if(_asyncRequestsHint != _asyncRequests.end()) - { - if(_asyncRequestsHint->first == requestId) - { - q = _asyncRequestsHint; - } - } - } - - if(p == _requests.end() && q == _asyncRequests.end()) - { - p = _requests.find(requestId); - } - - if(p == _requests.end() && q == _asyncRequests.end()) + if(q == _asyncRequests.end()) { q = _asyncRequests.find(requestId); } - if(p != _requests.end()) - { - p->second->completed(stream); - - if(p == _requestsHint) - { - _requests.erase(p++); - _requestsHint = p; - } - else - { - _requests.erase(p); - } - notifyAll(); // Notify threads blocked in close(false) - } - else if(q != _asyncRequests.end()) + if(q != _asyncRequests.end()) { outAsync = q->second; diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 72d239e48c1..ef277ee7c9a 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -49,14 +49,6 @@ # endif #endif -namespace IceInternal -{ - -class Outgoing; -class OutgoingBase; - -} - namespace Ice { @@ -101,15 +93,7 @@ public: struct OutgoingMessage { OutgoingMessage(Ice::OutputStream* str, bool comp) : - stream(str), out(0), compress(comp), requestId(0), adopted(false) -#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - , isSent(false), invokeSent(false), receivedReply(false) -#endif - { - } - - OutgoingMessage(IceInternal::OutgoingBase* o, Ice::OutputStream* str, bool comp, int rid) : - stream(str), out(o), compress(comp), requestId(rid), adopted(false) + stream(str), compress(comp), requestId(0), adopted(false) #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) , isSent(false), invokeSent(false), receivedReply(false) #endif @@ -118,7 +102,7 @@ public: OutgoingMessage(const IceInternal::OutgoingAsyncBasePtr& o, Ice::OutputStream* str, bool comp, int rid) : - stream(str), out(0), outAsync(o), compress(comp), requestId(rid), adopted(false) + stream(str), outAsync(o), compress(comp), requestId(rid), adopted(false) #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) , isSent(false), invokeSent(false), receivedReply(false) #endif @@ -131,7 +115,6 @@ public: void completed(const Ice::LocalException&); Ice::OutputStream* stream; - IceInternal::OutgoingBase* out; IceInternal::OutgoingAsyncBasePtr outAsync; bool compress; int requestId; @@ -188,7 +171,6 @@ public: void monitor(const IceUtil::Time&, const IceInternal::ACMConfig&); - bool sendRequest(IceInternal::OutgoingBase*, bool, bool, int); IceInternal::AsyncStatus sendAsyncRequest(const IceInternal::OutgoingAsyncBasePtr&, bool, bool, int); IceInternal::BatchRequestQueuePtr getBatchRequestQueue() const; @@ -216,7 +198,6 @@ public: const IceUtil::Optional<ACMHeartbeat>&); virtual ACM getACM(); - virtual void requestCanceled(IceInternal::OutgoingBase*, const LocalException&); virtual void asyncRequestCanceled(const IceInternal::OutgoingAsyncBasePtr&, const LocalException&); virtual void sendResponse(Int, Ice::OutputStream*, Byte, bool); @@ -365,9 +346,6 @@ private: Int _nextRequestId; - std::map<Int, IceInternal::OutgoingBase*> _requests; - std::map<Int, IceInternal::OutgoingBase*>::iterator _requestsHint; - std::map<Int, IceInternal::OutgoingAsyncBasePtr> _asyncRequests; std::map<Int, IceInternal::OutgoingAsyncBasePtr>::iterator _asyncRequestsHint; diff --git a/cpp/src/Ice/ConnectionRequestHandler.cpp b/cpp/src/Ice/ConnectionRequestHandler.cpp index e461bb7d510..5705e28a9b0 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.cpp +++ b/cpp/src/Ice/ConnectionRequestHandler.cpp @@ -12,7 +12,6 @@ #include <Ice/Reference.h> #include <Ice/ConnectionI.h> #include <Ice/RouterInfo.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> using namespace std; @@ -54,12 +53,6 @@ ConnectionRequestHandler::update(const RequestHandlerPtr& previousHandler, const return ICE_SHARED_FROM_THIS; } -bool -ConnectionRequestHandler::sendRequest(ProxyOutgoingBase* out) -{ - return out->invokeRemote(_connection, _compress, _response) && !_response; // Finished if sent and no response -} - AsyncStatus ConnectionRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out) { @@ -67,12 +60,6 @@ ConnectionRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out) } void -ConnectionRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) -{ - _connection->requestCanceled(out, ex); -} - -void ConnectionRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { _connection->asyncRequestCanceled(outAsync, ex); diff --git a/cpp/src/Ice/ConnectionRequestHandler.h b/cpp/src/Ice/ConnectionRequestHandler.h index 90838f7d3f4..4d58928803b 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.h +++ b/cpp/src/Ice/ConnectionRequestHandler.h @@ -28,10 +28,8 @@ public: virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&); - virtual bool sendRequest(ProxyOutgoingBase*); virtual AsyncStatus sendAsyncRequest(const ProxyOutgoingAsyncBasePtr&); - virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); virtual Ice::ConnectionIPtr getConnection(); diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp deleted file mode 100644 index 592a9b2f9d4..00000000000 --- a/cpp/src/Ice/Outgoing.cpp +++ /dev/null @@ -1,693 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -#include <Ice/Outgoing.h> -#include <Ice/ConnectionI.h> -#include <Ice/CollocatedRequestHandler.h> -#include <Ice/Reference.h> -#include <Ice/Instance.h> -#include <Ice/LocalException.h> -#include <Ice/ReplyStatus.h> -#include <Ice/ImplicitContextI.h> - -using namespace std; -using namespace IceUtil; -using namespace Ice; -using namespace Ice::Instrumentation; -using namespace IceInternal; - -OutgoingBase::~OutgoingBase() -{ - // Out of line to avoid weak vtable -} - -OutgoingBase::OutgoingBase(Instance* instance) : _os(instance, Ice::currentProtocolEncoding), _sent(false) -{ -} - -ProxyOutgoingBase::ProxyOutgoingBase(const Ice::ObjectPrxPtr& proxy, OperationMode mode) : - OutgoingBase(proxy->__reference()->getInstance().get()), - _proxy(proxy), - _mode(mode), - _state(StateUnsent) -{ - int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); - if(invocationTimeout > 0) - { - _invocationTimeoutDeadline = Time::now(Time::Monotonic) + Time::milliSeconds(invocationTimeout); - } -} - -ProxyOutgoingBase::~ProxyOutgoingBase() -{ -} - -void -ProxyOutgoingBase::sent() -{ - Monitor<Mutex>::Lock sync(_monitor); - if(_proxy->__reference()->getMode() != Reference::ModeTwoway) - { - _childObserver.detach(); - _state = StateOK; - } - _sent = true; - _monitor.notify(); - - // - // NOTE: At this point the stack allocated ProxyOutgoingBase object can be destroyed - // since the notify() on the monitor will release the thread waiting on the - // synchronous Ice call. - // -} - -void -ProxyOutgoingBase::completed(const Ice::Exception& ex) -{ - Monitor<Mutex>::Lock sync(_monitor); - //assert(_state <= StateInProgress); - if(_state > StateInProgress) - { - // - // Response was already received but message - // didn't get removed first from the connection - // send message queue so it's possible we can be - // notified of failures. In this case, ignore the - // failure and assume the outgoing has been sent. - // - assert(_state != StateFailed); - _sent = true; - _monitor.notify(); - return; - } - - _childObserver.failed(ex.ice_id()); - _childObserver.detach(); - - _state = StateFailed; - ICE_SET_EXCEPTION_FROM_CLONE(_exception, ex.ice_clone()); - _monitor.notify(); -} - -void -ProxyOutgoingBase::completed(InputStream&) -{ - assert(false); // Must be overriden -} - -void -ProxyOutgoingBase::retryException(const Ice::Exception&) -{ - Monitor<Mutex>::Lock sync(_monitor); - assert(_state <= StateInProgress); - _state = StateRetry; - _monitor.notify(); -} - -bool -ProxyOutgoingBase::invokeImpl() -{ - assert(_state == StateUnsent); - - const int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); - int cnt = 0; - while(true) - { - try - { - if(invocationTimeout > 0 && _invocationTimeoutDeadline <= Time::now(Time::Monotonic)) - { - throw Ice::InvocationTimeoutException(__FILE__, __LINE__); - } - - _state = StateInProgress; - _exception.reset(); - _sent = false; - - _handler = _proxy->__getRequestHandler(); - - if(_handler->sendRequest(this)) // Request sent and no response expected, we're done. - { - return true; - } - - if(invocationTimeout == -2) // Use the connection timeout - { - try - { - _invocationTimeoutDeadline = Time(); // Reset any previously set value - - int timeout = _handler->waitForConnection()->timeout(); - if(timeout > 0) - { - _invocationTimeoutDeadline = Time::now(Time::Monotonic) + Time::milliSeconds(timeout); - } - } - catch(const Ice::LocalException&) - { - } - } - - bool timedOut = false; - { - Monitor<Mutex>::Lock sync(_monitor); - // - // If the handler says it's not finished, we wait until we're done. - // - if(_invocationTimeoutDeadline != Time()) - { - Time now = Time::now(Time::Monotonic); - timedOut = now >= _invocationTimeoutDeadline; - while((_state == StateInProgress || !_sent) && _state != StateFailed && _state != StateRetry) - { - if(timedOut) - { - break; - } - _monitor.timedWait(_invocationTimeoutDeadline - now); - - if((_state == StateInProgress || !_sent) && _state != StateFailed) - { - now = Time::now(Time::Monotonic); - timedOut = now >= _invocationTimeoutDeadline; - } - } - } - else - { - while((_state == StateInProgress || !_sent) && _state != StateFailed && _state != StateRetry) - { - _monitor.wait(); - } - } - } - - if(timedOut) - { - if(invocationTimeout == -2) - { - _handler->requestCanceled(this, ConnectionTimeoutException(__FILE__, __LINE__)); - } - else - { - _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__)); - } - - // - // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestCanceled got called. - // In this case, the exception should be set on the ProxyOutgoingBase. - // - Monitor<Mutex>::Lock sync(_monitor); - while(_state == StateInProgress) - { - _monitor.wait(); - } - } - - if(_exception) - { - _exception->ice_throw(); - } - else if(_state == StateRetry) - { - _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and retry. - continue; - } - else - { - assert(_state != StateInProgress); - return _state == StateOK; - } - } - catch(const RetryException&) - { - _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and retry. - } - catch(const Ice::Exception& ex) - { - try - { - Time interval; - interval = Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt)); - if(interval > Time()) - { - if(invocationTimeout > 0) - { - IceUtil::Time now = Time::now(Time::Monotonic); - IceUtil::Time retryDeadline = now + interval; - - // - // Wait until either the retry and invocation timeout deadline is reached. - // Note that we're using a loop here because sleep() precision isn't as - // good as the motonic clock and it can return few hundred micro-seconds - // earlier which breaks the check for the invocation timeout. - // - while(retryDeadline > now && _invocationTimeoutDeadline > now) - { - if(retryDeadline < _invocationTimeoutDeadline) - { - ThreadControl::sleep(retryDeadline - now); - } - else if(_invocationTimeoutDeadline > now) - { - ThreadControl::sleep(_invocationTimeoutDeadline - now); - } - now = Time::now(Time::Monotonic); - } - if(now >= _invocationTimeoutDeadline) - { - throw Ice::InvocationTimeoutException(__FILE__, __LINE__); - } - } - else - { - ThreadControl::sleep(interval); - } - } - _observer.retried(); - } - catch(const Ice::Exception& ex) - { - _observer.failed(ex.ice_id()); - throw; - } - } - } - - assert(false); - return false; -} - -Outgoing::Outgoing(const Ice::ObjectPrxPtr& proxy, const string& operation, OperationMode mode, const Context& context) : - ProxyOutgoingBase(proxy, mode), - _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())), - _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _operation(operation) -{ - checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); - _observer.attach(proxy, operation, context); - switch(_proxy->__reference()->getMode()) - { - case Reference::ModeTwoway: - case Reference::ModeOneway: - case Reference::ModeDatagram: - { - _os.writeBlob(requestHdr, sizeof(requestHdr)); - break; - } - - case Reference::ModeBatchOneway: - case Reference::ModeBatchDatagram: - { - _proxy->__getBatchRequestQueue()->prepareBatchRequest(&_os); - break; - } - } - - try - { - _os.write(_proxy->__reference()->getIdentity()); - - // - // For compatibility with the old FacetPath. - // - if(_proxy->__reference()->getFacet().empty()) - { - _os.write(static_cast<string*>(0), static_cast<string*>(0)); - } - else - { - string facet = _proxy->__reference()->getFacet(); - _os.write(&facet, &facet + 1); - } - - _os.write(operation, false); - - _os.write(static_cast<Ice::Byte>(mode)); - - if(&context != &Ice::noExplicitContext) - { - // - // Explicit context - // - _os.write(context); - } - else - { - // - // Implicit context - // - const ImplicitContextIPtr& implicitContext = _proxy->__reference()->getInstance()->getImplicitContext(); - const Context& prxContext = _proxy->__reference()->getContext()->getValue(); - if(implicitContext == 0) - { - _os.write(prxContext); - } - else - { - implicitContext->write(prxContext, &_os); - } - } - } - catch(const LocalException& ex) - { - abort(ex); - } -} - -Outgoing::~Outgoing() -{ -} - -bool -Outgoing::invokeRemote(const Ice::ConnectionIPtr& connection, bool compress, bool response) -{ - return connection->sendRequest(this, compress, response, 0); -} - -void -Outgoing::invokeCollocated(CollocatedRequestHandler* handler) -{ - handler->invokeRequest(this, 0); -} - -bool -Outgoing::invoke() -{ - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _state = StateInProgress; - _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, _operation); - return true; - } - return invokeImpl(); -} - -void -Outgoing::abort(const LocalException& ex) -{ - assert(_state == StateUnsent); - - // - // If we didn't finish a batch oneway or datagram request, we must - // notify the connection about that we give up ownership of the - // batch stream. - // - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os); - } - - ex.ice_throw(); -} - -void -Outgoing::completed(InputStream& is) -{ - Monitor<Mutex>::Lock sync(_monitor); - - assert(_proxy->__reference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways. - - assert(_state <= StateInProgress); - if(_childObserver) - { - _childObserver->reply(static_cast<Int>(is.b.size() - headerSize - 4)); - } - _childObserver.detach(); - - _is.swap(is); - - Ice::Byte replyStatus; - _is.read(replyStatus); - - switch(replyStatus) - { - case replyOK: - { - _state = StateOK; // The state must be set last, in case there is an exception. - break; - } - - case replyUserException: - { - _observer.userException(); - _state = StateUserException; // The state must be set last, in case there is an exception. - break; - } - - case replyObjectNotExist: - case replyFacetNotExist: - case replyOperationNotExist: - { - // - // Don't read the exception members directly into the - // exception. Otherwise if reading fails and raises an - // exception, you will have a memory leak. - // - Identity ident; - _is.read(ident); - - // - // For compatibility with the old FacetPath. - // - vector<string> facetPath; - _is.read(facetPath); - string facet; - if(!facetPath.empty()) - { - if(facetPath.size() > 1) - { - throw MarshalException(__FILE__, __LINE__); - } - facet.swap(facetPath[0]); - } - - string operation; - _is.read(operation, false); - - RequestFailedException* ex; - switch(replyStatus) - { - case replyObjectNotExist: - { - ex = new ObjectNotExistException(__FILE__, __LINE__); - break; - } - - case replyFacetNotExist: - { - ex = new FacetNotExistException(__FILE__, __LINE__); - break; - } - - case replyOperationNotExist: - { - ex = new OperationNotExistException(__FILE__, __LINE__); - break; - } - - default: - { - ex = 0; // To keep the compiler from complaining. - assert(false); - break; - } - } - - ex->id = ident; - ex->facet = facet; - ex->operation = operation; - _exception.reset(ex); // adopt - _state = StateLocalException; // The state must be set last, in case there is an exception. - break; - } - - case replyUnknownException: - case replyUnknownLocalException: - case replyUnknownUserException: - { - // - // Don't read the exception members directly into the - // exception. Otherwise if reading fails and raises an - // exception, you will have a memory leak. - // - string unknown; - _is.read(unknown, false); - - UnknownException* ex; - switch(replyStatus) - { - case replyUnknownException: - { - ex = new UnknownException(__FILE__, __LINE__); - break; - } - - case replyUnknownLocalException: - { - ex = new UnknownLocalException(__FILE__, __LINE__); - break; - } - - case replyUnknownUserException: - { - ex = new UnknownUserException(__FILE__, __LINE__); - break; - } - - default: - { - ex = 0; // To keep the compiler from complaining. - assert(false); - break; - } - } - - ex->unknown = unknown; - _exception.reset(ex); // adopt - _state = StateLocalException; // The state must be set last, in case there is an exception. - break; - } - - default: - { - _exception.reset(new UnknownReplyStatusException(__FILE__, __LINE__)); - _state = StateLocalException; - break; - } - } - - _monitor.notify(); -} - -void -Outgoing::throwUserException() -{ - try - { - _is.startEncapsulation(); - _is.throwException(); - } - catch(const Ice::UserException&) - { - _is.endEncapsulation(); - throw; - } -} - -ProxyFlushBatch::ProxyFlushBatch(const Ice::ObjectPrxPtr& proxy, const string& operation) : - ProxyOutgoingBase(proxy, ICE_ENUM(OperationMode, Normal)) -{ - checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); - _observer.attach(proxy, operation, ::Ice::noExplicitContext); - - _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os); -} - -bool -ProxyFlushBatch::invokeRemote(const Ice::ConnectionIPtr& connection, bool compress, bool response) -{ - return connection->sendRequest(this, compress, response, _batchRequestNum); -} - -void -ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler) -{ - handler->invokeRequest(this, _batchRequestNum); -} - -void -ProxyFlushBatch::invoke() -{ - if(_batchRequestNum == 0) - { - sent(); - } - else - { - invokeImpl(); - } -} - -ConnectionFlushBatch::ConnectionFlushBatch(ConnectionI* connection, Instance* instance, const string& operation) : - OutgoingBase(instance), _connection(connection) -{ - _observer.attach(instance, operation); -} - -void -ConnectionFlushBatch::invoke() -{ - int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os); - - try - { - if(batchRequestNum == 0) - { - sent(); - } - else if(!_connection->sendRequest(this, false, false, batchRequestNum)) - { - Monitor<Mutex>::Lock sync(_monitor); - while(!_exception && !_sent) - { - _monitor.wait(); - } - if(_exception) - { - _exception->ice_throw(); - } - } - } - catch(const RetryException& ex) - { - ex.get()->ice_throw(); - } -} - -void -ConnectionFlushBatch::sent() -{ - Monitor<Mutex>::Lock sync(_monitor); - _childObserver.detach(); - - _sent = true; - _monitor.notify(); - - // - // NOTE: At this point the stack allocated ConnectionFlushBatch - // object can be destroyed since the notify() on the monitor will - // release the thread waiting on the synchronous Ice call. - // -} - -void -ConnectionFlushBatch::completed(const Ice::Exception& ex) -{ - Monitor<Mutex>::Lock sync(_monitor); - _childObserver.failed(ex.ice_id()); - _childObserver.detach(); - ICE_SET_EXCEPTION_FROM_CLONE(_exception, ex.ice_clone()); - _monitor.notify(); -} - -void -ConnectionFlushBatch::completed(InputStream&) -{ - assert(false); -} - -void -ConnectionFlushBatch::retryException(const Ice::Exception& ex) -{ - completed(ex); -} diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index e9261214a07..20ba0b38b3a 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -822,10 +822,10 @@ ProxyOutgoingAsyncBase::runTimerTask() } } -OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx) : +OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx, bool synchronous) : ProxyOutgoingAsyncBase(prx), _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())), - _synchronous(false) + _synchronous(synchronous) { } diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 9a38f948c1a..119a0eb178a 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -12,7 +12,6 @@ #include <Ice/ReferenceFactory.h> #include <Ice/Object.h> #include <Ice/ObjectAdapterFactory.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/Reference.h> #include <Ice/EndpointI.h> @@ -68,7 +67,7 @@ Ice::ObjectPrx::__ice_isA(const shared_ptr<IceInternal::OutgoingAsyncT<bool>>& o const string& typeId, const Context& ctx) { - __checkAsyncTwowayOnly(ice_isA_name); + __checkTwowayOnly(ice_isA_name); outAsync->invoke(ice_isA_name, OperationMode::Nonmutating, DefaultFormat, ctx, [&](Ice::OutputStream* os) { @@ -86,7 +85,7 @@ Ice::ObjectPrx::__ice_ping(const shared_ptr<IceInternal::OutgoingAsyncT<void>>& void Ice::ObjectPrx::__ice_ids(const shared_ptr<IceInternal::OutgoingAsyncT<vector<string>>>& outAsync, const Context& ctx) { - __checkAsyncTwowayOnly(ice_ids_name); + __checkTwowayOnly(ice_ids_name); outAsync->invoke(ice_ids_name, OperationMode::Nonmutating, DefaultFormat, ctx, nullptr, nullptr, [](Ice::InputStream* stream) { @@ -99,7 +98,7 @@ Ice::ObjectPrx::__ice_ids(const shared_ptr<IceInternal::OutgoingAsyncT<vector<st void Ice::ObjectPrx::__ice_id(const shared_ptr<IceInternal::OutgoingAsyncT<string>>& outAsync, const Context& ctx) { - __checkAsyncTwowayOnly(ice_id_name); + __checkTwowayOnly(ice_id_name); outAsync->invoke(ice_id_name, OperationMode::Nonmutating, DefaultFormat, ctx, nullptr, nullptr, [](Ice::InputStream* stream) { @@ -121,6 +120,21 @@ Ice::ObjectPrx::__ice_flushBatchRequests(const shared_ptr<IceInternal::ProxyFlus outAsync->invoke(ice_flushBatchRequests_name); } +void +Ice::ObjectPrx::__checkTwowayOnly(const string& name) const +{ + // + // No mutex lock necessary, there is nothing mutable in this operation. + // + if(!ice_isTwoway()) + { + throw IceUtil::IllegalArgumentException(__FILE__, + __LINE__, + "`" + name + "' can only be called with a twoway proxy"); + } +} + + shared_ptr<ObjectPrx> Ice::ObjectPrx::__newInstance() const { @@ -173,52 +187,20 @@ IceProxy::Ice::Object::operator<(const Object& r) const return _reference < r._reference; } -bool -IceProxy::Ice::Object::ice_isA(const string& typeId, const Context& context) -{ - __checkTwowayOnly(ice_isA_name); - Outgoing __og(this, ice_isA_name, ::Ice::Nonmutating, context); - try - { - OutputStream* __os = __og.startWriteParams(DefaultFormat); - __os->write(typeId, false); - __og.endWriteParams(); - } - catch(const ::Ice::LocalException& __ex) - { - __og.abort(__ex); - } - if(!__og.invoke()) - { - try - { - __og.throwUserException(); - } - catch(const ::Ice::UserException& __ex) - { - throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_id()); - } - } - bool __ret; - InputStream* __is = __og.startReadParams(); - __is->read(__ret); - __og.endReadParams(); - return __ret; -} - Ice::AsyncResultPtr IceProxy::Ice::Object::__begin_ice_isA(const string& typeId, const Context& ctx, const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) + const ::Ice::LocalObjectPtr& cookie, + bool sync) { - __checkAsyncTwowayOnly(ice_isA_name); - OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_isA_name, del, cookie); + __checkTwowayOnly(ice_isA_name, sync); + OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_isA_name, del, cookie, sync); try { __result->prepare(ice_isA_name, Nonmutating, ctx); ::Ice::OutputStream* __os = __result->startWriteParams(DefaultFormat); - __os->write(typeId); + __os->write(typeId, false); __result->endWriteParams(); __result->invoke(ice_isA_name); } @@ -252,35 +234,13 @@ IceProxy::Ice::Object::end_ice_isA(const AsyncResultPtr& __result) return __ret; } -void -IceProxy::Ice::Object::ice_ping(const Context& context) -{ - Outgoing __og(this, ice_ping_name, ::Ice::Nonmutating, context); - __og.writeEmptyParams(); - bool __ok = __og.invoke(); - if(__og.hasResponse()) - { - if(!__ok) - { - try - { - __og.throwUserException(); - } - catch(const ::Ice::UserException& __ex) - { - throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_id()); - } - } - __og.readEmptyParams(); - } -} - AsyncResultPtr IceProxy::Ice::Object::__begin_ice_ping(const Context& ctx, const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) + const ::Ice::LocalObjectPtr& cookie, + bool sync) { - OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_ping_name, del, cookie); + OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_ping_name, del, cookie, sync); try { __result->prepare(ice_ping_name, Nonmutating, ctx); @@ -300,61 +260,14 @@ IceProxy::Ice::Object::end_ice_ping(const AsyncResultPtr& __result) __end(__result, ice_ping_name); } -vector<string> -IceProxy::Ice::Object::ice_ids(const Context& context) -{ - __checkTwowayOnly(ice_ids_name); - Outgoing __og(this, ice_ids_name, ::Ice::Nonmutating, context); - __og.writeEmptyParams(); - if(!__og.invoke()) - { - try - { - __og.throwUserException(); - } - catch(const ::Ice::UserException& __ex) - { - throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_id()); - } - } - vector<string> __ret; - InputStream* __is = __og.startReadParams(); - __is->read(__ret, false); - __og.endReadParams(); - return __ret; -} - -string -IceProxy::Ice::Object::ice_id(const Context& context) -{ - __checkTwowayOnly(ice_id_name); - Outgoing __og(this, ice_id_name, ::Ice::Nonmutating, context); - __og.writeEmptyParams(); - if(!__og.invoke()) - { - try - { - __og.throwUserException(); - } - catch(const ::Ice::UserException& __ex) - { - throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_id()); - } - } - string __ret; - InputStream* __is = __og.startReadParams(); - __is->read(__ret, false); - __og.endReadParams(); - return __ret; -} - AsyncResultPtr IceProxy::Ice::Object::__begin_ice_ids(const Context& ctx, const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) + const ::Ice::LocalObjectPtr& cookie, + bool sync) { - __checkAsyncTwowayOnly(ice_ids_name); - OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_ids_name, del, cookie); + __checkTwowayOnly(ice_ids_name, sync); + OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_ids_name, del, cookie, sync); try { __result->prepare(ice_ids_name, Nonmutating, ctx); @@ -386,7 +299,7 @@ IceProxy::Ice::Object::end_ice_ids(const AsyncResultPtr& __result) } vector<string> __ret; ::Ice::InputStream* __is = __result->__startReadParams(); - __is->read(__ret); + __is->read(__ret, false); __result->__endReadParams(); return __ret; } @@ -394,10 +307,11 @@ IceProxy::Ice::Object::end_ice_ids(const AsyncResultPtr& __result) AsyncResultPtr IceProxy::Ice::Object::__begin_ice_id(const Context& ctx, const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) + const ::Ice::LocalObjectPtr& cookie, + bool sync) { - __checkAsyncTwowayOnly(ice_id_name); - OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_id_name, del, cookie); + __checkTwowayOnly(ice_id_name, sync); + OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_id_name, del, cookie, sync); try { __result->prepare(ice_id_name, Nonmutating, ctx); @@ -429,7 +343,7 @@ IceProxy::Ice::Object::end_ice_id(const AsyncResultPtr& __result) } string __ret; ::Ice::InputStream* __is = __result->__startReadParams(); - __is->read(__ret); + __is->read(__ret, false); __result->__endReadParams(); return __ret; } @@ -460,7 +374,8 @@ IceProxy::Ice::Object::__begin_ice_invoke(const string& operation, const vector<Byte>& inEncaps, const Context& ctx, const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) + const ::Ice::LocalObjectPtr& cookie, + bool sync) { pair<const Byte*, const Byte*> inPair; if(inEncaps.empty()) @@ -490,42 +405,16 @@ IceProxy::Ice::Object::end_ice_invoke(vector<Byte>& outEncaps, const AsyncResult return ok; } -bool -IceProxy::Ice::Object::ice_invoke(const string& operation, - OperationMode mode, - const pair<const Byte*, const Byte*>& inEncaps, - vector<Byte>& outEncaps, - const Context& context) -{ - Outgoing __og(this, operation, mode, context); - try - { - __og.writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first)); - } - catch(const ::Ice::LocalException& __ex) - { - __og.abort(__ex); - } - bool ok = __og.invoke(); - if(_reference->getMode() == Reference::ModeTwoway) - { - const Byte* v; - Int sz; - __og.readParamEncaps(v, sz); - vector<Byte>(v, v + sz).swap(outEncaps); - } - return ok; -} - AsyncResultPtr IceProxy::Ice::Object::__begin_ice_invoke(const string& operation, OperationMode mode, const pair<const Byte*, const Byte*>& inEncaps, const Context& ctx, const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) + const ::Ice::LocalObjectPtr& cookie, + bool sync) { - OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_invoke_name, del, cookie); + OutgoingAsyncPtr __result = new CallbackOutgoing(this, ice_invoke_name, del, cookie, sync); try { __result->prepare(operation, mode, ctx); @@ -554,8 +443,9 @@ IceProxy::Ice::Object::___end_ice_invoke(pair<const Byte*, const Byte*>& outEnca } ::Ice::AsyncResultPtr -IceProxy::Ice::Object::begin_ice_flushBatchRequestsInternal(const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) +IceProxy::Ice::Object::__begin_ice_flushBatchRequests(const ::IceInternal::CallbackBasePtr& del, + const ::Ice::LocalObjectPtr& cookie, + bool sync) { class ProxyFlushBatchAsyncWithCallback : public ProxyFlushBatchAsync, public CallbackCompletion { @@ -596,33 +486,6 @@ IceProxy::Ice::Object::end_ice_flushBatchRequests(const AsyncResultPtr& result) } void -IceProxy::Ice::Object::__invoke(Outgoing& __og) const -{ - // - // Helper for operations without out/return parameters and user - // exceptions. - // - - bool __ok = __og.invoke(); - if(__og.hasResponse()) - { - if(!__ok) - { - try - { - __og.throwUserException(); - } - catch(const ::Ice::UserException& __ex) - { - ::Ice::UnknownUserException __uue(__FILE__, __LINE__, __ex.ice_id()); - throw __uue; - } - } - __og.readEmptyParams(); - } -} - -void IceProxy::Ice::Object::__end(const ::Ice::AsyncResultPtr& __result, const std::string& operation) const { AsyncResult::__check(__result, this, operation); @@ -666,46 +529,10 @@ IceProxy::Ice::Object::__newInstance() const return new Object; } -ConnectionPtr -IceProxy::Ice::Object::ice_getConnection() -{ - InvocationObserver observer(this, ice_getConnection_name, ::Ice::noExplicitContext); - int cnt = 0; - while(true) - { - RequestHandlerPtr handler; - try - { - handler = __getRequestHandler(); - return handler->waitForConnection(); // Wait for the connection to be established. - } - catch(const IceInternal::RetryException&) - { - __updateRequestHandler(handler, 0); // Clear request handler and retry. - } - catch(const Exception& ex) - { - try - { - int interval = __handleException(ex, handler, ICE_ENUM(OperationMode, Idempotent), false, cnt); - observer.retried(); - if(interval > 0) - { - IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); - } - } - catch(const Exception& exc) - { - observer.failed(exc.ice_id()); - throw; - } - } - } -} - AsyncResultPtr -IceProxy::Ice::Object::begin_ice_getConnectionInternal(const ::IceInternal::CallbackBasePtr& del, - const ::Ice::LocalObjectPtr& cookie) +IceProxy::Ice::Object::__begin_ice_getConnection(const ::IceInternal::CallbackBasePtr& del, + const ::Ice::LocalObjectPtr& cookie, + bool sync) { class ProxyGetConnectionWithCallback : public ProxyGetConnection, public CallbackCompletion { @@ -747,23 +574,23 @@ IceProxy::Ice::Object::end_ice_getConnection(const AsyncResultPtr& __result) } void -IceProxy::Ice::Object::ice_flushBatchRequests() -{ - ProxyFlushBatch og(ICE_SHARED_FROM_THIS, ice_flushBatchRequests_name); - og.invoke(); -} - -void -IceProxy::Ice::Object::__checkTwowayOnly(const string& name) const +IceProxy::Ice::Object::__checkTwowayOnly(const string& name, bool sync) const { // // No mutex lock necessary, there is nothing mutable in this operation. // if(!ice_isTwoway()) { - TwowayOnlyException ex(__FILE__, __LINE__); - ex.operation = name; - throw ex; + if(sync) + { + throw TwowayOnlyException(__FILE__, __LINE__, name); + } + else + { + throw IceUtil::IllegalArgumentException(__FILE__, + __LINE__, + "`" + name + "' can only be called with a twoway proxy"); + } } } @@ -781,20 +608,6 @@ IceProxy::Ice::Object::__checkTwowayOnly(const string& name) const // methods common for both C++11/C++98 mappings // -void -ICE_OBJECT_PRX::__checkAsyncTwowayOnly(const string& name) const -{ - // - // No mutex lock necessary, there is nothing mutable in this operation. - // - if(!ice_isTwoway()) - { - throw IceUtil::IllegalArgumentException(__FILE__, - __LINE__, - "`" + name + "' can only be called with a twoway proxy"); - } -} - Identity ICE_OBJECT_PRX::ice_getIdentity() const { diff --git a/cpp/src/Ice/RequestHandler.h b/cpp/src/Ice/RequestHandler.h index c1ce9446f0e..ac97141f743 100644 --- a/cpp/src/Ice/RequestHandler.h +++ b/cpp/src/Ice/RequestHandler.h @@ -29,9 +29,6 @@ class LocalException; namespace IceInternal { -class OutgoingBase; -class ProxyOutgoingBase; - // // An exception wrapper, which is used to notify that the request // handler should be cleared and the invocation retried. @@ -58,19 +55,17 @@ class CancellationHandler { public: - virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&) = 0; virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&) = 0; }; class RequestHandler : public CancellationHandler { public: - + RequestHandler(const ReferencePtr&); virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&) = 0; - virtual bool sendRequest(ProxyOutgoingBase*) = 0; virtual AsyncStatus sendAsyncRequest(const ProxyOutgoingAsyncBasePtr&) = 0; const ReferencePtr& getReference() const { return _reference; } // Inlined for performances. diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp index f74988f65a9..87389bd1f6e 100644 --- a/cpp/src/Ice/RetryQueue.cpp +++ b/cpp/src/Ice/RetryQueue.cpp @@ -44,12 +44,6 @@ IceInternal::RetryTask::runTimerTask() } void -IceInternal::RetryTask::requestCanceled(OutgoingBase*, const Ice::LocalException&) -{ - assert(false); -} - -void IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { if(_queue->cancel(ICE_SHARED_FROM_THIS)) diff --git a/cpp/src/Ice/RetryQueue.h b/cpp/src/Ice/RetryQueue.h index bc2dd744786..e97e158af5b 100644 --- a/cpp/src/Ice/RetryQueue.h +++ b/cpp/src/Ice/RetryQueue.h @@ -33,7 +33,6 @@ public: virtual void runTimerTask(); - virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); void destroy(); diff --git a/cpp/src/slice2cpp/Gen.cpp b/cpp/src/slice2cpp/Gen.cpp index 045913f1f4e..457b6ad4d66 100644 --- a/cpp/src/slice2cpp/Gen.cpp +++ b/cpp/src/slice2cpp/Gen.cpp @@ -793,7 +793,6 @@ Slice::Gen::generate(const UnitPtr& p) } C << "\n#include <Ice/LocalException.h>"; C << "\n#include <Ice/ValueFactory.h>"; - C << "\n#include <Ice/Outgoing.h>"; C << "\n#include <Ice/OutgoingAsync.h>"; } else if(p->hasLocalClassDefsWithAsync()) @@ -2090,7 +2089,15 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p) string deprecateSymbol = getDeprecateSymbol(p, cl); H << sp << nl << deprecateSymbol << _dllMemberExport << retS << ' ' << fixKwd(name) << spar << paramsDecl - << "const ::Ice::Context& __ctx = ::Ice::noExplicitContext" << epar << ";"; + << "const ::Ice::Context& __ctx = ::Ice::noExplicitContext" << epar; + H << sb; + if(ret) + { + H << nl << "return "; + } + H << "end_" << name << spar << outParamNamesAMI << "__begin_" + name << spar << argsAMI; + H << "__ctx" << "::IceInternal::__dummyCallback" << "0" << "true" << epar << epar << ';'; + H << eb; H << sp << nl << "::Ice::AsyncResultPtr begin_" << name << spar << paramsDeclAMI << "const ::Ice::Context& __ctx = ::Ice::noExplicitContext" << epar; @@ -2144,133 +2151,23 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p) H << sp << nl << _dllMemberExport << "::Ice::AsyncResultPtr __begin_" << name << spar << paramsAMI << "const ::Ice::Context&" << "const ::IceInternal::CallbackBasePtr&" - << "const ::Ice::LocalObjectPtr& __cookie = 0" << epar << ';'; + << "const ::Ice::LocalObjectPtr& __cookie = 0" + << "bool sync = false" << epar << ';'; H << nl; H.dec(); H << nl << "public:"; H.inc(); - C << sp << nl << retS << nl << "IceProxy" << scoped << spar << paramsDecl << "const ::Ice::Context& __ctx" << epar; - C << sb; - if(p->returnsData()) - { - C << nl << "__checkTwowayOnly(" << flatName << ");"; - } - C << nl << "::IceInternal::Outgoing __og(this, " << flatName << ", " << operationModeToString(p->sendMode()) - << ", __ctx);"; - if(inParams.empty()) - { - C << nl << "__og.writeEmptyParams();"; - } - else - { - C << nl << "try"; - C << sb; - C << nl<< "::Ice::OutputStream* __os = __og.startWriteParams(" << opFormatTypeToString(p) << ");"; - writeMarshalCode(C, inParams, 0, true, TypeContextInParam); - if(p->sendsClasses(false)) - { - C << nl << "__os->writePendingValues();"; - } - C << nl << "__og.endWriteParams();"; - C << eb; - C << nl << "catch(const ::Ice::LocalException& __ex)"; - C << sb; - C << nl << "__og.abort(__ex);"; - C << eb; - } - - if(!p->returnsData()) - { - C << nl << "__invoke(__og);"; // Use helpers for methods that don't return data. - } - else - { - C << nl << "if(!__og.invoke())"; - C << sb; - C << nl << "try"; - C << sb; - C << nl << "__og.throwUserException();"; - C << eb; - - // - // Generate a catch block for each legal user exception. This is necessary - // to prevent an "impossible" user exception to be thrown if client and - // and server use different exception specifications for an operation. For - // example: - // - // Client compiled with: - // exception A {}; - // exception B {}; - // interface I { - // void op() throws A; - // }; - // - // Server compiled with: - // exception A {}; - // exception B {}; - // interface I { - // void op() throws B; // Differs from client - // }; - // - // We need the catch blocks so, if the server throws B from op(), the - // client receives UnknownUserException instead of B. - // - ExceptionList throws = p->throws(); - throws.sort(); - throws.unique(); -#if defined(__SUNPRO_CC) - throws.sort(derivedToBaseCompare); -#else - throws.sort(Slice::DerivedToBaseCompare()); -#endif - for(ExceptionList::const_iterator i = throws.begin(); i != throws.end(); ++i) - { - C << nl << "catch(const " << fixKwd((*i)->scoped()) << "&)"; - C << sb; - C << nl << "throw;"; - C << eb; - } - C << nl << "catch(const ::Ice::UserException& __ex)"; - C << sb; - // - // COMPILERFIX: Don't throw UnknownUserException directly. This is causing access - // violation errors with Visual C++ 64bits optimized builds. See bug #2962. - // - C << nl << "::Ice::UnknownUserException __uue(__FILE__, __LINE__, __ex.ice_id());"; - C << nl << "throw __uue;"; - C << eb; - C << eb; - - if(ret || !outParams.empty()) - { - writeAllocateCode(C, ParamDeclList(), p, true, _useWstring); - C << nl << "::Ice::InputStream* __is = __og.startReadParams();"; - writeUnmarshalCode(C, outParams, p, true); - if(p->returnsClasses(false)) - { - C << nl << "__is->readPendingValues();"; - } - C << nl << "__og.endReadParams();"; - } - - if(ret) - { - C << nl << "return __ret;"; - } - } - C << eb; - C << sp << nl << "::Ice::AsyncResultPtr" << nl << "IceProxy" << scope << "__begin_" << name << spar << paramsDeclAMI << "const ::Ice::Context& __ctx" << "const ::IceInternal::CallbackBasePtr& __del" - << "const ::Ice::LocalObjectPtr& __cookie" << epar; + << "const ::Ice::LocalObjectPtr& __cookie" << "bool sync" << epar; C << sb; if(p->returnsData()) { - C << nl << "__checkAsyncTwowayOnly(" << flatName << ");"; + C << nl << "__checkTwowayOnly(" << flatName << ", sync);"; } C << nl << "::IceInternal::OutgoingAsyncPtr __result = new ::IceInternal::CallbackOutgoing(this, " << flatName - << ", __del, __cookie);"; + << ", __del, __cookie, sync);"; C << nl << "try"; C << sb; C << nl << "__result->prepare(" << flatName << ", " << operationModeToString(p->sendMode()) << ", __ctx);"; @@ -6364,7 +6261,7 @@ Slice::Gen::Cpp11ProxyVisitor::visitOperation(const OperationPtr& p) C << sb; if(p->returnsData()) { - C << nl << "__checkAsyncTwowayOnly(" << flatName << ");"; + C << nl << "__checkTwowayOnly(" << flatName << ");"; } C << nl << "::std::function<void(::Ice::InputStream*)> __read;"; @@ -6472,7 +6369,7 @@ Slice::Gen::Cpp11ProxyVisitor::visitOperation(const OperationPtr& p) C << sb; if(p->returnsData()) { - C << nl << "__checkAsyncTwowayOnly(" << flatName << ");"; + C << nl << "__checkTwowayOnly(" << flatName << ");"; } C << nl << "__outAsync->invoke(" << flatName << ", "; C << operationModeToString(p->sendMode(), true) << ", " << opFormatTypeToString(p) << ", __ctx, "; |