diff options
Diffstat (limited to 'cpp/test/IceStorm/repstress/Publisher.cpp')
-rw-r--r-- | cpp/test/IceStorm/repstress/Publisher.cpp | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/cpp/test/IceStorm/repstress/Publisher.cpp b/cpp/test/IceStorm/repstress/Publisher.cpp new file mode 100644 index 00000000000..36f33b04a18 --- /dev/null +++ b/cpp/test/IceStorm/repstress/Publisher.cpp @@ -0,0 +1,169 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/Ice.h> +#include <IceUtil/IceUtil.h> +#include <IceStorm/IceStorm.h> +#include <Single.h> +#include <Controller.h> + +using namespace std; +using namespace Ice; +using namespace IceStorm; +using namespace Test; + +class ControllerI : public Controller +{ +public: + + virtual void stop(const Ice::Current& c) + { + c.adapter->getCommunicator()->shutdown(); + } +}; + +class PublishThread : public IceUtil::Thread, public IceUtil::Mutex +{ +public: + + PublishThread(const SinglePrx& single) : + _single(single), + _published(0), + _destroy(false) + { + } + + virtual void run() + { + while(true) + { + { + Lock sync(*this); + if(_destroy) + { + cout << _published << endl; + break; + } + } + try + { + _single->event(_published); + IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(1)); + } + catch(const Ice::UnknownException&) + { + // This is expected if we publish to a replica that is + // going down. + continue; + } + ++_published; + } + } + + void destroy() + { + Lock sync(*this); + _destroy = true; + } + +private: + + const SinglePrx _single; + int _published; + bool _destroy; +}; +typedef IceUtil::Handle<PublishThread> PublishThreadPtr; + +int +run(int argc, char* argv[], const CommunicatorPtr& communicator) +{ + PropertiesPtr properties = communicator->getProperties(); + const char* managerProxyProperty = "IceStormAdmin.TopicManager.Default"; + string managerProxy = properties->getProperty(managerProxyProperty); + if(managerProxy.empty()) + { + cerr << argv[0] << ": property `" << managerProxyProperty << "' is not set" << endl; + return EXIT_FAILURE; + } + + IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast( + communicator->stringToProxy(managerProxy)); + if(!manager) + { + cerr << argv[0] << ": `" << managerProxy << "' is not running" << endl; + return EXIT_FAILURE; + } + + TopicPrx topic; + try + { + topic = manager->retrieve("single"); + } + catch(const NoSuchTopic& e) + { + cerr << argv[0] << ": NoSuchTopic: " << e.name << endl; + return EXIT_FAILURE; + + } + assert(topic); + + // + // Get a publisher object, create a twoway proxy, disable + // connection caching and then cast to a Single object. + // + SinglePrx single = SinglePrx::uncheckedCast(topic->getPublisher()->ice_twoway()->ice_connectionCached(false)); + + ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("ControllerAdapter", "default"); + Ice::ObjectPrx controller = adapter->addWithUUID(new ControllerI); + adapter->activate(); + cout << communicator->proxyToString(controller) << endl; + + PublishThreadPtr t = new PublishThread(single); + t->start(); + + communicator->waitForShutdown(); + + t->destroy(); + t->getThreadControl().join(); + + return EXIT_SUCCESS; +} + +int +main(int argc, char* argv[]) +{ + int status; + CommunicatorPtr communicator; + + try + { + communicator = initialize(argc, argv); + status = run(argc, argv, communicator); + } + catch(const Exception& ex) + { + cerr << ex << endl; + status = EXIT_FAILURE; + } + + if(communicator) + { + try + { + communicator->destroy(); + } + catch(const Exception& ex) + { + cerr << ex << endl; + status = EXIT_FAILURE; + } + } + + return status; +} |