summaryrefslogtreecommitdiff
path: root/cppe/src/IceE/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cppe/src/IceE/Connection.cpp')
-rwxr-xr-xcppe/src/IceE/Connection.cpp1711
1 files changed, 1711 insertions, 0 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp
new file mode 100755
index 00000000000..8984e15da04
--- /dev/null
+++ b/cppe/src/IceE/Connection.cpp
@@ -0,0 +1,1711 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2005 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICEE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
#include <IceE/Connection.h>
#include <IceE/Instance.h>
#include <IceE/LoggerUtil.h>
#include <IceE/Properties.h>
#include <IceE/TraceUtil.h>
#include <IceE/DefaultsAndOverrides.h>
#include <IceE/Transceiver.h>
#include <IceE/Endpoint.h>
#include <IceE/Outgoing.h>
#include <IceE/Incoming.h>
#include <IceE/LocalException.h>
#include <IceE/Protocol.h>
#include <IceE/ReferenceFactory.h> // For createProxy().
#include <IceE/ProxyFactory.h> // For createProxy().
#ifndef ICEE_PURE_CLIENT
#  include <IceE/ObjectAdapter.h>
#endif

#include <functional> // For std::less in operator<().
+
+using namespace std;
+using namespace IceE;
+using namespace IceEInternal;
+
+void IceEInternal::incRef(Connection* p) { p->__incRef(); }
+void IceEInternal::decRef(Connection* p) { p->__decRef(); }
+
+bool
+IceE::operator==(const Connection& l, const Connection& r)
+{
+ return &l == &r;
+}
+
+bool
+IceE::operator!=(const Connection& l, const Connection& r)
+{
+ return &l != &r;
+}
+
+bool
+IceE::operator<(const Connection& l, const Connection& r)
+{
+ return &l < &r;
+}
+
void
IceE::Connection::waitForValidation()
{
    //
    // Block the calling thread until the connection handshake (see
    // validate()) has completed, then rethrow any failure that was
    // recorded during validation.
    //
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);

    while(_state == StateNotValidated)
    {
        wait();
    }

    if(_state >= StateClosing)
    {
        assert(_exception.get());
        _exception->ice_throw();
    }
}
+
#ifndef ICEE_PURE_CLIENT
//
// Server side only: move the connection to the active state. The
// legal state transitions are enforced by setState().
//
void
IceE::Connection::activate()
{
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);
    setState(StateActive);
}

//
// Server side only: put the connection on hold; see setState() for
// the transition rules.
//
void
IceE::Connection::hold()
{
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);
    setState(StateHolding);
}
#endif
+
void
IceE::Connection::destroy(DestructionReason reason)
{
    //
    // Initiate graceful connection closure, recording the destruction
    // reason as the connection's exception so that pending and future
    // invocations fail with it.
    //
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);

    switch(reason)
    {
#ifndef ICEE_PURE_CLIENT
        case ObjectAdapterDeactivated:
        {
            setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__));
            break;
        }
