summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Outgoing.cpp
diff options
context:
space:
mode:
authorJoe George <joe@zeroc.com>2015-03-03 17:30:50 -0500
committerJoe George <joe@zeroc.com>2015-05-12 11:41:55 -0400
commitd35bb9f5c19e34aee31f83d445695a8186ef675e (patch)
treed5324eaf44f5f9776495537c51653f50a66a7237 /cpp/src/Ice/Outgoing.cpp
downloadice-d35bb9f5c19e34aee31f83d445695a8186ef675e.tar.bz2
ice-d35bb9f5c19e34aee31f83d445695a8186ef675e.tar.xz
ice-d35bb9f5c19e34aee31f83d445695a8186ef675e.zip
Ice 3.4.2 Source Distributionv3.4.2
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r--cpp/src/Ice/Outgoing.cpp598
1 files changed, 598 insertions, 0 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
new file mode 100644
index 00000000000..8024b9e64a7
--- /dev/null
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -0,0 +1,598 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Outgoing.h>
+#include <Ice/Object.h>
+#include <Ice/RequestHandler.h>
+#include <Ice/ConnectionI.h>
+#include <Ice/Reference.h>
+#include <Ice/Endpoint.h>
+#include <Ice/LocalException.h>
+#include <Ice/Protocol.h>
+#include <Ice/Instance.h>
+#include <Ice/ReplyStatus.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceInternal;
+
+IceInternal::LocalExceptionWrapper::LocalExceptionWrapper(const LocalException& ex, bool r) :
+ _retry(r)
+{
+ _ex.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+}
+
+IceInternal::LocalExceptionWrapper::LocalExceptionWrapper(const LocalExceptionWrapper& ex) :
+ _retry(ex._retry)
+{
+ _ex.reset(dynamic_cast<LocalException*>(ex.get()->ice_clone()));
+}
+
+void
+IceInternal::LocalExceptionWrapper::throwWrapper(const std::exception& ex)
+{
+
+ const UserException* ue = dynamic_cast<const UserException*>(&ex);
+ if(ue)
+ {
+ stringstream s;
+ s << *ue;
+ throw LocalExceptionWrapper(UnknownUserException(__FILE__, __LINE__, s.str()), false);
+ }
+
+ const LocalException* le = dynamic_cast<const LocalException*>(&ex);
+ if(le)
+ {
+ if(dynamic_cast<const UnknownException*>(le) ||
+ dynamic_cast<const ObjectNotExistException*>(le) ||
+ dynamic_cast<const OperationNotExistException*>(le) ||
+ dynamic_cast<const FacetNotExistException*>(le))
+ {
+ throw LocalExceptionWrapper(*le, false);
+ }
+ stringstream s;
+ s << *le;
+#ifdef __GNUC__
+ s << "\n" << le->ice_stackTrace();
+#endif
+ throw LocalExceptionWrapper(UnknownLocalException(__FILE__, __LINE__, s.str()), false);
+ }
+ string msg = "std::exception: ";
+ throw LocalExceptionWrapper(UnknownException(__FILE__, __LINE__, msg + ex.what()), false);
+}
+
+const LocalException*
+IceInternal::LocalExceptionWrapper::get() const
+{
+ assert(_ex.get());
+ return _ex.get();
+}
+
+bool
+IceInternal::LocalExceptionWrapper::retry() const
+{
+ return _retry;
+}
+
+IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation, OperationMode mode,
+ const Context* context) :
+ _handler(handler),
+ _state(StateUnsent),
+ _is(handler->getReference()->getInstance().get()),
+ _os(handler->getReference()->getInstance().get()),
+ _sent(false)
+{
+ switch(_handler->getReference()->getMode())
+ {
+ case Reference::ModeTwoway:
+ case Reference::ModeOneway:
+ case Reference::ModeDatagram:
+ {
+ _os.writeBlob(requestHdr, sizeof(requestHdr));
+ break;
+ }
+
+ case Reference::ModeBatchOneway:
+ case Reference::ModeBatchDatagram:
+ {
+ _handler->prepareBatchRequest(&_os);
+ break;
+ }
+ }
+
+ try
+ {
+ _handler->getReference()->getIdentity().__write(&_os);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ if(_handler->getReference()->getFacet().empty())
+ {
+ _os.write(static_cast<string*>(0), static_cast<string*>(0));
+ }
+ else
+ {
+ string facet = _handler->getReference()->getFacet();
+ _os.write(&facet, &facet + 1);
+ }
+
+ _os.write(operation, false);
+
+ _os.write(static_cast<Byte>(mode));
+
+ if(context != 0)
+ {
+ //
+ // Explicit context
+ //
+ __writeContext(&_os, *context);
+ }
+ else
+ {
+ //
+ // Implicit context
+ //
+ const ImplicitContextIPtr& implicitContext = _handler->getReference()->getInstance()->getImplicitContext();
+ const Context& prxContext = _handler->getReference()->getContext()->getValue();
+ if(implicitContext == 0)
+ {
+ __writeContext(&_os, prxContext);
+ }
+ else
+ {
+ implicitContext->write(prxContext, &_os);
+ }
+ }
+
+ //
+ // Input and output parameters are always sent in an
+ // encapsulation, which makes it possible to forward requests as
+ // blobs.
+ //
+ _os.startWriteEncaps();
+ }
+ catch(const LocalException& ex)
+ {
+ abort(ex);
+ }
+}
+
+bool
+IceInternal::Outgoing::invoke()
+{
+ assert(_state == StateUnsent);
+
+ _os.endWriteEncaps();
+
+ switch(_handler->getReference()->getMode())
+ {
+ case Reference::ModeTwoway:
+ {
+ _state = StateInProgress;
+
+ Ice::ConnectionI* connection = _handler->sendRequest(this);
+ assert(connection);
+
+ bool timedOut = false;
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+
+ //
+ // If the request is being sent in the background we first wait for the
+ // sent notification.
+ //
+ while(_state != StateFailed && !_sent)
+ {
+ _monitor.wait();
+ }
+
+ //
+ // Wait until the request has completed, or until the request times out.
+ //
+
+ Int timeout = connection->timeout();
+ while(_state == StateInProgress && !timedOut)
+ {
+ if(timeout >= 0)
+ {
+ _monitor.timedWait(IceUtil::Time::milliSeconds(timeout));
+
+ if(_state == StateInProgress)
+ {
+ timedOut = true;
+ }
+ }
+ else
+ {
+ _monitor.wait();
+ }
+ }
+ }
+
+ if(timedOut)
+ {
+ //
+ // Must be called outside the synchronization of this
+ // object.
+ //
+ connection->exception(TimeoutException(__FILE__, __LINE__));
+
+ //
+ // We must wait until the exception set above has
+ // propagated to this Outgoing object.
+ //
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(_state == StateInProgress)
+ {
+ _monitor.wait();
+ }
+ }
+ }
+
+ if(_exception.get())
+ {
+ //
+ // A CloseConnectionException indicates graceful
+ // server shutdown, and is therefore always repeatable
+ // without violating "at-most-once". That's because by
+ // sending a close connection message, the server
+ // guarantees that all outstanding requests can safely
+ // be repeated.
+ //
+ // An ObjectNotExistException can always be retried as
+ // well without violating "at-most-once" (see the
+ // implementation of the checkRetryAfterException
+ // method of the ProxyFactory class for the reasons
+ // why it can be useful).
+ //
+ if(!_sent ||
+ dynamic_cast<CloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<ObjectNotExistException*>(_exception.get()))
+ {
+ _exception->ice_throw();
+ }
+
+ //
+ // Throw the exception wrapped in a LocalExceptionWrapper,
+ // to indicate that the request cannot be resent without
+ // potentially violating the "at-most-once" principle.
+ //
+ throw LocalExceptionWrapper(*_exception.get(), false);
+ }
+
+ if(_state == StateUserException)
+ {
+ return false;
+ }
+ else
+ {
+ assert(_state == StateOK);
+ return true;
+ }
+ }
+
+ case Reference::ModeOneway:
+ case Reference::ModeDatagram:
+ {
+ _state = StateInProgress;
+ if(_handler->sendRequest(this))
+ {
+ //
+ // If the handler returns the connection, we must wait for the sent callback.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(_state != StateFailed && !_sent)
+ {
+ _monitor.wait();
+ }
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+ }
+ return true;
+ }
+
+ case Reference::ModeBatchOneway:
+ case Reference::ModeBatchDatagram:
+ {
+ //
+ // For batch oneways and datagrams, the same rules as for
+ // regular oneways and datagrams (see comment above)
+ // apply.
+ //
+ _state = StateInProgress;
+ _handler->finishBatchRequest(&_os);
+ return true;
+ }
+ }
+
+ assert(false);
+ return false;
+}
+
+void
+IceInternal::Outgoing::abort(const LocalException& ex)
+{
+ assert(_state == StateUnsent);
+
+ //
+ // If we didn't finish a batch oneway or datagram request, we must
+ // notify the connection about that we give up ownership of the
+ // batch stream.
+ //
+ if(_handler->getReference()->getMode() == Reference::ModeBatchOneway ||
+ _handler->getReference()->getMode() == Reference::ModeBatchDatagram)
+ {
+ _handler->abortBatchRequest();
+ }
+
+ ex.ice_throw();
+}
+
+void
+IceInternal::Outgoing::sent(bool notify)
+{
+ if(notify)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _sent = true;
+ _monitor.notify();
+ }
+ else
+ {
+ //
+ // No synchronization is necessary if called from sendRequest() because the connection
+ // send mutex is locked and no other threads can call on Outgoing until it's released.
+ //
+ _sent = true;
+ }
+}
+
+void
+IceInternal::Outgoing::finished(BasicStream& is)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+
+ assert(_handler->getReference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
+
+ assert(_state <= StateInProgress);
+
+ _is.swap(is);
+ Byte replyStatus;
+ _is.read(replyStatus);
+
+ switch(replyStatus)
+ {
+ case replyOK:
+ {
+ _state = StateOK; // The state must be set last, in case there is an exception.
+ break;
+ }
+
+ case replyUserException:
+ {
+ _state = StateUserException; // The state must be set last, in case there is an exception.
+ break;
+ }
+
+ case replyObjectNotExist:
+ case replyFacetNotExist:
+ case replyOperationNotExist:
+ {
+ //
+ // Don't read the exception members directly into the
+ // exception. Otherwise if reading fails and raises an
+ // exception, you will have a memory leak.
+ //
+ Identity ident;
+ ident.__read(&_is);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ vector<string> facetPath;
+ _is.read(facetPath);
+ string facet;
+ if(!facetPath.empty())
+ {
+ if(facetPath.size() > 1)
+ {
+ throw MarshalException(__FILE__, __LINE__);
+ }
+ facet.swap(facetPath[0]);
+ }
+
+ string operation;
+ _is.read(operation, false);
+
+ RequestFailedException* ex;
+ switch(replyStatus)
+ {
+ case replyObjectNotExist:
+ {
+ ex = new ObjectNotExistException(__FILE__, __LINE__);
+ break;
+ }
+
+ case replyFacetNotExist:
+ {
+ ex = new FacetNotExistException(__FILE__, __LINE__);
+ break;
+ }
+
+ case replyOperationNotExist:
+ {
+ ex = new OperationNotExistException(__FILE__, __LINE__);
+ break;
+ }
+
+ default:
+ {
+ ex = 0; // To keep the compiler from complaining.
+ assert(false);
+ break;
+ }
+ }
+
+ ex->id = ident;
+ ex->facet = facet;
+ ex->operation = operation;
+ _exception.reset(ex);
+
+ _state = StateLocalException; // The state must be set last, in case there is an exception.
+ break;
+ }
+
+ case replyUnknownException:
+ case replyUnknownLocalException:
+ case replyUnknownUserException:
+ {
+ //
+ // Don't read the exception members directly into the
+ // exception. Otherwise if reading fails and raises an
+ // exception, you will have a memory leak.
+ //
+ string unknown;
+ _is.read(unknown, false);
+
+ UnknownException* ex;
+ switch(replyStatus)
+ {
+ case replyUnknownException:
+ {
+ ex = new UnknownException(__FILE__, __LINE__);
+ break;
+ }
+
+ case replyUnknownLocalException:
+ {
+ ex = new UnknownLocalException(__FILE__, __LINE__);
+ break;
+ }
+
+ case replyUnknownUserException:
+ {
+ ex = new UnknownUserException(__FILE__, __LINE__);
+ break;
+ }
+
+ default:
+ {
+ ex = 0; // To keep the compiler from complaining.
+ assert(false);
+ break;
+ }
+ }
+
+ ex->unknown = unknown;
+ _exception.reset(ex);
+
+ _state = StateLocalException; // The state must be set last, in case there is an exception.
+ break;
+ }
+
+ default:
+ {
+ _exception.reset(new UnknownReplyStatusException(__FILE__, __LINE__));
+ _state = StateLocalException;
+ break;
+ }
+ }
+
+ _monitor.notify();
+}
+
+void
+IceInternal::Outgoing::finished(const LocalException& ex, bool sent)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(_state <= StateInProgress);
+ _state = StateFailed;
+ _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+ _sent = sent;
+ _monitor.notify();
+}
+
+void
+IceInternal::Outgoing::throwUserException()
+{
+ try
+ {
+ _is.startReadEncaps();
+ _is.throwException();
+ }
+ catch(const Ice::UserException&)
+ {
+ _is.endReadEncaps();
+ throw;
+ }
+}
+
+IceInternal::BatchOutgoing::BatchOutgoing(RequestHandler* handler) :
+ _handler(handler),
+ _connection(0),
+ _sent(false),
+ _os(handler->getReference()->getInstance().get())
+{
+}
+
+IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance) :
+ _handler(0),
+ _connection(connection),
+ _sent(false),
+ _os(instance)
+{
+}
+
+void
+IceInternal::BatchOutgoing::invoke()
+{
+ assert(_handler || _connection);
+ if((_handler && !_handler->flushBatchRequests(this)) || (_connection && !_connection->flushBatchRequests(this)))
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!_exception.get() && !_sent)
+ {
+ _monitor.wait();
+ }
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+ }
+}
+
+void
+IceInternal::BatchOutgoing::sent(bool notify)
+{
+ if(notify)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _sent = true;
+ _monitor.notify();
+ }
+ else
+ {
+ _sent = true;
+ }
+}
+
+void
+IceInternal::BatchOutgoing::finished(const Ice::LocalException& ex, bool)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+ _monitor.notify();
+}