From 4723f7bbd3ea2ffb241df26e5736fa5c04589e7b Mon Sep 17 00:00:00 2001 From: Dwayne Boone Date: Thu, 18 Sep 2014 09:51:22 -0230 Subject: ICE-5661 add proxy->begin_ice_getConnection --- cpp/src/Ice/CollocatedRequestHandler.h | 4 +- cpp/src/Ice/ConnectRequestHandler.cpp | 48 +++++------ cpp/src/Ice/ConnectRequestHandler.h | 2 +- cpp/src/Ice/ConnectionRequestHandler.cpp | 8 +- cpp/src/Ice/LoggerAdminI.cpp | 126 ++++++++++++++--------------- cpp/src/Ice/OutgoingAsync.cpp | 135 ++++++++++++++++++++++++++----- cpp/src/Ice/Proxy.cpp | 69 ++++++++++++++++ cpp/src/Ice/RequestHandler.cpp | 5 +- cpp/src/Ice/RequestHandler.h | 3 +- 9 files changed, 279 insertions(+), 121 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h index 7b80a2a036a..36462f80feb 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.h +++ b/cpp/src/Ice/CollocatedRequestHandler.h @@ -63,7 +63,7 @@ public: virtual Ice::ConnectionIPtr getConnection(); virtual Ice::ConnectionIPtr waitForConnection(); - + void invokeRequest(Outgoing*); AsyncStatus invokeAsyncRequest(OutgoingAsync*); void invokeBatchRequests(BatchOutgoing*); @@ -77,7 +77,7 @@ public: private: void handleException(Ice::Int, const Ice::Exception&); - + const Ice::ObjectAdapterIPtr _adapter; const bool _dispatcher; const Ice::LoggerPtr _logger; diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 10f756be30c..874858e1c7e 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -28,27 +28,27 @@ namespace class FlushRequestsWithException : public DispatchWorkItem { public: - + FlushRequestsWithException(const Ice::ConnectionPtr& connection, const ConnectRequestHandlerPtr& handler) : DispatchWorkItem(connection), _handler(handler) { } - + virtual void run() { _handler->flushRequestsWithException(); } - + private: - + const ConnectRequestHandlerPtr _handler; }; class FlushSentRequests : public DispatchWorkItem { public: - + FlushSentRequests(const Ice::ConnectionPtr& connection, const vector& callbacks) : DispatchWorkItem(connection), _callbacks(callbacks) { @@ -115,7 +115,7 @@ ConnectRequestHandler::prepareBatchRequest(BasicStream* os) { wait(); } - + try { if(!initialized()) @@ -146,7 +146,7 @@ ConnectRequestHandler::finishBatchRequest(BasicStream* os) _batchStream.swap(*os); - if(!_batchAutoFlush && + if(!_batchAutoFlush && _batchStream.b.size() + _batchRequestsSize > _reference->getInstance()->messageSizeMax()) { Ex::throwMemoryLimitException(__FILE__, __LINE__, _batchStream.b.size() + _batchRequestsSize, @@ -259,7 +259,7 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) _connection->requestTimedOut(out); } -void +void ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { { @@ -298,7 +298,7 @@ ConnectRequestHandler::getConnection() else { return _connection; - } + } } Ice::ConnectionIPtr @@ -309,7 +309,7 @@ ConnectRequestHandler::waitForConnection() { throw RetryException(*_exception.get()); } - + // // Wait for the connection establishment to complete or fail. // @@ -326,7 +326,7 @@ ConnectRequestHandler::waitForConnection() else { return _connection; - } + } } void @@ -340,7 +340,7 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool _connection = connection; _compress = compress; } - + // // If this proxy is for a non-local object, and we are using a router, then // add this proxy to the router info object. @@ -369,7 +369,7 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) // // If some requests were queued, we notify them of the failure. This is done from a thread - // from the client thread pool since this will result in ice_exception callbacks to be + // from the client thread pool since this will result in ice_exception callbacks to be // called. // if(!_requests.empty()) @@ -384,7 +384,7 @@ void ConnectRequestHandler::addedProxy() { // - // The proxy was added to the router info, we're now ready to send the + // The proxy was added to the router info, we're now ready to send the // queued requests. // flushRequests(); @@ -406,7 +406,7 @@ ConnectRequestHandler::initialized() { wait(); } - + if(_exception.get()) { _exception->ice_throw(); @@ -425,17 +425,17 @@ ConnectRequestHandler::flushRequests() { Lock sync(*this); assert(_connection && !_initialized); - + while(_batchRequestInProgress) { wait(); } - + // // We set the _flushing flag to true to prevent any additional queuing. Callers // might block for a little while as the queued requests are being sent but this // shouldn't be an issue as the request sends are non-blocking. - // + // _flushing = true; } @@ -486,7 +486,7 @@ ConnectRequestHandler::flushRequests() // RetryException. We handle the exception like it // was an exception that occured while sending the // request. - // + // Lock sync(*this); assert(!_exception.get() && !_requests.empty()); _exception.reset(ex.get()->ice_clone()); @@ -504,11 +504,11 @@ ConnectRequestHandler::flushRequests() { _reference->getInstance()->clientThreadPool()->dispatch(new FlushSentRequests(_connection, sentCallbacks)); } - + // // We've finished sending the queued requests and the request handler now sends - // the requests over the connection directly. It's time to substitute the - // request handler of the proxy with the more efficient connection request + // the requests over the connection directly. It's time to substitute the + // request handler of the proxy with the more efficient connection request // handler which does not have any synchronization. This also breaks the cyclic // reference count with the proxy. // @@ -538,11 +538,11 @@ ConnectRequestHandler::flushRequestsWithException() for(deque::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { if(p->out) - { + { p->out->finished(*_exception.get()); } else if(p->outAsync) - { + { p->outAsync->__finished(*_exception.get()); } else diff --git a/cpp/src/Ice/ConnectRequestHandler.h b/cpp/src/Ice/ConnectRequestHandler.h index a95f62f8717..ea86da211ff 100644 --- a/cpp/src/Ice/ConnectRequestHandler.h +++ b/cpp/src/Ice/ConnectRequestHandler.h @@ -25,7 +25,7 @@ namespace IceInternal { -class ConnectRequestHandler : public RequestHandler, +class ConnectRequestHandler : public RequestHandler, public Reference::GetConnectionCallback, public RouterInfo::AddProxyCallback, public IceUtil::Monitor diff --git a/cpp/src/Ice/ConnectionRequestHandler.cpp b/cpp/src/Ice/ConnectionRequestHandler.cpp index fba1ee9dc34..4d9d746675d 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.cpp +++ b/cpp/src/Ice/ConnectionRequestHandler.cpp @@ -29,8 +29,8 @@ ConnectionRequestHandler::ConnectionRequestHandler(const ReferencePtr& reference } } -ConnectionRequestHandler::ConnectionRequestHandler(const ReferencePtr& reference, - const Ice::ConnectionIPtr& connection, +ConnectionRequestHandler::ConnectionRequestHandler(const ReferencePtr& reference, + const Ice::ConnectionIPtr& connection, bool compress) : RequestHandler(reference), _connection(connection), @@ -68,13 +68,13 @@ ConnectionRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr return out->__send(_connection, _compress, _response); } -void +void ConnectionRequestHandler::requestTimedOut(OutgoingMessageCallback* out) { _connection->requestTimedOut(out); } -void +void ConnectionRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { _connection->asyncRequestTimedOut(outAsync); diff --git a/cpp/src/Ice/LoggerAdminI.cpp b/cpp/src/Ice/LoggerAdminI.cpp index a673a695f6c..4ce2cd073e2 100644 --- a/cpp/src/Ice/LoggerAdminI.cpp +++ b/cpp/src/Ice/LoggerAdminI.cpp @@ -35,15 +35,15 @@ public: virtual void attachRemoteLogger(const RemoteLoggerPrx&, const LogMessageTypeSeq&, const StringSeq&, Int, const Current&); - + virtual void detachRemoteLogger(const RemoteLoggerPrx&, const Current&); - + virtual LogMessageSeq getLog(const LogMessageTypeSeq&, const StringSeq&, Int, string&, const Current&); - + void destroy(); vector log(const LogMessage&); - + void deadRemoteLogger(const RemoteLoggerPrx&, const LoggerPtr&, const LocalException&, const string&); const int getTraceLevel() const @@ -57,9 +57,9 @@ public: } private: - + bool removeRemoteLogger(const RemoteLoggerPrx&); - + void remoteCallCompleted(const AsyncResultPtr&); IceUtil::Mutex _mutex; @@ -69,19 +69,19 @@ private: int _traceCount; const int _maxTraceCount; const int _traceLevel; - + list::iterator _oldestTrace; list::iterator _oldestLog; struct ObjectIdentityCompare { - bool operator()(const RemoteLoggerPrx& lhs, const RemoteLoggerPrx& rhs) + bool operator()(const RemoteLoggerPrx& lhs, const RemoteLoggerPrx& rhs) const { // // Caller should make sure that proxies are never null // assert(lhs != 0 && rhs != 0); - + return lhs->ice_getIdentity() < rhs->ice_getIdentity(); } }; @@ -93,22 +93,22 @@ private: traceCategories(c.begin(), c.end()) { } - + const set messageTypes; const set traceCategories; }; - typedef map RemoteLoggerMap; + typedef map RemoteLoggerMap; struct GetRemoteLoggerMapKey { - RemoteLoggerMap::key_type + RemoteLoggerMap::key_type operator()(const RemoteLoggerMap::value_type& val) { return val.first; } }; - + RemoteLoggerMap _remoteLoggerMap; const CallbackPtr _remoteCallCompleted; @@ -121,15 +121,15 @@ typedef IceUtil::Handle LoggerAdminIPtr; class Job : public IceUtil::Shared { public: - + Job(const vector& r, const LogMessage& l) : remoteLoggers(r), logMessage(l) { } - + const vector remoteLoggers; - const LogMessage logMessage; + const LogMessage logMessage; }; typedef IceUtil::Handle JobPtr; @@ -150,26 +150,26 @@ public: virtual ObjectPtr getFacet() const; virtual void destroy(); - + const LoggerPtr& getLocalLogger() const { return _localLogger; } void run(); - + private: void log(const LogMessage&); LoggerPtr _localLogger; const LoggerAdminIPtr _loggerAdmin; - + IceUtil::Monitor _monitor; bool _destroyed; IceUtil::ThreadPtr _sendLogThread; - std::deque _jobQueue; + std::deque _jobQueue; }; typedef IceUtil::Handle LoggerAdminLoggerIPtr; @@ -179,9 +179,9 @@ class SendLogThread : public IceUtil::Thread public: SendLogThread(const LoggerAdminLoggerIPtr&); - + virtual void run(); - + private: LoggerAdminLoggerIPtr _logger; @@ -195,7 +195,7 @@ private: // // Filter out messages from in/out logMessages list // -void +void filterLogMessages(LogMessageSeq& logMessages, const set& messageTypes, const set& traceCategories, Int messageMax) { @@ -217,7 +217,7 @@ filterLogMessages(LogMessageSeq& logMessages, const set& message bool keepIt = false; if(messageTypes.empty() || messageTypes.count(p->type) != 0) { - if(p->type != TraceMessage || traceCategories.empty() || + if(p->type != TraceMessage || traceCategories.empty() || traceCategories.count(p->traceCategory) != 0) { keepIt = true; @@ -268,7 +268,7 @@ changeCommunicator(const RemoteLoggerPrx& prx, const CommunicatorPtr& communicat } // -// Copies a set of properties +// Copies a set of properties // void copyProperties(const string& prefix, const PropertiesPtr& from, const PropertiesPtr& to) @@ -297,7 +297,7 @@ createSendLogCommunicator(const CommunicatorPtr& communicator, const LoggerPtr& copyProperties("IceSSL.", mainProps, initData.properties); StringSeq extraProps = mainProps->getPropertyAsList("Ice.Admin.Logger.Properties"); - + if(!extraProps.empty()) { for(vector::iterator p = extraProps.begin(); p != extraProps.end(); ++p) @@ -329,17 +329,17 @@ LoggerAdminI::LoggerAdminI(const PropertiesPtr& props) : } void -LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, - const LogMessageTypeSeq& messageTypes, +LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, + const LogMessageTypeSeq& messageTypes, const StringSeq& categories, - Int messageMax, + Int messageMax, const Current& current) { if(!prx) { return; // can't send this null RemoteLogger anything! } - + LoggerAdminLoggerIPtr logger = LoggerAdminLoggerIPtr::dynamicCast(current.adapter->getCommunicator()->getLogger()); if(!logger) { @@ -358,10 +358,10 @@ LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, if(!_sendLogCommunicator) { - _sendLogCommunicator = + _sendLogCommunicator = createSendLogCommunicator(current.adapter->getCommunicator(), logger->getLocalLogger()); } - + if(!_remoteLoggerMap.insert(make_pair(changeCommunicator(remoteLogger, _sendLogCommunicator), filters)).second) { if(_traceLevel > 0) @@ -378,7 +378,7 @@ LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, initLogMessages = _queue; // copy } } - + if(_traceLevel > 0) { Trace trace(logger, traceCategory); @@ -397,7 +397,7 @@ LoggerAdminI::attachRemoteLogger(const RemoteLoggerPrx& prx, throw; } } - + void LoggerAdminI::detachRemoteLogger(const RemoteLoggerPrx& remoteLogger, const Current& current) { @@ -423,24 +423,24 @@ LoggerAdminI::detachRemoteLogger(const RemoteLoggerPrx& remoteLogger, const Curr } } -LogMessageSeq -LoggerAdminI::getLog(const LogMessageTypeSeq& messageTypes, - const StringSeq& categories, +LogMessageSeq +LoggerAdminI::getLog(const LogMessageTypeSeq& messageTypes, + const StringSeq& categories, Int messageMax, string& prefix, const Current& current) { LogMessageSeq logMessages; { IceUtil::Mutex::Lock lock(_mutex); - + if(messageMax != 0) { logMessages = _queue; } } - + LoggerPtr logger = current.adapter->getCommunicator()->getLogger(); prefix = logger->getPrefix(); - + Filters filters(messageTypes, categories); filterLogMessages(logMessages, filters.messageTypes, filters.traceCategories, messageMax); return logMessages; @@ -467,11 +467,11 @@ LoggerAdminI::log(const LogMessage& logMessage) // // Put message in _queue // - if((logMessage.type != TraceMessage && _maxLogCount > 0) || - (logMessage.type == TraceMessage && _maxTraceCount > 0)) + if((logMessage.type != TraceMessage && _maxLogCount > 0) || + (logMessage.type == TraceMessage && _maxTraceCount > 0)) { list::iterator p = _queue.insert(_queue.end(), logMessage); - + if(logMessage.type != TraceMessage) { assert(_maxLogCount > 0); @@ -524,18 +524,18 @@ LoggerAdminI::log(const LogMessage& logMessage) } } } - + // // Queue updated, now find which remote loggers want this message - // + // for(RemoteLoggerMap::const_iterator p = _remoteLoggerMap.begin(); p != _remoteLoggerMap.end(); ++p) { const Filters& filters = p->second; - + if(filters.messageTypes.empty() || filters.messageTypes.count(logMessage.type) != 0) { if(logMessage.type != TraceMessage || filters.traceCategories.empty() || - filters.traceCategories.count(logMessage.traceCategory) != 0) + filters.traceCategories.count(logMessage.traceCategory) != 0) { remoteLoggers.push_back(p->first); } @@ -574,7 +574,7 @@ LoggerAdminI::removeRemoteLogger(const RemoteLoggerPrx& remoteLogger) void LoggerAdminI::remoteCallCompleted(const AsyncResultPtr& r) { - try + try { r->throwLocalException(); @@ -624,7 +624,7 @@ void LoggerAdminLoggerI::print(const string& message) { LogMessage logMessage = { PrintMessage, IceUtil::Time::now().toMicroSeconds(), "", message }; - + _localLogger->print(message); log(logMessage); } @@ -633,7 +633,7 @@ void LoggerAdminLoggerI::trace(const string& category, const string& message) { LogMessage logMessage = { TraceMessage, IceUtil::Time::now().toMicroSeconds(), category, message }; - + _localLogger->trace(category, message); log(logMessage); } @@ -642,7 +642,7 @@ void LoggerAdminLoggerI::warning(const string& message) { LogMessage logMessage = { WarningMessage, IceUtil::Time::now().toMicroSeconds(), "", message }; - + _localLogger->warning(message); log(logMessage); } @@ -651,7 +651,7 @@ void LoggerAdminLoggerI::error(const string& message) { LogMessage logMessage = { ErrorMessage, IceUtil::Time::now().toMicroSeconds(), "", message }; - + _localLogger->error(message); log(logMessage); } @@ -674,15 +674,15 @@ LoggerAdminLoggerI::getFacet() const return _loggerAdmin; } -void +void LoggerAdminLoggerI::log(const LogMessage& logMessage) { - const vector remoteLoggers = _loggerAdmin->log(logMessage); - + const vector remoteLoggers = _loggerAdmin->log(logMessage); + if(!remoteLoggers.empty()) { IceUtil::Monitor::Lock lock(_monitor); - + if(!_sendLogThread) { _sendLogThread = new SendLogThread(this); @@ -701,7 +701,7 @@ LoggerAdminLoggerI::destroy() bool joinThread = false; { IceUtil::Monitor::Lock lock(_monitor); - + if(_sendLogThread) { joinThread = true; @@ -717,7 +717,7 @@ LoggerAdminLoggerI::destroy() sendLogThreadControl.join(); } - // destroy sendLogCommunicator + // destroy sendLogCommunicator _loggerAdmin->destroy(); } @@ -746,7 +746,7 @@ LoggerAdminLoggerI::run() JobPtr job = _jobQueue.front(); _jobQueue.pop_front(); lock.release(); - + for(vector::const_iterator p = job->remoteLoggers.begin(); p != job->remoteLoggers.end(); ++p) { if(_loggerAdmin->getTraceLevel() > 1) @@ -754,7 +754,7 @@ LoggerAdminLoggerI::run() Trace trace(_localLogger, traceCategory); trace << "sending log message to `" << *p << "'"; } - + try { // @@ -766,7 +766,7 @@ LoggerAdminLoggerI::run() { _loggerAdmin->deadRemoteLogger(*p, _localLogger, ex, "log"); } - } + } } if(_loggerAdmin->getTraceLevel() > 1) @@ -786,8 +786,8 @@ SendLogThread::SendLogThread(const LoggerAdminLoggerIPtr& logger) : _logger(logger) { } - -void + +void SendLogThread::run() { _logger->run(); diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 509ba18ad06..cd3442113f8 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -38,6 +38,7 @@ IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; } +IceUtil::Shared* IceInternal::upCast(GetConnectionOutgoingAsync* p) { return p; } const unsigned char Ice::AsyncResult::OK = 0x1; const unsigned char Ice::AsyncResult::Done = 0x2; @@ -414,7 +415,7 @@ IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const Thr { public: - InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : + InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : DispatchWorkItem(connection), _outAsync(outAsync) { } @@ -901,7 +902,7 @@ IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc) { int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); _observer.retried(); // Invocation is being retried. - + // // Schedule the retry. Note that we always schedule the retry // on the retry queue even if the invocation can be retried @@ -928,7 +929,7 @@ IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& commu { } -AsyncStatus +AsyncStatus IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool) { _cachedConnection = connection; @@ -1094,7 +1095,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons // Assume all connections are flushed synchronously. // _sentSynchronously = true; - + // // Attach observer // @@ -1109,7 +1110,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt public: BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync, - const InstancePtr& instance, + const InstancePtr& instance, InvocationObserver& observer) : BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), _outAsync(outAsync), _observer(observer) @@ -1141,7 +1142,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt } private: - + const CommunicatorBatchOutgoingAsyncPtr _outAsync; InvocationObserver& _observer; }; @@ -1181,7 +1182,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) if(--_useCount > 0) { return; - } + } _state |= Done | OK | Sent; _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation _monitor.notifyAll(); @@ -1207,6 +1208,96 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) } } +IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& proxy, + const std::string& operation, + const CallbackBasePtr& delegate, + const Ice::LocalObjectPtr& cookie) : + OutgoingAsync(proxy, operation, delegate, cookie) +{ + _observer.attach(proxy.get(), operation, 0); +} + +void +IceInternal::GetConnectionOutgoingAsync::__invoke() +{ + while(true) + { + try + { + _handler = _proxy->__getRequestHandler(); + _handler->sendAsyncRequest(this); + } + catch(const RetryException&) + { + _proxy->__setRequestHandler(_handler, 0); + } + catch(const Ice::Exception& ex) + { + handleException(ex); + } + break; + } +} + +AsyncStatus +IceInternal::GetConnectionOutgoingAsync::__send(const Ice::ConnectionIPtr&, bool, bool) +{ + __sent(); + return AsyncStatusSent; +} + +AsyncStatus +IceInternal::GetConnectionOutgoingAsync::__invokeCollocated(CollocatedRequestHandler*) +{ + __sent(); + return AsyncStatusSent; +} + +bool +IceInternal::GetConnectionOutgoingAsync::__sent() +{ + { + IceUtil::Monitor::Lock sync(_monitor); + _state |= Done; + _monitor.notifyAll(); + } + __invokeCompleted(); + return false; +} + +void +IceInternal::GetConnectionOutgoingAsync::__invokeSent() +{ + // No sent callback +} + +void +IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc) +{ + try + { + handleException(exc); + } + catch(const Ice::Exception& ex) + { + __invokeException(ex); + } +} + +void +IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc) +{ + try + { + _instance->retryQueue()->add(this, _proxy->__handleException(exc, _handler, Ice::Idempotent, false, _cnt)); + _observer.retried(); // Invocation is being retried. + } + catch(const Ice::Exception& ex) + { + _observer.failed(ex.ice_name()); + throw; + } +} namespace { @@ -1227,13 +1318,13 @@ public: { } - virtual void + virtual void completed(const Ice::AsyncResultPtr&) const { assert(false); } - virtual CallbackBasePtr + virtual CallbackBasePtr verify(const Ice::LocalObjectPtr&) { // @@ -1245,13 +1336,13 @@ public: return 0; } - virtual void + virtual void sent(const AsyncResultPtr&) const { assert(false); } - virtual bool + virtual bool hasSentCallback() const { assert(false); @@ -1281,25 +1372,25 @@ Ice::newCallback(const ::IceInternal::Function& co Cpp11CB(const ::std::function& completed, const ::std::function& sent) : - _completed(completed), + _completed(completed), _sent(sent) { checkCallback(true, completed != nullptr); } - - virtual void + + virtual void completed(const AsyncResultPtr& result) const { _completed(result); } - - virtual CallbackBasePtr + + virtual CallbackBasePtr verify(const LocalObjectPtr&) { return this; // Nothing to do, the cookie is not type-safe. } - - virtual void + + virtual void sent(const AsyncResultPtr& result) const { if(_sent != nullptr) @@ -1307,19 +1398,19 @@ Ice::newCallback(const ::IceInternal::Function& co _sent(result); } } - - virtual bool + + virtual bool hasSentCallback() const { return _sent != nullptr; } - + private: ::std::function< void (const AsyncResultPtr&)> _completed; ::std::function< void (const AsyncResultPtr&)> _sent; }; - + return new Cpp11CB(completed, sent); } #endif diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 720099b6927..9878f540035 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -40,6 +40,7 @@ const string ice_ids_name = "ice_ids"; const string ice_id_name = "ice_id"; const string ice_isA_name = "ice_isA"; const string ice_invoke_name = "ice_invoke"; +const string ice_getConnection_name = "ice_getConnection"; const string ice_flushBatchRequests_name = "ice_flushBatchRequests"; } @@ -472,6 +473,49 @@ IceProxy::Ice::Object::__begin_ice_invoke( return begin_ice_invoke(operation, mode, inParams, ctx, new Cpp11CB(response, exception, sent), 0); } +Ice::AsyncResultPtr +IceProxy::Ice::Object::begin_ice_getConnection( + const ::IceInternal::Function& response, + const ::IceInternal::Function& exception) +{ + class Cpp11CB : public ::IceInternal::Cpp11FnCallbackNC + { + public: + + Cpp11CB(const ::IceInternal::Function& responseFunc, + const ::std::function& exceptionFunc) : + ::IceInternal::Cpp11FnCallbackNC(exceptionFunc, nullptr), + _response(responseFunc) + { + CallbackBase::checkCallback(true, responseFunc || exceptionFunc != nullptr); + } + + virtual void completed(const ::Ice::AsyncResultPtr& __result) const + { + ::Ice::ObjectPrx __proxy = ::Ice::ObjectPrx::uncheckedCast(__result->getProxy()); + ::Ice::ConnectionPtr __ret; + try + { + __ret = __proxy->end_ice_getConnection(__result); + } + catch(const ::Ice::Exception& ex) + { + Cpp11FnCallbackNC::exception(__result, ex); + return; + } + if(_response != nullptr) + { + _response(__ret); + } + } + + private: + + ::std::function _response; + }; + return begin_ice_getConnectionInternal(new Cpp11CB(response, exception), 0); +} + #endif @@ -1440,6 +1484,31 @@ IceProxy::Ice::Object::ice_getConnection() } } +AsyncResultPtr +IceProxy::Ice::Object::begin_ice_getConnectionInternal(const ::IceInternal::CallbackBasePtr& del, + const ::Ice::LocalObjectPtr& cookie) +{ + ::IceInternal::GetConnectionOutgoingAsyncPtr __result = + new ::IceInternal::GetConnectionOutgoingAsync(this, ice_getConnection_name, del, cookie); + try + { + __result->__invoke(); + } + catch(const Exception& __ex) + { + __result->__invokeExceptionAsync(__ex); + } + return __result; +} + +ConnectionPtr +IceProxy::Ice::Object::end_ice_getConnection(const AsyncResultPtr& __result) +{ + AsyncResult::__check(__result, this, ice_getConnection_name); + __result->__wait(); + return ice_getCachedConnection(); +} + ConnectionPtr IceProxy::Ice::Object::ice_getCachedConnection() const { diff --git a/cpp/src/Ice/RequestHandler.cpp b/cpp/src/Ice/RequestHandler.cpp index e5c7b86565c..2cbf7826213 100644 --- a/cpp/src/Ice/RequestHandler.cpp +++ b/cpp/src/Ice/RequestHandler.cpp @@ -13,8 +13,7 @@ using namespace std; using namespace IceInternal; -IceUtil::Shared* IceInternal::upCast(RequestHandler* obj) { return obj; } - +IceUtil::Shared* IceInternal::upCast(RequestHandler* p) { return p; } RetryException::RetryException(const Ice::LocalException& ex) { @@ -37,7 +36,7 @@ RequestHandler::~RequestHandler() { } -RequestHandler::RequestHandler(const ReferencePtr& reference) : +RequestHandler::RequestHandler(const ReferencePtr& reference) : _reference(reference), _response(reference->getMode() == Reference::ModeTwoway) { diff --git a/cpp/src/Ice/RequestHandler.h b/cpp/src/Ice/RequestHandler.h index e2d83c63e20..45cf917dd0a 100644 --- a/cpp/src/Ice/RequestHandler.h +++ b/cpp/src/Ice/RequestHandler.h @@ -48,7 +48,6 @@ public: private: - IceUtil::UniquePtr _ex; }; @@ -72,7 +71,7 @@ public: virtual Ice::ConnectionIPtr getConnection() = 0; virtual Ice::ConnectionIPtr waitForConnection() = 0; - + protected: RequestHandler(const ReferencePtr&); -- cgit v1.2.3