author    Joe George <joe@zeroc.com>    2015-03-03 17:30:50 -0500
committer Joe George <joe@zeroc.com>    2015-05-12 11:41:55 -0400
commit    d35bb9f5c19e34aee31f83d445695a8186ef675e (tag: v3.4.2)
tree      d5324eaf44f5f9776495537c51653f50a66a7237 /cpp/src/Ice/ConnectionI.cpp
    Ice 3.4.2 Source Distribution
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
 cpp/src/Ice/ConnectionI.cpp | 2912 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 2912 insertions(+), 0 deletions(-)
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
new file mode 100644
index 00000000000..2d942ca02b7
--- /dev/null
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -0,0 +1,2912 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/DisableWarnings.h>
+#include <Ice/ConnectionI.h>
+#include <Ice/Instance.h>
+#include <Ice/LoggerUtil.h>
+#include <Ice/Properties.h>
+#include <Ice/TraceUtil.h>
+#include <Ice/DefaultsAndOverrides.h>
+#include <Ice/Transceiver.h>
+#include <Ice/ThreadPool.h>
+#include <Ice/ConnectionMonitor.h>
+#include <Ice/ObjectAdapterI.h> // For getThreadPool() and getServantManager().
+#include <Ice/EndpointI.h>
+#include <Ice/Outgoing.h>
+#include <Ice/OutgoingAsync.h>
+#include <Ice/Incoming.h>
+#include <Ice/LocalException.h>
+#include <Ice/ReferenceFactory.h> // For createProxy().
+#include <Ice/ProxyFactory.h> // For createProxy().
+#include <bzlib.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceInternal;
+
+Ice::LocalObject* IceInternal::upCast(ConnectionI* p) { return p; }
+
+namespace
+{
+
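+//
+// Timer task that forwards a read/write timeout expiry to the
+// connection's timedOut() method.
+//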
+class TimeoutCallback : public IceUtil::TimerTask
+{
+public:
+
+ TimeoutCallback(Ice::ConnectionI* connection) : _connection(connection)
+ {
+ }
+
+ void
+ runTimerTask()
+ {
+ _connection->timedOut();
+ }
+
+private:
+
+ Ice::ConnectionI* _connection;
+};
+
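+//
+// Wraps a call to ConnectionI::dispatch() so that it can be executed
+// through a user-supplied dispatcher. The message stream is swapped
+// into this object to avoid copying it.
+//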
+class DispatchDispatcherCall : public DispatcherCall
+{
+public:
+
+ DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
+ const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, Byte compress, Int requestId,
+ Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
+ const OutgoingAsyncPtr& outAsync, BasicStream& stream) :
+ _connection(connection),
+ _startCB(startCB),
+ _sentCBs(sentCBs),
+ _compress(compress),
+ _requestId(requestId),
+ _invokeNum(invokeNum),
+ _servantManager(servantManager),
+ _adapter(adapter),
+ _outAsync(outAsync),
+ _stream(stream.instance())
+ {
+ _stream.swap(stream);
+ }
+
+ virtual void
+ run()
+ {
+ _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter,
+ _outAsync, _stream);
+ }
+
+private:
+
+ ConnectionIPtr _connection;
+ ConnectionI::StartCallbackPtr _startCB;
+ vector<OutgoingAsyncMessageCallbackPtr> _sentCBs;
+ Byte _compress;
+ Int _requestId;
+ Int _invokeNum;
+ ServantManagerPtr _servantManager;
+ ObjectAdapterPtr _adapter;
+ OutgoingAsyncPtr _outAsync;
+ BasicStream _stream;
+};
+
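+//
+// Wraps a call to ConnectionI::finish() for execution through a
+// user-supplied dispatcher.
+//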
+class FinishDispatcherCall : public DispatcherCall
+{
+public:
+
+ FinishDispatcherCall(const Ice::ConnectionIPtr& connection) :
+ _connection(connection)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _connection->finish();
+ }
+
+private:
+
+ ConnectionIPtr _connection;
+};
+
+}
+
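+//
+// Connections register themselves with the reaper once they are finished
+// (see finish() below); the reaper's owner periodically swaps out the
+// list to release them.
+//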
+void
+IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection)
+{
+ Lock sync(*this);
+ _connections.push_back(connection);
+}
+
+void
+IceInternal::ConnectionReaper::swapConnections(vector<ConnectionIPtr>& connections)
+{
+ Lock sync(*this);
+ _connections.swap(connections);
+}
+
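+//
+// adopt() gives the outgoing message its own copy of the marshalled data:
+// the caller's stream is swapped into a newly allocated BasicStream, so
+// the message can safely be queued after the caller's stream goes away.
+//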
+void
+Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
+{
+ if(adopted)
+ {
+ if(str)
+ {
+ delete stream;
+ stream = 0;
+ adopted = false;
+ }
+ else
+ {
+ return; // Stream is already adopted.
+ }
+ }
+ else if(!str)
+ {
+ if(out || outAsync)
+ {
+ return; // Adopting request stream is not necessary.
+ }
+ else
+ {
+ str = stream; // Adopt this stream
+ stream = 0;
+ }
+ }
+
+ assert(str);
+ stream = new BasicStream(str->instance());
+ stream->swap(*str);
+ adopted = true;
+}
+
+bool
+Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify)
+{
+ isSent = true; // The message is sent.
+
+ if(adopted)
+ {
+ delete stream;
+ stream = 0;
+ }
+
+ if(out)
+ {
+ out->sent(notify); // true = notify the waiting thread that the request was sent.
+ return false;
+ }
+ else if(outAsync)
+ {
+ return outAsync->__sent(connection);
+ }
+ else
+ {
+ return false;
+ }
+}
+
+void
+Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex)
+{
+ if(out)
+ {
+ out->finished(ex, isSent);
+ }
+ else if(outAsync)
+ {
+ outAsync->__finished(ex, isSent);
+ }
+
+ if(adopted)
+ {
+ delete stream;
+ stream = 0;
+ }
+}
+
+void
+Ice::ConnectionI::start(const StartCallbackPtr& callback)
+{
+ try
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed.
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ if(!initialize() || !validate())
+ {
+ if(callback)
+ {
+ _startCallback = callback;
+ return;
+ }
+
+ //
+ // Wait for the connection to be validated.
+ //
+ while(_state <= StateNotValidated)
+ {
+ wait();
+ }
+
+ if(_state >= StateClosing)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+ }
+
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception(ex);
+ if(callback)
+ {
+ callback->connectionStartFailed(this, *_exception.get());
+ return;
+ }
+ else
+ {
+ waitUntilFinished();
+ throw;
+ }
+ }
+
+ if(callback)
+ {
+ callback->connectionStartCompleted(this);
+ }
+}
+
+void
+Ice::ConnectionI::activate()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state <= StateNotValidated)
+ {
+ return;
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ }
+
+ setState(StateActive);
+}
+
+void
+Ice::ConnectionI::hold()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state <= StateNotValidated)
+ {
+ return;
+ }
+
+ setState(StateHolding);
+}
+
+void
+Ice::ConnectionI::destroy(DestructionReason reason)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ switch(reason)
+ {
+ case ObjectAdapterDeactivated:
+ {
+ setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__));
+ break;
+ }
+
+ case CommunicatorDestroyed:
+ {
+ setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__));
+ break;
+ }
+ }
+}
+
+void
+Ice::ConnectionI::close(bool force)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(force)
+ {
+ setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
+ }
+ else
+ {
+ //
+ // If we do a graceful shutdown, then we wait until all
+ // outstanding requests have been completed. Otherwise, the
+ // CloseConnectionException will cause all outstanding
+ // requests to be retried, regardless of whether the server
+ // has processed them or not.
+ //
+ while(!_requests.empty() || !_asyncRequests.empty())
+ {
+ wait();
+ }
+
+ setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+ }
+}
+
+bool
+Ice::ConnectionI::isActiveOrHolding() const
+{
+ //
+    // We cannot use trylock here; otherwise the outgoing connection
+ // factory might return destroyed (closing or closed) connections,
+ // resulting in connection retry exhaustion.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ return _state > StateNotValidated && _state < StateClosing;
+}
+
+bool
+Ice::ConnectionI::isFinished() const
+{
+ //
+ // We can use trylock here, because as long as there are still
+ // threads operating in this connection object, connection
+    // destruction is considered not yet finished.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
+
+ if(!sync.acquired())
+ {
+ return false;
+ }
+
+ if(_state != StateFinished || _dispatchCount != 0)
+ {
+ return false;
+ }
+
+ assert(_state == StateFinished);
+ return true;
+}
+
+void
+Ice::ConnectionI::throwException() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(_exception.get())
+ {
+ assert(_state >= StateClosing);
+ _exception->ice_throw();
+ }
+}
+
+void
+Ice::ConnectionI::waitUntilHolding() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ while(_state < StateHolding || _dispatchCount > 0)
+ {
+ wait();
+ }
+}
+
+void
+Ice::ConnectionI::waitUntilFinished()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // We wait indefinitely until the connection is finished and all
+ // outstanding requests are completed. Otherwise we couldn't
+ // guarantee that there are no outstanding calls when deactivate()
+ // is called on the servant locators.
+ //
+ while(_state < StateFinished || _dispatchCount > 0)
+ {
+ wait();
+ }
+
+ assert(_state == StateFinished);
+
+ //
+    // Clear the object adapter. See bug 1673 for the details of why this is necessary.
+ //
+ _adapter = 0;
+}
+
+void
+Ice::ConnectionI::monitor(const IceUtil::Time& now)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
+ if(!sync.acquired())
+ {
+ return;
+ }
+
+ if(_state != StateActive)
+ {
+ return;
+ }
+
+ //
+ // Active connection management for idle connections.
+ //
+ if(_acmTimeout <= 0 ||
+ !_requests.empty() || !_asyncRequests.empty() || _dispatchCount > 0 ||
+ static_cast<Int>(_readStream.b.size()) > headerSize || !_writeStream.b.empty() || !_batchStream.b.empty())
+ {
+ return;
+ }
+
+ if(now >= _acmAbsoluteTimeout)
+ {
+ setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
+ }
+}
+
+bool
+Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
+{
+ BasicStream* os = out->os();
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_exception.get())
+ {
+ //
+ // If the connection is closed before we even have a chance
+ // to send our request, we always try to send the request
+ // again.
+ //
+ throw LocalExceptionWrapper(*_exception.get(), true);
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ //
+ // Ensure the message isn't bigger than what we can send with the
+ // transport.
+ //
+ _transceiver->checkSendSize(*os, _instance->messageSizeMax());
+
+ Int requestId = 0;
+ if(response)
+ {
+ //
+ // Create a new unique request ID.
+ //
+ requestId = _nextRequestId++;
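+        // Request ID 0 is reserved for oneway requests, so skip 0 (and
+        // negative values) when the counter wraps around.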
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Fill in the request ID.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#endif
+ }
+
+ //
+    // Send the message. If it can't be sent without blocking, the message is
+    // added to _sendStreams and will be sent by the selector thread.
+ //
+ bool sent = false;
+ try
+ {
+ OutgoingMessage message(out, os, compress, requestId);
+ sent = sendMessage(message) & AsyncStatusSent;
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ if(response)
+ {
+ //
+ // Add to the requests map.
+ //
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+ }
+
+ return sent;
+}
+
+AsyncStatus
+Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response)
+{
+ BasicStream* os = out->__getOs();
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_exception.get())
+ {
+ //
+        // If the connection is closed before we even have a chance
+ // to send our request, we always try to send the request
+ // again.
+ //
+ throw LocalExceptionWrapper(*_exception.get(), true);
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ //
+ // Ensure the message isn't bigger than what we can send with the
+ // transport.
+ //
+ _transceiver->checkSendSize(*os, _instance->messageSizeMax());
+
+ Int requestId = 0;
+ if(response)
+ {
+ //
+ // Create a new unique request ID.
+ //
+ requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Fill in the request ID.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#endif
+ }
+
+ AsyncStatus status;
+ try
+ {
+ OutgoingMessage message(out, os, compress, requestId);
+ status = sendMessage(message);
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ if(response)
+ {
+ //
+ // Add to the async requests map.
+ //
+ _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
+ pair<const Int, OutgoingAsyncPtr>(requestId, out));
+ }
+ return status;
+}
+
+void
+Ice::ConnectionI::prepareBatchRequest(BasicStream* os)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // Wait if flushing is currently in progress.
+ //
+ while(_batchStreamInUse && !_exception.get())
+ {
+ wait();
+ }
+
+ if(_exception.get())
+ {
+ //
+ // If there were no batch requests queued when the connection failed, we can safely
+ // retry with a new connection. Otherwise, we must throw to notify the caller that
+ // some previous batch requests were not sent.
+ //
+ if(_batchStream.b.empty())
+ {
+ throw LocalExceptionWrapper(*_exception.get(), true);
+ }
+ else
+ {
+ _exception->ice_throw();
+ }
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ if(_batchStream.b.empty())
+ {
+ try
+ {
+ _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ ex.ice_throw();
+ }
+ }
+
+ _batchStreamInUse = true;
+ _batchMarker = _batchStream.b.size();
+ _batchStream.swap(*os);
+
+ //
+ // The batch stream now belongs to the caller, until
+ // finishBatchRequest() or abortBatchRequest() is called.
+ //
+}
+
+void
+Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
+{
+ try
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // Get the batch stream back.
+ //
+ _batchStream.swap(*os);
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+
+ bool flush = false;
+ if(_batchAutoFlush)
+ {
+ //
+            // Throw a memory limit exception if the first message added causes us to
+            // go over the limit. Otherwise, put aside the marshalled message that
+            // caused the limit to be exceeded and roll back the stream to the marker.
+ //
+ try
+ {
+ _transceiver->checkSendSize(_batchStream, _instance->messageSizeMax());
+ }
+ catch(const Ice::Exception&)
+ {
+ if(_batchRequestNum > 0)
+ {
+ flush = true;
+ }
+ else
+ {
+ throw;
+ }
+ }
+ }
+
+ if(flush)
+ {
+ //
+ // Temporarily save the last request.
+ //
+ vector<Ice::Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end());
+ _batchStream.b.resize(_batchMarker);
+
+ //
+ // Send the batch stream without the last request.
+ //
+ try
+ {
+ //
+ // Fill in the number of requests in the batch.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#endif
+
+ OutgoingMessage message(&_batchStream, _batchRequestCompress);
+ sendMessage(message);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ //
+ // Reset the batch.
+ //
+ BasicStream dummy(_instance.get(), _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+ _batchMarker = 0;
+
+ //
+        // Check again that the last request by itself doesn't exceed the maximum message size we can send.
+ //
+ if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax())
+ {
+ Ex::throwMemoryLimitException(__FILE__, __LINE__, sizeof(requestBatchHdr) + lastRequest.size(),
+ _instance->messageSizeMax());
+ }
+
+ //
+ // Start a new batch with the last message that caused us to go over the limit.
+ //
+ _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
+ _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
+ }
+
+ //
+ // Increment the number of requests in the batch.
+ //
+ ++_batchRequestNum;
+
+ //
+ // We compress the whole batch if there is at least one compressed
+ // message.
+ //
+ if(compress)
+ {
+ _batchRequestCompress = true;
+ }
+
+ //
+ // Notify about the batch stream not being in use anymore.
+ //
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
+ }
+ catch(const Ice::LocalException&)
+ {
+ abortBatchRequest();
+ throw;
+ }
+}
+
+void
+Ice::ConnectionI::abortBatchRequest()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ BasicStream dummy(_instance.get(), _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+ _batchMarker = 0;
+
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
+}
+
+void
+Ice::ConnectionI::flushBatchRequests()
+{
+ BatchOutgoing out(this, _instance.get());
+ out.invoke();
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_flushBatchRequests()
+{
+ return begin_flushBatchRequestsInternal(__dummyCallback, 0);
+}
+
+static const ::std::string __flushBatchRequests_name = "flushBatchRequests";
+
+AsyncResultPtr
+Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+{
+ return begin_flushBatchRequestsInternal(cb, cookie);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr& cb,
+ const LocalObjectPtr& cookie)
+{
+ return begin_flushBatchRequestsInternal(cb, cookie);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_flushBatchRequestsInternal(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+{
+ ConnectionBatchOutgoingAsyncPtr result =
+ new ConnectionBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb, cookie);
+ try
+ {
+ result->__send();
+ }
+ catch(const LocalException& __ex)
+ {
+ result->__exceptionAsync(__ex);
+ }
+ return result;
+}
+
+void
+Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
+{
+ AsyncResult::__check(r, this, __flushBatchRequests_name);
+ r->__wait();
+}
+
+bool
+Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ while(_batchStreamInUse && !_exception.get())
+ {
+ wait();
+ }
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+
+ if(_batchRequestNum == 0)
+ {
+ out->sent(false);
+ return true;
+ }
+
+ //
+ // Fill in the number of requests in the batch.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#endif
+ _batchStream.swap(*out->os());
+
+ //
+ // Send the batch stream.
+ //
+ bool sent = false;
+ try
+ {
+ OutgoingMessage message(out, out->os(), _batchRequestCompress, 0);
+ sent = sendMessage(message) & AsyncStatusSent;
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ //
+ // Reset the batch stream.
+ //
+ BasicStream dummy(_instance.get(), _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+ _batchMarker = 0;
+ return sent;
+}
+
+AsyncStatus
+Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ while(_batchStreamInUse && !_exception.get())
+ {
+ wait();
+ }
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+
+ if(_batchRequestNum == 0)
+ {
+ AsyncStatus status = AsyncStatusSent;
+ if(outAsync->__sent(this))
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
+ return status;
+ }
+
+ //
+ // Fill in the number of requests in the batch.
+ //
+ const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#endif
+ _batchStream.swap(*outAsync->__getOs());
+
+ //
+ // Send the batch stream.
+ //
+ AsyncStatus status;
+ try
+ {
+ OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0);
+ status = sendMessage(message);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ //
+ // Reset the batch stream.
+ //
+ BasicStream dummy(_instance.get(), _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+ _batchMarker = 0;
+ return status;
+}
+
+void
+Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_state > StateNotValidated);
+
+ try
+ {
+ if(--_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ _reaper->add(this);
+ }
+ notifyAll();
+ }
+
+ if(_state >= StateClosed)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ OutgoingMessage message(os, compressFlag > 0);
+ sendMessage(message);
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+
+ return;
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+}
+
+void
+Ice::ConnectionI::sendNoResponse()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_state > StateNotValidated);
+
+ try
+ {
+ if(--_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ _reaper->add(this);
+ }
+ notifyAll();
+ }
+
+ if(_state >= StateClosed)
+ {
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+}
+
+EndpointIPtr
+Ice::ConnectionI::endpoint() const
+{
+ return _endpoint; // No mutex protection necessary, _endpoint is immutable.
+}
+
+ConnectorPtr
+Ice::ConnectionI::connector() const
+{
+ return _connector; // No mutex protection necessary, _connector is immutable.
+}
+
+void
+Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(_state <= StateNotValidated || _state >= StateClosing)
+ {
+ return;
+ }
+
+ _adapter = adapter;
+
+ if(_adapter)
+ {
+ _servantManager = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getServantManager();
+ if(!_servantManager)
+ {
+ _adapter = 0;
+ }
+ }
+ else
+ {
+ _servantManager = 0;
+ }
+
+ //
+ // We never change the thread pool with which we were initially
+ // registered, even if we add or remove an object adapter.
+ //
+}
+
+ObjectAdapterPtr
+Ice::ConnectionI::getAdapter() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ return _adapter;
+}
+
+EndpointPtr
+Ice::ConnectionI::getEndpoint() const
+{
+ return _endpoint; // No mutex protection necessary, _endpoint is immutable.
+}
+
+ObjectPrx
+Ice::ConnectionI::createProxy(const Identity& ident) const
+{
+ //
+ // Create a reference and return a reverse proxy for this
+ // reference.
+ //
+ ConnectionIPtr self = const_cast<ConnectionI*>(this);
+ return _instance->proxyFactory()->referenceToProxy(_instance->referenceFactory()->create(ident, self));
+}
+
+#ifdef ICE_USE_IOCP
+bool
+Ice::ConnectionI::startAsync(SocketOperation operation)
+{
+ if(_state >= StateClosed)
+ {
+ return false;
+ }
+
+ try
+ {
+ if(operation & SocketOperationWrite)
+ {
+ if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty())
+ {
+ // The whole message is written, assume it's sent now for at-most-once semantics.
+ _sendStreams.front().isSent = true;
+ }
+ }
+ else if(operation & SocketOperationRead)
+ {
+ _transceiver->startRead(_readStream);
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ return false;
+ }
+ return true;
+}
+
+bool
+Ice::ConnectionI::finishAsync(SocketOperation operation)
+{
+ try
+ {
+ if(operation & SocketOperationWrite)
+ {
+ _transceiver->finishWrite(_writeStream);
+ }
+ else if(operation & SocketOperationRead)
+ {
+ _transceiver->finishRead(_readStream);
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+ return _state < StateClosed;
+}
+#endif
+
+void
+Ice::ConnectionI::message(ThreadPoolCurrent& current)
+{
+ StartCallbackPtr startCB;
+ vector<OutgoingAsyncMessageCallbackPtr> sentCBs;
+ Byte compress = 0;
+ Int requestId = 0;
+ Int invokeNum = 0;
+ ServantManagerPtr servantManager;
+ ObjectAdapterPtr adapter;
+ OutgoingAsyncPtr outAsync;
+
+ ThreadPoolMessage<ConnectionI> msg(current, *this);
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ ThreadPoolMessage<ConnectionI>::IOScope io(msg);
+ if(!io)
+ {
+ return;
+ }
+
+ if(_state >= StateClosed)
+ {
+ return;
+ }
+
+ try
+ {
+ unscheduleTimeout(current.operation);
+ if(current.operation & SocketOperationWrite && !_writeStream.b.empty())
+ {
+ if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
+ {
+ assert(!_writeStream.b.empty());
+ scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
+ return;
+ }
+ assert(_writeStream.i == _writeStream.b.end());
+ }
+ if(current.operation & SocketOperationRead && !_readStream.b.empty())
+ {
+ if(_readHeader) // Read header if necessary.
+ {
+ if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
+ {
+ return;
+ }
+ assert(_readStream.i == _readStream.b.end());
+ _readHeader = false;
+
+ ptrdiff_t pos = _readStream.i - _readStream.b.begin();
+ if(pos < headerSize)
+ {
+ //
+ // This situation is possible for small UDP packets.
+ //
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+
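+                //
+                // The header layout: 4 magic bytes, protocol major/minor,
+                // encoding major/minor, message type, compression flag and
+                // an int32 message size (headerSize bytes in total).
+                //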
+ _readStream.i = _readStream.b.begin();
+ const Byte* m;
+ _readStream.readBlob(m, static_cast<Int>(sizeof(magic)));
+ if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
+ {
+ BadMagicException ex(__FILE__, __LINE__);
+ ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
+ throw ex;
+ }
+ Byte pMajor;
+ Byte pMinor;
+ _readStream.read(pMajor);
+ _readStream.read(pMinor);
+ if(pMajor != protocolMajor
+ || static_cast<unsigned char>(pMinor) > static_cast<unsigned char>(protocolMinor))
+ {
+ UnsupportedProtocolException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(pMajor);
+ ex.badMinor = static_cast<unsigned char>(pMinor);
+ ex.major = static_cast<unsigned char>(protocolMajor);
+ ex.minor = static_cast<unsigned char>(protocolMinor);
+ throw ex;
+ }
+ Byte eMajor;
+ Byte eMinor;
+ _readStream.read(eMajor);
+ _readStream.read(eMinor);
+ if(eMajor != encodingMajor
+ || static_cast<unsigned char>(eMinor) > static_cast<unsigned char>(encodingMinor))
+ {
+ UnsupportedEncodingException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(eMajor);
+ ex.badMinor = static_cast<unsigned char>(eMinor);
+ ex.major = static_cast<unsigned char>(encodingMajor);
+ ex.minor = static_cast<unsigned char>(encodingMinor);
+ throw ex;
+ }
+ Byte messageType;
+ _readStream.read(messageType);
+ Byte compress;
+ _readStream.read(compress);
+ Int size;
+ _readStream.read(size);
+ if(size < headerSize)
+ {
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+ if(size > static_cast<Int>(_instance->messageSizeMax()))
+ {
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+ if(size > static_cast<Int>(_readStream.b.size()))
+ {
+ _readStream.b.resize(size);
+ }
+ _readStream.i = _readStream.b.begin() + pos;
+ }
+
+ if(_readStream.i != _readStream.b.end())
+ {
+ if(_endpoint->datagram())
+ {
+ throw DatagramLimitException(__FILE__, __LINE__); // The message was truncated.
+ }
+ else
+ {
+ if(!_transceiver->read(_readStream))
+ {
+ assert(!_readStream.b.empty());
+ scheduleTimeout(SocketOperationRead, _endpoint->timeout());
+ return;
+ }
+ assert(_readStream.i == _readStream.b.end());
+ }
+ }
+ }
+
+ if(_state <= StateNotValidated)
+ {
+ if(_state == StateNotInitialized && !initialize(current.operation))
+ {
+ return;
+ }
+
+ if(_state <= StateNotValidated && !validate(current.operation))
+ {
+ return;
+ }
+
+ _threadPool->unregister(this, current.operation);
+
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
+ swap(_startCallback, startCB);
+ }
+ else
+ {
+ assert(_state <= StateClosing);
+
+ if(current.operation & SocketOperationWrite)
+ {
+ sendNextMessage(sentCBs);
+ }
+
+ if(current.operation & SocketOperationRead)
+ {
+ parseMessage(current.stream, invokeNum, requestId, compress, servantManager, adapter, outAsync);
+ }
+
+ //
+                // We increment the dispatch count to prevent the communicator
+                // from being destroyed during the callback.
+ //
+ if(!sentCBs.empty() || outAsync)
+ {
+ ++_dispatchCount;
+ }
+ }
+ }
+ catch(const DatagramLimitException&) // Expected.
+ {
+ if(_warnUdp)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "maximum datagram size of " << _readStream.i - _readStream.b.begin() << " exceeded";
+ }
+ _readStream.resize(headerSize);
+ _readStream.i = _readStream.b.begin();
+ _readHeader = true;
+ return;
+ }
+ catch(const SocketException& ex)
+ {
+ setState(StateClosed, ex);
+ return;
+ }
+ catch(const LocalException& ex)
+ {
+ if(_endpoint->datagram())
+ {
+ if(_warn)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "datagram connection exception:\n" << ex << '\n' << _desc;
+ }
+ _readStream.resize(headerSize);
+ _readStream.i = _readStream.b.begin();
+ _readHeader = true;
+ }
+ else
+ {
+ setState(StateClosed, ex);
+ }
+ return;
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ }
+
+ io.completed();
+ }
+
+ if(_dispatcher)
+ {
+ try
+ {
+ _dispatcher->dispatch(new DispatchDispatcherCall(this, startCB, sentCBs, compress, requestId, invokeNum,
+ servantManager, adapter, outAsync, current.stream), this);
+ }
+ catch(const std::exception& ex)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\n" << ex << '\n' << _desc;
+ }
+ }
+ catch(...)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc;
+ }
+ }
+ }
+ else
+ {
+ dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, current.stream);
+ }
+}
+
+void
+ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs,
+ Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
+ const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream)
+{
+ //
+ // Notify the factory that the connection establishment and
+ // validation has completed.
+ //
+ if(startCB)
+ {
+ startCB->connectionStartCompleted(this);
+ }
+
+ //
+ // Notify AMI calls that the message was sent.
+ //
+ for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
+ {
+ (*p)->__sent();
+ }
+
+ //
+ // Asynchronous replies must be handled outside the thread
+ // synchronization, so that nested calls are possible.
+ //
+ if(outAsync)
+ {
+ outAsync->__finished(stream);
+ }
+
+ //
+ // Method invocation (or multiple invocations for batch messages)
+ // must be done outside the thread synchronization, so that nested
+ // calls are possible.
+ //
+ if(invokeNum)
+ {
+ invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
+ }
+
+ //
+ // Decrease dispatch count.
+ //
+ if(!sentCBs.empty() || outAsync)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(--_dispatchCount == 0)
+ {
+ if(_state == StateClosing)
+ {
+ try
+ {
+ initiateShutdown();
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+ }
+ else if(_state == StateFinished)
+ {
+ _reaper->add(this);
+ }
+ notifyAll();
+ }
+ }
+}
+
+void
+Ice::ConnectionI::finished(ThreadPoolCurrent& current)
+{
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_state == StateClosed);
+ unscheduleTimeout(static_cast<SocketOperation>(SocketOperationRead | SocketOperationWrite));
+ }
+
+ //
+ // If there are no callbacks to call, we don't call ioCompleted() since we're not going
+ // to call code that will potentially block (this avoids promoting a new leader and
+    // unnecessary thread creation, especially if this is called on shutdown).
+ //
+ if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty())
+ {
+ finish();
+ return;
+ }
+
+ if(!_dispatcher)
+ {
+ current.ioCompleted();
+ finish();
+ }
+ else
+ {
+ try
+ {
+ _dispatcher->dispatch(new FinishDispatcherCall(this), this);
+ }
+ catch(const std::exception& ex)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\n" << ex << '\n' << _desc;
+ }
+ }
+ catch(...)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc;
+ }
+ }
+ }
+}
+
+void
+Ice::ConnectionI::finish()
+{
+ if(_startCallback)
+ {
+ _startCallback->connectionStartFailed(this, *_exception.get());
+ _startCallback = 0;
+ }
+
+ if(!_sendStreams.empty())
+ {
+ assert(!_writeStream.b.empty());
+
+ //
+ // Return the stream to the outgoing call. This is important for
+ // retriable AMI calls which are not marshalled again.
+ //
+ OutgoingMessage* message = &_sendStreams.front();
+ _writeStream.swap(*message->stream);
+
+#ifdef ICE_USE_IOCP
+ //
+ // The current message might be sent but not yet removed from _sendStreams. If
+ // the response has been received in the meantime, we remove the message from
+        // _sendStreams so that we don't call finished on a message that is already done.
+ //
+        if(message->requestId > 0 &&
+           ((message->out && _requests.find(message->requestId) == _requests.end()) ||
+            (message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end())))
+ {
+ if(message->sent(this, true))
+ {
+ assert(message->outAsync);
+ message->outAsync->__sent();
+ }
+ _sendStreams.pop_front();
+ }
+#endif
+
+ for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
+ {
+ o->finished(*_exception.get());
+ if(o->requestId) // Make sure finished isn't called twice.
+ {
+ if(o->out)
+ {
+ _requests.erase(o->requestId);
+ }
+ else
+ {
+ _asyncRequests.erase(o->requestId);
+ }
+ }
+ }
+ _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
+ }
+
+ for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ p->second->finished(*_exception.get(), true);
+ }
+ _requests.clear();
+
+ for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
+ {
+ q->second->__finished(*_exception.get(), true);
+ }
+ _asyncRequests.clear();
+
+ //
+ // This must be done last as this will cause waitUntilFinished() to return (and communicator
+ // objects such as the timer might be destroyed too).
+ //
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateFinished);
+ if(_dispatchCount == 0)
+ {
+ _reaper->add(this);
+ }
+ }
+}
+
+string
+Ice::ConnectionI::toString() const
+{
+ return _desc; // No mutex lock, _desc is immutable.
+}
+
+NativeInfoPtr
+Ice::ConnectionI::getNativeInfo()
+{
+ return _transceiver->getNativeInfo();
+}
+
+void
+Ice::ConnectionI::timedOut()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state <= StateNotValidated)
+ {
+ setState(StateClosed, ConnectTimeoutException(__FILE__, __LINE__));
+ }
+ else if(_state < StateClosing)
+ {
+ setState(StateClosed, TimeoutException(__FILE__, __LINE__));
+ }
+ else if(_state == StateClosing)
+ {
+ setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
+ }
+}
+
+string
+Ice::ConnectionI::type() const
+{
+ return _type; // No mutex lock, _type is immutable.
+}
+
+Ice::Int
+Ice::ConnectionI::timeout() const
+{
+ return _endpoint->timeout(); // No mutex lock, _endpoint is immutable.
+}
+
+ConnectionInfoPtr
+Ice::ConnectionI::getInfo() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state >= StateClosed)
+ {
+ _exception->ice_throw();
+ }
+
+ ConnectionInfoPtr info = _transceiver->getInfo();
+ info->incoming = _connector == 0;
+ info->adapterName = _adapter ? _adapter->getName() : string();
+ return info;
+}
+
+void
+Ice::ConnectionI::exception(const LocalException& ex)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+}
+
+void
+Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum)
+{
+ //
+ // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
+    // called in the case of a fatal exception, we decrement _dispatchCount here.
+ //
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ setState(StateClosed, ex);
+
+ if(invokeNum > 0)
+ {
+ assert(_dispatchCount > 0);
+ _dispatchCount -= invokeNum;
+ assert(_dispatchCount >= 0);
+ if(_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ _reaper->add(this);
+ }
+ notifyAll();
+ }
+ }
+}
+
+Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
+ const ConnectionReaperPtr& reaper,
+ const TransceiverPtr& transceiver,
+ const ConnectorPtr& connector,
+ const EndpointIPtr& endpoint,
+ const ObjectAdapterPtr& adapter) :
+ _transceiver(transceiver),
+ _instance(instance),
+ _reaper(reaper),
+ _desc(transceiver->toString()),
+ _type(transceiver->type()),
+ _connector(connector),
+ _endpoint(endpoint),
+ _adapter(adapter),
+ _dispatcher(_instance->initializationData().dispatcher), // Cached for better performance.
+ _logger(_instance->initializationData().logger), // Cached for better performance.
+ _traceLevels(_instance->traceLevels()), // Cached for better performance.
+ _timer(_instance->timer()), // Cached for better performance.
+ _writeTimeout(new TimeoutCallback(this)),
+ _writeTimeoutScheduled(false),
+ _readTimeout(new TimeoutCallback(this)),
+ _readTimeoutScheduled(false),
+ _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0),
+ _warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0),
+ _acmTimeout(0),
+ _compressionLevel(1),
+ _nextRequestId(1),
+ _requestsHint(_requests.end()),
+ _asyncRequestsHint(_asyncRequests.end()),
+ _batchAutoFlush(
+ _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0),
+ _batchStream(_instance.get(), _batchAutoFlush),
+ _batchStreamInUse(false),
+ _batchRequestNum(0),
+ _batchRequestCompress(false),
+ _batchMarker(0),
+ _readStream(_instance.get()),
+ _readHeader(false),
+ _writeStream(_instance.get()),
+ _dispatchCount(0),
+ _state(StateNotInitialized)
+{
+
+ int& compressionLevel = const_cast<int&>(_compressionLevel);
+ compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault(
+ "Ice.Compression.Level", 1);
+ if(compressionLevel < 1)
+ {
+ compressionLevel = 1;
+ }
+ else if(compressionLevel > 9)
+ {
+ compressionLevel = 9;
+ }
+
+ ObjectAdapterI* adapterImpl = _adapter ? dynamic_cast<ObjectAdapterI*>(_adapter.get()) : 0;
+ if(adapterImpl)
+ {
+ _servantManager = adapterImpl->getServantManager();
+ }
+
+ Int& acmTimeout = const_cast<Int&>(_acmTimeout);
+ if(_endpoint->datagram())
+ {
+ acmTimeout = 0;
+ }
+ else
+ {
+ if(adapterImpl)
+ {
+ acmTimeout = adapterImpl->getACM();
+ }
+ else
+ {
+ acmTimeout = _instance->clientACM();
+ }
+ }
+
+ __setNoDelete(true);
+ try
+ {
+ if(adapterImpl)
+ {
+ const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool();
+ }
+ else
+ {
+ const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
+ }
+ _threadPool->initialize(this);
+ }
+ catch(const IceUtil::Exception&)
+ {
+ __setNoDelete(false);
+ throw;
+ }
+ __setNoDelete(false);
+}
+
+Ice::ConnectionI::~ConnectionI()
+{
+ assert(!_startCallback);
+ assert(_state == StateFinished);
+ assert(_dispatchCount == 0);
+ assert(_sendStreams.empty());
+ assert(_requests.empty());
+ assert(_asyncRequests.empty());
+}
+
+void
+Ice::ConnectionI::setState(State state, const LocalException& ex)
+{
+ //
+ // If setState() is called with an exception, then only closed and
+ // closing states are permissible.
+ //
+ assert(state >= StateClosing);
+
+ if(_state == state) // Don't switch twice.
+ {
+ return;
+ }
+
+ if(!_exception.get())
+ {
+ //
+ // If we are in closed state, an exception must be set.
+ //
+ assert(_state != StateClosed);
+
+ _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+
+ if(_warn)
+ {
+ //
+ // We don't warn if we are not validated.
+ //
+ if(_state > StateNotValidated)
+ {
+ //
+ // Don't warn about certain expected exceptions.
+ //
+ if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
+ dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
+ dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
+ (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
+ {
+ Warning out(_logger);
+ out << "connection exception:\n" << *_exception.get() << '\n' << _desc;
+ }
+ }
+ }
+ }
+
+ //
+ // We must set the new state before we notify requests of any
+ // exceptions. Otherwise new requests may retry on a connection
+ // that is not yet marked as closed or closing.
+ //
+ setState(state);
+}
+
+void
+Ice::ConnectionI::setState(State state)
+{
+ //
+ // We don't want to send close connection messages if the endpoint
+ // only supports oneway transmission from client to server.
+ //
+ if(_endpoint->datagram() && state == StateClosing)
+ {
+ state = StateClosed;
+ }
+
+ //
+ // Skip graceful shutdown if we are destroyed before validation.
+ //
+ if(_state <= StateNotValidated && state == StateClosing)
+ {
+ state = StateClosed;
+ }
+
+ if(_state == state) // Don't switch twice.
+ {
+ return;
+ }
+
+ try
+ {
+ switch(state)
+ {
+ case StateNotInitialized:
+ {
+ assert(false);
+ break;
+ }
+
+ case StateNotValidated:
+ {
+ if(_state != StateNotInitialized)
+ {
+ assert(_state == StateClosed);
+ return;
+ }
+ break;
+ }
+
+ case StateActive:
+ {
+ //
+ // Can only switch from holding or not validated to
+ // active.
+ //
+ if(_state != StateHolding && _state != StateNotValidated)
+ {
+ return;
+ }
+ _threadPool->_register(this, SocketOperationRead);
+ break;
+ }
+
+ case StateHolding:
+ {
+ //
+ // Can only switch from active or not validated to
+ // holding.
+ //
+ if(_state != StateActive && _state != StateNotValidated)
+ {
+ return;
+ }
+ if(_state == StateActive)
+ {
+ _threadPool->unregister(this, SocketOperationRead);
+ }
+ break;
+ }
+
+ case StateClosing:
+ {
+ //
+ // Can't change back from closed.
+ //
+ if(_state >= StateClosed)
+ {
+ return;
+ }
+ if(_state == StateHolding)
+ {
+ _threadPool->_register(this, SocketOperationRead); // We need to continue to read in closing state.
+ }
+ break;
+ }
+
+ case StateClosed:
+ {
+ if(_state == StateFinished)
+ {
+ return;
+ }
+ _threadPool->finish(this);
+#ifdef ICE_USE_IOCP
+ _transceiver->close();
+#endif
+ break;
+ }
+
+ case StateFinished:
+ {
+ assert(_state == StateClosed);
+#ifndef ICE_USE_IOCP
+ _transceiver->close();
+#endif
+ break;
+ }
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Error out(_logger);
+ out << "unexpected connection exception:\n" << ex << '\n' << _desc;
+ }
+
+ //
+ // We only register with the connection monitor if our new state
+ // is StateActive. Otherwise we unregister with the connection
+ // monitor, but only if we were registered before, i.e., if our
+ // old state was StateActive.
+ //
+ if(_acmTimeout > 0)
+ {
+ if(state == StateActive)
+ {
+ _instance->connectionMonitor()->add(this);
+ }
+ else if(_state == StateActive)
+ {
+ _instance->connectionMonitor()->remove(this);
+ }
+ }
+
+ _state = state;
+
+ notifyAll();
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ try
+ {
+ initiateShutdown();
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+ }
+}
+
+void
+Ice::ConnectionI::initiateShutdown()
+{
+ assert(_state == StateClosing);
+ assert(_dispatchCount == 0);
+
+ if(!_endpoint->datagram())
+ {
+ //
+ // Before we shut down, we send a close connection message.
+ //
+ BasicStream os(_instance.get());
+ os.write(magic[0]);
+ os.write(magic[1]);
+ os.write(magic[2]);
+ os.write(magic[3]);
+ os.write(protocolMajor);
+ os.write(protocolMinor);
+ os.write(encodingMajor);
+ os.write(encodingMinor);
+ os.write(closeConnectionMsg);
+ os.write((Byte)1); // compression status: compression supported but not used.
+ os.write(headerSize); // Message size.
+
+ OutgoingMessage message(&os, false);
+ if(sendMessage(message) & AsyncStatusSent)
+ {
+ //
+ // Schedule the close timeout to wait for the peer to close the connection. If
+ // the message was queued for sending, sendNextMessage will schedule the timeout
+            // once all messages have been sent.
+ //
+ scheduleTimeout(SocketOperationWrite, closeTimeout());
+ }
+
+ //
+ // The CloseConnection message should be sufficient. Closing the write
+ // end of the socket is probably an artifact of how things were done
+ // in IIOP. In fact, shutting down the write end of the socket causes
+ // problems on Windows by preventing the peer from using the socket.
+ // For example, the peer is no longer able to continue writing a large
+        // message after the socket is shut down.
+ //
+ //_transceiver->shutdownWrite();
+ }
+}
+
+bool
+Ice::ConnectionI::initialize(SocketOperation operation)
+{
+ SocketOperation s = _transceiver->initialize();
+ if(s != SocketOperationNone)
+ {
+ scheduleTimeout(s, connectTimeout());
+ _threadPool->update(this, operation, s);
+ return false;
+ }
+
+ //
+ // Update the connection description once the transceiver is initialized.
+ //
+ const_cast<string&>(_desc) = _transceiver->toString();
+ setState(StateNotValidated);
+ return true;
+}
+
+bool
+Ice::ConnectionI::validate(SocketOperation operation)
+{
+ if(!_endpoint->datagram()) // Datagram connections are always implicitly validated.
+ {
+ if(_adapter) // The server side has the active role for connection validation.
+ {
+ if(_writeStream.b.empty())
+ {
+ _writeStream.write(magic[0]);
+ _writeStream.write(magic[1]);
+ _writeStream.write(magic[2]);
+ _writeStream.write(magic[3]);
+ _writeStream.write(protocolMajor);
+ _writeStream.write(protocolMinor);
+ _writeStream.write(encodingMajor);
+ _writeStream.write(encodingMinor);
+ _writeStream.write(validateConnectionMsg);
+ _writeStream.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
+ _writeStream.write(headerSize); // Message size.
+ _writeStream.i = _writeStream.b.begin();
+ traceSend(_writeStream, _logger, _traceLevels);
+ }
+
+ if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
+ {
+ scheduleTimeout(SocketOperationWrite, connectTimeout());
+ _threadPool->update(this, operation, SocketOperationWrite);
+ return false;
+ }
+ }
+ else // The client side has the passive role for connection validation.
+ {
+ if(_readStream.b.empty())
+ {
+ _readStream.b.resize(headerSize);
+ _readStream.i = _readStream.b.begin();
+ }
+
+ if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
+ {
+ scheduleTimeout(SocketOperationRead, connectTimeout());
+ _threadPool->update(this, operation, SocketOperationRead);
+ return false;
+ }
+
+ assert(_readStream.i == _readStream.b.end());
+ _readStream.i = _readStream.b.begin();
+ Byte m[4];
+ _readStream.read(m[0]);
+ _readStream.read(m[1]);
+ _readStream.read(m[2]);
+ _readStream.read(m[3]);
+ if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
+ {
+ BadMagicException ex(__FILE__, __LINE__);
+ ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
+ throw ex;
+ }
+ Byte pMajor;
+ Byte pMinor;
+ _readStream.read(pMajor);
+ _readStream.read(pMinor);
+ if(pMajor != protocolMajor)
+ {
+ UnsupportedProtocolException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(pMajor);
+ ex.badMinor = static_cast<unsigned char>(pMinor);
+ ex.major = static_cast<unsigned char>(protocolMajor);
+ ex.minor = static_cast<unsigned char>(protocolMinor);
+ throw ex;
+ }
+ Byte eMajor;
+ Byte eMinor;
+ _readStream.read(eMajor);
+ _readStream.read(eMinor);
+ if(eMajor != encodingMajor)
+ {
+ UnsupportedEncodingException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(eMajor);
+ ex.badMinor = static_cast<unsigned char>(eMinor);
+ ex.major = static_cast<unsigned char>(encodingMajor);
+ ex.minor = static_cast<unsigned char>(encodingMinor);
+ throw ex;
+ }
+ Byte messageType;
+ _readStream.read(messageType);
+ if(messageType != validateConnectionMsg)
+ {
+ throw ConnectionNotValidatedException(__FILE__, __LINE__);
+ }
+ Byte compress;
+ _readStream.read(compress); // Ignore compression status for validate connection.
+ Int size;
+ _readStream.read(size);
+ if(size != headerSize)
+ {
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+ traceRecv(_readStream, _logger, _traceLevels);
+ }
+ }
+
+ _writeStream.resize(0);
+ _writeStream.i = _writeStream.b.begin();
+
+ _readStream.resize(headerSize);
+ _readStream.i = _readStream.b.begin();
+ _readHeader = true;
+
+ return true;
+}
+
+void
+Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callbacks)
+{
+ assert(!_sendStreams.empty());
+ assert(!_writeStream.b.empty() && _writeStream.i == _writeStream.b.end());
+ try
+ {
+ while(true)
+ {
+ //
+ // Notify the message that it was sent.
+ //
+ OutgoingMessage* message = &_sendStreams.front();
+ _writeStream.swap(*message->stream);
+ if(message->sent(this, true))
+ {
+ assert(message->outAsync);
+ callbacks.push_back(message->outAsync);
+ }
+ _sendStreams.pop_front();
+
+ //
+ // If there's nothing left to send, we're done.
+ //
+ if(_sendStreams.empty())
+ {
+ break;
+ }
+
+ //
+ // Otherwise, prepare the next message stream for writing.
+ //
+ message = &_sendStreams.front();
+ assert(!message->stream->i);
+ if(message->compress && message->stream->b.size() >= 100) // Only compress messages > 100 bytes.
+ {
+ //
+ // Message compressed. Request compressed response, if any.
+ //
+ message->stream->b[9] = 2;
+
+ //
+ // Do compression.
+ //
+ BasicStream stream(_instance.get());
+ doCompress(*message->stream, stream);
+
+ if(message->outAsync)
+ {
+ trace("sending asynchronous request", *message->stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceSend(*message->stream, _logger, _traceLevels);
+ }
+
+ message->adopt(&stream); // Adopt the compressed stream.
+ message->stream->i = message->stream->b.begin();
+ }
+ else
+ {
+ if(message->compress)
+ {
+ //
+ // Message not compressed. Request compressed response, if any.
+ //
+ message->stream->b[9] = 1;
+ }
+
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(message->stream->b.size());
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), message->stream->b.begin() + 10);
+#else
+ copy(p, p + sizeof(Int), message->stream->b.begin() + 10);
+#endif
+ message->stream->i = message->stream->b.begin();
+ if(message->outAsync)
+ {
+ trace("sending asynchronous request", *message->stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceSend(*message->stream, _logger, _traceLevels);
+ }
+ }
+ _writeStream.swap(*message->stream);
+
+ //
+ // Send the message.
+ //
+ assert(_writeStream.i);
+ if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
+ {
+ assert(!_writeStream.b.empty());
+ scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
+ return;
+ }
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ return;
+ }
+
+ assert(_writeStream.b.empty());
+ _threadPool->unregister(this, SocketOperationWrite);
+
+ //
+ // If all the messages were sent and we are in the closing state, we schedule
+ // the close timeout to wait for the peer to close the connection.
+ //
+ if(_state == StateClosing)
+ {
+ scheduleTimeout(SocketOperationWrite, closeTimeout());
+ }
+}
+
+AsyncStatus
+Ice::ConnectionI::sendMessage(OutgoingMessage& message)
+{
+ assert(_state < StateClosed);
+
+    message.stream->i = 0; // Reset the message stream iterator before starting to send the message.
+
+ if(!_sendStreams.empty())
+ {
+ _sendStreams.push_back(message);
+ _sendStreams.back().adopt(0);
+ return AsyncStatusQueued;
+ }
+
+ //
+ // Attempt to send the message without blocking. If the send blocks, we register
+ // the connection with the selector thread.
+ //
+
+ message.stream->i = message.stream->b.begin();
+
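+    //
+    // Byte 9 of the header holds the compression flag; bytes 10-13 hold
+    // the message size.
+    //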
+ if(message.compress && message.stream->b.size() >= 100) // Only compress messages larger than 100 bytes.
+ {
+ //
+ // Message compressed. Request compressed response, if any.
+ //
+ message.stream->b[9] = 2;
+
+ //
+ // Do compression.
+ //
+ BasicStream stream(_instance.get());
+ doCompress(*message.stream, stream);
+ stream.i = stream.b.begin();
+
+ if(message.outAsync)
+ {
+ trace("sending asynchronous request", *message.stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceSend(*message.stream, _logger, _traceLevels);
+ }
+
+ //
+ // Send the message without blocking.
+ //
+ if(_transceiver->write(stream))
+ {
+ AsyncStatus status = AsyncStatusSent;
+ if(message.sent(this, false))
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout =
+ IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ }
+ return status;
+ }
+
+ _sendStreams.push_back(message);
+ _sendStreams.back().adopt(&stream);
+ }
+ else
+ {
+ if(message.compress)
+ {
+ //
+ // Message not compressed. Request compressed response, if any.
+ //
+ message.stream->b[9] = 1;
+ }
+
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(message.stream->b.size());
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), message.stream->b.begin() + 10);
+#else
+ copy(p, p + sizeof(Int), message.stream->b.begin() + 10);
+#endif
+ message.stream->i = message.stream->b.begin();
+
+ if(message.outAsync)
+ {
+ trace("sending asynchronous request", *message.stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceSend(*message.stream, _logger, _traceLevels);
+ }
+
+ //
+ // Send the message without blocking.
+ //
+ if(_transceiver->write(*message.stream))
+ {
+ AsyncStatus status = AsyncStatusSent;
+ if(message.sent(this, false))
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout =
+ IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
+ }
+ return status;
+ }
+
+ _sendStreams.push_back(message);
+ _sendStreams.back().adopt(0); // Adopt the stream.
+ }
+
+ _writeStream.swap(*_sendStreams.back().stream);
+ scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
+ _threadPool->_register(this, SocketOperationWrite);
+ return AsyncStatusQueued;
+}
+
+static string
+getBZ2Error(int bzError)
+{
+    switch(bzError)
+    {
+    case BZ_RUN_OK:
+        return ": BZ_RUN_OK";
+    case BZ_FLUSH_OK:
+        return ": BZ_FLUSH_OK";
+    case BZ_FINISH_OK:
+        return ": BZ_FINISH_OK";
+    case BZ_STREAM_END:
+        return ": BZ_STREAM_END";
+    case BZ_CONFIG_ERROR:
+        return ": BZ_CONFIG_ERROR";
+    case BZ_SEQUENCE_ERROR:
+        return ": BZ_SEQUENCE_ERROR";
+    case BZ_PARAM_ERROR:
+        return ": BZ_PARAM_ERROR";
+    case BZ_MEM_ERROR:
+        return ": BZ_MEM_ERROR";
+    case BZ_DATA_ERROR:
+        return ": BZ_DATA_ERROR";
+    case BZ_DATA_ERROR_MAGIC:
+        return ": BZ_DATA_ERROR_MAGIC";
+    case BZ_IO_ERROR:
+        return ": BZ_IO_ERROR";
+    case BZ_UNEXPECTED_EOF:
+        return ": BZ_UNEXPECTED_EOF";
+    case BZ_OUTBUFF_FULL:
+        return ": BZ_OUTBUFF_FULL";
+    default:
+        return "";
+    }
+}
+
+void
+Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed)
+{
+ const Byte* p;
+
+ //
+ // Compress the message body, but not the header.
+ //
+ unsigned int uncompressedLen = static_cast<unsigned int>(uncompressed.b.size() - headerSize);
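+ //
+ // bzlib guarantees the compressed output fits in a buffer that is 1%
+ // larger than the input plus 600 bytes (see the bzip2 documentation
+ // for BZ2_bzBuffToBuffCompress).
+ //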
+ unsigned int compressedLen = static_cast<unsigned int>(uncompressedLen * 1.01 + 600);
+ compressed.b.resize(headerSize + sizeof(Int) + compressedLen);
+ int bzError = BZ2_bzBuffToBuffCompress(reinterpret_cast<char*>(&compressed.b[0]) + headerSize + sizeof(Int),
+ &compressedLen,
+ reinterpret_cast<char*>(&uncompressed.b[0]) + headerSize,
+ uncompressedLen,
+ _compressionLevel, 0, 0);
+ if(bzError != BZ_OK)
+ {
+ CompressionException ex(__FILE__, __LINE__);
+ ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError);
+ throw ex;
+ }
+ compressed.b.resize(headerSize + sizeof(Int) + compressedLen);
+
+ //
+ // Write the size of the compressed stream into the header of the
+ // uncompressed stream. Since the header will be copied, this size
+ // will also be in the header of the compressed stream.
+ //
+ Int compressedSize = static_cast<Int>(compressed.b.size());
+ p = reinterpret_cast<const Byte*>(&compressedSize);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), uncompressed.b.begin() + 10);
+#else
+ copy(p, p + sizeof(Int), uncompressed.b.begin() + 10);
+#endif
+
+ //
+ // Add the size of the uncompressed stream before the message body
+ // of the compressed stream.
+ //
+ Int uncompressedSize = static_cast<Int>(uncompressed.b.size());
+ p = reinterpret_cast<const Byte*>(&uncompressedSize);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), compressed.b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), compressed.b.begin() + headerSize);
+#endif
+
+ //
+ // Copy the header from the uncompressed stream to the compressed one.
+ //
+ copy(uncompressed.b.begin(), uncompressed.b.begin() + headerSize, compressed.b.begin());
+}
+
+void
+Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompressed)
+{
+ Int uncompressedSize;
+ compressed.i = compressed.b.begin() + headerSize;
+ compressed.read(uncompressedSize);
+ if(uncompressedSize <= headerSize)
+ {
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+
+ uncompressed.resize(uncompressedSize);
+ unsigned int uncompressedLen = uncompressedSize - headerSize;
+ unsigned int compressedLen = static_cast<unsigned int>(compressed.b.size() - headerSize - sizeof(Int));
+ int bzError = BZ2_bzBuffToBuffDecompress(reinterpret_cast<char*>(&uncompressed.b[0]) + headerSize,
+ &uncompressedLen,
+ reinterpret_cast<char*>(&compressed.b[0]) + headerSize + sizeof(Int),
+ compressedLen,
+ 0, 0);
+ if(bzError != BZ_OK)
+ {
+ CompressionException ex(__FILE__, __LINE__);
+ ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError);
+ throw ex;
+ }
+
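+ //
+ // Copy the header from the compressed stream; the header itself is
+ // never compressed.
+ //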
+ copy(compressed.b.begin(), compressed.b.begin() + headerSize, uncompressed.b.begin());
+}
+
+void
+Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
+ ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
+ OutgoingAsyncPtr& outAsync)
+{
+ assert(_state > StateNotValidated && _state < StateClosed);
+
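+ //
+ // Move the message data out of _readStream into the caller's stream
+ // and reset _readStream so the next message header can be read.
+ //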
+ _readStream.swap(stream);
+ _readStream.resize(headerSize);
+ _readStream.i = _readStream.b.begin();
+ _readHeader = true;
+
+ assert(stream.i == stream.b.end());
+
+ try
+ {
+ //
+ // We don't need to check magic and version here. This has
+ // already been done by the ThreadPool, which provides us
+ // with the stream.
+ //
+ stream.i = stream.b.begin() + 8;
+ Byte messageType;
+ stream.read(messageType);
+ stream.read(compress);
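+ //
+ // A compression status of 2 means the message body is compressed and
+ // must be uncompressed before it can be parsed.
+ //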
+ if(compress == 2)
+ {
+ BasicStream ustream(_instance.get());
+ doUncompress(stream, ustream);
+ stream.b.swap(ustream.b);
+ }
+ stream.i = stream.b.begin() + headerSize;
+
+ switch(messageType)
+ {
+ case closeConnectionMsg:
+ {
+ traceRecv(stream, _logger, _traceLevels);
+ if(_endpoint->datagram())
+ {
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "ignoring close connection message for datagram connection:\n" << _desc;
+ }
+ }
+ else
+ {
+ setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
+ }
+ break;
+ }
+
+ case requestMsg:
+ {
+ if(_state == StateClosing)
+ {
+ trace("received request during closing\n(ignored by server, client will retry)", stream, _logger,
+ _traceLevels);
+ }
+ else
+ {
+ traceRecv(stream, _logger, _traceLevels);
+ stream.read(requestId);
+ invokeNum = 1;
+ servantManager = _servantManager;
+ adapter = _adapter;
+ ++_dispatchCount;
+ }
+ break;
+ }
+
+ case requestBatchMsg:
+ {
+ if(_state == StateClosing)
+ {
+ trace("received batch request during closing\n(ignored by server, client will retry)", stream,
+ _logger, _traceLevels);
+ }
+ else
+ {
+ traceRecv(stream, _logger, _traceLevels);
+ stream.read(invokeNum);
+ if(invokeNum < 0)
+ {
+ invokeNum = 0;
+ throw UnmarshalOutOfBoundsException(__FILE__, __LINE__);
+ }
+ servantManager = _servantManager;
+ adapter = _adapter;
+ _dispatchCount += invokeNum;
+ }
+ break;
+ }
+
+ case replyMsg:
+ {
+ traceRecv(stream, _logger, _traceLevels);
+
+ stream.read(requestId);
+
+ map<Int, Outgoing*>::iterator p = _requests.end();
+ map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.end();
+
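+ //
+ // Replies typically arrive in the order the requests were sent, so the
+ // hint iterators are checked first; this usually avoids a full map
+ // lookup.
+ //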
+ if(_requestsHint != _requests.end())
+ {
+ if(_requestsHint->first == requestId)
+ {
+ p = _requestsHint;
+ }
+ }
+
+ if(p == _requests.end())
+ {
+ if(_asyncRequestsHint != _asyncRequests.end())
+ {
+ if(_asyncRequestsHint->first == requestId)
+ {
+ q = _asyncRequestsHint;
+ }
+ }
+ }
+
+ if(p == _requests.end() && q == _asyncRequests.end())
+ {
+ p = _requests.find(requestId);
+ }
+
+ if(p == _requests.end() && q == _asyncRequests.end())
+ {
+ q = _asyncRequests.find(requestId);
+ }
+
+ if(p == _requests.end() && q == _asyncRequests.end())
+ {
+ throw UnknownRequestIdException(__FILE__, __LINE__);
+ }
+
+ if(p != _requests.end())
+ {
+ p->second->finished(stream);
+
+ if(p == _requestsHint)
+ {
+ _requests.erase(p++);
+ _requestsHint = p;
+ }
+ else
+ {
+ _requests.erase(p);
+ }
+ }
+ else
+ {
+ assert(q != _asyncRequests.end());
+
+ outAsync = q->second;
+
+ if(q == _asyncRequestsHint)
+ {
+ _asyncRequests.erase(q++);
+ _asyncRequestsHint = q;
+ }
+ else
+ {
+ _asyncRequests.erase(q);
+ }
+ }
+ notifyAll(); // Notify threads blocked in close(false)
+ break;
+ }
+
+ case validateConnectionMsg:
+ {
+ traceRecv(stream, _logger, _traceLevels);
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "ignoring unexpected validate connection message:\n" << _desc;
+ }
+ break;
+ }
+
+ default:
+ {
+ trace("received unknown message\n(invalid, closing connection)", stream, _logger, _traceLevels);
+ throw UnknownMessageException(__FILE__, __LINE__);
+ break;
+ }
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ if(_endpoint->datagram())
+ {
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "datagram connection exception:\n" << ex << '\n' << _desc;
+ }
+ }
+ else
+ {
+ setState(StateClosed, ex);
+ }
+ }
+}
+
+void
+Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, Byte compress,
+ const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter)
+{
+ //
+ // Note: In contrast to other private or protected methods, this
+ // operation must be called *without* the mutex locked.
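+ // Dispatching can call back into the connection, for example to send
+ // the reply, and those paths lock the connection mutex themselves.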
+ //
+
+ try
+ {
+ while(invokeNum > 0)
+ {
+ //
+ // Prepare the invocation.
+ //
+ bool response = !_endpoint->datagram() && requestId != 0;
+ Incoming in(_instance.get(), this, adapter, response, compress, requestId);
+ BasicStream* is = in.is();
+ stream.swap(*is);
+ BasicStream* os = in.os();
+
+ //
+ // Prepare the response if necessary.
+ //
+ if(response)
+ {
+ assert(invokeNum == 1); // No further invocations if a response is expected.
+ os->writeBlob(replyHdr, sizeof(replyHdr));
+
+ //
+ // Add the request ID.
+ //
+ os->write(requestId);
+ }
+
+ in.invoke(servantManager);
+
+ //
+ // If there are more invocations, we need the stream back.
+ //
+ if(--invokeNum > 0)
+ {
+ stream.swap(*is);
+ }
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ invokeException(ex, invokeNum); // Fatal invocation exception
+ }
+}
+
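+//
+// An Ice.Override.ConnectTimeout setting takes precedence over the
+// endpoint's configured timeout.
+//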
+int
+Ice::ConnectionI::connectTimeout()
+{
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideConnectTimeout)
+ {
+ return defaultsAndOverrides->overrideConnectTimeoutValue;
+ }
+ else
+ {
+ return _endpoint->timeout();
+ }
+}
+
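+//
+// An Ice.Override.CloseTimeout setting takes precedence over the
+// endpoint's configured timeout.
+//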
+int
+Ice::ConnectionI::closeTimeout()
+{
+ DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideCloseTimeout)
+ {
+ return defaultsAndOverrides->overrideCloseTimeoutValue;
+ }
+ else
+ {
+ return _endpoint->timeout();
+ }
+}