summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp2569
1 files changed, 1578 insertions, 991 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index cdf3dce6b80..ccc6d5236e7 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -34,195 +34,226 @@ using namespace IceInternal;
Ice::LocalObject* IceInternal::upCast(ConnectionI* p) { return p; }
void
-Ice::ConnectionI::validate()
+Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
{
- bool active = false;
+ if(adopted)
+ {
+ if(str)
+ {
+ delete stream;
+ stream = 0;
+ adopted = false;
+ }
+ else
+ {
+ return; // Stream is already adopted.
+ }
+ }
+ else if(!str)
+ {
+ if(out || outAsync)
+ {
+ return; // Adopting request stream is not necessary.
+ }
+ else
+ {
+ str = stream; // Adopt this stream
+ stream = 0;
+ }
+ }
- if(!_endpoint->datagram()) // Datagram connections are always implicitly validated.
+ assert(str);
+ stream = new BasicStream(str->instance());
+ stream->swap(*str);
+ adopted = true;
+}
+
+void
+Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify)
+{
+ if(out)
+ {
+ out->sent(notify); // true = notify the waiting thread that the request was sent.
+ }
+ else if(outAsync)
+ {
+ outAsync->__sent(connection);
+ }
+
+ if(adopted)
+ {
+ delete stream;
+ stream = 0;
+ }
+}
+
+void
+Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex)
+{
+ if(!response)
+ {
+ //
+ // Only notify oneway requests. The connection keeps track of twoway
+ // requests in the _requests/_asyncRequests maps and will notify them
+ // of the connection exceptions.
+ //
+ if(out)
+ {
+ out->finished(ex);
+ }
+ else if(outAsync)
+ {
+ outAsync->__finished(ex);
+ }
+ }
+
+ if(adopted)
+ {
+ delete stream;
+ stream = 0;
+ }
+}
+
+void
+Ice::ConnectionI::start(const StartCallbackPtr& callback)
+{
+ try
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(_thread && _thread->getThreadControl() != IceUtil::ThreadControl())
- {
- //
- // In thread per connection mode, this connection's thread
- // will take care of connection validation. Therefore all we
- // have to do here is to wait until this thread has completed
- // validation.
- //
- while(_state == StateNotValidated)
- {
- wait();
- }
-
- if(_state >= StateClosing)
- {
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- return;
- }
-
+ _startCallback = callback;
+
//
- // The connection might already be closed (e.g.: the communicator
- // was destroyed or object adapter deactivated.)
+ // The connection might already be closed if the communicator was destroyed.
//
- assert(_state == StateNotValidated || _state == StateClosed);
if(_state == StateClosed)
{
assert(_exception.get());
_exception->ice_throw();
}
-
- if(_adapter)
- {
- active = true; // The server side has the active role for connection validation.
- }
- else
- {
- active = false; // The client side has the passive role for connection validation.
- }
- }
-
- try
- {
- Int timeout;
- if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
- {
- timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
- }
- else
- {
- timeout = _endpoint->timeout();
- }
-
- if(active)
- {
- IceUtil::Mutex::Lock sendSync(_sendMutex);
- if(!_transceiver) // Has the transceiver already been closed?
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
-
- BasicStream os(_instance.get());
- os.write(magic[0]);
- os.write(magic[1]);
- os.write(magic[2]);
- os.write(magic[3]);
- os.write(protocolMajor);
- os.write(protocolMinor);
- os.write(encodingMajor);
- os.write(encodingMinor);
- os.write(validateConnectionMsg);
- os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
- os.write(headerSize); // Message size.
- os.i = os.b.begin();
- traceHeader("sending validate connection", os, _logger, _traceLevels);
- try
- {
- _transceiver->initialize(timeout);
- _transceiver->write(os, timeout);
- }
- catch(const TimeoutException&)
- {
- throw ConnectTimeoutException(__FILE__, __LINE__);
- }
- }
- else
+ //
+ // In thread per connection mode, we create the thread for the connection. The
+ // intialization and validation of the connection is taken care of by the thread
+ // per connection. If a callback is given, no need to wait, the thread will notify
+ // the callback, otherwise wait until the connection is validated.
+ //
+ if(_threadPerConnection)
{
- BasicStream is(_instance.get());
- is.b.resize(headerSize);
- is.i = is.b.begin();
try
{
- _transceiver->initialize(timeout);
- _transceiver->read(is, timeout);
+ _thread = new ThreadPerConnection(this);
+ _thread->start(_threadPerConnectionStackSize);
}
- catch(const TimeoutException&)
+ catch(const IceUtil::Exception& ex)
{
- throw ConnectTimeoutException(__FILE__, __LINE__);
- }
- assert(is.i == is.b.end());
- is.i = is.b.begin();
- Byte m[4];
- is.read(m[0]);
- is.read(m[1]);
- is.read(m[2]);
- is.read(m[3]);
- if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
- {
- BadMagicException ex(__FILE__, __LINE__);
- ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
- throw ex;
- }
- Byte pMajor;
- Byte pMinor;
- is.read(pMajor);
- is.read(pMinor);
- if(pMajor != protocolMajor)
- {
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(pMajor);
- ex.badMinor = static_cast<unsigned char>(pMinor);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
- }
- Byte eMajor;
- Byte eMinor;
- is.read(eMajor);
- is.read(eMinor);
- if(eMajor != encodingMajor)
- {
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(eMajor);
- ex.badMinor = static_cast<unsigned char>(eMinor);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
- }
- Byte messageType;
- is.read(messageType);
- if(messageType != validateConnectionMsg)
- {
- throw ConnectionNotValidatedException(__FILE__, __LINE__);
+ {
+ Error out(_logger);
+ out << "cannot create thread for connection:\n" << ex;
+ }
+
+ //
+ // Clean up.
+ //
+ _thread = 0;
+ _state = StateClosed;
+
+ ex.ice_throw();
}
- Byte compress;
- is.read(compress); // Ignore compression status for validate connection.
- Int size;
- is.read(size);
- if(size != headerSize)
+
+ if(!callback) // Wait for the connection to be validated.
{
- throw IllegalMessageSizeException(__FILE__, __LINE__);
+ while(_state <= StateNotValidated)
+ {
+ wait();
+ }
+
+ if(_state >= StateClosing)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
}
- traceHeader("received validate connection", is, _logger, _traceLevels);
+ return; // We're done.
}
}
- catch(const LocalException& ex)
+
+ SocketStatus status = initialize();
+ if(status == Finished)
+ {
+ status = validate();
+ }
+
+ if(status == Finished)
+ {
+ finishStart();
+ return; // We're done!
+ }
+
+ assert(callback);
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state == StateClosed)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
assert(_exception.get());
_exception->ice_throw();
}
- }
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ int timeout;
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideConnectTimeout)
+ {
+ timeout = defaultsAndOverrides->overrideConnectTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint->timeout();
+ }
- if(_acmTimeout > 0)
+ _sendInProgress = true;
+ _selectorThread->_register(_transceiver->fd(), this, status, timeout);
+ return;
+ }
+ catch(const Ice::LocalException& ex)
+ {
{
- _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+
+ //
+ // If start is called with a callback, the callback is notified either by the
+ // thread per conncetion or the thread pool.
+ //
+ if(callback)
+ {
+ if(!_threadPerConnection)
+ {
+ registerWithPool();
+ unregisterWithPool(); // Let finished do the close.
+ }
+ return;
+ }
+
+ //
+ // Close the transceiver if there's no thread per connection, otherwise, the
+ // thread per connection takes care of it.
+ //
+ if(!_thread && _transceiver)
+ {
+ try
+ {
+ _transceiver->close();
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Here we ignore any exceptions in close().
+ }
+ _transceiver = 0;
+ }
}
- //
- // We start out in holding state.
- //
- setState(StateHolding);
+ waitUntilFinished();
+ throw;
}
}
@@ -230,10 +261,14 @@ void
Ice::ConnectionI::activate()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state <= StateNotValidated)
+ {
+ return;
+ }
- while(_state == StateNotValidated)
+ if(_acmTimeout > 0)
{
- wait();
+ _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
}
setState(StateActive);
@@ -243,10 +278,9 @@ void
Ice::ConnectionI::hold()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while(_state == StateNotValidated)
+ if(_state <= StateNotValidated)
{
- wait();
+ return;
}
setState(StateHolding);
@@ -255,20 +289,35 @@ Ice::ConnectionI::hold()
void
Ice::ConnectionI::destroy(DestructionReason reason)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- switch(reason)
+ bool send = false;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ switch(reason)
+ {
+ case ObjectAdapterDeactivated:
+ {
+ send = setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__));
+ break;
+ }
+
+ case CommunicatorDestroyed:
+ {
+ send = setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__));
+ break;
+ }
+ }
+ }
+
+ if(send) // Send the close connection message
{
- case ObjectAdapterDeactivated:
+ try
{
- setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__));
- break;
+ finishSendMessage();
}
-
- case CommunicatorDestroyed:
+ catch(const Ice::LocalException&)
{
- setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__));
- break;
+ // Ignore.
}
}
}
@@ -276,32 +325,47 @@ Ice::ConnectionI::destroy(DestructionReason reason)
void
Ice::ConnectionI::close(bool force)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(force)
+ bool send = false;
{
- setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(force)
+ {
+ setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
+ }
+ else
+ {
+ //
+ // If we do a graceful shutdown, then we wait until all
+ // outstanding requests have been completed. Otherwise, the
+ // CloseConnectionException will cause all outstanding
+ // requests to be retried, regardless of whether the server
+ // has processed them or not.
+ //
+ while(!_requests.empty() || !_asyncRequests.empty())
+ {
+ wait();
+ }
+
+ send = setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+ }
}
- else
+
+ if(send) // Send the close connection message
{
- //
- // If we do a graceful shutdown, then we wait until all
- // outstanding requests have been completed. Otherwise, the
- // CloseConnectionException will cause all outstanding
- // requests to be retried, regardless of whether the server
- // has processed them or not.
- //
- while(!_requests.empty() || !_asyncRequests.empty())
+ try
{
- wait();
+ finishSendMessage();
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Ignore.
}
-
- setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
}
}
bool
-Ice::ConnectionI::isDestroyed() const
+Ice::ConnectionI::isActiveOrHolding() const
{
//
// We can not use trylock here, otherwise the outgoing connection
@@ -310,7 +374,7 @@ Ice::ConnectionI::isDestroyed() const
//
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- return _state >= StateClosing;
+ return _state > StateNotValidated && _state < StateClosing;
}
bool
@@ -460,56 +524,55 @@ Ice::ConnectionI::waitUntilFinished()
void
Ice::ConnectionI::monitor()
{
- IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
-
- if(!sync.acquired())
+ bool send = false;
{
- return;
- }
+ IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
+ if(!sync.acquired())
+ {
+ return;
+ }
- if(_state != StateActive)
- {
- return;
- }
+ if(_state != StateActive)
+ {
+ return;
+ }
- //
- // Check for timed out async requests.
- //
- for(map<Int, AsyncRequest>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
- {
- if(p->second.t > IceUtil::Time() && p->second.t <= IceUtil::Time::now(IceUtil::Time::Monotonic))
+ //
+ // Active connection management for idle connections.
+ //
+ if(_acmTimeout <= 0 ||
+ !_requests.empty() || !_asyncRequests.empty() ||
+ _batchStreamInUse || !_batchStream.b.empty() ||
+ _sendInProgress || _dispatchCount > 0)
{
- setState(StateClosed, TimeoutException(__FILE__, __LINE__));
return;
}
+
+ if(IceUtil::Time::now(IceUtil::Time::Monotonic) >= _acmAbsoluteTimeout)
+ {
+ send = setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
+ }
}
-
- //
- // Active connection management for idle connections.
- //
- if(_acmTimeout > 0 &&
- _requests.empty() && _asyncRequests.empty() &&
- !_batchStreamInUse && _batchStream.b.empty() &&
- _dispatchCount == 0)
+
+ if(send)
{
- if(IceUtil::Time::now(IceUtil::Time::Monotonic) >= _acmAbsoluteTimeout)
+ try
+ {
+ finishSendMessage();
+ }
+ catch(const Ice::LocalException&)
{
- setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
- return;
}
}
}
-void
-Ice::ConnectionI::sendRequest(BasicStream* os, Outgoing* out, bool compress)
+bool
+Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
{
- Int requestId;
-
+ BasicStream* os = out->os();
+ bool send = false;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- assert(!(out && _endpoint->datagram())); // Twoway requests cannot be datagrams.
-
if(_exception.get())
{
//
@@ -523,10 +586,8 @@ Ice::ConnectionI::sendRequest(BasicStream* os, Outgoing* out, bool compress)
assert(_state > StateNotValidated);
assert(_state < StateClosing);
- //
- // Only add to the request map if this is a twoway call.
- //
- if(out)
+ Int requestId;
+ if(response)
{
//
// Create a new unique request ID.
@@ -547,134 +608,65 @@ Ice::ConnectionI::sendRequest(BasicStream* os, Outgoing* out, bool compress)
#else
copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#endif
-
- //
- // Add to the requests map.
- //
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
}
-
- if(_acmTimeout > 0)
+
+ //
+ // Send the message. If it can't be sent without blocking the message is added
+ // to _sendStreams and it will be sent by the selector thread or by this thread
+ // if flush is true.
+ //
+ try
{
- _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ OutgoingMessage message(out, os, compress, response);
+ send = sendMessage(message);
}
- }
-
- try
- {
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- if(!_transceiver) // Has the transceiver already been closed?
+ catch(const LocalException& ex)
{
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
}
-
- if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes.
+
+ if(response)
{
//
- // Message compressed. Request compressed response, if any.
- //
- os->b[9] = 2;
-
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(*os, cstream);
-
- //
- // Send the request.
+ // Add to the requests map.
//
- os->i = os->b.begin();
- traceRequest("sending request", *os, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
}
- else
- {
- if(compress)
- {
- //
- // Message not compressed. Request compressed response, if any.
- //
- os->b[9] = 1;
- }
- //
- // No compression, just fill in the message size.
- //
- Int sz = static_cast<Int>(os->b.size());
- const Byte* p = reinterpret_cast<const Byte*>(&sz);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + 10);
-#endif
-
- //
- // Send the request.
- //
- os->i = os->b.begin();
- traceRequest("sending request", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
+ if(!send)
+ {
+ return !_sendInProgress && _queuedStreams.empty(); // The request was sent if it's not queued!
}
}
- catch(const LocalException& ex)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- assert(_exception.get());
- if(out)
+ if(send)
+ {
+ try
{
- //
- // If the request has already been removed from the
- // request map, we are out of luck. It would mean that
- // finished() has been called already, and therefore the
- // exception has been set using the Outgoing::finished()
- // callback. In this case, we cannot throw the exception
- // here, because we must not both raise an exception and
- // have Outgoing::finished() called with an
- // exception. This means that in some rare cases, a
- // request will not be retried even though it could. But I
- // honestly don't know how I could avoid this, without a
- // very elaborate and complex design, which would be bad
- // for performance.
- //
- map<Int, Outgoing*>::iterator p = _requests.find(requestId);
- if(p != _requests.end())
- {
- if(p == _requestsHint)
- {
- _requests.erase(p++);
- _requestsHint = p;
- }
- else
- {
- _requests.erase(p);
- }
-
- _exception->ice_throw();
- }
+ finishSendMessage();
}
- else
+ catch(const Ice::LocalException&)
{
- _exception->ice_throw();
+ assert(_exception.get());
+ if(!response) // Twoway calls are notified through finished()
+ {
+ throw;
+ }
}
}
+ return true; // The request was sent.
}
void
-Ice::ConnectionI::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPtr& out, bool compress)
+Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response)
{
- Int requestId;
+ BasicStream* os = out->__getOs();
+ bool send = false;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- assert(!_endpoint->datagram()); // Twoway requests cannot be datagrams, and async implies twoway.
-
if(_exception.get())
{
//
@@ -687,140 +679,66 @@ Ice::ConnectionI::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPtr& out,
assert(_state > StateNotValidated);
assert(_state < StateClosing);
-
- //
- // Create a new unique request ID.
- //
- requestId = _nextRequestId++;
- if(requestId <= 0)
+
+ Int requestId;
+ if(response)
{
- _nextRequestId = 1;
+ //
+ // Create a new unique request ID.
+ //
requestId = _nextRequestId++;
- }
-
- //
- // Fill in the request ID.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Fill in the request ID.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#endif
-
- //
- // Add to the async requests map.
- //
- struct AsyncRequest asyncRequest;
- asyncRequest.p = out;
- if(_endpoint->timeout() > 0)
- {
- asyncRequest.t =
- IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::milliSeconds(_endpoint->timeout());
}
- _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
- pair<const Int, AsyncRequest>(requestId, asyncRequest));
-
- if(_acmTimeout > 0)
+
+ try
{
- _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ OutgoingMessage message(out, os, compress, response);
+ send = sendMessage(message);
}
- }
-
- try
- {
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- if(!_transceiver) // Has the transceiver already been closed?
+ catch(const LocalException& ex)
{
+ setState(StateClosed, ex);
assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
+ _exception->ice_throw();
}
- if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes.
- {
- //
- // Message compressed. Request compressed response, if any.
- //
- os->b[9] = 2;
-
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(*os, cstream);
-
- //
- // Send the request.
- //
- os->i = os->b.begin();
- traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
- }
- else
+ if(response)
{
- if(compress)
- {
- //
- // Message not compressed. Request compressed response, if any.
- //
- os->b[9] = 1;
- }
-
- //
- // No compression, just fill in the message size.
- //
- Int sz = static_cast<Int>(os->b.size());
- const Byte* p = reinterpret_cast<const Byte*>(&sz);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + 10);
-#endif
-
//
- // Send the request.
+ // Add to the async requests map.
//
- os->i = os->b.begin();
- traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
+ _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
+ pair<const Int, OutgoingAsyncPtr>(requestId, out));
}
}
- catch(const LocalException& ex)
+
+ if(send)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- assert(_exception.get());
-
- //
- // If the request has already been removed from the async
- // request map, we are out of luck. It would mean that
- // finished() has been called already, and therefore the
- // exception has been set using the
- // OutgoingAsync::__finished() callback. In this case, we
- // cannot throw the exception here, because we must not both
- // raise an exception and have OutgoingAsync::__finished()
- // called with an exception. This means that in some rare
- // cases, a request will not be retried even though it
- // could. But I honestly don't know how I could avoid this,
- // without a very elaborate and complex design, which would be
- // bad for performance.
- //
- map<Int, AsyncRequest>::iterator p = _asyncRequests.find(requestId);
- if(p != _asyncRequests.end())
+ try
{
- if(p == _asyncRequestsHint)
- {
- _asyncRequests.erase(p++);
- _asyncRequestsHint = p;
- }
- else
+ finishSendMessage();
+ }
+ catch(const Ice::LocalException&)
+ {
+ assert(_exception.get());
+ if(!response) // Twoway calls are notified through finished().
{
- _asyncRequests.erase(p);
- }
-
- _exception->ice_throw();
+ throw;
+ }
}
}
}
@@ -872,9 +790,8 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os)
void
Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
{
- bool autoflush = false;
- vector<Ice::Byte> lastRequest;
-
+ bool send = false;
+ try
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -882,16 +799,15 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
// Get the batch stream back.
//
_batchStream.swap(*os);
-
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+
+ bool flush = false;
if(_batchAutoFlush)
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
- if(!_transceiver)
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
-
//
// Throw memory limit exception if the first message added causes us to
// go over limit. Otherwise put aside the marshalled message that caused
@@ -903,122 +819,152 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
}
catch(const Ice::Exception&)
{
- if(_batchRequestNum == 0)
+ if(_batchRequestNum > 0)
+ {
+ flush = true;
+ }
+ else
{
- resetBatch(true);
throw;
}
- vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest);
- _batchStream.b.resize(_batchMarker);
- autoflush = true;
}
}
- if(!autoflush)
+ if(flush)
{
//
- // Increment the number of requests in the batch.
+ // Temporarily save the last request.
+ //
+ vector<Ice::Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end());
+ _batchStream.b.resize(_batchMarker);
+
+ //
+ // Send the batch stream without the last request.
+ //
+ try
+ {
+ //
+ // Fill in the number of requests in the batch.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#endif
+
+ OutgoingMessage message(&_batchStream, _batchRequestCompress);
+ send = sendMessage(message);
+ if(send)
+ {
+ //
+ // If the request can't be sent immediately and this is a foreground send,
+ // we adopt the stream to be able to re-use _batchStream immediately.
+ //
+ assert(!_sendStreams.empty());
+ _sendStreams.back().adopt(0);
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ //
+ // Reset the batch.
//
- ++_batchRequestNum;
+ BasicStream dummy(_instance.get(), _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+ _batchMarker = 0;
//
- // We compress the whole batch if there is at least one compressed
- // message.
+ // Check again if the last request doesn't exceed what we can send with the auto flush
//
- if(compress)
+ if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax())
{
- _batchRequestCompress = true;
+ throw MemoryLimitException(__FILE__, __LINE__);
}
-
+
//
- // Notify about the batch stream not being in use anymore.
+ // Start a new batch with the last message that caused us to go over the limit.
//
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
+ _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
+ _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
}
- }
-
- if(autoflush)
- {
- //
- // We have to keep _batchStreamInUse set until after we insert the
- // saved marshalled data into a new stream.
- //
- flushBatchRequestsInternal(true);
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
- // Throw memory limit exception if the message that caused us to go over
- // limit causes us to exceed the limit by itself.
+ // Increment the number of requests in the batch.
//
- if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax())
- {
- resetBatch(true);
- throw MemoryLimitException(__FILE__, __LINE__);
- }
-
+ ++_batchRequestNum;
+
//
- // Start a new batch with the last message that caused us to
- // go over the limit.
+ // We compress the whole batch if there is at least one compressed
+ // message.
//
- try
- {
- _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
- _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- ex.ice_throw();
- }
-
if(compress)
{
_batchRequestCompress = true;
}
-
+
//
- // Notify that the batch stream not in use anymore.
+ // Notify about the batch stream not being in use anymore.
//
- ++_batchRequestNum;
+ assert(_batchStreamInUse);
_batchStreamInUse = false;
notifyAll();
}
+ catch(const Ice::LocalException&)
+ {
+ abortBatchRequest();
+ if(send)
+ {
+ finishSendMessage(); // Let exceptions go through to report auto-flush failures to the caller.
+ }
+ throw;
+ }
+
+ if(send)
+ {
+ finishSendMessage(); // Let exceptions go through to report auto-flush failures to the caller.
+ }
}
void
Ice::ConnectionI::abortBatchRequest()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // Reset the batch stream. We cannot save old requests
- // in the batch stream, as they might be corrupted due to
- // incomplete marshaling.
- //
- resetBatch(true);
+
+ BasicStream dummy(_instance.get(), _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+ _batchMarker = 0;
+
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
}
void
Ice::ConnectionI::flushBatchRequests()
{
- flushBatchRequestsInternal(false);
+ BatchOutgoing out(this, _instance.get());
+ out.invoke();
}
-void
-Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse)
+bool
+Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
{
+ bool send = false;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(!ignoreInUse)
+ while(_batchStreamInUse && !_exception.get())
{
- while(_batchStreamInUse && !_exception.get())
- {
- wait();
- }
+ wait();
}
if(_exception.get())
@@ -1026,36 +972,78 @@ Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse)
_exception->ice_throw();
}
- if(_batchStream.b.empty())
+ if(_batchRequestNum == 0)
{
- return; // Nothing to do.
+ return true;
}
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- _batchStream.i = _batchStream.b.begin();
+ //
+ // Fill in the number of requests in the batch.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#endif
+ _batchStream.swap(*out->os());
- if(_acmTimeout > 0)
+ //
+ // Send the batch stream.
+ //
+ try
{
- _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ OutgoingMessage message(out, out->os(), _batchRequestCompress, false);
+ send = sendMessage(message);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
}
//
- // Prevent that new batch requests are added while we are
- // flushing.
+ // Reset the batch stream.
//
- _batchStreamInUse = true;
+ BasicStream dummy(_instance.get(), _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+ _batchMarker = 0;
+
+ if(!send)
+ {
+ return !_sendInProgress && _queuedStreams.empty(); // The request was sent if it's not queued!
+ }
}
-
- try
+
+ if(send)
+ {
+ finishSendMessage();
+ }
+ return true;
+}
+
+void
+Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
+{
+ bool send = false;
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ while(_batchStreamInUse && !_exception.get())
+ {
+ wait();
+ }
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
- if(!_transceiver) // Has the transceiver already been closed?
+ if(_batchRequestNum == 0)
{
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
+ return;
}
//
@@ -1067,195 +1055,123 @@ Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse)
#else
copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
-
- if(_batchRequestCompress && _batchStream.b.size() >= 100) // Only compress messages larger than 100 bytes.
+ _batchStream.swap(*outAsync->__getOs());
+
+ //
+ // Send the batch stream.
+ //
+ try
{
- //
- // Message compressed. Request compressed response, if any.
- //
- _batchStream.b[9] = 2;
-
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(_batchStream, cstream);
-
- //
- // Send the batch request.
- //
- _batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
+ OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, false);
+ send = sendMessage(message);
}
- else
+ catch(const Ice::LocalException& ex)
{
- if(_batchRequestCompress)
- {
- //
- // Message not compressed. Request compressed response, if any.
- //
- _batchStream.b[9] = 1;
- }
-
- //
- // No compression, just fill in the message size.
- //
- Int sz = static_cast<Int>(_batchStream.b.size());
- const Byte* q = reinterpret_cast<const Byte*>(&sz);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(q, q + sizeof(Int), _batchStream.b.begin() + 10);
-#else
- copy(q, q + sizeof(Int), _batchStream.b.begin() + 10);
-#endif
-
- //
- // Send the batch request.
- //
- _batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- _transceiver->write(_batchStream, _endpoint->timeout());
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
}
- }
- catch(const LocalException& ex)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- assert(_exception.get());
//
- // Since batch requests are all oneways (or datagrams), we
- // must report the exception to the caller.
+ // Reset the batch stream.
//
- _exception->ice_throw();
+ BasicStream dummy(_instance.get(), _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+ _batchMarker = 0;
}
+ if(send)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // Reset the batch stream, and notify that flushing is over.
- //
- resetBatch(!ignoreInUse);
- }
-}
-
-void
-Ice::ConnectionI::resetBatch(bool resetInUse)
-{
- BasicStream dummy(_instance.get(), _batchAutoFlush);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
-
- //
- // Notify about the batch stream not being in use
- // anymore.
- //
- if(resetInUse)
- {
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
+ finishSendMessage();
}
}
void
Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
{
- try
+ bool send = false;
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- if(!_transceiver) // Has the transceiver already been closed?
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_state > StateNotValidated);
- //
- // Only compress if compression was requested by the client,
- // and if the message is larger than 100 bytes.
- //
- if(compressFlag > 0 && os->b.size() >= 100)
+ try
{
- //
- // Message compressed. Request compressed response, if any.
- //
- os->b[9] = 2;
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(*os, cstream);
+ if(_state == StateClosed)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
- //
- // Send the reply.
- //
- os->i = os->b.begin();
- traceReply("sending reply", *os, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
- }
- else
- {
- if(compressFlag > 0)
+ OutgoingMessage message(os, compressFlag > 0);
+ send = sendMessage(message);
+
+ if(_state == StateClosing && _dispatchCount == 0)
{
- //
- // Message not compressed. Request compressed response, if any.
- //
- os->b[9] = 1;
+ send = initiateShutdown(send);
}
- //
- // No compression, just fill in the message size.
- //
- Int sz = static_cast<Int>(os->b.size());
- const Byte* p = reinterpret_cast<const Byte*>(&sz);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + 10);
-#endif
-
- //
- // Send the reply.
- //
- os->i = os->b.begin();
- traceReply("sending reply", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) +
+ IceUtil::Time::seconds(_acmTimeout);
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
}
}
- catch(const LocalException& ex)
+
+ if(send)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
+ try
+ {
+ finishSendMessage();
+ }
+ catch(Ice::LocalException&)
+ {
+ // Ignore.
+ }
}
+}
+void
+Ice::ConnectionI::sendNoResponse()
+{
+ bool send = false;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
assert(_state > StateNotValidated);
-
+
try
{
if(--_dispatchCount == 0)
{
notifyAll();
}
-
+
+ if(_state == StateClosed)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
if(_state == StateClosing && _dispatchCount == 0)
{
- initiateShutdown();
+ send = initiateShutdown(false);
}
-
+
if(_acmTimeout > 0)
{
- _acmAbsoluteTimeout =
- IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) +
+ IceUtil::Time::seconds(_acmTimeout);
}
}
catch(const LocalException& ex)
@@ -1263,31 +1179,18 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
setState(StateClosed, ex);
}
}
-}
-void
-Ice::ConnectionI::sendNoResponse()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- assert(_state > StateNotValidated);
-
- try
+ if(send)
{
- if(--_dispatchCount == 0)
+ try
{
- notifyAll();
+ finishSendMessage();
}
-
- if(_state == StateClosing && _dispatchCount == 0)
+ catch(Ice::LocalException&)
{
- initiateShutdown();
+ // Ignore.
}
}
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- }
}
EndpointIPtr
@@ -1307,12 +1210,15 @@ Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_exception.get())
+ if(_state == StateClosing || _state == StateClosed)
{
+ assert(_exception.get());
_exception->ice_throw();
}
-
- assert(_state < StateClosing);
+ else if(_state <= StateNotValidated)
+ {
+ return;
+ }
_adapter = adapter;
@@ -1370,12 +1276,12 @@ Ice::ConnectionI::readable() const
return true;
}
-void
+bool
Ice::ConnectionI::read(BasicStream& stream)
{
assert(!_threadPerConnection); // Only for use with a thread pool.
- _transceiver->read(stream, 0);
+ return _transceiver->read(stream, 0);
//
// Updating _acmAbsoluteTimeout is too expensive here, because we
@@ -1446,9 +1352,6 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool)
threadPool->promoteFollower();
auto_ptr<LocalException> localEx;
-
- map<Int, Outgoing*> requests;
- map<Int, AsyncRequest> asyncRequests;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1456,49 +1359,48 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool)
--_finishedCount;
assert(threadPool.get() == _threadPool.get());
- if(_finishedCount == 0 && _state == StateClosed)
+ if(_finishedCount > 0 || _state != StateClosed || _sendInProgress)
{
- _threadPool->decFdsInUse();
+ return;
+ }
- //
- // We must make sure that nobody is sending when we close
- // the transceiver.
- //
- IceUtil::Mutex::Lock sendSync(_sendMutex);
+ _threadPool->decFdsInUse();
+ _selectorThread->decFdsInUse();
- try
- {
- _transceiver->close();
- }
- catch(const LocalException& ex)
- {
- localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
- }
-
- _transceiver = 0;
- notifyAll();
+ try
+ {
+ _transceiver->close();
}
-
- if(_state == StateClosed || _state == StateClosing)
+ catch(const LocalException& ex)
{
- requests.swap(_requests);
- _requestsHint = _requests.end();
-
- asyncRequests.swap(_asyncRequests);
- _asyncRequestsHint = _asyncRequests.end();
+ localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
}
+
+ _transceiver = 0;
+ notifyAll();
}
- for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
+ finishStart(*_exception.get());
+
+ // Note: the streams must be cleared first because they expect the Outgoing objects to still be valid.
+ for(deque<OutgoingMessage>::iterator o = _queuedStreams.begin(); o != _queuedStreams.end(); ++o)
{
- p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ o->finished(*_exception.get());
}
+ _queuedStreams.clear();
- for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
+ for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- q->second.p->__finished(*_exception.get()); // The exception is immutable at this point.
+ p->second->finished(*_exception.get()); // The exception is immutable at this point.
}
+ _requests.clear();
+ for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
+ {
+ q->second->__finished(*_exception.get()); // The exception is immutable at this point.
+ }
+ _asyncRequests.clear();
+
if(localEx.get())
{
localEx->ice_throw();
@@ -1554,6 +1456,139 @@ Ice::ConnectionI::toString() const
}
//
+// Operations from SocketReadyCallback
+//
+SocketStatus
+Ice::ConnectionI::socketReady(bool finished)
+{
+ if(!finished)
+ {
+ try
+ {
+ //
+ // First, we check if there's something to send. If that's the case, the connection
+ // must be active and the only thing to do is send the queued streams.
+ //
+ if(!_sendStreams.empty())
+ {
+ if(!send(0))
+ {
+ return NeedWrite;
+ }
+ assert(_sendStreams.empty());
+ }
+ else
+ {
+ //
+ // If there's nothing to send, we're still validating the connection.
+ //
+ int state;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_state == StateClosed || _state <= StateNotValidated);
+
+ state = _state;
+
+ if(_state == StateClosed)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+ }
+
+ if(state == StateNotInitialized)
+ {
+ SocketStatus status = initialize();
+ if(status != Finished)
+ {
+ return status;
+ }
+ }
+
+ if(state <= StateNotValidated)
+ {
+ SocketStatus status = validate();
+ if(status != Finished)
+ {
+ return status;
+ }
+ }
+
+ finishStart();
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ }
+ }
+
+ //
+ // If there's no more data to send or if connection validation is finished, we check
+ // the connection state to figure out whether or not it's time to unregister with the
+ // selector thread.
+ //
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_sendInProgress);
+ if(_state == StateClosed)
+ {
+ assert(!_startCallback || (!_threadPerConnection && !_registeredWithPool));
+
+ _queuedStreams.insert(_queuedStreams.begin(), _sendStreams.begin(), _sendStreams.end());
+ _sendStreams.clear();
+ _sendInProgress = false;
+
+ if(_threadPerConnection)
+ {
+ _transceiver->shutdownReadWrite();
+ }
+ else
+ {
+ registerWithPool();
+ unregisterWithPool(); // Let finished() do the close.
+ }
+ notifyAll();
+ return Finished;
+ }
+ else if(_waitingForSend > 0) // If there's synchronous calls waiting to be sent, unregister.
+ {
+ _sendInProgress = false;
+ notifyAll();
+ return Finished;
+ }
+ else if(_queuedStreams.empty())
+ {
+ _sendInProgress = false;
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ }
+ return Finished;
+ }
+ else
+ {
+ _sendStreams.swap(_queuedStreams);
+ return NeedWrite; // We're not finished yet, there's more data to send!
+ }
+}
+
+void
+Ice::ConnectionI::socketTimeout()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state <= StateNotValidated)
+ {
+ setState(StateClosed, ConnectTimeoutException(__FILE__, __LINE__));
+ }
+ else if(_state <= StateClosing)
+ {
+ setState(StateClosed, TimeoutException(__FILE__, __LINE__));
+ }
+}
+
+//
// Only used by the SSL plug-in.
//
// The external party has to synchronize the connection, since the
@@ -1596,8 +1631,10 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
_batchRequestNum(0),
_batchRequestCompress(false),
_batchMarker(0),
+ _sendInProgress(false),
+ _waitingForSend(0),
_dispatchCount(0),
- _state(StateNotValidated),
+ _state(StateNotInitialized),
_stateTime(IceUtil::Time::now(IceUtil::Time::Monotonic))
{
Int& acmTimeout = const_cast<Int&>(_acmTimeout);
@@ -1635,17 +1672,17 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
_servantManager = adapterImpl->getServantManager();
}
- if(!threadPerConnection)
+ __setNoDelete(true);
+ try
{
- //
- // Only set _threadPool if we really need it, i.e., if we are
- // not in thread per connection mode. Thread pools have lazy
- // initialization in Instance, and we don't want them to be
- // created if they are not needed.
- //
- __setNoDelete(true);
- try
+ if(!threadPerConnection)
{
+ //
+ // Only set _threadPool if we really need it, i.e., if we are
+ // not in thread per connection mode. Thread pools have lazy
+ // initialization in Instance, and we don't want them to be
+ // created if they are not needed.
+ //
if(adapterImpl)
{
const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool();
@@ -1656,76 +1693,34 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
}
_threadPool->incFdsInUse();
}
- catch(const IceUtil::Exception& ex)
- {
- try
- {
- _transceiver->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
- __setNoDelete(false);
- ex.ice_throw();
- }
+ //
+ // Only set selector thread if we really need it.
+ //
+ const_cast<SelectorThreadPtr&>(_selectorThread) = _instance->selectorThread();
+ _selectorThread->incFdsInUse();
+ }
+ catch(const IceUtil::Exception&)
+ {
__setNoDelete(false);
+ throw;
}
+ __setNoDelete(false);
}
Ice::ConnectionI::~ConnectionI()
{
+ assert(!_startCallback);
assert(_state == StateClosed);
assert(!_transceiver);
assert(_dispatchCount == 0);
assert(!_thread);
+ assert(_queuedStreams.empty());
+ assert(_requests.empty());
+ assert(_asyncRequests.empty());
}
-void
-Ice::ConnectionI::start()
-{
- //
- // If we are in thread per connection mode, create the thread for this connection.
- // We can't start the thread in the constructor because it can cause a race condition
- // (see bug 1718).
- //
- if(_threadPerConnection)
- {
- try
- {
- _thread = new ThreadPerConnection(this);
- _thread->start(_threadPerConnectionStackSize);
- }
- catch(const IceUtil::Exception& ex)
- {
- {
- Error out(_logger);
- out << "cannot create thread for connection:\n" << ex;
- }
-
- try
- {
- _transceiver->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
-
- //
- // Clean up.
- //
- _transceiver = 0;
- _thread = 0;
- _state = StateClosed;
-
- ex.ice_throw();
- }
- }
-}
-
-void
+bool
Ice::ConnectionI::setState(State state, const LocalException& ex)
{
//
@@ -1736,7 +1731,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
if(_state == state) // Don't switch twice.
{
- return;
+ return false;
}
if(!_exception.get())
@@ -1777,10 +1772,10 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
// exceptions. Otherwise new requests may retry on a connection
// that is not yet marked as closed or closing.
//
- setState(state);
+ return setState(state);
}
-void
+bool
Ice::ConnectionI::setState(State state)
{
//
@@ -1795,135 +1790,122 @@ Ice::ConnectionI::setState(State state)
//
// Skip graceful shutdown if we are destroyed before validation.
//
- if(_state == StateNotValidated && state == StateClosing)
+ if(_state <= StateNotValidated && state == StateClosing)
{
state = StateClosed;
}
if(_state == state) // Don't switch twice.
{
- return;
+ return false;
}
switch(state)
{
- case StateNotValidated:
+ case StateNotInitialized:
+ {
+ assert(false);
+ break;
+ }
+
+ case StateNotValidated:
+ {
+ if(_state != StateNotInitialized)
{
- assert(false);
- break;
+ assert(_state == StateClosed);
+ return false;
}
+ break;
+ }
- case StateActive:
+ case StateActive:
+ {
+ //
+ // Can only switch from holding or not validated to
+ // active.
+ //
+ if(_state != StateHolding && _state != StateNotValidated)
{
- //
- // Can only switch from holding or not validated to
- // active.
- //
- if(_state != StateHolding && _state != StateNotValidated)
- {
- return;
- }
- if(!_threadPerConnection)
- {
- registerWithPool();
- }
- break;
+ return false;
+ }
+ if(!_threadPerConnection)
+ {
+ registerWithPool();
+ }
+ break;
+ }
+
+ case StateHolding:
+ {
+ //
+ // Can only switch from active or not validated to
+ // holding.
+ //
+ if(_state != StateActive && _state != StateNotValidated)
+ {
+ return false;
+ }
+ if(!_threadPerConnection)
+ {
+ unregisterWithPool();
+ }
+ break;
+ }
+
+ case StateClosing:
+ {
+ //
+ // Can't change back from closed.
+ //
+ if(_state == StateClosed)
+ {
+ return false;
}
+ if(!_threadPerConnection)
+ {
+ registerWithPool(); // We need to continue to read in closing state.
+ }
+ break;
+ }
- case StateHolding:
+ case StateClosed:
+ {
+ if(_sendInProgress)
{
//
- // Can only switch from active or not validated to
- // holding.
+ // Unregister with both the pool and the selector thread. We unregister with
+ // the pool to ensure that it stops reading on the socket (otherwise, if the
+ // socket is closed the thread pool would spin always reading 0 from the FD).
+ // The selector thread will register the FD with the pool again once it's
+ // done.
//
- if(_state != StateActive && _state != StateNotValidated)
- {
- return;
- }
+ _selectorThread->unregister(_transceiver->fd());
if(!_threadPerConnection)
{
unregisterWithPool();
}
- break;
- }
- case StateClosing:
+ _transceiver->shutdownWrite(); // Prevent further writes.
+ }
+ else if(_state <= StateNotValidated || _threadPerConnection)
{
//
- // Can't change back from closed.
+ // If we are in thread per connection mode or we're initializing
+ // the connection in blocking mode, we shutdown both for reading
+ // and writing. This will unblock any read call with an exception.
+ // The thread per connection then closes the transceiver.
//
- if(_state == StateClosed)
- {
- return;
- }
- if(!_threadPerConnection)
- {
- registerWithPool(); // We need to continue to read in closing state.
- }
- break;
+ _transceiver->shutdownReadWrite();
}
-
- case StateClosed:
+ else
{
- if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode, we
- // shutdown both for reading and writing. This will
- // unblock and read call with an exception. The thread
- // per connection then closes the transceiver.
- //
- _transceiver->shutdownReadWrite();
- }
- else if(_state == StateNotValidated)
- {
- //
- // If we change from not validated we can close right
- // away.
- //
- assert(!_registeredWithPool);
-
- _threadPool->decFdsInUse();
-
- //
- // We must make sure that nobody is sending when we
- // close the transceiver.
- //
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- try
- {
- _transceiver->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
-
- _transceiver = 0;
- //notifyAll(); // We notify already below.
- }
- else
- {
- //
- // Otherwise we first must make sure that we are
- // registered, then we unregister, and let finished()
- // do the close.
- //
- registerWithPool();
- unregisterWithPool();
+ registerWithPool();
+ unregisterWithPool(); // Let finished() do the close.
- //
- // We must prevent any further writes when _state == StateClosed.
- // However, functions such as sendResponse cannot acquire the main
- // mutex in order to check _state. Therefore we shut down the write
- // end of the transceiver, which causes subsequent write attempts
- // to fail with an exception.
- //
- _transceiver->shutdownWrite();
- }
- break;
+ _transceiver->shutdownWrite(); // Prevent further writes.
}
+ break;
+ }
}
//
@@ -1954,25 +1936,25 @@ Ice::ConnectionI::setState(State state)
{
try
{
- initiateShutdown();
+ return initiateShutdown(false);
}
catch(const LocalException& ex)
{
setState(StateClosed, ex);
}
}
+
+ return false;
}
-void
-Ice::ConnectionI::initiateShutdown() const
+bool
+Ice::ConnectionI::initiateShutdown(bool queue)
{
assert(_state == StateClosing);
assert(_dispatchCount == 0);
if(!_endpoint->datagram())
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
//
// Before we shut down, we send a close connection message.
//
@@ -1989,12 +1971,9 @@ Ice::ConnectionI::initiateShutdown() const
os.write((Byte)1); // Compression status: compression supported but not used.
os.write(headerSize); // Message size.
- //
- // Send the message.
- //
- os.i = os.b.begin();
- traceHeader("sending close connection", os, _logger, _traceLevels);
- _transceiver->write(os, _endpoint->timeout());
+ OutgoingMessage message(&os, false);
+ return sendMessage(message, queue);
+
//
// The CloseConnection message should be sufficient. Closing the write
// end of the socket is probably an artifact of how things were done
@@ -2005,6 +1984,610 @@ Ice::ConnectionI::initiateShutdown() const
//
//_transceiver->shutdownWrite();
}
+
+ return false;
+}
+
+SocketStatus
+Ice::ConnectionI::initialize()
+{
+ int timeout = 0;
+ if(!_startCallback || _threadPerConnection)
+ {
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideConnectTimeout)
+ {
+ timeout = defaultsAndOverrides->overrideConnectTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint->timeout();
+ }
+ }
+
+ try
+ {
+ SocketStatus status = _transceiver->initialize(timeout);
+ if(status != Finished)
+ {
+ if(!_startCallback || _threadPerConnection)
+ {
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+ return status;
+ }
+ }
+ catch(const TimeoutException&)
+ {
+ throw ConnectTimeoutException(__FILE__, __LINE__);
+ }
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state == StateClosed)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ //
+ // Update the connection description once the transceiver is initialized.
+ //
+ const_cast<string&>(_desc) = _transceiver->toString();
+
+ setState(StateNotValidated);
+ }
+
+ return Finished;
+}
+
+SocketStatus
+Ice::ConnectionI::validate()
+{
+ if(!_endpoint->datagram()) // Datagram connections are always implicitly validated.
+ {
+ Int timeout = 0;
+ if(!_startCallback || _threadPerConnection)
+ {
+ if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
+ {
+ timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint->timeout();
+ }
+ }
+
+ if(_adapter) // The server side has the active role for connection validation.
+ {
+ BasicStream& os = _stream;
+ if(os.b.empty())
+ {
+ os.write(magic[0]);
+ os.write(magic[1]);
+ os.write(magic[2]);
+ os.write(magic[3]);
+ os.write(protocolMajor);
+ os.write(protocolMinor);
+ os.write(encodingMajor);
+ os.write(encodingMinor);
+ os.write(validateConnectionMsg);
+ os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
+ os.write(headerSize); // Message size.
+ os.i = os.b.begin();
+ traceSend(os, _logger, _traceLevels);
+ }
+ else
+ {
+ // The stream can only be non-empty if we're doing a non-blocking connection validation.
+ assert(_startCallback && !_threadPerConnection);
+ }
+
+ try
+ {
+ if(!_transceiver->write(os, timeout))
+ {
+ if(!_startCallback || _threadPerConnection)
+ {
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+ return NeedWrite;
+ }
+ }
+ catch(const TimeoutException&)
+ {
+ throw ConnectTimeoutException(__FILE__, __LINE__);
+ }
+ }
+ else // The client side has the passive role for connection validation.
+ {
+ BasicStream& is = _stream;
+ if(is.b.empty())
+ {
+ is.b.resize(headerSize);
+ is.i = is.b.begin();
+ }
+ else
+ {
+ // The stream can only be non-empty if we're doing a non-blocking connection validation.
+ assert(_startCallback && !_threadPerConnection);
+ }
+
+ try
+ {
+ if(!_transceiver->read(is, timeout))
+ {
+ if(!_startCallback || _threadPerConnection)
+ {
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+ return NeedRead;
+ }
+ }
+ catch(const TimeoutException&)
+ {
+ throw ConnectTimeoutException(__FILE__, __LINE__);
+ }
+
+ assert(is.i == is.b.end());
+ is.i = is.b.begin();
+ Byte m[4];
+ is.read(m[0]);
+ is.read(m[1]);
+ is.read(m[2]);
+ is.read(m[3]);
+ if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
+ {
+ BadMagicException ex(__FILE__, __LINE__);
+ ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
+ throw ex;
+ }
+ Byte pMajor;
+ Byte pMinor;
+ is.read(pMajor);
+ is.read(pMinor);
+ if(pMajor != protocolMajor)
+ {
+ UnsupportedProtocolException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(pMajor);
+ ex.badMinor = static_cast<unsigned char>(pMinor);
+ ex.major = static_cast<unsigned char>(protocolMajor);
+ ex.minor = static_cast<unsigned char>(protocolMinor);
+ throw ex;
+ }
+ Byte eMajor;
+ Byte eMinor;
+ is.read(eMajor);
+ is.read(eMinor);
+ if(eMajor != encodingMajor)
+ {
+ UnsupportedEncodingException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(eMajor);
+ ex.badMinor = static_cast<unsigned char>(eMinor);
+ ex.major = static_cast<unsigned char>(encodingMajor);
+ ex.minor = static_cast<unsigned char>(encodingMinor);
+ throw ex;
+ }
+ Byte messageType;
+ is.read(messageType);
+ if(messageType != validateConnectionMsg)
+ {
+ throw ConnectionNotValidatedException(__FILE__, __LINE__);
+ }
+ Byte compress;
+ is.read(compress); // Ignore compression status for validate connection.
+ Int size;
+ is.read(size);
+ if(size != headerSize)
+ {
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+ traceRecv(is, _logger, _traceLevels);
+ }
+ }
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ _stream.resize(0);
+ _stream.i = _stream.b.begin();
+
+ if(_state == StateClosed)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
+ }
+
+ return Finished;
+}
+
+bool
+Ice::ConnectionI::send(int timeout)
+{
+ assert(_transceiver);
+ assert(!_sendStreams.empty());
+
+ while(!_sendStreams.empty())
+ {
+ OutgoingMessage* message = &_sendStreams.front();
+
+ //
+ // Prepare the message stream for writing if necessary.
+ //
+ if(!message->stream->i)
+ {
+ message->stream->i = message->stream->b.begin();
+ if(message->compress && message->stream->b.size() >= 100) // Only compress messages larger than 100 bytes.
+ {
+ //
+ // Message compressed. Request compressed response, if any.
+ //
+ message->stream->b[9] = 2;
+
+ //
+ // Do compression.
+ //
+ BasicStream stream(_instance.get());
+ doCompress(*message->stream, stream);
+
+ if(message->outAsync)
+ {
+ trace("sending asynchronous request", *message->stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceSend(*message->stream, _logger, _traceLevels);
+ }
+
+ message->adopt(&stream); // Adopt the compressed stream.
+ message->stream->i = message->stream->b.begin();
+ }
+ else
+ {
+ if(message->compress)
+ {
+ //
+ // Message not compressed. Request compressed response, if any.
+ //
+ message->stream->b[9] = 1;
+ }
+
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(message->stream->b.size());
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), message->stream->b.begin() + 10);
+#else
+ copy(p, p + sizeof(Int), message->stream->b.begin() + 10);
+#endif
+ message->stream->i = message->stream->b.begin();
+
+ if(message->outAsync)
+ {
+ trace("sending asynchronous request", *message->stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceSend(*message->stream, _logger, _traceLevels);
+ }
+ }
+ }
+
+ //
+ // Send the first message.
+ //
+ assert(message->stream->i);
+ if(!_transceiver->write(*message->stream, timeout))
+ {
+ assert(timeout == 0);
+ return false;
+ }
+
+ //
+ // Notify the message that it was sent.
+ //
+ message->sent(this, timeout == 0); // timeout == 0 indicates that this is called by the selector thread.
+ _sendStreams.pop_front();
+ }
+
+ return true;
+}
+
+bool
+Ice::ConnectionI::sendMessage(OutgoingMessage& message, bool queue)
+{
+ assert(_state != StateClosed);
+
+ //
+ // TODO: Remove support for foreground send? If set to true, messages are sent
+ // by the calling thread. Foreground send might still be useful for transports
+ // that don't support non-blocking send.
+ //
+ bool foreground = false;
+
+ message.stream->i = 0; // Reset the message stream iterator before starting sending the message.
+
+ //
+ // If another thread is currently sending messages, we queue the message in _queuedStreams
+ // if we're not required to send the message in the foreground. If we're required to send
+ // the request in the foreground we wait until no more threads send messages.
+ //
+ if(_sendInProgress)
+ {
+ if(!foreground)
+ {
+ _queuedStreams.push_back(message);
+ _queuedStreams.back().adopt(0);
+ return false;
+ }
+ else if(queue)
+ {
+ //
+ // Add the message to _sendStreams if requested, this is useful for sendResponse() to
+ // send the close connection message after sending the response.
+ //
+ _sendStreams.push_back(message);
+ return true; // The calling thread must send the messages by calling finishSendMessage()
+ }
+ else
+ {
+ ++_waitingForSend;
+ while(_sendInProgress)
+ {
+ wait();
+ }
+ --_waitingForSend;
+
+ if(_state == StateClosed)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+ }
+ }
+
+ assert(!_sendInProgress);
+
+ //
+ // Attempt to send the message without blocking. If the send blocks, we register
+ // the connection with the selector thread or we request the caller to call
+ // finishSendMessage() outside the synchronization.
+ //
+
+ message.stream->i = message.stream->b.begin();
+
+ if(message.compress && message.stream->b.size() >= 100) // Only compress messages larger than 100 bytes.
+ {
+ //
+ // Message compressed. Request compressed response, if any.
+ //
+ message.stream->b[9] = 2;
+
+ //
+ // Do compression.
+ //
+ BasicStream stream(_instance.get());
+ doCompress(*message.stream, stream);
+ stream.i = stream.b.begin();
+
+ if(message.outAsync)
+ {
+ trace("sending asynchronous request", *message.stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceSend(*message.stream, _logger, _traceLevels);
+ }
+
+ //
+ // Send the message without blocking.
+ //
+ if(!foreground && _transceiver->write(stream, 0))
+ {
+ message.sent(this, false);
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout =
+ IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ }
+ return false;
+ }
+
+ _sendStreams.push_back(message);
+ _sendStreams.back().adopt(&stream);
+ }
+ else
+ {
+ if(message.compress)
+ {
+ //
+ // Message not compressed. Request compressed response, if any.
+ //
+ message.stream->b[9] = 1;
+ }
+
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(message.stream->b.size());
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), message.stream->b.begin() + 10);
+#else
+ copy(p, p + sizeof(Int), message.stream->b.begin() + 10);
+#endif
+ message.stream->i = message.stream->b.begin();
+
+ if(message.outAsync)
+ {
+ trace("sending asynchronous request", *message.stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceSend(*message.stream, _logger, _traceLevels);
+ }
+
+ //
+ // Send the message without blocking.
+ //
+ if(!foreground && _transceiver->write(*message.stream, 0))
+ {
+ message.sent(this, false);
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout =
+ IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ }
+ return false;
+ }
+
+ _sendStreams.push_back(message);
+ if(!foreground)
+ {
+ _sendStreams.back().adopt(0);
+ }
+ }
+
+ _sendInProgress = true;
+ if(!foreground)
+ {
+ _selectorThread->_register(_transceiver->fd(), this, NeedWrite, _endpoint->timeout());
+ return false; // The selector thread will send the message.
+ }
+ else
+ {
+ return true; // The calling thread must send the message by calling finishSendMessage()
+ }
+}
+
+void
+Ice::ConnectionI::finishSendMessage()
+{
+ try
+ {
+ //
+ // Send the queued messages with a blocking write().
+ //
+ send(_endpoint->timeout());
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+
+ for(deque<OutgoingMessage>::const_iterator p = _sendStreams.begin(); p != _sendStreams.end(); ++p)
+ {
+ if(p->adopted)
+ {
+ delete p->stream;
+ }
+ }
+ _sendStreams.clear();
+ _sendInProgress = false;
+
+ if(_threadPerConnection)
+ {
+ _transceiver->shutdownReadWrite();
+ }
+ else
+ {
+ registerWithPool();
+ unregisterWithPool(); // Let finished() do the close.
+ }
+
+ notifyAll();
+
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+
+ //
+ // Clear the _sendInProgress flag and notify waiting threads that we're not
+ // sending anymore data.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_sendStreams.empty());
+
+ if(_state == StateClosed)
+ {
+ _sendInProgress = false;
+ if(_threadPerConnection)
+ {
+ _transceiver->shutdownReadWrite();
+ }
+ else
+ {
+ registerWithPool();
+ unregisterWithPool(); // Let finished() do the close.
+ }
+ notifyAll();
+ }
+ else if(_waitingForSend > 0)
+ {
+ _sendInProgress = false;
+ notifyAll();
+ }
+ else if(_queuedStreams.empty())
+ {
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ }
+ _sendInProgress = false;
+ }
+ else
+ {
+ _selectorThread->_register(_transceiver->fd(), this, NeedWrite, _endpoint->timeout());
+ }
+}
+
+void
+Ice::ConnectionI::finishStart()
+{
+ //
+ // We set _startCallback to null to break potential cyclic reference count
+ // and because the destructor checks for it to ensure that we always invoke
+ // on the callback.
+ //
+
+ StartCallbackPtr callback;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ swap(callback, _startCallback);
+ }
+ if(callback)
+ {
+ callback->connectionStartCompleted(this);
+ }
+}
+
+void
+Ice::ConnectionI::finishStart(const Ice::LocalException& ex)
+{
+ //
+ // We set _startCallback to null to break potential cyclic reference count
+ // and because the destructor checks for it to ensure that we always invoke
+ // on the callback.
+ //
+
+ StartCallbackPtr callback;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ swap(callback, _startCallback);
+ }
+ if(callback)
+ {
+ callback->connectionStartFailed(this, ex);
+ }
}
void
@@ -2213,7 +2796,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
case closeConnectionMsg:
{
- traceHeader("received close connection", stream, _logger, _traceLevels);
+ traceRecv(stream, _logger, _traceLevels);
if(_endpoint->datagram())
{
if(_warn)
@@ -2233,13 +2816,12 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
if(_state == StateClosing)
{
- traceRequest("received request during closing\n"
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
+ trace("received request during closing\n(ignored by server, client will retry)", stream, _logger,
+ _traceLevels);
}
else
{
- traceRequest("received request", stream, _logger, _traceLevels);
+ traceRecv(stream, _logger, _traceLevels);
stream.read(requestId);
invokeNum = 1;
servantManager = _servantManager;
@@ -2253,13 +2835,12 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
if(_state == StateClosing)
{
- traceBatchRequest("received batch request during closing\n"
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
+ trace("received batch request during closing\n(ignored by server, client will retry)", stream,
+ _logger, _traceLevels);
}
else
{
- traceBatchRequest("received batch request", stream, _logger, _traceLevels);
+ traceRecv(stream, _logger, _traceLevels);
stream.read(invokeNum);
if(invokeNum < 0)
{
@@ -2275,12 +2856,12 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
case replyMsg:
{
- traceReply("received reply", stream, _logger, _traceLevels);
+ traceRecv(stream, _logger, _traceLevels);
stream.read(requestId);
map<Int, Outgoing*>::iterator p = _requests.end();
- map<Int, AsyncRequest>::iterator q = _asyncRequests.end();
+ map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.end();
if(_requestsHint != _requests.end())
{
@@ -2334,7 +2915,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
assert(q != _asyncRequests.end());
- outAsync = q->second.p;
+ outAsync = q->second;
if(q == _asyncRequestsHint)
{
@@ -2352,7 +2933,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
case validateConnectionMsg:
{
- traceHeader("received validate connection", stream, _logger, _traceLevels);
+ traceRecv(stream, _logger, _traceLevels);
if(_warn)
{
Warning out(_logger);
@@ -2363,9 +2944,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
default:
{
- traceHeader("received unknown message\n"
- "(invalid, closing connection)",
- stream, _logger, _traceLevels);
+ trace("received unknown message\n(invalid, closing connection)", stream, _logger, _traceLevels);
throw UnknownMessageException(__FILE__, __LINE__);
break;
}
@@ -2448,30 +3027,26 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
void
Ice::ConnectionI::run()
{
- //
- // For non-datagram connections, the thread-per-connection must
- // validate and activate this connection, and not in the
- // connection factory. Please see the comments in the connection
- // factory for details.
- //
- if(!_endpoint->datagram())
+ try
+ {
+ //
+ // Initialize the connection transceiver and validate the connection using
+ // blocking operations.
+ //
+ SocketStatus status;
+
+ status = initialize();
+ assert(status == Finished);
+
+ status = validate();
+ assert(status == Finished);
+ }
+ catch(const LocalException& ex)
{
- try
- {
- validate();
- }
- catch(const LocalException&)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- assert(_state == StateClosed);
-
- //
- // We must make sure that nobody is sending when we close
- // the transceiver.
- //
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
+ setState(StateClosed, ex);
+
if(_transceiver)
{
try
@@ -2482,15 +3057,17 @@ Ice::ConnectionI::run()
{
// Here we ignore any exceptions in close().
}
-
+
_transceiver = 0;
}
notifyAll();
- return;
}
-
- activate();
+
+ finishStart(ex);
+ return;
}
+
+ finishStart();
const bool warnUdp = _instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0;
@@ -2615,9 +3192,6 @@ Ice::ConnectionI::run()
OutgoingAsyncPtr outAsync;
auto_ptr<LocalException> localEx;
-
- map<Int, Outgoing*> requests;
- map<Int, AsyncRequest> asyncRequests;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -2638,12 +3212,24 @@ Ice::ConnectionI::run()
//
if(_state == StateClosed)
{
+ if(_sendInProgress)
+ {
+ _selectorThread->unregister(_transceiver->fd());
+ }
+
//
- // We must make sure that nobody is sending when we close
- // the transceiver.
+ // Prevent further writes.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
+ _transceiver->shutdownWrite();
+
+ //
+ // We must make sure that nobody is sending before closing the transceiver.
+ //
+ while(_sendInProgress)
+ {
+ wait();
+ }
+
try
{
_transceiver->close();
@@ -2652,26 +3238,15 @@ Ice::ConnectionI::run()
{
localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
}
-
_transceiver = 0;
notifyAll();
//
- // We cannot simply return here. We have to make sure
- // that all requests (regular and async) are notified
- // about the closed connection below.
+ // We cannot simply return here. We have to make sure that all requests (regular and
+ // async) are notified about the closed connection below.
//
closed = true;
}
-
- if(_state == StateClosed || _state == StateClosing)
- {
- requests.swap(_requests);
- _requestsHint = _requests.end();
-
- asyncRequests.swap(_asyncRequests);
- _asyncRequestsHint = _asyncRequests.end();
- }
}
//
@@ -2690,21 +3265,33 @@ Ice::ConnectionI::run()
//
invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
- for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
- {
- p->second->finished(*_exception.get()); // The exception is immutable at this point.
- }
-
- for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
+ if(closed)
{
- q->second.p->__finished(*_exception.get()); // The exception is immutable at this point.
- }
+ // Note: the queued streams must be notified first, while the Outgoing objects they reference are still valid.
+ for(deque<OutgoingMessage>::iterator o = _queuedStreams.begin(); o != _queuedStreams.end(); ++o)
+ {
+ o->finished(*_exception.get());
+ }
+ _queuedStreams.clear();
+
+ for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ }
+ _requests.clear();
+ for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
+ {
+ q->second->__finished(*_exception.get()); // The exception is immutable at this point.
+ }
+ _asyncRequests.clear();
+ }
+
if(localEx.get())
{
assert(closed);
localEx->ice_throw();
- }
+ }
}
}