diff options
author | Marc Laukien <marc@zeroc.com> | 2002-05-30 12:26:45 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2002-05-30 12:26:45 +0000 |
commit | 89e12c32672600522be7e91bbf01c0cee09e7692 (patch) | |
tree | be0909f9805e7da1fb3dad19e68cfc9820ece56d /cpp/src | |
parent | fixes (diff) | |
download | ice-89e12c32672600522be7e91bbf01c0cee09e7692.tar.bz2 ice-89e12c32672600522be7e91bbf01c0cee09e7692.tar.xz ice-89e12c32672600522be7e91bbf01c0cee09e7692.zip |
started with missives
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Glacier/Blobject.cpp | 136 | ||||
-rw-r--r-- | cpp/src/Glacier/Blobject.h | 46 | ||||
-rw-r--r-- | cpp/src/Glacier/ClientBlobject.cpp | 67 | ||||
-rw-r--r-- | cpp/src/Glacier/ClientBlobject.h | 9 | ||||
-rw-r--r-- | cpp/src/Glacier/Makefile | 4 | ||||
-rw-r--r-- | cpp/src/Glacier/Missive.cpp | 168 | ||||
-rw-r--r-- | cpp/src/Glacier/Missive.h | 66 | ||||
-rw-r--r-- | cpp/src/Glacier/ServerBlobject.cpp | 69 | ||||
-rw-r--r-- | cpp/src/Glacier/ServerBlobject.h | 7 | ||||
-rw-r--r-- | cpp/src/Glacier/StarterI.cpp | 2 |
10 files changed, 442 insertions, 132 deletions
diff --git a/cpp/src/Glacier/Blobject.cpp b/cpp/src/Glacier/Blobject.cpp new file mode 100644 index 00000000000..bbbfdfbc365 --- /dev/null +++ b/cpp/src/Glacier/Blobject.cpp @@ -0,0 +1,136 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#include <Ice/RoutingTable.h> +#include <Glacier/Blobject.h> + +using namespace std; +using namespace Ice; +using namespace Glacier; + +Glacier::Blobject::Blobject(const CommunicatorPtr& communicator) : + _communicator(communicator), + _logger(_communicator->getLogger()) +{ +} + +Glacier::Blobject::~Blobject() +{ + assert(!_communicator); + assert(!_missiveQueue); +} + +void +Glacier::Blobject::destroy() +{ + // + // No mutex protection necessary, destroy is only called after all + // object adapters have shut down. + // + _communicator = 0; + _logger = 0; + + { + IceUtil::Mutex::Lock lock(_missiveQueueMutex); + if (_missiveQueue) + { + _missiveQueue->destroy(); + _missiveQueueControl.join(); + _missiveQueue = 0; + } + } +} + + +MissiveQueuePtr +Glacier::Blobject::modifyProxy(ObjectPrx& proxy, const Current& current) +{ + if (!current.facet.empty()) + { + proxy = proxy->ice_newFacet(current.facet); + } + + MissiveQueuePtr missiveQueue; + Context::const_iterator p = current.context.find("_fwd"); + if (p != current.context.end()) + { + for (unsigned int i = 0; i < p->second.length(); ++i) + { + char option = p->second[i]; + switch (option) + { + case 't': + { + proxy = proxy->ice_twoway(); + missiveQueue = 0; + break; + } + + case 'o': + { + proxy = proxy->ice_oneway(); + missiveQueue = 0; + break; + } + + case 'd': + { + proxy = proxy->ice_datagram(); + missiveQueue = 0; + break; + } + + case 'O': + { + proxy = proxy->ice_batchOneway(); + missiveQueue = getMissiveQueue(); + break; + } + + case 'D': + { + proxy = proxy->ice_batchDatagram(); + missiveQueue = getMissiveQueue(); + break; + } + + case 's': + { + proxy = proxy->ice_secure(true); + break; + } + + default: + { + Warning out(_logger); + out << "unknown forward option `" << option << "'"; + break; + } + } + } + } + + return missiveQueue; +} + +MissiveQueuePtr +Glacier::Blobject::getMissiveQueue() +{ + // + // Lazy missive queue initialization. + // + IceUtil::Mutex::Lock lock(_missiveQueueMutex); + if (!_missiveQueue) + { + _missiveQueue = new MissiveQueue; + _missiveQueueControl = _missiveQueue->start(); + } + return _missiveQueue; +} diff --git a/cpp/src/Glacier/Blobject.h b/cpp/src/Glacier/Blobject.h new file mode 100644 index 00000000000..836db558d0f --- /dev/null +++ b/cpp/src/Glacier/Blobject.h @@ -0,0 +1,46 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef BLOBJECT_H +#define BLOBJECT_H + +#include <Ice/Ice.h> +#include <Glacier/Missive.h> + +namespace Glacier +{ + +class Blobject : public Ice::Blobject +{ +public: + + Blobject(const Ice::CommunicatorPtr&); + virtual ~Blobject(); + + void destroy(); + virtual MissiveQueuePtr modifyProxy(Ice::ObjectPrx&, const Ice::Current&); + +protected: + + Ice::CommunicatorPtr _communicator; + Ice::LoggerPtr _logger; + +private: + + MissiveQueuePtr getMissiveQueue(); + + MissiveQueuePtr _missiveQueue; + IceUtil::ThreadControl _missiveQueueControl; + IceUtil::Mutex _missiveQueueMutex; +}; + +} + +#endif diff --git a/cpp/src/Glacier/ClientBlobject.cpp b/cpp/src/Glacier/ClientBlobject.cpp index 6821a2bfd69..d9df798fc38 100644 --- a/cpp/src/Glacier/ClientBlobject.cpp +++ b/cpp/src/Glacier/ClientBlobject.cpp @@ -20,8 +20,7 @@ using namespace Glacier; Glacier::ClientBlobject::ClientBlobject(const CommunicatorPtr& communicator, const IceInternal::RoutingTablePtr& routingTable, const string& allowCategories) : - _communicator(communicator), - _logger(_communicator->getLogger()), + Glacier::Blobject(communicator), _routingTable(routingTable) { PropertiesPtr properties = _communicator->getProperties(); @@ -41,11 +40,6 @@ Glacier::ClientBlobject::ClientBlobject(const CommunicatorPtr& communicator, _allowCategories.erase(unique(_allowCategories.begin(), _allowCategories.end()), _allowCategories.end()); } -Glacier::ClientBlobject::~ClientBlobject() -{ - assert(!_communicator); -} - void Glacier::ClientBlobject::destroy() { @@ -53,14 +47,12 @@ Glacier::ClientBlobject::destroy() // No mutex protection necessary, destroy is only called after all // object adapters have shut down. // - _communicator = 0; - _logger = 0; _routingTable = 0; + Glacier::Blobject::destroy(); } bool -Glacier::ClientBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vector<Byte>& outParams, - const Current& current) +Glacier::ClientBlobject::ice_invoke(const vector<Byte>& inParams, vector<Byte>& outParams, const Current& current) { assert(_communicator); // Destroyed? @@ -71,7 +63,7 @@ Glacier::ClientBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vect { if (!binary_search(_allowCategories.begin(), _allowCategories.end(), current.identity.category)) { - if (_traceLevel > 0) + if (_traceLevel >= 1) { Trace out(_logger, "Glacier"); out << "rejecting request\n"; @@ -93,53 +85,9 @@ Glacier::ClientBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vect ex.identity = current.identity; throw ex; } - - if (!current.facet.empty()) - { - proxy = proxy->ice_newFacet(current.facet); - } - Context::const_iterator p = current.context.find("_fwd"); - if (p != current.context.end()) - { - for (unsigned int i = 0; i < p->second.length(); ++i) - { - char option = p->second[i]; - switch (option) - { - case 't': - { - proxy = proxy->ice_twoway(); - break; - } - - case 'o': - { - proxy = proxy->ice_oneway(); - break; - } - - case 'd': - { - proxy = proxy->ice_datagram(); - break; - } - - case 's': - { - proxy = proxy->ice_secure(true); - break; - } - - default: - { - Warning out(_logger); - out << "unknown forward option `" << option << "'"; - break; - } - } - } - } + MissiveQueuePtr missiveQueue = modifyProxy(proxy, current); + assert(!missiveQueue); if (_traceLevel >= 2) { @@ -150,11 +98,12 @@ Glacier::ClientBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vect << "nonmutating = " << (current.nonmutating ? "true" : "false"); } + // TODO: Should we forward the context? Perhaps a config parameter? return proxy->ice_invoke(current.operation, current.nonmutating, inParams, outParams, current.context); } catch (const Exception& ex) { - if (_traceLevel) + if (_traceLevel >= 1) { Trace out(_logger, "Glacier"); out << "routing exception:\n" << ex; diff --git a/cpp/src/Glacier/ClientBlobject.h b/cpp/src/Glacier/ClientBlobject.h index 8ca92e91362..2242f564720 100644 --- a/cpp/src/Glacier/ClientBlobject.h +++ b/cpp/src/Glacier/ClientBlobject.h @@ -12,27 +12,22 @@ #define CLIENT_BLOBJECT_H #include <Ice/RoutingTableF.h> -#include <Ice/Ice.h> - -#include <set> +#include <Glacier/Blobject.h> namespace Glacier { -class ClientBlobject : public Ice::Blobject +class ClientBlobject : public Glacier::Blobject { public: ClientBlobject(const Ice::CommunicatorPtr&, const IceInternal::RoutingTablePtr&, const std::string&); - virtual ~ClientBlobject(); void destroy(); virtual bool ice_invoke(const std::vector<Ice::Byte>&, std::vector<Ice::Byte>&, const Ice::Current&); private: - Ice::CommunicatorPtr _communicator; - Ice::LoggerPtr _logger; int _traceLevel; IceInternal::RoutingTablePtr _routingTable; std::vector<std::string> _allowCategories; diff --git a/cpp/src/Glacier/Makefile b/cpp/src/Glacier/Makefile index 952b8f43fea..3714cb01ced 100644 --- a/cpp/src/Glacier/Makefile +++ b/cpp/src/Glacier/Makefile @@ -28,8 +28,10 @@ OBJS = Starter.o \ ROBJS = GlacierRouter.o \ RouterI.o \ + Blobject.o \ ClientBlobject.o \ - ServerBlobject.o + ServerBlobject.o \ + Missive.o SOBJS = GlacierStarter.o \ StarterI.o diff --git a/cpp/src/Glacier/Missive.cpp b/cpp/src/Glacier/Missive.cpp new file mode 100644 index 00000000000..6faedc57426 --- /dev/null +++ b/cpp/src/Glacier/Missive.cpp @@ -0,0 +1,168 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#include <Glacier/Missive.h> + +using namespace std; +using namespace Ice; +using namespace Glacier; + +Glacier::Missive::Missive(const ObjectPrx& proxy, const vector<Byte>& inParams, const Current& current) : + _proxy(proxy), + _inParams(inParams), + _current(current) +{ +} + +ObjectPrx +Glacier::Missive::invoke() +{ + // TODO: Should we forward the context? Perhaps a config parameter? + std::vector<Byte> dummy; + _proxy->ice_invoke(_current.operation, _current.nonmutating, _inParams, dummy, _current.context); + return _proxy; +} + +bool +Glacier::Missive::override(const MissivePtr& missive) +{ + return false; +} + +Glacier::MissiveQueue::MissiveQueue() : + _destroy(false) +{ +} + +Glacier::MissiveQueue::~MissiveQueue() +{ + assert(_destroy); + assert(_missives.empty()); +} + +void +Glacier::MissiveQueue::destroy() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); + + _destroy = true; + _missives.clear(); + + notify(); +} + +void +Glacier::MissiveQueue::add(const MissivePtr& missive) +{ + assert(missive); + + IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); + + assert(!_destroy); + + if (_missives.empty()) + { + notify(); + } + + for (std::vector<MissivePtr>::iterator p = _missives.begin(); p != _missives.end(); ++p) + { + if (missive->override(*p)) + { + *p = missive; // Replace old missive if this is an override. + return; + } + } + + _missives.push_back(missive); // No override, add new missive. +} + +void +Glacier::MissiveQueue::run() +{ + while (true) + { + vector<ObjectPrx> proxies; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); + + while (!_destroy && _missives.empty()) + { + wait(); + } + + if (_destroy) + { + return; + } + + proxies.reserve(_missives.size()); + vector<MissivePtr>::const_iterator p; + for (p = _missives.begin(); p != _missives.end(); ++p) + { + try + { + proxies.push_back((*p)->invoke()); + } + catch (const Ice::Exception& ex) + { + // + // Remember exception and destroy the missive queue. + // + _destroy = true; + _missives.clear(); + _exception = std::auto_ptr<Ice::Exception>(ex.ice_clone()); + return; + } + _missives.clear(); + } + } + + // + // Flush and sleep outside the thread synchronization, so that + // new messages can be added to this missive queue while this + // thread sends a batch and sleeps. + // + try + { + // + // This sends all batched missives. + // + sort(proxies.begin(), proxies.end()); + proxies.erase(unique(proxies.begin(), proxies.end()), proxies.end()); + vector<ObjectPrx>::const_iterator p; + for (p = proxies.begin(); p != proxies.end(); ++p) + { + (*p)->ice_flush(); + } + + // + // In order to avoid flooding the missive receivers, we add + // a delay between sending missives. + // + // TODO: Configurable. + // + IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(250)); + } + catch (const Ice::Exception& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock lock(*this); + + // + // Remember exception and destroy the missive queue. + // + _destroy = true; + _missives.clear(); + _exception = std::auto_ptr<Ice::Exception>(ex.ice_clone()); + return; + } + } +} diff --git a/cpp/src/Glacier/Missive.h b/cpp/src/Glacier/Missive.h new file mode 100644 index 00000000000..ba72fa26aac --- /dev/null +++ b/cpp/src/Glacier/Missive.h @@ -0,0 +1,66 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef MISSIVE_H +#define MISSIVE_H + +#include <Ice/Ice.h> +#include <IceUtil/Thread.h> +#include <IceUtil/Monitor.h> + +namespace Glacier +{ + +class Missive; +typedef IceUtil::Handle<Missive> MissivePtr; + +class Missive : virtual public IceUtil::Shared +{ +public: + + Missive(const Ice::ObjectPrx&, const std::vector<Ice::Byte>&, const Ice::Current&); + + virtual Ice::ObjectPrx invoke(); + virtual bool override(const MissivePtr&); + +private: + + const Ice::ObjectPrx _proxy; + const std::vector<Ice::Byte>& _inParams; + Ice::Current _current; +}; + +class MissiveQueue; +typedef IceUtil::Handle<MissiveQueue> MissiveQueuePtr; + +class MissiveQueue : virtual public IceUtil::Thread, IceUtil::Monitor<IceUtil::Mutex> +{ +public: + + MissiveQueue(); + virtual ~MissiveQueue(); + + void destroy(); + void add(const MissivePtr&); + + virtual void run(); + +protected: + + Ice::ObjectPrx _proxy; + std::vector<MissivePtr> _missives; + std::auto_ptr<Ice::Exception> _exception; + bool _destroy; +}; + +}; + +#endif + diff --git a/cpp/src/Glacier/ServerBlobject.cpp b/cpp/src/Glacier/ServerBlobject.cpp index d2ad46a113e..6f9c0f74064 100644 --- a/cpp/src/Glacier/ServerBlobject.cpp +++ b/cpp/src/Glacier/ServerBlobject.cpp @@ -16,18 +16,13 @@ using namespace Ice; using namespace Glacier; Glacier::ServerBlobject::ServerBlobject(const ObjectAdapterPtr& clientAdapter) : - _clientAdapter(clientAdapter), - _logger(_clientAdapter->getCommunicator()->getLogger()) + Glacier::Blobject(_clientAdapter->getCommunicator()), + _clientAdapter(clientAdapter) { - PropertiesPtr properties = _clientAdapter->getCommunicator()->getProperties(); + PropertiesPtr properties = _communicator->getProperties(); _traceLevel = properties->getPropertyAsInt("Glacier.Router.Trace.Server"); } -Glacier::ServerBlobject::~ServerBlobject() -{ - assert(!_clientAdapter); -} - void Glacier::ServerBlobject::destroy() { @@ -36,12 +31,11 @@ Glacier::ServerBlobject::destroy() // object adapters have shut down. // _clientAdapter = 0; - _logger = 0; + Glacier::Blobject::destroy(); } bool -Glacier::ServerBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vector<Byte>& outParams, - const Current& current) +Glacier::ServerBlobject::ice_invoke(const vector<Byte>& inParams, vector<Byte>& outParams, const Current& current) { assert(_clientAdapter); // Destroyed? @@ -50,67 +44,24 @@ Glacier::ServerBlobject::ice_invoke(const std::vector<Byte>& inParams, std::vect ObjectPrx proxy = _clientAdapter->createReverseProxy(current.identity); assert(proxy); - if (!current.facet.empty()) - { - proxy = proxy->ice_newFacet(current.facet); - } - - Context::const_iterator p = current.context.find("_fwd"); - if (p != current.context.end()) - { - for (unsigned int i = 0; i < p->second.length(); ++i) - { - char option = p->second[i]; - switch (option) - { - case 't': - { - proxy = proxy->ice_twoway(); - break; - } - - case 'o': - { - proxy = proxy->ice_oneway(); - break; - } - - case 'd': - { - proxy = proxy->ice_datagram(); - break; - } - - case 's': - { - proxy = proxy->ice_secure(true); - break; - } - - default: - { - Warning out(_logger); - out << "unknown forward option `" << option << "'"; - break; - } - } - } - } + MissiveQueuePtr missiveQueue = modifyProxy(proxy, current); + assert(!missiveQueue); if (_traceLevel >= 2) { Trace out(_logger, "Glacier"); out << "reverse routing to:\n" - << "proxy = " << _clientAdapter->getCommunicator()->proxyToString(proxy) << '\n' + << "proxy = " << _communicator->proxyToString(proxy) << '\n' << "operation = " << current.operation << '\n' << "nonmutating = " << (current.nonmutating ? "true" : "false"); } + // TODO: Should we forward the context? Perhaps a config parameter? return proxy->ice_invoke(current.operation, current.nonmutating, inParams, outParams, current.context); } catch (const Exception& ex) { - if (_traceLevel) + if (_traceLevel >= 1) { Trace out(_logger, "Glacier"); out << "reverse routing exception:\n" << ex; diff --git a/cpp/src/Glacier/ServerBlobject.h b/cpp/src/Glacier/ServerBlobject.h index 11aa642a60b..7b800b4059f 100644 --- a/cpp/src/Glacier/ServerBlobject.h +++ b/cpp/src/Glacier/ServerBlobject.h @@ -11,18 +11,16 @@ #ifndef SERVER_BLOBJECT_H #define SERVER_BLOBJECT_H -#include <Ice/RoutingTableF.h> -#include <Ice/Ice.h> +#include <Glacier/Blobject.h> namespace Glacier { -class ServerBlobject : public Ice::Blobject +class ServerBlobject : public Glacier::Blobject { public: ServerBlobject(const Ice::ObjectAdapterPtr&); - virtual ~ServerBlobject(); void destroy(); virtual bool ice_invoke(const std::vector<Ice::Byte>&, std::vector<Ice::Byte>&, const Ice::Current&); @@ -30,7 +28,6 @@ public: private: Ice::ObjectAdapterPtr _clientAdapter; - Ice::LoggerPtr _logger; int _traceLevel; }; diff --git a/cpp/src/Glacier/StarterI.cpp b/cpp/src/Glacier/StarterI.cpp index 2200c0f4f76..a8eef0ca0f8 100644 --- a/cpp/src/Glacier/StarterI.cpp +++ b/cpp/src/Glacier/StarterI.cpp @@ -97,7 +97,7 @@ Glacier::StarterI::startRouter(const string& userId, const string& password, Byt // // The value of clientCertificateBase64 should be passed in to the router // in the property - // * Glacier.Router.ClientCertificate + // * Glacier.Router.AcceptCert // string routerPrivateKeyBase64; string routerCertificateBase64; |