summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp105
1 files changed, 71 insertions, 34 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 6ae51a0d9eb..5920e58d1c3 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -252,26 +252,31 @@ void
IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter)
{
IceUtil::RecMutex::Lock sync(*this);
-
+
//
- // In closed and holding state, we are not registered with the
- // thread pool. For all other states, we have to notify the thread
- // pool in case this event handler changed from a client to a
- // server or vice versa.
+ // We are registered with a thread pool in active and closing
+ // mode. However, we only change subscription if we're in active
+ // mode, and thus ignore closing mode here.k
//
- if (_state != StateHolding && _state != StateClosed)
+ if (_state == StateActive)
{
if (adapter && !_adapter)
{
- _threadPool->clientIsNowServer();
+ //
+ // Client is now server.
+ //
+ unregisterWithPool();
}
if (!adapter && _adapter)
{
- _threadPool->serverIsNowClient();
+ //
+ // Server is now client.
+ //
+ unregisterWithPool();
}
}
-
+
_adapter = adapter;
}
@@ -283,13 +288,6 @@ IceInternal::Connection::getAdapter() const
}
bool
-IceInternal::Connection::server() const
-{
- IceUtil::RecMutex::Lock sync(*this);
- return _adapter;
-}
-
-bool
IceInternal::Connection::readable() const
{
return true;
@@ -302,7 +300,7 @@ IceInternal::Connection::read(BasicStream& stream)
}
void
-IceInternal::Connection::message(BasicStream& stream)
+IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
{
bool invoke = false;
bool batch = false;
@@ -310,7 +308,7 @@ IceInternal::Connection::message(BasicStream& stream)
{
IceUtil::RecMutex::Lock sync(*this);
- _threadPool->promoteFollower();
+ threadPool->promoteFollower();
if (_state == StateClosed)
{
@@ -555,15 +553,17 @@ IceInternal::Connection::message(BasicStream& stream)
}
void
-IceInternal::Connection::finished()
+IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
{
IceUtil::RecMutex::Lock sync(*this);
- assert(_state == StateClosed || _state == StateHolding);
-
- _threadPool->promoteFollower();
+ threadPool->promoteFollower();
- if (_state == StateClosed)
+ if (_state == StateActive || _state == StateClosing)
+ {
+ registerWithPool();
+ }
+ else if (_state == StateClosed)
{
_transceiver->close();
}
@@ -578,7 +578,7 @@ IceInternal::Connection::exception(const LocalException& ex)
/*
bool
-IceInternal::Connection::tryDestroy()
+IceInternal::Connection::tryDestroy(const ThreadPoolPtr& threadPool)
{
bool isLocked = trylock();
if(!isLocked)
@@ -586,7 +586,7 @@ IceInternal::Connection::tryDestroy()
return false;
}
- _threadPool->promoteFollower();
+ threadPool->promoteFollower();
try
{
@@ -611,12 +611,11 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
_transceiver(transceiver),
_endpoint(endpoint),
_adapter(adapter),
- _threadPool(instance->threadPool()),
- _logger(instance->logger()),
- _traceLevels(instance->traceLevels()),
+ _logger(_instance->logger()),
+ _traceLevels(_instance->traceLevels()),
_nextRequestId(1),
_requestsHint(_requests.end()),
- _batchStream(instance),
+ _batchStream(_instance),
_responseCount(0),
_proxyUsageCount(0),
_state(StateHolding)
@@ -713,7 +712,7 @@ IceInternal::Connection::setState(State state)
{
return;
}
- _threadPool->_register(_transceiver->fd(), this);
+ registerWithPool();
break;
}
@@ -723,7 +722,7 @@ IceInternal::Connection::setState(State state)
{
return;
}
- _threadPool->unregister(_transceiver->fd());
+ unregisterWithPool();
break;
}
@@ -738,7 +737,7 @@ IceInternal::Connection::setState(State state)
//
// We need to continue to read data in closing state.
//
- _threadPool->_register(_transceiver->fd(), this);
+ registerWithPool();
}
break;
}
@@ -752,9 +751,9 @@ IceInternal::Connection::setState(State state)
// register again before we unregister, so that
// finished() is called correctly.
//
- _threadPool->_register(_transceiver->fd(), this);
+ registerWithPool();
}
- _threadPool->unregister(_transceiver->fd());
+ unregisterWithPool();
break;
}
}
@@ -787,3 +786,41 @@ IceInternal::Connection::closeConnection()
_transceiver->write(os, _endpoint->timeout());
_transceiver->shutdown();
}
+
+void
+IceInternal::Connection::registerWithPool()
+{
+ if (_adapter)
+ {
+ if (!_serverThreadPool)
+ {
+ _serverThreadPool = _instance->serverThreadPool();
+ assert(_serverThreadPool);
+ }
+ _serverThreadPool->_register(_transceiver->fd(), this);
+ }
+ else
+ {
+ if (!_clientThreadPool)
+ {
+ _clientThreadPool = _instance->clientThreadPool();
+ assert(_clientThreadPool);
+ }
+ _clientThreadPool->_register(_transceiver->fd(), this);
+ }
+}
+
+void
+IceInternal::Connection::unregisterWithPool()
+{
+ if (_adapter)
+ {
+ assert(_serverThreadPool);
+ _serverThreadPool->unregister(_transceiver->fd());
+ }
+ else
+ {
+ assert(_clientThreadPool);
+ _clientThreadPool->unregister(_transceiver->fd());
+ }
+}