// // Copyright (c) ZeroC, Inc. All rights reserved. // #include #include #include #include #include using namespace std; using namespace Ice; using namespace IceStorm; using namespace Test; class ControllerI final : public Controller { public: void stop(const Ice::Current& c) override { c.adapter->getCommunicator()->shutdown(); } }; class PublishThread final { public: explicit PublishThread(shared_ptr single) : _single(std::move(single)), _published(0), _destroy(false) { } void run() { while(true) { { lock_guard log(_mutex); if(_destroy) { cout << _published << endl; break; } } try { _single->event(_published); this_thread::sleep_for(1ms); } catch(const Ice::UnknownException&) { // This is expected if we publish to a replica that is // going down. continue; } ++_published; } } void destroy() { lock_guard log(_mutex); _destroy = true; } private: const shared_ptr _single; int _published; bool _destroy; mutex _mutex; }; class Publisher final : public Test::TestHelper { public: void run(int, char**) override; }; void Publisher::run(int argc, char** argv) { Ice::CommunicatorHolder communicator = initialize(argc, argv); auto properties = communicator->getProperties(); string managerProxy = properties->getProperty("IceStormAdmin.TopicManager.Default"); if(managerProxy.empty()) { ostringstream os; os << argv[0] << ": property `IceStormAdmin.TopicManager.Default' is not set"; throw invalid_argument(os.str()); } auto manager = checkedCast( communicator->stringToProxy(managerProxy)); if(!manager) { ostringstream os; os << argv[0] << ": `" << managerProxy << "' is not running"; throw invalid_argument(os.str()); } auto topic = manager->retrieve("single"); assert(topic); // // Get a publisher object, create a twoway proxy, disable // connection caching and then cast to a Single object. // auto single = uncheckedCast(topic->getPublisher()->ice_twoway()->ice_connectionCached(false)); auto adapter = communicator->createObjectAdapterWithEndpoints("ControllerAdapter", "tcp"); auto controller = adapter->addWithUUID(make_shared()); adapter->activate(); cout << communicator->proxyToString(controller) << endl; PublishThread pt(move(single)); auto fut = std::async(launch::async, [&pt]{ pt.run(); }); communicator->waitForShutdown(); pt.destroy(); fut.get(); } DEFINE_TEST(Publisher)