#endif

        case CommunicatorDestroyed:
        {
            setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__));
            break;
        }
    }
}
+
+void
+IceE::Connection::close(bool force)
+{
+ IceE::Monitor<IceE::Mutex>::Lock sync(*this);
+
+ if(force)
+ {
+ setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
+ }
+ else
+ {
+ setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+ }
+}
+
bool
IceE::Connection::isDestroyed() const
{
    //
    // We can not use trylock here, otherwise the outgoing connection
    // factory might return destroyed (closing or closed) connections,
    // resulting in connection retry exhaustion.
    //
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);

    // Closing and closed both count as destroyed.
    return _state >= StateClosing;
}
+
+bool
+IceE::Connection::isFinished() const
+{
+ IceE::ThreadPtr threadPerConnection;
+
+ {
+ //
+ // We can use trylock here, because as long as there are still
+ // threads operating in this connection object, connection
+ // destruction is considered as not yet finished.
+ //
+ IceE::Monitor<IceE::Mutex>::TryLock sync(*this);
+
+ if(!sync.acquired())
+ {
+ return false;
+ }
+
+ if(_transceiver != 0 || _dispatchCount != 0 ||
+ (_threadPerConnection && _threadPerConnection->getThreadControl().isAlive()))
+ {
+ return false;
+ }
+
+ assert(_state == StateClosed);
+
+ threadPerConnection = _threadPerConnection;
+ _threadPerConnection = 0;
+ }
+
+ threadPerConnection->getThreadControl().join();
+
+ return true;
+}
+
#ifndef ICEE_PURE_CLIENT
void
IceE::Connection::waitUntilHolding() const
{
    //
    // Block until the connection has reached at least the holding
    // state and no dispatches are in progress.
    //
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);

    while(_state < StateHolding || _dispatchCount > 0)
    {
        wait();
    }
}
#endif
+
void
IceE::Connection::waitUntilFinished()
{
    //
    // Block until the connection is fully finished: closing has been
    // initiated, all dispatches have completed, the transceiver has
    // been closed by the connection thread, and the per-connection
    // thread has been joined.
    //
    IceE::ThreadPtr threadPerConnection;

    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);

        //
        // We wait indefinitely until connection closing has been
        // initiated. We also wait indefinitely until all outstanding
        // requests are completed. Otherwise we couldn't guarantee
        // that there are no outstanding calls when deactivate() is
        // called on the servant locators.
        //
        while(_state < StateClosing || _dispatchCount > 0)
        {
            wait();
        }

        //
        // Now we must wait until close() has been called on the
        // transceiver.
        //
        while(_transceiver)
        {
            if(_state != StateClosed && _endpoint->timeout() >= 0)
            {
                //
                // A timeout is configured: only wait out the remainder
                // of the timeout period, measured from the last state
                // change (_stateTime).
                //
                IceE::Time timeout = IceE::Time::milliSeconds(_endpoint->timeout());
                IceE::Time waitTime = _stateTime + timeout - IceE::Time::now();

                if(waitTime > IceE::Time())
                {
                    //
                    // We must wait a bit longer until we close this
                    // connection.
                    //
                    if(!timedWait(waitTime))
                    {
                        setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
                    }
                }
                else
                {
                    //
                    // We already waited long enough, so let's close this
                    // connection!
                    //
                    setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
                }

                //
                // No return here, we must still wait until close() is
                // called on the _transceiver.
                //
            }
            else
            {
                wait();
            }
        }

        assert(_state == StateClosed);

        //
        // Clear the thread member under the lock; the join happens
        // below, outside the lock, so the thread can finish cleanup.
        //
        threadPerConnection = _threadPerConnection;
        _threadPerConnection = 0;
    }

    if(threadPerConnection)
    {
        threadPerConnection->getThreadControl().join();
    }
}
+
//
// TODO: Should not be a member function of Connection.
//
void
IceE::Connection::prepareRequest(BasicStream* os)
{
    //
    // Write the pre-built request header (magic, version, message
    // type, plus the zeroed request-ID slot) into the output stream;
    // sendRequest() patches in the actual request ID and message size.
    //
    os->writeBlob(_requestHdr);
}
+
//
// Send a marshaled request over the connection. For a twoway request
// (out != 0) a fresh request ID is allocated, patched into the header,
// and the Outgoing is registered in _requests so the reply can be
// matched up later; oneways (out == 0) are sent without registration.
//
void
IceE::Connection::sendRequest(BasicStream* os, Outgoing* out)
{
    Int requestId;

    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);

        if(_exception.get())
        {
            _exception->ice_throw();
        }

        assert(_state > StateNotValidated);
        assert(_state < StateClosing);

        //
        // Only add to the request map if this is a twoway call.
        //
        if(out)
        {
            //
            // Create a new unique request ID.
            //
            requestId = _nextRequestId++;
            if(requestId <= 0)
            {
                // The counter wrapped around; restart at 1 (0 is
                // reserved for oneway requests).
                _nextRequestId = 1;
                requestId = _nextRequestId++;
            }

            //
            // Fill in the request ID.
            //
            const Byte* p = reinterpret_cast<const Byte*>(&requestId);
#ifdef ICEE_BIG_ENDIAN
            reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#else
            copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#endif

            //
            // Add to the requests map.
            //
            _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
        }
    }

    try
    {
        // Writes are serialized by _sendMutex, separate from the
        // connection monitor, so sends don't block state changes.
        IceE::Mutex::Lock sendSync(_sendMutex);

        if(!_transceiver) // Has the transceiver already been closed?
        {
            assert(_exception.get());
            _exception->ice_throw(); // The exception is immutable at this point.
        }

        // Patch the total message size at offset 10 of the header.
        Int sz = static_cast<Int>(os->b.size());
        const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICEE_BIG_ENDIAN
        reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
#else
        copy(p, p + sizeof(Int), os->b.begin() + 10);
#endif

        //
        // Send the request.
        //
        os->i = os->b.begin();
        traceRequest("sending request", *os, _logger, _traceLevels);
        _transceiver->write(*os, _endpoint->timeout());
    }
    catch(const LocalException& ex)
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);
        setState(StateClosed, ex);
        assert(_exception.get());

        if(out)
        {
            //
            // If the request has already been removed from the
            // request map, we are out of luck. It would mean that
            // finished() has been called already, and therefore the
            // exception has been set using the Outgoing::finished()
            // callback. In this case, we cannot throw the exception
            // here, because we must not both raise an exception and
            // have Outgoing::finished() called with an
            // exception. This means that in some rare cases, a
            // request will not be retried even though it could. But I
            // honestly don't know how I could avoid this, without a
            // very elaborate and complex design, which would be bad
            // for performance.
            //
            map<Int, Outgoing*>::iterator p = _requests.find(requestId);
            if(p != _requests.end())
            {
                if(p == _requestsHint)
                {
                    _requests.erase(p++);
                    _requestsHint = p;
                }
                else
                {
                    _requests.erase(p);
                }

                _exception->ice_throw();
            }
        }
        else
        {
            _exception->ice_throw();
        }
    }
}
+
#ifndef ICEE_NO_BATCH
//
// Hand the batch stream to the caller so it can marshal one more
// request into it. The stream is returned via finishBatchRequest()
// or abandoned via abortBatchRequest().
//
void
IceE::Connection::prepareBatchRequest(BasicStream* os)
{
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);

    //
    // Wait if flushing is currently in progress.
    //
    while(_batchStreamInUse && !_exception.get())
    {
        wait();
    }

    if(_exception.get())
    {
        _exception->ice_throw();
    }

    assert(_state > StateNotValidated);
    assert(_state < StateClosing);

    if(_batchStream.b.empty())
    {
        try
        {
            // First request of a new batch: start with the batch header.
            _batchStream.writeBlob(_requestBatchHdr);
        }
        catch(const LocalException& ex)
        {
            setState(StateClosed, ex);
            ex.ice_throw();
        }
    }

    _batchStreamInUse = true;
    _batchStream.swap(*os);

    //
    // The batch stream now belongs to the caller, until
    // finishBatchRequest() or abortBatchRequest() is called.
    //
}
+
+void
+IceE::Connection::finishBatchRequest(BasicStream* os)
+{
+ IceE::Monitor<IceE::Mutex>::Lock sync(*this);
+
+ //
+ // Get the batch stream back and increment the number of requests
+ // in the batch.
+ //
+ _batchStream.swap(*os);
+ ++_batchRequestNum;
+
+ //
+ // Notify about the batch stream not being in use anymore.
+ //
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
+}
+
//
// Counterpart to prepareBatchRequest() when marshaling fails: discard
// the entire batch, since its contents may be corrupted.
//
void
IceE::Connection::abortBatchRequest()
{
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);

    //
    // Destroy and reset the batch stream and batch count. We cannot
    // save old requests in the batch stream, as they might be
    // corrupted due to incomplete marshaling.
    //
    BasicStream dummy(_instance.get());
    _batchStream.swap(dummy);
    _batchRequestNum = 0;

    //
    // Notify about the batch stream not being in use
    // anymore.
    //
    assert(_batchStreamInUse);
    _batchStreamInUse = false;
    notifyAll();
}
+
//
// Send all batched requests accumulated in _batchStream as a single
// batch-request message, then reset the batch state.
//
void
IceE::Connection::flushBatchRequests()
{
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);

        while(_batchStreamInUse && !_exception.get())
        {
            wait();
        }

        if(_exception.get())
        {
            _exception->ice_throw();
        }

        if(_batchStream.b.empty())
        {
            return; // Nothing to do.
        }

        assert(_state > StateNotValidated);
        assert(_state < StateClosing);

        _batchStream.i = _batchStream.b.begin();

        //
        // Prevent that new batch requests are added while we are
        // flushing.
        //
        _batchStreamInUse = true;
    }

    try
    {
        // Writes are serialized by _sendMutex, separate from the
        // connection monitor.
        IceE::Mutex::Lock sendSync(_sendMutex);

        if(!_transceiver) // Has the transceiver already been closed?
        {
            assert(_exception.get());
            _exception->ice_throw(); // The exception is immutable at this point.
        }

        //
        // Fill in the number of requests in the batch.
        //
        const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
#ifdef ICEE_BIG_ENDIAN
        reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#else
        copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif

        // Patch the total message size at offset 10 of the header.
        Int sz = static_cast<Int>(_batchStream.b.size());
        p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICEE_BIG_ENDIAN
        reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
#else
        copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
#endif

        //
        // Send the batch request.
        //
        _batchStream.i = _batchStream.b.begin();
        traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
        _transceiver->write(_batchStream, _endpoint->timeout());
    }
    catch(const LocalException& ex)
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);
        setState(StateClosed, ex);
        assert(_exception.get());

        //
        // Since batch requests are all oneways, we
        // must report the exception to the caller.
        //
        _exception->ice_throw();
    }

    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);

        //
        // Reset the batch stream, and notify that flushing is over.
        //
        BasicStream dummy(_instance.get());
        _batchStream.swap(dummy);
        _batchRequestNum = 0;
        _batchStreamInUse = false;
        notifyAll();
    }
}
#endif
+
#ifndef ICEE_PURE_CLIENT
//
// Server side: send a marshaled reply back to the client, then
// account for the completed dispatch (and initiate shutdown if we
// are closing and this was the last dispatch).
//
void
IceE::Connection::sendResponse(BasicStream* os)
{
    try
    {
        IceE::Mutex::Lock sendSync(_sendMutex);

        if(!_transceiver) // Has the transceiver already been closed?
        {
            assert(_exception.get());
            _exception->ice_throw(); // The exception is immutable at this point.
        }

        // Patch the total message size at offset 10 of the header.
        Int sz = static_cast<Int>(os->b.size());
        const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICEE_BIG_ENDIAN
        reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
#else
        copy(p, p + sizeof(Int), os->b.begin() + 10);
#endif

        //
        // Send the reply.
        //
        os->i = os->b.begin();
        traceReply("sending reply", *os, _logger, _traceLevels);
        _transceiver->write(*os, _endpoint->timeout());
    }
    catch(const LocalException& ex)
    {
        // Note: no rethrow here — the dispatch bookkeeping below must
        // still run even if the send failed.
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);
        setState(StateClosed, ex);
    }

    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);

        assert(_state > StateNotValidated);

        try
        {
            if(--_dispatchCount == 0)
            {
                notifyAll();
            }

            if(_state == StateClosing && _dispatchCount == 0)
            {
                initiateShutdown();
            }
        }
        catch(const LocalException& ex)
        {
            setState(StateClosed, ex);
        }
    }
}
+
//
// Server side: account for a completed dispatch that produces no
// reply (oneway/batch); initiates shutdown if we are closing and
// this was the last dispatch.
//
void
IceE::Connection::sendNoResponse()
{
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);

    assert(_state > StateNotValidated);

    try
    {
        if(--_dispatchCount == 0)
        {
            notifyAll();
        }

        if(_state == StateClosing && _dispatchCount == 0)
        {
            initiateShutdown();
        }
    }
    catch(const LocalException& ex)
    {
        setState(StateClosed, ex);
    }
}
#endif
+
//
// Accessor for the endpoint this connection was established to/from.
//
EndpointPtr
IceE::Connection::endpoint() const
{
    return _endpoint; // No mutex protection necessary, _endpoint is immutable.
}
+
//
// Create a proxy that routes invocations for the given identity back
// over this (reverse) connection.
//
ObjectPrx
IceE::Connection::createProxy(const Identity& ident) const
{
    //
    // Create a reference and return a reverse proxy for this
    // reference.
    //
    vector<ConnectionPtr> connections;
    connections.push_back(const_cast<Connection*>(this));
    ReferencePtr ref = _instance->referenceFactory()->create(ident, Context(), "", Reference::ModeTwoway, connections);
    return _instance->proxyFactory()->referenceToProxy(ref);
}
+
//
// Close the connection with the given exception as the reason.
//
void
IceE::Connection::exception(const LocalException& ex)
{
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);
    setState(StateClosed, ex);
}
+
//
// Transport type of the underlying transceiver (cached at construction).
//
string
IceE::Connection::type() const
{
    return _type; // No mutex lock, _type is immutable.
}
+
//
// Connection timeout in milliseconds, as configured on the endpoint.
//
IceE::Int
IceE::Connection::timeout() const
{
    return _endpoint->timeout(); // No mutex lock, _endpoint is immutable.
}
+
//
// Human-readable description of the connection (cached at construction).
//
string
IceE::Connection::toString() const
{
    return _desc; // No mutex lock, _desc is immutable.
}
+
//
// Construct a connection over an already-open transceiver. Pre-builds
// the protocol message headers, then starts the per-connection thread,
// which performs validation (see run()/validate()).
//
#ifndef ICEE_PURE_CLIENT
IceE::Connection::Connection(const InstancePtr& instance,
                             const TransceiverPtr& transceiver,
                             const EndpointPtr& endpoint,
                             const ObjectAdapterPtr& adapter) :
