summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/Connection.cpp24
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp58
-rw-r--r--cpp/src/Ice/Network.cpp8
-rw-r--r--cpp/src/Ice/SslAcceptor.cpp2
-rw-r--r--cpp/src/Ice/SslConnector.cpp1
-rw-r--r--cpp/src/Ice/TcpAcceptor.cpp2
-rw-r--r--cpp/src/Ice/TcpConnector.cpp1
-rw-r--r--cpp/src/Ice/ThreadPool.cpp243
-rw-r--r--cpp/src/Ice/ThreadPool.h12
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp2
10 files changed, 235 insertions, 118 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 01ecb5fe6e5..81e43236ce0 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -251,6 +251,17 @@ void
IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter)
{
IceUtil::RecMutex::Lock sync(*this);
+
+ if (adapter && !_adapter)
+ {
+ _threadPool->clientIsNowServer();
+ }
+
+ if (!adapter && _adapter)
+ {
+ _threadPool->serverIsNowClient();
+ }
+
_adapter = adapter;
}
@@ -538,11 +549,14 @@ IceInternal::Connection::finished()
{
IceUtil::RecMutex::Lock sync(*this);
- _threadPool->promoteFollower();
+ assert(_state == StateClosed || _state == StateHolding);
- assert(_state == StateClosed);
+ _threadPool->promoteFollower();
- _transceiver->close();
+ if (_state == StateClosed)
+ {
+ _transceiver->close();
+ }
}
void
@@ -699,7 +713,7 @@ IceInternal::Connection::setState(State state)
{
return;
}
- _threadPool->unregister(_transceiver->fd(), false);
+ _threadPool->unregister(_transceiver->fd());
break;
}
@@ -730,7 +744,7 @@ IceInternal::Connection::setState(State state)
//
_threadPool->_register(_transceiver->fd(), this);
}
- _threadPool->unregister(_transceiver->fd(), true);
+ _threadPool->unregister(_transceiver->fd());
break;
}
}
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 378ce96d84a..9495c50983f 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -358,41 +358,45 @@ IceInternal::IncomingConnectionFactory::finished()
{
IceUtil::Mutex::Lock sync(*this);
+ assert(_state == StateClosed || _state == StateHolding);
+
_threadPool->promoteFollower();
- assert(_state == StateClosed);
- assert(_connections.empty());
-
- try
+ if (_state == StateClosed)
{
- //
- // Clear listen() backlog properly by accepting all queued
- // connections, and then shutting them down.
- //
- while (true)
+ assert(_connections.empty());
+
+ try
{
- try
- {
- TransceiverPtr transceiver = _acceptor->accept(0);
- ConnectionPtr connection = new Connection(_instance, transceiver, _endpoint, _adapter);
- connection->exception(ObjectAdapterDeactivatedException(__FILE__, __LINE__));
- }
- catch (const TimeoutException&)
+ //
+ // Clear listen() backlog properly by accepting all queued
+ // connections, and then shutting them down.
+ //
+ while (true)
{
- break; // Exit loop on timeout.
+ try
+ {
+ TransceiverPtr transceiver = _acceptor->accept(0);
+ ConnectionPtr connection = new Connection(_instance, transceiver, _endpoint, _adapter);
+ connection->exception(ObjectAdapterDeactivatedException(__FILE__, __LINE__));
+ }
+ catch (const TimeoutException&)
+ {
+ break; // Exit loop on timeout.
+ }
}
}
- }
- catch (const LocalException& ex)
- {
- if (_warn)
+ catch (const LocalException& ex)
{
- Warning out(_instance->logger());
- out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
+ if (_warn)
+ {
+ Warning out(_instance->logger());
+ out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
+ }
}
+
+ _acceptor->close();
}
-
- _acceptor->close();
}
void
@@ -493,7 +497,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
if (_threadPool)
{
- _threadPool->unregister(_acceptor->fd(), false);
+ _threadPool->unregister(_acceptor->fd());
}
for_each(_connections.begin(), _connections.end(), ::Ice::voidMemFun(&Connection::hold));
@@ -512,7 +516,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
_threadPool->_register(_acceptor->fd(), this);
}
- _threadPool->unregister(_acceptor->fd(), true);
+ _threadPool->unregister(_acceptor->fd());
}
#ifdef _STLP_BEGIN_NAMESPACE
diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp
index 6b20964273c..326c599b89b 100644
--- a/cpp/src/Ice/Network.cpp
+++ b/cpp/src/Ice/Network.cpp
@@ -232,8 +232,6 @@ IceInternal::createSocket(bool udp)
throw ex;
}
- setBlock(fd, false);
-
if (!udp)
{
setTcpNoDelay(fd);
@@ -674,6 +672,7 @@ IceInternal::createPipe(SOCKET fds[2])
#ifdef _WIN32
SOCKET fd = createSocket(false);
+ setBlock(fd, true);
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
@@ -687,6 +686,7 @@ IceInternal::createPipe(SOCKET fds[2])
try
{
fds[0] = createSocket(false);
+ setBlock(fds[0], true);
}
catch(...)
{
@@ -698,6 +698,7 @@ IceInternal::createPipe(SOCKET fds[2])
{
doConnect(fds[0], addr, -1);
fds[1] = doAccept(fd, -1);
+ setBlock(fds[1], true);
}
catch(...)
{
@@ -717,6 +718,9 @@ IceInternal::createPipe(SOCKET fds[2])
throw ex;
}
+ setBlock(fds[0], true);
+ setBlock(fds[1], true);
+
#endif
}
diff --git a/cpp/src/Ice/SslAcceptor.cpp b/cpp/src/Ice/SslAcceptor.cpp
index 15c28498d75..f421a7d2fe5 100644
--- a/cpp/src/Ice/SslAcceptor.cpp
+++ b/cpp/src/Ice/SslAcceptor.cpp
@@ -84,6 +84,7 @@ TransceiverPtr
IceInternal::SslAcceptor::accept(int timeout)
{
SOCKET fd = doAccept(_fd, timeout);
+ setBlock(fd, false);
if (_traceLevels->network >= 1)
{
@@ -136,6 +137,7 @@ IceInternal::SslAcceptor::SslAcceptor(const InstancePtr& instance, const string&
try
{
_fd = createSocket(false);
+ setBlock(_fd, false);
getAddress(host, port, _addr);
doBind(_fd, _addr);
}
diff --git a/cpp/src/Ice/SslConnector.cpp b/cpp/src/Ice/SslConnector.cpp
index fa14f529dbc..09d51af325c 100644
--- a/cpp/src/Ice/SslConnector.cpp
+++ b/cpp/src/Ice/SslConnector.cpp
@@ -49,6 +49,7 @@ IceInternal::SslConnector::connect(int timeout)
}
SOCKET fd = createSocket(false);
+ setBlock(fd, false);
doConnect(fd, _addr, timeout);
if (_traceLevels->network >= 1)
diff --git a/cpp/src/Ice/TcpAcceptor.cpp b/cpp/src/Ice/TcpAcceptor.cpp
index 84691e16e2e..4217668f956 100644
--- a/cpp/src/Ice/TcpAcceptor.cpp
+++ b/cpp/src/Ice/TcpAcceptor.cpp
@@ -64,6 +64,7 @@ TransceiverPtr
IceInternal::TcpAcceptor::accept(int timeout)
{
SOCKET fd = doAccept(_fd, timeout);
+ setBlock(fd, false);
if (_traceLevels->network >= 1)
{
@@ -108,6 +109,7 @@ IceInternal::TcpAcceptor::TcpAcceptor(const InstancePtr& instance, const string&
try
{
_fd = createSocket(false);
+ setBlock(_fd, false);
getAddress(host, port, _addr);
doBind(_fd, _addr);
}
diff --git a/cpp/src/Ice/TcpConnector.cpp b/cpp/src/Ice/TcpConnector.cpp
index c00d6789667..236198804a7 100644
--- a/cpp/src/Ice/TcpConnector.cpp
+++ b/cpp/src/Ice/TcpConnector.cpp
@@ -30,6 +30,7 @@ IceInternal::TcpConnector::connect(int timeout)
}
SOCKET fd = createSocket(false);
+ setBlock(fd, false);
doConnect(fd, _addr, timeout);
if (_traceLevels->network >= 1)
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 6b7827663aa..b149265b717 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -34,16 +34,42 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
{
++_servers;
}
- _adds.push_back(make_pair(fd, handler));
- setInterrupt();
+ else
+ {
+ ++_clients;
+ }
+ _changes.push_back(make_pair(fd, handler));
+ setInterrupt(0);
+}
+
+void
+IceInternal::ThreadPool::unregister(SOCKET fd)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ _changes.push_back(make_pair(fd, EventHandlerPtr(0)));
+ setInterrupt(0);
+}
+
+void
+IceInternal::ThreadPool::serverIsNowClient()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ ++_clients;
+ assert(_servers > 0);
+ --_servers;
+ if (_servers == 0)
+ {
+ notifyAll(); // For waitUntil...Finished() methods.
+ }
}
void
-IceInternal::ThreadPool::unregister(SOCKET fd, bool callFinished)
+IceInternal::ThreadPool::clientIsNowServer()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- _removes.push_back(make_pair(fd, callFinished));
- setInterrupt();
+ ++_servers;
+ assert(_clients > 0);
+ --_clients;
}
void
@@ -55,12 +81,7 @@ IceInternal::ThreadPool::promoteFollower()
void
IceInternal::ThreadPool::initiateServerShutdown()
{
- char c = 1;
-#ifdef _WIN32
- ::send(_fdIntrWrite, &c, 1, 0);
-#else
- ::write(_fdIntrWrite, &c, 1);
-#endif
+ setInterrupt(1);
}
void
@@ -86,17 +107,21 @@ IceInternal::ThreadPool::waitUntilFinished()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- while (!_handlerMap.empty() && _threadNum != 0)
+ while (_clients + _servers != 0 && _threadNum != 0)
{
wait();
}
- if (!_handlerMap.empty())
+ if (_clients + _servers != 0)
{
Error out(_logger);
out << "can't wait for graceful application termination in thread pool\n"
<< "since all threads have vanished";
}
+ else
+ {
+ assert(_handlerMap.empty());
+ }
}
void
@@ -141,6 +166,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) :
_properties(_instance->properties()),
_destroyed(false),
_lastFd(INVALID_SOCKET),
+ _clients(0),
_servers(0),
_timeout(0)
{
@@ -197,37 +223,74 @@ IceInternal::ThreadPool::destroy()
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_destroyed = true;
- setInterrupt();
+ setInterrupt(0);
}
bool
IceInternal::ThreadPool::clearInterrupt()
{
- bool shutdown = false;
char c;
+
+repeat:
+
#ifdef _WIN32
- while (::recv(_fdIntrRead, &c, 1, 0) == 1)
-#else
- while (::read(_fdIntrRead, &c, 1) == 1)
-#endif
+ if (::recv(_fdIntrRead, &c, 1, 0) == SOCKET_ERROR)
{
- if (c == 1) // Shutdown initiated?
+ if (interrupted())
{
- shutdown = true;
+ goto repeat;
}
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
+#else
+ if (::read(_fdIntrRead, &c, 1) == -1)
+ {
+ if (interrupted())
+ {
+ goto repeat;
+ }
- return shutdown;
+ SystemException ex(__FILE__, __LINE__);
+ ex.error = getSystemErrno();
+ throw ex;
+ }
+#endif
+
+ return c == 1; // Return true if shutdown has been initiated.
}
void
-IceInternal::ThreadPool::setInterrupt()
+IceInternal::ThreadPool::setInterrupt(char c)
{
- char c = 0;
+repeat:
+
#ifdef _WIN32
- ::send(_fdIntrWrite, &c, 1, 0);
+ if (::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
+ {
+ if (interrupted())
+ {
+ goto repeat;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
#else
- ::write(_fdIntrWrite, &c, 1);
+ if (::write(_fdIntrWrite, &c, 1) == -1)
+ {
+ if (interrupted())
+ {
+ goto repeat;
+ }
+
+ SystemException ex(__FILE__, __LINE__);
+ ex.error = getSystemErrno();
+ throw ex;
+ }
#endif
}
@@ -288,49 +351,82 @@ IceInternal::ThreadPool::run()
}
EventHandlerPtr handler;
- std::pair<SOCKET, bool> remove(INVALID_SOCKET, false);
+ bool finished = false;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if (_destroyed)
+
+ if (FD_ISSET(_fdIntrRead, &fdSet))
{
//
- // Don't clear the interrupt fd if destroyed, so that
- // the other threads exit as well.
+ // There are three possiblities for an interrupt:
//
- return;
- }
+ // - The thread pool has been destroyed.
+ //
+ // - Server shutdown has been initiated.
+ //
+ // - An event handler was registered or unregistered.
+ //
+
+ //
+ // Thread pool destroyed?
+ //
+ if (_destroyed)
+ {
+ //
+ // Don't clear the interrupt if destroyed, so that
+ // the other threads exit as well.
+ //
+ return;
+ }
+
+ shutdown = clearInterrupt();
- if (!_adds.empty())
- {
//
- // New handlers have been added.
+ // Server shutdown?
//
- for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p)
+ if (shutdown)
{
- _handlerMap.insert(*p);
- FD_SET(p->first, &_fdSet);
- _maxFd = max(_maxFd, p->first);
- _minFd = min(_minFd, p->first);
+ goto repeatSelect;
}
- _adds.clear();
- }
-
- if (!_removes.empty())
- {
+
//
- // Handlers are permanently removed.
+ // An event handler must have been registered or
+ // unregistered.
//
- remove = _removes.front();
- _removes.pop_front();
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first);
- assert(p != _handlerMap.end());
- FD_CLR(p->first, &_fdSet);
- handler = p->second;
+ assert(!_changes.empty());
+ pair<SOCKET, EventHandlerPtr> change = _changes.front();
+ _changes.pop_front();
+
+ if (change.second) // Addition if handler is set.
+ {
+ _handlerMap.insert(change);
+ FD_SET(change.first, &_fdSet);
+ _maxFd = max(_maxFd, change.first);
+ _minFd = min(_minFd, change.first);
+ goto repeatSelect;
+ }
+ else // Removal if handler is not set.
+ {
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first);
+ assert(p != _handlerMap.end());
+ handler = p->second;
+ finished = true;
+ _handlerMap.erase(p);
+ FD_CLR(change.first, &_fdSet);
+ _maxFd = _fdIntrRead;
+ _minFd = _fdIntrRead;
+ if (!_handlerMap.empty())
+ {
+ _maxFd = max(_maxFd, (--_handlerMap.end())->first);
+ _minFd = min(_minFd, _handlerMap.begin()->first);
+ }
+ // Don't goto repeatSelect; we have to call
+ // finished() on the event handler below, outside
+ // the thread synchronization.
+ }
}
-
- if (!handler)
+ else
{
//
// Optimization for WIN32 specific version of fd_set. Looping with a
@@ -400,11 +496,7 @@ IceInternal::ThreadPool::run()
}
#endif
- if (_lastFd == _fdIntrRead)
- {
- shutdown = clearInterrupt();
- goto repeatSelect;
- }
+ assert(_lastFd != _fdIntrRead);
map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
if(p == _handlerMap.end())
@@ -420,34 +512,27 @@ IceInternal::ThreadPool::run()
assert(handler);
- if (remove.first != INVALID_SOCKET)
+ if (finished)
{
//
- // Call finished() on a handler if necessary.
+ // Notify a handler about it's removal from the thread
+ // pool.
//
- if (remove.second)
- {
- handler->finished();
- }
-
+ handler->finished();
+
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first);
- assert(p != _handlerMap.end());
- _handlerMap.erase(p);
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
- if (!_handlerMap.empty())
- {
- _maxFd = max(_maxFd, (--_handlerMap.end())->first);
- _minFd = min(_minFd, _handlerMap.begin()->first);
- }
if (handler->server())
{
+ assert(_servers > 0);
--_servers;
}
- if (_handlerMap.empty() || _servers == 0)
+ else
+ {
+ assert(_clients > 0);
+ --_clients;
+ }
+ if (_clients == 0 || _servers == 0)
{
notifyAll(); // For waitUntil...Finished() methods.
}
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 90674028b28..464bbb3711e 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -21,7 +21,7 @@
#include <Ice/LoggerF.h>
#include <Ice/PropertiesF.h>
#include <Ice/EventHandlerF.h>
-#include <deque>
+#include <list>
#ifndef _WIN32
# define SOCKET int
@@ -37,7 +37,9 @@ class ThreadPool : public ::IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mu
public:
void _register(SOCKET, const EventHandlerPtr&);
- void unregister(SOCKET, bool);
+ void unregister(SOCKET);
+ void serverIsNowClient();
+ void clientIsNowServer();
void promoteFollower();
void initiateServerShutdown(); // Signal-safe shutdown initiation.
void waitUntilServerFinished();
@@ -54,7 +56,7 @@ private:
friend class Instance;
bool clearInterrupt();
- void setInterrupt();
+ void setInterrupt(char);
void run();
void read(const EventHandlerPtr&);
@@ -69,9 +71,9 @@ private:
SOCKET _fdIntrRead;
SOCKET _fdIntrWrite;
fd_set _fdSet;
- std::vector<std::pair<SOCKET, EventHandlerPtr> > _adds;
- std::deque<std::pair<SOCKET, bool> > _removes;
+ std::list<std::pair<SOCKET, EventHandlerPtr> > _changes; // Event handler set for addition; null for removal.
std::map<SOCKET, EventHandlerPtr> _handlerMap;
+ int _clients;
int _servers;
int _timeout;
::IceUtil::Mutex _threadMutex;
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index 2a0d90c645a..e5b74f31dbd 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -177,6 +177,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
try
{
_fd = createSocket(true);
+ setBlock(_fd, false);
getAddress(host, port, _addr);
doConnect(_fd, _addr, -1);
_connect = false; // We're connected now
@@ -206,6 +207,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
try
{
_fd = createSocket(true);
+ setBlock(_fd, false);
getAddress(host, port, _addr);
doBind(_fd, _addr);