diff options
39 files changed, 191 insertions, 186 deletions
diff --git a/cpp/CHANGES b/cpp/CHANGES index 49cae7c46e9..e9e51d892b5 100644 --- a/cpp/CHANGES +++ b/cpp/CHANGES @@ -1,6 +1,11 @@ Changes since version 1.1.1 --------------------------- +- Removed ice_flush() on the proxy base class. Batch requests are + now flushed by calling Communicator::flushBatchRequests(). This + flushes all requests that are currently batched in the communicator, + (for all connections). + - Added back the connection closure timeout, but only for misbehaving peers. If a timeout is set, and a peer does't react to a close connection message, the connection is forcefully closed after the diff --git a/cpp/demo/Glacier/session/Client.cpp b/cpp/demo/Glacier/session/Client.cpp index ca5af8c37a8..5e47de556d5 100644 --- a/cpp/demo/Glacier/session/Client.cpp +++ b/cpp/demo/Glacier/session/Client.cpp @@ -176,11 +176,7 @@ run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator) } else if(c == 'f') { - batchOneway->ice_flush(); - if(!secure) - { - batchDatagram->ice_flush(); - } + communicator->flushBatchRequests(); } else if(c == 'T') { diff --git a/cpp/demo/Ice/callback/Client.cpp b/cpp/demo/Ice/callback/Client.cpp index 45401b1a861..e0b6c667465 100644 --- a/cpp/demo/Ice/callback/Client.cpp +++ b/cpp/demo/Ice/callback/Client.cpp @@ -156,11 +156,7 @@ CallbackClient::run(int argc, char* argv[]) } else if(c == 'f') { - batchOneway->ice_flush(); - if(!secure) - { - batchDatagram->ice_flush(); - } + communicator()->flushBatchRequests(); } else if(c == 'S') { diff --git a/cpp/demo/Ice/hello/Client.cpp b/cpp/demo/Ice/hello/Client.cpp index 66b8e7deed9..1f63bf78ab0 100644 --- a/cpp/demo/Ice/hello/Client.cpp +++ b/cpp/demo/Ice/hello/Client.cpp @@ -107,11 +107,7 @@ run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator) } else if(c == 'f') { - batchOneway->ice_flush(); - if(!secure) - { - batchDatagram->ice_flush(); - } + communicator->flushBatchRequests(); } else if(c == 'T') { diff --git a/cpp/demo/IceBox/hello/Client.cpp b/cpp/demo/IceBox/hello/Client.cpp index 95867585949..365267b0799 100644 --- a/cpp/demo/IceBox/hello/Client.cpp +++ b/cpp/demo/IceBox/hello/Client.cpp @@ -106,11 +106,7 @@ run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator) } else if(c == 'f') { - batchOneway->ice_flush(); - if(!secure) - { - batchDatagram->ice_flush(); - } + communicator->flushBatchRequests(); } else if(c == 'T') { diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h index a1eaab0706d..d6eef4be5f7 100644 --- a/cpp/include/Ice/Proxy.h +++ b/cpp/include/Ice/Proxy.h @@ -140,8 +140,6 @@ public: ::Ice::ObjectPrx ice_collocationOptimization(bool) const; ::Ice::ObjectPrx ice_default() const; - void ice_flush(); // Flush batch messages - ::IceInternal::ReferencePtr __reference() const; void __copyFrom(const ::Ice::ObjectPrx&); void __handleException(const ::Ice::LocalException&, int&); @@ -184,7 +182,6 @@ public: virtual void ice_invoke_async(const ::Ice::AMI_Object_ice_invokePtr&, const ::std::string&, ::Ice::OperationMode, const ::std::vector< ::Ice::Byte>&, const ::Ice::Context&) = 0; - virtual void ice_flush() = 0; }; } } @@ -208,7 +205,6 @@ public: virtual void ice_invoke_async(const ::Ice::AMI_Object_ice_invokePtr&, const ::std::string&, ::Ice::OperationMode, const ::std::vector< ::Ice::Byte>&, const ::Ice::Context&); - virtual void ice_flush(); void __copyFrom(const ::IceInternal::Handle< ::IceDelegateM::Ice::Object>&); @@ -245,8 +241,6 @@ public: virtual void ice_invoke_async(const ::Ice::AMI_Object_ice_invokePtr&, const ::std::string&, ::Ice::OperationMode, const ::std::vector< ::Ice::Byte>&, const ::Ice::Context&); - virtual void ice_flush(); - void __copyFrom(const ::IceInternal::Handle< ::IceDelegateD::Ice::Object>&); protected: diff --git a/cpp/src/Glacier/Request.cpp b/cpp/src/Glacier/Request.cpp index ea8867d9752..f3186799d91 100644 --- a/cpp/src/Glacier/Request.cpp +++ b/cpp/src/Glacier/Request.cpp @@ -289,12 +289,7 @@ Glacier::RequestQueue::run() // // This sends all batched missives. // -// sort(proxies.begin(), proxies.end()); -// proxies.erase(unique(proxies.begin(), proxies.end()), proxies.end()); - for(vector<ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) - { - (*q)->ice_flush(); - } + _communicator->flushBatchRequests(); } catch(const Ice::Exception& ex) { diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp index 4a156a0fa3f..731576cb23c 100644 --- a/cpp/src/Ice/CommunicatorI.cpp +++ b/cpp/src/Ice/CommunicatorI.cpp @@ -260,7 +260,7 @@ Ice::CommunicatorI::getPluginManager() void Ice::CommunicatorI::flushBatchRequests() { - // TODO: implement this. + _instance->flushBatchRequests(); } Ice::CommunicatorI::CommunicatorI(int& argc, char* argv[], const PropertiesPtr& properties) : diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 6661b7322b2..ae99f562611 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -380,6 +380,23 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad } } +void +IceInternal::OutgoingConnectionFactory::flushBatchRequests() +{ + list<ConnectionPtr> c; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + for(std::multimap<EndpointPtr, ConnectionPtr>::const_iterator p = _connections.begin(); + p != _connections.end(); + ++p) + { + c.push_back(p->second); + } + } + for_each(c.begin(), c.end(), Ice::voidMemFun(&Connection::flushBatchRequest)); +} + IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const InstancePtr& instance) : _instance(instance), _destroyed(false) @@ -493,6 +510,13 @@ IceInternal::IncomingConnectionFactory::connections() const return result; } +void +IceInternal::IncomingConnectionFactory::flushBatchRequests() +{ + list<ConnectionPtr> c = connections(); // connections() is synchronized, so need to synchronize here. + for_each(c.begin(), c.end(), Ice::voidMemFun(&Connection::flushBatchRequest)); +} + bool IceInternal::IncomingConnectionFactory::datagram() const { diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 1f0cfb9ea2f..d79ad31cd74 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -51,6 +51,7 @@ public: ConnectionPtr create(const std::vector<EndpointPtr>&); void setRouter(const ::Ice::RouterPrx&); void removeAdapter(const ::Ice::ObjectAdapterPtr&); + void flushBatchRequests(); private: @@ -78,6 +79,7 @@ public: EndpointPtr endpoint() const; bool equivalent(const EndpointPtr&) const; std::list<ConnectionPtr> connections() const; + void flushBatchRequests(); // // Operations from EventHandler diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index e2ad598d146..52ba8ea430d 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -360,6 +360,24 @@ IceInternal::Instance::messageSizeMax() const return _messageSizeMax; } +void +IceInternal::Instance::flushBatchRequests() +{ + OutgoingConnectionFactoryPtr factory; + std::map<std::string, ::Ice::ObjectAdapterIPtr> adapters; + { + IceUtil::RecMutex::Lock sync(*this); + + factory = _outgoingConnectionFactory; + adapters = _objectAdapterFactory->_adapters; + } + factory->flushBatchRequests(); + for(std::map<std::string, ::Ice::ObjectAdapterIPtr>::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + { + p->second->flushBatchRequests(); + } +} + IceInternal::Instance::Instance(const CommunicatorPtr& communicator, int& argc, char* argv[], const PropertiesPtr& properties) : _destroyed(false), diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index 1f919b35c68..4f31513345f 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -74,6 +74,7 @@ public: DynamicLibraryListPtr dynamicLibraryList(); Ice::PluginManagerPtr pluginManager(); size_t messageSizeMax() const; + void flushBatchRequests(); private: diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp index 72d58545ddf..afa026ff9aa 100644 --- a/cpp/src/Ice/ObjectAdapterI.cpp +++ b/cpp/src/Ice/ObjectAdapterI.cpp @@ -466,6 +466,17 @@ Ice::ObjectAdapterI::getIncomingConnections() const } void +Ice::ObjectAdapterI::flushBatchRequests() +{ + std::vector<IceInternal::IncomingConnectionFactoryPtr> f; + { + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + f = _incomingConnectionFactories; + } + for_each(f.begin(), f.end(), Ice::voidMemFun(&IncomingConnectionFactory::flushBatchRequests)); +} + +void Ice::ObjectAdapterI::incDirectCount() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h index 81564d486aa..be3c87c4692 100644 --- a/cpp/src/Ice/ObjectAdapterI.h +++ b/cpp/src/Ice/ObjectAdapterI.h @@ -74,6 +74,8 @@ public: std::list<IceInternal::ConnectionPtr> getIncomingConnections() const; + void flushBatchRequests(); + void incDirectCount(); void decDirectCount(); diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index f43c82b2615..780cff5e38c 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -631,33 +631,6 @@ IceProxy::Ice::Object::ice_default() const } } -void -IceProxy::Ice::Object::ice_flush() -{ - // - // Retry is necessary for ice_flush in case the current connection - // is closed. If that's the case we need to get a new connection. - // - int __cnt = 0; - while(true) - { - try - { - Handle< ::IceDelegate::Ice::Object> __del = __getDelegate(); - __del->ice_flush(); - return; - } - catch(const DatagramLimitException&) - { - throw; // DatagramLimitExcpetion is not repeatable. - } - catch(const LocalException& __ex) - { - __handleException(__ex, __cnt); - } - } -} - ReferencePtr IceProxy::Ice::Object::__reference() const { @@ -1006,12 +979,6 @@ IceDelegateM::Ice::Object::ice_invoke_async(const AMI_Object_ice_invokePtr& cb, } void -IceDelegateM::Ice::Object::ice_flush() -{ - __connection->flushBatchRequest(); -} - -void IceDelegateM::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegateM::Ice::Object>& from) { // @@ -1319,12 +1286,6 @@ IceDelegateD::Ice::Object::ice_invoke_async(const AMI_Object_ice_invokePtr&, } void -IceDelegateD::Ice::Object::ice_flush() -{ - // Nothing to do for direct delegates. -} - -void IceDelegateD::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegateD::Ice::Object>& from) { // diff --git a/cpp/src/IceStorm/Flusher.cpp b/cpp/src/IceStorm/Flusher.cpp index 4ef6b3374d1..302d9160b2f 100644 --- a/cpp/src/IceStorm/Flusher.cpp +++ b/cpp/src/IceStorm/Flusher.cpp @@ -35,6 +35,7 @@ class FlusherThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::M public: FlusherThread(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) : + _communicator(communicator), _traceLevels(traceLevels), _destroy(false) { @@ -131,7 +132,8 @@ private: // _subscribers.erase(remove_if(_subscribers.begin(), _subscribers.end(), IceUtil::constMemFun(&Flushable::inactive)), _subscribers.end()); - for_each(_subscribers.begin(), _subscribers.end(), IceUtil::voidMemFun(&Flushable::flush)); + + _communicator->flushBatchRequests(); // // Trace after the flush so that the correct number of objects @@ -150,8 +152,8 @@ private: return (_subscribers.empty()) ? 0 : _flushTime; } + Ice::CommunicatorPtr _communicator; TraceLevelsPtr _traceLevels; - FlushableList _subscribers; bool _destroy; long _flushTime; diff --git a/cpp/src/IceStorm/LinkSubscriber.cpp b/cpp/src/IceStorm/LinkSubscriber.cpp index e753ad38e2b..8eb19e3476f 100644 --- a/cpp/src/IceStorm/LinkSubscriber.cpp +++ b/cpp/src/IceStorm/LinkSubscriber.cpp @@ -20,10 +20,10 @@ using namespace IceStorm; using namespace std; -LinkSubscriber::LinkSubscriber(const SubscriberFactoryPtr& factory, const TraceLevelsPtr& traceLevels, - const QueuedProxyPtr& obj, Ice::Int cost) : +LinkSubscriber::LinkSubscriber(const SubscriberFactoryPtr& factory, const Ice::CommunicatorPtr& communicator, + const TraceLevelsPtr& traceLevels, const QueuedProxyPtr& obj, Ice::Int cost) : Subscriber(traceLevels, obj->proxy()->ice_getIdentity()), - _factory(factory), _obj(obj), _cost(cost) + _factory(factory), _communicator(communicator), _obj(obj), _cost(cost) { _factory->incProxyUsageCount(_obj); } @@ -115,32 +115,7 @@ LinkSubscriber::publish(const EventPtr& event) void LinkSubscriber::flush() { - try - { - _obj->proxy()->ice_flush(); - } - catch(const Ice::ObjectNotExistException& e) - { - // - // ObjectNotExist causes the link to be removed. - // - IceUtil::Mutex::Lock sync(_stateMutex); - _state = StateError; - - if(_traceLevels->subscriber > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << id() << ": link topic flush failed: " << e; - } - } - catch(const Ice::LocalException& e) - { - if(_traceLevels->subscriber > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << id() << ": link topic flush failed: " << e; - } - } + _communicator->flushBatchRequests(); } bool diff --git a/cpp/src/IceStorm/LinkSubscriber.h b/cpp/src/IceStorm/LinkSubscriber.h index 25dc7b6ae96..f5eacff632e 100644 --- a/cpp/src/IceStorm/LinkSubscriber.h +++ b/cpp/src/IceStorm/LinkSubscriber.h @@ -30,7 +30,8 @@ class LinkSubscriber : public Subscriber, public Flushable { public: - LinkSubscriber(const SubscriberFactoryPtr&, const TraceLevelsPtr&, const QueuedProxyPtr&, Ice::Int); + LinkSubscriber(const SubscriberFactoryPtr&, const Ice::CommunicatorPtr&, + const TraceLevelsPtr&, const QueuedProxyPtr&, Ice::Int); ~LinkSubscriber(); virtual bool persistent() const; @@ -46,6 +47,7 @@ private: // Immutable SubscriberFactoryPtr _factory; + Ice::CommunicatorPtr _communicator; QueuedProxyPtr _obj; Ice::Int _cost; }; diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp index d54d223f1f9..e8869cd8e13 100644 --- a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp +++ b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp @@ -20,9 +20,13 @@ using namespace IceStorm; using namespace std; -OnewayBatchSubscriber::OnewayBatchSubscriber(const SubscriberFactoryPtr& factory, const TraceLevelsPtr& traceLevels, - const FlusherPtr& flusher, const QueuedProxyPtr& obj) : +OnewayBatchSubscriber::OnewayBatchSubscriber(const SubscriberFactoryPtr& factory, + const Ice::CommunicatorPtr& communicator, + const TraceLevelsPtr& traceLevels, + const FlusherPtr& flusher, + const QueuedProxyPtr& obj) : OnewaySubscriber(factory, traceLevels, obj), + _communicator(communicator), _flusher(flusher) { _flusher->add(this); @@ -79,28 +83,7 @@ OnewayBatchSubscriber::inactive() const void OnewayBatchSubscriber::flush() { - try - { - _obj->proxy()->ice_flush(); - } - catch(const Ice::LocalException& e) - { - IceUtil::Mutex::Lock sync(_stateMutex); - // - // It's possible that the subscriber was unsubscribed, or - // marked invalid by another thread. Don't display a - // diagnostic in this case. - // - if(_state == StateActive) - { - if(_traceLevels->subscriber > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << id() << ": flush failed: " << e; - } - _state = StateError; - } - } + _communicator->flushBatchRequests(); } bool diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.h b/cpp/src/IceStorm/OnewayBatchSubscriber.h index 2c209551f80..4aef9f36a35 100644 --- a/cpp/src/IceStorm/OnewayBatchSubscriber.h +++ b/cpp/src/IceStorm/OnewayBatchSubscriber.h @@ -32,7 +32,8 @@ class OnewayBatchSubscriber : public OnewaySubscriber, public Flushable { public: - OnewayBatchSubscriber(const SubscriberFactoryPtr&, const TraceLevelsPtr&, const FlusherPtr&, const QueuedProxyPtr&); + OnewayBatchSubscriber(const SubscriberFactoryPtr&, const Ice::CommunicatorPtr&, + const TraceLevelsPtr&, const FlusherPtr&, const QueuedProxyPtr&); ~OnewayBatchSubscriber(); virtual void unsubscribe(); @@ -45,6 +46,7 @@ public: private: + Ice::CommunicatorPtr _communicator; FlusherPtr _flusher; }; diff --git a/cpp/src/IceStorm/SubscriberFactory.cpp b/cpp/src/IceStorm/SubscriberFactory.cpp index 8ce91506925..4aabd860592 100644 --- a/cpp/src/IceStorm/SubscriberFactory.cpp +++ b/cpp/src/IceStorm/SubscriberFactory.cpp @@ -25,7 +25,10 @@ using namespace std; using namespace IceStorm; -SubscriberFactory::SubscriberFactory(const TraceLevelsPtr& traceLevels, const FlusherPtr& flusher) : +SubscriberFactory::SubscriberFactory(const Ice::CommunicatorPtr& communicator, + const TraceLevelsPtr& traceLevels, + const FlusherPtr& flusher) : + _communicator(communicator), _traceLevels(traceLevels), _flusher(flusher) { @@ -59,7 +62,7 @@ SubscriberFactory::createLinkSubscriber(const TopicLinkPrx& obj, Ice::Int cost) _proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(newObj, info)); } - return new LinkSubscriber(this, _traceLevels, proxy, cost); + return new LinkSubscriber(this, _communicator, _traceLevels, proxy, cost); } SubscriberPtr @@ -134,7 +137,7 @@ SubscriberFactory::createSubscriber(const QoS& qos, const Ice::ObjectPrx& obj) if(reliability == "batch") { - return new OnewayBatchSubscriber(this, _traceLevels, _flusher, proxy); + return new OnewayBatchSubscriber(this, _communicator, _traceLevels, _flusher, proxy); } else { diff --git a/cpp/src/IceStorm/SubscriberFactory.h b/cpp/src/IceStorm/SubscriberFactory.h index 6c653d31b7d..eeabc8d29a2 100644 --- a/cpp/src/IceStorm/SubscriberFactory.h +++ b/cpp/src/IceStorm/SubscriberFactory.h @@ -43,7 +43,7 @@ class SubscriberFactory : public IceUtil::Shared { public: - SubscriberFactory(const TraceLevelsPtr&, const FlusherPtr&); + SubscriberFactory(const Ice::CommunicatorPtr&, const TraceLevelsPtr&, const FlusherPtr&); // // Create a link subscriber (that is a subscriber that points to @@ -79,6 +79,7 @@ private: Ice::Int count; }; + Ice::CommunicatorPtr _communicator; TraceLevelsPtr _traceLevels; FlusherPtr _flusher; IceUtil::RecMutex _proxiesMutex; diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 2d17084902b..009574b7706 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -35,7 +35,7 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice _topics(_communicator, envName, dbName) { _flusher = new Flusher(_communicator, _traceLevels); - _factory = new SubscriberFactory(_traceLevels, _flusher); + _factory = new SubscriberFactory(_communicator, _traceLevels, _flusher); // // Recreate each of the topics in the dictionary. If the topic diff --git a/java/CHANGES b/java/CHANGES index e054a05b50a..903b9288456 100644 --- a/java/CHANGES +++ b/java/CHANGES @@ -1,6 +1,11 @@ Changes since version 1.1.1 --------------------------- +- Removed ice_flush() on the proxy base class. Batch requests are + now flushed by calling Communicator::flushBatchRequests(). This + flushes all requests that are currently batched in the communicator, + (for all connections). + - Added back the connection closure timeout, but only for misbehaving peers. If a timeout is set, and a peer does't react to a close connection message, the connection is forcefully closed after the diff --git a/java/demo/Glacier/session/Client.java b/java/demo/Glacier/session/Client.java index 9eed44215f2..4bb13e7f926 100644 --- a/java/demo/Glacier/session/Client.java +++ b/java/demo/Glacier/session/Client.java @@ -156,8 +156,7 @@ public class Client } else if(line.equals("f")) { - batchOneway.ice_flush(); - batchDatagram.ice_flush(); + communicator.flushBatchRequests(); } else if(line.equals("T")) { diff --git a/java/demo/Ice/callback/CallbackClient.java b/java/demo/Ice/callback/CallbackClient.java index 46f2c9e8bcc..81fba3f4697 100644 --- a/java/demo/Ice/callback/CallbackClient.java +++ b/java/demo/Ice/callback/CallbackClient.java @@ -130,8 +130,7 @@ class CallbackClient extends Ice.Application } else if(line.equals("f")) { - batchOneway.ice_flush(); - batchDatagram.ice_flush(); + communicator().flushBatchRequests(); } else if(line.equals("S")) { diff --git a/java/demo/Ice/hello/Client.java b/java/demo/Ice/hello/Client.java index c5a75693aba..f21fd38d1fd 100644 --- a/java/demo/Ice/hello/Client.java +++ b/java/demo/Ice/hello/Client.java @@ -97,8 +97,7 @@ public class Client } else if(line.equals("f")) { - batchOneway.ice_flush(); - batchDatagram.ice_flush(); + communicator.flushBatchRequests(); } else if(line.equals("T")) { diff --git a/java/demo/IceBox/hello/Client.java b/java/demo/IceBox/hello/Client.java index 191412c3ebc..ec2d75184a9 100644 --- a/java/demo/IceBox/hello/Client.java +++ b/java/demo/IceBox/hello/Client.java @@ -96,8 +96,7 @@ public class Client } else if(line.equals("f")) { - batchOneway.ice_flush(); - batchDatagram.ice_flush(); + communicator.flushBatchRequests(); } else if(line.equals("T")) { diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java index 403b4bb9210..df1fda37f88 100644 --- a/java/src/Ice/CommunicatorI.java +++ b/java/src/Ice/CommunicatorI.java @@ -266,7 +266,7 @@ final class CommunicatorI extends LocalObjectImpl implements Communicator public void flushBatchRequests() { - // TODO: implement this. + _instance.flushBatchRequests(); } CommunicatorI(StringSeqHolder args, Properties properties) diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index b91d8119198..4f67e8edc68 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -475,6 +475,21 @@ public final class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapt return arr; } + public void + flushBatchRequests() + { + java.util.ArrayList f; + synchronized(this) + { + f = new java.util.ArrayList(_incomingConnectionFactories); + } + java.util.Iterator i = f.iterator(); + while(i.hasNext()) + { + ((IceInternal.IncomingConnectionFactory)i.next()).flushBatchRequests(); + } + } + public synchronized void incDirectCount() { diff --git a/java/src/Ice/ObjectPrx.java b/java/src/Ice/ObjectPrx.java index c83276dea20..93cc42b99cf 100644 --- a/java/src/Ice/ObjectPrx.java +++ b/java/src/Ice/ObjectPrx.java @@ -71,7 +71,5 @@ public interface ObjectPrx ObjectPrx ice_collocationOptimization(boolean b); ObjectPrx ice_default(); - void ice_flush(); - boolean equals(java.lang.Object r); } diff --git a/java/src/Ice/ObjectPrxHelper.java b/java/src/Ice/ObjectPrxHelper.java index 0bb1e235f61..eeeb66bfbc5 100644 --- a/java/src/Ice/ObjectPrxHelper.java +++ b/java/src/Ice/ObjectPrxHelper.java @@ -527,33 +527,6 @@ public class ObjectPrxHelper implements ObjectPrx } } - public final void - ice_flush() - { - // - // Retry is necessary for ice_flush in case the current connection - // is closed. If that's the case we need to get a new connection. - // - int __cnt = 0; - while(true) - { - try - { - _ObjectDel __del = __getDelegate(); - __del.ice_flush(); - return; - } - catch(Ice.DatagramLimitException ex) - { - throw ex; // DatagramLimitException is not repeatable. - } - catch(LocalException __ex) - { - __cnt = __handleException(__ex, __cnt); - } - } - } - public final boolean equals(java.lang.Object r) { diff --git a/java/src/Ice/_ObjectDel.java b/java/src/Ice/_ObjectDel.java index 5ed377ce3e5..5d929566123 100644 --- a/java/src/Ice/_ObjectDel.java +++ b/java/src/Ice/_ObjectDel.java @@ -37,6 +37,4 @@ public interface _ObjectDel void ice_invoke_async(AMI_Object_ice_invoke cb, String operation, Ice.OperationMode mode, byte[] inParams, java.util.Map context); - - void ice_flush(); } diff --git a/java/src/Ice/_ObjectDelD.java b/java/src/Ice/_ObjectDelD.java index 04007735242..cc18359fa7a 100644 --- a/java/src/Ice/_ObjectDelD.java +++ b/java/src/Ice/_ObjectDelD.java @@ -132,12 +132,6 @@ public class _ObjectDelD implements _ObjectDel throw new CollocationOptimizationException(); } - public void - ice_flush() - { - // Nothing to do for direct delegates. - } - // // Only for use by ObjectPrx. // diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java index 1bdc8e19e2e..cf3505fc3da 100644 --- a/java/src/Ice/_ObjectDelM.java +++ b/java/src/Ice/_ObjectDelM.java @@ -185,12 +185,6 @@ public class _ObjectDelM implements _ObjectDel cb.__invoke(); } - public void - ice_flush() - { - __connection.flushBatchRequest(); - } - // // Only for use by ObjectPrx // diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 251e1d1ec5b..adb21c99db8 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -139,6 +139,16 @@ public class IncomingConnectionFactory extends EventHandler return arr; } + public void + flushBatchRequests() + { + Connection[] c = connections(); // connections() is synchronized, so no need to synchronize here. + for(int i = 0; i < c.length; i++) + { + c[i].flushBatchRequest(); + } + } + // // Operations from EventHandler. // diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 638eb2f69d7..99307251ed1 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -258,6 +258,23 @@ public class Instance return _messageSizeMax; } + public void + flushBatchRequests() + { + OutgoingConnectionFactory f; + Ice.ObjectAdapterI adapters[]; + synchronized(this) + { + f = _outgoingConnectionFactory; + adapters = _objectAdapterFactory.adapters(); + } + f.flushBatchRequests(); + for(int i = 0; i < adapters.length; i++) + { + adapters[i].flushBatchRequests(); + } + } + public BufferManager bufferManager() { diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java index 659017e0d51..2f139d16740 100644 --- a/java/src/IceInternal/ObjectAdapterFactory.java +++ b/java/src/IceInternal/ObjectAdapterFactory.java @@ -149,6 +149,20 @@ public final class ObjectAdapterFactory return null; } + public synchronized Ice.ObjectAdapterI[] + adapters() + { + java.util.LinkedList a = new java.util.LinkedList(); + java.util.Iterator i = _adapters.values().iterator(); + while(i.hasNext()) + { + a.add(i.next()); + } + Ice.ObjectAdapterI[] arr = new Ice.ObjectAdapterI[a.size()]; + a.toArray(arr); + return arr; + } + // // Only for use by Instance. // diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 052c6791dd3..ef4f4c30d2e 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -393,6 +393,32 @@ public class OutgoingConnectionFactory } } + public void + flushBatchRequests() + { + java.util.LinkedList c = new java.util.LinkedList(); + + synchronized(this) + { + java.util.Iterator p = _connections.values().iterator(); + while(p.hasNext()) + { + java.util.LinkedList connectionList = (java.util.LinkedList)p.next(); + java.util.Iterator q = connectionList.iterator(); + while(q.hasNext()) + { + c.add(q.next()); + } + } + } + + java.util.Iterator p = c.iterator(); + while(p.hasNext()) + { + ((Connection)p.next()).flushBatchRequest(); + } + } + // // Only for use by Instance. // |