diff options
Diffstat (limited to 'cpp/src/IceStorm/SubscriberFactory.cpp')
-rw-r--r-- | cpp/src/IceStorm/SubscriberFactory.cpp | 94 |
1 files changed, 86 insertions, 8 deletions
diff --git a/cpp/src/IceStorm/SubscriberFactory.cpp b/cpp/src/IceStorm/SubscriberFactory.cpp index 97966b6a9de..8ce91506925 100644 --- a/cpp/src/IceStorm/SubscriberFactory.cpp +++ b/cpp/src/IceStorm/SubscriberFactory.cpp @@ -19,6 +19,8 @@ #include <IceStorm/OnewayBatchSubscriber.h> #include <IceStorm/Flusher.h> #include <IceStorm/TraceLevels.h> +#include <IceStorm/LinkProxy.h> +#include <IceStorm/OnewayProxy.h> using namespace std; using namespace IceStorm; @@ -30,8 +32,41 @@ SubscriberFactory::SubscriberFactory(const TraceLevelsPtr& traceLevels, const Fl } SubscriberPtr +SubscriberFactory::createLinkSubscriber(const TopicLinkPrx& obj, Ice::Int cost) +{ + IceUtil::RecMutex::Lock sync(_proxiesMutex); + + // + // Delivery to links is done in batch mode. + // + TopicLinkPrx newObj = TopicLinkPrx::uncheckedCast(obj->ice_batchOneway()); + + // + // Check if a queued proxy already exists, or create one if necessary. + // + QueuedProxyPtr proxy; + map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(newObj); + if(p != _proxies.end()) + { + proxy = p->second.proxy; + } + else + { + proxy = new LinkProxy(newObj); + ProxyInfo info; + info.proxy = proxy; + info.count = 0; + _proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(newObj, info)); + } + + return new LinkSubscriber(this, _traceLevels, proxy, cost); +} + +SubscriberPtr SubscriberFactory::createSubscriber(const QoS& qos, const Ice::ObjectPrx& obj) { + IceUtil::RecMutex::Lock sync(_proxiesMutex); + // // Determine the requested reliability characteristics // @@ -52,11 +87,11 @@ SubscriberFactory::createSubscriber(const QoS& qos, const Ice::ObjectPrx& obj) { if(obj->ice_isDatagram()) { - return new OnewayBatchSubscriber(_traceLevels, _flusher, obj->ice_batchDatagram()); + newObj = obj->ice_batchDatagram(); } else { - return new OnewayBatchSubscriber(_traceLevels, _flusher, obj->ice_batchOneway()); + newObj = obj->ice_batchOneway(); } } else // reliability == "oneway" @@ -71,19 +106,62 @@ SubscriberFactory::createSubscriber(const QoS& qos, const Ice::ObjectPrx& obj) } if(obj->ice_isDatagram()) { - return new OnewaySubscriber(_traceLevels, obj->ice_datagram()); + newObj = obj; } else { - return new OnewaySubscriber(_traceLevels, obj->ice_oneway()); + newObj = obj->ice_oneway(); } } - assert(false); + // + // Check if a queued proxy already exists, or create one if necessary. + // + QueuedProxyPtr proxy; + map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(newObj); + if(p != _proxies.end()) + { + proxy = p->second.proxy; + } + else + { + proxy = new OnewayProxy(newObj); + ProxyInfo info; + info.proxy = proxy; + info.count = 0; + _proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(newObj, info)); + } + + if(reliability == "batch") + { + return new OnewayBatchSubscriber(this, _traceLevels, _flusher, proxy); + } + else + { + return new OnewaySubscriber(this, _traceLevels, proxy); + } } -SubscriberPtr -SubscriberFactory::createLinkSubscriber(const TopicLinkPrx& obj, Ice::Int cost) +void +SubscriberFactory::incProxyUsageCount(const QueuedProxyPtr& proxy) +{ + IceUtil::RecMutex::Lock sync(_proxiesMutex); + + map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(proxy->proxy()); + assert(p->second.count >= 0); + p->second.count++; +} + +void +SubscriberFactory::decProxyUsageCount(const QueuedProxyPtr& proxy) { - return new LinkSubscriber(_traceLevels, obj, cost); + IceUtil::RecMutex::Lock sync(_proxiesMutex); + + map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(proxy->proxy()); + assert(p->second.count > 0); + p->second.count--; + if(p->second.count == 0) + { + _proxies.erase(p); + } } |