summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2002-05-30 12:26:45 +0000
committerMarc Laukien <marc@zeroc.com>2002-05-30 12:26:45 +0000
commit89e12c32672600522be7e91bbf01c0cee09e7692 (patch)
treebe0909f9805e7da1fb3dad19e68cfc9820ece56d /cpp
parentfixes (diff)
downloadice-89e12c32672600522be7e91bbf01c0cee09e7692.tar.bz2
ice-89e12c32672600522be7e91bbf01c0cee09e7692.tar.xz
ice-89e12c32672600522be7e91bbf01c0cee09e7692.zip
started with missives
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Glacier/Blobject.cpp136
-rw-r--r--cpp/src/Glacier/Blobject.h46
-rw-r--r--cpp/src/Glacier/ClientBlobject.cpp67
-rw-r--r--cpp/src/Glacier/ClientBlobject.h9
-rw-r--r--cpp/src/Glacier/Makefile4
-rw-r--r--cpp/src/Glacier/Missive.cpp168
-rw-r--r--cpp/src/Glacier/Missive.h66
-rw-r--r--cpp/src/Glacier/ServerBlobject.cpp69
-rw-r--r--cpp/src/Glacier/ServerBlobject.h7
-rw-r--r--cpp/src/Glacier/StarterI.cpp2
10 files changed, 442 insertions, 132 deletions
diff --git a/cpp/src/Glacier/Blobject.cpp b/cpp/src/Glacier/Blobject.cpp
new file mode 100644
index 00000000000..bbbfdfbc365
--- /dev/null
+++ b/cpp/src/Glacier/Blobject.cpp
@@ -0,0 +1,136 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/RoutingTable.h>
+#include <Glacier/Blobject.h>
+
+using namespace std;
+using namespace Ice;
+using namespace Glacier;
+
+Glacier::Blobject::Blobject(const CommunicatorPtr& communicator) :
+ _communicator(communicator),
+ _logger(_communicator->getLogger())
+{
+}
+
+Glacier::Blobject::~Blobject()
+{
+ assert(!_communicator);
+ assert(!_missiveQueue);
+}
+
+void
+Glacier::Blobject::destroy()
+{
+ //
+ // No mutex protection necessary, destroy is only called after all
+ // object adapters have shut down.
+ //
+ _communicator = 0;
+ _logger = 0;
+
+ {
+ IceUtil::Mutex::Lock lock(_missiveQueueMutex);
+ if (_missiveQueue)
+ {
+ _missiveQueue->destroy();
+ _missiveQueueControl.join();
+ _missiveQueue = 0;
+ }
+ }
+}
+
+
+MissiveQueuePtr
+Glacier::Blobject::modifyProxy(ObjectPrx& proxy, const Current& current)
+{
+ if (!current.facet.empty())
+ {
+ proxy = proxy->ice_newFacet(current.facet);
+ }
+
+ MissiveQueuePtr missiveQueue;
+ Context::const_iterator p = current.context.find("_fwd");
+ if (p != current.context.end())
+ {
+ for (unsigned int i = 0; i < p->second.length(); ++i)
+ {
+ char option = p->second[i];
+ switch (option)
+ {
+ case 't':
+ {
+ proxy = proxy->ice_twoway();
+ missiveQueue = 0;
+ break;
+ }
+
+ case 'o':
+ {
+ proxy = proxy->ice_oneway();
+ missiveQueue = 0;
+ break;
+ }
+
+ case 'd':
+ {
+ proxy = proxy->ice_datagram();
+ missiveQueue = 0;
+ break;
+ }
+
+ case 'O':
+ {
+ proxy = proxy->ice_batchOneway();
+ missiveQueue = getMissiveQueue();
+ break;
+ }
+
+ case 'D':
+ {
+ proxy = proxy->ice_batchDatagram();
+ missiveQueue = getMissiveQueue();
+ break;
+ }
+
+ case 's':
+ {
+ proxy = proxy->ice_secure(true);
+ break;
+ }
+
+ default:
+ {
+ Warning out(_logger);
+ out << "unknown forward option `" << option << "'";
+ break;
+ }
+ }
+ }
+ }
+
+ return missiveQueue;
+}
+
+MissiveQueuePtr
+Glacier::Blobject::getMissiveQueue()
+{
+ //
+ // Lazy missive queue initialization.
+ //
+ IceUtil::Mutex::Lock lock(_missiveQueueMutex);
+ if (!_missiveQueue)
+ {
+ _missiveQueue = new MissiveQueue;
+ _missiveQueueControl = _missiveQueue->start();
+ }
+ return _missiveQueue;
+}
diff --git a/cpp/src/Glacier/Blobject.h b/cpp/src/Glacier/Blobject.h
new file mode 100644
index 00000000000..836db558d0f
--- /dev/null
+++ b/cpp/src/Glacier/Blobject.h
@@ -0,0 +1,46 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef BLOBJECT_H
+#define BLOBJECT_H
+
+#include <Ice/Ice.h>
+#include <Glacier/Missive.h>
+
+namespace Glacier
+{
+
+class Blobject : public Ice::Blobject
+{
+public:
+
+ Blobject(const Ice::CommunicatorPtr&);
+ virtual ~Blobject();
+
+ void destroy();
+ virtual MissiveQueuePtr modifyProxy(Ice::ObjectPrx&, const Ice::Current&);
+
+protected:
+
+ Ice::CommunicatorPtr _communicator;
+ Ice::LoggerPtr _logger;
+
+private:
+
+ MissiveQueuePtr getMissiveQueue();
+
+ MissiveQueuePtr _missiveQueue;
+ IceUtil::ThreadControl _missiveQueueControl;
+ IceUtil::Mutex _missiveQueueMutex;
+};
+
+}
+
+#endif
diff --git a/cpp/src/Glacier/ClientBlobject.cpp b/cpp/src/Glacier/ClientBlobject.cpp
index 6821a2bfd69..d9df798fc38 100644
--- a/cpp/src/Glacier/ClientBlobject.cpp
+++ b/cpp/src/Glacier/ClientBlobject.cpp
@@ -20,8 +20,7 @@ using namespace Glacier;
Glacier::ClientBlobject::ClientBlobject(const CommunicatorPtr& communicator,
const IceInternal::RoutingTablePtr& routingTable,
const string& allowCategories) :
- _communicator(communicator),
- _logger(_communicator->getLogger()),
+ Glacier::Blobject(communicator),
_routingTable(routingTable)
{
PropertiesPtr properties = _communicator->getProperties();
@@ -41,11 +40,6 @@ Glacier::ClientBlobject::ClientBlobject(const CommunicatorPtr& communicator,
_allowCategories.erase(unique(_allowCategories.begin(), _allowCategories.end()), _allowCategories.end());
}
-Glacier::ClientBlobject::~ClientBlobject()
-{
- assert(!_communicator);
-}
-
void
Glacier::ClientBlobject::destroy()
{
@@ -53,14 +47,12 @@ Glacier::ClientBlobject::destroy()
// No mutex protection necessary, destroy is only called after all
// object adapters have shut down.
//
- _communicator = 0;
- _logger = 0;
_routingTable = 0;
+ Glacier::Blobject::destroy();
}
bool
-Glacier::ClientBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vector<Byte>& outParams,
- const Current& current)
+Glacier::ClientBlobject::ice_invoke(const vector<Byte>& inParams, vector<Byte>& outParams, const Current& current)
{
assert(_communicator); // Destroyed?
@@ -71,7 +63,7 @@ Glacier::ClientBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vect
{
if (!binary_search(_allowCategories.begin(), _allowCategories.end(), current.identity.category))
{
- if (_traceLevel > 0)
+ if (_traceLevel >= 1)
{
Trace out(_logger, "Glacier");
out << "rejecting request\n";
@@ -93,53 +85,9 @@ Glacier::ClientBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vect
ex.identity = current.identity;
throw ex;
}
-
- if (!current.facet.empty())
- {
- proxy = proxy->ice_newFacet(current.facet);
- }
- Context::const_iterator p = current.context.find("_fwd");
- if (p != current.context.end())
- {
- for (unsigned int i = 0; i < p->second.length(); ++i)
- {
- char option = p->second[i];
- switch (option)
- {
- case 't':
- {
- proxy = proxy->ice_twoway();
- break;
- }
-
- case 'o':
- {
- proxy = proxy->ice_oneway();
- break;
- }
-
- case 'd':
- {
- proxy = proxy->ice_datagram();
- break;
- }
-
- case 's':
- {
- proxy = proxy->ice_secure(true);
- break;
- }
-
- default:
- {
- Warning out(_logger);
- out << "unknown forward option `" << option << "'";
- break;
- }
- }
- }
- }
+ MissiveQueuePtr missiveQueue = modifyProxy(proxy, current);
+ assert(!missiveQueue);
if (_traceLevel >= 2)
{
@@ -150,11 +98,12 @@ Glacier::ClientBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vect
<< "nonmutating = " << (current.nonmutating ? "true" : "false");
}
+ // TODO: Should we forward the context? Perhaps a config parameter?
return proxy->ice_invoke(current.operation, current.nonmutating, inParams, outParams, current.context);
}
catch (const Exception& ex)
{
- if (_traceLevel)
+ if (_traceLevel >= 1)
{
Trace out(_logger, "Glacier");
out << "routing exception:\n" << ex;
diff --git a/cpp/src/Glacier/ClientBlobject.h b/cpp/src/Glacier/ClientBlobject.h
index 8ca92e91362..2242f564720 100644
--- a/cpp/src/Glacier/ClientBlobject.h
+++ b/cpp/src/Glacier/ClientBlobject.h
@@ -12,27 +12,22 @@
#define CLIENT_BLOBJECT_H
#include <Ice/RoutingTableF.h>
-#include <Ice/Ice.h>
-
-#include <set>
+#include <Glacier/Blobject.h>
namespace Glacier
{
-class ClientBlobject : public Ice::Blobject
+class ClientBlobject : public Glacier::Blobject
{
public:
ClientBlobject(const Ice::CommunicatorPtr&, const IceInternal::RoutingTablePtr&, const std::string&);
- virtual ~ClientBlobject();
void destroy();
virtual bool ice_invoke(const std::vector<Ice::Byte>&, std::vector<Ice::Byte>&, const Ice::Current&);
private:
- Ice::CommunicatorPtr _communicator;
- Ice::LoggerPtr _logger;
int _traceLevel;
IceInternal::RoutingTablePtr _routingTable;
std::vector<std::string> _allowCategories;
diff --git a/cpp/src/Glacier/Makefile b/cpp/src/Glacier/Makefile
index 952b8f43fea..3714cb01ced 100644
--- a/cpp/src/Glacier/Makefile
+++ b/cpp/src/Glacier/Makefile
@@ -28,8 +28,10 @@ OBJS = Starter.o \
ROBJS = GlacierRouter.o \
RouterI.o \
+ Blobject.o \
ClientBlobject.o \
- ServerBlobject.o
+ ServerBlobject.o \
+ Missive.o
SOBJS = GlacierStarter.o \
StarterI.o
diff --git a/cpp/src/Glacier/Missive.cpp b/cpp/src/Glacier/Missive.cpp
new file mode 100644
index 00000000000..6faedc57426
--- /dev/null
+++ b/cpp/src/Glacier/Missive.cpp
@@ -0,0 +1,168 @@
+// **********************************************************************
+//
+// Copyright (c) 2002
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Glacier/Missive.h>
+
+using namespace std;
+using namespace Ice;
+using namespace Glacier;
+
+Glacier::Missive::Missive(const ObjectPrx& proxy, const vector<Byte>& inParams, const Current& current) :
+ _proxy(proxy),
+ _inParams(inParams),
+ _current(current)
+{
+}
+
+ObjectPrx
+Glacier::Missive::invoke()
+{
+ // TODO: Should we forward the context? Perhaps a config parameter?
+ std::vector<Byte> dummy;
+ _proxy->ice_invoke(_current.operation, _current.nonmutating, _inParams, dummy, _current.context);
+ return _proxy;
+}
+
+bool
+Glacier::Missive::override(const MissivePtr& missive)
+{
+ return false;
+}
+
+Glacier::MissiveQueue::MissiveQueue() :
+ _destroy(false)
+{
+}
+
+Glacier::MissiveQueue::~MissiveQueue()
+{
+ assert(_destroy);
+ assert(_missives.empty());
+}
+
+void
+Glacier::MissiveQueue::destroy()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
+
+ _destroy = true;
+ _missives.clear();
+
+ notify();
+}
+
+void
+Glacier::MissiveQueue::add(const MissivePtr& missive)
+{
+ assert(missive);
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
+
+ assert(!_destroy);
+
+ if (_missives.empty())
+ {
+ notify();
+ }
+
+ for (std::vector<MissivePtr>::iterator p = _missives.begin(); p != _missives.end(); ++p)
+ {
+ if (missive->override(*p))
+ {
+ *p = missive; // Replace old missive if this is an override.
+ return;
+ }
+ }
+
+ _missives.push_back(missive); // No override, add new missive.
+}
+
+void
+Glacier::MissiveQueue::run()
+{
+ while (true)
+ {
+ vector<ObjectPrx> proxies;
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
+
+ while (!_destroy && _missives.empty())
+ {
+ wait();
+ }
+
+ if (_destroy)
+ {
+ return;
+ }
+
+ proxies.reserve(_missives.size());
+ vector<MissivePtr>::const_iterator p;
+ for (p = _missives.begin(); p != _missives.end(); ++p)
+ {
+ try
+ {
+ proxies.push_back((*p)->invoke());
+ }
+ catch (const Ice::Exception& ex)
+ {
+ //
+ // Remember exception and destroy the missive queue.
+ //
+ _destroy = true;
+ _missives.clear();
+ _exception = std::auto_ptr<Ice::Exception>(ex.ice_clone());
+ return;
+ }
+ _missives.clear();
+ }
+ }
+
+ //
+ // Flush and sleep outside the thread synchronization, so that
+ // new messages can be added to this missive queue while this
+ // thread sends a batch and sleeps.
+ //
+ try
+ {
+ //
+ // This sends all batched missives.
+ //
+ sort(proxies.begin(), proxies.end());
+ proxies.erase(unique(proxies.begin(), proxies.end()), proxies.end());
+ vector<ObjectPrx>::const_iterator p;
+ for (p = proxies.begin(); p != proxies.end(); ++p)
+ {
+ (*p)->ice_flush();
+ }
+
+ //
+ // In order to avoid flooding the missive receivers, we add
+ // a delay between sending missives.
+ //
+ // TODO: Configurable.
+ //
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(250));
+ }
+ catch (const Ice::Exception& ex)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this);
+
+ //
+ // Remember exception and destroy the missive queue.
+ //
+ _destroy = true;
+ _missives.clear();
+ _exception = std::auto_ptr<Ice::Exception>(ex.ice_clone());
+ return;
+ }
+ }
+}
diff --git a/cpp/src/Glacier/Missive.h b/cpp/src/Glacier/Missive.h
new file mode 100644
index 00000000000..ba72fa26aac
--- /dev/null
+++ b/cpp/src/Glacier/Missive.h
@@ -0,0 +1,66 @@
+// **********************************************************************
+//
+// Copyright (c) 2002
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef MISSIVE_H
+#define MISSIVE_H
+
+#include <Ice/Ice.h>
+#include <IceUtil/Thread.h>
+#include <IceUtil/Monitor.h>
+
+namespace Glacier
+{
+
+class Missive;
+typedef IceUtil::Handle<Missive> MissivePtr;
+
+class Missive : virtual public IceUtil::Shared
+{
+public:
+
+ Missive(const Ice::ObjectPrx&, const std::vector<Ice::Byte>&, const Ice::Current&);
+
+ virtual Ice::ObjectPrx invoke();
+ virtual bool override(const MissivePtr&);
+
+private:
+
+ const Ice::ObjectPrx _proxy;
+ const std::vector<Ice::Byte>& _inParams;
+ Ice::Current _current;
+};
+
+class MissiveQueue;
+typedef IceUtil::Handle<MissiveQueue> MissiveQueuePtr;
+
+class MissiveQueue : virtual public IceUtil::Thread, IceUtil::Monitor<IceUtil::Mutex>
+{
+public:
+
+ MissiveQueue();
+ virtual ~MissiveQueue();
+
+ void destroy();
+ void add(const MissivePtr&);
+
+ virtual void run();
+
+protected:
+
+ Ice::ObjectPrx _proxy;
+ std::vector<MissivePtr> _missives;
+ std::auto_ptr<Ice::Exception> _exception;
+ bool _destroy;
+};
+
+};
+
+#endif
+
diff --git a/cpp/src/Glacier/ServerBlobject.cpp b/cpp/src/Glacier/ServerBlobject.cpp
index d2ad46a113e..6f9c0f74064 100644
--- a/cpp/src/Glacier/ServerBlobject.cpp
+++ b/cpp/src/Glacier/ServerBlobject.cpp
@@ -16,18 +16,13 @@ using namespace Ice;
using namespace Glacier;
Glacier::ServerBlobject::ServerBlobject(const ObjectAdapterPtr& clientAdapter) :
- _clientAdapter(clientAdapter),
- _logger(_clientAdapter->getCommunicator()->getLogger())
+ Glacier::Blobject(_clientAdapter->getCommunicator()),
+ _clientAdapter(clientAdapter)
{
- PropertiesPtr properties = _clientAdapter->getCommunicator()->getProperties();
+ PropertiesPtr properties = _communicator->getProperties();
_traceLevel = properties->getPropertyAsInt("Glacier.Router.Trace.Server");
}
-Glacier::ServerBlobject::~ServerBlobject()
-{
- assert(!_clientAdapter);
-}
-
void
Glacier::ServerBlobject::destroy()
{
@@ -36,12 +31,11 @@ Glacier::ServerBlobject::destroy()
// object adapters have shut down.
//
_clientAdapter = 0;
- _logger = 0;
+ Glacier::Blobject::destroy();
}
bool
-Glacier::ServerBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vector<Byte>& outParams,
- const Current& current)
+Glacier::ServerBlobject::ice_invoke(const vector<Byte>& inParams, vector<Byte>& outParams, const Current& current)
{
assert(_clientAdapter); // Destroyed?
@@ -50,67 +44,24 @@ Glacier::ServerBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vect
ObjectPrx proxy = _clientAdapter->createReverseProxy(current.identity);
assert(proxy);
- if (!current.facet.empty())
- {
- proxy = proxy->ice_newFacet(current.facet);
- }
-
- Context::const_iterator p = current.context.find("_fwd");
- if (p != current.context.end())
- {
- for (unsigned int i = 0; i < p->second.length(); ++i)
- {
- char option = p->second[i];
- switch (option)
- {
- case 't':
- {
- proxy = proxy->ice_twoway();
- break;
- }
-
- case 'o':
- {
- proxy = proxy->ice_oneway();
- break;
- }
-
- case 'd':
- {
- proxy = proxy->ice_datagram();
- break;
- }
-
- case 's':
- {
- proxy = proxy->ice_secure(true);
- break;
- }
-
- default:
- {
- Warning out(_logger);
- out << "unknown forward option `" << option << "'";
- break;
- }
- }
- }
- }
+ MissiveQueuePtr missiveQueue = modifyProxy(proxy, current);
+ assert(!missiveQueue);
if (_traceLevel >= 2)
{
Trace out(_logger, "Glacier");
out << "reverse routing to:\n"
- << "proxy = " << _clientAdapter->getCommunicator()->proxyToString(proxy) << '\n'
+ << "proxy = " << _communicator->proxyToString(proxy) << '\n'
<< "operation = " << current.operation << '\n'
<< "nonmutating = " << (current.nonmutating ? "true" : "false");
}
+ // TODO: Should we forward the context? Perhaps a config parameter?
return proxy->ice_invoke(current.operation, current.nonmutating, inParams, outParams, current.context);
}
catch (const Exception& ex)
{
- if (_traceLevel)
+ if (_traceLevel >= 1)
{
Trace out(_logger, "Glacier");
out << "reverse routing exception:\n" << ex;
diff --git a/cpp/src/Glacier/ServerBlobject.h b/cpp/src/Glacier/ServerBlobject.h
index 11aa642a60b..7b800b4059f 100644
--- a/cpp/src/Glacier/ServerBlobject.h
+++ b/cpp/src/Glacier/ServerBlobject.h
@@ -11,18 +11,16 @@
#ifndef SERVER_BLOBJECT_H
#define SERVER_BLOBJECT_H
-#include <Ice/RoutingTableF.h>
-#include <Ice/Ice.h>
+#include <Glacier/Blobject.h>
namespace Glacier
{
-class ServerBlobject : public Ice::Blobject
+class ServerBlobject : public Glacier::Blobject
{
public:
ServerBlobject(const Ice::ObjectAdapterPtr&);
- virtual ~ServerBlobject();
void destroy();
virtual bool ice_invoke(const std::vector<Ice::Byte>&, std::vector<Ice::Byte>&, const Ice::Current&);
@@ -30,7 +28,6 @@ public:
private:
Ice::ObjectAdapterPtr _clientAdapter;
- Ice::LoggerPtr _logger;
int _traceLevel;
};
diff --git a/cpp/src/Glacier/StarterI.cpp b/cpp/src/Glacier/StarterI.cpp
index 2200c0f4f76..a8eef0ca0f8 100644
--- a/cpp/src/Glacier/StarterI.cpp
+++ b/cpp/src/Glacier/StarterI.cpp
@@ -97,7 +97,7 @@ Glacier::StarterI::startRouter(const string& userId, const string& password, Byt
//
// The value of clientCertificateBase64 should be passed in to the router
// in the property
- // * Glacier.Router.ClientCertificate
+ // * Glacier.Router.AcceptCert
//
string routerPrivateKeyBase64;
string routerCertificateBase64;