#else
IceE::Connection::Connection(const InstancePtr& instance,
                             const TransceiverPtr& transceiver,
                             const EndpointPtr& endpoint) :
#endif

    _instance(instance),
    _transceiver(transceiver),
    _desc(transceiver->toString()),
    _type(transceiver->type()),
    _endpoint(endpoint),
#ifndef ICEE_PURE_CLIENT
    _adapter(adapter),
#endif
    _logger(_instance->logger()), // Cached for better performance.
    _traceLevels(_instance->traceLevels()), // Cached for better performance.
    _warn(_instance->properties()->getPropertyAsInt("IceE.Warn.Connections") > 0),
    _requestHdr(headerSize + sizeof(Int), 0), // Header plus request-ID slot, zeroed.
#ifndef ICEE_NO_BATCH
    _requestBatchHdr(headerSize + sizeof(Int), 0), // Header plus request-count slot, zeroed.
    _batchStream(_instance.get()),
    _batchStreamInUse(false),
    _batchRequestNum(0),
#endif
#ifndef ICEE_PURE_CLIENT
    _replyHdr(headerSize, 0),
#endif
    _nextRequestId(1),
    _requestsHint(_requests.end()),
    _dispatchCount(0),
    _state(StateNotValidated),
    _stateTime(IceE::Time::now())
{
    //
    // Pre-fill the first 10 header bytes: magic (4), protocol version
    // (2), encoding version (2), message type (1), compression flag (1).
    // The size field (offset 10) is patched at send time.
    //
    vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr);
    requestHdr[0] = magic[0];
    requestHdr[1] = magic[1];
    requestHdr[2] = magic[2];
    requestHdr[3] = magic[3];
    requestHdr[4] = protocolMajor;
    requestHdr[5] = protocolMinor;
    requestHdr[6] = encodingMajor;
    requestHdr[7] = encodingMinor;
    requestHdr[8] = requestMsg;
    requestHdr[9] = 0;

