diff options
author | Marc Laukien <marc@zeroc.com> | 2004-10-26 21:13:30 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-10-26 21:13:30 +0000 |
commit | e1bb249bcb629ce94f69bff9e0ae72b40aeb4d28 (patch) | |
tree | 5629f6032f1a9471e8585ab9d73eb5cd4a2b95dd /cpp | |
parent | Work-around for Sun CC (diff) | |
download | ice-e1bb249bcb629ce94f69bff9e0ae72b40aeb4d28.tar.bz2 ice-e1bb249bcb629ce94f69bff9e0ae72b40aeb4d28.tar.xz ice-e1bb249bcb629ce94f69bff9e0ae72b40aeb4d28.zip |
more thread-per-connection
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/config/TestUtil.py | 4 | ||||
-rw-r--r-- | cpp/src/Ice/Acceptor.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 97 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.h | 3 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 177 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 2 | ||||
-rw-r--r-- | cpp/src/Ice/TcpAcceptor.cpp | 9 | ||||
-rw-r--r-- | cpp/src/Ice/TcpAcceptor.h | 1 | ||||
-rw-r--r-- | cpp/src/IceSSL/SslAcceptor.cpp | 9 | ||||
-rw-r--r-- | cpp/src/IceSSL/SslAcceptor.h | 1 | ||||
-rw-r--r-- | cpp/test/Ice/operations/Client.cpp | 5 |
11 files changed, 220 insertions, 89 deletions
diff --git a/cpp/config/TestUtil.py b/cpp/config/TestUtil.py index e70e9888832..52b6c00d380 100644 --- a/cpp/config/TestUtil.py +++ b/cpp/config/TestUtil.py @@ -29,8 +29,8 @@ compress = 1 # thread per connection mode. # -threadPerConnection = 0 -#threadPerConnection = 1 +#threadPerConnection = 0 +threadPerConnection = 1 # # If you don't set "host" below, then the Ice library will try to find diff --git a/cpp/src/Ice/Acceptor.h b/cpp/src/Ice/Acceptor.h index 05342bb5154..9403208706d 100644 --- a/cpp/src/Ice/Acceptor.h +++ b/cpp/src/Ice/Acceptor.h @@ -31,6 +31,7 @@ public: virtual void close() = 0; virtual void listen() = 0; virtual TransceiverPtr accept(int) = 0; + virtual void connectToSelf() = 0; virtual std::string toString() const = 0; }; diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 103296f1fe3..98cd946cf83 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -287,16 +287,17 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpts connection = new ConnectionI(_instance, transceiver, endpoint, 0); // - // In thread per acceptor mode, the thread per connection (in - // ConnectionI) will take care of connection validation. + // In thread per connection mode, the thread per + // connection (in ConnectionI) will take care of + // connection validation. // - if(!_instance->threadPerConnection()) + if(_instance->threadPerConnection()) { - connection->validate(); + connection->waitUntilValidated(); } else { - connection->waitUntilValidated(); + connection->validate(); } if(_instance->defaultsAndOverrides()->overrideCompress) @@ -542,6 +543,7 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const void IceInternal::IncomingConnectionFactory::waitUntilFinished() { + IceUtil::ThreadPtr threadPerAcceptor; list<ConnectionIPtr> connections; { @@ -555,6 +557,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() wait(); } + assert(_state == StateClosed); + + threadPerAcceptor = _threadPerAcceptor; + _threadPerAcceptor = 0; + // // We want to wait until all connections are finished outside the // thread synchronization. @@ -562,6 +569,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() connections.swap(_connections); } + if(threadPerAcceptor) + { + threadPerAcceptor->getThreadControl().join(); + } + for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished)); } @@ -722,22 +734,24 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt } return; } - + + assert(transceiver); + // // Create a connection object for the connection. // - assert(transceiver); connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter); _connections.push_back(connection); } - + + assert(connection); + // // We validate outside the thread synchronization, to not block // the factory. // try { - assert(connection); connection->validate(); } catch(const LocalException&) @@ -819,18 +833,18 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance if(_transceiver) { ConnectionIPtr connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter); - + // - // In thread per acceptor mode, the thread per connection (in - // ConnectionI) will take care of connection validation. + // In thread per connection mode, the thread per connection + // (in ConnectionI) will take care of connection validation. // - if(!_instance->threadPerConnection()) + if(_instance->threadPerConnection()) { - connection->validate(); + connection->waitUntilValidated(); } else { - connection->waitUntilValidated(); + connection->validate(); } _connections.push_back(connection); @@ -858,18 +872,20 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance // thread per acceptor, that accepts new connections on this // endpoint. // - IceUtil::ThreadPtr thread = new ThreadPerAcceptor(this); - thread->start(); - thread->getThreadControl().detach(); + _threadPerAcceptor = new ThreadPerAcceptor(this); + _threadPerAcceptor->start(); } } } IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory() { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_state == StateClosed); assert(!_acceptor); assert(_connections.empty()); + assert(!_threadPerAcceptor); } void @@ -914,10 +930,11 @@ IceInternal::IncomingConnectionFactory::setState(State state) { if(_instance->threadPerConnection()) { - // TODO: This is not correct! We cannot close the - // acceptor before our thread per acceptor terminates. - _acceptor->close(); - _acceptor = 0; + // + // Connect to our own acceptor, which unblocks our + // thread per acceptor stuch in accept(). + // + _acceptor->connectToSelf(); } else { @@ -994,12 +1011,10 @@ IceInternal::IncomingConnectionFactory::run() catch(const SocketException&) { // Ignore socket exceptions. - continue; } catch(const TimeoutException&) { // Ignore timeouts. - continue; } catch(const LocalException& ex) { @@ -1009,11 +1024,10 @@ IceInternal::IncomingConnectionFactory::run() Warning out(_instance->logger()); out << "connection exception:\n" << ex << '\n' << acceptor->toString(); } - continue; } ConnectionIPtr connection; - + { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1024,9 +1038,32 @@ IceInternal::IncomingConnectionFactory::run() if(_state == StateClosed) { - assert(transceiver); - transceiver->close(); - break; + if(transceiver) + { + try + { + transceiver->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + } + + try + { + _acceptor->close(); + } + catch(const LocalException& ex) + { + _acceptor = 0; + notifyAll(); + ex.ice_throw(); + } + + _acceptor = 0; + notifyAll(); + return; } assert(_state == StateActive); @@ -1037,7 +1074,7 @@ IceInternal::IncomingConnectionFactory::run() _connections.erase(remove_if(_connections.begin(), _connections.end(), Ice::constMemFun(&ConnectionI::isFinished)), _connections.end()); - + // // Create a connection object for the connection. // diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index dc34fab9e80..ff6a4813c17 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -119,7 +119,8 @@ private: IncomingConnectionFactoryPtr _factory; }; friend class ThreadPerAcceptor; - + IceUtil::ThreadPtr _threadPerAcceptor; + AcceptorPtr _acceptor; const TransceiverPtr _transceiver; const EndpointPtr _endpoint; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index aaa99590ff8..e6782af3802 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -199,8 +199,28 @@ Ice::ConnectionI::isDestroyed() const bool Ice::ConnectionI::isFinished() const { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - return _transceiver == 0 && _dispatchCount == 0; + IceUtil::ThreadPtr threadPerConnection; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_transceiver != 0 || _dispatchCount != 0) + { + return false; + } + + assert(_state == StateClosed); + + threadPerConnection = _threadPerConnection; + _threadPerConnection = 0; + } + + if(threadPerConnection) + { + threadPerConnection->getThreadControl().join(); + } + + return true; } void @@ -228,70 +248,82 @@ Ice::ConnectionI::waitUntilHolding() const void Ice::ConnectionI::waitUntilFinished() { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We wait indefinitely until connection closing has been - // initiated. We also wait indefinitely until all outstanding - // requests are completed. Otherwise we couldn't guarantee that - // there are no outstanding calls when deactivate() is called on - // the servant locators. - // - while(_state < StateClosing || _dispatchCount > 0) + IceUtil::ThreadPtr threadPerConnection; + { - wait(); - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // Now we must wait until close() has been called on the - // transceiver. - // - while(_transceiver) - { - if(_state != StateClosed && _endpoint->timeout() >= 0) + // + // We wait indefinitely until connection closing has been + // initiated. We also wait indefinitely until all outstanding + // requests are completed. Otherwise we couldn't guarantee + // that there are no outstanding calls when deactivate() is + // called on the servant locators. + // + while(_state < StateClosing || _dispatchCount > 0) { - IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); - IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(); - - if(waitTime > IceUtil::Time()) + wait(); + } + + // + // Now we must wait until close() has been called on the + // transceiver. + // + while(_transceiver) + { + if(_state != StateClosed && _endpoint->timeout() >= 0) { - // - // We must wait a bit longer until we close this - // connection. - // - if(!timedWait(waitTime)) + IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); + IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(); + + if(waitTime > IceUtil::Time()) + { + // + // We must wait a bit longer until we close this + // connection. + // + if(!timedWait(waitTime)) + { + setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + } + } + else { + // + // We already waited long enough, so let's close this + // connection! + // setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); } + + // + // No return here, we must still wait until close() is + // called on the _transceiver. + // } else { - // - // We already waited long enough, so let's close this - // connection! - // - setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + wait(); } - - // - // No return here, we must still wait until close() is - // called on the _transceiver. - // - } - else - { - wait(); } - } - assert(_state == StateClosed); + assert(_state == StateClosed); + + threadPerConnection = _threadPerConnection; + _threadPerConnection = 0; + } + + if(threadPerConnection) + { + threadPerConnection->getThreadControl().join(); + } } void Ice::ConnectionI::monitor() { IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); - + if(!sync.acquired()) { return; @@ -1311,9 +1343,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, // If we are in thread per connection mode, create the thread // for this connection. // - IceUtil::ThreadPtr thread = new ThreadPerConnection(this); - thread->start(); - thread->getThreadControl().detach(); + _threadPerConnection = new ThreadPerConnection(this); + _threadPerConnection->start(); } vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr); @@ -1355,9 +1386,12 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, Ice::ConnectionI::~ConnectionI() { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_state == StateClosed); assert(!_transceiver); assert(_dispatchCount == 0); + assert(!_threadPerConnection); } void @@ -2070,7 +2104,9 @@ Ice::ConnectionI::run() const bool warnUdp = _instance->properties()->getPropertyAsInt("Ice.Warn.Datagrams") > 0; - while(true) + bool closed = false; + + while(!closed) { // // We must accept new connections outside the thread @@ -2176,6 +2212,11 @@ Ice::ConnectionI::run() ObjectAdapterPtr adapter; OutgoingAsyncPtr outAsync; + auto_ptr<LocalException> exception; + + map<Int, Outgoing*> requests; + map<Int, AsyncRequest> asyncRequests; + { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -2207,17 +2248,25 @@ Ice::ConnectionI::run() } catch(const LocalException& ex) { - _transceiver = 0; - notifyAll(); - ex.ice_throw(); + exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); } _transceiver = 0; notifyAll(); - return; + + closed = true; + } + + if(_state == StateClosed || _state == StateClosing) + { + requests.swap(_requests); + _requestsHint = _requests.end(); + + asyncRequests.swap(_asyncRequests); + _asyncRequestsHint = _asyncRequests.end(); } } - + // // Asynchronous replies must be handled outside the thread // synchronization, so that nested calls are possible. @@ -2233,6 +2282,22 @@ Ice::ConnectionI::run() // calls are possible. // invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); + + for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p) + { + p->second->finished(*_exception.get()); // The exception is immutable at this point. + } + + for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q) + { + q->second.p->__finished(*_exception.get()); // The exception is immutable at this point. + } + + if(exception.get()) + { + assert(closed); + exception->ice_throw(); + } } } diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 9e6b462b135..725daf6f9bc 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -147,6 +147,8 @@ private: ConnectionIPtr _connection; }; friend class ThreadPerConnection; + // Defined as mutable because "isFinished() const" sets this to 0. + mutable IceUtil::ThreadPtr _threadPerConnection; IceInternal::TransceiverPtr _transceiver; const std::string _desc; diff --git a/cpp/src/Ice/TcpAcceptor.cpp b/cpp/src/Ice/TcpAcceptor.cpp index 8cc3feae491..3493fc3998d 100644 --- a/cpp/src/Ice/TcpAcceptor.cpp +++ b/cpp/src/Ice/TcpAcceptor.cpp @@ -74,6 +74,15 @@ IceInternal::TcpAcceptor::accept(int timeout) return new TcpTransceiver(_instance, fd); } +void +IceInternal::TcpAcceptor::connectToSelf() +{ + SOCKET fd = createSocket(false); + setBlock(fd, false); + doConnect(fd, _addr, -1); + closeSocket(fd); +} + string IceInternal::TcpAcceptor::toString() const { diff --git a/cpp/src/Ice/TcpAcceptor.h b/cpp/src/Ice/TcpAcceptor.h index 8526e112b0b..a128aba529a 100644 --- a/cpp/src/Ice/TcpAcceptor.h +++ b/cpp/src/Ice/TcpAcceptor.h @@ -33,6 +33,7 @@ public: virtual void close(); virtual void listen(); virtual TransceiverPtr accept(int); + virtual void connectToSelf(); virtual std::string toString() const; bool equivalent(const std::string&, int) const; diff --git a/cpp/src/IceSSL/SslAcceptor.cpp b/cpp/src/IceSSL/SslAcceptor.cpp index da0fbae72bf..a72e2f6e00f 100644 --- a/cpp/src/IceSSL/SslAcceptor.cpp +++ b/cpp/src/IceSSL/SslAcceptor.cpp @@ -74,6 +74,15 @@ IceSSL::SslAcceptor::accept(int timeout) return _plugin->createTransceiver(IceSSL::Server, fd, timeout); } +void +IceSSL::SslAcceptor::connectToSelf() +{ + SOCKET fd = createSocket(false); + setBlock(fd, false); + doConnect(fd, _addr, -1); + closeSocket(fd); +} + string IceSSL::SslAcceptor::toString() const { diff --git a/cpp/src/IceSSL/SslAcceptor.h b/cpp/src/IceSSL/SslAcceptor.h index 9cc3e08cc18..47f7f52fd5c 100644 --- a/cpp/src/IceSSL/SslAcceptor.h +++ b/cpp/src/IceSSL/SslAcceptor.h @@ -33,6 +33,7 @@ public: virtual void close(); virtual void listen(); virtual IceInternal::TransceiverPtr accept(int); + virtual void connectToSelf(); virtual std::string toString() const; bool equivalent(const std::string&, int) const; diff --git a/cpp/test/Ice/operations/Client.cpp b/cpp/test/Ice/operations/Client.cpp index d43deacf936..c1d4e1d58e7 100644 --- a/cpp/test/Ice/operations/Client.cpp +++ b/cpp/test/Ice/operations/Client.cpp @@ -20,14 +20,19 @@ run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator) Test::MyClassPrx myClass = allTests(communicator, false); cout << "testing server shutdown... " << flush; + cout << "--- 1 ---" << endl; myClass->shutdown(); + cout << "--- 2 ---" << endl; try { + cout << "--- 3 ---" << endl; myClass->opVoid(); + cout << "--- 3.5 ---" << endl; test(false); } catch(const Ice::LocalException&) { + cout << "--- 4 ---" << endl; cout << "ok" << endl; } |