summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/SubscriberFactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/SubscriberFactory.cpp')
-rw-r--r--cpp/src/IceStorm/SubscriberFactory.cpp94
1 files changed, 86 insertions, 8 deletions
diff --git a/cpp/src/IceStorm/SubscriberFactory.cpp b/cpp/src/IceStorm/SubscriberFactory.cpp
index 97966b6a9de..8ce91506925 100644
--- a/cpp/src/IceStorm/SubscriberFactory.cpp
+++ b/cpp/src/IceStorm/SubscriberFactory.cpp
@@ -19,6 +19,8 @@
#include <IceStorm/OnewayBatchSubscriber.h>
#include <IceStorm/Flusher.h>
#include <IceStorm/TraceLevels.h>
+#include <IceStorm/LinkProxy.h>
+#include <IceStorm/OnewayProxy.h>
using namespace std;
using namespace IceStorm;
@@ -30,8 +32,41 @@ SubscriberFactory::SubscriberFactory(const TraceLevelsPtr& traceLevels, const Fl
}
SubscriberPtr
+SubscriberFactory::createLinkSubscriber(const TopicLinkPrx& obj, Ice::Int cost)
+{
+ IceUtil::RecMutex::Lock sync(_proxiesMutex);
+
+ //
+ // Delivery to links is done in batch mode.
+ //
+ TopicLinkPrx newObj = TopicLinkPrx::uncheckedCast(obj->ice_batchOneway());
+
+ //
+ // Check if a queued proxy already exists, or create one if necessary.
+ //
+ QueuedProxyPtr proxy;
+ map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(newObj);
+ if(p != _proxies.end())
+ {
+ proxy = p->second.proxy;
+ }
+ else
+ {
+ proxy = new LinkProxy(newObj);
+ ProxyInfo info;
+ info.proxy = proxy;
+ info.count = 0;
+ _proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(newObj, info));
+ }
+
+ return new LinkSubscriber(this, _traceLevels, proxy, cost);
+}
+
+SubscriberPtr
SubscriberFactory::createSubscriber(const QoS& qos, const Ice::ObjectPrx& obj)
{
+ IceUtil::RecMutex::Lock sync(_proxiesMutex);
+
//
// Determine the requested reliability characteristics
//
@@ -52,11 +87,11 @@ SubscriberFactory::createSubscriber(const QoS& qos, const Ice::ObjectPrx& obj)
{
if(obj->ice_isDatagram())
{
- return new OnewayBatchSubscriber(_traceLevels, _flusher, obj->ice_batchDatagram());
+ newObj = obj->ice_batchDatagram();
}
else
{
- return new OnewayBatchSubscriber(_traceLevels, _flusher, obj->ice_batchOneway());
+ newObj = obj->ice_batchOneway();
}
}
else // reliability == "oneway"
@@ -71,19 +106,62 @@ SubscriberFactory::createSubscriber(const QoS& qos, const Ice::ObjectPrx& obj)
}
if(obj->ice_isDatagram())
{
- return new OnewaySubscriber(_traceLevels, obj->ice_datagram());
+ newObj = obj;
}
else
{
- return new OnewaySubscriber(_traceLevels, obj->ice_oneway());
+ newObj = obj->ice_oneway();
}
}
- assert(false);
+ //
+ // Check if a queued proxy already exists, or create one if necessary.
+ //
+ QueuedProxyPtr proxy;
+ map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(newObj);
+ if(p != _proxies.end())
+ {
+ proxy = p->second.proxy;
+ }
+ else
+ {
+ proxy = new OnewayProxy(newObj);
+ ProxyInfo info;
+ info.proxy = proxy;
+ info.count = 0;
+ _proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(newObj, info));
+ }
+
+ if(reliability == "batch")
+ {
+ return new OnewayBatchSubscriber(this, _traceLevels, _flusher, proxy);
+ }
+ else
+ {
+ return new OnewaySubscriber(this, _traceLevels, proxy);
+ }
}
-SubscriberPtr
-SubscriberFactory::createLinkSubscriber(const TopicLinkPrx& obj, Ice::Int cost)
+void
+SubscriberFactory::incProxyUsageCount(const QueuedProxyPtr& proxy)
+{
+ IceUtil::RecMutex::Lock sync(_proxiesMutex);
+
+ map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(proxy->proxy());
+ assert(p->second.count >= 0);
+ p->second.count++;
+}
+
+void
+SubscriberFactory::decProxyUsageCount(const QueuedProxyPtr& proxy)
{
- return new LinkSubscriber(_traceLevels, obj, cost);
+ IceUtil::RecMutex::Lock sync(_proxiesMutex);
+
+ map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(proxy->proxy());
+ assert(p->second.count > 0);
+ p->second.count--;
+ if(p->second.count == 0)
+ {
+ _proxies.erase(p);
+ }
}