#ifndef ICEE_NO_BATCH
    vector<Byte>& requestBatchHdr = const_cast<vector<Byte>&>(_requestBatchHdr);
    requestBatchHdr[0] = magic[0];
    requestBatchHdr[1] = magic[1];
    requestBatchHdr[2] = magic[2];
    requestBatchHdr[3] = magic[3];
    requestBatchHdr[4] = protocolMajor;
    requestBatchHdr[5] = protocolMinor;
    requestBatchHdr[6] = encodingMajor;
    requestBatchHdr[7] = encodingMinor;
    requestBatchHdr[8] = requestBatchMsg;
    requestBatchHdr[9] = 0;
#endif

#ifndef ICEE_PURE_CLIENT
    vector<Byte>& replyHdr = const_cast<vector<Byte>&>(_replyHdr);
    replyHdr[0] = magic[0];
    replyHdr[1] = magic[1];
    replyHdr[2] = magic[2];
    replyHdr[3] = magic[3];
    replyHdr[4] = protocolMajor;
    replyHdr[5] = protocolMinor;
    replyHdr[6] = encodingMajor;
    replyHdr[7] = encodingMinor;
    replyHdr[8] = replyMsg;
    replyHdr[9] = 0;

    if(_adapter)
    {
        _servantManager = _adapter->getServantManager();
    }
#endif

    __setNoDelete(true);
    try
    {
        //
        // If we are in thread per connection mode, create the thread
        // for this connection.
        //
        _threadPerConnection = new ThreadPerConnection(this);
        _threadPerConnection->start(_instance->threadPerConnectionStackSize());
    }
    catch(const IceE::Exception& ex)
    {
        {
            Error out(_logger);
            out << "cannot create thread for connection:\n" << ex.toString();
        }

        try
        {
            _transceiver->close();
        }
        catch(const LocalException&)
        {
            // Here we ignore any exceptions in close().
        }

        __setNoDelete(false);
        ex.ice_throw();
    }
    __setNoDelete(false);
}
+
IceE::Connection::~Connection()
{
    IceE::Monitor<IceE::Mutex>::Lock sync(*this);

    //
    // By the time the last reference is dropped, the connection must
    // have been fully shut down (see waitUntilFinished()/isFinished()).
    //
    assert(_state == StateClosed);
    assert(!_transceiver);
    assert(_dispatchCount == 0);
    assert(!_threadPerConnection);
}
+
//
// Perform the connection-validation handshake. The server side sends
// a validate-connection message; the client side reads and checks it
// (magic, protocol/encoding version, message type, size). On success
// the connection leaves StateNotValidated; on failure the connection
// is closed and the exception rethrown.
//
void
IceE::Connection::validate()
{
#ifndef ICEE_PURE_CLIENT
    bool active;

    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);

        assert(_state == StateNotValidated);

        if(_adapter)
        {
            active = true; // The server side has the active role for connection validation.
        }
        else
        {
            active = false; // The client side has the passive role for connection validation.
        }
    }
