diff options
author | Marc Laukien <marc@zeroc.com> | 2004-10-26 21:13:30 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-10-26 21:13:30 +0000 |
commit | e1bb249bcb629ce94f69bff9e0ae72b40aeb4d28 (patch) | |
tree | 5629f6032f1a9471e8585ab9d73eb5cd4a2b95dd /cpp/src/Ice/ConnectionI.cpp | |
parent | Work-around for Sun CC (diff) | |
download | ice-e1bb249bcb629ce94f69bff9e0ae72b40aeb4d28.tar.bz2 ice-e1bb249bcb629ce94f69bff9e0ae72b40aeb4d28.tar.xz ice-e1bb249bcb629ce94f69bff9e0ae72b40aeb4d28.zip |
more thread-per-connection
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 177 |
1 files changed, 121 insertions, 56 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index aaa99590ff8..e6782af3802 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -199,8 +199,28 @@ Ice::ConnectionI::isDestroyed() const bool Ice::ConnectionI::isFinished() const { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - return _transceiver == 0 && _dispatchCount == 0; + IceUtil::ThreadPtr threadPerConnection; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_transceiver != 0 || _dispatchCount != 0) + { + return false; + } + + assert(_state == StateClosed); + + threadPerConnection = _threadPerConnection; + _threadPerConnection = 0; + } + + if(threadPerConnection) + { + threadPerConnection->getThreadControl().join(); + } + + return true; } void @@ -228,70 +248,82 @@ Ice::ConnectionI::waitUntilHolding() const void Ice::ConnectionI::waitUntilFinished() { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We wait indefinitely until connection closing has been - // initiated. We also wait indefinitely until all outstanding - // requests are completed. Otherwise we couldn't guarantee that - // there are no outstanding calls when deactivate() is called on - // the servant locators. - // - while(_state < StateClosing || _dispatchCount > 0) + IceUtil::ThreadPtr threadPerConnection; + { - wait(); - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // Now we must wait until close() has been called on the - // transceiver. - // - while(_transceiver) - { - if(_state != StateClosed && _endpoint->timeout() >= 0) + // + // We wait indefinitely until connection closing has been + // initiated. We also wait indefinitely until all outstanding + // requests are completed. Otherwise we couldn't guarantee + // that there are no outstanding calls when deactivate() is + // called on the servant locators. + // + while(_state < StateClosing || _dispatchCount > 0) { - IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); - IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(); - - if(waitTime > IceUtil::Time()) + wait(); + } + + // + // Now we must wait until close() has been called on the + // transceiver. + // + while(_transceiver) + { + if(_state != StateClosed && _endpoint->timeout() >= 0) { - // - // We must wait a bit longer until we close this - // connection. - // - if(!timedWait(waitTime)) + IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); + IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(); + + if(waitTime > IceUtil::Time()) + { + // + // We must wait a bit longer until we close this + // connection. + // + if(!timedWait(waitTime)) + { + setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + } + } + else { + // + // We already waited long enough, so let's close this + // connection! + // setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); } + + // + // No return here, we must still wait until close() is + // called on the _transceiver. + // } else { - // - // We already waited long enough, so let's close this - // connection! - // - setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + wait(); } - - // - // No return here, we must still wait until close() is - // called on the _transceiver. - // - } - else - { - wait(); } - } - assert(_state == StateClosed); + assert(_state == StateClosed); + + threadPerConnection = _threadPerConnection; + _threadPerConnection = 0; + } + + if(threadPerConnection) + { + threadPerConnection->getThreadControl().join(); + } } void Ice::ConnectionI::monitor() { IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); - + if(!sync.acquired()) { return; @@ -1311,9 +1343,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, // If we are in thread per connection mode, create the thread // for this connection. // - IceUtil::ThreadPtr thread = new ThreadPerConnection(this); - thread->start(); - thread->getThreadControl().detach(); + _threadPerConnection = new ThreadPerConnection(this); + _threadPerConnection->start(); } vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr); @@ -1355,9 +1386,12 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, Ice::ConnectionI::~ConnectionI() { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_state == StateClosed); assert(!_transceiver); assert(_dispatchCount == 0); + assert(!_threadPerConnection); } void @@ -2070,7 +2104,9 @@ Ice::ConnectionI::run() const bool warnUdp = _instance->properties()->getPropertyAsInt("Ice.Warn.Datagrams") > 0; - while(true) + bool closed = false; + + while(!closed) { // // We must accept new connections outside the thread @@ -2176,6 +2212,11 @@ Ice::ConnectionI::run() ObjectAdapterPtr adapter; OutgoingAsyncPtr outAsync; + auto_ptr<LocalException> exception; + + map<Int, Outgoing*> requests; + map<Int, AsyncRequest> asyncRequests; + { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -2207,17 +2248,25 @@ Ice::ConnectionI::run() } catch(const LocalException& ex) { - _transceiver = 0; - notifyAll(); - ex.ice_throw(); + exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); } _transceiver = 0; notifyAll(); - return; + + closed = true; + } + + if(_state == StateClosed || _state == StateClosing) + { + requests.swap(_requests); + _requestsHint = _requests.end(); + + asyncRequests.swap(_asyncRequests); + _asyncRequestsHint = _asyncRequests.end(); } } - + // // Asynchronous replies must be handled outside the thread // synchronization, so that nested calls are possible. @@ -2233,6 +2282,22 @@ Ice::ConnectionI::run() // calls are possible. // invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); + + for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p) + { + p->second->finished(*_exception.get()); // The exception is immutable at this point. + } + + for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q) + { + q->second.p->__finished(*_exception.get()); // The exception is immutable at this point. + } + + if(exception.get()) + { + assert(closed); + exception->ice_throw(); + } } } |