summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp964
1 files changed, 378 insertions, 586 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 50b3dffe9b3..ce042d87ba5 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -20,7 +20,6 @@
#include <Ice/ACM.h>
#include <Ice/ObjectAdapterI.h> // For getThreadPool() and getServantManager().
#include <Ice/EndpointI.h>
-#include <Ice/Outgoing.h>
#include <Ice/OutgoingAsync.h>
#include <Ice/Incoming.h>
#include <Ice/LocalException.h>
@@ -38,14 +37,15 @@ using namespace Ice;
using namespace Ice::Instrumentation;
using namespace IceInternal;
+#ifndef ICE_CPP11_MAPPING
Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; }
+#endif
namespace
{
const ::std::string __flushBatchRequests_name = "flushBatchRequests";
-
class TimeoutCallback : public IceUtil::TimerTask
{
public:
@@ -72,8 +72,8 @@ public:
DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId,
Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
- const OutgoingAsyncBasePtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback,
- BasicStream& stream) :
+ const OutgoingAsyncBasePtr& outAsync, const ICE_HEARTBEAT_CALLBACK& heartbeatCallback,
+ InputStream& stream) :
DispatchWorkItem(connection),
_connection(connection),
_startCB(startCB),
@@ -108,8 +108,8 @@ private:
const ServantManagerPtr _servantManager;
const ObjectAdapterPtr _adapter;
const OutgoingAsyncBasePtr _outAsync;
- const ConnectionCallbackPtr _heartbeatCallback;
- BasicStream _stream;
+ const ICE_HEARTBEAT_CALLBACK _heartbeatCallback;
+ InputStream _stream;
};
class FinishCall : public DispatchWorkItem
@@ -211,7 +211,7 @@ Ice::ConnectionI::Observer::attach(const Ice::Instrumentation::ConnectionObserve
void
-Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
+Ice::ConnectionI::OutgoingMessage::adopt(OutputStream* str)
{
if(adopted)
{
@@ -228,7 +228,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
}
else if(!str)
{
- if(out || outAsync)
+ if(outAsync)
{
return; // Adopting request stream is not necessary.
}
@@ -240,7 +240,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
}
assert(str);
- stream = new BasicStream(str->instance(), currentProtocolEncoding);
+ stream = new OutputStream(str->instance(), currentProtocolEncoding);
stream->swap(*str);
adopted = true;
}
@@ -248,8 +248,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
void
Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream)
{
- assert((out || outAsync)); // Only requests can timeout.
- out = 0;
+ assert(outAsync); // Only requests can timeout.
outAsync = 0;
if(adoptStream)
{
@@ -270,11 +269,7 @@ Ice::ConnectionI::OutgoingMessage::sent()
}
stream = 0;
- if(out)
- {
- out->sent();
- }
- else if(outAsync)
+ if(outAsync)
{
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
invokeSent = outAsync->sent();
@@ -289,15 +284,11 @@ Ice::ConnectionI::OutgoingMessage::sent()
void
Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex)
{
- if(out)
- {
- out->completed(ex);
- }
- else if(outAsync)
+ if(outAsync)
{
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompleted();
+ outAsync->invokeException();
}
}
@@ -316,7 +307,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed.
{
- assert(_exception.get());
+ assert(_exception);
_exception->ice_throw();
}
@@ -338,7 +329,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
if(_state >= StateClosing)
{
- assert(_exception.get());
+ assert(_exception);
_exception->ice_throw();
}
}
@@ -353,7 +344,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
exception(ex);
if(callback)
{
- callback->connectionStartFailed(this, *_exception.get());
+ callback->connectionStartFailed(ICE_SHARED_FROM_THIS, ex);
return;
}
else
@@ -365,7 +356,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
if(callback)
{
- callback->connectionStartCompleted(this);
+ callback->connectionStartCompleted(ICE_SHARED_FROM_THIS);
}
}
@@ -435,7 +426,7 @@ Ice::ConnectionI::close(bool force)
// requests to be retried, regardless of whether the server
// has processed them or not.
//
- while(!_requests.empty() || !_asyncRequests.empty())
+ while(!_asyncRequests.empty())
{
wait();
}
@@ -486,7 +477,7 @@ Ice::ConnectionI::throwException() const
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_exception.get())
+ if(_exception)
{
assert(_state >= StateClosing);
_exception->ice_throw();
@@ -538,10 +529,12 @@ Ice::ConnectionI::updateObserver()
}
assert(_instance->initializationData().observer);
- _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
- _endpoint,
- toConnectionState(_state),
- _observer.get()));
+
+ ConnectionObserverPtr o = _instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ toConnectionState(_state),
+ _observer.get());
+ _observer.attach(o);
}
void
@@ -588,8 +581,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout))
{
- if(acm.close == CloseOnIdleForceful ||
- (acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty())))
+ if(acm.close == CloseOnIdleForceful || (acm.close != CloseOnIdle && !_asyncRequests.empty()))
{
//
// Close the connection if we didn't receive a heartbeat in
@@ -597,8 +589,8 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
//
setState(StateClosed, ConnectionTimeoutException(__FILE__, __LINE__));
}
- else if(acm.close != CloseOnInvocation &&
- _dispatchCount == 0 && _batchRequestQueue->isEmpty() && _requests.empty() && _asyncRequests.empty())
+ else if(acm.close != CloseOnInvocation && _dispatchCount == 0 && _batchRequestQueue->isEmpty() &&
+ _asyncRequests.empty())
{
//
// The connection is idle, close it.
@@ -608,110 +600,21 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
}
}
-bool
-Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, int batchRequestNum)
-{
- BasicStream* os = out->os();
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_exception.get())
- {
- //
- // If the connection is closed before we even have a chance
- // to send our request, we always try to send the request
- // again.
- //
- throw RetryException(*_exception.get());
- }
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- //
- // Ensure the message isn't bigger than what we can send with the
- // transport.
- //
- _transceiver->checkSendSize(*os);
-
- Int requestId = 0;
- if(response)
- {
- //
- // Create a new unique request ID.
- //
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
-
- //
- // Fill in the request ID.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&requestId);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#endif
- }
- else if(batchRequestNum > 0)
- {
- const Byte* p = reinterpret_cast<const Byte*>(&batchRequestNum);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#endif
- }
-
- out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
-
- //
- // Send the message. If it can't be sent without blocking the message is added
- // to _sendStreams and it will be sent by the selector thread.
- //
- bool sent = false;
- try
- {
- OutgoingMessage message(out, os, compress, requestId);
- sent = sendMessage(message) & AsyncStatusSent;
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- if(response)
- {
- //
- // Add to the requests map.
- //
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, OutgoingBase*>(requestId, out));
- }
-
- return sent;
-}
-
AsyncStatus
Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compress, bool response, int batchRequestNum)
{
- BasicStream* os = out->getOs();
+ OutputStream* os = out->getOs();
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_exception.get())
+ //
+ // If the exception is closed before we even have a chance
+ // to send our request, we always try to send the request
+ // again.
+ //
+ if(_exception)
{
- //
- // If the exception is closed before we even have a chance
- // to send our request, we always try to send the request
- // again.
- //
- throw RetryException(*_exception.get());
+ throw RetryException(*_exception);
}
-
assert(_state > StateNotValidated);
assert(_state < StateClosing);
@@ -725,8 +628,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres
// Notify the request that it's cancelable with this connection.
// This will throw if the request is canceled.
//
- out->cancelable(this);
-
+ out->cancelable(ICE_SHARED_FROM_THIS);
Int requestId = 0;
if(response)
{
@@ -771,7 +673,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres
catch(const LocalException& ex)
{
setState(StateClosed, ex);
- assert(_exception.get());
+ assert(_exception);
_exception->ice_throw();
}
@@ -792,11 +694,38 @@ Ice::ConnectionI::getBatchRequestQueue() const
return _batchRequestQueue;
}
+#ifdef ICE_CPP11_MAPPING
+void
+Ice::ConnectionI::flushBatchRequests()
+{
+ Connection::flushBatchRequestsAsync().get();
+}
+
+std::function<void()>
+Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)> ex,
+ ::std::function<void(bool)> sent)
+{
+ class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke
+ {
+ public:
+
+ ConnectionFlushBatchLambda(std::shared_ptr<Ice::ConnectionI>&& connection,
+ const InstancePtr& instance,
+ std::function<void(std::exception_ptr)> ex,
+ std::function<void(bool)> sent) :
+ ConnectionFlushBatchAsync(connection, instance), LambdaInvoke(std::move(ex), std::move(sent))
+ {
+ }
+ };
+ auto outAsync = make_shared<ConnectionFlushBatchLambda>(ICE_SHARED_FROM_THIS, _instance, ex, sent);
+ outAsync->invoke(__flushBatchRequests_name);
+ return [outAsync]() { outAsync->cancel(); };
+}
+#else
void
Ice::ConnectionI::flushBatchRequests()
{
- ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name);
- out.invoke();
+ end_flushBatchRequests(begin_flushBatchRequests());
}
AsyncResultPtr
@@ -819,55 +748,49 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception,
- const IceInternal::Function<void (bool)>& sent)
+Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
{
-#ifdef ICE_CPP11
- class Cpp11CB : public IceInternal::Cpp11FnCallbackNC
+ class ConnectionFlushBatchAsyncWithCallback : public ConnectionFlushBatchAsync, public CallbackCompletion
{
public:
- Cpp11CB(const IceInternal::Function<void (const Exception&)>& excb,
- const IceInternal::Function<void (bool)>& sentcb) :
- IceInternal::Cpp11FnCallbackNC(excb, sentcb)
+ ConnectionFlushBatchAsyncWithCallback(const Ice::ConnectionIPtr& connection,
+ const Ice::CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const CallbackBasePtr& callback,
+ const Ice::LocalObjectPtr& cookie) :
+ ConnectionFlushBatchAsync(connection, instance),
+ CallbackCompletion(callback, cookie),
+ _communicator(communicator),
+ _connection(connection)
{
- CallbackBase::checkCallback(true, excb != nullptr);
+ _cookie = cookie;
}
- virtual void
- completed(const AsyncResultPtr& __result) const
+ virtual Ice::CommunicatorPtr getCommunicator() const
{
- ConnectionPtr __con = __result->getConnection();
- assert(__con);
- try
- {
- __con->end_flushBatchRequests(__result);
- assert(false);
- }
- catch(const Exception& ex)
- {
- IceInternal::Cpp11FnCallbackNC::exception(__result, ex);
- }
+ return _communicator;
}
- };
- return __begin_flushBatchRequests(new Cpp11CB(exception, sent), 0);
-#else
- assert(false); // Ice not built with C++11 support.
- return 0;
-#endif
-}
+ virtual Ice::ConnectionPtr getConnection() const
+ {
+ return _connection;
+ }
-AsyncResultPtr
-Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
-{
- ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync(this,
- _communicator,
- _instance,
- __flushBatchRequests_name,
- cb,
- cookie);
- result->invoke();
+ virtual const std::string&
+ getOperation() const
+ {
+ return __flushBatchRequests_name;
+ }
+
+ private:
+
+ Ice::CommunicatorPtr _communicator;
+ Ice::ConnectionPtr _connection;
+ };
+
+ ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, _communicator, _instance, cb, cookie);
+ result->invoke(__flushBatchRequests_name);
return result;
}
@@ -877,52 +800,70 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
AsyncResult::__check(r, this, __flushBatchRequests_name);
r->__wait();
}
+#endif
void
-Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback)
+Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback)
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ _heartbeatCallback = callback;
+}
+
+void
+Ice::ConnectionI::setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK) callback)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state >= StateClosed)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_state >= StateClosed)
+ if(callback)
{
- if(callback)
+ class CallbackWorkItem : public DispatchWorkItem
{
- class CallbackWorkItem : public DispatchWorkItem
- {
- public:
+ public:
- CallbackWorkItem(const ConnectionIPtr& connection, const ConnectionCallbackPtr& callback) :
- _connection(connection),
- _callback(callback)
- {
- }
+ CallbackWorkItem(const ConnectionIPtr& connection, ICE_IN(ICE_CLOSE_CALLBACK) callback) :
+ _connection(connection),
+#ifdef ICE_CPP11_MAPPING
+ _callback(move(callback))
+#else
+ _callback(callback)
+#endif
+ {
+ }
- virtual void run()
- {
- _connection->closeCallback(_callback);
- }
+ virtual void run()
+ {
+ _connection->closeCallback(_callback);
+ }
- private:
+ private:
- const ConnectionIPtr _connection;
- const ConnectionCallbackPtr _callback;
- };
- _threadPool->dispatch(new CallbackWorkItem(this, callback));
- }
- }
- else
- {
- _callback = callback;
+ const ConnectionIPtr _connection;
+ const ICE_CLOSE_CALLBACK _callback;
+ };
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->dispatch(new CallbackWorkItem(ICE_SHARED_FROM_THIS, move(callback)));
+#else
+ _threadPool->dispatch(new CallbackWorkItem(ICE_SHARED_FROM_THIS, callback));
+#endif
}
}
+ else
+ {
+ _closeCallback = callback;
+ }
}
void
-Ice::ConnectionI::closeCallback(const ConnectionCallbackPtr& callback)
+Ice::ConnectionI::closeCallback(const ICE_CLOSE_CALLBACK& callback)
{
try
{
- callback->closed(this);
+#ifdef ICE_CPP11_MAPPING
+ callback(ICE_SHARED_FROM_THIS);
+#else
+ callback->closed(ICE_SHARED_FROM_THIS);
+#endif
}
catch(const std::exception& ex)
{
@@ -949,7 +890,7 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout,
if(_state == StateActive)
{
- _monitor->remove(this);
+ _monitor->remove(ICE_SHARED_FROM_THIS);
}
_monitor = _monitor->acm(timeout, close, heartbeat);
@@ -964,7 +905,7 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout,
if(_state == StateActive)
{
- _monitor->add(this);
+ _monitor->add(ICE_SHARED_FROM_THIS);
}
}
@@ -980,96 +921,6 @@ Ice::ConnectionI::getACM()
}
void
-Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_state >= StateClosed)
- {
- return; // The request has already been or will be shortly notified of the failure.
- }
-
- for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
- {
- if(o->out == out)
- {
- if(o->requestId)
- {
- if(_requestsHint != _requests.end() && _requestsHint->second == out)
- {
- _requests.erase(_requestsHint);
- _requestsHint = _requests.end();
- }
- else
- {
- _requests.erase(o->requestId);
- }
- }
-
- if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
- {
- setState(StateClosed, ex);
- }
- else
- {
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- if(o == _sendStreams.begin())
- {
- o->canceled(true); // true = adopt the stream.
- }
- else
- {
- o->canceled(false);
- _sendStreams.erase(o);
- }
- out->completed(ex);
- }
- return;
- }
- }
-
- if(dynamic_cast<Outgoing*>(out))
- {
- if(_requestsHint != _requests.end() && _requestsHint->second == out)
- {
- if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
- {
- setState(StateClosed, ex);
- }
- else
- {
- out->completed(ex);
- _requests.erase(_requestsHint);
- _requestsHint = _requests.end();
- }
- return;
- }
- else
- {
- for(map<Int, OutgoingBase*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- if(p->second == out)
- {
- if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
- {
- setState(StateClosed, ex);
- }
- else
- {
- p->second->completed(ex);
- assert(p != _requestsHint);
- _requests.erase(p);
- }
- return; // We're done.
- }
- }
- }
- }
-}
-
-void
Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex)
{
//
@@ -1089,7 +940,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
if(o->requestId)
{
if(_asyncRequestsHint != _asyncRequests.end() &&
- _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync))
+ _asyncRequestsHint->second == ICE_DYNAMIC_CAST(OutgoingAsync, outAsync))
{
_asyncRequests.erase(_asyncRequestsHint);
_asyncRequestsHint = _asyncRequests.end();
@@ -1119,16 +970,16 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
o->canceled(false);
_sendStreams.erase(o);
}
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
}
return;
}
}
- if(OutgoingAsyncPtr::dynamicCast(outAsync))
+ if(ICE_DYNAMIC_CAST(OutgoingAsync, outAsync))
{
if(_asyncRequestsHint != _asyncRequests.end())
{
@@ -1142,9 +993,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
{
_asyncRequests.erase(_asyncRequestsHint);
_asyncRequestsHint = _asyncRequests.end();
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
}
return;
@@ -1163,9 +1014,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
{
assert(p != _asyncRequestsHint);
_asyncRequests.erase(p);
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
}
return;
@@ -1175,7 +1026,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
}
void
-Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag, bool /*amd*/)
+Ice::ConnectionI::sendResponse(Int, OutputStream* os, Byte compressFlag, bool /*amd*/)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
@@ -1193,7 +1044,7 @@ Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag, bool /*a
if(_state >= StateClosed)
{
- assert(_exception.get());
+ assert(_exception);
_exception->ice_throw();
}
@@ -1232,7 +1083,7 @@ Ice::ConnectionI::sendNoResponse()
if(_state >= StateClosed)
{
- assert(_exception.get());
+ assert(_exception);
_exception->ice_throw();
}
@@ -1335,15 +1186,15 @@ Ice::ConnectionI::getEndpoint() const
return _endpoint; // No mutex protection necessary, _endpoint is immutable.
}
-ObjectPrx
+ObjectPrxPtr
Ice::ConnectionI::createProxy(const Identity& ident) const
{
//
// Create a reference and return a reverse proxy for this
// reference.
//
- ConnectionIPtr self = const_cast<ConnectionI*>(this);
- return _instance->proxyFactory()->referenceToProxy(_instance->referenceFactory()->create(ident, self));
+ return _instance->proxyFactory()->referenceToProxy(
+ _instance->referenceFactory()->create(ident, ICE_SHARED_FROM_CONST_THIS(ConnectionI)));
}
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
@@ -1372,19 +1223,12 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
}
else if(operation & SocketOperationRead)
{
- if(!_hasMoreData)
- {
- if(_observer && !_readHeader)
- {
- _observer.startRead(_readStream);
- }
-
- _transceiver->startRead(_readStream);
- }
- else
+ if(_observer && !_readHeader)
{
- _transceiver->getNativeInfo()->completed(IceInternal::SocketOperationRead);
+ _observer.startRead(_readStream);
}
+
+ _transceiver->startRead(_readStream);
}
}
catch(const Ice::LocalException& ex)
@@ -1422,29 +1266,26 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
}
else if(operation & SocketOperationRead)
{
- if(!_hasMoreData)
+ Buffer::Container::iterator start = _readStream.i;
+ _transceiver->finishRead(_readStream);
+ if(_instance->traceLevels()->network >= 3 && _readStream.i != start)
{
- Buffer::Container::iterator start = _readStream.i;
- _transceiver->finishRead(_readStream, _hasMoreData);
- if(_instance->traceLevels()->network >= 3 && _readStream.i != start)
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+ out << "received ";
+ if(_endpoint->datagram())
{
- Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
- out << "received ";
- if(_endpoint->datagram())
- {
- out << _readStream.b.size();
- }
- else
- {
- out << (_readStream.i - start) << " of " << (_readStream.b.end() - start);
- }
- out << " bytes via " << _endpoint->protocol() << "\n" << toString();
+ out << _readStream.b.size();
}
-
- if(_observer && !_readHeader)
+ else
{
- _observer.finishRead(_readStream);
+ out << (_readStream.i - start) << " of " << (_readStream.b.end() - start);
}
+ out << " bytes via " << _endpoint->protocol() << "\n" << toString();
+ }
+
+ if(_observer && !_readHeader)
+ {
+ _observer.finishRead(_readStream);
}
}
}
@@ -1467,11 +1308,10 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
ServantManagerPtr servantManager;
ObjectAdapterPtr adapter;
OutgoingAsyncBasePtr outAsync;
- ConnectionCallbackPtr heartbeatCallback;
+ ICE_HEARTBEAT_CALLBACK heartbeatCallback;
int dispatchCount = 0;
ThreadPoolMessage<ConnectionI> msg(current, *this);
-
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1603,7 +1443,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
// satisfied before continuing.
//
scheduleTimeout(newOp);
- _threadPool->update(this, current.operation, newOp);
+ _threadPool->update(ICE_SHARED_FROM_THIS, current.operation, newOp);
return;
}
@@ -1617,7 +1457,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
return;
}
- _threadPool->unregister(this, current.operation);
+ _threadPool->unregister(ICE_SHARED_FROM_THIS, current.operation);
//
// We start out in holding state.
@@ -1665,7 +1505,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
if(_state < StateClosed)
{
scheduleTimeout(newOp);
- _threadPool->update(this, current.operation, newOp);
+ _threadPool->update(ICE_SHARED_FROM_THIS, current.operation, newOp);
}
}
@@ -1727,9 +1567,10 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
else
{
- _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, invokeNum,
- servantManager, adapter, outAsync, heartbeatCallback,
- current.stream));
+ _threadPool->dispatchFromThisThread(new DispatchCall(ICE_SHARED_FROM_THIS, startCB, sentCBs, compress, requestId,
+ invokeNum, servantManager, adapter, outAsync,
+ heartbeatCallback, current.stream));
+
}
}
@@ -1737,7 +1578,7 @@ void
ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs,
Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
const ObjectAdapterPtr& adapter, const OutgoingAsyncBasePtr& outAsync,
- const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream)
+ const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, InputStream& stream)
{
int dispatchedCount = 0;
@@ -1747,7 +1588,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
if(startCB)
{
- startCB->connectionStartCompleted(this);
+ startCB->connectionStartCompleted(ICE_SHARED_FROM_THIS);
++dispatchedCount;
}
@@ -1765,10 +1606,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
}
if(p->receivedReply)
{
- OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync);
- if(outAsync->completed())
+ OutgoingAsyncPtr o = ICE_DYNAMIC_CAST(OutgoingAsync, p->outAsync);
+ if(o->response())
{
- outAsync->invokeCompleted();
+ o->invokeResponse();
}
}
#else
@@ -1784,7 +1625,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
if(outAsync)
{
- outAsync->invokeCompleted();
+ outAsync->invokeResponse();
++dispatchedCount;
}
@@ -1792,7 +1633,11 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
{
try
{
- heartbeatCallback->heartbeat(this);
+#ifdef ICE_CPP11_MAPPING
+ heartbeatCallback(ICE_SHARED_FROM_THIS);
+#else
+ heartbeatCallback->heartbeat(ICE_SHARED_FROM_THIS);
+#endif
}
catch(const std::exception& ex)
{
@@ -1871,7 +1716,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close)
// to call code that will potentially block (this avoids promoting a new leader and
// unecessary thread creation, especially if this is called on shutdown).
//
- if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback)
+ if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_closeCallback && !_heartbeatCallback)
{
finish(close);
return;
@@ -1884,7 +1729,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close)
}
else
{
- _threadPool->dispatchFromThisThread(new FinishCall(this, close));
+ _threadPool->dispatchFromThisThread(new FinishCall(ICE_SHARED_FROM_THIS, close));
}
}
@@ -1897,8 +1742,9 @@ Ice::ConnectionI::finish(bool close)
{
string verb = _connector ? "establish" : "accept";
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
+
out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString()
- << "\n" << *_exception.get();
+ << "\n" << *_exception;
}
}
else
@@ -1908,28 +1754,35 @@ Ice::ConnectionI::finish(bool close)
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
out << "closed " << _endpoint->protocol() << " connection\n" << toString();
- //
- // Trace the cause of unexpected connection closures
- //
if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get())))
{
- out << "\n" << *_exception.get();
+ out << "\n" << *_exception;
}
}
}
if(close)
{
- _transceiver->close();
+ try
+ {
+ _transceiver->close();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Error out(_logger);
+ out << "unexpected connection exception:\n" << ex << '\n' << _desc;
+ }
}
if(_startCallback)
{
- _startCallback->connectionStartFailed(this, *_exception.get());
+ assert(_exception);
+
+ _startCallback->connectionStartFailed(ICE_SHARED_FROM_THIS, *_exception);
_startCallback = 0;
}
@@ -1958,10 +1811,10 @@ Ice::ConnectionI::finish(bool close)
}
if(message->receivedReply)
{
- OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync);
- if(outAsync->completed())
+ OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, message->outAsync);
+ if(outAsync->response())
{
- outAsync->invokeCompleted();
+ outAsync->invokeResponse();
}
}
_sendStreams.pop_front();
@@ -1969,37 +1822,27 @@ Ice::ConnectionI::finish(bool close)
#endif
}
+
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
- o->completed(*_exception.get());
+ o->completed(*_exception);
if(o->requestId) // Make sure finished isn't called twice.
{
- if(o->out)
- {
- _requests.erase(o->requestId);
- }
- else
- {
- _asyncRequests.erase(o->requestId);
- }
+ _asyncRequests.erase(o->requestId);
}
}
- _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
- }
- for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- p->second->completed(*_exception.get());
+ _sendStreams.clear();
}
- _requests.clear();
for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
- if(q->second->completed(*_exception.get()))
+ if(q->second->exception(*_exception))
{
- q->second->invokeCompleted();
+ q->second->invokeException();
}
}
+
_asyncRequests.clear();
//
@@ -2010,12 +1853,14 @@ Ice::ConnectionI::finish(bool close)
_readStream.clear();
_readStream.b.clear();
- if(_callback)
+ if(_closeCallback)
{
- closeCallback(_callback);
- _callback = 0;
+ closeCallback(_closeCallback);
+ _closeCallback = ICE_NULLPTR;
}
+ _heartbeatCallback = ICE_NULLPTR;
+
//
// This must be done last as this will cause waitUntilFinished() to return (and communicator
// objects such as the timer might be destroyed too).
@@ -2131,7 +1976,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0),
_compressionLevel(1),
_nextRequestId(1),
- _requestsHint(_requests.end()),
_asyncRequestsHint(_asyncRequests.end()),
_messageSizeMax(adapter ? adapter->messageSizeMax() : _instance->messageSizeMax()),
_batchRequestQueue(new BatchRequestQueue(instance, endpoint->datagram())),
@@ -2166,36 +2010,39 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
{
_acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
}
+}
- __setNoDelete(true);
- try
+Ice::ConnectionIPtr
+Ice::ConnectionI::create(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const ACMMonitorPtr& monitor,
+ const TransceiverPtr& transceiver,
+ const ConnectorPtr& connector,
+ const EndpointIPtr& endpoint,
+ const ObjectAdapterIPtr& adapter)
+{
+ Ice::ConnectionIPtr conn(new ConnectionI(communicator, instance, monitor, transceiver, connector,
+ endpoint, adapter));
+ if(adapter)
{
- if(adapter)
- {
- const_cast<ThreadPoolPtr&>(_threadPool) = adapter->getThreadPool();
- }
- else
- {
- const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
- }
- _threadPool->initialize(this);
+ const_cast<ThreadPoolPtr&>(conn->_threadPool) = adapter->getThreadPool();
}
- catch(const IceUtil::Exception&)
+ else
{
- __setNoDelete(false);
- throw;
+ const_cast<ThreadPoolPtr&>(conn->_threadPool) = conn->_instance->clientThreadPool();
}
- __setNoDelete(false);
+ conn->_threadPool->initialize(conn);
+ return conn;
}
Ice::ConnectionI::~ConnectionI()
{
assert(!_startCallback);
- assert(!_callback);
+ assert(!_closeCallback);
+ assert(!_heartbeatCallback);
assert(_state == StateFinished);
assert(_dispatchCount == 0);
assert(_sendStreams.empty());
- assert(_requests.empty());
assert(_asyncRequests.empty());
}
@@ -2213,15 +2060,13 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
return;
}
- if(!_exception.get())
+ if(!_exception)
{
//
// If we are in closed state, an exception must be set.
//
assert(_state != StateClosed);
-
- _exception.reset(ex.ice_clone());
-
+ ICE_SET_EXCEPTION_FROM_CLONE(_exception, ex.ice_clone());
//
// We don't warn if we are not validated.
//
@@ -2230,15 +2075,15 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
//
// Don't warn about certain expected exceptions.
//
- if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
- dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
- dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
- (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing)))
+ if(!(dynamic_cast<const CloseConnectionException*>(&ex) ||
+ dynamic_cast<const ForcedCloseConnectionException*>(&ex) ||
+ dynamic_cast<const ConnectionTimeoutException*>(&ex) ||
+ dynamic_cast<const CommunicatorDestroyedException*>(&ex) ||
+ dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) ||
+ (dynamic_cast<const ConnectionLostException*>(&ex) && _state >= StateClosing)))
{
Warning out(_logger);
- out << "connection exception:\n" << *_exception.get() << '\n' << _desc;
+ out << "connection exception:\n" << ex << '\n' << _desc;
}
}
}
@@ -2280,92 +2125,92 @@ Ice::ConnectionI::setState(State state)
{
switch(state)
{
- case StateNotInitialized:
- {
- assert(false);
- break;
- }
-
- case StateNotValidated:
- {
- if(_state != StateNotInitialized)
+ case StateNotInitialized:
{
- assert(_state == StateClosed);
- return;
+ assert(false);
+ break;
}
- break;
- }
- case StateActive:
- {
- //
- // Can only switch from holding or not validated to
- // active.
- //
- if(_state != StateHolding && _state != StateNotValidated)
+ case StateNotValidated:
{
- return;
+ if(_state != StateNotInitialized)
+ {
+ assert(_state == StateClosed);
+ return;
+ }
+ break;
}
- _threadPool->_register(this, SocketOperationRead);
- break;
- }
- case StateHolding:
- {
- //
- // Can only switch from active or not validated to
- // holding.
- //
- if(_state != StateActive && _state != StateNotValidated)
+ case StateActive:
{
- return;
+ //
+ // Can only switch from holding or not validated to
+ // active.
+ //
+ if(_state != StateHolding && _state != StateNotValidated)
+ {
+ return;
+ }
+ _threadPool->_register(ICE_SHARED_FROM_THIS, SocketOperationRead);
+ break;
}
- if(_state == StateActive)
+
+ case StateHolding:
{
- _threadPool->unregister(this, SocketOperationRead);
+ //
+ // Can only switch from active or not validated to
+ // holding.
+ //
+ if(_state != StateActive && _state != StateNotValidated)
+ {
+ return;
+ }
+ if(_state == StateActive)
+ {
+ _threadPool->unregister(ICE_SHARED_FROM_THIS, SocketOperationRead);
+ }
+ break;
}
- break;
- }
- case StateClosing:
- case StateClosingPending:
- {
- //
- // Can't change back from closing pending.
- //
- if(_state >= StateClosingPending)
+ case StateClosing:
+ case StateClosingPending:
{
- return;
+ //
+ // Can't change back from closing pending.
+ //
+ if(_state >= StateClosingPending)
+ {
+ return;
+ }
+ break;
}
- break;
- }
- case StateClosed:
- {
- if(_state == StateFinished)
+ case StateClosed:
{
- return;
- }
+ if(_state == StateFinished)
+ {
+ return;
+ }
- _batchRequestQueue->destroy(*_exception.get());
+ _batchRequestQueue->destroy(*_exception);
- //
- // Don't need to close now for connections so only close the transceiver
- // if the selector request it.
- //
- if(_threadPool->finish(this, false))
- {
- _transceiver->close();
+ //
+ // Don't need to close now for connections so only close the transceiver
+ // if the selector request it.
+ //
+ if(_threadPool->finish(ICE_SHARED_FROM_THIS, false))
+ {
+ _transceiver->close();
+ }
+ break;
}
- break;
- }
- case StateFinished:
- {
- assert(_state == StateClosed);
- _communicator = 0;
- break;
- }
+ case StateFinished:
+ {
+ assert(_state == StateClosed);
+ _communicator = 0;
+ break;
+ }
}
}
catch(const Ice::LocalException& ex)
@@ -2388,11 +2233,11 @@ Ice::ConnectionI::setState(State state)
{
_acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
}
- _monitor->add(this);
+ _monitor->add(ICE_SHARED_FROM_THIS);
}
else if(_state == StateActive)
{
- _monitor->remove(this);
+ _monitor->remove(ICE_SHARED_FROM_THIS);
}
}
@@ -2407,7 +2252,7 @@ Ice::ConnectionI::setState(State state)
newState,
_observer.get()));
}
- if(_observer && state == StateClosed && _exception.get())
+ if(_observer && state == StateClosed && _exception)
{
if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
@@ -2416,7 +2261,7 @@ Ice::ConnectionI::setState(State state)
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
(dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing)))
{
- _observer->failed(_exception->ice_name());
+ _observer->failed(_exception->ice_id());
}
}
}
@@ -2454,7 +2299,7 @@ Ice::ConnectionI::initiateShutdown()
//
// Before we shut down, we send a close connection message.
//
- BasicStream os(_instance.get(), Ice::currentProtocolEncoding);
+ OutputStream os(_instance.get(), Ice::currentProtocolEncoding);
os.write(magic[0]);
os.write(magic[1]);
os.write(magic[2]);
@@ -2473,11 +2318,11 @@ Ice::ConnectionI::initiateShutdown()
//
// Notify the the transceiver of the graceful connection closure.
//
- SocketOperation op = _transceiver->closing(true, *_exception.get());
+ SocketOperation op = _transceiver->closing(true, *_exception);
if(op)
{
scheduleTimeout(op);
- _threadPool->_register(this, op);
+ _threadPool->_register(ICE_SHARED_FROM_THIS, op);
}
}
}
@@ -2490,7 +2335,7 @@ Ice::ConnectionI::heartbeat()
if(!_endpoint->datagram())
{
- BasicStream os(_instance.get(), Ice::currentProtocolEncoding);
+ OutputStream os(_instance.get(), Ice::currentProtocolEncoding);
os.write(magic[0]);
os.write(magic[1]);
os.write(magic[2]);
@@ -2509,7 +2354,7 @@ Ice::ConnectionI::heartbeat()
catch(const LocalException& ex)
{
setState(StateClosed, ex);
- assert(_exception.get());
+ assert(_exception);
}
}
}
@@ -2517,11 +2362,11 @@ Ice::ConnectionI::heartbeat()
bool
Ice::ConnectionI::initialize(SocketOperation operation)
{
- SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData);
+ SocketOperation s = _transceiver->initialize(_readStream, _writeStream);
if(s != SocketOperationNone)
{
scheduleTimeout(s);
- _threadPool->update(this, operation, s);
+ _threadPool->update(ICE_SHARED_FROM_THIS, operation, s);
return false;
}
@@ -2567,7 +2412,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
if(op)
{
scheduleTimeout(op);
- _threadPool->update(this, operation, op);
+ _threadPool->update(ICE_SHARED_FROM_THIS, operation, op);
return false;
}
}
@@ -2596,7 +2441,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
if(op)
{
scheduleTimeout(op);
- _threadPool->update(this, operation, op);
+ _threadPool->update(ICE_SHARED_FROM_THIS, operation, op);
return false;
}
}
@@ -2740,17 +2585,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
//
// Do compression.
//
- BasicStream stream(_instance.get(), Ice::currentProtocolEncoding);
+ OutputStream stream(_instance.get(), Ice::currentProtocolEncoding);
doCompress(*message->stream, stream);
- if(message->outAsync)
- {
- trace("sending asynchronous request", *message->stream, _logger, _traceLevels);
- }
- else
- {
- traceSend(*message->stream, _logger, _traceLevels);
- }
+ traceSend(*message->stream, _logger, _traceLevels);
message->adopt(&stream); // Adopt the compressed stream.
message->stream->i = message->stream->b.begin();
@@ -2777,14 +2615,8 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
copy(p, p + sizeof(Int), message->stream->b.begin() + 10);
#endif
message->stream->i = message->stream->b.begin();
- if(message->outAsync)
- {
- trace("sending asynchronous request", *message->stream, _logger, _traceLevels);
- }
- else
- {
- traceSend(*message->stream, _logger, _traceLevels);
- }
+ traceSend(*message->stream, _logger, _traceLevels);
+
#ifdef ICE_HAS_BZIP2
}
#endif
@@ -2819,7 +2651,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
if(_state == StateClosing && _shutdownInitiated)
{
setState(StateClosingPending);
- SocketOperation op = _transceiver->closing(true, *_exception.get());
+ SocketOperation op = _transceiver->closing(true, *_exception);
if(op)
{
return op;
@@ -2865,18 +2697,11 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
// Do compression.
//
- BasicStream stream(_instance.get(), Ice::currentProtocolEncoding);
+ OutputStream stream(_instance.get(), Ice::currentProtocolEncoding);
doCompress(*message.stream, stream);
stream.i = stream.b.begin();
- if(message.outAsync)
- {
- trace("sending asynchronous request", *message.stream, _logger, _traceLevels);
- }
- else
- {
- traceSend(*message.stream, _logger, _traceLevels);
- }
+ traceSend(*message.stream, _logger, _traceLevels);
//
// Send the message without blocking.
@@ -2931,14 +2756,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
#endif
message.stream->i = message.stream->b.begin();
- if(message.outAsync)
- {
- trace("sending asynchronous request", *message.stream, _logger, _traceLevels);
- }
- else
- {
- traceSend(*message.stream, _logger, _traceLevels);
- }
+ traceSend(*message.stream, _logger, _traceLevels);
//
// Send the message without blocking.
@@ -2974,7 +2792,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
_writeStream.swap(*_sendStreams.back().stream);
scheduleTimeout(op);
- _threadPool->_register(this, op);
+ _threadPool->_register(ICE_SHARED_FROM_THIS, op);
return AsyncStatusQueued;
}
@@ -3041,7 +2859,7 @@ getBZ2Error(int bzError)
}
void
-Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed)
+Ice::ConnectionI::doCompress(OutputStream& uncompressed, OutputStream& compressed)
{
const Byte* p;
@@ -3096,7 +2914,7 @@ Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed)
}
void
-Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompressed)
+Ice::ConnectionI::doUncompress(InputStream& compressed, InputStream& uncompressed)
{
Int uncompressedSize;
compressed.i = compressed.b.begin() + headerSize;
@@ -3131,9 +2949,9 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
#endif
SocketOperation
-Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
+Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
- OutgoingAsyncBasePtr& outAsync, ConnectionCallbackPtr& heartbeatCallback,
+ OutgoingAsyncBasePtr& outAsync, ICE_HEARTBEAT_CALLBACK& heartbeatCallback,
int& dispatchCount)
{
assert(_state > StateNotValidated && _state < StateClosed);
@@ -3169,7 +2987,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
if(compress == 2)
{
#ifdef ICE_HAS_BZIP2
- BasicStream ustream(_instance.get(), Ice::currentProtocolEncoding);
+ InputStream ustream(_instance.get(), Ice::currentProtocolEncoding);
doUncompress(stream, ustream);
stream.b.swap(ustream.b);
#else
@@ -3200,7 +3018,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
//
// Notify the the transceiver of the graceful connection closure.
//
- SocketOperation op = _transceiver->closing(false, *_exception.get());
+ SocketOperation op = _transceiver->closing(false, *_exception);
if(op)
{
return op;
@@ -3258,54 +3076,22 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
stream.read(requestId);
- map<Int, OutgoingBase*>::iterator p = _requests.end();
map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.end();
- if(_requestsHint != _requests.end())
+ if(_asyncRequestsHint != _asyncRequests.end())
{
- if(_requestsHint->first == requestId)
+ if(_asyncRequestsHint->first == requestId)
{
- p = _requestsHint;
+ q = _asyncRequestsHint;
}
}
- if(p == _requests.end())
- {
- if(_asyncRequestsHint != _asyncRequests.end())
- {
- if(_asyncRequestsHint->first == requestId)
- {
- q = _asyncRequestsHint;
- }
- }
- }
-
- if(p == _requests.end() && q == _asyncRequests.end())
- {
- p = _requests.find(requestId);
- }
-
- if(p == _requests.end() && q == _asyncRequests.end())
+ if(q == _asyncRequests.end())
{
q = _asyncRequests.find(requestId);
}
- if(p != _requests.end())
- {
- p->second->completed(stream);
-
- if(p == _requestsHint)
- {
- _requests.erase(p++);
- _requestsHint = p;
- }
- else
- {
- _requests.erase(p);
- }
- notifyAll(); // Notify threads blocked in close(false)
- }
- else if(q != _asyncRequests.end())
+ if(q != _asyncRequests.end())
{
outAsync = q->second;
@@ -3333,7 +3119,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
message->receivedReply = true;
outAsync = 0;
}
- else if(outAsync->completed())
+ else if(outAsync->response())
{
++dispatchCount;
}
@@ -3342,7 +3128,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
outAsync = 0;
}
#else
- if(outAsync->completed())
+ if(outAsync->response())
{
++dispatchCount;
}
@@ -3360,9 +3146,9 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
case validateConnectionMsg:
{
traceRecv(stream, _logger, _traceLevels);
- if(_callback)
+ if(_heartbeatCallback)
{
- heartbeatCallback = _callback;
+ heartbeatCallback = _heartbeatCallback;
++dispatchCount;
}
break;
@@ -3395,7 +3181,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
void
-Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, Byte compress,
+Ice::ConnectionI::invokeAll(InputStream& stream, Int invokeNum, Int requestId, Byte compress,
const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter)
{
//
@@ -3530,11 +3316,17 @@ Ice::ConnectionI::initConnectionInfo() const
}
catch(const Ice::LocalException&)
{
- _info = new ConnectionInfo();
+ _info = ICE_MAKE_SHARED(ConnectionInfo);
+ }
+
+ Ice::ConnectionInfoPtr info = _info;
+ while(info)
+ {
+ info->connectionId = _endpoint->connectionId();
+ info->incoming = _connector == 0;
+ info->adapterName = _adapter ? _adapter->getName() : string();
+ info = info->underlying;
}
- _info->connectionId = _endpoint->connectionId();
- _info->incoming = _connector == 0;
- _info->adapterName = _adapter ? _adapter->getName() : string();
return _info;
}
@@ -3548,7 +3340,7 @@ SocketOperation
ConnectionI::read(Buffer& buf)
{
Buffer::Container::iterator start = buf.i;
- SocketOperation op = _transceiver->read(buf, _hasMoreData);
+ SocketOperation op = _transceiver->read(buf);
if(_instance->traceLevels()->network >= 3 && buf.i != start)
{
Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
@@ -3589,7 +3381,7 @@ ConnectionI::reap()
{
if(_monitor)
{
- _monitor->reap(this);
+ _monitor->reap(ICE_SHARED_FROM_THIS);
}
if(_observer)
{