#endif

    try
    {
        // Use the connect-timeout override if configured, otherwise
        // the endpoint's timeout.
        Int timeout;
        if(_instance->defaultsAndOverrides()->overrideConnectTimeout)
        {
            timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue;
        }
        else
        {
            timeout = _endpoint->timeout();
        }

#ifndef ICEE_PURE_CLIENT
        if(active)
        {
            BasicStream os(_instance.get());
            os.writeBlob(magic, sizeof(magic));
            os.write(protocolMajor);
            os.write(protocolMinor);
            os.write(encodingMajor);
            os.write(encodingMinor);
            os.write(validateConnectionMsg);
            os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
            os.write(headerSize); // Message size.
            os.i = os.b.begin();
            traceHeader("sending validate connection", os, _logger, _traceLevels);
            try
            {
                _transceiver->write(os, timeout);
            }
            catch(const TimeoutException&)
            {
                throw ConnectTimeoutException(__FILE__, __LINE__);
            }
        }
        else
#endif
        {
            BasicStream is(_instance.get());
            is.b.resize(headerSize);
            is.i = is.b.begin();
            try
            {
                _transceiver->read(is, timeout);
            }
            catch(const TimeoutException&)
            {
                throw ConnectTimeoutException(__FILE__, __LINE__);
            }
            assert(is.i == is.b.end());
            is.i = is.b.begin();
            ByteSeq m(sizeof(magic), 0);
            is.readBlob(m, static_cast<Int>(sizeof(magic)));
            if(!equal(m.begin(), m.end(), magic))
            {
                BadMagicException ex(__FILE__, __LINE__);
                ex.badMagic = m;
                throw ex;
            }
            Byte pMajor;
            Byte pMinor;
            is.read(pMajor);
            is.read(pMinor);
            if(pMajor != protocolMajor)
            {
                UnsupportedProtocolException ex(__FILE__, __LINE__);
                ex.badMajor = static_cast<unsigned char>(pMajor);
                ex.badMinor = static_cast<unsigned char>(pMinor);
                ex.major = static_cast<unsigned char>(protocolMajor);
                ex.minor = static_cast<unsigned char>(protocolMinor);
                throw ex;
            }
            Byte eMajor;
            Byte eMinor;
            is.read(eMajor);
            is.read(eMinor);
            if(eMajor != encodingMajor)
            {
                UnsupportedEncodingException ex(__FILE__, __LINE__);
                ex.badMajor = static_cast<unsigned char>(eMajor);
                ex.badMinor = static_cast<unsigned char>(eMinor);
                ex.major = static_cast<unsigned char>(encodingMajor);
                ex.minor = static_cast<unsigned char>(encodingMinor);
                throw ex;
            }
            Byte messageType;
            is.read(messageType);
            if(messageType != validateConnectionMsg)
            {
                throw ConnectionNotValidatedException(__FILE__, __LINE__);
            }
            Byte compress;
            is.read(compress); // Ignore compression status for validate connection.
            Int size;
            is.read(size);
            if(size != headerSize)
            {
                throw IllegalMessageSizeException(__FILE__, __LINE__);
            }
            traceHeader("received validate connection", is, _logger, _traceLevels);
        }
    }
    catch(const LocalException& ex)
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);
        setState(StateClosed, ex);
        assert(_exception.get());
        _exception->ice_throw();
    }

#ifndef ICEE_PURE_CLIENT
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);

        //
        // We start out in holding state.
        //
        setState(StateHolding);
    }
#else
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);

        //
        // We start out in active state.
        //
        setState(StateActive);
    }
