diff options
author | Matthew Newhook <matthew@zeroc.com> | 2005-09-02 02:59:36 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2005-09-02 02:59:36 +0000 |
commit | 610aac07d482bfc9aff85ec68da3b018390552f8 (patch) | |
tree | 796f5f0d7e5a03bea4f9c9abb5804348947eaa66 /cpp | |
parent | revising dictionary helper code for bug 386 (diff) | |
download | ice-610aac07d482bfc9aff85ec68da3b018390552f8.tar.bz2 ice-610aac07d482bfc9aff85ec68da3b018390552f8.tar.xz ice-610aac07d482bfc9aff85ec68da3b018390552f8.zip |
http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=28
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/IceStorm/IceStormInternal.ice | 34 | ||||
-rw-r--r-- | cpp/src/IceStorm/LinkProxy.cpp | 22 | ||||
-rw-r--r-- | cpp/src/IceStorm/LinkProxy.h | 2 | ||||
-rw-r--r-- | cpp/src/IceStorm/OnewayProxy.cpp | 7 | ||||
-rw-r--r-- | cpp/src/IceStorm/OnewayProxy.h | 2 | ||||
-rw-r--r-- | cpp/src/IceStorm/QueuedProxy.cpp | 5 | ||||
-rw-r--r-- | cpp/src/IceStorm/QueuedProxy.h | 8 | ||||
-rw-r--r-- | cpp/src/IceStorm/SubscriberFactory.cpp | 14 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 26 | ||||
-rw-r--r-- | cpp/src/IceStorm/TwowayProxy.cpp | 37 | ||||
-rw-r--r-- | cpp/src/IceStorm/TwowayProxy.h | 4 |
11 files changed, 103 insertions, 58 deletions
diff --git a/cpp/src/IceStorm/IceStormInternal.ice b/cpp/src/IceStorm/IceStormInternal.ice index 172c541f2ed..76a228a4313 100644 --- a/cpp/src/IceStorm/IceStormInternal.ice +++ b/cpp/src/IceStorm/IceStormInternal.ice @@ -32,6 +32,26 @@ dictionary<string, string> ContextData; /** * + * The event data. + * + **/ +struct EventData +{ + /** The operation name. */ + string op; + /** The operation mode. */ + Ice::OperationMode mode; + /** The encoded data for the operation's input parameters. */ + ByteSeq data; + /** The Ice::Current::Context data from the originating request. */ + ContextData context; +}; + +/** A sequence of EventData. */ +sequence<EventData> EventDataSeq; + +/** + * * The TopicLink interface. This is used to forward events between * federated Topic instances. * @@ -42,20 +62,12 @@ interface TopicLink { /** * - * Forward an event. - * - * @param op The operation name. - * - * @param idempotent Flag indicating whether the operation is - * idempotent. - * - * @param data The encoded data for the operation's input parameters. + * Forward a sequence of events. * - * @param context The Ice::Current::Context data from the - * originating request. + * @param events The events to forward. * **/ - void forward(string op, Ice::OperationMode mode, ByteSeq data, ContextData context); + void forward(EventDataSeq events); }; /** diff --git a/cpp/src/IceStorm/LinkProxy.cpp b/cpp/src/IceStorm/LinkProxy.cpp index a555c40ff59..12e3e94763e 100644 --- a/cpp/src/IceStorm/LinkProxy.cpp +++ b/cpp/src/IceStorm/LinkProxy.cpp @@ -23,7 +23,25 @@ IceStorm::LinkProxy::proxy() const } void -IceStorm::LinkProxy::deliver(const EventPtr& event) +IceStorm::LinkProxy::deliver(const vector<EventPtr>& v) { - _obj->forward(event->op, event->mode, event->data, event->context); + // + // TODO: It would be nice to avoid all this copying. To do this we + // could change EventData from a struct to a class and rename to + // Event, and then remove EventPtr. However, this means EventData + // would have some data that doesn't need to be passed over the + // wire (forward/cost), and the marshaling of the data would be + // marginally slower. + // + vector<EventData> events; + for(vector<EventPtr>::const_iterator p = v.begin(); p != v.end(); ++p) + { + EventData e; + e.op = (*p)->op; + e.mode = (*p)->mode; + e.data = (*p)->data; + e.context = (*p)->context; + events.push_back(e); + } + _obj->forward(events); } diff --git a/cpp/src/IceStorm/LinkProxy.h b/cpp/src/IceStorm/LinkProxy.h index 226a90b1008..67e0f0276e1 100644 --- a/cpp/src/IceStorm/LinkProxy.h +++ b/cpp/src/IceStorm/LinkProxy.h @@ -29,7 +29,7 @@ public: protected: - virtual void deliver(const EventPtr&); + virtual void deliver(const std::vector<EventPtr>&); private: diff --git a/cpp/src/IceStorm/OnewayProxy.cpp b/cpp/src/IceStorm/OnewayProxy.cpp index e662eb55f48..d52edeee03c 100644 --- a/cpp/src/IceStorm/OnewayProxy.cpp +++ b/cpp/src/IceStorm/OnewayProxy.cpp @@ -23,8 +23,11 @@ IceStorm::OnewayProxy::proxy() const } void -IceStorm::OnewayProxy::deliver(const EventPtr& event) +IceStorm::OnewayProxy::deliver(const vector<EventPtr>& v) { vector<Ice::Byte> dummy; - _obj->ice_invoke(event->op, event->mode, event->data, dummy, event->context); + for(vector<EventPtr>::const_iterator p = v.begin(); p != v.end(); ++p) + { + _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); + } } diff --git a/cpp/src/IceStorm/OnewayProxy.h b/cpp/src/IceStorm/OnewayProxy.h index 1face10de84..edbf738e6b2 100644 --- a/cpp/src/IceStorm/OnewayProxy.h +++ b/cpp/src/IceStorm/OnewayProxy.h @@ -28,7 +28,7 @@ public: protected: - virtual void deliver(const EventPtr&); + virtual void deliver(const std::vector<EventPtr>&); private: diff --git a/cpp/src/IceStorm/QueuedProxy.cpp b/cpp/src/IceStorm/QueuedProxy.cpp index 0f2e2d233d0..32d8c136c45 100644 --- a/cpp/src/IceStorm/QueuedProxy.cpp +++ b/cpp/src/IceStorm/QueuedProxy.cpp @@ -56,10 +56,7 @@ IceStorm::QueuedProxy::publish(const EventPtr& event) // // Deliver the events without holding the lock. // - for(vector<EventPtr>::iterator p = v.begin(); p != v.end(); ++p) - { - deliver(*p); - } + deliver(v); // // Reacquire the lock before we check the queue again. diff --git a/cpp/src/IceStorm/QueuedProxy.h b/cpp/src/IceStorm/QueuedProxy.h index 23924531bc7..a305117d456 100644 --- a/cpp/src/IceStorm/QueuedProxy.h +++ b/cpp/src/IceStorm/QueuedProxy.h @@ -17,9 +17,9 @@ namespace IceStorm { // -// QueuedProxy encapsulates a subscriber proxy in order to maintain -// a queue of events to be delivered to the subscriber. QueuedProxy -// manages the event queue, but delegates delivery to subsclasses. +// QueuedProxy encapsulates a subscriber proxy in order to maintain a +// queue of events to be delivered to the subscriber. QueuedProxy +// manages the event queue, but delegates delivery to subclasses. // class QueuedProxy : public IceUtil::Shared { @@ -33,7 +33,7 @@ public: protected: - virtual void deliver(const EventPtr&) = 0; + virtual void deliver(const std::vector<EventPtr>&) = 0; IceUtil::Mutex _mutex; std::auto_ptr<Ice::LocalException> _exception; diff --git a/cpp/src/IceStorm/SubscriberFactory.cpp b/cpp/src/IceStorm/SubscriberFactory.cpp index 3b2c820827c..9a2350aca50 100644 --- a/cpp/src/IceStorm/SubscriberFactory.cpp +++ b/cpp/src/IceStorm/SubscriberFactory.cpp @@ -36,26 +36,22 @@ SubscriberFactory::createLinkSubscriber(const TopicLinkPrx& obj, Ice::Int cost) IceUtil::RecMutex::Lock sync(_proxiesMutex); // - // Delivery to links is done in batch mode. - // - TopicLinkPrx newObj = TopicLinkPrx::uncheckedCast(obj->ice_batchOneway()); - - // - // Check if a queued proxy already exists, or create one if necessary. + // Check if a queued proxy already exists, or create one if + // necessary. // QueuedProxyPtr proxy; - map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(newObj); + map<Ice::ObjectPrx, ProxyInfo>::iterator p = _proxies.find(obj); if(p != _proxies.end()) { proxy = p->second.proxy; } else { - proxy = new LinkProxy(newObj); + proxy = new LinkProxy(obj); ProxyInfo info; info.proxy = proxy; info.count = 0; - _proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(newObj, info)); + _proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(obj, info)); } return new LinkSubscriber(this, _communicator, _traceLevels, proxy, cost); diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 8fa062ff000..d7b1a3b98b7 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -65,7 +65,7 @@ public: { } - virtual void forward(const string&, ::Ice::OperationMode, const ByteSeq&, const ContextData&, const Ice::Current&); + virtual void forward(const vector<EventData>&, const Ice::Current&); private: @@ -280,18 +280,20 @@ PublisherProxyI::ice_invoke(const vector< Ice::Byte>& inParams, vector< Ice::Byt // Incoming events from linked topics. // void -TopicLinkI::forward(const string& op, Ice::OperationMode mode, const ByteSeq& data, const ContextData& context, - const Ice::Current& current) +TopicLinkI::forward(const vector<EventData>& v, const Ice::Current& current) { - EventPtr event = new Event; - event->forwarded = true; - event->cost = 0; - event->op = op; - event->mode = mode; - event->data = data; - event->context = context; - - _subscribers->publish(event); + for(vector<EventData>::const_iterator p = v.begin(); p != v.end(); ++p) + { + EventPtr event = new Event; + event->forwarded = true; + event->cost = 0; + event->op = p->op; + event->mode = p->mode; + event->data = p->data; + event->context = p->context; + + _subscribers->publish(event); + } } TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& traceLevels, const string& name, diff --git a/cpp/src/IceStorm/TwowayProxy.cpp b/cpp/src/IceStorm/TwowayProxy.cpp index bdf4cc16fa8..a7a458069d9 100644 --- a/cpp/src/IceStorm/TwowayProxy.cpp +++ b/cpp/src/IceStorm/TwowayProxy.cpp @@ -71,12 +71,15 @@ IceStorm::UnorderedTwowayProxy::exception(const Ice::LocalException& ex) } void -IceStorm::UnorderedTwowayProxy::deliver(const EventPtr& event) +IceStorm::UnorderedTwowayProxy::deliver(const vector<EventPtr>& v) { // // TODO: Use a buffer of AMI callback objects to eliminate the dynamic memory allocation? // - _obj->ice_invoke_async(new UnorderedInvokeCB(this), event->op, event->mode, event->data, event->context); + for(vector<EventPtr>::const_iterator p = v.begin(); p != v.end(); ++p) + { + _obj->ice_invoke_async(new UnorderedInvokeCB(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context); + } } class OrderedInvokeCB : public Ice::AMI_Object_ice_invoke @@ -146,8 +149,10 @@ IceStorm::OrderedTwowayProxy::publish(const EventPtr& event) try { - assert(e); - deliver(e); + assert(e);
+ vector<EventPtr> events;
+ events.push_back(e); + deliver(events); } catch(const Ice::LocalException& ex) { @@ -186,8 +191,10 @@ IceStorm::OrderedTwowayProxy::response() try { - assert(event); - deliver(event); + assert(event);
+ vector<EventPtr> events;
+ events.push_back(event); + deliver(events); } catch(const Ice::LocalException& ex) { @@ -196,12 +203,22 @@ IceStorm::OrderedTwowayProxy::response() } void -IceStorm::OrderedTwowayProxy::deliver(const EventPtr& event) +IceStorm::OrderedTwowayProxy::deliver(const vector<EventPtr>& v) { // - // TODO: Use a buffer of AMI callback objects to eliminate the dynamic memory allocation? (we could - // actually use only 2 AMI callback objects there.) + // Should only ever be called with a queue of length 1 (should + // only be called by methods local to OrderedTwowayProxy). + // + assert(v.size() == 1); + // - _obj->ice_invoke_async(new OrderedInvokeCB(this), event->op, event->mode, event->data, event->context); + // TODO: Use a buffer of AMI callback objects to eliminate the + // dynamic memory allocation? (we could actually use only 2 AMI + // callback objects there.) + // + for(vector<EventPtr>::const_iterator p = v.begin(); p != v.end(); ++p) + { + _obj->ice_invoke_async(new OrderedInvokeCB(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context); + } } diff --git a/cpp/src/IceStorm/TwowayProxy.h b/cpp/src/IceStorm/TwowayProxy.h index 6b65191d63a..e4c4949bd52 100644 --- a/cpp/src/IceStorm/TwowayProxy.h +++ b/cpp/src/IceStorm/TwowayProxy.h @@ -47,7 +47,7 @@ public: protected: - virtual void deliver(const EventPtr&); + virtual void deliver(const std::vector<EventPtr>&); }; @@ -67,7 +67,7 @@ public: protected: - virtual void deliver(const EventPtr&); + virtual void deliver(const std::vector<EventPtr>&); }; typedef IceUtil::Handle<UnorderedTwowayProxy> UnorderedTwowayProxyPtr; |