summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichi Henning <michi@zeroc.com>2003-08-29 06:51:58 +0000
committerMichi Henning <michi@zeroc.com>2003-08-29 06:51:58 +0000
commit14e46bc24e79df49bd603c08d293c17245b9ca93 (patch)
treec7fc6f422c2a0706ed5e3926a091aa715ee41021
parentconnection closure timeout (diff)
downloadice-14e46bc24e79df49bd603c08d293c17245b9ca93.tar.bz2
ice-14e46bc24e79df49bd603c08d293c17245b9ca93.tar.xz
ice-14e46bc24e79df49bd603c08d293c17245b9ca93.zip
Removed ice_flush() from proxy base class and implemented
Communicator::flushBatchRequests().
-rw-r--r--cpp/CHANGES5
-rw-r--r--cpp/demo/Glacier/session/Client.cpp6
-rw-r--r--cpp/demo/Ice/callback/Client.cpp6
-rw-r--r--cpp/demo/Ice/hello/Client.cpp6
-rw-r--r--cpp/demo/IceBox/hello/Client.cpp6
-rw-r--r--cpp/include/Ice/Proxy.h6
-rw-r--r--cpp/src/Glacier/Request.cpp7
-rw-r--r--cpp/src/Ice/CommunicatorI.cpp2
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp24
-rw-r--r--cpp/src/Ice/ConnectionFactory.h2
-rw-r--r--cpp/src/Ice/Instance.cpp18
-rw-r--r--cpp/src/Ice/Instance.h1
-rw-r--r--cpp/src/Ice/ObjectAdapterI.cpp11
-rw-r--r--cpp/src/Ice/ObjectAdapterI.h2
-rw-r--r--cpp/src/Ice/Proxy.cpp39
-rw-r--r--cpp/src/IceStorm/Flusher.cpp6
-rw-r--r--cpp/src/IceStorm/LinkSubscriber.cpp33
-rw-r--r--cpp/src/IceStorm/LinkSubscriber.h4
-rw-r--r--cpp/src/IceStorm/OnewayBatchSubscriber.cpp31
-rw-r--r--cpp/src/IceStorm/OnewayBatchSubscriber.h4
-rw-r--r--cpp/src/IceStorm/SubscriberFactory.cpp9
-rw-r--r--cpp/src/IceStorm/SubscriberFactory.h3
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp2
-rw-r--r--java/CHANGES5
-rw-r--r--java/demo/Glacier/session/Client.java3
-rw-r--r--java/demo/Ice/callback/CallbackClient.java3
-rw-r--r--java/demo/Ice/hello/Client.java3
-rw-r--r--java/demo/IceBox/hello/Client.java3
-rw-r--r--java/src/Ice/CommunicatorI.java2
-rw-r--r--java/src/Ice/ObjectAdapterI.java15
-rw-r--r--java/src/Ice/ObjectPrx.java2
-rw-r--r--java/src/Ice/ObjectPrxHelper.java27
-rw-r--r--java/src/Ice/_ObjectDel.java2
-rw-r--r--java/src/Ice/_ObjectDelD.java6
-rw-r--r--java/src/Ice/_ObjectDelM.java6
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java10
-rw-r--r--java/src/IceInternal/Instance.java17
-rw-r--r--java/src/IceInternal/ObjectAdapterFactory.java14
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java26
39 files changed, 191 insertions, 186 deletions
diff --git a/cpp/CHANGES b/cpp/CHANGES
index 49cae7c46e9..e9e51d892b5 100644
--- a/cpp/CHANGES
+++ b/cpp/CHANGES
@@ -1,6 +1,11 @@
Changes since version 1.1.1
---------------------------
+- Removed ice_flush() on the proxy base class. Batch requests are
+ now flushed by calling Communicator::flushBatchRequests(). This
+ flushes all requests that are currently batched in the communicator,
+ (for all connections).
+
- Added back the connection closure timeout, but only for misbehaving
peers. If a timeout is set, and a peer does't react to a close
connection message, the connection is forcefully closed after the
diff --git a/cpp/demo/Glacier/session/Client.cpp b/cpp/demo/Glacier/session/Client.cpp
index ca5af8c37a8..5e47de556d5 100644
--- a/cpp/demo/Glacier/session/Client.cpp
+++ b/cpp/demo/Glacier/session/Client.cpp
@@ -176,11 +176,7 @@ run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator)
}
else if(c == 'f')
{
- batchOneway->ice_flush();
- if(!secure)
- {
- batchDatagram->ice_flush();
- }
+ communicator->flushBatchRequests();
}
else if(c == 'T')
{
diff --git a/cpp/demo/Ice/callback/Client.cpp b/cpp/demo/Ice/callback/Client.cpp
index 45401b1a861..e0b6c667465 100644
--- a/cpp/demo/Ice/callback/Client.cpp
+++ b/cpp/demo/Ice/callback/Client.cpp
@@ -156,11 +156,7 @@ CallbackClient::run(int argc, char* argv[])
}
else if(c == 'f')
{
- batchOneway->ice_flush();
- if(!secure)
- {
- batchDatagram->ice_flush();
- }
+ communicator()->flushBatchRequests();
}
else if(c == 'S')
{
diff --git a/cpp/demo/Ice/hello/Client.cpp b/cpp/demo/Ice/hello/Client.cpp
index 66b8e7deed9..1f63bf78ab0 100644
--- a/cpp/demo/Ice/hello/Client.cpp
+++ b/cpp/demo/Ice/hello/Client.cpp
@@ -107,11 +107,7 @@ run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator)
}
else if(c == 'f')
{
- batchOneway->ice_flush();
- if(!secure)
- {
- batchDatagram->ice_flush();
- }
+ communicator->flushBatchRequests();
}
else if(c == 'T')
{
diff --git a/cpp/demo/IceBox/hello/Client.cpp b/cpp/demo/IceBox/hello/Client.cpp
index 95867585949..365267b0799 100644
--- a/cpp/demo/IceBox/hello/Client.cpp
+++ b/cpp/demo/IceBox/hello/Client.cpp
@@ -106,11 +106,7 @@ run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator)
}
else if(c == 'f')
{
- batchOneway->ice_flush();
- if(!secure)
- {
- batchDatagram->ice_flush();
- }
+ communicator->flushBatchRequests();
}
else if(c == 'T')
{
diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h
index a1eaab0706d..d6eef4be5f7 100644
--- a/cpp/include/Ice/Proxy.h
+++ b/cpp/include/Ice/Proxy.h
@@ -140,8 +140,6 @@ public:
::Ice::ObjectPrx ice_collocationOptimization(bool) const;
::Ice::ObjectPrx ice_default() const;
- void ice_flush(); // Flush batch messages
-
::IceInternal::ReferencePtr __reference() const;
void __copyFrom(const ::Ice::ObjectPrx&);
void __handleException(const ::Ice::LocalException&, int&);
@@ -184,7 +182,6 @@ public:
virtual void ice_invoke_async(const ::Ice::AMI_Object_ice_invokePtr&,
const ::std::string&, ::Ice::OperationMode, const ::std::vector< ::Ice::Byte>&,
const ::Ice::Context&) = 0;
- virtual void ice_flush() = 0;
};
} }
@@ -208,7 +205,6 @@ public:
virtual void ice_invoke_async(const ::Ice::AMI_Object_ice_invokePtr&,
const ::std::string&, ::Ice::OperationMode, const ::std::vector< ::Ice::Byte>&,
const ::Ice::Context&);
- virtual void ice_flush();
void __copyFrom(const ::IceInternal::Handle< ::IceDelegateM::Ice::Object>&);
@@ -245,8 +241,6 @@ public:
virtual void ice_invoke_async(const ::Ice::AMI_Object_ice_invokePtr&,
const ::std::string&, ::Ice::OperationMode, const ::std::vector< ::Ice::Byte>&,
const ::Ice::Context&);
- virtual void ice_flush();
-
void __copyFrom(const ::IceInternal::Handle< ::IceDelegateD::Ice::Object>&);
protected:
diff --git a/cpp/src/Glacier/Request.cpp b/cpp/src/Glacier/Request.cpp
index ea8867d9752..f3186799d91 100644
--- a/cpp/src/Glacier/Request.cpp
+++ b/cpp/src/Glacier/Request.cpp
@@ -289,12 +289,7 @@ Glacier::RequestQueue::run()
//
// This sends all batched missives.
//
-// sort(proxies.begin(), proxies.end());
-// proxies.erase(unique(proxies.begin(), proxies.end()), proxies.end());
- for(vector<ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q)
- {
- (*q)->ice_flush();
- }
+ _communicator->flushBatchRequests();
}
catch(const Ice::Exception& ex)
{
diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp
index 4a156a0fa3f..731576cb23c 100644
--- a/cpp/src/Ice/CommunicatorI.cpp
+++ b/cpp/src/Ice/CommunicatorI.cpp
@@ -260,7 +260,7 @@ Ice::CommunicatorI::getPluginManager()
void
Ice::CommunicatorI::flushBatchRequests()
{
- // TODO: implement this.
+ _instance->flushBatchRequests();
}
Ice::CommunicatorI::CommunicatorI(int& argc, char* argv[], const PropertiesPtr& properties) :
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 6661b7322b2..ae99f562611 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -380,6 +380,23 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad
}
}
+void
+IceInternal::OutgoingConnectionFactory::flushBatchRequests()
+{
+ list<ConnectionPtr> c;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ for(std::multimap<EndpointPtr, ConnectionPtr>::const_iterator p = _connections.begin();
+ p != _connections.end();
+ ++p)
+ {
+ c.push_back(p->second);
+ }
+ }
+ for_each(c.begin(), c.end(), Ice::voidMemFun(&Connection::flushBatchRequest));
+}
+
IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const InstancePtr& instance) :
_instance(instance),
_destroyed(false)
@@ -493,6 +510,13 @@ IceInternal::IncomingConnectionFactory::connections() const
return result;
}
+void
+IceInternal::IncomingConnectionFactory::flushBatchRequests()
+{
+ list<ConnectionPtr> c = connections(); // connections() is synchronized, so need to synchronize here.
+ for_each(c.begin(), c.end(), Ice::voidMemFun(&Connection::flushBatchRequest));
+}
+
bool
IceInternal::IncomingConnectionFactory::datagram() const
{
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index 1f0cfb9ea2f..d79ad31cd74 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -51,6 +51,7 @@ public:
ConnectionPtr create(const std::vector<EndpointPtr>&);
void setRouter(const ::Ice::RouterPrx&);
void removeAdapter(const ::Ice::ObjectAdapterPtr&);
+ void flushBatchRequests();
private:
@@ -78,6 +79,7 @@ public:
EndpointPtr endpoint() const;
bool equivalent(const EndpointPtr&) const;
std::list<ConnectionPtr> connections() const;
+ void flushBatchRequests();
//
// Operations from EventHandler
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index e2ad598d146..52ba8ea430d 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -360,6 +360,24 @@ IceInternal::Instance::messageSizeMax() const
return _messageSizeMax;
}
+void
+IceInternal::Instance::flushBatchRequests()
+{
+ OutgoingConnectionFactoryPtr factory;
+ std::map<std::string, ::Ice::ObjectAdapterIPtr> adapters;
+ {
+ IceUtil::RecMutex::Lock sync(*this);
+
+ factory = _outgoingConnectionFactory;
+ adapters = _objectAdapterFactory->_adapters;
+ }
+ factory->flushBatchRequests();
+ for(std::map<std::string, ::Ice::ObjectAdapterIPtr>::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
+ {
+ p->second->flushBatchRequests();
+ }
+}
+
IceInternal::Instance::Instance(const CommunicatorPtr& communicator, int& argc, char* argv[],
const PropertiesPtr& properties) :
_destroyed(false),
diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h
index 1f919b35c68..4f31513345f 100644
--- a/cpp/src/Ice/Instance.h
+++ b/cpp/src/Ice/Instance.h
@@ -74,6 +74,7 @@ public:
DynamicLibraryListPtr dynamicLibraryList();
Ice::PluginManagerPtr pluginManager();
size_t messageSizeMax() const;
+ void flushBatchRequests();
private:
diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp
index 72d58545ddf..afa026ff9aa 100644
--- a/cpp/src/Ice/ObjectAdapterI.cpp
+++ b/cpp/src/Ice/ObjectAdapterI.cpp
@@ -466,6 +466,17 @@ Ice::ObjectAdapterI::getIncomingConnections() const
}
void
+Ice::ObjectAdapterI::flushBatchRequests()
+{
+ std::vector<IceInternal::IncomingConnectionFactoryPtr> f;
+ {
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ f = _incomingConnectionFactories;
+ }
+ for_each(f.begin(), f.end(), Ice::voidMemFun(&IncomingConnectionFactory::flushBatchRequests));
+}
+
+void
Ice::ObjectAdapterI::incDirectCount()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h
index 81564d486aa..be3c87c4692 100644
--- a/cpp/src/Ice/ObjectAdapterI.h
+++ b/cpp/src/Ice/ObjectAdapterI.h
@@ -74,6 +74,8 @@ public:
std::list<IceInternal::ConnectionPtr> getIncomingConnections() const;
+ void flushBatchRequests();
+
void incDirectCount();
void decDirectCount();
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp
index f43c82b2615..780cff5e38c 100644
--- a/cpp/src/Ice/Proxy.cpp
+++ b/cpp/src/Ice/Proxy.cpp
@@ -631,33 +631,6 @@ IceProxy::Ice::Object::ice_default() const
}
}
-void
-IceProxy::Ice::Object::ice_flush()
-{
- //
- // Retry is necessary for ice_flush in case the current connection
- // is closed. If that's the case we need to get a new connection.
- //
- int __cnt = 0;
- while(true)
- {
- try
- {
- Handle< ::IceDelegate::Ice::Object> __del = __getDelegate();
- __del->ice_flush();
- return;
- }
- catch(const DatagramLimitException&)
- {
- throw; // DatagramLimitExcpetion is not repeatable.
- }
- catch(const LocalException& __ex)
- {
- __handleException(__ex, __cnt);
- }
- }
-}
-
ReferencePtr
IceProxy::Ice::Object::__reference() const
{
@@ -1006,12 +979,6 @@ IceDelegateM::Ice::Object::ice_invoke_async(const AMI_Object_ice_invokePtr& cb,
}
void
-IceDelegateM::Ice::Object::ice_flush()
-{
- __connection->flushBatchRequest();
-}
-
-void
IceDelegateM::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegateM::Ice::Object>& from)
{
//
@@ -1319,12 +1286,6 @@ IceDelegateD::Ice::Object::ice_invoke_async(const AMI_Object_ice_invokePtr&,
}
void
-IceDelegateD::Ice::Object::ice_flush()
-{
- // Nothing to do for direct delegates.
-}
-
-void
IceDelegateD::Ice::Object::__copyFrom(const ::IceInternal::Handle< ::IceDelegateD::Ice::Object>& from)
{
//
diff --git a/cpp/src/IceStorm/Flusher.cpp b/cpp/src/IceStorm/Flusher.cpp
index 4ef6b3374d1..302d9160b2f 100644
--- a/cpp/src/IceStorm/Flusher.cpp
+++ b/cpp/src/IceStorm/Flusher.cpp
@@ -35,6 +35,7 @@ class FlusherThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::M
public:
FlusherThread(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) :
+ _communicator(communicator),
_traceLevels(traceLevels),
_destroy(false)
{
@@ -131,7 +132,8 @@ private:
//
_subscribers.erase(remove_if(_subscribers.begin(), _subscribers.end(),
IceUtil::constMemFun(&Flushable::inactive)), _subscribers.end());
- for_each(_subscribers.begin(), _subscribers.end(), IceUtil::voidMemFun(&Flushable::flush));
+
+ _communicator->flushBatchRequests();
//
// Trace after the flush so that the correct number of objects
@@ -150,8 +152,8 @@ private:
return (_subscribers.empty()) ? 0 : _flushTime;
}
+ Ice::CommunicatorPtr _communicator;
TraceLevelsPtr _traceLevels;
-
FlushableList _subscribers;
bool _destroy;
long _flushTime;
diff --git a/cpp/src/IceStorm/LinkSubscriber.cpp b/cpp/src/IceStorm/LinkSubscriber.cpp
index e753ad38e2b..8eb19e3476f 100644
--- a/cpp/src/IceStorm/LinkSubscriber.cpp
+++ b/cpp/src/IceStorm/LinkSubscriber.cpp
@@ -20,10 +20,10 @@
using namespace IceStorm;
using namespace std;
-LinkSubscriber::LinkSubscriber(const SubscriberFactoryPtr& factory, const TraceLevelsPtr& traceLevels,
- const QueuedProxyPtr& obj, Ice::Int cost) :
+LinkSubscriber::LinkSubscriber(const SubscriberFactoryPtr& factory, const Ice::CommunicatorPtr& communicator,
+ const TraceLevelsPtr& traceLevels, const QueuedProxyPtr& obj, Ice::Int cost) :
Subscriber(traceLevels, obj->proxy()->ice_getIdentity()),
- _factory(factory), _obj(obj), _cost(cost)
+ _factory(factory), _communicator(communicator), _obj(obj), _cost(cost)
{
_factory->incProxyUsageCount(_obj);
}
@@ -115,32 +115,7 @@ LinkSubscriber::publish(const EventPtr& event)
void
LinkSubscriber::flush()
{
- try
- {
- _obj->proxy()->ice_flush();
- }
- catch(const Ice::ObjectNotExistException& e)
- {
- //
- // ObjectNotExist causes the link to be removed.
- //
- IceUtil::Mutex::Lock sync(_stateMutex);
- _state = StateError;
-
- if(_traceLevels->subscriber > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
- out << id() << ": link topic flush failed: " << e;
- }
- }
- catch(const Ice::LocalException& e)
- {
- if(_traceLevels->subscriber > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
- out << id() << ": link topic flush failed: " << e;
- }
- }
+ _communicator->flushBatchRequests();
}
bool
diff --git a/cpp/src/IceStorm/LinkSubscriber.h b/cpp/src/IceStorm/LinkSubscriber.h
index 25dc7b6ae96..f5eacff632e 100644
--- a/cpp/src/IceStorm/LinkSubscriber.h
+++ b/cpp/src/IceStorm/LinkSubscriber.h
@@ -30,7 +30,8 @@ class LinkSubscriber : public Subscriber, public Flushable
{
public:
- LinkSubscriber(const SubscriberFactoryPtr&, const TraceLevelsPtr&, const QueuedProxyPtr&, Ice::Int);
+ LinkSubscriber(const SubscriberFactoryPtr&, const Ice::CommunicatorPtr&,
+ const TraceLevelsPtr&, const QueuedProxyPtr&, Ice::Int);
~LinkSubscriber();
virtual bool persistent() const;
@@ -46,6 +47,7 @@ private:
// Immutable
SubscriberFactoryPtr _factory;
+ Ice::CommunicatorPtr _communicator;
QueuedProxyPtr _obj;
Ice::Int _cost;
};
diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp
index d54d223f1f9..e8869cd8e13 100644
--- a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp
+++ b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp
@@ -20,9 +20,13 @@
using namespace IceStorm;
using namespace std;
-OnewayBatchSubscriber::OnewayBatchSubscriber(const SubscriberFactoryPtr& factory, const TraceLevelsPtr& traceLevels,
- const FlusherPtr& flusher, const QueuedProxyPtr& obj) :
+OnewayBatchSubscriber::OnewayBatchSubscriber(const SubscriberFactoryPtr& factory,
+ const Ice::CommunicatorPtr& communicator,
+ const TraceLevelsPtr& traceLevels,
+ const FlusherPtr& flusher,
+ const QueuedProxyPtr& obj) :
OnewaySubscriber(factory, traceLevels, obj),
+ _communicator(communicator),
_flusher(flusher)
{
_flusher->add(this);
@@ -79,28 +83,7 @@ OnewayBatchSubscriber::inactive() const
void
OnewayBatchSubscriber::flush()
{
- try
- {
- _obj->proxy()->ice_flush();
- }
- catch(const Ice::LocalException& e)
- {
- IceUtil::Mutex::Lock sync(_stateMutex);
- //
- // It's possible that the subscriber was unsubscribed, or
- // marked invalid by another thread. Don't display a
- // diagnostic in this case.
- //
- if(_state == StateActive)
- {
- if(_traceLevels->subscriber > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
- out << id() << ": flush failed: " << e;
- }
- _state = StateError;
- }
- }
+ _communicator->flushBatchRequests();
}
bool
diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.h b/cpp/src/IceStorm/OnewayBatchSubscriber.h
index 2c209551f80..4aef9f36a35 100644
--- a/cpp/src/IceStorm/OnewayBatchSubscriber.h
+++ b/cpp/src/IceStorm/OnewayBatchSubscriber.h
@@ -32,7 +32,8 @@ class OnewayBatchSubscriber : public OnewaySubscriber, public Flushable
{
public:
- OnewayBatchSubscriber(const SubscriberFactoryPtr&, const TraceLevelsPtr&, const FlusherPtr&, const QueuedProxyPtr&);
+ OnewayBatchSubscriber(const SubscriberFactoryPtr&, const Ice::CommunicatorPtr&,
+ const TraceLevelsPtr&, const FlusherPtr&, const QueuedProxyPtr&);
~OnewayBatchSubscriber();
virtual void unsubscribe();
@@ -45,6 +46,7 @@ public:
private:
+ Ice::CommunicatorPtr _communicator;
FlusherPtr _flusher;
};
diff --git a/cpp/src/IceStorm/SubscriberFactory.cpp b/cpp/src/IceStorm/SubscriberFactory.cpp
index 8ce91506925..4aabd860592 100644
--- a/cpp/src/IceStorm/SubscriberFactory.cpp
+++ b/cpp/src/IceStorm/SubscriberFactory.cpp
@@ -25,7 +25,10 @@
using namespace std;
using namespace IceStorm;
-SubscriberFactory::SubscriberFactory(const TraceLevelsPtr& traceLevels, const FlusherPtr& flusher) :
+SubscriberFactory::SubscriberFactory(const Ice::CommunicatorPtr& communicator,
+ const TraceLevelsPtr& traceLevels,
+ const FlusherPtr& flusher) :
+ _communicator(communicator),
_traceLevels(traceLevels),
_flusher(flusher)
{
@@ -59,7 +62,7 @@ SubscriberFactory::createLinkSubscriber(const TopicLinkPrx& obj, Ice::Int cost)
_proxies.insert(pair<Ice::ObjectPrx, ProxyInfo>(newObj, info));
}
- return new LinkSubscriber(this, _traceLevels, proxy, cost);
+ return new LinkSubscriber(this, _communicator, _traceLevels, proxy, cost);
}
SubscriberPtr
@@ -134,7 +137,7 @@ SubscriberFactory::createSubscriber(const QoS& qos, const Ice::ObjectPrx& obj)
if(reliability == "batch")
{
- return new OnewayBatchSubscriber(this, _traceLevels, _flusher, proxy);
+ return new OnewayBatchSubscriber(this, _communicator, _traceLevels, _flusher, proxy);
}
else
{
diff --git a/cpp/src/IceStorm/SubscriberFactory.h b/cpp/src/IceStorm/SubscriberFactory.h
index 6c653d31b7d..eeabc8d29a2 100644
--- a/cpp/src/IceStorm/SubscriberFactory.h
+++ b/cpp/src/IceStorm/SubscriberFactory.h
@@ -43,7 +43,7 @@ class SubscriberFactory : public IceUtil::Shared
{
public:
- SubscriberFactory(const TraceLevelsPtr&, const FlusherPtr&);
+ SubscriberFactory(const Ice::CommunicatorPtr&, const TraceLevelsPtr&, const FlusherPtr&);
//
// Create a link subscriber (that is a subscriber that points to
@@ -79,6 +79,7 @@ private:
Ice::Int count;
};
+ Ice::CommunicatorPtr _communicator;
TraceLevelsPtr _traceLevels;
FlusherPtr _flusher;
IceUtil::RecMutex _proxiesMutex;
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index 2d17084902b..009574b7706 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -35,7 +35,7 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice
_topics(_communicator, envName, dbName)
{
_flusher = new Flusher(_communicator, _traceLevels);
- _factory = new SubscriberFactory(_traceLevels, _flusher);
+ _factory = new SubscriberFactory(_communicator, _traceLevels, _flusher);
//
// Recreate each of the topics in the dictionary. If the topic
diff --git a/java/CHANGES b/java/CHANGES
index e054a05b50a..903b9288456 100644
--- a/java/CHANGES
+++ b/java/CHANGES
@@ -1,6 +1,11 @@
Changes since version 1.1.1
---------------------------
+- Removed ice_flush() on the proxy base class. Batch requests are
+ now flushed by calling Communicator::flushBatchRequests(). This
+ flushes all requests that are currently batched in the communicator,
+ (for all connections).
+
- Added back the connection closure timeout, but only for misbehaving
peers. If a timeout is set, and a peer does't react to a close
connection message, the connection is forcefully closed after the
diff --git a/java/demo/Glacier/session/Client.java b/java/demo/Glacier/session/Client.java
index 9eed44215f2..4bb13e7f926 100644
--- a/java/demo/Glacier/session/Client.java
+++ b/java/demo/Glacier/session/Client.java
@@ -156,8 +156,7 @@ public class Client
}
else if(line.equals("f"))
{
- batchOneway.ice_flush();
- batchDatagram.ice_flush();
+ communicator.flushBatchRequests();
}
else if(line.equals("T"))
{
diff --git a/java/demo/Ice/callback/CallbackClient.java b/java/demo/Ice/callback/CallbackClient.java
index 46f2c9e8bcc..81fba3f4697 100644
--- a/java/demo/Ice/callback/CallbackClient.java
+++ b/java/demo/Ice/callback/CallbackClient.java
@@ -130,8 +130,7 @@ class CallbackClient extends Ice.Application
}
else if(line.equals("f"))
{
- batchOneway.ice_flush();
- batchDatagram.ice_flush();
+ communicator().flushBatchRequests();
}
else if(line.equals("S"))
{
diff --git a/java/demo/Ice/hello/Client.java b/java/demo/Ice/hello/Client.java
index c5a75693aba..f21fd38d1fd 100644
--- a/java/demo/Ice/hello/Client.java
+++ b/java/demo/Ice/hello/Client.java
@@ -97,8 +97,7 @@ public class Client
}
else if(line.equals("f"))
{
- batchOneway.ice_flush();
- batchDatagram.ice_flush();
+ communicator.flushBatchRequests();
}
else if(line.equals("T"))
{
diff --git a/java/demo/IceBox/hello/Client.java b/java/demo/IceBox/hello/Client.java
index 191412c3ebc..ec2d75184a9 100644
--- a/java/demo/IceBox/hello/Client.java
+++ b/java/demo/IceBox/hello/Client.java
@@ -96,8 +96,7 @@ public class Client
}
else if(line.equals("f"))
{
- batchOneway.ice_flush();
- batchDatagram.ice_flush();
+ communicator.flushBatchRequests();
}
else if(line.equals("T"))
{
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java
index 403b4bb9210..df1fda37f88 100644
--- a/java/src/Ice/CommunicatorI.java
+++ b/java/src/Ice/CommunicatorI.java
@@ -266,7 +266,7 @@ final class CommunicatorI extends LocalObjectImpl implements Communicator
public void
flushBatchRequests()
{
- // TODO: implement this.
+ _instance.flushBatchRequests();
}
CommunicatorI(StringSeqHolder args, Properties properties)
diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java
index b91d8119198..4f67e8edc68 100644
--- a/java/src/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/ObjectAdapterI.java
@@ -475,6 +475,21 @@ public final class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapt
return arr;
}
+ public void
+ flushBatchRequests()
+ {
+ java.util.ArrayList f;
+ synchronized(this)
+ {
+ f = new java.util.ArrayList(_incomingConnectionFactories);
+ }
+ java.util.Iterator i = f.iterator();
+ while(i.hasNext())
+ {
+ ((IceInternal.IncomingConnectionFactory)i.next()).flushBatchRequests();
+ }
+ }
+
public synchronized void
incDirectCount()
{
diff --git a/java/src/Ice/ObjectPrx.java b/java/src/Ice/ObjectPrx.java
index c83276dea20..93cc42b99cf 100644
--- a/java/src/Ice/ObjectPrx.java
+++ b/java/src/Ice/ObjectPrx.java
@@ -71,7 +71,5 @@ public interface ObjectPrx
ObjectPrx ice_collocationOptimization(boolean b);
ObjectPrx ice_default();
- void ice_flush();
-
boolean equals(java.lang.Object r);
}
diff --git a/java/src/Ice/ObjectPrxHelper.java b/java/src/Ice/ObjectPrxHelper.java
index 0bb1e235f61..eeeb66bfbc5 100644
--- a/java/src/Ice/ObjectPrxHelper.java
+++ b/java/src/Ice/ObjectPrxHelper.java
@@ -527,33 +527,6 @@ public class ObjectPrxHelper implements ObjectPrx
}
}
- public final void
- ice_flush()
- {
- //
- // Retry is necessary for ice_flush in case the current connection
- // is closed. If that's the case we need to get a new connection.
- //
- int __cnt = 0;
- while(true)
- {
- try
- {
- _ObjectDel __del = __getDelegate();
- __del.ice_flush();
- return;
- }
- catch(Ice.DatagramLimitException ex)
- {
- throw ex; // DatagramLimitException is not repeatable.
- }
- catch(LocalException __ex)
- {
- __cnt = __handleException(__ex, __cnt);
- }
- }
- }
-
public final boolean
equals(java.lang.Object r)
{
diff --git a/java/src/Ice/_ObjectDel.java b/java/src/Ice/_ObjectDel.java
index 5ed377ce3e5..5d929566123 100644
--- a/java/src/Ice/_ObjectDel.java
+++ b/java/src/Ice/_ObjectDel.java
@@ -37,6 +37,4 @@ public interface _ObjectDel
void ice_invoke_async(AMI_Object_ice_invoke cb, String operation, Ice.OperationMode mode, byte[] inParams,
java.util.Map context);
-
- void ice_flush();
}
diff --git a/java/src/Ice/_ObjectDelD.java b/java/src/Ice/_ObjectDelD.java
index 04007735242..cc18359fa7a 100644
--- a/java/src/Ice/_ObjectDelD.java
+++ b/java/src/Ice/_ObjectDelD.java
@@ -132,12 +132,6 @@ public class _ObjectDelD implements _ObjectDel
throw new CollocationOptimizationException();
}
- public void
- ice_flush()
- {
- // Nothing to do for direct delegates.
- }
-
//
// Only for use by ObjectPrx.
//
diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java
index 1bdc8e19e2e..cf3505fc3da 100644
--- a/java/src/Ice/_ObjectDelM.java
+++ b/java/src/Ice/_ObjectDelM.java
@@ -185,12 +185,6 @@ public class _ObjectDelM implements _ObjectDel
cb.__invoke();
}
- public void
- ice_flush()
- {
- __connection.flushBatchRequest();
- }
-
//
// Only for use by ObjectPrx
//
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index 251e1d1ec5b..adb21c99db8 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -139,6 +139,16 @@ public class IncomingConnectionFactory extends EventHandler
return arr;
}
+ public void
+ flushBatchRequests()
+ {
+ Connection[] c = connections(); // connections() is synchronized, so no need to synchronize here.
+ for(int i = 0; i < c.length; i++)
+ {
+ c[i].flushBatchRequest();
+ }
+ }
+
//
// Operations from EventHandler.
//
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index 638eb2f69d7..99307251ed1 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -258,6 +258,23 @@ public class Instance
return _messageSizeMax;
}
+ public void
+ flushBatchRequests()
+ {
+ OutgoingConnectionFactory f;
+ Ice.ObjectAdapterI adapters[];
+ synchronized(this)
+ {
+ f = _outgoingConnectionFactory;
+ adapters = _objectAdapterFactory.adapters();
+ }
+ f.flushBatchRequests();
+ for(int i = 0; i < adapters.length; i++)
+ {
+ adapters[i].flushBatchRequests();
+ }
+ }
+
public BufferManager
bufferManager()
{
diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java
index 659017e0d51..2f139d16740 100644
--- a/java/src/IceInternal/ObjectAdapterFactory.java
+++ b/java/src/IceInternal/ObjectAdapterFactory.java
@@ -149,6 +149,20 @@ public final class ObjectAdapterFactory
return null;
}
+ public synchronized Ice.ObjectAdapterI[]
+ adapters()
+ {
+ java.util.LinkedList a = new java.util.LinkedList();
+ java.util.Iterator i = _adapters.values().iterator();
+ while(i.hasNext())
+ {
+ a.add(i.next());
+ }
+ Ice.ObjectAdapterI[] arr = new Ice.ObjectAdapterI[a.size()];
+ a.toArray(arr);
+ return arr;
+ }
+
//
// Only for use by Instance.
//
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 052c6791dd3..ef4f4c30d2e 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -393,6 +393,32 @@ public class OutgoingConnectionFactory
}
}
+ public void
+ flushBatchRequests()
+ {
+ java.util.LinkedList c = new java.util.LinkedList();
+
+ synchronized(this)
+ {
+ java.util.Iterator p = _connections.values().iterator();
+ while(p.hasNext())
+ {
+ java.util.LinkedList connectionList = (java.util.LinkedList)p.next();
+ java.util.Iterator q = connectionList.iterator();
+ while(q.hasNext())
+ {
+ c.add(q.next());
+ }
+ }
+ }
+
+ java.util.Iterator p = c.iterator();
+ while(p.hasNext())
+ {
+ ((Connection)p.next()).flushBatchRequest();
+ }
+ }
+
//
// Only for use by Instance.
//