summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2005-09-02 02:59:36 +0000
committerMatthew Newhook <matthew@zeroc.com>2005-09-02 02:59:36 +0000
commit610aac07d482bfc9aff85ec68da3b018390552f8 (patch)
tree796f5f0d7e5a03bea4f9c9abb5804348947eaa66 /cpp
parentrevising dictionary helper code for bug 386 (diff)
downloadice-610aac07d482bfc9aff85ec68da3b018390552f8.tar.bz2
ice-610aac07d482bfc9aff85ec68da3b018390552f8.tar.xz
ice-610aac07d482bfc9aff85ec68da3b018390552f8.zip
http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=28
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/IceStorm/IceStormInternal.ice34
-rw-r--r--cpp/src/IceStorm/LinkProxy.cpp22
-rw-r--r--cpp/src/IceStorm/LinkProxy.h2
-rw-r--r--cpp/src/IceStorm/OnewayProxy.cpp7
-rw-r--r--cpp/src/IceStorm/OnewayProxy.h2
-rw-r--r--cpp/src/IceStorm/QueuedProxy.cpp5
-rw-r--r--cpp/src/IceStorm/QueuedProxy.h8
-rw-r--r--cpp/src/IceStorm/SubscriberFactory.cpp14
-rw-r--r--cpp/src/IceStorm/TopicI.cpp26
-rw-r--r--cpp/src/IceStorm/TwowayProxy.cpp37
-rw-r--r--cpp/src/IceStorm/TwowayProxy.h4
11 files changed, 103 insertions, 58 deletions
diff --git a/cpp/src/IceStorm/IceStormInternal.ice b/cpp/src/IceStorm/IceStormInternal.ice
index 172c541f2ed..76a228a4313 100644
--- a/cpp/src/IceStorm/IceStormInternal.ice
+++ b/cpp/src/IceStorm/IceStormInternal.ice
@@ -32,6 +32,26 @@ dictionary<string, string> ContextData;
/**
*
+ * The event data.
+ *
+ **/
+struct EventData
+{
+ /** The operation name. */
+ string op;
+ /** The operation mode. */
+ Ice::OperationMode mode;
+ /** The encoded data for the operation's input parameters. */
+ ByteSeq data;
+ /** The Ice::Current::Context data from the originating request. */
+ ContextData context;
+};
+
+/** A sequence of EventData. */
+sequence<EventData> EventDataSeq;
+
+/**
+ *
* The TopicLink interface. This is used to forward events between
* federated Topic instances.
*
@@ -42,20 +62,12 @@ interface TopicLink
{
/**
*
- * Forward an event.
- *
- * @param op The operation name.
- *
- * @param idempotent Flag indicating whether the operation is
- * idempotent.
- *
- * @param data The encoded data for the operation's input parameters.
+ * Forward a sequence of events.
*
- * @param context The Ice::Current::Context data from the
- * originating request.
+ * @param events The events to forward.
*
**/
- void forward(string op, Ice::OperationMode mode, ByteSeq data, ContextData context);
+ void forward(EventDataSeq events);
};
/**
diff --git a/cpp/src/IceStorm/LinkProxy.cpp b/cpp/src/IceStorm/LinkProxy.cpp
index a555c40ff59..12e3e94763e 100644
--- a/cpp/src/IceStorm/LinkProxy.cpp
+++ b/cpp/src/IceStorm/LinkProxy.cpp
@@ -23,7 +23,25 @@ IceStorm::LinkProxy::proxy() const
}
void
-IceStorm::LinkProxy::deliver(const EventPtr& event)
+IceStorm::LinkProxy::deliver(const vector<EventPtr>& v)
{
- _obj->forward(event->op, event->mode, event->data, event->context);
+ //
+ // TODO: It would be nice to avoid all this copying. To do this we
+ // could change EventData from a struct to a class and rename to
+ // Event, and then remove EventPtr. However, this means EventData
+ // would have some data that doesn't need to be passed over the
+ // wire (forward/cost), and the marshaling of the data would be
+ // marginally slower.
+ //
+ vector<EventData> events;
+ for(vector<EventPtr>::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ EventData e;
+ e.op = (*p)->op;
+ e.mode = (*p)->mode;
+ e.data = (*p)->data;
+ e.context = (*p)->context;
+ events.push_back(e);
+ }
+ _obj->forward(events);
}
diff --git a/cpp/src/IceStorm/LinkProxy.h b/cpp/src/IceStorm/LinkProxy.h
index 226a90b1008..67e0f0276e1 100644
--- a/cpp/src/IceStorm/LinkProxy.h
+++ b/cpp/src/IceStorm/LinkProxy.h
@@ -29,7 +29,7 @@ public:
protected:
- virtual void deliver(const EventPtr&);
+ virtual void deliver(const std::vector<EventPtr>&);
private:
diff --git a/cpp/src/IceStorm/OnewayProxy.cpp b/cpp/src/IceStorm/OnewayProxy.cpp
index e662eb55f48..d52edeee03c 100644
--- a/cpp/src/IceStorm/OnewayProxy.cpp
+++ b/cpp/src/IceStorm/OnewayProxy.cpp
@@ -23,8 +23,11 @@ IceStorm::OnewayProxy::proxy() const
}
void
-IceStorm::OnewayProxy::deliver(const EventPtr& event)
+IceStorm::OnewayProxy::deliver(const vector<EventPtr>& v)
{
vector<Ice::Byte> dummy;
- _obj->ice_invoke(event->op, event->mode, event->data, dummy, event->context);
+ for(vector<EventPtr>::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
+ }
}
diff --git a/cpp/src/IceStorm/OnewayProxy.h b/cpp/src/IceStorm/OnewayProxy.h
index 1face10de84..edbf738e6b2 100644
--- a/cpp/src/IceStorm/OnewayProxy.h
+++ b/cpp/src/IceStorm/OnewayProxy.h
@@ -28,7 +28,7 @@ public:
protected:
- virtual void deliver(const EventPtr&);
+ virtual void deliver(const std::vector<EventPtr>&);
private:
diff --git a/cpp/src/IceStorm/QueuedProxy.cpp b/cpp/src/IceStorm/QueuedProxy.cpp
index 0f2e2d233d0..32d8c136c45 100644
--- a/cpp/src/IceStorm/QueuedProxy.cpp
+++ b/cpp/src/IceStorm/QueuedProxy.cpp
@@ -56,10 +56,7 @@ IceStorm::QueuedProxy::publish(const EventPtr& event)
//
// Deliver the events without holding the lock.
//
- for(vector<EventPtr>::iterator p = v.begin(); p != v.end(); ++p)
- {
- deliver(*p);
- }
+ deliver(v);
//
// Reacquire the lock before we check the queue again.
diff --git a/cpp/src/IceStorm/QueuedProxy.h b/cpp/src/IceStorm/QueuedProxy.h
index 23924531bc7..a305117d456 100644
--- a/cpp/src/IceStorm/QueuedProxy.h
+++ b/cpp/src/IceStorm/QueuedProxy.h
@@ -17,9 +17,9 @@ namespace IceStorm
{
//
-// QueuedProxy encapsulates a subscriber proxy in order to maintain
-// a queue of events to be delivered to the subscriber. QueuedProxy
-// manages the event queue, but delegates delivery to subsclasses.
+// QueuedProxy encapsulates a subscriber proxy in order to maintain a
+// queue of events to be delivered to the subscriber. QueuedProxy
+// manages the event queue, but delegates delivery to subclasses.
//
class QueuedProxy : public IceUtil::Shared
{
@@ -33,7 +33,7 @@ public:
protected:
- virtual void deliver(const EventPtr&) = 0;
+ virtual void deliver(const std::vector<EventPtr>&) = 0;
IceUtil::Mutex _mutex;
std::auto_ptr<Ice::LocalException> _exception;
diff --git a/cpp/src/IceStorm/SubscriberFactory.cpp b/cpp/src/IceStorm/SubscriberFactory.cpp
index 3b2c820827c..9a2350aca50 100644
--- a/cpp/src/IceStorm/SubscriberFactory.cpp
+++ b/cpp/src/IceStorm/SubscriberFactory.cpp
@@ -36,26 +36,22 @@ SubscriberFactory::createLinkSubscriber(const TopicLinkPrx& obj, Ice::Int cost)
IceUtil::RecMutex::Lock sync(_proxiesMutex);
//
- // Delivery to links is done in batch mode.
- //
- TopicLinkPrx newObj = TopicLinkPrx::uncheckedCast(obj->ice_batchOneway());
-
- //
- // Check if a queued proxy already exists, or create one if necessary.
+ // Check if a queued proxy already exists, or create one if
+ // necessary.
//
QueuedProxyPtr proxy;
- map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(newObj);
+ map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(obj);
if(p != _proxies.end())
{
proxy = p->second.proxy;
}
else
{
- proxy = new LinkProxy(newObj);
+ proxy = new LinkProxy(obj);
ProxyInfo info;
info.proxy = proxy;
info.count = 0;
- _proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(newObj, info));
+ _proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(obj, info));
}
return new LinkSubscriber(this, _communicator, _traceLevels, proxy, cost);
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 8fa062ff000..d7b1a3b98b7 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -65,7 +65,7 @@ public:
{
}
- virtual void forward(const string&, ::Ice::OperationMode, const ByteSeq&, const ContextData&, const Ice::Current&);
+ virtual void forward(const vector<EventData>&, const Ice::Current&);
private:
@@ -280,18 +280,20 @@ PublisherProxyI::ice_invoke(const vector< Ice::Byte>& inParams, vector< Ice::Byt
// Incoming events from linked topics.
//
void
-TopicLinkI::forward(const string& op, Ice::OperationMode mode, const ByteSeq& data, const ContextData& context,
- const Ice::Current& current)
+TopicLinkI::forward(const vector<EventData>& v, const Ice::Current& current)
{
- EventPtr event = new Event;
- event->forwarded = true;
- event->cost = 0;
- event->op = op;
- event->mode = mode;
- event->data = data;
- event->context = context;
-
- _subscribers->publish(event);
+ for(vector<EventData>::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ EventPtr event = new Event;
+ event->forwarded = true;
+ event->cost = 0;
+ event->op = p->op;
+ event->mode = p->mode;
+ event->data = p->data;
+ event->context = p->context;
+
+ _subscribers->publish(event);
+ }
}
TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& traceLevels, const string& name,
diff --git a/cpp/src/IceStorm/TwowayProxy.cpp b/cpp/src/IceStorm/TwowayProxy.cpp
index bdf4cc16fa8..a7a458069d9 100644
--- a/cpp/src/IceStorm/TwowayProxy.cpp
+++ b/cpp/src/IceStorm/TwowayProxy.cpp
@@ -71,12 +71,15 @@ IceStorm::UnorderedTwowayProxy::exception(const Ice::LocalException& ex)
}
void
-IceStorm::UnorderedTwowayProxy::deliver(const EventPtr& event)
+IceStorm::UnorderedTwowayProxy::deliver(const vector<EventPtr>& v)
{
//
// TODO: Use a buffer of AMI callback objects to eliminate the dynamic memory allocation?
//
- _obj->ice_invoke_async(new UnorderedInvokeCB(this), event->op, event->mode, event->data, event->context);
+ for(vector<EventPtr>::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ _obj->ice_invoke_async(new UnorderedInvokeCB(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context);
+ }
}
class OrderedInvokeCB : public Ice::AMI_Object_ice_invoke
@@ -146,8 +149,10 @@ IceStorm::OrderedTwowayProxy::publish(const EventPtr& event)
try
{
- assert(e);
- deliver(e);
+ assert(e);
+ vector<EventPtr> events;
+ events.push_back(e);
+ deliver(events);
}
catch(const Ice::LocalException& ex)
{
@@ -186,8 +191,10 @@ IceStorm::OrderedTwowayProxy::response()
try
{
- assert(event);
- deliver(event);
+ assert(event);
+ vector<EventPtr> events;
+ events.push_back(event);
+ deliver(events);
}
catch(const Ice::LocalException& ex)
{
@@ -196,12 +203,22 @@ IceStorm::OrderedTwowayProxy::response()
}
void
-IceStorm::OrderedTwowayProxy::deliver(const EventPtr& event)
+IceStorm::OrderedTwowayProxy::deliver(const vector<EventPtr>& v)
{
//
- // TODO: Use a buffer of AMI callback objects to eliminate the dynamic memory allocation? (we could
- // actually use only 2 AMI callback objects there.)
+ // Should only ever be called with a queue of length 1 (should
+ // only be called by methods local to OrderedTwowayProxy).
+ //
+ assert(v.size() == 1);
+
//
- _obj->ice_invoke_async(new OrderedInvokeCB(this), event->op, event->mode, event->data, event->context);
+ // TODO: Use a buffer of AMI callback objects to eliminate the
+ // dynamic memory allocation? (we could actually use only 2 AMI
+ // callback objects there.)
+ //
+ for(vector<EventPtr>::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ _obj->ice_invoke_async(new OrderedInvokeCB(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context);
+ }
}
diff --git a/cpp/src/IceStorm/TwowayProxy.h b/cpp/src/IceStorm/TwowayProxy.h
index 6b65191d63a..e4c4949bd52 100644
--- a/cpp/src/IceStorm/TwowayProxy.h
+++ b/cpp/src/IceStorm/TwowayProxy.h
@@ -47,7 +47,7 @@ public:
protected:
- virtual void deliver(const EventPtr&);
+ virtual void deliver(const std::vector<EventPtr>&);
};
@@ -67,7 +67,7 @@ public:
protected:
- virtual void deliver(const EventPtr&);
+ virtual void deliver(const std::vector<EventPtr>&);
};
typedef IceUtil::Handle<UnorderedTwowayProxy> UnorderedTwowayProxyPtr;