summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-10-29 20:59:10 +0000
committerMarc Laukien <marc@zeroc.com>2004-10-29 20:59:10 +0000
commit6e5575dec12605fdc76594994d7c92b81ec84f25 (patch)
tree3a5324be3d71ad2632a2e7668f832dbe6110e224 /cpp
parentfixed asserts ; removed _Ice_ prefix, otherwise the code would not compile (diff)
downloadice-6e5575dec12605fdc76594994d7c92b81ec84f25.tar.bz2
ice-6e5575dec12605fdc76594994d7c92b81ec84f25.tar.xz
ice-6e5575dec12605fdc76594994d7c92b81ec84f25.zip
more glacier2 work
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Glacier2/Blobject.cpp164
-rw-r--r--cpp/src/Glacier2/Blobject.h5
-rw-r--r--cpp/src/Glacier2/ClientBlobject.cpp2
-rw-r--r--cpp/src/Glacier2/RequestQueue.cpp151
-rw-r--r--cpp/src/Glacier2/RequestQueue.h23
-rw-r--r--cpp/src/Glacier2/SessionRouterI.cpp16
-rw-r--r--cpp/src/Glacier2/SessionRouterI.h2
7 files changed, 220 insertions, 143 deletions
diff --git a/cpp/src/Glacier2/Blobject.cpp b/cpp/src/Glacier2/Blobject.cpp
index e845444f1e4..9633e7903c2 100644
--- a/cpp/src/Glacier2/Blobject.cpp
+++ b/cpp/src/Glacier2/Blobject.cpp
@@ -13,24 +13,44 @@ using namespace std;
using namespace Ice;
using namespace Glacier2;
+static const string serverForwardContext = "Glacier2.Server.ForwardContext";
+static const string clientForwardContext = "Glacier2.Client.ForwardContext";
+static const string serverUnbuffered = "Glacier2.Server.Unbuffered";
+static const string clientUnbuffered = "Glacier2.Client.Unbuffered";
static const string serverAlwaysBatch = "Glacier2.Server.AlwaysBatch";
static const string clientAlwaysBatch = "Glacier2.Client.AlwaysBatch";
static const string serverThreadStackSize = "Glacier2.Server.ThreadStackSize";
static const string clientThreadStackSize = "Glacier2.Client.ThreadStackSize";
+static const string serverTraceRequest = "Glacier2.Server.Trace.Request";
+static const string clientTraceRequest = "Glacier2.Client.Trace.Request";
Glacier2::Blobject::Blobject(const CommunicatorPtr& communicator, bool reverse) :
- _properties(communicator->getProperties()),
- _logger(communicator->getLogger()),
- _alwaysBatch(reverse ?
+ _communicator(communicator),
+ _properties(_communicator->getProperties()),
+ _logger(_communicator->getLogger()),
+ _reverse(reverse),
+ _forwardContext(_reverse ?
+ _properties->getPropertyAsInt(serverForwardContext) > 0 :
+ _properties->getPropertyAsInt(clientForwardContext) > 0),
+ _unbuffered(_reverse ?
+ _properties->getPropertyAsInt(serverUnbuffered) > 0 :
+ _properties->getPropertyAsInt(clientUnbuffered) > 0),
+ _alwaysBatch(_reverse ?
_properties->getPropertyAsInt(serverAlwaysBatch) > 0 :
- _properties->getPropertyAsInt(clientAlwaysBatch) > 0)
+ _properties->getPropertyAsInt(clientAlwaysBatch) > 0),
+ _requestTraceLevel(_reverse ?
+ _properties->getPropertyAsInt(serverTraceRequest) :
+ _properties->getPropertyAsInt(clientTraceRequest))
{
- Int threadStackSize = reverse ?
- _properties->getPropertyAsInt(serverThreadStackSize) :
- _properties->getPropertyAsInt(clientThreadStackSize);
-
- _requestQueue = new RequestQueue(communicator, reverse);
- _requestQueue->start(static_cast<size_t>(threadStackSize));
+ if(!_unbuffered)
+ {
+ Int threadStackSize = _reverse ?
+ _properties->getPropertyAsInt(serverThreadStackSize) :
+ _properties->getPropertyAsInt(clientThreadStackSize);
+
+ _requestQueue = new RequestQueue(_communicator, _reverse);
+ _requestQueue->start(static_cast<size_t>(threadStackSize));
+ }
}
Glacier2::Blobject::~Blobject()
@@ -41,43 +61,17 @@ Glacier2::Blobject::~Blobject()
void
Glacier2::Blobject::destroy()
{
- assert(_requestQueue); // Destroyed?
- _requestQueue->destroy();
- _requestQueue = 0;
-}
-
-class Glacier2CB : public AMI_Object_ice_invoke
-{
-public:
-
- Glacier2CB(const AMD_Object_ice_invokePtr& cb) :
- _cb(cb)
+ if(_requestQueue)
{
+ _requestQueue->destroy();
+ _requestQueue = 0;
}
-
- virtual void
- ice_response(bool ok, const vector<Byte>& outParams)
- {
- _cb->ice_response(ok, outParams);
- }
-
- virtual void
- ice_exception(const Exception& ex)
- {
- _cb->ice_exception(ex);
- }
-
-private:
-
- AMD_Object_ice_invokePtr _cb;
-};
+}
void
Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amdCB, const vector<Byte>& inParams,
- const Current& current)
+ const Current& current)
{
- assert(_requestQueue); // Destroyed?
-
//
// Set the correct facet on the proxy.
//
@@ -105,7 +99,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amd
case 'o':
{
- if(_alwaysBatch)
+ if(_alwaysBatch && !_unbuffered)
{
proxy = proxy->ice_batchOneway();
}
@@ -118,7 +112,7 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amd
case 'd':
{
- if(_alwaysBatch)
+ if(_alwaysBatch && !_unbuffered)
{
proxy = proxy->ice_batchDatagram();
}
@@ -131,13 +125,27 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amd
case 'O':
{
- proxy = proxy->ice_batchOneway();
+ if(!_unbuffered)
+ {
+ proxy = proxy->ice_batchOneway();
+ }
+ else
+ {
+ proxy = proxy->ice_oneway();
+ }
break;
}
case 'D':
{
- proxy = proxy->ice_batchDatagram();
+ if(!_unbuffered)
+ {
+ proxy = proxy->ice_batchDatagram();
+ }
+ else
+ {
+ proxy = proxy->ice_datagram();
+ }
break;
}
@@ -163,18 +171,64 @@ Glacier2::Blobject::invoke(ObjectPrx& proxy, const AMD_Object_ice_invokePtr& amd
}
}
- //
- // Create a new request and add it to the request queue.
- //
- if(proxy->ice_isTwoway())
+ if(_unbuffered)
{
- AMI_Object_ice_invokePtr amiCB = new Glacier2CB(amdCB);
- _requestQueue->addRequest(new Request(proxy, inParams, current, amiCB));
+ //
+ // If we are in unbuffered mode, we sent the request directly.
+ //
+
+ if(_requestTraceLevel >= 1)
+ {
+ Trace out(_logger, "Glacier2");
+ if(_reverse)
+ {
+ out << "reverse ";
+ }
+ out << "routing (unbuffered)";
+ out << "\nproxy = " << _communicator->proxyToString(proxy);
+ out << "\noperation = " << current.operation;
+ out << "\ncontext = ";
+ Context::const_iterator q = current.ctx.begin();
+ while(q != current.ctx.end())
+ {
+ out << q->first << '/' << q->second;
+ if(++q != current.ctx.end())
+ {
+ out << ", ";
+ }
+ }
+ }
+
+ bool ok;
+ vector<Byte> outParams;
+
+ try
+ {
+ if(_forwardContext)
+ {
+ ok = proxy->ice_invoke(current.operation, current.mode, inParams, outParams, current.ctx);
+ }
+ else
+ {
+ ok = proxy->ice_invoke(current.operation, current.mode, inParams, outParams);
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ amdCB->ice_exception(ex);
+ return;
+ }
+
+ amdCB->ice_response(ok, outParams);
}
else
- {
- vector<Byte> dummy;
- amdCB->ice_response(true, dummy);
- _requestQueue->addRequest(new Request(proxy, inParams, current, 0));
+ {
+ //
+ // If we are not in unbuffered mode, we create a new request
+ // and add it to the request queue. If the request is twoway,
+ // we use AMI.
+ //
+
+ _requestQueue->addRequest(new Request(proxy, inParams, current, _forwardContext, amdCB));
}
}
diff --git a/cpp/src/Glacier2/Blobject.h b/cpp/src/Glacier2/Blobject.h
index 5895b2fad06..004cfaf1be5 100644
--- a/cpp/src/Glacier2/Blobject.h
+++ b/cpp/src/Glacier2/Blobject.h
@@ -30,12 +30,17 @@ protected:
void invoke(Ice::ObjectPrx&, const Ice::AMD_Object_ice_invokePtr&, const std::vector<Ice::Byte>&,
const Ice::Current&);
+ const Ice::CommunicatorPtr _communicator;
const Ice::PropertiesPtr _properties;
const Ice::LoggerPtr _logger;
private:
+ const bool _reverse;
+ const bool _forwardContext;
+ const bool _unbuffered;
const bool _alwaysBatch;
+ const int _requestTraceLevel;
RequestQueuePtr _requestQueue;
};
diff --git a/cpp/src/Glacier2/ClientBlobject.cpp b/cpp/src/Glacier2/ClientBlobject.cpp
index 30fe28b30f1..67c4a75a563 100644
--- a/cpp/src/Glacier2/ClientBlobject.cpp
+++ b/cpp/src/Glacier2/ClientBlobject.cpp
@@ -67,7 +67,7 @@ Glacier2::ClientBlobject::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr&
{
if(_rejectTraceLevel >= 1)
{
- Trace out(current.adapter->getCommunicator()->getLogger(), "Glacier2");
+ Trace out(_logger, "Glacier2");
out << "rejecting request\n";
out << "identity: " << identityToString(current.id);
}
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp
index f7912fa3414..1b2dceac9e2 100644
--- a/cpp/src/Glacier2/RequestQueue.cpp
+++ b/cpp/src/Glacier2/RequestQueue.cpp
@@ -14,53 +14,84 @@ using namespace std;
using namespace Ice;
using namespace Glacier2;
+class Glacier2CB : public AMI_Object_ice_invoke
+{
+public:
+
+ Glacier2CB(const AMD_Object_ice_invokePtr& cb) :
+ _cb(cb)
+ {
+ }
+
+ virtual void
+ ice_response(bool ok, const vector<Byte>& outParams)
+ {
+ _cb->ice_response(ok, outParams);
+ }
+
+ virtual void
+ ice_exception(const Exception& ex)
+ {
+ _cb->ice_exception(ex);
+ }
+
+private:
+
+ AMD_Object_ice_invokePtr _cb;
+};
+
Glacier2::Request::Request(const ObjectPrx& proxy, const vector<Byte>& inParams, const Current& current,
- const AMI_Object_ice_invokePtr& amiCB) :
+ bool forwardContext, const AMD_Object_ice_invokePtr& amdCB) :
_proxy(proxy),
_inParams(inParams),
_current(current),
- _amiCB(amiCB)
+ _forwardContext(forwardContext),
+ _amiCB(new Glacier2CB(amdCB))
{
Context::const_iterator p = current.ctx.find("_ovrd");
if(p != current.ctx.end())
{
- _override = p->second;
+ const_cast<string&>(_override) = p->second;
}
}
void
-Glacier2::Request::invoke(bool forwardContext)
+Glacier2::Request::invoke()
{
if(_proxy->ice_isTwoway())
{
- assert(_amiCB);
- try
+ if(_forwardContext)
{
- if(forwardContext)
- {
- _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams, _current.ctx);
- }
- else
- {
- _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams);
- }
+ _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams, _current.ctx);
}
- catch(const Ice::Exception& ex)
+ else
{
- _amiCB->ice_exception(ex);
+ _proxy->ice_invoke_async(_amiCB, _current.operation, _current.mode, _inParams);
}
}
else
{
- vector<Byte> dummy;
- if(forwardContext)
+ bool ok;
+ vector<Byte> outParams;
+
+ try
{
- _proxy->ice_invoke(_current.operation, _current.mode, _inParams, dummy, _current.ctx);
+ if(_forwardContext)
+ {
+ ok = _proxy->ice_invoke(_current.operation, _current.mode, _inParams, outParams, _current.ctx);
+ }
+ else
+ {
+ ok = _proxy->ice_invoke(_current.operation, _current.mode, _inParams, outParams);
+ }
}
- else
+ catch(const LocalException& ex)
{
- _proxy->ice_invoke(_current.operation, _current.mode, _inParams, dummy);
+ _amiCB->ice_exception(ex);
+ return;
}
+
+ _amiCB->ice_response(ok, outParams);
}
}
@@ -107,30 +138,25 @@ Glacier2::Request::getCurrent() const
return _current;
}
+static const string serverSleepTime = "Glacier2.Server.SleepTime";
+static const string clientSleepTime = "Glacier2.Client.SleepTime";
static const string serverTraceRequest = "Glacier2.Server.Trace.Request";
static const string clientTraceRequest = "Glacier2.Client.Trace.Request";
static const string serverTraceOverride = "Glacier2.Server.Trace.Override";
static const string clientTraceOverride = "Glacier2.Client.Trace.Override";
-static const string serverForwardContext = "Glacier2.Server.ForwardContext";
-static const string clientForwardContext = "Glacier2.Client.ForwardContext";
-static const string serverSleepTime = "Glacier2.Server.SleepTime";
-static const string clientSleepTime = "Glacier2.Client.SleepTime";
Glacier2::RequestQueue::RequestQueue(const Ice::CommunicatorPtr& communicator, bool reverse) :
_logger(communicator->getLogger()),
_reverse(reverse),
- _traceLevelRequest(reverse ?
+ _sleepTime(reverse ?
+ IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(serverSleepTime)) :
+ IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(clientSleepTime))),
+ _requestTraceLevel(_reverse ?
communicator->getProperties()->getPropertyAsInt(serverTraceRequest) :
communicator->getProperties()->getPropertyAsInt(clientTraceRequest)),
- _traceLevelOverride(reverse ?
+ _overrideTraceLevel(reverse ?
communicator->getProperties()->getPropertyAsInt(serverTraceOverride) :
communicator->getProperties()->getPropertyAsInt(clientTraceOverride)),
- _forwardContext(reverse ?
- communicator->getProperties()->getPropertyAsInt(serverForwardContext) > 0 :
- communicator->getProperties()->getPropertyAsInt(clientForwardContext) > 0),
- _sleepTime(reverse ?
- IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(serverSleepTime)) :
- IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsInt(clientSleepTime))),
_destroy(false)
{
}
@@ -179,7 +205,7 @@ Glacier2::RequestQueue::addRequest(const RequestPtr& request)
{
*p = request; // Replace old request if this is an override.
- if(_traceLevelOverride >= 1)
+ if(_overrideTraceLevel >= 1)
{
traceRequest(request, "override");
}
@@ -225,43 +251,36 @@ Glacier2::RequestQueue::run()
// while this is being done.
//
- try
- {
- set<ConnectionPtr> flushSet;
-
- for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p)
+ set<ConnectionPtr> flushSet;
+
+ for(vector<RequestPtr>::const_iterator p = requests.begin(); p != requests.end(); ++p)
+ {
+ const ObjectPrx& proxy = (*p)->getProxy();
+
+ if(proxy->ice_batchOneway() || proxy->ice_batchDatagram())
{
- const ObjectPrx& proxy = (*p)->getProxy();
-
- if(proxy->ice_batchOneway() || proxy->ice_batchDatagram())
- {
- flushSet.insert(proxy->ice_connection());
- }
-
- if(_traceLevelRequest >= 1)
- {
- traceRequest(*p, "");
- }
-
- (*p)->invoke(_forwardContext);
+ flushSet.insert(proxy->ice_connection());
}
-
- for_each(flushSet.begin(), flushSet.end(), Ice::voidMemFun(&Connection::flushBatchRequests));
+
+ if(_requestTraceLevel >= 1)
+ {
+ traceRequest(*p, "");
+ }
+
+ (*p)->invoke();
}
- catch(const Ice::Exception& ex)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
-
- if(_traceLevelRequest >= 1)
+
+ for(set<ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q)
+ {
+ try
{
- Trace out(_logger, "Glacier2");
- if(_reverse)
- {
- out << "reverse ";
- }
- out << "routing exception:\n" << ex;
+ for_each(flushSet.begin(), flushSet.end(), Ice::voidMemFun(&Connection::flushBatchRequests));
}
- }
+ catch(const LocalException& ex)
+ {
+ // Ignore.
+ }
+ }
//
// In order to avoid flooding, we add a delay, if so
diff --git a/cpp/src/Glacier2/RequestQueue.h b/cpp/src/Glacier2/RequestQueue.h
index c274ad5a9ef..512fbba68ba 100644
--- a/cpp/src/Glacier2/RequestQueue.h
+++ b/cpp/src/Glacier2/RequestQueue.h
@@ -24,22 +24,22 @@ class Request : virtual public IceUtil::Shared
{
public:
- Request(const Ice::ObjectPrx&, const std::vector<Ice::Byte>&, const Ice::Current&,
- const Ice::AMI_Object_ice_invokePtr&);
+ Request(const Ice::ObjectPrx&, const std::vector<Ice::Byte>&, const Ice::Current&, bool,
+ const Ice::AMD_Object_ice_invokePtr&);
- void invoke(bool);
+ void invoke();
bool override(const RequestPtr&) const;
const Ice::ObjectPrx& getProxy() const;
const Ice::Current& getCurrent() const;
private:
- Ice::ObjectPrx _proxy;
- std::vector<Ice::Byte> _inParams;
- Ice::Current _current;
- bool _forwardContext;
- Ice::AMI_Object_ice_invokePtr _amiCB;
- std::string _override;
+ const Ice::ObjectPrx _proxy;
+ const std::vector<Ice::Byte> _inParams;
+ const Ice::Current _current;
+ const bool _forwardContext;
+ const Ice::AMI_Object_ice_invokePtr _amiCB;
+ const std::string _override;
};
class RequestQueue;
@@ -63,10 +63,9 @@ private:
const Ice::LoggerPtr _logger;
const bool _reverse;
- const int _traceLevelRequest;
- const int _traceLevelOverride;
- const bool _forwardContext;
const IceUtil::Time _sleepTime;
+ const int _requestTraceLevel;
+ const int _overrideTraceLevel;
std::vector<RequestPtr> _requests;
diff --git a/cpp/src/Glacier2/SessionRouterI.cpp b/cpp/src/Glacier2/SessionRouterI.cpp
index adf196fd9c3..5e344c94510 100644
--- a/cpp/src/Glacier2/SessionRouterI.cpp
+++ b/cpp/src/Glacier2/SessionRouterI.cpp
@@ -82,7 +82,7 @@ Glacier2::SessionRouterI::SessionRouterI(const ObjectAdapterPtr& clientAdapter,
const SessionManagerPrx& sessionManager) :
_properties(clientAdapter->getCommunicator()->getProperties()),
_logger(clientAdapter->getCommunicator()->getLogger()),
- _traceLevel(_properties->getPropertyAsInt("Glacier2.Trace.Session")),
+ _sessionTraceLevel(_properties->getPropertyAsInt("Glacier2.Trace.Session")),
_clientAdapter(clientAdapter),
_serverAdapter(serverAdapter),
_verifier(verifier),
@@ -173,7 +173,7 @@ Glacier2::SessionRouterI::destroy()
}
catch(const Exception& ex)
{
- if(_traceLevel >= 1)
+ if(_sessionTraceLevel >= 1)
{
Trace out(_logger, "Glacier2");
out << "exception while destroying session\n" << ex;
@@ -304,7 +304,7 @@ Glacier2::SessionRouterI::createSession(const std::string& userId, const std::st
}
catch(const Exception& ex)
{
- if(_traceLevel >= 1)
+ if(_sessionTraceLevel >= 1)
{
Trace out(_logger, "Glacier2");
out << "exception while verifying password\n" << ex;
@@ -419,7 +419,7 @@ Glacier2::SessionRouterI::createSession(const std::string& userId, const std::st
_routersByCategoryHint, pair<const string, RouterIPtr>(category, router));
}
- if(_traceLevel >= 1)
+ if(_sessionTraceLevel >= 1)
{
Trace out(_logger, "Glacier2");
out << "created session\n";
@@ -480,7 +480,7 @@ Glacier2::SessionRouterI::destroySession(const Current& current)
//
try
{
- if(_traceLevel >= 1)
+ if(_sessionTraceLevel >= 1)
{
Trace out(_logger, "Glacier2");
out << "destroying session\n";
@@ -491,7 +491,7 @@ Glacier2::SessionRouterI::destroySession(const Current& current)
}
catch(const Exception& ex)
{
- if(_traceLevel >= 1)
+ if(_sessionTraceLevel >= 1)
{
Trace out(_logger, "Glacier2");
out << "exception while destroying session\n" << ex;
@@ -612,7 +612,7 @@ Glacier2::SessionRouterI::expireSessions()
try
{
- if(_traceLevel >= 1)
+ if(_sessionTraceLevel >= 1)
{
Trace out(_logger, "Glacier2");
out << "expiring session\n";
@@ -623,7 +623,7 @@ Glacier2::SessionRouterI::expireSessions()
}
catch(const Exception& ex)
{
- if(_traceLevel >= 1)
+ if(_sessionTraceLevel >= 1)
{
Trace out(_logger, "Glacier2");
out << "exception while expiring session\n" << ex;
diff --git a/cpp/src/Glacier2/SessionRouterI.h b/cpp/src/Glacier2/SessionRouterI.h
index daf985cdbe4..f38d711736d 100644
--- a/cpp/src/Glacier2/SessionRouterI.h
+++ b/cpp/src/Glacier2/SessionRouterI.h
@@ -50,7 +50,7 @@ private:
const Ice::PropertiesPtr _properties;
const Ice::LoggerPtr _logger;
- const int _traceLevel;
+ const int _sessionTraceLevel;
const Ice::ObjectAdapterPtr _clientAdapter;
const Ice::ObjectAdapterPtr _serverAdapter;
const PermissionsVerifierPrx _verifier;