summaryrefslogtreecommitdiff
path: root/cppe/src/IceE/Outgoing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cppe/src/IceE/Outgoing.cpp')
-rw-r--r--cppe/src/IceE/Outgoing.cpp457
1 files changed, 457 insertions, 0 deletions
diff --git a/cppe/src/IceE/Outgoing.cpp b/cppe/src/IceE/Outgoing.cpp
new file mode 100644
index 00000000000..d0270242403
--- /dev/null
+++ b/cppe/src/IceE/Outgoing.cpp
@@ -0,0 +1,457 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2005 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICEE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceE/Outgoing.h>
+#include <IceE/Object.h>
+#include <IceE/Connection.h>
+#include <IceE/Reference.h>
+#include <IceE/Endpoint.h>
+#include <IceE/LocalException.h>
+
+using namespace std;
+using namespace IceE;
+using namespace IceEInternal;
+
+IceEInternal::NonRepeatable::NonRepeatable(const NonRepeatable& ex)
+{
+ _ex = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.get()->ice_clone()));
+}
+
+IceEInternal::NonRepeatable::NonRepeatable(const ::IceE::LocalException& ex)
+{
+ _ex = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
+}
+
+const ::IceE::LocalException*
+IceEInternal::NonRepeatable::get() const
+{
+ assert(_ex.get());
+ return _ex.get();
+}
+
+IceEInternal::Outgoing::Outgoing(Connection* connection, Reference* ref, const string& operation,
+ OperationMode mode, const Context& context) :
+ _connection(connection),
+ _reference(ref),
+ _state(StateUnsent),
+ _is(ref->getInstance().get()),
+ _os(ref->getInstance().get())
+{
+ switch(_reference->getMode())
+ {
+ case Reference::ModeTwoway:
+ case Reference::ModeOneway:
+ {
+ _connection->prepareRequest(&_os);
+ break;
+ }
+
+#ifndef ICEE_NO_BATCH
+ case Reference::ModeBatchOneway:
+ {
+ _connection->prepareBatchRequest(&_os);
+ break;
+ }
+#endif
+ }
+
+ _reference->getIdentity().__write(&_os);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ if(_reference->getFacet().empty())
+ {
+ _os.write(vector<string>());
+ }
+ else
+ {
+ vector<string> facetPath;
+ facetPath.push_back(_reference->getFacet());
+ _os.write(facetPath);
+ }
+
+ _os.write(operation);
+
+ _os.write(static_cast<Byte>(mode));
+
+ _os.writeSize(Int(context.size()));
+ Context::const_iterator p;
+ for(p = context.begin(); p != context.end(); ++p)
+ {
+ _os.write(p->first);
+ _os.write(p->second);
+ }
+
+ //
+ // Input and output parameters are always sent in an
+ // encapsulation, which makes it possible to forward requests as
+ // blobs.
+ //
+ _os.startWriteEncaps();
+}
+
+bool
+IceEInternal::Outgoing::invoke()
+{
+ assert(_state == StateUnsent);
+
+ _os.endWriteEncaps();
+
+ switch(_reference->getMode())
+ {
+ case Reference::ModeTwoway:
+ {
+ //
+ // We let all exceptions raised by sending directly
+ // propagate to the caller, because they can be retried
+ // without violating "at-most-once". In case of such
+ // exceptions, the connection object does not call back on
+ // this object, so we don't need to lock the mutex, keep
+ // track of state, or save exceptions.
+ //
+ _connection->sendRequest(&_os, this);
+
+ //
+ // Wait until the request has completed, or until the
+ // request times out.
+ //
+
+ bool timedOut = false;
+
+ {
+ IceE::Monitor<IceE::Mutex>::Lock sync(*this);
+
+ //
+ // It's possible that the request has already
+ // completed, due to a regular response, or because of
+ // an exception. So we only change the state to "in
+ // progress" if it is still "unsent".
+ //
+ if(_state == StateUnsent)
+ {
+ _state = StateInProgress;
+ }
+
+ Int timeout = _connection->timeout();
+ while(_state == StateInProgress && !timedOut)
+ {
+ if(timeout >= 0)
+ {
+ timedWait(IceE::Time::milliSeconds(timeout));
+
+ if(_state == StateInProgress)
+ {
+ timedOut = true;
+ }
+ }
+ else
+ {
+ wait();
+ }
+ }
+ }
+
+ if(timedOut)
+ {
+ //
+ // Must be called outside the synchronization of this
+ // object.
+ //
+ _connection->exception(TimeoutException(__FILE__, __LINE__));
+
+ //
+ // We must wait until the exception set above has
+ // propagated to this Outgoing object.
+ //
+ {
+ IceE::Monitor<IceE::Mutex>::Lock sync(*this);
+
+ while(_state == StateInProgress)
+ {
+ wait();
+ }
+ }
+ }
+
+ if(_exception.get())
+ {
+ //
+ // A CloseConnectionException indicates graceful
+ // server shutdown, and is therefore always repeatable
+ // without violating "at-most-once". That's because by
+ // sending a close connection message, the server
+ // guarantees that all outstanding requests can safely
+ // be repeated.
+ //
+ // An ObjectNotExistException can always be retried as
+ // well without violating "at-most-once".
+ //
+ if(dynamic_cast<CloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<ObjectNotExistException*>(_exception.get()))
+ {
+ _exception->ice_throw();
+ }
+
+ //
+ // Throw the exception wrapped in a NonRepeatable, to
+ // indicate that the request cannot be resent without
+ // potentially violating the "at-most-once" principle.
+ //
+ throw NonRepeatable(*_exception.get());
+ }
+
+ if(_state == StateUserException)
+ {
+ return false;
+ }
+
+ assert(_state == StateOK);
+ break;
+ }
+
+ case Reference::ModeOneway:
+ {
+ //
+ // For oneway requests, the connection object
+ // never calls back on this object. Therefore we don't
+ // need to lock the mutex or save exceptions. We simply
+ // let all exceptions from sending propagate to the
+ // caller, because such exceptions can be retried without
+ // violating "at-most-once".
+ //
+ _state = StateInProgress;
+ _connection->sendRequest(&_os, 0);
+ break;
+ }
+
+#ifndef ICEE_NO_BATCH
+ case Reference::ModeBatchOneway:
+ {
+ //
+ // For batch oneways, the same rules as for
+ // regular oneways (see comment above)
+ // apply.
+ //
+ _state = StateInProgress;
+ _connection->finishBatchRequest(&_os);
+ break;
+ }
+#endif
+ }
+
+ return true;
+}
+
+void
+IceEInternal::Outgoing::abort(const LocalException& ex)
+{
+ assert(_state == StateUnsent);
+
+ //
+ // If we didn't finish a batch oneway request, we must
+ // notify the connection about that we give up ownership of the
+ // batch stream.
+ //
+#ifndef ICEE_NO_BATCH
+ if(_reference->getMode() == Reference::ModeBatchOneway)
+ {
+ _connection->abortBatchRequest();
+
+ //
+ // If we abort a batch requests, we cannot retry, because not
+ // only the batch request that caused the problem will be
+ // aborted, but all other requests in the batch as well.
+ //
+ throw NonRepeatable(ex);
+ }
+#endif
+
+ ex.ice_throw();
+}
+
+void
+IceEInternal::Outgoing::finished(BasicStream& is)
+{
+ IceE::Monitor<IceE::Mutex>::Lock sync(*this);
+
+ assert(_reference->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
+
+ assert(_state <= StateInProgress);
+
+ _is.swap(is);
+ Byte status;
+ _is.read(status);
+
+ switch(static_cast<DispatchStatus>(status))
+ {
+ case DispatchOK:
+ {
+ //
+ // Input and output parameters are always sent in an
+ // encapsulation, which makes it possible to forward
+ // oneway requests as blobs.
+ //
+ _is.startReadEncaps();
+ _state = StateOK; // The state must be set last, in case there is an exception.
+ break;
+ }
+
+ case DispatchUserException:
+ {
+ //
+ // Input and output parameters are always sent in an
+ // encapsulation, which makes it possible to forward
+ // oneway requests as blobs.
+ //
+ _is.startReadEncaps();
+ _state = StateUserException; // The state must be set last, in case there is an exception.
+ break;
+ }
+
+ case DispatchObjectNotExist:
+ case DispatchFacetNotExist:
+ case DispatchOperationNotExist:
+ {
+ //
+ // Don't read the exception members directly into the
+ // exception. Otherwise if reading fails and raises an
+ // exception, you will have a memory leak.
+ //
+ Identity ident;
+ ident.__read(&_is);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ vector<string> facetPath;
+ _is.read(facetPath);
+ string facet;
+ if(!facetPath.empty())
+ {
+ if(facetPath.size() > 1)
+ {
+ throw MarshalException(__FILE__, __LINE__);
+ }
+ facet.swap(facetPath[0]);
+ }
+
+ string operation;
+ _is.read(operation);
+
+ RequestFailedException* ex;
+ switch(static_cast<DispatchStatus>(status))
+ {
+ case DispatchObjectNotExist:
+ {
+ ex = new ObjectNotExistException(__FILE__, __LINE__);
+ break;
+ }
+
+ case DispatchFacetNotExist:
+ {
+ ex = new FacetNotExistException(__FILE__, __LINE__);
+ break;
+ }
+
+ case DispatchOperationNotExist:
+ {
+ ex = new OperationNotExistException(__FILE__, __LINE__);
+ break;
+ }
+
+ default:
+ {
+ ex = 0; // To keep the compiler from complaining.
+ assert(false);
+ break;
+ }
+ }
+
+ ex->id = ident;
+ ex->facet = facet;
+ ex->operation = operation;
+ _exception = auto_ptr<LocalException>(ex);
+
+ _state = StateLocalException; // The state must be set last, in case there is an exception.
+ break;
+ }
+
+ case DispatchUnknownException:
+ case DispatchUnknownLocalException:
+ case DispatchUnknownUserException:
+ {
+ //
+ // Don't read the exception members directly into the
+ // exception. Otherwise if reading fails and raises an
+ // exception, you will have a memory leak.
+ //
+ string unknown;
+ _is.read(unknown);
+
+ UnknownException* ex;
+ switch(static_cast<DispatchStatus>(status))
+ {
+ case DispatchUnknownException:
+ {
+ ex = new UnknownException(__FILE__, __LINE__);
+ break;
+ }
+
+ case DispatchUnknownLocalException:
+ {
+ ex = new UnknownLocalException(__FILE__, __LINE__);
+ break;
+ }
+
+ case DispatchUnknownUserException:
+ {
+ ex = new UnknownUserException(__FILE__, __LINE__);
+ break;
+ }
+
+ default:
+ {
+ ex = 0; // To keep the compiler from complaining.
+ assert(false);
+ break;
+ }
+ }
+
+ ex->unknown = unknown;
+ _exception = auto_ptr<LocalException>(ex);
+
+ _state = StateLocalException; // The state must be set last, in case there is an exception.
+ break;
+ }
+
+ default:
+ {
+ _exception = auto_ptr<LocalException>(new UnknownReplyStatusException(__FILE__, __LINE__));
+ _state = StateLocalException;
+ break;
+ }
+ }
+
+ notify();
+}
+
+void
+IceEInternal::Outgoing::finished(const LocalException& ex)
+{
+ IceE::Monitor<IceE::Mutex>::Lock sync(*this);
+
+ assert(_reference->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
+
+ assert(_state <= StateInProgress);
+
+ _state = StateLocalException;
+ _exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
+ notify();
+}