diff options
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 190 |
1 files changed, 33 insertions, 157 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index fce13d5e88a..993cb065c23 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -23,14 +23,13 @@ using namespace std; using namespace IceInternal; +IceUtil::Shared* IceInternal::upCast(ConnectRequestHandler* p) { return p; } + ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrx& proxy) : RequestHandler(ref), - _connect(true), _proxy(proxy), _initialized(false), - _flushing(false), - _batchRequestInProgress(false), - _batchStream(ref->getInstance().get(), Ice::currentProtocolEncoding) + _flushing(false) { } @@ -41,39 +40,27 @@ ConnectRequestHandler::~ConnectRequestHandler() RequestHandlerPtr ConnectRequestHandler::connect(const Ice::ObjectPrx& proxy) { - // - // Initiate the connection if connect() is called by the proxy that - // created the handler. - // - if(proxy.get() == _proxy.get() && _connect) - { - _connect = false; // Call getConnection only once - _reference->getConnection(this); - } - + Lock sync(*this); try { - Lock sync(*this); if(!initialized()) { _proxies.insert(proxy); - return this; } } catch(const Ice::LocalException&) { - throw; - } - - if(_connectionRequestHandler) - { - proxy->__setRequestHandler(this, _connectionRequestHandler); - return _connectionRequestHandler; - } - else - { - return this; + // + // Only throw if the connection didn't get established. If + // it died after being established, we allow the caller to + // retry the connection establishment by not throwing here. + // + if(!_connection) + { + throw; + } } + return _requestHandler ? _requestHandler : this; } RequestHandlerPtr @@ -82,70 +69,8 @@ ConnectRequestHandler::update(const RequestHandlerPtr& previousHandler, const Re return previousHandler.get() == this ? newHandler : this; } -void -ConnectRequestHandler::prepareBatchRequest(BasicStream* os) -{ - { - Lock sync(*this); - while(_batchRequestInProgress) - { - wait(); - } - - if(!initialized()) - { - _batchRequestInProgress = true; - _batchStream.swap(*os); - return; - } - } - _connection->prepareBatchRequest(os); -} - -void -ConnectRequestHandler::finishBatchRequest(BasicStream* os) -{ - { - Lock sync(*this); - if(!initialized()) // This can't throw until _batchRequestInProgress = false - { - assert(_batchRequestInProgress); - _batchRequestInProgress = false; - notifyAll(); - - _batchStream.swap(*os); - - Request req; - req.os = new BasicStream(_reference->getInstance().get(), Ice::currentProtocolEncoding); - req.os->swap(_batchStream); - _requests.push_back(req); - return; - } - } - _connection->finishBatchRequest(os, _compress); -} - -void -ConnectRequestHandler::abortBatchRequest() -{ - { - Lock sync(*this); - if(!initialized()) // This can't throw until _batchRequestInProgress = false - { - assert(_batchRequestInProgress); - _batchRequestInProgress = false; - notifyAll(); - - BasicStream dummy(_reference->getInstance().get(), Ice::currentProtocolEncoding); - _batchStream.swap(dummy); - return; - } - } - _connection->abortBatchRequest(); -} - bool -ConnectRequestHandler::sendRequest(OutgoingBase* out) +ConnectRequestHandler::sendRequest(ProxyOutgoingBase* out) { { Lock sync(*this); @@ -164,11 +89,11 @@ ConnectRequestHandler::sendRequest(OutgoingBase* out) throw RetryException(ex); } } - return out->send(_connection, _compress, _response) && !_response; // Finished if sent and no response. + return out->invokeRemote(_connection, _compress, _response) && !_response; // Finished if sent and no response. } AsyncStatus -ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) +ConnectRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out) { { Lock sync(*this); @@ -192,7 +117,7 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) throw RetryException(ex); } } - return out->send(_connection, _compress, _response); + return out->invokeRemote(_connection, _compress, _response); } void @@ -350,18 +275,13 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) { p->out->completed(*_exception.get()); } - else if(p->outAsync) + else { if(p->outAsync->completed(*_exception.get())) { p->outAsync->invokeCompletedAsync(); } } - else - { - assert(p->os); - delete p->os; - } } _requests.clear(); notifyAll(); @@ -413,11 +333,6 @@ ConnectRequestHandler::flushRequests() Lock sync(*this); assert(_connection && !_initialized); - while(_batchRequestInProgress) - { - wait(); - } - // // We set the _flushing flag to true to prevent any additional queuing. Callers // might block for a little while as the queued requests are being sent but this @@ -434,29 +349,9 @@ ConnectRequestHandler::flushRequests() { if(req.out) { - req.out->send(_connection, _compress, _response); - } - else if(req.os) - { - BasicStream os(req.os->instance(), Ice::currentProtocolEncoding); - _connection->prepareBatchRequest(&os); - try - { - const Ice::Byte* bytes; - req.os->i = req.os->b.begin(); - req.os->readBlob(bytes, req.os->b.size()); - os.writeBlob(bytes, req.os->b.size()); - } - catch(const Ice::LocalException&) - { - _connection->abortBatchRequest(); - throw; - } - - _connection->finishBatchRequest(&os, _compress); - delete req.os; + req.out->invokeRemote(_connection, _compress, _response); } - else if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) + else if(req.outAsync->invokeRemote(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) { req.outAsync->invokeSentAsync(); } @@ -464,24 +359,14 @@ ConnectRequestHandler::flushRequests() catch(const RetryException& ex) { exception.reset(ex.get()->ice_clone()); - try - { - // Remove the request handler before retrying. - _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); - } - catch(const Ice::CommunicatorDestroyedException&) - { - // Ignore - } + + // Remove the request handler before retrying. + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); if(req.out) { req.out->retryException(*ex.get()); } - else if(req.os) - { - delete req.os; - } else { req.outAsync->retryException(*ex.get()); @@ -494,10 +379,6 @@ ConnectRequestHandler::flushRequests() { req.out->completed(ex); } - else if(req.os) - { - delete req.os; - } else if(req.outAsync->completed(ex)) { req.outAsync->invokeCompletedAsync(); @@ -514,10 +395,10 @@ ConnectRequestHandler::flushRequests() // if(_reference->getCacheConnection() && !exception.get()) { - _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); + _requestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); for(set<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) { - (*p)->__setRequestHandler(this, _connectionRequestHandler); + (*p)->__updateRequestHandler(this, _requestHandler); } } @@ -527,18 +408,13 @@ ConnectRequestHandler::flushRequests() _exception.swap(exception); _initialized = !_exception.get(); _flushing = false; - try - { - // - // Only remove once all the requests are flushed to - // guarantee serialization. - // - _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); - } - catch(const Ice::CommunicatorDestroyedException&) - { - // Ignore - } + + // + // Only remove once all the requests are flushed to + // guarantee serialization. + // + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + _proxies.clear(); _proxy = 0; // Break cyclic reference count. notifyAll(); |