#endif
}
+
//
// Transition to a closing/closed state, recording the given exception
// as the connection's exception (first exception wins) and optionally
// warning about unexpected failures.
//
void
IceE::Connection::setState(State state, const LocalException& ex)
{
    //
    // If setState() is called with an exception, then only closed and
    // closing states are permissible.
    //
    assert(state == StateClosing || state == StateClosed);

    if(_state == state) // Don't switch twice.
    {
        return;
    }

    if(!_exception.get())
    {
        //
        // If we are in closed state, an exception must be set.
        //
        assert(_state != StateClosed);

        _exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));

        if(_warn)
        {
            //
            // We don't warn if we are not validated.
            //
            if(_state > StateNotValidated)
            {
                //
                // Don't warn about certain expected exceptions.
                //
                if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
                     dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
                     dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
                     dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
#ifndef ICEE_PURE_CLIENT
                     dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
#endif
                     (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
                {
                    Warning out(_logger);
                    out << "connection exception:\n" << (*_exception.get()).toString() << "\n" << _desc;
                }
            }
        }
    }

    //
    // We must set the new state before we notify requests of any
    // exceptions. Otherwise new requests may retry on a connection
    // that is not yet marked as closed or closing.
    //
    setState(state);
}
+
//
// Core state machine: apply a state transition, rejecting illegal
// ones, then wake all waiters and initiate shutdown when appropriate.
// Must be called with the connection monitor locked.
//
void
IceE::Connection::setState(State state)
{
    if(_state == state) // Don't switch twice.
    {
        return;
    }

    switch(state)
    {
        case StateNotValidated:
        {
            // Connections are created in this state; nothing may
            // transition back into it.
            assert(false);
            break;
        }

        case StateActive:
        {
            //
            // Can only switch from holding or not validated to
            // active.
            //
#ifndef ICEE_PURE_CLIENT
            if(_state != StateHolding && _state != StateNotValidated)
            {
                return;
            }
#else
            if(_state != StateNotValidated)
            {
                return;
            }
#endif
            break;
        }

#ifndef ICEE_PURE_CLIENT
        case StateHolding:
        {
            //
            // Can only switch from active or not validated to
            // holding.
            //
            if(_state != StateActive && _state != StateNotValidated)
            {
                return;
            }
            break;
        }
#endif

        case StateClosing:
        {
            //
            // Can't change back from closed.
            //
            if(_state == StateClosed)
            {
                return;
            }
            break;
        }

        case StateClosed:
        {
            //
            // If we are in thread per connection mode, we
            // shutdown both for reading and writing. This will
            // unblock and read call with an exception. The thread
            // per connection then closes the transceiver.
            //
            _transceiver->shutdownReadWrite();
            break;
        }
    }

    _state = state;
    _stateTime = IceE::Time::now();

    notifyAll();

    if(_state == StateClosing && _dispatchCount == 0)
    {
        try
        {
            initiateShutdown();
        }
        catch(const LocalException& ex)
        {
            setState(StateClosed, ex);
        }
    }
}
+
//
// Send a close-connection message to the peer as the first step of
// graceful shutdown. Requires StateClosing and no active dispatches.
//
void
IceE::Connection::initiateShutdown() const
{
    assert(_state == StateClosing);
    assert(_dispatchCount == 0);

    IceE::Mutex::Lock sendSync(_sendMutex);

    //
    // Before we shut down, we send a close connection message.
    //
    BasicStream os(_instance.get());
    os.writeBlob(magic, sizeof(magic));
    os.write(protocolMajor);
    os.write(protocolMinor);
    os.write(encodingMajor);
    os.write(encodingMinor);
    os.write(closeConnectionMsg);
    os.write((Byte)0); // Compression status: compression not supported.
    os.write(headerSize); // Message size.

    //
    // Send the message.
    //
    os.i = os.b.begin();
    traceHeader("sending close connection", os, _logger, _traceLevels);
    _transceiver->write(os, _endpoint->timeout());

    //
    // The CloseConnection message should be sufficient. Closing the
    // write end of the socket is probably an artifact of how things
    // were done in IIOP. In fact, shutting down the write end of the
    // socket causes problems on Windows by preventing the peer from
    // using the socket. For example, the peer is no longer able to
    // continue writing a large message after the socket is shutdown.
    //
    //_transceiver->shutdownWrite();
}
+
+void
+IceE::Connection::parseMessage(BasicStream& stream, Int& requestId
+#ifndef ICEE_PURE_CLIENT
+ ,Int& invokeNum, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter
+#endif
+)
+{
+ assert(_state > StateNotValidated && _state < StateClosed);
+
+ try
+ {
+ //
+ // We don't need to check magic and version here. This has
+ // already been done by the ThreadPerConnection,
+ // which provides us with the stream.
+ //
+ assert(stream.i == stream.b.end());
+ stream.i = stream.b.begin() + 8;
+ Byte messageType;
+ stream.read(messageType);
+
+ Byte compress;
+ stream.read(compress);
+ if(compress == 2)
+ {
+ throw CompressionNotSupportedException(__FILE__, __LINE__);
+ }
+ stream.i = stream.b.begin() + headerSize;
+
+ switch(messageType)
+ {
+ case closeConnectionMsg:
+ {
+ traceHeader("received close connection", stream, _logger, _traceLevels);
+ setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
+ break;
+ }
+
+#ifndef ICEE_PURE_CLIENT
+ case requestMsg:
+ {
+ if(_state == StateClosing)
+ {
+ traceRequest("received request during closing\n"
+ "(ignored by server, client will retry)",
+ stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceRequest("received request", stream, _logger, _traceLevels);
+ stream.read(requestId);
+ invokeNum = 1;
+ servantManager = _servantManager;
+ adapter = _adapter;
+ ++_dispatchCount;
+ }
+ break;
+ }
+
+ case requestBatchMsg:
+ {
+ if(_state == StateClosing)
+ {
+ traceBatchRequest("received batch request during closing\n"
+ "(ignored by server, client will retry)",
+ stream, _logger, _traceLevels);
+ }
+ else
+ {
+ traceBatchRequest("received batch request", stream, _logger, _traceLevels);
+ stream.read(invokeNum);
+ if(invokeNum < 0)
+ {
+ invokeNum = 0;
+ throw NegativeSizeException(__FILE__, __LINE__);
+ }
+ servantManager = _servantManager;
+ adapter = _adapter;
+ _dispatchCount += invokeNum;
+ }
+ break;
+ }
+#endif
+
+ case replyMsg:
+ {
+ traceReply("received reply", stream, _logger, _traceLevels);
+
+ stream.read(requestId);
+
+ map<Int, Outgoing*>::iterator p = _requests.end();
+
+ if(_requestsHint != _requests.end())
+ {
+ if(_requestsHint->first == requestId)
+ {
+ p = _requestsHint;
+ }
+ }
+
+ if(p == _requests.end())
+ {
+ p = _requests.find(requestId);
+ }
+
+ if(p == _requests.end())
+ {
+ throw UnknownRequestIdException(__FILE__, __LINE__);
+ }
+
+ if(p != _requests.end())
+ {
+ p->second->finished(stream);
+
+ if(p == _requestsHint)
+ {
+ _requests.erase(p++);
+ _requestsHint = p;
+ }
+ else
+ {
+ _requests.erase(p);
+ }
+ }
+
+ break;
+ }
+
+ case validateConnectionMsg:
+ {
+ traceHeader("received validate connection", stream, _logger, _traceLevels);
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "ignoring unexpected validate connection message:\n" << _desc;
+ }
+ break;
+ }
+
+ default:
+ {
+ traceHeader("received unknown message\n"
+ "(invalid, closing connection)",
+ stream, _logger, _traceLevels);
+ throw UnknownMessageException(__FILE__, __LINE__);
+ break;
+ }
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+}
+
#ifndef ICEE_PURE_CLIENT
//
// Dispatch invokeNum requests read from the stream (one for a plain
// request, several for a batch). On any exception the connection is
// closed and the remaining dispatch count is rolled back.
//
void
IceE::Connection::invokeAll(BasicStream& stream, Int invokeNum, Int requestId,
                            const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter)
{
    //
    // Note: In contrast to other private or protected methods, this
    // operation must be called *without* the mutex locked.
    //

    try
    {
        while(invokeNum > 0)
        {
            //
            // Prepare the invocation.
            //
            bool response = requestId != 0; // requestId 0 means oneway/batch: no reply.
            Incoming in(_instance.get(), this, adapter, response);
            BasicStream* is = in.is();
            stream.swap(*is);
            BasicStream* os = in.os();

            //
            // Prepare the response if necessary.
            //
            if(response)
            {
                assert(invokeNum == 1); // No further invocations if a response is expected.
                os->writeBlob(_replyHdr);

                //
                // Add the request ID.
                //
                os->write(requestId);
            }

            in.invoke(servantManager);

            //
            // If there are more invocations, we need the stream back.
            //
            if(--invokeNum > 0)
            {
                stream.swap(*is);
            }
        }
    }
    catch(const LocalException& ex)
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);
        setState(StateClosed, ex);
    }
    catch(const std::exception& ex)
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);
        UnknownException uex(__FILE__, __LINE__);
        uex.unknown = string("std::exception: ") + ex.what();
        setState(StateClosed, uex);
    }
    catch(...)
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);
        UnknownException uex(__FILE__, __LINE__);
        uex.unknown = "unknown c++ exception";
        setState(StateClosed, uex);
    }

    //
    // If invoke() above raised an exception, and therefore neither
    // sendResponse() nor sendNoResponse() has been called, then we
    // must decrement _dispatchCount here.
    //
    if(invokeNum > 0)
    {
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);
        assert(_dispatchCount > 0);
        _dispatchCount -= invokeNum;
        assert(_dispatchCount >= 0);
        if(_dispatchCount == 0)
        {
            notifyAll();
        }
    }
}
#endif
+
//
// Main loop for the thread-per-connection model: validates the
// connection, activates it, then repeatedly reads a message from the
// transceiver, parses it under the connection monitor, and dispatches
// invocations outside the lock. Returns when the connection reaches
// StateClosed and the transceiver has been closed.
//
void
IceE::Connection::run()
{
    //
    // The thread-per-connection must validate and activate this connection,
    // and not in the connection factory. Please see the comments in the connection
    // factory for details.
    //
    try
    {
        validate();
    }
    catch(const LocalException&)
    {
        // Validation failed; validate() is expected to have moved the
        // connection to StateClosed already (asserted below). Close the
        // transceiver and wake up any waiters before the thread exits.
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);

        assert(_state == StateClosed);

        //
        // We must make sure that nobody is sending when we close
        // the transceiver.
        //
        IceE::Mutex::Lock sendSync(_sendMutex);

        try
        {
            _transceiver->close();
        }
        catch(const LocalException&)
        {
            // Here we ignore any exceptions in close().
        }

        _transceiver = 0;
        notifyAll();
        return;
    }

    {
        // Validation succeeded: make the connection usable.
        IceE::Monitor<IceE::Mutex>::Lock sync(*this);
        setState(StateActive);
    }

    bool closed = false;

    while(!closed)
    {
        //
        // We must accept new connections outside the thread
        // synchronization, because we use blocking accept.
        //

        // One fresh stream per iteration; it receives the full wire
        // message (header + body) before any parsing happens.
        BasicStream stream(_instance.get());

        try
        {
            //
            // Read and sanity-check the fixed-size message header:
            // magic bytes, protocol version, encoding version, message
            // type, compression flag, and total message size.
            //
            stream.b.resize(headerSize);
            stream.i = stream.b.begin();
            _transceiver->read(stream, -1);

            ptrdiff_t pos = stream.i - stream.b.begin();
            assert(pos >= headerSize);
            stream.i = stream.b.begin();
            ByteSeq m(sizeof(magic), 0);
            stream.readBlob(m, static_cast<Int>(sizeof(magic)));
            if(!equal(m.begin(), m.end(), magic))
            {
                BadMagicException ex(__FILE__, __LINE__);
                ex.badMagic = m;
                throw ex;
            }
            Byte pMajor;
            Byte pMinor;
            stream.read(pMajor);
            stream.read(pMinor);
            // Only the major protocol version must match; the minor
            // version is not checked here.
            if(pMajor != protocolMajor)
            {
                UnsupportedProtocolException ex(__FILE__, __LINE__);
                ex.badMajor = static_cast<unsigned char>(pMajor);
                ex.badMinor = static_cast<unsigned char>(pMinor);
                ex.major = static_cast<unsigned char>(protocolMajor);
                ex.minor = static_cast<unsigned char>(protocolMinor);
                throw ex;
            }
            Byte eMajor;
            Byte eMinor;
            stream.read(eMajor);
            stream.read(eMinor);
            // Likewise, only the major encoding version is enforced.
            if(eMajor != encodingMajor)
            {
                UnsupportedEncodingException ex(__FILE__, __LINE__);
                ex.badMajor = static_cast<unsigned char>(eMajor);
                ex.badMinor = static_cast<unsigned char>(eMinor);
                ex.major = static_cast<unsigned char>(encodingMajor);
                ex.minor = static_cast<unsigned char>(encodingMinor);
                throw ex;
            }
            Byte messageType;
            stream.read(messageType);
            Byte compress;
            stream.read(compress);
            Int size;
            stream.read(size);
            // The declared size includes the header, so anything
            // smaller than headerSize is malformed.
            if(size < headerSize)
            {
                throw IllegalMessageSizeException(__FILE__, __LINE__);
            }
            // Enforce the configured upper bound before allocating.
            if(size > static_cast<Int>(_instance->messageSizeMax()))
            {
                throw MemoryLimitException(__FILE__, __LINE__);
            }
            if(size > static_cast<Int>(stream.b.size()))
            {
                stream.b.resize(size);
            }
            stream.i = stream.b.begin() + pos;

            // Read the remainder of the message body, if any bytes
            // beyond the header are still outstanding.
            if(stream.i != stream.b.end())
            {
                _transceiver->read(stream, -1);
                assert(stream.i == stream.b.end());
            }
        }
        catch(const LocalException& ex)
        {
            // Any read/validation failure closes the connection (via
            // exception()); the state check below then skips parsing.
            exception(ex);
        }

        Int requestId = 0;
        Int invokeNum = 0;
#ifndef ICEE_PURE_CLIENT
        ServantManagerPtr servantManager;
        ObjectAdapterPtr adapter;
#endif

        // Set only if closing the transceiver below fails; rethrown at
        // the bottom of the loop after all requests were notified.
        auto_ptr<LocalException> exception;

        // Outstanding outgoing requests, moved out of _requests when
        // the connection is closing/closed, to be failed outside the lock.
        map<Int, Outgoing*> requests;

        {
            IceE::Monitor<IceE::Mutex>::Lock sync(*this);

#ifndef ICEE_PURE_CLIENT
            // A held (server-side) connection buffers nothing; the
            // thread simply parks here until activated or closed.
            while(_state == StateHolding)
            {
                wait();
            }
#endif

            if(_state != StateClosed)
            {
#ifndef ICEE_PURE_CLIENT
                parseMessage(stream, requestId, invokeNum, servantManager, adapter);
#else
                parseMessage(stream, requestId);
#endif
            }

            //
            // parseMessage() can close the connection, so we must
            // check for closed state again.
            //
            if(_state == StateClosed)
            {
                //
                // We must make sure that nobody is sending when we close
                // the transceiver.
                //
                IceE::Mutex::Lock sendSync(_sendMutex);

                try
                {
                    _transceiver->close();
                }
                catch(const LocalException& ex)
                {
                    // Preserve the close failure; it is rethrown after
                    // the pending requests have been notified below.
                    exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
                }

                _transceiver = 0;
                notifyAll();

                //
                // We cannot simply return here. We have to make sure
                // that all requests are notified about the closed
                // connection below.
                //
                closed = true;
            }

            if(_state == StateClosed || _state == StateClosing)
            {
                // Take ownership of all pending requests so they can
                // be failed outside the monitor lock.
                requests.swap(_requests);
                _requestsHint = _requests.end();
            }
        }

        //
        // Method invocation (or multiple invocations for batch
        // messages) must be done outside the thread synchronization,
        // so that nested calls are possible.
        //
#ifndef ICEE_PURE_CLIENT
        invokeAll(stream, invokeNum, requestId, servantManager, adapter);
#endif

        // Fail every request that was pending when the connection
        // started closing.
        for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
        {
            p->second->finished(*_exception.get()); // The exception is immutable at this point.
        }

        if(exception.get())
        {
            assert(closed);
            exception->ice_throw();
        }
    }
}
+
//
// Stores a reference to the connection whose run() method this thread
// will execute. The reference is released at the end of
// ThreadPerConnection::run() to break the cyclic dependency between
// the connection and its thread.
//
IceE::Connection::ThreadPerConnection::ThreadPerConnection(const ConnectionPtr& connection) :
    _connection(connection)
{
}
+
+void
+IceE::Connection::ThreadPerConnection::run()
+{
+ try
+ {
+ _connection->run();
+ }
+ catch(const Exception& ex)
+ {
+ Error out(_connection->_instance->logger());
+ out << "exception in thread per connection:\n" << _connection->toString() << ex.toString();
+ }
+ catch(const std::exception& ex)
+ {
+ Error out(_connection->_instance->logger());
+ out << "std::exception in thread per connection:\n" << _connection->toString() << ex.what();
+ }
+ catch(...)
+ {
+ Error out(_connection->_instance->logger());
+ out << "unknown exception in thread per connection:\n" << _connection->toString();
+ }
+
+ _connection = 0; // Resolve cyclic dependency.
+}
+
+#ifndef ICEE_PURE_CLIENT
+
+void
+IceE::Connection::setAdapter(const ObjectAdapterPtr& adapter)
+{
+ IceE::Monitor<IceE::Mutex>::Lock sync(*this);
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+
+ assert(_state < StateClosing);
+
+ //
+ // Before we set an adapter (or reset it) we wait until the
+ // dispatch count with any old adapter is zero.
+ //
+ // A deadlock can occur if we wait while an operation is
+ // outstanding on this adapter that holds a lock while
+ // calling this function (e.g., __getDelegate).
+ //
+ // In order to avoid such a deadlock, we only wait if the new
+ // adapter is different than the current one.
+ //
+ // TODO: Verify that this fix solves all cases.
+ //
+ if(_adapter.get() != adapter.get())
+ {
+ while(_dispatchCount > 0)
+ {
+ wait();
+ }
+
+ _adapter = adapter;
+ if(_adapter)
+ {
+ _servantManager = _adapter->getServantManager();
+ }
+ else
+ {
+ _servantManager = 0;
+ }
+ }
+}
+
+ObjectAdapterPtr
+IceE::Connection::getAdapter() const
+{
+ IceE::Monitor<IceE::Mutex>::Lock sync(*this);
+ return _adapter;
+}
+#endif
+