summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp3414
1 files changed, 1707 insertions, 1707 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index d832f87a886..22b4c28cf23 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -41,189 +41,189 @@ Ice::ConnectionI::validate()
if(!_endpoint->datagram()) // Datagram connections are always implicitly validated.
{
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(_thread && _thread->getThreadControl() != IceUtil::ThreadControl())
- {
- //
- // In thread per connection mode, this connection's thread
- // will take care of connection validation. Therefore all we
- // have to do here is to wait until this thread has completed
- // validation.
- //
- while(_state == StateNotValidated)
- {
- wait();
- }
-
- if(_state >= StateClosing)
- {
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- return;
- }
-
- //
- // The connection might already be closed (e.g.: the communicator
- // was destroyed or object adapter deactivated.)
- //
- assert(_state == StateNotValidated || _state == StateClosed);
- if(_state == StateClosed)
- {
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- if(_adapter)
- {
- active = true; // The server side has the active role for connection validation.
- }
- else
- {
- active = false; // The client side has the passive role for connection validation.
- }
- }
-
- try
- {
- Int timeout;
- if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
- {
- timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
- }
- else
- {
- timeout = _endpoint->timeout();
- }
-
- if(active)
- {
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- if(!_transceiver) // Has the transceiver already been closed?
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
-
- BasicStream os(_instance.get());
- os.write(magic[0]);
- os.write(magic[1]);
- os.write(magic[2]);
- os.write(magic[3]);
- os.write(protocolMajor);
- os.write(protocolMinor);
- os.write(encodingMajor);
- os.write(encodingMinor);
- os.write(validateConnectionMsg);
- os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
- os.write(headerSize); // Message size.
- os.i = os.b.begin();
- traceHeader("sending validate connection", os, _logger, _traceLevels);
- try
- {
- _transceiver->initialize(timeout);
- _transceiver->write(os, timeout);
- }
- catch(const TimeoutException&)
- {
- throw ConnectTimeoutException(__FILE__, __LINE__);
- }
- }
- else
- {
- BasicStream is(_instance.get());
- is.b.resize(headerSize);
- is.i = is.b.begin();
- try
- {
- _transceiver->initialize(timeout);
- _transceiver->read(is, timeout);
- }
- catch(const TimeoutException&)
- {
- throw ConnectTimeoutException(__FILE__, __LINE__);
- }
- assert(is.i == is.b.end());
- is.i = is.b.begin();
- Byte m[4];
- is.read(m[0]);
- is.read(m[1]);
- is.read(m[2]);
- is.read(m[3]);
- if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
- {
- BadMagicException ex(__FILE__, __LINE__);
- ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
- throw ex;
- }
- Byte pMajor;
- Byte pMinor;
- is.read(pMajor);
- is.read(pMinor);
- if(pMajor != protocolMajor)
- {
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(pMajor);
- ex.badMinor = static_cast<unsigned char>(pMinor);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
- }
- Byte eMajor;
- Byte eMinor;
- is.read(eMajor);
- is.read(eMinor);
- if(eMajor != encodingMajor)
- {
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(eMajor);
- ex.badMinor = static_cast<unsigned char>(eMinor);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
- }
- Byte messageType;
- is.read(messageType);
- if(messageType != validateConnectionMsg)
- {
- throw ConnectionNotValidatedException(__FILE__, __LINE__);
- }
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(_thread && _thread->getThreadControl() != IceUtil::ThreadControl())
+ {
+ //
+ // In thread per connection mode, this connection's thread
+ // will take care of connection validation. Therefore all we
+ // have to do here is to wait until this thread has completed
+ // validation.
+ //
+ while(_state == StateNotValidated)
+ {
+ wait();
+ }
+
+ if(_state >= StateClosing)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ return;
+ }
+
+ //
+ // The connection might already be closed (e.g.: the communicator
+ // was destroyed or object adapter deactivated.)
+ //
+ assert(_state == StateNotValidated || _state == StateClosed);
+ if(_state == StateClosed)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ if(_adapter)
+ {
+ active = true; // The server side has the active role for connection validation.
+ }
+ else
+ {
+ active = false; // The client side has the passive role for connection validation.
+ }
+ }
+
+ try
+ {
+ Int timeout;
+ if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
+ {
+ timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint->timeout();
+ }
+
+ if(active)
+ {
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+
+ if(!_transceiver) // Has the transceiver already been closed?
+ {
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
+ }
+
+ BasicStream os(_instance.get());
+ os.write(magic[0]);
+ os.write(magic[1]);
+ os.write(magic[2]);
+ os.write(magic[3]);
+ os.write(protocolMajor);
+ os.write(protocolMinor);
+ os.write(encodingMajor);
+ os.write(encodingMinor);
+ os.write(validateConnectionMsg);
+ os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
+ os.write(headerSize); // Message size.
+ os.i = os.b.begin();
+ traceHeader("sending validate connection", os, _logger, _traceLevels);
+ try
+ {
+ _transceiver->initialize(timeout);
+ _transceiver->write(os, timeout);
+ }
+ catch(const TimeoutException&)
+ {
+ throw ConnectTimeoutException(__FILE__, __LINE__);
+ }
+ }
+ else
+ {
+ BasicStream is(_instance.get());
+ is.b.resize(headerSize);
+ is.i = is.b.begin();
+ try
+ {
+ _transceiver->initialize(timeout);
+ _transceiver->read(is, timeout);
+ }
+ catch(const TimeoutException&)
+ {
+ throw ConnectTimeoutException(__FILE__, __LINE__);
+ }
+ assert(is.i == is.b.end());
+ is.i = is.b.begin();
+ Byte m[4];
+ is.read(m[0]);
+ is.read(m[1]);
+ is.read(m[2]);
+ is.read(m[3]);
+ if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
+ {
+ BadMagicException ex(__FILE__, __LINE__);
+ ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
+ throw ex;
+ }
+ Byte pMajor;
+ Byte pMinor;
+ is.read(pMajor);
+ is.read(pMinor);
+ if(pMajor != protocolMajor)
+ {
+ UnsupportedProtocolException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(pMajor);
+ ex.badMinor = static_cast<unsigned char>(pMinor);
+ ex.major = static_cast<unsigned char>(protocolMajor);
+ ex.minor = static_cast<unsigned char>(protocolMinor);
+ throw ex;
+ }
+ Byte eMajor;
+ Byte eMinor;
+ is.read(eMajor);
+ is.read(eMinor);
+ if(eMajor != encodingMajor)
+ {
+ UnsupportedEncodingException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(eMajor);
+ ex.badMinor = static_cast<unsigned char>(eMinor);
+ ex.major = static_cast<unsigned char>(encodingMajor);
+ ex.minor = static_cast<unsigned char>(encodingMinor);
+ throw ex;
+ }
+ Byte messageType;
+ is.read(messageType);
+ if(messageType != validateConnectionMsg)
+ {
+ throw ConnectionNotValidatedException(__FILE__, __LINE__);
+ }
Byte compress;
is.read(compress); // Ignore compression status for validate connection.
- Int size;
- is.read(size);
- if(size != headerSize)
- {
- throw IllegalMessageSizeException(__FILE__, __LINE__);
- }
- traceHeader("received validate connection", is, _logger, _traceLevels);
- }
- }
- catch(const LocalException& ex)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- assert(_exception.get());
- _exception->ice_throw();
- }
- }
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
- }
-
- //
- // We start out in holding state.
- //
- setState(StateHolding);
+ Int size;
+ is.read(size);
+ if(size != headerSize)
+ {
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+ traceHeader("received validate connection", is, _logger, _traceLevels);
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+ }
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
+
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
}
}
@@ -234,7 +234,7 @@ Ice::ConnectionI::activate()
while(_state == StateNotValidated)
{
- wait();
+ wait();
}
setState(StateActive);
@@ -247,7 +247,7 @@ Ice::ConnectionI::hold()
while(_state == StateNotValidated)
{
- wait();
+ wait();
}
setState(StateHolding);
@@ -260,17 +260,17 @@ Ice::ConnectionI::destroy(DestructionReason reason)
switch(reason)
{
- case ObjectAdapterDeactivated:
- {
- setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__));
- break;
- }
+ case ObjectAdapterDeactivated:
+ {
+ setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__));
+ break;
+ }
- case CommunicatorDestroyed:
- {
- setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__));
- break;
- }
+ case CommunicatorDestroyed:
+ {
+ setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__));
+ break;
+ }
}
}
@@ -281,23 +281,23 @@ Ice::ConnectionI::close(bool force)
if(force)
{
- setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
+ setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
}
else
{
- //
- // If we do a graceful shutdown, then we wait until all
- // outstanding requests have been completed. Otherwise, the
- // CloseConnectionException will cause all outstanding
- // requests to be retried, regardless of whether the server
- // has processed them or not.
- //
- while(!_requests.empty() || !_asyncRequests.empty())
- {
- wait();
- }
-
- setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+ //
+ // If we do a graceful shutdown, then we wait until all
+ // outstanding requests have been completed. Otherwise, the
+ // CloseConnectionException will cause all outstanding
+ // requests to be retried, regardless of whether the server
+ // has processed them or not.
+ //
+ while(!_requests.empty() || !_asyncRequests.empty())
+ {
+ wait();
+ }
+
+ setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
}
}
@@ -320,37 +320,37 @@ Ice::ConnectionI::isFinished() const
IceUtil::ThreadPtr threadPerConnection;
{
- //
- // We can use trylock here, because as long as there are still
- // threads operating in this connection object, connection
- // destruction is considered as not yet finished.
- //
- IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
-
- if(!sync.acquired())
- {
- return false;
- }
+ //
+ // We can use trylock here, because as long as there are still
+ // threads operating in this connection object, connection
+ // destruction is considered as not yet finished.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
+
+ if(!sync.acquired())
+ {
+ return false;
+ }
- if(_transceiver || _dispatchCount != 0)
- {
- return false;
- }
+ if(_transceiver || _dispatchCount != 0)
+ {
+ return false;
+ }
- if(_thread && _thread->isAlive())
- {
- return false;
- }
+ if(_thread && _thread->isAlive())
+ {
+ return false;
+ }
- assert(_state == StateClosed);
+ assert(_state == StateClosed);
- threadPerConnection = _thread;
- _thread = 0;
+ threadPerConnection = _thread;
+ _thread = 0;
}
if(threadPerConnection)
{
- threadPerConnection->getThreadControl().join();
+ threadPerConnection->getThreadControl().join();
}
return true;
@@ -363,8 +363,8 @@ Ice::ConnectionI::throwException() const
if(_exception.get())
{
- assert(_state >= StateClosing);
- _exception->ice_throw();
+ assert(_state >= StateClosing);
+ _exception->ice_throw();
}
}
@@ -375,7 +375,7 @@ Ice::ConnectionI::waitUntilHolding() const
while(_state < StateHolding || _dispatchCount > 0)
{
- wait();
+ wait();
}
}
@@ -385,76 +385,76 @@ Ice::ConnectionI::waitUntilFinished()
IceUtil::ThreadPtr threadPerConnection;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // We wait indefinitely until connection closing has been
- // initiated. We also wait indefinitely until all outstanding
- // requests are completed. Otherwise we couldn't guarantee
- // that there are no outstanding calls when deactivate() is
- // called on the servant locators.
- //
- while(_state < StateClosing || _dispatchCount > 0)
- {
- wait();
- }
-
- //
- // Now we must wait until close() has been called on the
- // transceiver.
- //
- while(_transceiver)
- {
- if(_state != StateClosed && _endpoint->timeout() >= 0)
- {
- IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout());
- IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now();
-
- if(waitTime > IceUtil::Time())
- {
- //
- // We must wait a bit longer until we close this
- // connection.
- //
- if(!timedWait(waitTime))
- {
- setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
- }
- }
- else
- {
- //
- // We already waited long enough, so let's close this
- // connection!
- //
- setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
- }
-
- //
- // No return here, we must still wait until close() is
- // called on the _transceiver.
- //
- }
- else
- {
- wait();
- }
- }
-
- assert(_state == StateClosed);
-
- threadPerConnection = _thread;
- _thread = 0;
-
- //
- // Clear the OA. See bug 1673 for the details of why this is necessary.
- //
- _adapter = 0;
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // We wait indefinitely until connection closing has been
+ // initiated. We also wait indefinitely until all outstanding
+ // requests are completed. Otherwise we couldn't guarantee
+ // that there are no outstanding calls when deactivate() is
+ // called on the servant locators.
+ //
+ while(_state < StateClosing || _dispatchCount > 0)
+ {
+ wait();
+ }
+
+ //
+ // Now we must wait until close() has been called on the
+ // transceiver.
+ //
+ while(_transceiver)
+ {
+ if(_state != StateClosed && _endpoint->timeout() >= 0)
+ {
+ IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout());
+ IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now();
+
+ if(waitTime > IceUtil::Time())
+ {
+ //
+ // We must wait a bit longer until we close this
+ // connection.
+ //
+ if(!timedWait(waitTime))
+ {
+ setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
+ }
+ }
+ else
+ {
+ //
+ // We already waited long enough, so let's close this
+ // connection!
+ //
+ setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
+ }
+
+ //
+ // No return here, we must still wait until close() is
+ // called on the _transceiver.
+ //
+ }
+ else
+ {
+ wait();
+ }
+ }
+
+ assert(_state == StateClosed);
+
+ threadPerConnection = _thread;
+ _thread = 0;
+
+ //
+ // Clear the OA. See bug 1673 for the details of why this is necessary.
+ //
+ _adapter = 0;
}
if(threadPerConnection)
{
- threadPerConnection->getThreadControl().join();
+ threadPerConnection->getThreadControl().join();
}
}
@@ -465,12 +465,12 @@ Ice::ConnectionI::monitor()
if(!sync.acquired())
{
- return;
+ return;
}
if(_state != StateActive)
{
- return;
+ return;
}
//
@@ -478,11 +478,11 @@ Ice::ConnectionI::monitor()
//
for(map<Int, AsyncRequest>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
{
- if(p->second.t > IceUtil::Time() && p->second.t <= IceUtil::Time::now())
- {
- setState(StateClosed, TimeoutException(__FILE__, __LINE__));
- return;
- }
+ if(p->second.t > IceUtil::Time() && p->second.t <= IceUtil::Time::now())
+ {
+ setState(StateClosed, TimeoutException(__FILE__, __LINE__));
+ return;
+ }
}
//
@@ -493,11 +493,11 @@ Ice::ConnectionI::monitor()
!_batchStreamInUse && _batchStream.b.empty() &&
_dispatchCount == 0)
{
- if(IceUtil::Time::now() >= _acmAbsoluteTimeout)
- {
- setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
- return;
- }
+ if(IceUtil::Time::now() >= _acmAbsoluteTimeout)
+ {
+ setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
+ return;
+ }
}
}
@@ -507,162 +507,162 @@ Ice::ConnectionI::sendRequest(BasicStream* os, Outgoing* out, bool compress)
Int requestId;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- assert(!(out && _endpoint->datagram())); // Twoway requests cannot be datagrams.
-
- if(_exception.get())
- {
- //
- // If the connection is closed before we even have a chance
- // to send our request, we always try to send the request
- // again.
- //
- throw LocalExceptionWrapper(*_exception.get(), true);
- }
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- //
- // Only add to the request map if this is a twoway call.
- //
- if(out)
- {
- //
- // Create a new unique request ID.
- //
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
-
- //
- // Fill in the request ID.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ assert(!(out && _endpoint->datagram())); // Twoway requests cannot be datagrams.
+
+ if(_exception.get())
+ {
+ //
+ // If the connection is closed before we even have a chance
+ // to send our request, we always try to send the request
+ // again.
+ //
+ throw LocalExceptionWrapper(*_exception.get(), true);
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ //
+ // Only add to the request map if this is a twoway call.
+ //
+ if(out)
+ {
+ //
+ // Create a new unique request ID.
+ //
+ requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Fill in the request ID.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#endif
- //
- // Add to the requests map.
- //
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
- }
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
- }
+ //
+ // Add to the requests map.
+ //
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
}
try
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- if(!_transceiver) // Has the transceiver already been closed?
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
-
- if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes.
- {
- //
- // Message compressed. Request compressed response, if any.
- //
- os->b[9] = 2;
-
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(*os, cstream);
-
- //
- // Send the request.
- //
- os->i = os->b.begin();
- traceRequest("sending request", *os, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
- }
- else
- {
- if(compress)
- {
- //
- // Message not compressed. Request compressed response, if any.
- //
- os->b[9] = 1;
- }
-
- //
- // No compression, just fill in the message size.
- //
- Int sz = static_cast<Int>(os->b.size());
- const Byte* p = reinterpret_cast<const Byte*>(&sz);
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+
+ if(!_transceiver) // Has the transceiver already been closed?
+ {
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
+ }
+
+ if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes.
+ {
+ //
+ // Message compressed. Request compressed response, if any.
+ //
+ os->b[9] = 2;
+
+ //
+ // Do compression.
+ //
+ BasicStream cstream(_instance.get());
+ doCompress(*os, cstream);
+
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ traceRequest("sending request", *os, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ if(compress)
+ {
+ //
+ // Message not compressed. Request compressed response, if any.
+ //
+ os->b[9] = 1;
+ }
+
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(os->b.size());
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
#else
- copy(p, p + sizeof(Int), os->b.begin() + 10);
+ copy(p, p + sizeof(Int), os->b.begin() + 10);
#endif
-
- //
- // Send the request.
- //
- os->i = os->b.begin();
- traceRequest("sending request", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
- }
+
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ traceRequest("sending request", *os, _logger, _traceLevels);
+ _transceiver->write(*os, _endpoint->timeout());
+ }
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- assert(_exception.get());
-
- if(out)
- {
- //
- // If the request has already been removed from the
- // request map, we are out of luck. It would mean that
- // finished() has been called already, and therefore the
- // exception has been set using the Outgoing::finished()
- // callback. In this case, we cannot throw the exception
- // here, because we must not both raise an exception and
- // have Outgoing::finished() called with an
- // exception. This means that in some rare cases, a
- // request will not be retried even though it could. But I
- // honestly don't know how I could avoid this, without a
- // very elaborate and complex design, which would be bad
- // for performance.
- //
- map<Int, Outgoing*>::iterator p = _requests.find(requestId);
- if(p != _requests.end())
- {
- if(p == _requestsHint)
- {
- _requests.erase(p++);
- _requestsHint = p;
- }
- else
- {
- _requests.erase(p);
- }
-
- _exception->ice_throw();
- }
- }
- else
- {
- _exception->ice_throw();
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ assert(_exception.get());
+
+ if(out)
+ {
+ //
+ // If the request has already been removed from the
+ // request map, we are out of luck. It would mean that
+ // finished() has been called already, and therefore the
+ // exception has been set using the Outgoing::finished()
+ // callback. In this case, we cannot throw the exception
+ // here, because we must not both raise an exception and
+ // have Outgoing::finished() called with an
+ // exception. This means that in some rare cases, a
+ // request will not be retried even though it could. But I
+ // honestly don't know how I could avoid this, without a
+ // very elaborate and complex design, which would be bad
+ // for performance.
+ //
+ map<Int, Outgoing*>::iterator p = _requests.find(requestId);
+ if(p != _requests.end())
+ {
+ if(p == _requestsHint)
+ {
+ _requests.erase(p++);
+ _requestsHint = p;
+ }
+ else
+ {
+ _requests.erase(p);
+ }
+
+ _exception->ice_throw();
+ }
+ }
+ else
+ {
+ _exception->ice_throw();
+ }
}
}
@@ -672,156 +672,156 @@ Ice::ConnectionI::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPtr& out,
Int requestId;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- assert(!_endpoint->datagram()); // Twoway requests cannot be datagrams, and async implies twoway.
-
- if(_exception.get())
- {
- //
- // If the exception is closed before we even have a chance
- // to send our request, we always try to send the request
- // again.
- //
- throw LocalExceptionWrapper(*_exception.get(), true);
- }
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- //
- // Create a new unique request ID.
- //
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
-
- //
- // Fill in the request ID.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ assert(!_endpoint->datagram()); // Twoway requests cannot be datagrams, and async implies twoway.
+
+ if(_exception.get())
+ {
+ //
+ // If the exception is closed before we even have a chance
+ // to send our request, we always try to send the request
+ // again.
+ //
+ throw LocalExceptionWrapper(*_exception.get(), true);
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ //
+ // Create a new unique request ID.
+ //
+ requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Fill in the request ID.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#endif
-
- //
- // Add to the async requests map.
- //
- struct AsyncRequest asyncRequest;
- asyncRequest.p = out;
- if(_endpoint->timeout() > 0)
- {
- asyncRequest.t = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_endpoint->timeout());
- }
- _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
- pair<const Int, AsyncRequest>(requestId, asyncRequest));
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
- }
+
+ //
+ // Add to the async requests map.
+ //
+ struct AsyncRequest asyncRequest;
+ asyncRequest.p = out;
+ if(_endpoint->timeout() > 0)
+ {
+ asyncRequest.t = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_endpoint->timeout());
+ }
+ _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
+ pair<const Int, AsyncRequest>(requestId, asyncRequest));
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
}
try
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- if(!_transceiver) // Has the transceiver already been closed?
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
-
- if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes.
- {
- //
- // Message compressed. Request compressed response, if any.
- //
- os->b[9] = 2;
-
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(*os, cstream);
-
- //
- // Send the request.
- //
- os->i = os->b.begin();
- traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
- }
- else
- {
- if(compress)
- {
- //
- // Message not compressed. Request compressed response, if any.
- //
- os->b[9] = 1;
- }
-
- //
- // No compression, just fill in the message size.
- //
- Int sz = static_cast<Int>(os->b.size());
- const Byte* p = reinterpret_cast<const Byte*>(&sz);
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+
+ if(!_transceiver) // Has the transceiver already been closed?
+ {
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
+ }
+
+ if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes.
+ {
+ //
+ // Message compressed. Request compressed response, if any.
+ //
+ os->b[9] = 2;
+
+ //
+ // Do compression.
+ //
+ BasicStream cstream(_instance.get());
+ doCompress(*os, cstream);
+
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ if(compress)
+ {
+ //
+ // Message not compressed. Request compressed response, if any.
+ //
+ os->b[9] = 1;
+ }
+
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(os->b.size());
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
#else
- copy(p, p + sizeof(Int), os->b.begin() + 10);
+ copy(p, p + sizeof(Int), os->b.begin() + 10);
#endif
- //
- // Send the request.
- //
- os->i = os->b.begin();
- traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
- }
+ //
+ // Send the request.
+ //
+ os->i = os->b.begin();
+ traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
+ _transceiver->write(*os, _endpoint->timeout());
+ }
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- assert(_exception.get());
-
- //
- // If the request has already been removed from the async
- // request map, we are out of luck. It would mean that
- // finished() has been called already, and therefore the
- // exception has been set using the
- // OutgoingAsync::__finished() callback. In this case, we
- // cannot throw the exception here, because we must not both
- // raise an exception and have OutgoingAsync::__finished()
- // called with an exception. This means that in some rare
- // cases, a request will not be retried even though it
- // could. But I honestly don't know how I could avoid this,
- // without a very elaborate and complex design, which would be
- // bad for performance.
- //
- map<Int, AsyncRequest>::iterator p = _asyncRequests.find(requestId);
- if(p != _asyncRequests.end())
- {
- if(p == _asyncRequestsHint)
- {
- _asyncRequests.erase(p++);
- _asyncRequestsHint = p;
- }
- else
- {
- _asyncRequests.erase(p);
- }
-
- _exception->ice_throw();
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ assert(_exception.get());
+
+ //
+ // If the request has already been removed from the async
+ // request map, we are out of luck. It would mean that
+ // finished() has been called already, and therefore the
+ // exception has been set using the
+ // OutgoingAsync::__finished() callback. In this case, we
+ // cannot throw the exception here, because we must not both
+ // raise an exception and have OutgoingAsync::__finished()
+ // called with an exception. This means that in some rare
+ // cases, a request will not be retried even though it
+ // could. But I honestly don't know how I could avoid this,
+ // without a very elaborate and complex design, which would be
+ // bad for performance.
+ //
+ map<Int, AsyncRequest>::iterator p = _asyncRequests.find(requestId);
+ if(p != _asyncRequests.end())
+ {
+ if(p == _asyncRequestsHint)
+ {
+ _asyncRequests.erase(p++);
+ _asyncRequestsHint = p;
+ }
+ else
+ {
+ _asyncRequests.erase(p);
+ }
+
+ _exception->ice_throw();
+ }
}
}
@@ -835,12 +835,12 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os)
//
while(_batchStreamInUse && !_exception.get())
{
- wait();
+ wait();
}
if(_exception.get())
{
- _exception->ice_throw();
+ _exception->ice_throw();
}
assert(_state > StateNotValidated);
@@ -848,15 +848,15 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os)
if(_batchStream.b.empty())
{
- try
- {
- _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- ex.ice_throw();
- }
+ try
+ {
+ _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ ex.ice_throw();
+ }
}
_batchStreamInUse = true;
@@ -884,41 +884,41 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
_batchStream.swap(*os);
if(_batchAutoFlush)
- {
- IceUtil::Mutex::Lock sendSync(_sendMutex);
- if(!_transceiver)
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
-
- //
- // Throw memory limit exception if the first message added causes us to
- // go over limit. Otherwise put aside the marshalled message that caused
- // limit to be exceeded and rollback stream to the marker.
- //
- try
- {
- _transceiver->checkSendSize(_batchStream, _instance->messageSizeMax());
- }
- catch(const Ice::Exception&)
- {
- if(_batchRequestNum == 0)
- {
- resetBatch(true);
- throw;
- }
- vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest);
- _batchStream.b.resize(_batchMarker);
- autoflush = true;
- }
- }
-
- if(!autoflush)
- {
- //
- // Increment the number of requests in the batch.
- //
+ {
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ if(!_transceiver)
+ {
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
+ }
+
+ //
+ // Throw memory limit exception if the first message added causes us to
+ // go over limit. Otherwise put aside the marshalled message that caused
+ // limit to be exceeded and rollback stream to the marker.
+ //
+ try
+ {
+ _transceiver->checkSendSize(_batchStream, _instance->messageSizeMax());
+ }
+ catch(const Ice::Exception&)
+ {
+ if(_batchRequestNum == 0)
+ {
+ resetBatch(true);
+ throw;
+ }
+ vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest);
+ _batchStream.b.resize(_batchMarker);
+ autoflush = true;
+ }
+ }
+
+ if(!autoflush)
+ {
+ //
+ // Increment the number of requests in the batch.
+ //
++_batchRequestNum;
//
@@ -927,7 +927,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
//
if(compress)
{
- _batchRequestCompress = true;
+ _batchRequestCompress = true;
}
//
@@ -942,41 +942,41 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
if(autoflush)
{
//
- // We have to keep _batchStreamInUse set until after we insert the
- // saved marshalled data into a new stream.
- //
+ // We have to keep _batchStreamInUse set until after we insert the
+ // saved marshalled data into a new stream.
+ //
flushBatchRequestsInternal(true);
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // Throw memory limit exception if the message that caused us to go over
- // limit causes us to exceed the limit by itself.
- //
+ //
+ // Throw memory limit exception if the message that caused us to go over
+ // limit causes us to exceed the limit by itself.
+ //
if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax())
- {
- resetBatch(true);
- throw MemoryLimitException(__FILE__, __LINE__);
- }
-
- //
- // Start a new batch with the last message that caused us to
- // go over the limit.
- //
- try
- {
- _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
- _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- ex.ice_throw();
- }
+ {
+ resetBatch(true);
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+
+ //
+ // Start a new batch with the last message that caused us to
+ // go over the limit.
+ //
+ try
+ {
+ _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
+ _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ ex.ice_throw();
+ }
if(compress)
{
- _batchRequestCompress = true;
+ _batchRequestCompress = true;
}
//
@@ -984,7 +984,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
//
++_batchRequestNum;
_batchStreamInUse = false;
- notifyAll();
+ notifyAll();
}
}
@@ -1011,133 +1011,133 @@ void
Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse)
{
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(!ignoreInUse)
- {
- while(_batchStreamInUse && !_exception.get())
- {
- wait();
- }
- }
-
- if(_exception.get())
- {
- _exception->ice_throw();
- }
-
- if(_batchStream.b.empty())
- {
- return; // Nothing to do.
- }
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- _batchStream.i = _batchStream.b.begin();
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
- }
-
- //
- // Prevent that new batch requests are added while we are
- // flushing.
- //
- _batchStreamInUse = true;
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(!ignoreInUse)
+ {
+ while(_batchStreamInUse && !_exception.get())
+ {
+ wait();
+ }
+ }
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+
+ if(_batchStream.b.empty())
+ {
+ return; // Nothing to do.
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ _batchStream.i = _batchStream.b.begin();
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
+
+ //
+ // Prevent that new batch requests are added while we are
+ // flushing.
+ //
+ _batchStreamInUse = true;
}
try
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
- if(!_transceiver) // Has the transceiver already been closed?
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
+ if(!_transceiver) // Has the transceiver already been closed?
+ {
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
+ }
- //
- // Fill in the number of requests in the batch.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
+ //
+ // Fill in the number of requests in the batch.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#else
- copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
-
- if(_batchRequestCompress && _batchStream.b.size() >= 100) // Only compress messages larger than 100 bytes.
- {
- //
- // Message compressed. Request compressed response, if any.
- //
- _batchStream.b[9] = 2;
-
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(_batchStream, cstream);
-
- //
- // Send the batch request.
- //
- _batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
- }
- else
- {
- if(_batchRequestCompress)
- {
- //
- // Message not compressed. Request compressed response, if any.
- //
- _batchStream.b[9] = 1;
- }
-
- //
- // No compression, just fill in the message size.
- //
- Int sz = static_cast<Int>(_batchStream.b.size());
- const Byte* q = reinterpret_cast<const Byte*>(&sz);
+
+ if(_batchRequestCompress && _batchStream.b.size() >= 100) // Only compress messages larger than 100 bytes.
+ {
+ //
+ // Message compressed. Request compressed response, if any.
+ //
+ _batchStream.b[9] = 2;
+
+ //
+ // Do compression.
+ //
+ BasicStream cstream(_instance.get());
+ doCompress(_batchStream, cstream);
+
+ //
+ // Send the batch request.
+ //
+ _batchStream.i = _batchStream.b.begin();
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ if(_batchRequestCompress)
+ {
+ //
+ // Message not compressed. Request compressed response, if any.
+ //
+ _batchStream.b[9] = 1;
+ }
+
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(_batchStream.b.size());
+ const Byte* q = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(q, q + sizeof(Int), _batchStream.b.begin() + 10);
+ reverse_copy(q, q + sizeof(Int), _batchStream.b.begin() + 10);
#else
- copy(q, q + sizeof(Int), _batchStream.b.begin() + 10);
+ copy(q, q + sizeof(Int), _batchStream.b.begin() + 10);
#endif
-
- //
- // Send the batch request.
- //
- _batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- _transceiver->write(_batchStream, _endpoint->timeout());
- }
+
+ //
+ // Send the batch request.
+ //
+ _batchStream.i = _batchStream.b.begin();
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ _transceiver->write(_batchStream, _endpoint->timeout());
+ }
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- assert(_exception.get());
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ assert(_exception.get());
- //
- // Since batch requests are all oneways (or datagrams), we
- // must report the exception to the caller.
- //
- _exception->ice_throw();
+ //
+ // Since batch requests are all oneways (or datagrams), we
+ // must report the exception to the caller.
+ //
+ _exception->ice_throw();
}
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // Reset the batch stream, and notify that flushing is over.
- //
- resetBatch(!ignoreInUse);
+ //
+ // Reset the batch stream, and notify that flushing is over.
+ //
+ resetBatch(!ignoreInUse);
}
}
@@ -1167,100 +1167,100 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
{
try
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- if(!_transceiver) // Has the transceiver already been closed?
- {
- assert(_exception.get());
- _exception->ice_throw(); // The exception is immutable at this point.
- }
-
- //
- // Only compress if compression was requested by the client,
- // and if the message is larger than 100 bytes.
- //
- if(compressFlag > 0 && os->b.size() >= 100)
- {
- //
- // Message compressed. Request compressed response, if any.
- //
- os->b[9] = 2;
-
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(*os, cstream);
-
- //
- // Send the reply.
- //
- os->i = os->b.begin();
- traceReply("sending reply", *os, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
- }
- else
- {
- if(compressFlag > 0)
- {
- //
- // Message not compressed. Request compressed response, if any.
- //
- os->b[9] = 1;
- }
-
- //
- // No compression, just fill in the message size.
- //
- Int sz = static_cast<Int>(os->b.size());
- const Byte* p = reinterpret_cast<const Byte*>(&sz);
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+
+ if(!_transceiver) // Has the transceiver already been closed?
+ {
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
+ }
+
+ //
+ // Only compress if compression was requested by the client,
+ // and if the message is larger than 100 bytes.
+ //
+ if(compressFlag > 0 && os->b.size() >= 100)
+ {
+ //
+ // Message compressed. Request compressed response, if any.
+ //
+ os->b[9] = 2;
+
+ //
+ // Do compression.
+ //
+ BasicStream cstream(_instance.get());
+ doCompress(*os, cstream);
+
+ //
+ // Send the reply.
+ //
+ os->i = os->b.begin();
+ traceReply("sending reply", *os, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ if(compressFlag > 0)
+ {
+ //
+ // Message not compressed. Request compressed response, if any.
+ //
+ os->b[9] = 1;
+ }
+
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(os->b.size());
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
#else
- copy(p, p + sizeof(Int), os->b.begin() + 10);
+ copy(p, p + sizeof(Int), os->b.begin() + 10);
#endif
-
- //
- // Send the reply.
- //
- os->i = os->b.begin();
- traceReply("sending reply", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
- }
+
+ //
+ // Send the reply.
+ //
+ os->i = os->b.begin();
+ traceReply("sending reply", *os, _logger, _traceLevels);
+ _transceiver->write(*os, _endpoint->timeout());
+ }
}
catch(const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- setState(StateClosed, ex);
- }
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- assert(_state > StateNotValidated);
-
- try
- {
- if(--_dispatchCount == 0)
- {
- notifyAll();
- }
-
- if(_state == StateClosing && _dispatchCount == 0)
- {
- initiateShutdown();
- }
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
- }
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+ }
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ assert(_state > StateNotValidated);
+
+ try
+ {
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
}
}
@@ -1273,19 +1273,19 @@ Ice::ConnectionI::sendNoResponse()
try
{
- if(--_dispatchCount == 0)
- {
- notifyAll();
- }
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
- if(_state == StateClosing && _dispatchCount == 0)
- {
- initiateShutdown();
- }
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
}
catch(const LocalException& ex)
{
- setState(StateClosed, ex);
+ setState(StateClosed, ex);
}
}
@@ -1308,7 +1308,7 @@ Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter)
if(_exception.get())
{
- _exception->ice_throw();
+ _exception->ice_throw();
}
assert(_state < StateClosing);
@@ -1317,15 +1317,15 @@ Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter)
if(_adapter)
{
- _servantManager = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getServantManager();
- if(!_servantManager)
- {
- _adapter = 0;
- }
+ _servantManager = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getServantManager();
+ if(!_servantManager)
+ {
+ _adapter = 0;
+ }
}
else
{
- _servantManager = 0;
+ _servantManager = 0;
}
//
@@ -1351,7 +1351,7 @@ Ice::ConnectionI::createProxy(const Identity& ident) const
vector<ConnectionIPtr> connections;
connections.push_back(const_cast<ConnectionI*>(this));
ReferencePtr ref = _instance->referenceFactory()->create(ident, _instance->getDefaultContext(),
- "", Reference::ModeTwoway, connections);
+ "", Reference::ModeTwoway, connections);
return _instance->proxyFactory()->referenceToProxy(ref);
}
@@ -1396,28 +1396,28 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
OutgoingAsyncPtr outAsync;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // We must promote within the synchronization, otherwise there
- // could be various race conditions with close connection
- // messages and other messages.
- //
- threadPool->promoteFollower();
+ //
+ // We must promote within the synchronization, otherwise there
+ // could be various race conditions with close connection
+ // messages and other messages.
+ //
+ threadPool->promoteFollower();
- if(_state != StateClosed)
- {
- parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync);
- }
+ if(_state != StateClosed)
+ {
+ parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync);
+ }
- //
- // parseMessage() can close the connection, so we must check
- // for closed state again.
- //
- if(_state == StateClosed)
- {
- return;
- }
+ //
+ // parseMessage() can close the connection, so we must check
+ // for closed state again.
+ //
+ if(_state == StateClosed)
+ {
+ return;
+ }
}
//
@@ -1426,7 +1426,7 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
//
if(outAsync)
{
- outAsync->__finished(stream);
+ outAsync->__finished(stream);
}
//
@@ -1450,57 +1450,57 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool)
map<Int, AsyncRequest> asyncRequests;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- --_finishedCount;
- assert(threadPool.get() == _threadPool.get());
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ --_finishedCount;
+ assert(threadPool.get() == _threadPool.get());
- if(_finishedCount == 0 && _state == StateClosed)
- {
- _threadPool->decFdsInUse();
+ if(_finishedCount == 0 && _state == StateClosed)
+ {
+ _threadPool->decFdsInUse();
- //
- // We must make sure that nobody is sending when we close
- // the transceiver.
- //
- IceUtil::Mutex::Lock sendSync(_sendMutex);
+ //
+ // We must make sure that nobody is sending when we close
+ // the transceiver.
+ //
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
- try
- {
- _transceiver->close();
- }
- catch(const LocalException& ex)
- {
- localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
- }
+ try
+ {
+ _transceiver->close();
+ }
+ catch(const LocalException& ex)
+ {
+ localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+ }
- _transceiver = 0;
- notifyAll();
- }
+ _transceiver = 0;
+ notifyAll();
+ }
- if(_state == StateClosed || _state == StateClosing)
- {
- requests.swap(_requests);
- _requestsHint = _requests.end();
+ if(_state == StateClosed || _state == StateClosing)
+ {
+ requests.swap(_requests);
+ _requestsHint = _requests.end();
- asyncRequests.swap(_asyncRequests);
- _asyncRequestsHint = _asyncRequests.end();
- }
+ asyncRequests.swap(_asyncRequests);
+ _asyncRequestsHint = _asyncRequests.end();
+ }
}
for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
{
- p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ p->second->finished(*_exception.get()); // The exception is immutable at this point.
}
for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
{
- q->second.p->__finished(*_exception.get()); // The exception is immutable at this point.
+ q->second.p->__finished(*_exception.get()); // The exception is immutable at this point.
}
if(localEx.get())
{
- localEx->ice_throw();
+ localEx->ice_throw();
}
}
@@ -1524,13 +1524,13 @@ Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum)
if(invokeNum > 0)
{
- assert(_dispatchCount > 0);
- _dispatchCount -= invokeNum;
- assert(_dispatchCount >= 0);
- if(_dispatchCount == 0)
- {
- notifyAll();
- }
+ assert(_dispatchCount > 0);
+ _dispatchCount -= invokeNum;
+ assert(_dispatchCount >= 0);
+ if(_dispatchCount == 0)
+ {
+ notifyAll();
+ }
}
}
@@ -1565,9 +1565,9 @@ Ice::ConnectionI::getTransceiver() const
}
Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
- const TransceiverPtr& transceiver,
- const EndpointIPtr& endpoint,
- const ObjectAdapterPtr& adapter,
+ const TransceiverPtr& transceiver,
+ const EndpointIPtr& endpoint,
+ const ObjectAdapterPtr& adapter,
bool threadPerConnection,
size_t threadPerConnectionStackSize) :
EventHandler(instance),
@@ -1601,92 +1601,92 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
Int& acmTimeout = const_cast<Int&>(_acmTimeout);
if(_endpoint->datagram())
{
- acmTimeout = 0;
+ acmTimeout = 0;
}
else
{
- if(_adapter)
- {
- acmTimeout = _instance->serverACM();
- }
- else
- {
- acmTimeout = _instance->clientACM();
- }
+ if(_adapter)
+ {
+ acmTimeout = _instance->serverACM();
+ }
+ else
+ {
+ acmTimeout = _instance->clientACM();
+ }
}
int& compressionLevel = const_cast<int&>(_compressionLevel);
compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault(
- "Ice.Compression.Level", 1);
+ "Ice.Compression.Level", 1);
if(compressionLevel < 1)
{
- compressionLevel = 1;
+ compressionLevel = 1;
}
else if(compressionLevel > 9)
{
- compressionLevel = 9;
+ compressionLevel = 9;
}
ObjectAdapterI* adapterImpl = _adapter ? dynamic_cast<ObjectAdapterI*>(_adapter.get()) : 0;
if(adapterImpl)
{
- _servantManager = adapterImpl->getServantManager();
+ _servantManager = adapterImpl->getServantManager();
}
__setNoDelete(true);
try
{
- if(!threadPerConnection)
- {
- //
- // Only set _threadPool if we really need it, i.e., if we are
- // not in thread per connection mode. Thread pools have lazy
- // initialization in Instance, and we don't want them to be
- // created if they are not needed.
- //
- if(adapterImpl)
- {
- const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool();
- }
- else
- {
- const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
- }
- _threadPool->incFdsInUse();
- }
- else
- {
- //
- // If we are in thread per connection mode, create the
- // thread for this connection.
- //
- _thread = new ThreadPerConnection(this);
- _thread->start(threadPerConnectionStackSize);
- }
+ if(!threadPerConnection)
+ {
+ //
+ // Only set _threadPool if we really need it, i.e., if we are
+ // not in thread per connection mode. Thread pools have lazy
+ // initialization in Instance, and we don't want them to be
+ // created if they are not needed.
+ //
+ if(adapterImpl)
+ {
+ const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool();
+ }
+ else
+ {
+ const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
+ }
+ _threadPool->incFdsInUse();
+ }
+ else
+ {
+ //
+ // If we are in thread per connection mode, create the
+ // thread for this connection.
+ //
+ _thread = new ThreadPerConnection(this);
+ _thread->start(threadPerConnectionStackSize);
+ }
}
catch(const IceUtil::Exception& ex)
{
- {
- Error out(_logger);
- if(threadPerConnection)
- {
- out << "cannot create thread for connection:\n" << ex;
- }
- // Otherwise with thread pool the thread pool itself
- // prints a warning if the threads cannot be created.
- }
-
- try
- {
- _transceiver->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
-
- __setNoDelete(false);
- ex.ice_throw();
+ {
+ Error out(_logger);
+ if(threadPerConnection)
+ {
+ out << "cannot create thread for connection:\n" << ex;
+ }
+ // Otherwise with thread pool the thread pool itself
+ // prints a warning if the threads cannot be created.
+ }
+
+ try
+ {
+ _transceiver->close();
+ }
+ catch(const LocalException&)
+ {
+ // Here we ignore any exceptions in close().
+ }
+
+ __setNoDelete(false);
+ ex.ice_throw();
}
__setNoDelete(false);
}
@@ -1710,40 +1710,40 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
if(_state == state) // Don't switch twice.
{
- return;
+ return;
}
if(!_exception.get())
{
- //
- // If we are in closed state, an exception must be set.
- //
- assert(_state != StateClosed);
-
- _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
-
- if(_warn)
- {
- //
- // We don't warn if we are not validated.
- //
- if(_state > StateNotValidated)
- {
- //
- // Don't warn about certain expected exceptions.
- //
- if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
- dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
- dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
- (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
- {
- Warning out(_logger);
- out << "connection exception:\n" << *_exception.get() << '\n' << _desc;
- }
- }
- }
+ //
+ // If we are in closed state, an exception must be set.
+ //
+ assert(_state != StateClosed);
+
+ _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+
+ if(_warn)
+ {
+ //
+ // We don't warn if we are not validated.
+ //
+ if(_state > StateNotValidated)
+ {
+ //
+ // Don't warn about certain expected exceptions.
+ //
+ if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
+ dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
+ dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
+ (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
+ {
+ Warning out(_logger);
+ out << "connection exception:\n" << *_exception.get() << '\n' << _desc;
+ }
+ }
+ }
}
//
@@ -1763,7 +1763,7 @@ Ice::ConnectionI::setState(State state)
//
if(_endpoint->datagram() && state == StateClosing)
{
- state = StateClosed;
+ state = StateClosed;
}
//
@@ -1771,133 +1771,133 @@ Ice::ConnectionI::setState(State state)
//
if(_state == StateNotValidated && state == StateClosing)
{
- state = StateClosed;
+ state = StateClosed;
}
if(_state == state) // Don't switch twice.
{
- return;
+ return;
}
switch(state)
{
- case StateNotValidated:
- {
- assert(false);
- break;
- }
+ case StateNotValidated:
+ {
+ assert(false);
+ break;
+ }
- case StateActive:
- {
- //
+ case StateActive:
+ {
+ //
// Can only switch from holding or not validated to
// active.
- //
- if(_state != StateHolding && _state != StateNotValidated)
- {
- return;
- }
- if(!_threadPerConnection)
- {
- registerWithPool();
- }
- break;
- }
-
- case StateHolding:
- {
- //
- // Can only switch from active or not validated to
- // holding.
- //
- if(_state != StateActive && _state != StateNotValidated)
- {
- return;
- }
- if(!_threadPerConnection)
- {
- unregisterWithPool();
- }
- break;
- }
-
- case StateClosing:
- {
- //
- // Can't change back from closed.
- //
- if(_state == StateClosed)
- {
- return;
- }
- if(!_threadPerConnection)
- {
- registerWithPool(); // We need to continue to read in closing state.
- }
- break;
- }
-
- case StateClosed:
- {
- if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode, we
- // shutdown both for reading and writing. This will
- // unblock and read call with an exception. The thread
- // per connection then closes the transceiver.
- //
- _transceiver->shutdownReadWrite();
- }
- else if(_state == StateNotValidated)
- {
- //
- // If we change from not validated we can close right
- // away.
- //
- assert(!_registeredWithPool);
-
- _threadPool->decFdsInUse();
-
- //
- // We must make sure that nobody is sending when we
- // close the transceiver.
- //
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- try
- {
- _transceiver->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
-
- _transceiver = 0;
- //notifyAll(); // We notify already below.
- }
- else
- {
- //
- // Otherwise we first must make sure that we are
- // registered, then we unregister, and let finished()
- // do the close.
- //
- registerWithPool();
- unregisterWithPool();
-
- //
- // We must prevent any further writes when _state == StateClosed.
- // However, functions such as sendResponse cannot acquire the main
- // mutex in order to check _state. Therefore we shut down the write
- // end of the transceiver, which causes subsequent write attempts
- // to fail with an exception.
- //
- _transceiver->shutdownWrite();
- }
- break;
- }
+ //
+ if(_state != StateHolding && _state != StateNotValidated)
+ {
+ return;
+ }
+ if(!_threadPerConnection)
+ {
+ registerWithPool();
+ }
+ break;
+ }
+
+ case StateHolding:
+ {
+ //
+ // Can only switch from active or not validated to
+ // holding.
+ //
+ if(_state != StateActive && _state != StateNotValidated)
+ {
+ return;
+ }
+ if(!_threadPerConnection)
+ {
+ unregisterWithPool();
+ }
+ break;
+ }
+
+ case StateClosing:
+ {
+ //
+ // Can't change back from closed.
+ //
+ if(_state == StateClosed)
+ {
+ return;
+ }
+ if(!_threadPerConnection)
+ {
+ registerWithPool(); // We need to continue to read in closing state.
+ }
+ break;
+ }
+
+ case StateClosed:
+ {
+ if(_threadPerConnection)
+ {
+ //
+ // If we are in thread per connection mode, we
+ // shutdown both for reading and writing. This will
+ // unblock and read call with an exception. The thread
+ // per connection then closes the transceiver.
+ //
+ _transceiver->shutdownReadWrite();
+ }
+ else if(_state == StateNotValidated)
+ {
+ //
+ // If we change from not validated we can close right
+ // away.
+ //
+ assert(!_registeredWithPool);
+
+ _threadPool->decFdsInUse();
+
+ //
+ // We must make sure that nobody is sending when we
+ // close the transceiver.
+ //
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+
+ try
+ {
+ _transceiver->close();
+ }
+ catch(const LocalException&)
+ {
+ // Here we ignore any exceptions in close().
+ }
+
+ _transceiver = 0;
+ //notifyAll(); // We notify already below.
+ }
+ else
+ {
+ //
+ // Otherwise we first must make sure that we are
+ // registered, then we unregister, and let finished()
+ // do the close.
+ //
+ registerWithPool();
+ unregisterWithPool();
+
+ //
+ // We must prevent any further writes when _state == StateClosed.
+ // However, functions such as sendResponse cannot acquire the main
+ // mutex in order to check _state. Therefore we shut down the write
+ // end of the transceiver, which causes subsequent write attempts
+ // to fail with an exception.
+ //
+ _transceiver->shutdownWrite();
+ }
+ break;
+ }
}
//
@@ -1909,14 +1909,14 @@ Ice::ConnectionI::setState(State state)
ConnectionMonitorPtr connectionMonitor = _instance->connectionMonitor();
if(connectionMonitor)
{
- if(state == StateActive)
- {
- connectionMonitor->add(this);
- }
- else if(_state == StateActive)
- {
- connectionMonitor->remove(this);
- }
+ if(state == StateActive)
+ {
+ connectionMonitor->add(this);
+ }
+ else if(_state == StateActive)
+ {
+ connectionMonitor->remove(this);
+ }
}
_state = state;
@@ -1926,14 +1926,14 @@ Ice::ConnectionI::setState(State state)
if(_state == StateClosing && _dispatchCount == 0)
{
- try
- {
- initiateShutdown();
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- }
+ try
+ {
+ initiateShutdown();
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
}
}
@@ -1945,39 +1945,39 @@ Ice::ConnectionI::initiateShutdown() const
if(!_endpoint->datagram())
{
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- //
- // Before we shut down, we send a close connection message.
- //
- BasicStream os(_instance.get());
- os.write(magic[0]);
- os.write(magic[1]);
- os.write(magic[2]);
- os.write(magic[3]);
- os.write(protocolMajor);
- os.write(protocolMinor);
- os.write(encodingMajor);
- os.write(encodingMinor);
- os.write(closeConnectionMsg);
- os.write((Byte)1); // Compression status: compression supported but not used.
- os.write(headerSize); // Message size.
-
- //
- // Send the message.
- //
- os.i = os.b.begin();
- traceHeader("sending close connection", os, _logger, _traceLevels);
- _transceiver->write(os, _endpoint->timeout());
- //
- // The CloseConnection message should be sufficient. Closing the write
- // end of the socket is probably an artifact of how things were done
- // in IIOP. In fact, shutting down the write end of the socket causes
- // problems on Windows by preventing the peer from using the socket.
- // For example, the peer is no longer able to continue writing a large
- // message after the socket is shutdown.
- //
- //_transceiver->shutdownWrite();
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+
+ //
+ // Before we shut down, we send a close connection message.
+ //
+ BasicStream os(_instance.get());
+ os.write(magic[0]);
+ os.write(magic[1]);
+ os.write(magic[2]);
+ os.write(magic[3]);
+ os.write(protocolMajor);
+ os.write(protocolMinor);
+ os.write(encodingMajor);
+ os.write(encodingMinor);
+ os.write(closeConnectionMsg);
+ os.write((Byte)1); // Compression status: compression supported but not used.
+ os.write(headerSize); // Message size.
+
+ //
+ // Send the message.
+ //
+ os.i = os.b.begin();
+ traceHeader("sending close connection", os, _logger, _traceLevels);
+ _transceiver->write(os, _endpoint->timeout());
+ //
+ // The CloseConnection message should be sufficient. Closing the write
+ // end of the socket is probably an artifact of how things were done
+ // in IIOP. In fact, shutting down the write end of the socket causes
+ // problems on Windows by preventing the peer from using the socket.
+ // For example, the peer is no longer able to continue writing a large
+ // message after the socket is shutdown.
+ //
+ //_transceiver->shutdownWrite();
}
}
@@ -1988,8 +1988,8 @@ Ice::ConnectionI::registerWithPool()
if(!_registeredWithPool)
{
- _threadPool->_register(_transceiver->fd(), this);
- _registeredWithPool = true;
+ _threadPool->_register(_transceiver->fd(), this);
+ _registeredWithPool = true;
}
}
@@ -2000,9 +2000,9 @@ Ice::ConnectionI::unregisterWithPool()
if(_registeredWithPool)
{
- _threadPool->unregister(_transceiver->fd());
- _registeredWithPool = false;
- ++_finishedCount; // For each unregistration, finished() is called once.
+ _threadPool->unregister(_transceiver->fd());
+ _registeredWithPool = false;
+ ++_finishedCount; // For each unregistration, finished() is called once.
}
}
@@ -2011,59 +2011,59 @@ getBZ2Error(int bzError)
{
if(bzError == BZ_RUN_OK)
{
- return ": BZ_RUN_OK";
+ return ": BZ_RUN_OK";
}
else if(bzError == BZ_FLUSH_OK)
{
- return ": BZ_FLUSH_OK";
+ return ": BZ_FLUSH_OK";
}
else if(bzError == BZ_FINISH_OK)
{
- return ": BZ_FINISH_OK";
+ return ": BZ_FINISH_OK";
}
else if(bzError == BZ_STREAM_END)
{
- return ": BZ_STREAM_END";
+ return ": BZ_STREAM_END";
}
else if(bzError == BZ_CONFIG_ERROR)
{
- return ": BZ_CONFIG_ERROR";
+ return ": BZ_CONFIG_ERROR";
}
else if(bzError == BZ_SEQUENCE_ERROR)
{
- return ": BZ_SEQUENCE_ERROR";
+ return ": BZ_SEQUENCE_ERROR";
}
else if(bzError == BZ_PARAM_ERROR)
{
- return ": BZ_PARAM_ERROR";
+ return ": BZ_PARAM_ERROR";
}
else if(bzError == BZ_MEM_ERROR)
{
- return ": BZ_MEM_ERROR";
+ return ": BZ_MEM_ERROR";
}
else if(bzError == BZ_DATA_ERROR)
{
- return ": BZ_DATA_ERROR";
+ return ": BZ_DATA_ERROR";
}
else if(bzError == BZ_DATA_ERROR_MAGIC)
{
- return ": BZ_DATA_ERROR_MAGIC";
+ return ": BZ_DATA_ERROR_MAGIC";
}
else if(bzError == BZ_IO_ERROR)
{
- return ": BZ_IO_ERROR";
+ return ": BZ_IO_ERROR";
}
else if(bzError == BZ_UNEXPECTED_EOF)
{
- return ": BZ_UNEXPECTED_EOF";
+ return ": BZ_UNEXPECTED_EOF";
}
else if(bzError == BZ_OUTBUFF_FULL)
{
- return ": BZ_OUTBUFF_FULL";
+ return ": BZ_OUTBUFF_FULL";
}
else
{
- return "";
+ return "";
}
}
@@ -2079,15 +2079,15 @@ Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed)
unsigned int compressedLen = static_cast<unsigned int>(uncompressedLen * 1.01 + 600);
compressed.b.resize(headerSize + sizeof(Int) + compressedLen);
int bzError = BZ2_bzBuffToBuffCompress(reinterpret_cast<char*>(&compressed.b[0]) + headerSize + sizeof(Int),
- &compressedLen,
- reinterpret_cast<char*>(&uncompressed.b[0]) + headerSize,
- uncompressedLen,
- _compressionLevel, 0, 0);
+ &compressedLen,
+ reinterpret_cast<char*>(&uncompressed.b[0]) + headerSize,
+ uncompressedLen,
+ _compressionLevel, 0, 0);
if(bzError != BZ_OK)
{
- CompressionException ex(__FILE__, __LINE__);
- ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError);
- throw ex;
+ CompressionException ex(__FILE__, __LINE__);
+ ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError);
+ throw ex;
}
compressed.b.resize(headerSize + sizeof(Int) + compressedLen);
@@ -2130,22 +2130,22 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
compressed.read(uncompressedSize);
if(uncompressedSize <= headerSize)
{
- throw IllegalMessageSizeException(__FILE__, __LINE__);
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
}
uncompressed.resize(uncompressedSize);
unsigned int uncompressedLen = uncompressedSize - headerSize;
unsigned int compressedLen = static_cast<unsigned int>(compressed.b.size() - headerSize - sizeof(Int));
int bzError = BZ2_bzBuffToBuffDecompress(reinterpret_cast<char*>(&uncompressed.b[0]) + headerSize,
- &uncompressedLen,
- reinterpret_cast<char*>(&compressed.b[0]) + headerSize + sizeof(Int),
- compressedLen,
- 0, 0);
+ &uncompressedLen,
+ reinterpret_cast<char*>(&compressed.b[0]) + headerSize + sizeof(Int),
+ compressedLen,
+ 0, 0);
if(bzError != BZ_OK)
{
- CompressionException ex(__FILE__, __LINE__);
- ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError);
- throw ex;
+ CompressionException ex(__FILE__, __LINE__);
+ ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError);
+ throw ex;
}
copy(compressed.b.begin(), compressed.b.begin() + headerSize, uncompressed.b.begin());
@@ -2153,222 +2153,222 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
void
Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
- ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
- OutgoingAsyncPtr& outAsync)
+ ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
+ OutgoingAsyncPtr& outAsync)
{
assert(_state > StateNotValidated && _state < StateClosed);
if(_acmTimeout > 0)
{
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
}
try
{
- //
- // We don't need to check magic and version here. This has
- // already been done by the ThreadPool or the
- // ThreadPerConnection, which provides us with the stream.
- //
- assert(stream.i == stream.b.end());
- stream.i = stream.b.begin() + 8;
- Byte messageType;
- stream.read(messageType);
- stream.read(compress);
- if(compress == 2)
- {
- BasicStream ustream(_instance.get());
- doUncompress(stream, ustream);
- stream.b.swap(ustream.b);
- }
- stream.i = stream.b.begin() + headerSize;
+ //
+ // We don't need to check magic and version here. This has
+ // already been done by the ThreadPool or the
+ // ThreadPerConnection, which provides us with the stream.
+ //
+ assert(stream.i == stream.b.end());
+ stream.i = stream.b.begin() + 8;
+ Byte messageType;
+ stream.read(messageType);
+ stream.read(compress);
+ if(compress == 2)
+ {
+ BasicStream ustream(_instance.get());
+ doUncompress(stream, ustream);
+ stream.b.swap(ustream.b);
+ }
+ stream.i = stream.b.begin() + headerSize;
- switch(messageType)
- {
- case closeConnectionMsg:
- {
- traceHeader("received close connection", stream, _logger, _traceLevels);
- if(_endpoint->datagram())
- {
- if(_warn)
- {
- Warning out(_logger);
- out << "ignoring close connection message for datagram connection:\n" << _desc;
- }
- }
- else
- {
- setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
- }
- break;
- }
-
- case requestMsg:
- {
- if(_state == StateClosing)
- {
- traceRequest("received request during closing\n"
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
- }
- else
- {
- traceRequest("received request", stream, _logger, _traceLevels);
- stream.read(requestId);
- invokeNum = 1;
- servantManager = _servantManager;
- adapter = _adapter;
- ++_dispatchCount;
- }
- break;
- }
-
- case requestBatchMsg:
- {
- if(_state == StateClosing)
- {
- traceBatchRequest("received batch request during closing\n"
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
- }
- else
- {
- traceBatchRequest("received batch request", stream, _logger, _traceLevels);
- stream.read(invokeNum);
- if(invokeNum < 0)
- {
- invokeNum = 0;
- throw NegativeSizeException(__FILE__, __LINE__);
- }
- servantManager = _servantManager;
- adapter = _adapter;
- _dispatchCount += invokeNum;
- }
- break;
- }
-
- case replyMsg:
- {
- traceReply("received reply", stream, _logger, _traceLevels);
-
- stream.read(requestId);
-
- map<Int, Outgoing*>::iterator p = _requests.end();
- map<Int, AsyncRequest>::iterator q = _asyncRequests.end();
-
- if(_requestsHint != _requests.end())
- {
- if(_requestsHint->first == requestId)
- {
- p = _requestsHint;
- }
- }
-
- if(p == _requests.end())
- {
- if(_asyncRequestsHint != _asyncRequests.end())
- {
- if(_asyncRequestsHint->first == requestId)
- {
- q = _asyncRequestsHint;
- }
- }
- }
-
- if(p == _requests.end() && q == _asyncRequests.end())
- {
- p = _requests.find(requestId);
- }
-
- if(p == _requests.end() && q == _asyncRequests.end())
- {
- q = _asyncRequests.find(requestId);
- }
-
- if(p == _requests.end() && q == _asyncRequests.end())
- {
- throw UnknownRequestIdException(__FILE__, __LINE__);
- }
-
- if(p != _requests.end())
- {
- p->second->finished(stream);
-
- if(p == _requestsHint)
- {
- _requests.erase(p++);
- _requestsHint = p;
- }
- else
- {
- _requests.erase(p);
- }
- }
- else
- {
- assert(q != _asyncRequests.end());
-
- outAsync = q->second.p;
-
- if(q == _asyncRequestsHint)
- {
- _asyncRequests.erase(q++);
- _asyncRequestsHint = q;
- }
- else
- {
- _asyncRequests.erase(q);
- }
- }
-
- break;
- }
-
- case validateConnectionMsg:
- {
- traceHeader("received validate connection", stream, _logger, _traceLevels);
- if(_warn)
- {
- Warning out(_logger);
- out << "ignoring unexpected validate connection message:\n" << _desc;
- }
- break;
- }
-
- default:
- {
- traceHeader("received unknown message\n"
- "(invalid, closing connection)",
- stream, _logger, _traceLevels);
- throw UnknownMessageException(__FILE__, __LINE__);
- break;
- }
- }
+ switch(messageType)
+ {
+ case closeConnectionMsg:
+ {
+ traceHeader("received close connection", stream, _logger, _traceLevels);
+ if(_endpoint->datagram())
+ {
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "ignoring close connection message for datagram connection:\n" << _desc;
+ }
+ }
+ else
+ {
+ setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
+ }
+ break;
+ }
+
+ case requestMsg:
+ {
+ if(_state == StateClosing)
+ {
+ traceRequest("received request during closing\n"
+ "(ignored by server, client will retry)",
+ stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceRequest("received request", stream, _logger, _traceLevels);
+ stream.read(requestId);
+ invokeNum = 1;
+ servantManager = _servantManager;
+ adapter = _adapter;
+ ++_dispatchCount;
+ }
+ break;
+ }
+
+ case requestBatchMsg:
+ {
+ if(_state == StateClosing)
+ {
+ traceBatchRequest("received batch request during closing\n"
+ "(ignored by server, client will retry)",
+ stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceBatchRequest("received batch request", stream, _logger, _traceLevels);
+ stream.read(invokeNum);
+ if(invokeNum < 0)
+ {
+ invokeNum = 0;
+ throw NegativeSizeException(__FILE__, __LINE__);
+ }
+ servantManager = _servantManager;
+ adapter = _adapter;
+ _dispatchCount += invokeNum;
+ }
+ break;
+ }
+
+ case replyMsg:
+ {
+ traceReply("received reply", stream, _logger, _traceLevels);
+
+ stream.read(requestId);
+
+ map<Int, Outgoing*>::iterator p = _requests.end();
+ map<Int, AsyncRequest>::iterator q = _asyncRequests.end();
+
+ if(_requestsHint != _requests.end())
+ {
+ if(_requestsHint->first == requestId)
+ {
+ p = _requestsHint;
+ }
+ }
+
+ if(p == _requests.end())
+ {
+ if(_asyncRequestsHint != _asyncRequests.end())
+ {
+ if(_asyncRequestsHint->first == requestId)
+ {
+ q = _asyncRequestsHint;
+ }
+ }
+ }
+
+ if(p == _requests.end() && q == _asyncRequests.end())
+ {
+ p = _requests.find(requestId);
+ }
+
+ if(p == _requests.end() && q == _asyncRequests.end())
+ {
+ q = _asyncRequests.find(requestId);
+ }
+
+ if(p == _requests.end() && q == _asyncRequests.end())
+ {
+ throw UnknownRequestIdException(__FILE__, __LINE__);
+ }
+
+ if(p != _requests.end())
+ {
+ p->second->finished(stream);
+
+ if(p == _requestsHint)
+ {
+ _requests.erase(p++);
+ _requestsHint = p;
+ }
+ else
+ {
+ _requests.erase(p);
+ }
+ }
+ else
+ {
+ assert(q != _asyncRequests.end());
+
+ outAsync = q->second.p;
+
+ if(q == _asyncRequestsHint)
+ {
+ _asyncRequests.erase(q++);
+ _asyncRequestsHint = q;
+ }
+ else
+ {
+ _asyncRequests.erase(q);
+ }
+ }
+
+ break;
+ }
+
+ case validateConnectionMsg:
+ {
+ traceHeader("received validate connection", stream, _logger, _traceLevels);
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "ignoring unexpected validate connection message:\n" << _desc;
+ }
+ break;
+ }
+
+ default:
+ {
+ traceHeader("received unknown message\n"
+ "(invalid, closing connection)",
+ stream, _logger, _traceLevels);
+ throw UnknownMessageException(__FILE__, __LINE__);
+ break;
+ }
+ }
}
catch(const SocketException& ex)
{
- exception(ex);
+ exception(ex);
}
catch(const LocalException& ex)
{
- if(_endpoint->datagram())
- {
- if(_warn)
- {
- Warning out(_logger);
- out << "datagram connection exception:\n" << ex << '\n' << _desc;
- }
- }
- else
- {
- setState(StateClosed, ex);
- }
+ if(_endpoint->datagram())
+ {
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "datagram connection exception:\n" << ex << '\n' << _desc;
+ }
+ }
+ else
+ {
+ setState(StateClosed, ex);
+ }
}
}
void
Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, Byte compress,
- const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter)
+ const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter)
{
//
// Note: In contrast to other private or protected methods, this
@@ -2377,45 +2377,45 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
try
{
- while(invokeNum > 0)
- {
- //
- // Prepare the invocation.
- //
- bool response = !_endpoint->datagram() && requestId != 0;
- Incoming in(_instance.get(), this, adapter, response, compress, requestId);
- BasicStream* is = in.is();
- stream.swap(*is);
- BasicStream* os = in.os();
-
- //
- // Prepare the response if necessary.
- //
- if(response)
- {
- assert(invokeNum == 1); // No further invocations if a response is expected.
- os->writeBlob(replyHdr, sizeof(replyHdr));
-
- //
- // Add the request ID.
- //
- os->write(requestId);
- }
-
- in.invoke(servantManager);
-
- //
- // If there are more invocations, we need the stream back.
- //
- if(--invokeNum > 0)
- {
- stream.swap(*is);
- }
- }
+ while(invokeNum > 0)
+ {
+ //
+ // Prepare the invocation.
+ //
+ bool response = !_endpoint->datagram() && requestId != 0;
+ Incoming in(_instance.get(), this, adapter, response, compress, requestId);
+ BasicStream* is = in.is();
+ stream.swap(*is);
+ BasicStream* os = in.os();
+
+ //
+ // Prepare the response if necessary.
+ //
+ if(response)
+ {
+ assert(invokeNum == 1); // No further invocations if a response is expected.
+ os->writeBlob(replyHdr, sizeof(replyHdr));
+
+ //
+ // Add the request ID.
+ //
+ os->write(requestId);
+ }
+
+ in.invoke(servantManager);
+
+ //
+ // If there are more invocations, we need the stream back.
+ //
+ if(--invokeNum > 0)
+ {
+ stream.swap(*is);
+ }
+ }
}
catch(const LocalException& ex)
{
- invokeException(ex, invokeNum); // Fatal invocation exception
+ invokeException(ex, invokeNum); // Fatal invocation exception
}
}
@@ -2430,40 +2430,40 @@ Ice::ConnectionI::run()
//
if(!_endpoint->datagram())
{
- try
- {
- validate();
- }
- catch(const LocalException&)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- assert(_state == StateClosed);
-
- //
- // We must make sure that nobody is sending when we close
- // the transceiver.
- //
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- if(_transceiver)
- {
- try
- {
- _transceiver->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
-
- _transceiver = 0;
- }
- notifyAll();
- return;
- }
-
- activate();
+ try
+ {
+ validate();
+ }
+ catch(const LocalException&)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ assert(_state == StateClosed);
+
+ //
+ // We must make sure that nobody is sending when we close
+ // the transceiver.
+ //
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+
+ if(_transceiver)
+ {
+ try
+ {
+ _transceiver->close();
+ }
+ catch(const LocalException&)
+ {
+ // Here we ignore any exceptions in close().
+ }
+
+ _transceiver = 0;
+ }
+ notifyAll();
+ return;
+ }
+
+ activate();
}
const bool warnUdp = _instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0;
@@ -2472,213 +2472,213 @@ Ice::ConnectionI::run()
while(!closed)
{
- //
- // We must accept new connections outside the thread
- // synchronization, because we use blocking accept.
- //
-
- BasicStream stream(_instance.get());
-
- try
- {
- stream.b.resize(headerSize);
- stream.i = stream.b.begin();
- _transceiver->read(stream, -1);
-
- ptrdiff_t pos = stream.i - stream.b.begin();
- if(pos < headerSize)
- {
- //
- // This situation is possible for small UDP packets.
- //
- throw IllegalMessageSizeException(__FILE__, __LINE__);
- }
- stream.i = stream.b.begin();
- const Byte* header;
- stream.readBlob(header, headerSize);
- if(header[0] != magic[0] || header[1] != magic[1] || header[2] != magic[2] || header[3] != magic[3])
- {
- BadMagicException ex(__FILE__, __LINE__);
- ex.badMagic = Ice::ByteSeq(&header[0], &header[0] + sizeof(magic));
- throw ex;
- }
- if(header[4] != protocolMajor)
- {
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(header[4]);
- ex.badMinor = static_cast<unsigned char>(header[5]);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
- }
- if(header[6] != encodingMajor)
- {
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(header[6]);
- ex.badMinor = static_cast<unsigned char>(header[7]);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
- }
-
- Int size;
- stream.i -= sizeof(Int);
- stream.read(size);
- if(size < headerSize)
- {
- throw IllegalMessageSizeException(__FILE__, __LINE__);
- }
- if(size > static_cast<Int>(_instance->messageSizeMax()))
- {
- throw MemoryLimitException(__FILE__, __LINE__);
- }
- if(size > static_cast<Int>(stream.b.size()))
- {
- stream.b.resize(size);
- }
- stream.i = stream.b.begin() + pos;
-
- if(stream.i != stream.b.end())
- {
- if(_endpoint->datagram())
- {
- if(warnUdp)
- {
- Warning out(_logger);
- out << "DatagramLimitException: maximum size of " << pos << " exceeded";
- }
- throw DatagramLimitException(__FILE__, __LINE__);
- }
- else
- {
- _transceiver->read(stream, -1);
- assert(stream.i == stream.b.end());
- }
- }
- }
- catch(const DatagramLimitException&) // Expected.
- {
- continue;
- }
- catch(const SocketException& ex)
- {
- exception(ex);
- }
- catch(const LocalException& ex)
- {
- if(_endpoint->datagram())
- {
- if(_warn)
- {
- Warning out(_logger);
- out << "datagram connection exception:\n" << ex << '\n' << _desc;
- }
- continue;
- }
- else
- {
- exception(ex);
- }
- }
-
- Byte compress = 0;
- Int requestId = 0;
- Int invokeNum = 0;
- ServantManagerPtr servantManager;
- ObjectAdapterPtr adapter;
- OutgoingAsyncPtr outAsync;
-
- auto_ptr<LocalException> localEx;
-
- map<Int, Outgoing*> requests;
- map<Int, AsyncRequest> asyncRequests;
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while(_state == StateHolding)
- {
- wait();
- }
-
- if(_state != StateClosed)
- {
- parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync);
- }
-
- //
+ //
+ // We must accept new connections outside the thread
+ // synchronization, because we use blocking accept.
+ //
+
+ BasicStream stream(_instance.get());
+
+ try
+ {
+ stream.b.resize(headerSize);
+ stream.i = stream.b.begin();
+ _transceiver->read(stream, -1);
+
+ ptrdiff_t pos = stream.i - stream.b.begin();
+ if(pos < headerSize)
+ {
+ //
+ // This situation is possible for small UDP packets.
+ //
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+ stream.i = stream.b.begin();
+ const Byte* header;
+ stream.readBlob(header, headerSize);
+ if(header[0] != magic[0] || header[1] != magic[1] || header[2] != magic[2] || header[3] != magic[3])
+ {
+ BadMagicException ex(__FILE__, __LINE__);
+ ex.badMagic = Ice::ByteSeq(&header[0], &header[0] + sizeof(magic));
+ throw ex;
+ }
+ if(header[4] != protocolMajor)
+ {
+ UnsupportedProtocolException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(header[4]);
+ ex.badMinor = static_cast<unsigned char>(header[5]);
+ ex.major = static_cast<unsigned char>(protocolMajor);
+ ex.minor = static_cast<unsigned char>(protocolMinor);
+ throw ex;
+ }
+ if(header[6] != encodingMajor)
+ {
+ UnsupportedEncodingException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(header[6]);
+ ex.badMinor = static_cast<unsigned char>(header[7]);
+ ex.major = static_cast<unsigned char>(encodingMajor);
+ ex.minor = static_cast<unsigned char>(encodingMinor);
+ throw ex;
+ }
+
+ Int size;
+ stream.i -= sizeof(Int);
+ stream.read(size);
+ if(size < headerSize)
+ {
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+ if(size > static_cast<Int>(_instance->messageSizeMax()))
+ {
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+ if(size > static_cast<Int>(stream.b.size()))
+ {
+ stream.b.resize(size);
+ }
+ stream.i = stream.b.begin() + pos;
+
+ if(stream.i != stream.b.end())
+ {
+ if(_endpoint->datagram())
+ {
+ if(warnUdp)
+ {
+ Warning out(_logger);
+ out << "DatagramLimitException: maximum size of " << pos << " exceeded";
+ }
+ throw DatagramLimitException(__FILE__, __LINE__);
+ }
+ else
+ {
+ _transceiver->read(stream, -1);
+ assert(stream.i == stream.b.end());
+ }
+ }
+ }
+ catch(const DatagramLimitException&) // Expected.
+ {
+ continue;
+ }
+ catch(const SocketException& ex)
+ {
+ exception(ex);
+ }
+ catch(const LocalException& ex)
+ {
+ if(_endpoint->datagram())
+ {
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "datagram connection exception:\n" << ex << '\n' << _desc;
+ }
+ continue;
+ }
+ else
+ {
+ exception(ex);
+ }
+ }
+
+ Byte compress = 0;
+ Int requestId = 0;
+ Int invokeNum = 0;
+ ServantManagerPtr servantManager;
+ ObjectAdapterPtr adapter;
+ OutgoingAsyncPtr outAsync;
+
+ auto_ptr<LocalException> localEx;
+
+ map<Int, Outgoing*> requests;
+ map<Int, AsyncRequest> asyncRequests;
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ while(_state == StateHolding)
+ {
+ wait();
+ }
+
+ if(_state != StateClosed)
+ {
+ parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync);
+ }
+
+ //
// parseMessage() can close the connection, so we must
// check for closed state again.
- //
- if(_state == StateClosed)
- {
- //
- // We must make sure that nobody is sending when we close
- // the transceiver.
- //
- IceUtil::Mutex::Lock sendSync(_sendMutex);
-
- try
- {
- _transceiver->close();
- }
- catch(const LocalException& ex)
- {
- localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
- }
-
- _transceiver = 0;
- notifyAll();
-
- //
- // We cannot simply return here. We have to make sure
- // that all requests (regular and async) are notified
- // about the closed connection below.
- //
- closed = true;
- }
-
- if(_state == StateClosed || _state == StateClosing)
- {
- requests.swap(_requests);
- _requestsHint = _requests.end();
-
- asyncRequests.swap(_asyncRequests);
- _asyncRequestsHint = _asyncRequests.end();
- }
- }
-
- //
- // Asynchronous replies must be handled outside the thread
- // synchronization, so that nested calls are possible.
- //
- if(outAsync)
- {
- outAsync->__finished(stream);
- }
-
- //
- // Method invocation (or multiple invocations for batch messages)
- // must be done outside the thread synchronization, so that nested
- // calls are possible.
- //
- invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
-
- for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
- {
- p->second->finished(*_exception.get()); // The exception is immutable at this point.
- }
-
- for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
- {
- q->second.p->__finished(*_exception.get()); // The exception is immutable at this point.
- }
-
- if(localEx.get())
- {
- assert(closed);
- localEx->ice_throw();
- }
+ //
+ if(_state == StateClosed)
+ {
+ //
+ // We must make sure that nobody is sending when we close
+ // the transceiver.
+ //
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+
+ try
+ {
+ _transceiver->close();
+ }
+ catch(const LocalException& ex)
+ {
+ localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+ }
+
+ _transceiver = 0;
+ notifyAll();
+
+ //
+ // We cannot simply return here. We have to make sure
+ // that all requests (regular and async) are notified
+ // about the closed connection below.
+ //
+ closed = true;
+ }
+
+ if(_state == StateClosed || _state == StateClosing)
+ {
+ requests.swap(_requests);
+ _requestsHint = _requests.end();
+
+ asyncRequests.swap(_asyncRequests);
+ _asyncRequestsHint = _asyncRequests.end();
+ }
+ }
+
+ //
+ // Asynchronous replies must be handled outside the thread
+ // synchronization, so that nested calls are possible.
+ //
+ if(outAsync)
+ {
+ outAsync->__finished(stream);
+ }
+
+ //
+ // Method invocation (or multiple invocations for batch messages)
+ // must be done outside the thread synchronization, so that nested
+ // calls are possible.
+ //
+ invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
+
+ for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
+ {
+ p->second->finished(*_exception.get()); // The exception is immutable at this point.
+ }
+
+ for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
+ {
+ q->second.p->__finished(*_exception.get()); // The exception is immutable at this point.
+ }
+
+ if(localEx.get())
+ {
+ assert(closed);
+ localEx->ice_throw();
+ }
}
}
@@ -2700,19 +2700,19 @@ Ice::ConnectionI::ThreadPerConnection::run()
_connection->run();
}
catch(const Exception& ex)
- {
- Error out(_connection->_logger);
- out << "exception in thread per connection:\n" << _connection->toString() << ex;
+ {
+ Error out(_connection->_logger);
+ out << "exception in thread per connection:\n" << _connection->toString() << ex;
}
catch(const std::exception& ex)
{
- Error out(_connection->_logger);
- out << "std::exception in thread per connection:\n" << _connection->toString() << ex.what();
+ Error out(_connection->_logger);
+ out << "std::exception in thread per connection:\n" << _connection->toString() << ex.what();
}
catch(...)
{
- Error out(_connection->_logger);
- out << "unknown exception in thread per connection:\n" << _connection->toString();
+ Error out(_connection->_logger);
+ out << "unknown exception in thread per connection:\n" << _connection->toString();
}
if(_connection->_instance->initializationData().threadHook)