diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2014-09-05 10:42:18 -0230 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2014-09-05 10:42:18 -0230 |
commit | 9786853ab2d88598021aaec5c0409d3a45a50a13 (patch) | |
tree | d64858749513c529fdb84a98d8637d19f2c125e4 /cpp/src/Ice/ConnectionI.cpp | |
parent | Minor change to JS print stack traces (diff) | |
download | ice-9786853ab2d88598021aaec5c0409d3a45a50a13.tar.bz2 ice-9786853ab2d88598021aaec5c0409d3a45a50a13.tar.xz ice-9786853ab2d88598021aaec5c0409d3a45a50a13.zip |
ICE-4891 Refactor network tracing
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 288 |
1 files changed, 196 insertions, 92 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index fcf05ef9db0..fad11c7cd0f 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -13,6 +13,7 @@ #include <Ice/LoggerUtil.h> #include <Ice/Properties.h> #include <Ice/TraceUtil.h> +#include <Ice/TraceLevels.h> #include <Ice/DefaultsAndOverrides.h> #include <Ice/Transceiver.h> #include <Ice/ThreadPool.h> @@ -56,7 +57,7 @@ public: { _connection->timedOut(); } - + private: Ice::ConnectionI* _connection; @@ -66,9 +67,9 @@ class DispatchCall : public DispatchWorkItem { public: - DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, - const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, - Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, + DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, + const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, + Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) : DispatchWorkItem(connection), @@ -90,7 +91,7 @@ public: virtual void run() { - _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter, + _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter, _outAsync, _heartbeatCallback, _stream); } @@ -156,7 +157,7 @@ Ice::ConnectionI::Observer::startRead(const Buffer& buf) _readStreamPos = buf.b.empty() ? 0 : buf.i; } -void +void Ice::ConnectionI::Observer::finishRead(const Buffer& buf) { if(_readStreamPos == 0) @@ -179,7 +180,7 @@ Ice::ConnectionI::Observer::startWrite(const Buffer& buf) _writeStreamPos = buf.b.empty() ? 0 : buf.i; } -void +void Ice::ConnectionI::Observer::finishWrite(const Buffer& buf) { if(_writeStreamPos == 0) @@ -264,7 +265,7 @@ Ice::ConnectionI::OutgoingMessage::sent() delete stream; } stream = 0; - + if(out) { out->sent(); @@ -531,7 +532,7 @@ Ice::ConnectionI::updateObserver() assert(_instance->getObserver()); _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(), - _endpoint, + _endpoint, toConnectionState(_state), _observer.get())); } @@ -550,8 +551,8 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) { // // If writing or reading, nothing to do, the connection - // timeout will kick-in if writes or reads don't progress. - // This check is necessary because the actitivy timer is + // timeout will kick-in if writes or reads don't progress. + // This check is necessary because the actitivy timer is // only set when a message is fully read/written. // return; @@ -570,7 +571,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) // called every (timeout / 2) period. // - if(acm.heartbeat == HeartbeatAlways || + if(acm.heartbeat == HeartbeatAlways || (acm.heartbeat != HeartbeatOff && now >= (_acmLastActivity + acm.timeout / 4))) { if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0) @@ -578,10 +579,10 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) heartbeat(); } } - + if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout)) { - if(acm.close == CloseOnIdleForceful || + if(acm.close == CloseOnIdleForceful || (acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty()))) { // @@ -650,7 +651,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) #endif } - out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, + out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, static_cast<Int>(os->b.size() - headerSize - 4)); // @@ -730,7 +731,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b #endif } - out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, + out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, static_cast<Int>(os->b.size() - headerSize - 4)); AsyncStatus status = AsyncStatusQueued; @@ -773,8 +774,8 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os) if(_exception.get()) { // - // If there were no batch requests queued when the connection failed, we can safely - // retry with a new connection. Otherwise, we must throw to notify the caller that + // If there were no batch requests queued when the connection failed, we can safely + // retry with a new connection. Otherwise, we must throw to notify the caller that // some previous batch requests were not sent. // if(_batchStream.b.empty()) @@ -984,23 +985,23 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR } #ifdef ICE_CPP11 -AsyncResultPtr +AsyncResultPtr Ice::ConnectionI::begin_flushBatchRequests( - const IceInternal::Function<void (const Exception&)>& exception, + const IceInternal::Function<void (const Exception&)>& exception, const IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public IceInternal::Cpp11FnCallbackNC { public: - + Cpp11CB(const IceInternal::Function<void (const Exception&)>& excb, const IceInternal::Function<void (bool)>& sentcb) : IceInternal::Cpp11FnCallbackNC(excb, sentcb) { CallbackBase::checkCallback(true, excb != nullptr); } - + virtual void completed(const AsyncResultPtr& __result) const { @@ -1017,7 +1018,7 @@ Ice::ConnectionI::begin_flushBatchRequests( } } }; - + return __begin_flushBatchRequests(new Cpp11CB(exception, sent), 0); } #endif @@ -1075,7 +1076,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif - out->attachRemoteObserver(initConnectionInfo(), _endpoint, + out->attachRemoteObserver(initConnectionInfo(), _endpoint, static_cast<Int>(_batchStream.b.size() - headerSize - 4)); _batchStream.swap(*out->os()); @@ -1173,7 +1174,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) return status; } -void +void Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1184,9 +1185,9 @@ Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) _callback = callback; } -void -Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, - const IceUtil::Optional<Ice::ACMClose>& close, +void +Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, + const IceUtil::Optional<Ice::ACMClose>& close, const IceUtil::Optional<Ice::ACMHeartbeat>& heartbeat) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1201,7 +1202,7 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, if(_monitor->getACM().timeout <= 0) { _acmLastActivity = IceUtil::Time(); // Disable the recording of last activity. - } + } else if(_acmLastActivity == IceUtil::Time() && _state == StateActive) { _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); @@ -1225,7 +1226,7 @@ Ice::ConnectionI::getACM() return _monitor ? _monitor->getACM() : acm; } -void +void Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1247,10 +1248,10 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) } // - // If the request is being sent, don't remove it from the send streams, + // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // - if(o == _sendStreams.begin()) + if(o == _sendStreams.begin()) { o->timedOut(true); // true = adopt the stream. } @@ -1293,18 +1294,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) } } -void +void Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { if(o->outAsync.get() == outAsync.get()) { if(o->requestId) { - if(_asyncRequestsHint != _asyncRequests.end() && + if(_asyncRequestsHint != _asyncRequests.end() && _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync)) { _asyncRequests.erase(_asyncRequestsHint); @@ -1315,12 +1316,12 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou _asyncRequests.erase(o->requestId); } } - + // - // If the request is being sent, don't remove it from the send streams, + // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // - if(o == _sendStreams.begin()) + if(o == _sendStreams.begin()) { o->timedOut(true); // true = adopt the stream } @@ -1347,7 +1348,7 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou return; // We're done } } - + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) { if(p->second.get() == o.get()) @@ -1366,7 +1367,7 @@ Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); - + try { if(--_dispatchCount == 0) @@ -1377,21 +1378,21 @@ Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag) } notifyAll(); } - + if(_state >= StateClosed) { assert(_exception.get()); _exception->ice_throw(); } - + OutgoingMessage message(os, compressFlag > 0); sendMessage(message); - + if(_state == StateClosing && _dispatchCount == 0) { initiateShutdown(); } - + return; } catch(const LocalException& ex) @@ -1405,7 +1406,7 @@ Ice::ConnectionI::sendNoResponse() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); - + try { if(--_dispatchCount == 0) @@ -1416,13 +1417,13 @@ Ice::ConnectionI::sendNoResponse() } notifyAll(); } - + if(_state >= StateClosed) { assert(_exception.get()); _exception->ice_throw(); } - + if(_state == StateClosing && _dispatchCount == 0) { initiateShutdown(); @@ -1518,13 +1519,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation) try { - if(operation & SocketOperationWrite) + if(operation & SocketOperationWrite) { if(_observer) { _observer.startWrite(_writeStream); } - + if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) { // The whole message is written, assume it's sent now for at-most-once semantics. @@ -1539,7 +1540,7 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { _observer.startRead(_readStream); } - + _transceiver->startRead(_readStream); } else @@ -1563,7 +1564,19 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { + Buffer::Container::iterator start = _writeStream.i; _transceiver->finishWrite(_writeStream); + if(_instance->traceLevels()->network >= 3 && _writeStream.i != start) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "sent " << (_writeStream.i - start); + if(!_endpoint->datagram()) + { + out << " of " << (_writeStream.b.end() - start); + } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + if(_observer) { _observer.finishWrite(_writeStream); @@ -1573,7 +1586,23 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { if(!_hasMoreData) { + Buffer::Container::iterator start = _readStream.i; _transceiver->finishRead(_readStream, _hasMoreData); + if(_instance->traceLevels()->network >= 3 && _readStream.i != start) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "received "; + if(_endpoint->datagram()) + { + out << _readStream.b.size(); + } + else + { + out << (_readStream.i - start) << " of " << (_readStream.b.end() - start); + } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + if(_observer && !_readHeader) { _observer.finishRead(_readStream); @@ -1632,7 +1661,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) { _observer.startWrite(_writeStream); } - writeOp = _transceiver->write(_writeStream); + writeOp = write(_writeStream); if(_observer && !(writeOp & SocketOperationWrite)) { _observer.finishWrite(_writeStream); @@ -1646,7 +1675,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _observer.startRead(_readStream); } - readOp = _transceiver->read(_readStream, _hasMoreData); + readOp = read(_readStream); if(readOp & SocketOperationRead) { break; @@ -1665,7 +1694,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) { _observer->receivedBytes(static_cast<int>(headerSize)); } - + ptrdiff_t pos = _readStream.i - _readStream.b.begin(); if(pos < headerSize) { @@ -1674,7 +1703,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // throw IllegalMessageSizeException(__FILE__, __LINE__); } - + _readStream.i = _readStream.b.begin(); const Byte* m; _readStream.readBlob(m, static_cast<Int>(sizeof(magic))); @@ -1690,7 +1719,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) EncodingVersion ev; _readStream.read(ev); checkSupportedProtocolEncoding(ev); - + Byte messageType; _readStream.read(messageType); Byte compress; @@ -1711,7 +1740,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } _readStream.i = _readStream.b.begin() + pos; } - + if(_readStream.i != _readStream.b.end()) { if(_endpoint->datagram()) @@ -1772,15 +1801,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // // We parse messages first, if we receive a close // connection message we won't send more messages. - // + // if(readyOp & SocketOperationRead) { newOp = static_cast<SocketOperation>(newOp | parseMessage(current.stream, - invokeNum, - requestId, - compress, - servantManager, - adapter, + invokeNum, + requestId, + compress, + servantManager, + adapter, outAsync, heartbeatCallback, dispatchCount)); @@ -1867,15 +1896,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) else { _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, invokeNum, - servantManager, adapter, outAsync, heartbeatCallback, + servantManager, adapter, outAsync, heartbeatCallback, current.stream)); } } void ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs, - Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, - const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, + Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, + const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) { int dispatchedCount = 0; @@ -1954,7 +1983,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess if(invokeNum) { invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); - + // // Don't increase count, the dispatch count is // decreased when the incoming reply is sent. @@ -2007,7 +2036,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) // // If there are no callbacks to call, we don't call ioCompleted() since we're not going - // to call code that will potentially block (this avoids promoting a new leader and + // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback) @@ -2030,6 +2059,24 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) void Ice::ConnectionI::finish() { + if(!_initialized) + { + if(_instance->traceLevels()->network >= 2) + { + string verb = _connector ? "establish" : "accept"; + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() << "\n" << *_exception.get(); + } + } + else + { + if(_instance->traceLevels()->network >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "closed " << _endpoint->protocol() << " connection\n" << toString(); + } + } + if(_startCallback) { _startCallback->connectionStartFailed(this, *_exception.get()); @@ -2041,16 +2088,16 @@ Ice::ConnectionI::finish() if(!_writeStream.b.empty()) { // - // Return the stream to the outgoing call. This is important for + // Return the stream to the outgoing call. This is important for // retriable AMI calls which are not marshalled again. // OutgoingMessage* message = &_sendStreams.front(); _writeStream.swap(*message->stream); - + #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) // // The current message might be sent but not yet removed from _sendStreams. If - // the response has been received in the meantime, we remove the message from + // the response has been received in the meantime, we remove the message from // _sendStreams to not call finished on a message which is already done. // if(message->isSent || message->receivedReply) @@ -2128,6 +2175,7 @@ Ice::ConnectionI::finish() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateFinished); + if(_dispatchCount == 0) { reap(); @@ -2264,6 +2312,7 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _dispatchCount(0), _state(StateNotInitialized), _shutdownInitiated(false), + _initialized(false), _validated(false) { int& compressionLevel = const_cast<int&>(_compressionLevel); @@ -2343,7 +2392,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) assert(_state != StateClosed); _exception.reset(ex.ice_clone()); - + // // We don't warn if we are not validated. // @@ -2522,7 +2571,7 @@ Ice::ConnectionI::setState(State state) if(oldState != newState) { _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(), - _endpoint, + _endpoint, newState, _observer.get())); } @@ -2561,11 +2610,11 @@ Ice::ConnectionI::initiateShutdown() { assert(_state == StateClosing); assert(_dispatchCount == 0); - + if(_shutdownInitiated) { return; - } + } _shutdownInitiated = true; if(!_endpoint->datagram()) @@ -2591,7 +2640,7 @@ Ice::ConnectionI::initiateShutdown() // // Notify the the transceiver of the graceful connection closure. - // + // SocketOperation op = _transceiver->closing(true, *_exception.get()); if(op) { @@ -2648,8 +2697,8 @@ Ice::ConnectionI::initialize(SocketOperation operation) // Update the connection description once the transceiver is initialized. // const_cast<string&>(_desc) = _transceiver->toString(); + _initialized = true; setState(StateNotValidated); - return true; } @@ -2682,7 +2731,7 @@ Ice::ConnectionI::validate(SocketOperation operation) if(_writeStream.i != _writeStream.b.end()) { - SocketOperation op = _transceiver->write(_writeStream); + SocketOperation op = write(_writeStream); if(op) { scheduleTimeout(op); @@ -2711,7 +2760,7 @@ Ice::ConnectionI::validate(SocketOperation operation) if(_readStream.i != _readStream.b.end()) { - SocketOperation op = _transceiver->read(_readStream, _hasMoreData); + SocketOperation op = read(_readStream); if(op) { scheduleTimeout(op); @@ -2771,6 +2820,21 @@ Ice::ConnectionI::validate(SocketOperation operation) _readStream.i = _readStream.b.begin(); _readHeader = true; + if(_instance->traceLevels()->network >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + if(_endpoint->datagram()) + { + out << "starting to " << (_connector ? "send" : "receive") << " " << _endpoint->protocol() << " messages\n"; + out << _transceiver->toDetailedString(); + } + else + { + out << (_connector ? "established" : "accepted") << " " << _endpoint->protocol() << " connection\n"; + out << toString(); + } + } + return true; } @@ -2793,7 +2857,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) try { while(true) - { + { // // Notify the message that it was sent. // @@ -2822,7 +2886,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) // // This can occur if parseMessage (called before // sendNextMessage by message()) closes the connection. - // + // if(_state >= StateClosingPending) { return SocketOperationNone; @@ -2904,7 +2968,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) assert(_writeStream.i); if(_writeStream.i != _writeStream.b.end()) { - SocketOperation op = _transceiver->write(_writeStream); + SocketOperation op = write(_writeStream); if(op) { return op; @@ -2917,7 +2981,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) } // - // If all the messages were sent and we are in the closing state, we schedule + // If all the messages were sent and we are in the closing state, we schedule // the close timeout to wait for the peer to close the connection. // if(_state == StateClosing && _shutdownInitiated) @@ -2981,7 +3045,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) { traceSend(*message.stream, _logger, _traceLevels); } - + // // Send the message without blocking. // @@ -2989,7 +3053,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) { _observer.startWrite(stream); } - op = _transceiver->write(stream); + op = write(stream); if(!op) { if(_observer) @@ -3051,7 +3115,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) { _observer.startWrite(*message.stream); } - op = _transceiver->write(*message.stream); + op = write(*message.stream); if(!op) { if(_observer) @@ -3232,7 +3296,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3294,7 +3358,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request // // Notify the the transceiver of the graceful connection closure. - // + // SocketOperation op = _transceiver->closing(false, *_exception.get()); if(op) { @@ -3418,8 +3482,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) // - // If we just received the reply of a request which isn't acknowledge as - // sent yet, we queue the reply instead of processing it right away. It + // If we just received the reply of a request which isn't acknowledge as + // sent yet, we queue the reply instead of processing it right away. It // will be processed once the write callback is invoked for the message. // OutgoingMessage* message = _sendStreams.empty() ? 0 : &_sendStreams.front(); @@ -3445,7 +3509,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { outAsync = 0; } -#endif +#endif notifyAll(); // Notify threads blocked in close(false) } @@ -3562,12 +3626,12 @@ Ice::ConnectionI::scheduleTimeout(SocketOperation status) timeout = _endpoint->timeout(); } } - + if(timeout < 0) { return; } - + try { if(status & IceInternal::SocketOperationRead) @@ -3636,6 +3700,46 @@ ConnectionI::toConnectionState(State state) const return connectionStateMap[static_cast<int>(state)]; } +SocketOperation +ConnectionI::read(Buffer& buf) +{ + Buffer::Container::iterator start = buf.i; + SocketOperation op = _transceiver->read(buf, _hasMoreData); + if(_instance->traceLevels()->network >= 3 && buf.i != start) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "received "; + if(_endpoint->datagram()) + { + out << buf.b.size(); + } + else + { + out << (buf.i - start) << " of " << (buf.b.end() - start); + } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + return op; +} + +SocketOperation +ConnectionI::write(Buffer& buf) +{ + Buffer::Container::iterator start = buf.i; + SocketOperation op = _transceiver->write(buf); + if(_instance->traceLevels()->network >= 3 && buf.i != start) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "sent " << (buf.i - start); + if(!_endpoint->datagram()) + { + out << " of " << (buf.b.end() - start); + } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + return op; +} + void ConnectionI::reap() { |