summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2014-09-05 10:42:18 -0230
committerDwayne Boone <dwayne@zeroc.com>2014-09-05 10:42:18 -0230
commit9786853ab2d88598021aaec5c0409d3a45a50a13 (patch)
treed64858749513c529fdb84a98d8637d19f2c125e4 /cpp/src/Ice/ConnectionI.cpp
parentMinor change to JS print stack traces (diff)
downloadice-9786853ab2d88598021aaec5c0409d3a45a50a13.tar.bz2
ice-9786853ab2d88598021aaec5c0409d3a45a50a13.tar.xz
ice-9786853ab2d88598021aaec5c0409d3a45a50a13.zip
ICE-4891 Refactor network tracing
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp288
1 files changed, 196 insertions, 92 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index fcf05ef9db0..fad11c7cd0f 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -13,6 +13,7 @@
#include <Ice/LoggerUtil.h>
#include <Ice/Properties.h>
#include <Ice/TraceUtil.h>
+#include <Ice/TraceLevels.h>
#include <Ice/DefaultsAndOverrides.h>
#include <Ice/Transceiver.h>
#include <Ice/ThreadPool.h>
@@ -56,7 +57,7 @@ public:
{
_connection->timedOut();
}
-
+
private:
Ice::ConnectionI* _connection;
@@ -66,9 +67,9 @@ class DispatchCall : public DispatchWorkItem
{
public:
- DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
- const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId,
- Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
+ DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
+ const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId,
+ Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback,
BasicStream& stream) :
DispatchWorkItem(connection),
@@ -90,7 +91,7 @@ public:
virtual void
run()
{
- _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter,
+ _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter,
_outAsync, _heartbeatCallback, _stream);
}
@@ -156,7 +157,7 @@ Ice::ConnectionI::Observer::startRead(const Buffer& buf)
_readStreamPos = buf.b.empty() ? 0 : buf.i;
}
-void
+void
Ice::ConnectionI::Observer::finishRead(const Buffer& buf)
{
if(_readStreamPos == 0)
@@ -179,7 +180,7 @@ Ice::ConnectionI::Observer::startWrite(const Buffer& buf)
_writeStreamPos = buf.b.empty() ? 0 : buf.i;
}
-void
+void
Ice::ConnectionI::Observer::finishWrite(const Buffer& buf)
{
if(_writeStreamPos == 0)
@@ -264,7 +265,7 @@ Ice::ConnectionI::OutgoingMessage::sent()
delete stream;
}
stream = 0;
-
+
if(out)
{
out->sent();
@@ -531,7 +532,7 @@ Ice::ConnectionI::updateObserver()
assert(_instance->getObserver());
_observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(),
- _endpoint,
+ _endpoint,
toConnectionState(_state),
_observer.get()));
}
@@ -550,8 +551,8 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
{
//
// If writing or reading, nothing to do, the connection
- // timeout will kick-in if writes or reads don't progress.
- // This check is necessary because the actitivy timer is
+ // timeout will kick-in if writes or reads don't progress.
+ // This check is necessary because the actitivy timer is
// only set when a message is fully read/written.
//
return;
@@ -570,7 +571,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
// called every (timeout / 2) period.
//
- if(acm.heartbeat == HeartbeatAlways ||
+ if(acm.heartbeat == HeartbeatAlways ||
(acm.heartbeat != HeartbeatOff && now >= (_acmLastActivity + acm.timeout / 4)))
{
if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0)
@@ -578,10 +579,10 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
heartbeat();
}
}
-
+
if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout))
{
- if(acm.close == CloseOnIdleForceful ||
+ if(acm.close == CloseOnIdleForceful ||
(acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty())))
{
//
@@ -650,7 +651,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
#endif
}
- out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
static_cast<Int>(os->b.size() - headerSize - 4));
//
@@ -730,7 +731,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
#endif
}
- out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
+ out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
static_cast<Int>(os->b.size() - headerSize - 4));
AsyncStatus status = AsyncStatusQueued;
@@ -773,8 +774,8 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os)
if(_exception.get())
{
//
- // If there were no batch requests queued when the connection failed, we can safely
- // retry with a new connection. Otherwise, we must throw to notify the caller that
+ // If there were no batch requests queued when the connection failed, we can safely
+ // retry with a new connection. Otherwise, we must throw to notify the caller that
// some previous batch requests were not sent.
//
if(_batchStream.b.empty())
@@ -984,23 +985,23 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR
}
#ifdef ICE_CPP11
-AsyncResultPtr
+AsyncResultPtr
Ice::ConnectionI::begin_flushBatchRequests(
- const IceInternal::Function<void (const Exception&)>& exception,
+ const IceInternal::Function<void (const Exception&)>& exception,
const IceInternal::Function<void (bool)>& sent)
{
class Cpp11CB : public IceInternal::Cpp11FnCallbackNC
{
public:
-
+
Cpp11CB(const IceInternal::Function<void (const Exception&)>& excb,
const IceInternal::Function<void (bool)>& sentcb) :
IceInternal::Cpp11FnCallbackNC(excb, sentcb)
{
CallbackBase::checkCallback(true, excb != nullptr);
}
-
+
virtual void
completed(const AsyncResultPtr& __result) const
{
@@ -1017,7 +1018,7 @@ Ice::ConnectionI::begin_flushBatchRequests(
}
}
};
-
+
return __begin_flushBatchRequests(new Cpp11CB(exception, sent), 0);
}
#endif
@@ -1075,7 +1076,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
- out->attachRemoteObserver(initConnectionInfo(), _endpoint,
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint,
static_cast<Int>(_batchStream.b.size() - headerSize - 4));
_batchStream.swap(*out->os());
@@ -1173,7 +1174,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
return status;
}
-void
+void
Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1184,9 +1185,9 @@ Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback)
_callback = callback;
}
-void
-Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout,
- const IceUtil::Optional<Ice::ACMClose>& close,
+void
+Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout,
+ const IceUtil::Optional<Ice::ACMClose>& close,
const IceUtil::Optional<Ice::ACMHeartbeat>& heartbeat)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1201,7 +1202,7 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout,
if(_monitor->getACM().timeout <= 0)
{
_acmLastActivity = IceUtil::Time(); // Disable the recording of last activity.
- }
+ }
else if(_acmLastActivity == IceUtil::Time() && _state == StateActive)
{
_acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
@@ -1225,7 +1226,7 @@ Ice::ConnectionI::getACM()
return _monitor ? _monitor->getACM() : acm;
}
-void
+void
Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1247,10 +1248,10 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
}
//
- // If the request is being sent, don't remove it from the send streams,
+ // If the request is being sent, don't remove it from the send streams,
// it will be removed once the sending is finished.
//
- if(o == _sendStreams.begin())
+ if(o == _sendStreams.begin())
{
o->timedOut(true); // true = adopt the stream.
}
@@ -1293,18 +1294,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
}
}
-void
+void
Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
+
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
if(o->outAsync.get() == outAsync.get())
{
if(o->requestId)
{
- if(_asyncRequestsHint != _asyncRequests.end() &&
+ if(_asyncRequestsHint != _asyncRequests.end() &&
_asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync))
{
_asyncRequests.erase(_asyncRequestsHint);
@@ -1315,12 +1316,12 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou
_asyncRequests.erase(o->requestId);
}
}
-
+
//
- // If the request is being sent, don't remove it from the send streams,
+ // If the request is being sent, don't remove it from the send streams,
// it will be removed once the sending is finished.
//
- if(o == _sendStreams.begin())
+ if(o == _sendStreams.begin())
{
o->timedOut(true); // true = adopt the stream
}
@@ -1347,7 +1348,7 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou
return; // We're done
}
}
-
+
for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
{
if(p->second.get() == o.get())
@@ -1366,7 +1367,7 @@ Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
-
+
try
{
if(--_dispatchCount == 0)
@@ -1377,21 +1378,21 @@ Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag)
}
notifyAll();
}
-
+
if(_state >= StateClosed)
{
assert(_exception.get());
_exception->ice_throw();
}
-
+
OutgoingMessage message(os, compressFlag > 0);
sendMessage(message);
-
+
if(_state == StateClosing && _dispatchCount == 0)
{
initiateShutdown();
}
-
+
return;
}
catch(const LocalException& ex)
@@ -1405,7 +1406,7 @@ Ice::ConnectionI::sendNoResponse()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
-
+
try
{
if(--_dispatchCount == 0)
@@ -1416,13 +1417,13 @@ Ice::ConnectionI::sendNoResponse()
}
notifyAll();
}
-
+
if(_state >= StateClosed)
{
assert(_exception.get());
_exception->ice_throw();
}
-
+
if(_state == StateClosing && _dispatchCount == 0)
{
initiateShutdown();
@@ -1518,13 +1519,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
try
{
- if(operation & SocketOperationWrite)
+ if(operation & SocketOperationWrite)
{
if(_observer)
{
_observer.startWrite(_writeStream);
}
-
+
if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty())
{
// The whole message is written, assume it's sent now for at-most-once semantics.
@@ -1539,7 +1540,7 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
{
_observer.startRead(_readStream);
}
-
+
_transceiver->startRead(_readStream);
}
else
@@ -1563,7 +1564,19 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
+ Buffer::Container::iterator start = _writeStream.i;
_transceiver->finishWrite(_writeStream);
+ if(_instance->traceLevels()->network >= 3 && _writeStream.i != start)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+ out << "sent " << (_writeStream.i - start);
+ if(!_endpoint->datagram())
+ {
+ out << " of " << (_writeStream.b.end() - start);
+ }
+ out << " bytes via " << _endpoint->protocol() << "\n" << toString();
+ }
+
if(_observer)
{
_observer.finishWrite(_writeStream);
@@ -1573,7 +1586,23 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
{
if(!_hasMoreData)
{
+ Buffer::Container::iterator start = _readStream.i;
_transceiver->finishRead(_readStream, _hasMoreData);
+ if(_instance->traceLevels()->network >= 3 && _readStream.i != start)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+ out << "received ";
+ if(_endpoint->datagram())
+ {
+ out << _readStream.b.size();
+ }
+ else
+ {
+ out << (_readStream.i - start) << " of " << (_readStream.b.end() - start);
+ }
+ out << " bytes via " << _endpoint->protocol() << "\n" << toString();
+ }
+
if(_observer && !_readHeader)
{
_observer.finishRead(_readStream);
@@ -1632,7 +1661,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
{
_observer.startWrite(_writeStream);
}
- writeOp = _transceiver->write(_writeStream);
+ writeOp = write(_writeStream);
if(_observer && !(writeOp & SocketOperationWrite))
{
_observer.finishWrite(_writeStream);
@@ -1646,7 +1675,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
_observer.startRead(_readStream);
}
- readOp = _transceiver->read(_readStream, _hasMoreData);
+ readOp = read(_readStream);
if(readOp & SocketOperationRead)
{
break;
@@ -1665,7 +1694,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
{
_observer->receivedBytes(static_cast<int>(headerSize));
}
-
+
ptrdiff_t pos = _readStream.i - _readStream.b.begin();
if(pos < headerSize)
{
@@ -1674,7 +1703,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
//
throw IllegalMessageSizeException(__FILE__, __LINE__);
}
-
+
_readStream.i = _readStream.b.begin();
const Byte* m;
_readStream.readBlob(m, static_cast<Int>(sizeof(magic)));
@@ -1690,7 +1719,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
EncodingVersion ev;
_readStream.read(ev);
checkSupportedProtocolEncoding(ev);
-
+
Byte messageType;
_readStream.read(messageType);
Byte compress;
@@ -1711,7 +1740,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
_readStream.i = _readStream.b.begin() + pos;
}
-
+
if(_readStream.i != _readStream.b.end())
{
if(_endpoint->datagram())
@@ -1772,15 +1801,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
//
// We parse messages first, if we receive a close
// connection message we won't send more messages.
- //
+ //
if(readyOp & SocketOperationRead)
{
newOp = static_cast<SocketOperation>(newOp | parseMessage(current.stream,
- invokeNum,
- requestId,
- compress,
- servantManager,
- adapter,
+ invokeNum,
+ requestId,
+ compress,
+ servantManager,
+ adapter,
outAsync,
heartbeatCallback,
dispatchCount));
@@ -1867,15 +1896,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
else
{
_threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, invokeNum,
- servantManager, adapter, outAsync, heartbeatCallback,
+ servantManager, adapter, outAsync, heartbeatCallback,
current.stream));
}
}
void
ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs,
- Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
- const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync,
+ Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
+ const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync,
const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream)
{
int dispatchedCount = 0;
@@ -1954,7 +1983,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
if(invokeNum)
{
invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
-
+
//
// Don't increase count, the dispatch count is
// decreased when the incoming reply is sent.
@@ -2007,7 +2036,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
//
// If there are no callbacks to call, we don't call ioCompleted() since we're not going
- // to call code that will potentially block (this avoids promoting a new leader and
+ // to call code that will potentially block (this avoids promoting a new leader and
// unecessary thread creation, especially if this is called on shutdown).
//
if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback)
@@ -2030,6 +2059,24 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
void
Ice::ConnectionI::finish()
{
+ if(!_initialized)
+ {
+ if(_instance->traceLevels()->network >= 2)
+ {
+ string verb = _connector ? "establish" : "accept";
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+ out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() << "\n" << *_exception.get();
+ }
+ }
+ else
+ {
+ if(_instance->traceLevels()->network >= 1)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+ out << "closed " << _endpoint->protocol() << " connection\n" << toString();
+ }
+ }
+
if(_startCallback)
{
_startCallback->connectionStartFailed(this, *_exception.get());
@@ -2041,16 +2088,16 @@ Ice::ConnectionI::finish()
if(!_writeStream.b.empty())
{
//
- // Return the stream to the outgoing call. This is important for
+ // Return the stream to the outgoing call. This is important for
// retriable AMI calls which are not marshalled again.
//
OutgoingMessage* message = &_sendStreams.front();
_writeStream.swap(*message->stream);
-
+
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
//
// The current message might be sent but not yet removed from _sendStreams. If
- // the response has been received in the meantime, we remove the message from
+ // the response has been received in the meantime, we remove the message from
// _sendStreams to not call finished on a message which is already done.
//
if(message->isSent || message->receivedReply)
@@ -2128,6 +2175,7 @@ Ice::ConnectionI::finish()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
setState(StateFinished);
+
if(_dispatchCount == 0)
{
reap();
@@ -2264,6 +2312,7 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_dispatchCount(0),
_state(StateNotInitialized),
_shutdownInitiated(false),
+ _initialized(false),
_validated(false)
{
int& compressionLevel = const_cast<int&>(_compressionLevel);
@@ -2343,7 +2392,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
assert(_state != StateClosed);
_exception.reset(ex.ice_clone());
-
+
//
// We don't warn if we are not validated.
//
@@ -2522,7 +2571,7 @@ Ice::ConnectionI::setState(State state)
if(oldState != newState)
{
_observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(),
- _endpoint,
+ _endpoint,
newState,
_observer.get()));
}
@@ -2561,11 +2610,11 @@ Ice::ConnectionI::initiateShutdown()
{
assert(_state == StateClosing);
assert(_dispatchCount == 0);
-
+
if(_shutdownInitiated)
{
return;
- }
+ }
_shutdownInitiated = true;
if(!_endpoint->datagram())
@@ -2591,7 +2640,7 @@ Ice::ConnectionI::initiateShutdown()
//
// Notify the the transceiver of the graceful connection closure.
- //
+ //
SocketOperation op = _transceiver->closing(true, *_exception.get());
if(op)
{
@@ -2648,8 +2697,8 @@ Ice::ConnectionI::initialize(SocketOperation operation)
// Update the connection description once the transceiver is initialized.
//
const_cast<string&>(_desc) = _transceiver->toString();
+ _initialized = true;
setState(StateNotValidated);
-
return true;
}
@@ -2682,7 +2731,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
if(_writeStream.i != _writeStream.b.end())
{
- SocketOperation op = _transceiver->write(_writeStream);
+ SocketOperation op = write(_writeStream);
if(op)
{
scheduleTimeout(op);
@@ -2711,7 +2760,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
if(_readStream.i != _readStream.b.end())
{
- SocketOperation op = _transceiver->read(_readStream, _hasMoreData);
+ SocketOperation op = read(_readStream);
if(op)
{
scheduleTimeout(op);
@@ -2771,6 +2820,21 @@ Ice::ConnectionI::validate(SocketOperation operation)
_readStream.i = _readStream.b.begin();
_readHeader = true;
+ if(_instance->traceLevels()->network >= 1)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+ if(_endpoint->datagram())
+ {
+ out << "starting to " << (_connector ? "send" : "receive") << " " << _endpoint->protocol() << " messages\n";
+ out << _transceiver->toDetailedString();
+ }
+ else
+ {
+ out << (_connector ? "established" : "accepted") << " " << _endpoint->protocol() << " connection\n";
+ out << toString();
+ }
+ }
+
return true;
}
@@ -2793,7 +2857,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
try
{
while(true)
- {
+ {
//
// Notify the message that it was sent.
//
@@ -2822,7 +2886,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
//
// This can occur if parseMessage (called before
// sendNextMessage by message()) closes the connection.
- //
+ //
if(_state >= StateClosingPending)
{
return SocketOperationNone;
@@ -2904,7 +2968,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
assert(_writeStream.i);
if(_writeStream.i != _writeStream.b.end())
{
- SocketOperation op = _transceiver->write(_writeStream);
+ SocketOperation op = write(_writeStream);
if(op)
{
return op;
@@ -2917,7 +2981,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
}
//
- // If all the messages were sent and we are in the closing state, we schedule
+ // If all the messages were sent and we are in the closing state, we schedule
// the close timeout to wait for the peer to close the connection.
//
if(_state == StateClosing && _shutdownInitiated)
@@ -2981,7 +3045,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
{
traceSend(*message.stream, _logger, _traceLevels);
}
-
+
//
// Send the message without blocking.
//
@@ -2989,7 +3053,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
{
_observer.startWrite(stream);
}
- op = _transceiver->write(stream);
+ op = write(stream);
if(!op)
{
if(_observer)
@@ -3051,7 +3115,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
{
_observer.startWrite(*message.stream);
}
- op = _transceiver->write(*message.stream);
+ op = write(*message.stream);
if(!op)
{
if(_observer)
@@ -3232,7 +3296,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
SocketOperation
Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
- OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback,
+ OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback,
int& dispatchCount)
{
assert(_state > StateNotValidated && _state < StateClosed);
@@ -3294,7 +3358,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
//
// Notify the the transceiver of the graceful connection closure.
- //
+ //
SocketOperation op = _transceiver->closing(false, *_exception.get());
if(op)
{
@@ -3418,8 +3482,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
//
- // If we just received the reply of a request which isn't acknowledge as
- // sent yet, we queue the reply instead of processing it right away. It
+ // If we just received the reply of a request which isn't acknowledge as
+ // sent yet, we queue the reply instead of processing it right away. It
// will be processed once the write callback is invoked for the message.
//
OutgoingMessage* message = _sendStreams.empty() ? 0 : &_sendStreams.front();
@@ -3445,7 +3509,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
outAsync = 0;
}
-#endif
+#endif
notifyAll(); // Notify threads blocked in close(false)
}
@@ -3562,12 +3626,12 @@ Ice::ConnectionI::scheduleTimeout(SocketOperation status)
timeout = _endpoint->timeout();
}
}
-
+
if(timeout < 0)
{
return;
}
-
+
try
{
if(status & IceInternal::SocketOperationRead)
@@ -3636,6 +3700,46 @@ ConnectionI::toConnectionState(State state) const
return connectionStateMap[static_cast<int>(state)];
}
+SocketOperation
+ConnectionI::read(Buffer& buf)
+{
+ Buffer::Container::iterator start = buf.i;
+ SocketOperation op = _transceiver->read(buf, _hasMoreData);
+ if(_instance->traceLevels()->network >= 3 && buf.i != start)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+ out << "received ";
+ if(_endpoint->datagram())
+ {
+ out << buf.b.size();
+ }
+ else
+ {
+ out << (buf.i - start) << " of " << (buf.b.end() - start);
+ }
+ out << " bytes via " << _endpoint->protocol() << "\n" << toString();
+ }
+ return op;
+}
+
+SocketOperation
+ConnectionI::write(Buffer& buf)
+{
+ Buffer::Container::iterator start = buf.i;
+ SocketOperation op = _transceiver->write(buf);
+ if(_instance->traceLevels()->network >= 3 && buf.i != start)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+ out << "sent " << (buf.i - start);
+ if(!_endpoint->datagram())
+ {
+ out << " of " << (buf.b.end() - start);
+ }
+ out << " bytes via " << _endpoint->protocol() << "\n" << toString();
+ }
+ return op;
+}
+
void
ConnectionI::reap()
{