summaryrefslogtreecommitdiff
path: root/cpp/test/IceStorm/repstress/Publisher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/test/IceStorm/repstress/Publisher.cpp')
-rw-r--r--cpp/test/IceStorm/repstress/Publisher.cpp169
1 files changed, 169 insertions, 0 deletions
diff --git a/cpp/test/IceStorm/repstress/Publisher.cpp b/cpp/test/IceStorm/repstress/Publisher.cpp
new file mode 100644
index 00000000000..36f33b04a18
--- /dev/null
+++ b/cpp/test/IceStorm/repstress/Publisher.cpp
@@ -0,0 +1,169 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+#include <IceUtil/IceUtil.h>
+#include <IceStorm/IceStorm.h>
+#include <Single.h>
+#include <Controller.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceStorm;
+using namespace Test;
+
+class ControllerI : public Controller
+{
+public:
+
+ virtual void stop(const Ice::Current& c)
+ {
+ c.adapter->getCommunicator()->shutdown();
+ }
+};
+
+class PublishThread : public IceUtil::Thread, public IceUtil::Mutex
+{
+public:
+
+ PublishThread(const SinglePrx& single) :
+ _single(single),
+ _published(0),
+ _destroy(false)
+ {
+ }
+
+ virtual void run()
+ {
+ while(true)
+ {
+ {
+ Lock sync(*this);
+ if(_destroy)
+ {
+ cout << _published << endl;
+ break;
+ }
+ }
+ try
+ {
+ _single->event(_published);
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(1));
+ }
+ catch(const Ice::UnknownException&)
+ {
+ // This is expected if we publish to a replica that is
+ // going down.
+ continue;
+ }
+ ++_published;
+ }
+ }
+
+ void destroy()
+ {
+ Lock sync(*this);
+ _destroy = true;
+ }
+
+private:
+
+ const SinglePrx _single;
+ int _published;
+ bool _destroy;
+};
+typedef IceUtil::Handle<PublishThread> PublishThreadPtr;
+
+int
+run(int argc, char* argv[], const CommunicatorPtr& communicator)
+{
+ PropertiesPtr properties = communicator->getProperties();
+ const char* managerProxyProperty = "IceStormAdmin.TopicManager.Default";
+ string managerProxy = properties->getProperty(managerProxyProperty);
+ if(managerProxy.empty())
+ {
+ cerr << argv[0] << ": property `" << managerProxyProperty << "' is not set" << endl;
+ return EXIT_FAILURE;
+ }
+
+ IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
+ communicator->stringToProxy(managerProxy));
+ if(!manager)
+ {
+ cerr << argv[0] << ": `" << managerProxy << "' is not running" << endl;
+ return EXIT_FAILURE;
+ }
+
+ TopicPrx topic;
+ try
+ {
+ topic = manager->retrieve("single");
+ }
+ catch(const NoSuchTopic& e)
+ {
+ cerr << argv[0] << ": NoSuchTopic: " << e.name << endl;
+ return EXIT_FAILURE;
+
+ }
+ assert(topic);
+
+ //
+ // Get a publisher object, create a twoway proxy, disable
+ // connection caching and then cast to a Single object.
+ //
+ SinglePrx single = SinglePrx::uncheckedCast(topic->getPublisher()->ice_twoway()->ice_connectionCached(false));
+
+ ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("ControllerAdapter", "default");
+ Ice::ObjectPrx controller = adapter->addWithUUID(new ControllerI);
+ adapter->activate();
+ cout << communicator->proxyToString(controller) << endl;
+
+ PublishThreadPtr t = new PublishThread(single);
+ t->start();
+
+ communicator->waitForShutdown();
+
+ t->destroy();
+ t->getThreadControl().join();
+
+ return EXIT_SUCCESS;
+}
+
+int
+main(int argc, char* argv[])
+{
+ int status;
+ CommunicatorPtr communicator;
+
+ try
+ {
+ communicator = initialize(argc, argv);
+ status = run(argc, argv, communicator);
+ }
+ catch(const Exception& ex)
+ {
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
+ }
+
+ if(communicator)
+ {
+ try
+ {
+ communicator->destroy();
+ }
+ catch(const Exception& ex)
+ {
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
+ }
+ }
+
+ return status;
+}