diff options
64 files changed, 1227 insertions, 415 deletions
@@ -27,6 +27,11 @@ Changes since version 3.5.1 General Changes =============== +- Ice now guarantees that invocations made on different proxy objects + will be serialized if the proxies are equal and if connection + caching is enabled. Previously, invocations could be sent out of + order if the connection wasn't established. + - IceGrid now permits removing or moving servers when saving an application "without restart" (to ensure the update doesn't cause any server shutdown). The servers to be removed or moved, must be diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h index 0600916d324..503ff2b5870 100644 --- a/cpp/include/Ice/Proxy.h +++ b/cpp/include/Ice/Proxy.h @@ -852,8 +852,6 @@ protected: private: - ::IceInternal::RequestHandlerPtr createRequestHandler(); - bool ice_isA(const ::std::string&, const ::Ice::Context*); ::Ice::AsyncResultPtr begin_ice_isA(const ::std::string&, const ::Ice::Context*, diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index f97fadf6c02..543a1b50153 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -153,7 +153,7 @@ CollocatedRequestHandler::~CollocatedRequestHandler() } RequestHandlerPtr -CollocatedRequestHandler::connect() +CollocatedRequestHandler::connect(const Ice::ObjectPrx&) { return this; } diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h index 3930c12ce1c..8561ba6eb2e 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.h +++ b/cpp/src/Ice/CollocatedRequestHandler.h @@ -43,7 +43,7 @@ public: CollocatedRequestHandler(const ReferencePtr&, const Ice::ObjectAdapterPtr&); virtual ~CollocatedRequestHandler(); - virtual RequestHandlerPtr connect(); + virtual RequestHandlerPtr connect(const Ice::ObjectPrx&); virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&); virtual void prepareBatchRequest(BasicStream*); diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index a1458484a57..55f1c508d5a 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -9,6 +9,7 @@ #include <Ice/ConnectRequestHandler.h> #include <Ice/ConnectionRequestHandler.h> +#include <Ice/RequestHandlerFactory.h> #include <Ice/Instance.h> #include <Ice/Proxy.h> #include <Ice/ConnectionI.h> @@ -31,8 +32,7 @@ ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice: _flushing(false), _batchRequestInProgress(false), _batchRequestsSize(sizeof(requestBatchHdr)), - _batchStream(ref->getInstance().get(), Ice::currentProtocolEncoding, _batchAutoFlush), - _updateRequestHandler(false) + _batchStream(ref->getInstance().get(), Ice::currentProtocolEncoding, _batchAutoFlush) { } @@ -41,17 +41,23 @@ ConnectRequestHandler::~ConnectRequestHandler() } RequestHandlerPtr -ConnectRequestHandler::connect() +ConnectRequestHandler::connect(const Ice::ObjectPrx& proxy) { - Ice::ObjectPrx proxy = _proxy; - try + // + // Initiate the connection if connect() is called by the proxy that + // created the handler. + // + if(proxy.get() == _proxy.get()) { _reference->getConnection(this); + } + try + { Lock sync(*this); if(!initialized()) { - _updateRequestHandler = true; // The proxy request handler will be updated when the connection is set. + _proxies.push_back(proxy); return this; } } @@ -61,11 +67,15 @@ ConnectRequestHandler::connect() throw; } - assert(_connection); - - RequestHandlerPtr handler = new ConnectionRequestHandler(_reference, _connection, _compress); - proxy->__setRequestHandler(this, handler); - return handler; + if(_connectionRequestHandler) + { + proxy->__setRequestHandler(this, _connectionRequestHandler); + return _connectionRequestHandler; + } + else + { + return this; + } } RequestHandlerPtr @@ -335,15 +345,19 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) Lock sync(*this); assert(!_initialized && !_exception.get()); _exception.reset(ex.ice_clone()); + _proxies.clear(); _proxy = 0; // Break cyclic reference count. - // - // If some requests were queued, we notify them of the failure. This is done from a thread - // from the client thread pool since this will result in ice_exception callbacks to be - // called. - // flushRequestsWithException(); + try + { + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + } + catch(const Ice::CommunicatorDestroyedException&) + { + // Ignore + } notifyAll(); } @@ -467,15 +481,18 @@ ConnectRequestHandler::flushRequests() } // - // We've finished sending the queued requests and the request handler now sends - // the requests over the connection directly. It's time to substitute the - // request handler of the proxy with the more efficient connection request - // handler which does not have any synchronization. This also breaks the cyclic - // reference count with the proxy. + // If we aren't caching the connection, don't bother creating a + // connection request handler. Otherwise, update the proxies + // request handler to use the more efficient connection request + // handler. // - if(_updateRequestHandler && !_exception.get()) + if(_reference->getCacheConnection() && !_exception.get()) { - _proxy->__setRequestHandler(this, new ConnectionRequestHandler(_reference, _connection, _compress)); + _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); + for(vector<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) + { + (*p)->__setRequestHandler(this, _connectionRequestHandler); + } } { @@ -486,6 +503,15 @@ ConnectRequestHandler::flushRequests() _initialized = true; _flushing = false; } + try + { + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + } + catch(const Ice::CommunicatorDestroyedException&) + { + // Ignore + } + _proxies.clear(); _proxy = 0; // Break cyclic reference count. notifyAll(); } diff --git a/cpp/src/Ice/ConnectRequestHandler.h b/cpp/src/Ice/ConnectRequestHandler.h index c5bc6602766..53c9e33e070 100644 --- a/cpp/src/Ice/ConnectRequestHandler.h +++ b/cpp/src/Ice/ConnectRequestHandler.h @@ -35,7 +35,7 @@ public: ConnectRequestHandler(const ReferencePtr&, const Ice::ObjectPrx&); virtual ~ConnectRequestHandler(); - virtual RequestHandlerPtr connect(); + virtual RequestHandlerPtr connect(const Ice::ObjectPrx&); virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&); virtual void prepareBatchRequest(BasicStream*); @@ -75,6 +75,7 @@ private: }; Ice::ObjectPrx _proxy; + std::vector<Ice::ObjectPrx> _proxies; const bool _batchAutoFlush; @@ -88,7 +89,8 @@ private: bool _batchRequestInProgress; size_t _batchRequestsSize; BasicStream _batchStream; - bool _updateRequestHandler; + + RequestHandlerPtr _connectionRequestHandler; }; typedef IceUtil::Handle<ConnectRequestHandler> ConnectRequestHandlerPtr; diff --git a/cpp/src/Ice/ConnectionRequestHandler.cpp b/cpp/src/Ice/ConnectionRequestHandler.cpp index a94d3e7180a..423a637c70c 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.cpp +++ b/cpp/src/Ice/ConnectionRequestHandler.cpp @@ -18,17 +18,6 @@ using namespace std; using namespace IceInternal; -ConnectionRequestHandler::ConnectionRequestHandler(const ReferencePtr& reference, const Ice::ObjectPrx& proxy) : - RequestHandler(reference) -{ - _connection = _reference->getConnection(_compress); - RouterInfoPtr ri = reference->getRouterInfo(); - if(ri) - { - ri->addProxy(proxy); - } -} - ConnectionRequestHandler::ConnectionRequestHandler(const ReferencePtr& reference, const Ice::ConnectionIPtr& connection, bool compress) : @@ -39,10 +28,9 @@ ConnectionRequestHandler::ConnectionRequestHandler(const ReferencePtr& reference } RequestHandlerPtr -ConnectionRequestHandler::connect() +ConnectionRequestHandler::connect(const Ice::ObjectPrx&) { - assert(false); // This request handler is only created after connection binding. - return 0; + return this; } RequestHandlerPtr diff --git a/cpp/src/Ice/ConnectionRequestHandler.h b/cpp/src/Ice/ConnectionRequestHandler.h index 211e8f02819..78feee8736f 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.h +++ b/cpp/src/Ice/ConnectionRequestHandler.h @@ -21,10 +21,9 @@ class ConnectionRequestHandler : public RequestHandler { public: - ConnectionRequestHandler(const ReferencePtr&, const Ice::ObjectPrx&); ConnectionRequestHandler(const ReferencePtr&, const Ice::ConnectionIPtr&, bool); - virtual RequestHandlerPtr connect(); + virtual RequestHandlerPtr connect(const Ice::ObjectPrx&); virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&); virtual void prepareBatchRequest(BasicStream*); diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index d45c0b9e758..261cecf46d3 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -29,6 +29,7 @@ #include <Ice/Network.h> #include <Ice/NetworkProxy.h> #include <Ice/EndpointFactoryManager.h> +#include <Ice/RequestHandlerFactory.h> #include <Ice/RetryQueue.h> #include <Ice/DynamicLibrary.h> #include <Ice/PluginManagerI.h> @@ -349,6 +350,20 @@ IceInternal::Instance::referenceFactory() const return _referenceFactory; } +RequestHandlerFactoryPtr +IceInternal::Instance::requestHandlerFactory() const +{ + IceUtil::RecMutex::Lock sync(*this); + + if(_state == StateDestroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + + assert(_requestHandlerFactory); + return _requestHandlerFactory; +} + ProxyFactoryPtr IceInternal::Instance::proxyFactory() const { @@ -1269,7 +1284,9 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi _locatorManager = new LocatorManager(_initData.properties); _referenceFactory = new ReferenceFactory(this, communicator); - + + _requestHandlerFactory = new RequestHandlerFactory(this); + _proxyFactory = new ProxyFactory(this); bool ipv4 = _initData.properties->getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0; @@ -1714,6 +1731,8 @@ IceInternal::Instance::destroy() //_referenceFactory->destroy(); // No destroy function defined. _referenceFactory = 0; + + _requestHandlerFactory = 0; // _proxyFactory->destroy(); // No destroy function defined. _proxyFactory = 0; diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index c4047d1d0b4..4a41a6b8773 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -60,6 +60,9 @@ typedef IceUtil::Handle<Timer> TimerPtr; class MetricsAdminI; typedef IceUtil::Handle<MetricsAdminI> MetricsAdminIPtr; +class RequestHandlerFactory; +typedef IceUtil::Handle<RequestHandlerFactory> RequestHandlerFactoryPtr; + class Instance : public IceUtil::Shared, public IceUtil::RecMutex { public: @@ -71,6 +74,7 @@ public: RouterManagerPtr routerManager() const; LocatorManagerPtr locatorManager() const; ReferenceFactoryPtr referenceFactory() const; + RequestHandlerFactoryPtr requestHandlerFactory() const; ProxyFactoryPtr proxyFactory() const; OutgoingConnectionFactoryPtr outgoingConnectionFactory() const; ObjectFactoryManagerPtr servantFactoryManager() const; @@ -145,6 +149,7 @@ private: RouterManagerPtr _routerManager; LocatorManagerPtr _locatorManager; ReferenceFactoryPtr _referenceFactory; + RequestHandlerFactoryPtr _requestHandlerFactory; ProxyFactoryPtr _proxyFactory; OutgoingConnectionFactoryPtr _outgoingConnectionFactory; ObjectFactoryManagerPtr _servantFactoryManager; diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile index f44e0c977a8..e50882118c4 100644 --- a/cpp/src/Ice/Makefile +++ b/cpp/src/Ice/Makefile @@ -117,6 +117,7 @@ OBJS = Acceptor.o \ Reference.o \ ReferenceFactory.o \ RequestHandler.o \ + RequestHandlerFactory.o \ ResponseHandler.o \ RetryQueue.o \ RouterInfo.o \ diff --git a/cpp/src/Ice/Makefile.mak b/cpp/src/Ice/Makefile.mak index fcc4d3eb3e9..f920f1d709c 100644 --- a/cpp/src/Ice/Makefile.mak +++ b/cpp/src/Ice/Makefile.mak @@ -119,6 +119,7 @@ OBJS = .\Acceptor.obj \ .\Reference.obj \ .\ReferenceFactory.obj \ .\RequestHandler.obj \ + .\RequestHandlerFactory.obj \ .\ResponseHandler.obj \ .\RetryQueue.obj \ .\RouterInfo.obj \ diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index fca1013252e..1719adc6b35 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -98,6 +98,11 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) // try { + // + // It's important to let the retry queue do the retry even if + // the retry interval is 0. This method can be called with the + // connection locked so we can't just retry here. + // _instance->retryQueue()->add(this, handleException(exc)); return false; } diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index d18a38a96e5..7618ff6eeb0 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -14,9 +14,7 @@ #include <Ice/ObjectAdapterFactory.h> #include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> -#include <Ice/ConnectRequestHandler.h> -#include <Ice/CollocatedRequestHandler.h> -#include <Ice/ConnectionRequestHandler.h> +#include <Ice/RequestHandlerFactory.h> #include <Ice/Reference.h> #include <Ice/EndpointI.h> #include <Ice/Instance.h> @@ -1626,14 +1624,14 @@ IceProxy::Ice::Object::__getRequestHandler() { return _requestHandler; } - handler = createRequestHandler(); + handler = _reference->getInstance()->requestHandlerFactory()->getRequestHandler(_reference, this); _requestHandler = handler; } else { - handler = createRequestHandler(); + handler = _reference->getInstance()->requestHandlerFactory()->getRequestHandler(_reference, this); } - return handler->connect(); + return handler->connect(this); } void @@ -1663,21 +1661,6 @@ IceProxy::Ice::Object::__newInstance() const return new Object; } -RequestHandlerPtr -IceProxy::Ice::Object::createRequestHandler() -{ - if(_reference->getCollocationOptimized()) - { - ObjectAdapterPtr adapter = _reference->getInstance()->objectAdapterFactory()->findObjectAdapter(this); - if(adapter) - { - return new ::IceInternal::CollocatedRequestHandler(_reference, adapter); - } - } - - return new ::IceInternal::ConnectRequestHandler(_reference, this); -} - void IceProxy::Ice::Object::setup(const ReferencePtr& ref) { diff --git a/cpp/src/Ice/RequestHandler.h b/cpp/src/Ice/RequestHandler.h index 68ff00d647d..8cae95275ba 100644 --- a/cpp/src/Ice/RequestHandler.h +++ b/cpp/src/Ice/RequestHandler.h @@ -63,7 +63,7 @@ class RequestHandler : public CancellationHandler { public: - virtual RequestHandlerPtr connect() = 0; + virtual RequestHandlerPtr connect(const Ice::ObjectPrx&) = 0; virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&) = 0; virtual void prepareBatchRequest(BasicStream*) = 0; diff --git a/cpp/src/Ice/RequestHandlerFactory.cpp b/cpp/src/Ice/RequestHandlerFactory.cpp new file mode 100644 index 00000000000..e21344c98d9 --- /dev/null +++ b/cpp/src/Ice/RequestHandlerFactory.cpp @@ -0,0 +1,71 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/RequestHandlerFactory.h> +#include <Ice/CollocatedRequestHandler.h> +#include <Ice/ConnectRequestHandler.h> +#include <Ice/CollocatedRequestHandler.h> +#include <Ice/Reference.h> +#include <Ice/ObjectAdapterFactory.h> +#include <Ice/Instance.h> + +using namespace std; +using namespace IceInternal; + +RequestHandlerFactory::RequestHandlerFactory(const InstancePtr& instance) : _instance(instance) +{ +} + +RequestHandlerPtr +IceInternal::RequestHandlerFactory::getRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrx& proxy) +{ + if(ref->getCollocationOptimized()) + { + Ice::ObjectAdapterPtr adapter = _instance->objectAdapterFactory()->findObjectAdapter(proxy); + if(adapter) + { + return new CollocatedRequestHandler(ref, adapter); + } + } + + if(ref->getCacheConnection()) + { + Lock sync(*this); + + map<ReferencePtr, RequestHandlerPtr>::iterator p = _handlers.find(ref); + if(p != _handlers.end()) + { + return p->second; + } + + RequestHandlerPtr handler = new ConnectRequestHandler(ref, proxy); + _handlers.insert(make_pair(ref, handler)); + return handler; + } + else + { + return new ConnectRequestHandler(ref, proxy); + } +} + +void +IceInternal::RequestHandlerFactory::removeRequestHandler(const ReferencePtr& ref, const RequestHandlerPtr& handler) +{ + if(ref->getCacheConnection()) + { + Lock sync(*this); + map<ReferencePtr, RequestHandlerPtr>::iterator p = _handlers.find(ref); + assert(p != _handlers.end() && p->second.get() == handler.get()); + if(p != _handlers.end()) + { + _handlers.erase(p); + } + } +} + diff --git a/cpp/src/Ice/RequestHandlerFactory.h b/cpp/src/Ice/RequestHandlerFactory.h new file mode 100644 index 00000000000..b7a0b5d7cc0 --- /dev/null +++ b/cpp/src/Ice/RequestHandlerFactory.h @@ -0,0 +1,41 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_REQUEST_HANDLER_FACTORY_H +#define ICE_REQUEST_HANDLER_FACTORY_H + +#include <IceUtil/Shared.h> +#include <IceUtil/Mutex.h> + +#include <Ice/RequestHandlerF.h> +#include <Ice/ProxyF.h> +#include <Ice/ReferenceF.h> +#include <Ice/InstanceF.h> + +namespace IceInternal +{ + +class RequestHandlerFactory : public IceUtil::Shared, private IceUtil::Mutex +{ +public: + + RequestHandlerFactory(const InstancePtr&); + + RequestHandlerPtr getRequestHandler(const ReferencePtr&, const Ice::ObjectPrx&); + void removeRequestHandler(const ReferencePtr&, const RequestHandlerPtr&); + +private: + + const InstancePtr _instance; + std::map<ReferencePtr, RequestHandlerPtr> _handlers; +}; + +} + +#endif diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp index 12f0c508496..1073d245804 100644 --- a/cpp/src/Ice/Selector.cpp +++ b/cpp/src/Ice/Selector.cpp @@ -420,18 +420,18 @@ Selector::finish(EventHandler* handler, bool closeNow) if(handler->_registered) { update(handler, handler->_registered, SocketOperationNone); + } #if defined(ICE_USE_KQUEUE) - if(closeNow && !_changes.empty()) - { - // - // Update selector now to remove the FD from the kqueue if - // we're going to close it now. This isn't necessary for - // epoll since we always update the epoll FD immediately. - // - updateSelector(); - } -#endif + if(closeNow && !_changes.empty()) + { + // + // Update selector now to remove the FD from the kqueue if + // we're going to close it now. This isn't necessary for + // epoll since we always update the epoll FD immediately. + // + updateSelector(); } +#endif return closeNow; } @@ -509,7 +509,7 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti if(ev.flags & EV_ERROR) { Ice::Error out(_instance->initializationData().logger); - out << "error while updating selector:\n" << IceUtilInternal::errorToString(ev.data); + out << "selector returned error:\n" << IceUtilInternal::errorToString(ev.data); continue; } p.first = reinterpret_cast<EventHandler*>(ev.udata); diff --git a/cpp/src/Ice/winrt/Makefile.mak b/cpp/src/Ice/winrt/Makefile.mak index d2569a4a9a1..5c3d793db06 100644 --- a/cpp/src/Ice/winrt/Makefile.mak +++ b/cpp/src/Ice/winrt/Makefile.mak @@ -95,6 +95,7 @@ OBJS = $(ARCH)\$(CONFIG)\Acceptor.obj \ $(ARCH)\$(CONFIG)\RemoteLogger.obj \ $(ARCH)\$(CONFIG)\RetryQueue.obj \ $(ARCH)\$(CONFIG)\RequestHandler.obj \ + $(ARCH)\$(CONFIG)\RequestHandlerFactory.obj \ $(ARCH)\$(CONFIG)\ResponseHandler.obj \ $(ARCH)\$(CONFIG)\RouterInfo.obj \ $(ARCH)\$(CONFIG)\Router.obj \ diff --git a/cpp/test/Ice/hold/AllTests.cpp b/cpp/test/Ice/hold/AllTests.cpp index cab7f8e1a44..4688f5620c4 100644 --- a/cpp/test/Ice/hold/AllTests.cpp +++ b/cpp/test/Ice/hold/AllTests.cpp @@ -50,10 +50,7 @@ class SetCB : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex> { public: - SetCB(const ConditionPtr& condition, Ice::Int expected) : - _condition(condition), - _expected(expected), - _sent(false) + SetCB(const ConditionPtr& condition, Ice::Int expected) : _condition(condition), _expected(expected) { } @@ -71,29 +68,10 @@ public: { } - void - sent(bool) - { - Lock sync(*this); - _sent = true; - notify(); - } - - void - waitForSent() - { - Lock sync(*this); - while(!_sent) - { - wait(); - } - } - private: const ConditionPtr _condition; Ice::Int _expected; - bool _sent; }; typedef IceUtil::Handle<SetCB> SetCBPtr; @@ -144,23 +122,19 @@ allTests(const Ice::CommunicatorPtr& communicator) { ConditionPtr cond = new Condition(true); int value = 0; - SetCBPtr cb; + Ice::AsyncResultPtr result; while(cond->value()) { - cb = new SetCB(cond, value); - hold->begin_set(++value, IceUtilInternal::random(5), - newCallback_Hold_set(cb, &SetCB::response, &SetCB::exception, &SetCB::sent)); + result = hold->begin_set(value + 1, + IceUtilInternal::random(5), + newCallback_Hold_set(new SetCB(cond, value), &SetCB::response, &SetCB::exception)); + ++value; if(value % 100 == 0) { - cb->waitForSent(); - cb = 0; + result->waitForSent(); } } - if(cb) - { - cb->waitForSent(); - cb = 0; - } + result->waitForCompleted(); } cout << "ok" << endl; @@ -168,26 +142,24 @@ allTests(const Ice::CommunicatorPtr& communicator) { ConditionPtr cond = new Condition(true); int value = 0; - SetCBPtr cb; + Ice::AsyncResultPtr result; while(value < 3000 && cond->value()) { - cb = new SetCB(cond, value); - holdSerialized->begin_set(++value, IceUtilInternal::random(5), - newCallback_Hold_set(cb, &SetCB::response, &SetCB::exception, &SetCB::sent)); + result = holdSerialized->begin_set(value + 1, + IceUtilInternal::random(1), + newCallback_Hold_set(new SetCB(cond, value), + &SetCB::response, + &SetCB::exception)); + ++value; if(value % 100 == 0) { - cb->waitForSent(); - cb = 0; + result->waitForSent(); } } - if(cb) - { - cb->waitForSent(); - cb = 0; - } + result->waitForCompleted(); test(cond->value()); - for(int i = 0; i < 20000; ++i) + for(int i = 0; i < 10000; ++i) { holdSerialized->ice_oneway()->setOneway(value + 1, value); ++value; @@ -199,6 +171,26 @@ allTests(const Ice::CommunicatorPtr& communicator) } cout << "ok" << endl; + cout << "testing serialization... " << flush; + { + int value = 0; + holdSerialized->set(value, 0); + Ice::AsyncResultPtr result; + for(int i = 0; i < 10000; ++i) + { + // Create a new proxy for each request + result = holdSerialized->ice_oneway()->begin_setOneway(value + 1, value); + ++value; + if((i % 100) == 0) + { + result->waitForSent(); + holdSerialized->ice_getConnection()->close(false); + } + } + result->waitForCompleted(); + } + cout << "ok" << endl; + cout << "testing waitForHold... " << flush; { hold->waitForHold(); diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs index a0e1a354938..90079d3bc40 100644 --- a/cs/src/Ice/CollocatedRequestHandler.cs +++ b/cs/src/Ice/CollocatedRequestHandler.cs @@ -42,7 +42,7 @@ namespace IceInternal _batchStream = new BasicStream(@ref.getInstance(), Ice.Util.currentProtocolEncoding, _batchAutoFlush); } - public RequestHandler connect() + public RequestHandler connect(Ice.ObjectPrxHelperBase proxy) { return this; } diff --git a/cs/src/Ice/ConnectRequestHandler.cs b/cs/src/Ice/ConnectRequestHandler.cs index a5953343b10..b516af9a4e7 100644 --- a/cs/src/Ice/ConnectRequestHandler.cs +++ b/cs/src/Ice/ConnectRequestHandler.cs @@ -36,19 +36,24 @@ namespace IceInternal internal Ice.AsyncCallback sentCallback = null; } - public RequestHandler connect() + public RequestHandler connect(Ice.ObjectPrxHelperBase proxy) { - Ice.ObjectPrxHelperBase proxy = _proxy; - try + // + // Initiate the connection if connect() is called by the proxy that + // created the handler. + // + if(Object.ReferenceEquals(proxy, _proxy)) { _reference.getConnection(this); + } + try + { lock(this) { if(!initialized()) { - // The proxy request handler will be updated when the connection is set. - _updateRequestHandler = true; + _proxies.Add(proxy); return this; } } @@ -59,11 +64,15 @@ namespace IceInternal throw ex; } - Debug.Assert(_connection != null); - - RequestHandler handler = new ConnectionRequestHandler(_reference, _connection, _compress); - proxy.setRequestHandler__(this, handler); - return handler; + if(_connectionRequestHandler != null) + { + proxy.setRequestHandler__(this, _connectionRequestHandler); + return _connectionRequestHandler; + } + else + { + return this; + } } public RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler) @@ -276,13 +285,17 @@ namespace IceInternal { Debug.Assert(!_initialized && _exception == null); _exception = ex; + _proxies.Clear(); _proxy = null; // Break cyclic reference count. - // - // If some requests were queued, we notify them of the failure. This is done from a thread - // from the client thread pool since this will result in ice_exception callbacks to be - // called. - // + try + { + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + } + catch(Ice.CommunicatorDestroyedException) + { + // Ignore + } flushRequestsWithException(); System.Threading.Monitor.PulseAll(this); @@ -313,7 +326,6 @@ namespace IceInternal _batchRequestInProgress = false; _batchRequestsSize = Protocol.requestBatchHdr.Length; _batchStream = new BasicStream(@ref.getInstance(), Ice.Util.currentProtocolEncoding, _batchAutoFlush); - _updateRequestHandler = false; } private bool initialized() @@ -424,17 +436,18 @@ namespace IceInternal } // - // We've finished sending the queued requests and the request handler now send - // the requests over the connection directly. It's time to substitute the - // request handler of the proxy with the more efficient connection request - // handler which does not have any synchronization. This also breaks the cyclic - // reference count with the proxy. - // - // NOTE: _updateRequestHandler is immutable once _flushing = true + // If we aren't caching the connection, don't bother creating a + // connection request handler. Otherwise, update the proxies + // request handler to use the more efficient connection request + // handler. // - if(_updateRequestHandler && _exception == null) + if(_reference.getCacheConnection() && _exception == null) { - _proxy.setRequestHandler__(this, new ConnectionRequestHandler(_reference, _connection, _compress)); + _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); + foreach(Ice.ObjectPrxHelperBase prx in _proxies) + { + prx.setRequestHandler__(this, _connectionRequestHandler); + } } lock(this) @@ -445,6 +458,15 @@ namespace IceInternal _initialized = true; _flushing = false; } + try + { + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + } + catch(Ice.CommunicatorDestroyedException) + { + // Ignore + } + _proxies.Clear(); _proxy = null; // Break cyclic reference count. System.Threading.Monitor.PulseAll(this); } @@ -471,6 +493,7 @@ namespace IceInternal private bool _response; private Ice.ObjectPrxHelperBase _proxy; + private List<Ice.ObjectPrxHelperBase> _proxies = new List<Ice.ObjectPrxHelperBase>(); private bool _batchAutoFlush; @@ -484,6 +507,7 @@ namespace IceInternal private bool _batchRequestInProgress; private int _batchRequestsSize; private BasicStream _batchStream; - private bool _updateRequestHandler; + + private RequestHandler _connectionRequestHandler; } } diff --git a/cs/src/Ice/ConnectionRequestHandler.cs b/cs/src/Ice/ConnectionRequestHandler.cs index a0f1edec56f..683fefabc5e 100644 --- a/cs/src/Ice/ConnectionRequestHandler.cs +++ b/cs/src/Ice/ConnectionRequestHandler.cs @@ -16,10 +16,9 @@ namespace IceInternal { public class ConnectionRequestHandler : RequestHandler { - public RequestHandler connect() + public RequestHandler connect(Ice.ObjectPrxHelperBase proxy) { - Debug.Assert(false); // This request handler is only created after connection binding. - return null; + return this; } public RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler) diff --git a/cs/src/Ice/Instance.cs b/cs/src/Ice/Instance.cs index 03b6a11fb06..3ec55288b22 100644 --- a/cs/src/Ice/Instance.cs +++ b/cs/src/Ice/Instance.cs @@ -117,6 +117,20 @@ namespace IceInternal } } + public RequestHandlerFactory requestHandlerFactory() + { + lock(this) + { + if(_state == StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + + Debug.Assert(_requestHandlerFactory != null); + return _requestHandlerFactory; + } + } + public ProxyFactory proxyFactory() { lock(this) @@ -817,6 +831,8 @@ namespace IceInternal _proxyFactory = new ProxyFactory(this); + _requestHandlerFactory = new RequestHandlerFactory(this); + bool ipv4 = _initData.properties.getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0; bool ipv6 = _initData.properties.getPropertyAsIntWithDefault("Ice.IPv6", 1) > 0; if(!ipv4 && !ipv6) @@ -1178,6 +1194,9 @@ namespace IceInternal _referenceFactory = null; // No destroy function defined. + _requestHandlerFactory = null; + + // No destroy function defined. // _proxyFactory.destroy(); _proxyFactory = null; @@ -1421,6 +1440,7 @@ namespace IceInternal private RouterManager _routerManager; private LocatorManager _locatorManager; private ReferenceFactory _referenceFactory; + private RequestHandlerFactory _requestHandlerFactory; private ProxyFactory _proxyFactory; private OutgoingConnectionFactory _outgoingConnectionFactory; private ObjectFactoryManager _servantFactoryManager; diff --git a/cs/src/Ice/Makefile b/cs/src/Ice/Makefile index dbdd6671d1c..a7f918b555c 100644 --- a/cs/src/Ice/Makefile +++ b/cs/src/Ice/Makefile @@ -90,6 +90,7 @@ SRCS = Acceptor.cs \ ReferenceFactory.cs \ ReplyStatus.cs \ RequestHandler.cs \ + RequestHandlerFactory.cs \ ResponseHandler.cs \ RetryQueue.cs \ RouterInfo.cs \ diff --git a/cs/src/Ice/Makefile.mak b/cs/src/Ice/Makefile.mak index 454302543aa..09eeee5484f 100644 --- a/cs/src/Ice/Makefile.mak +++ b/cs/src/Ice/Makefile.mak @@ -91,6 +91,7 @@ SRCS = Acceptor.cs \ ReferenceFactory.cs \ ReplyStatus.cs \ RequestHandler.cs \ + RequestHandlerFactory.cs \ ResponseHandler.cs \ RetryQueue.cs \ RouterInfo.cs \ diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs index 6da49814641..f7c3e7d5d11 100644 --- a/cs/src/Ice/OutgoingAsync.cs +++ b/cs/src/Ice/OutgoingAsync.cs @@ -150,6 +150,11 @@ namespace IceInternal // try { + // + // It's important to let the retry queue do the retry even if + // the retry interval is 0. This method can be called with the + // connection locked so we can't just retry here. + // instance_.retryQueue().add(this, handleException(exc)); return null; } diff --git a/cs/src/Ice/Proxy.cs b/cs/src/Ice/Proxy.cs index c815004d284..7846ddffe3a 100644 --- a/cs/src/Ice/Proxy.cs +++ b/cs/src/Ice/Proxy.cs @@ -2432,15 +2432,15 @@ namespace Ice { return _requestHandler; } - _requestHandler = createRequestHandler(); - handler = _requestHandler; + handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); + _requestHandler = handler; } } else { - handler = createRequestHandler(); + handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); } - return handler.connect(); + return handler.connect(this); } public void setRequestHandler__(IceInternal.RequestHandler previous, IceInternal.RequestHandler handler) @@ -2464,20 +2464,6 @@ namespace Ice } } - private IceInternal.RequestHandler createRequestHandler() - { - if(_reference.getCollocationOptimized()) - { - ObjectAdapter adapter = _reference.getInstance().objectAdapterFactory().findObjectAdapter(this); - if(adapter != null) - { - return new IceInternal.CollocatedRequestHandler(_reference, adapter); - } - } - - return new IceInternal.ConnectRequestHandler(_reference, this); - } - // // Only for use by IceInternal.ProxyFactory // diff --git a/cs/src/Ice/RequestHandler.cs b/cs/src/Ice/RequestHandler.cs index 6cef5d16d4b..56f53f8910c 100644 --- a/cs/src/Ice/RequestHandler.cs +++ b/cs/src/Ice/RequestHandler.cs @@ -19,7 +19,7 @@ namespace IceInternal public interface RequestHandler : CancellationHandler { - RequestHandler connect(); + RequestHandler connect(Ice.ObjectPrxHelperBase proxy); RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler); void prepareBatchRequest(BasicStream @out); diff --git a/cs/src/Ice/RequestHandlerFactory.cs b/cs/src/Ice/RequestHandlerFactory.cs new file mode 100644 index 00000000000..e25d9733c33 --- /dev/null +++ b/cs/src/Ice/RequestHandlerFactory.cs @@ -0,0 +1,71 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +using System.Collections.Generic; +using System.Diagnostics; + +namespace IceInternal +{ + public class RequestHandlerFactory + { + internal RequestHandlerFactory(Instance instance) + { + _instance = instance; + } + + public RequestHandler + getRequestHandler(Reference rf, Ice.ObjectPrxHelperBase proxy) + { + if(rf.getCollocationOptimized()) + { + Ice.ObjectAdapter adapter = _instance.objectAdapterFactory().findObjectAdapter(proxy); + if(adapter != null) + { + return new CollocatedRequestHandler(rf, adapter); + } + } + + if(rf.getCacheConnection()) + { + lock(this) + { + RequestHandler handler; + if(_handlers.TryGetValue(rf, out handler)) + { + return handler; + } + + handler = new ConnectRequestHandler(rf, proxy); + _handlers.Add(rf, handler); + return handler; + } + } + else + { + return new ConnectRequestHandler(rf, proxy); + } + } + + internal void + removeRequestHandler(Reference rf, RequestHandler handler) + { + if(rf.getCacheConnection()) + { + lock(this) + { + Debug.Assert(_handlers[rf] == handler); + _handlers.Remove(rf); + } + } + } + + readonly Instance _instance; + readonly Dictionary<Reference, RequestHandler> _handlers = new Dictionary<Reference, RequestHandler>(); + } +} diff --git a/cs/test/Ice/hold/AllTests.cs b/cs/test/Ice/hold/AllTests.cs index 5f210443414..c1eb3588422 100644 --- a/cs/test/Ice/hold/AllTests.cs +++ b/cs/test/Ice/hold/AllTests.cs @@ -75,29 +75,6 @@ public class AllTests : TestCommon.TestApp { } - public void - sent(bool sync) - { - lock(this) - { - _sent = true; - System.Threading.Monitor.Pulse(this); - } - } - - public void - waitForSent() - { - lock(this) - { - while(!_sent) - { - System.Threading.Monitor.Wait(this); - } - } - } - - private bool _sent = false; private Condition _condition; private int _expected; } @@ -157,16 +134,15 @@ public class AllTests : TestCommon.TestApp { Condition cond = new Condition(true); int value = 0; - SetCB cb = null; + Ice.AsyncResult result = null; while(cond.value()) { - cb = new SetCB(cond, value); - hold.begin_set(++value, value < 500 ? rand.Next(5) : 0). - whenCompleted(cb.response, cb.exception).whenSent(cb.sent); + SetCB cb = new SetCB(cond, value); + result = hold.begin_set(++value, value < 500 ? rand.Next(5) : 0).whenCompleted(cb.response, + cb.exception); if(value % 100 == 0) { - cb.waitForSent(); - cb = null; + result.waitForSent(); } if(value > 100000) @@ -177,11 +153,7 @@ public class AllTests : TestCommon.TestApp break; } } - if(cb != null) - { - cb.waitForSent(); - cb = null; - } + result.waitForSent(); } WriteLine("ok"); @@ -190,26 +162,20 @@ public class AllTests : TestCommon.TestApp { Condition cond = new Condition(true); int value = 0; - SetCB cb = null; + Ice.AsyncResult result = null; while(value < 3000 && cond.value()) { - cb = new SetCB(cond, value); - holdSerialized.begin_set(++value, value < 500 ? rand.Next(5) : 0). - whenCompleted(cb.response, cb.exception).whenSent(cb.sent); + SetCB cb = new SetCB(cond, value); + result = holdSerialized.begin_set(++value, 0).whenCompleted(cb.response, cb.exception); if(value % 100 == 0) { - cb.waitForSent(); - cb = null; + result.waitForSent(); } } - if(cb != null) - { - cb.waitForSent(); - cb = null; - } + result.waitForCompleted(); test(cond.value()); - for(int i = 0; i < 20000; ++i) + for(int i = 0; i < 10000; ++i) { holdSerializedOneway.setOneway(value + 1, value); ++value; @@ -221,6 +187,27 @@ public class AllTests : TestCommon.TestApp } WriteLine("ok"); + Write("testing serialization... "); + Flush(); + { + int value = 0; + holdSerialized.set(value, 0); + Ice.AsyncResult result = null; + for(int i = 0; i < 10000; ++i) + { + // Create a new proxy for each request + result = ((HoldPrx)holdSerialized.ice_oneway()).begin_setOneway(value + 1, value); + ++value; + if((i % 100) == 0) + { + result.waitForSent(); + holdSerialized.ice_getConnection().close(false); + } + } + result.waitForCompleted(); + } + WriteLine("ok"); + Write("testing waitForHold... "); Flush(); { diff --git a/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java b/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java index 2c763f56cc2..27f8b15014a 100644 --- a/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java @@ -2757,15 +2757,15 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { return _requestHandler; } - _requestHandler = createRequestHandler(); - handler = _requestHandler; + handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); + _requestHandler = handler; } } else { - handler = createRequestHandler(); + handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); } - return handler.connect(); + return handler.connect(this); } public void @@ -2803,26 +2803,6 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } } - private IceInternal.RequestHandler - createRequestHandler() - { - if(_reference.getCollocationOptimized()) - { - ObjectAdapter adapter = _reference.getInstance().objectAdapterFactory().findObjectAdapter(this); - if(adapter != null) - { - return new IceInternal.CollocatedRequestHandler(_reference, adapter); - } - } - - IceInternal.RequestHandler handler = new IceInternal.ConnectRequestHandler(_reference, this); - if(_reference.getInstance().queueRequests()) - { - handler = new QueueRequestHandler(_reference.getInstance(), handler); - } - return handler; - } - // // Only for use by IceInternal.ProxyFactory // diff --git a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java index e3f5f045f58..bd882384441 100644 --- a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java @@ -58,7 +58,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler @Override public RequestHandler - connect() + connect(Ice.ObjectPrxHelperBase proxy) { return this; } diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java index c1636662c8c..4da9f0ab401 100644 --- a/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java @@ -33,19 +33,24 @@ public class ConnectRequestHandler @Override public RequestHandler - connect() + connect(Ice.ObjectPrxHelperBase proxy) { - Ice.ObjectPrxHelperBase proxy = _proxy; - try + // + // Initiate the connection if connect() is called by the proxy that + // created the handler. + // + if(proxy == _proxy) { _reference.getConnection(this); - + } + + try + { synchronized(this) { if(!initialized()) { - // The proxy request handler will be updated when the connection is set. - _updateRequestHandler = true; + _proxies.add(proxy); return this; } } @@ -56,11 +61,15 @@ public class ConnectRequestHandler throw ex; } - assert(_connection != null); - - RequestHandler handler = new ConnectionRequestHandler(_reference, _connection, _compress); - proxy.__setRequestHandler(this, handler); - return handler; + if(_connectionRequestHandler != null) + { + proxy.__setRequestHandler(this, _connectionRequestHandler); + return _connectionRequestHandler; + } + else + { + return this; + } } @Override @@ -286,15 +295,19 @@ public class ConnectRequestHandler { assert(!_initialized && _exception == null); _exception = ex; + _proxies.clear(); _proxy = null; // Break cyclic reference count. - // - // If some requests were queued, we notify them of the failure. This is done from a thread - // from the client thread pool since this will result in ice_exception callbacks to be - // called. - // flushRequestsWithException(); + try + { + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + } + catch(Ice.CommunicatorDestroyedException exc) + { + // Ignore + } notifyAll(); } @@ -313,7 +326,7 @@ public class ConnectRequestHandler } public - ConnectRequestHandler(Reference ref, Ice.ObjectPrx proxy) + ConnectRequestHandler(Reference ref, Ice.ObjectPrxHelperBase proxy) { _reference = ref; _response = _reference.getMode() == Reference.ModeTwoway; @@ -325,7 +338,6 @@ public class ConnectRequestHandler _batchRequestInProgress = false; _batchRequestsSize = Protocol.requestBatchHdr.length; _batchStream = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding, _batchAutoFlush); - _updateRequestHandler = false; } private boolean @@ -452,17 +464,18 @@ public class ConnectRequestHandler } // - // We've finished sending the queued requests and the request handler now send - // the requests over the connection directly. It's time to substitute the - // request handler of the proxy with the more efficient connection request - // handler which does not have any synchronization. This also breaks the cyclic - // reference count with the proxy. - // - // NOTE: _updateRequestHandler is immutable once _flushing = true + // If we aren't caching the connection, don't bother creating a + // connection request handler. Otherwise, update the proxies + // request handler to use the more efficient connection request + // handler. // - if(_updateRequestHandler && _exception == null) + if(_reference.getCacheConnection() && _exception == null) { - _proxy.__setRequestHandler(this, new ConnectionRequestHandler(_reference, _connection, _compress)); + _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); + for(Ice.ObjectPrxHelperBase proxy : _proxies) + { + proxy.__setRequestHandler(this, _connectionRequestHandler); + } } synchronized(this) @@ -473,6 +486,15 @@ public class ConnectRequestHandler _initialized = true; _flushing = false; } + try + { + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + } + catch(Ice.CommunicatorDestroyedException ex) + { + // Ignore + } + _proxies.clear(); _proxy = null; // Break cyclic reference count. notifyAll(); } @@ -526,6 +548,7 @@ public class ConnectRequestHandler private boolean _response; private Ice.ObjectPrxHelperBase _proxy; + private java.util.List<Ice.ObjectPrxHelperBase> _proxies = new java.util.ArrayList<Ice.ObjectPrxHelperBase>(); private final boolean _batchAutoFlush; @@ -539,5 +562,6 @@ public class ConnectRequestHandler private boolean _batchRequestInProgress; private int _batchRequestsSize; private BasicStream _batchStream; - private boolean _updateRequestHandler; + + private RequestHandler _connectionRequestHandler; } diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java index 924b633e670..d9a1fc5e9a4 100644 --- a/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java @@ -13,10 +13,9 @@ public class ConnectionRequestHandler implements RequestHandler { @Override public RequestHandler - connect() + connect(Ice.ObjectPrxHelperBase proxy) { - assert(false); // This request handler is only created after connection binding. - return null; + return this; } @Override diff --git a/java/src/Ice/src/main/java/IceInternal/Instance.java b/java/src/Ice/src/main/java/IceInternal/Instance.java index 900784eafd5..94a74a959ee 100644 --- a/java/src/Ice/src/main/java/IceInternal/Instance.java +++ b/java/src/Ice/src/main/java/IceInternal/Instance.java @@ -205,6 +205,18 @@ public final class Instance return _referenceFactory; } + public synchronized RequestHandlerFactory + requestHandlerFactory() + { + if(_state == StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + + assert(_requestHandlerFactory != null); + return _requestHandlerFactory; + } + public synchronized ProxyFactory proxyFactory() { @@ -880,6 +892,8 @@ public final class Instance _referenceFactory = new ReferenceFactory(this, communicator); + _requestHandlerFactory = new RequestHandlerFactory(this); + _proxyFactory = new ProxyFactory(this); boolean ipv4 = _initData.properties.getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0; @@ -956,6 +970,7 @@ public final class Instance { IceUtilInternal.Assert.FinalizerAssert(_state == StateDestroyed); IceUtilInternal.Assert.FinalizerAssert(_referenceFactory == null); + IceUtilInternal.Assert.FinalizerAssert(_requestHandlerFactory == null); IceUtilInternal.Assert.FinalizerAssert(_proxyFactory == null); IceUtilInternal.Assert.FinalizerAssert(_outgoingConnectionFactory == null); IceUtilInternal.Assert.FinalizerAssert(_servantFactoryManager == null); @@ -1258,7 +1273,9 @@ public final class Instance //_referenceFactory.destroy(); // No destroy function defined. _referenceFactory = null; - + + _requestHandlerFactory = null; + // _proxyFactory.destroy(); // No destroy function defined. _proxyFactory = null; @@ -1564,6 +1581,7 @@ public final class Instance private RouterManager _routerManager; private LocatorManager _locatorManager; private ReferenceFactory _referenceFactory; + private RequestHandlerFactory _requestHandlerFactory; private ProxyFactory _proxyFactory; private OutgoingConnectionFactory _outgoingConnectionFactory; private ObjectFactoryManager _servantFactoryManager; diff --git a/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java index 29779078111..ce20fb95e9d 100644 --- a/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java +++ b/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java @@ -52,6 +52,11 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase // try { + // + // It's important to let the retry queue do the retry even if + // the retry interval is 0. This method can be called with the + // connection locked so we can't just retry here. + // _instance.retryQueue().add(this, handleException(exc)); return false; } diff --git a/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java index fdb7a672e2a..4d2be680c83 100644 --- a/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java @@ -30,14 +30,14 @@ public class QueueRequestHandler implements RequestHandler @Override public RequestHandler - connect() + connect(final Ice.ObjectPrxHelperBase proxy) { performCallable(new Callable<Void>() { @Override public Void call() { - _delegate.connect(); + _delegate.connect(proxy); return null; } }); @@ -184,7 +184,8 @@ public class QueueRequestHandler implements RequestHandler return _delegate.waitForConnection(); } - private <T> T performCallable(Callable<T> callable) { + private <T> T performCallable(Callable<T> callable) + { try { Future<T> future = _executor.submit(callable); diff --git a/java/src/Ice/src/main/java/IceInternal/RequestHandler.java b/java/src/Ice/src/main/java/IceInternal/RequestHandler.java index f87b215919f..6bca1b4ff24 100644 --- a/java/src/Ice/src/main/java/IceInternal/RequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/RequestHandler.java @@ -11,7 +11,7 @@ package IceInternal; public interface RequestHandler extends CancellationHandler { - RequestHandler connect(); + RequestHandler connect(Ice.ObjectPrxHelperBase proxy); RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler); void prepareBatchRequest(BasicStream out) diff --git a/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java b/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java new file mode 100644 index 00000000000..d8f79c7c79b --- /dev/null +++ b/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java @@ -0,0 +1,79 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +import java.util.Map; +import java.util.HashMap; + +public final class RequestHandlerFactory +{ + RequestHandlerFactory(Instance instance) + { + _instance = instance; + } + + public RequestHandler + getRequestHandler(Reference ref, Ice.ObjectPrxHelperBase proxy) + { + if(ref.getCollocationOptimized()) + { + Ice.ObjectAdapter adapter = _instance.objectAdapterFactory().findObjectAdapter(proxy); + if(adapter != null) + { + return new CollocatedRequestHandler(ref, adapter); + } + } + + if(ref.getCacheConnection()) + { + synchronized(this) + { + RequestHandler handler = _handlers.get(ref); + if(handler != null) + { + return handler; + } + + handler = new ConnectRequestHandler(ref, proxy); + if(_instance.queueRequests()) + { + handler = new QueueRequestHandler(_instance, handler); + } + _handlers.put(ref, handler); + return handler; + } + } + else + { + RequestHandler handler = new ConnectRequestHandler(ref, proxy); + if(_instance.queueRequests()) + { + handler = new QueueRequestHandler(_instance, handler); + } + return handler; + } + } + + void + removeRequestHandler(Reference ref, RequestHandler handler) + { + if(ref.getCacheConnection()) + { + synchronized(this) + { + assert(_handlers.containsKey(ref)); + _handlers.remove(ref); + } + } + } + + private final Instance _instance; + private final Map<Reference, RequestHandler> _handlers = new HashMap<Reference, RequestHandler>(); +} diff --git a/java/test/Ice/hold/AllTests.java b/java/test/Ice/hold/AllTests.java index d320ea52ae8..bed3bec47bd 100644 --- a/java/test/Ice/hold/AllTests.java +++ b/java/test/Ice/hold/AllTests.java @@ -77,26 +77,8 @@ public class AllTests synchronized public void sent(boolean sync) { - _sent = true; - notify(); - } - - synchronized void - waitForSent() - { - while(!_sent) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } } - private boolean _sent = false; private Condition _condition; private int _expected; }; @@ -152,22 +134,17 @@ public class AllTests { Condition cond = new Condition(true); int value = 0; - AMICheckSetValue cb = null; + Ice.AsyncResult result = null; while(cond.value()) { - cb = new AMICheckSetValue(cond, value); - hold.begin_set(++value, random.nextInt(5), cb); + result = hold.begin_set(value + 1, random.nextInt(5), new AMICheckSetValue(cond, value)); + ++value; if(value % 100 == 0) { - cb.waitForSent(); - cb = null; + result.waitForSent(); } } - if(cb != null) - { - cb.waitForSent(); - cb = null; - } + result.waitForCompleted(); } out.println("ok"); @@ -176,25 +153,20 @@ public class AllTests { Condition cond = new Condition(true); int value = 0; - AMICheckSetValue cb = null; + Ice.AsyncResult result = null; while(value < 3000 && cond.value()) { - cb = new AMICheckSetValue(cond, value); - holdSerialized.begin_set(++value, 0, cb); + result = holdSerialized.begin_set(value + 1, random.nextInt(1), new AMICheckSetValue(cond, value)); + ++value; if(value % 100 == 0) { - cb.waitForSent(); - cb = null; + result.waitForSent(); } } - if(cb != null) - { - cb.waitForSent(); - cb = null; - } + result.waitForCompleted(); test(cond.value()); - for(int i = 0; i < 20000; ++i) + for(int i = 0; i < 10000; ++i) { holdSerializedOneway.setOneway(value + 1, value); ++value; @@ -206,6 +178,27 @@ public class AllTests } out.println("ok"); + out.print("testing serialization... "); + out.flush(); + { + int value = 0; + holdSerialized.set(value, 0); + Ice.AsyncResult result = null; + for(int i = 0; i < 10000; ++i) + { + // Create a new proxy for each request + result = ((HoldPrx)holdSerialized.ice_oneway()).begin_setOneway(value + 1, value); + ++value; + if((i % 100) == 0) + { + result.waitForSent(); + holdSerialized.ice_getConnection().close(false); + } + } + result.waitForCompleted(); + } + out.println("ok"); + out.print("testing waitForHold... "); out.flush(); { diff --git a/js/allTests.py b/js/allTests.py index 7811095379a..ca2f88504e8 100755 --- a/js/allTests.py +++ b/js/allTests.py @@ -34,6 +34,7 @@ tests = [ ("Ice/exceptionsBidir", ["once"]), ("Ice/facets", ["core"]), ("Ice/facetsBidir", ["core"]), + ("Ice/hold", ["core"]), ("Ice/inheritance", ["once"]), ("Ice/inheritanceBidir", ["once"]), ("Ice/location", ["once"]), diff --git a/js/demo/Ice/hello/Client.js b/js/demo/Ice/hello/Client.js index f06187a82bd..6806e7250f4 100644 --- a/js/demo/Ice/hello/Client.js +++ b/js/demo/Ice/hello/Client.js @@ -187,6 +187,6 @@ Ice.Promise.try( }, function(ex) { - console.log(ex.toString()); + console.log(ex); process.exit(1); }); diff --git a/js/src/Ice/ConnectRequestHandler.js b/js/src/Ice/ConnectRequestHandler.js index 22bfb213581..5ad7ae6cdc6 100644 --- a/js/src/Ice/ConnectRequestHandler.js +++ b/js/src/Ice/ConnectRequestHandler.js @@ -44,42 +44,40 @@ var ConnectRequestHandler = Ice.Class({ this._reference = ref; this._response = ref.getMode() === ReferenceMode.ModeTwoway; this._proxy = proxy; + this._proxies = []; this._batchAutoFlush = ref.getInstance().initializationData().properties.getPropertyAsIntWithDefault( "Ice.BatchAutoFlush", 1) > 0 ? true : false; this._initialized = false; - this._flushing = false; this._batchRequestInProgress = false; this._batchRequestsSize = Protocol.requestBatchHdr.length; this._batchStream = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding, this._batchAutoFlush); - this._updateRequestHandler = false; this._connection = null; this._compress = false; this._exception = null; this._requests = []; - this._updateRequestHandler = false; }, - connect: function() + connect: function(proxy) { var self = this; - var proxy = this._proxy; - try + if(proxy === this._proxy) { - this._reference.getConnection().then( - function(connection, compress) - { - self.setConnection(connection, compress); - }).exception( - function(ex) - { - self.setException(ex); - }); + this._reference.getConnection().then(function(connection, compress) + { + self.setConnection(connection, compress); + }, + function(ex) + { + self.setException(ex); + }); + } + try + { if(!this.initialized()) { - // The proxy request handler will be updated when the connection is set. - this._updateRequestHandler = true; + this._proxies.push(proxy); return this; } } @@ -89,11 +87,15 @@ var ConnectRequestHandler = Ice.Class({ throw ex; } - Debug.Assert(this._connection !== null); - - var handler = new ConnectionRequestHandler(this._reference, this._connection, this._compress); - proxy.setRequestHandler__(this, handler); - return handler; + if(this._connectionRequestHandler) + { + proxy.__setRequestHandler(this, this._connectionRequestHandler); + return this._connectionRequestHandler; + } + else + { + return this; + } }, update: function(previousHandler, newHandler) { @@ -148,7 +150,6 @@ var ConnectRequestHandler = Ice.Class({ this._batchAutoFlush); this._batchStream.swap(dummy); this._batchRequestsSize = Protocol.requestBatchHdr.length; - return; } this._connection.abortBatchRequest(); @@ -218,7 +219,6 @@ var ConnectRequestHandler = Ice.Class({ setConnection: function(connection, compress) { Debug.assert(this._exception === null && this._connection === null); - Debug.assert(this._updateRequestHandler || this._requests.length === 0); this._connection = connection; this._compress = compress; @@ -231,24 +231,19 @@ var ConnectRequestHandler = Ice.Class({ if(ri !== null) { var self = this; - var promise = ri.addProxy(this._proxy).then( - function() - { - // - // The proxy was added to the router info, we're now ready to send the - // queued requests. - // - self.flushRequests(); - }).exception( - function(ex) - { - self.setException(ex); - }); - - if(!promise.completed()) - { - return; // The request handler will be initialized once addProxy completes. - } + ri.addProxy(this._proxy).then(function() + { + // + // The proxy was added to the router info, we're now ready to send the + // queued requests. + // + self.flushRequests(); + }, + function(ex) + { + self.setException(ex); + }); + return; // The request handler will be initialized once addProxy completes. } // @@ -259,19 +254,21 @@ var ConnectRequestHandler = Ice.Class({ setException: function(ex) { Debug.assert(!this._initialized && this._exception === null); - Debug.assert(this._updateRequestHandler || this._requests.length === 0); this._exception = ex; + this._proxies.length = 0; this._proxy = null; // Break cyclic reference count. - // - // If some requests were queued, we notify them of the failure. - // - if(this._requests.length > 0) + this.flushRequestsWithException(ex); + + try { - this.flushRequestsWithException(ex); + this._reference.getInstance().requestHandlerFactory().removeRequestHandler(this._reference, this); + } + catch(exc) + { + // Ignore } - }, initialized: function() { @@ -296,13 +293,6 @@ var ConnectRequestHandler = Ice.Class({ { Debug.assert(this._connection !== null && !this._initialized); - // - // We set the _flushing flag to true to prevent any additional queuing. Callers - // might block for a little while as the queued requests are being sent but this - // shouldn't be an issue as the request sends are non-blocking. - // - this._flushing = true; - try { while(this._requests.length > 0) @@ -358,27 +348,31 @@ var ConnectRequestHandler = Ice.Class({ } } - // - // We've finished sending the queued requests and the request handler now send - // the requests over the connection directly. It's time to substitute the - // request handler of the proxy with the more efficient connection request - // handler which does not have any synchronization. This also breaks the cyclic - // reference count with the proxy. - // - // NOTE: _updateRequestHandler is immutable once _flushing = true - // - if(this._updateRequestHandler && this._exception === null) + if(this._reference.getCacheConnection() && this._exception === null) { - this._proxy.__setRequestHandler(this, new ConnectionRequestHandler(this._reference, this._connection, - this._compress)); + this._connectionRequestHandler = new ConnectionRequestHandler(this._reference, + this._connection, + this._compress); + for(var i in this._proxies) + { + this._proxies[i].__setRequestHandler(this, this._connectionRequestHandler); + } } Debug.assert(!this._initialized); if(this._exception === null) { this._initialized = true; - this._flushing = false; } + try + { + this._reference.getInstance().requestHandlerFactory().removeRequestHandler(this._reference, this); + } + catch(exc) + { + // Ignore + } + this._proxies.length = 0; this._proxy = null; // Break cyclic reference count. }, flushRequestsWithException: function() @@ -391,7 +385,7 @@ var ConnectRequestHandler = Ice.Class({ request.out.__completedEx(this._exception); } } - this._requests = []; + this._requests.length = 0; } }); diff --git a/js/src/Ice/ConnectionRequestHandler.js b/js/src/Ice/ConnectionRequestHandler.js index 914e620c040..eabfd274ec8 100644 --- a/js/src/Ice/ConnectionRequestHandler.js +++ b/js/src/Ice/ConnectionRequestHandler.js @@ -21,10 +21,10 @@ var ConnectionRequestHandler = Ice.Class({ this._connection = connection; this._compress = compress; }, - // connect : function() - // { - // This request handler is only created after connection binding. - // } + connect : function() + { + return this; + }, update: function(previousHandler, newHandler) { try diff --git a/js/src/Ice/Instance.js b/js/src/Ice/Instance.js index 0ac7976d627..ab9fdbfb710 100644 --- a/js/src/Ice/Instance.js +++ b/js/src/Ice/Instance.js @@ -34,6 +34,7 @@ Ice.__M.require(module, "../Ice/TcpEndpointFactory", "../Ice/WSEndpointFactory", "../Ice/Reference", + "../Ice/RequestHandlerFactory", "../Ice/LocalException", "../Ice/Exception", "../Ice/ProcessLogger", @@ -61,6 +62,7 @@ var RouterManager = Ice.RouterManager; var Timer = Ice.Timer; var TraceLevels = Ice.TraceLevels; var ReferenceFactory = Ice.ReferenceFactory; +var RequestHandlerFactory = Ice.RequestHandlerFactory; var ACMConfig = Ice.ACMConfig; var StateActive = 0; @@ -84,6 +86,7 @@ var Instance = Ice.Class({ this._routerManager = null; this._locatorManager = null; this._referenceFactory = null; + this._requestHandlerFactory = null; this._proxyFactory = null; this._outgoingConnectionFactory = null; this._servantFactoryManager = null; @@ -145,6 +148,16 @@ var Instance = Ice.Class({ Debug.assert(this._referenceFactory !== null); return this._referenceFactory; }, + requestHandlerFactory: function() + { + if(this._state === StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + + Debug.assert(this._requestHandlerFactory !== null); + return this._requestHandlerFactory; + }, proxyFactory: function() { if(this._state === StateDestroyed) @@ -332,6 +345,8 @@ var Instance = Ice.Class({ this._referenceFactory = new ReferenceFactory(this, communicator); + this._requestHandlerFactory = new RequestHandlerFactory(this, communicator); + this._proxyFactory = new ProxyFactory(this); this._endpointFactoryManager = new EndpointFactoryManager(this); @@ -482,11 +497,11 @@ var Instance = Ice.Class({ self._servantFactoryManager = null; } - if(self._referenceFactory) - { - //self._referenceFactory.destroy(); // No destroy function defined. - self._referenceFactory = null; - } + //self._referenceFactory.destroy(); // No destroy function defined. + self._referenceFactory = null; + + //self._requestHandlerFactory.destroy(); // No destroy function defined. + self._requestHandlerFactory = null; // self._proxyFactory.destroy(); // No destroy function defined. self._proxyFactory = null; diff --git a/js/src/Ice/Makefile b/js/src/Ice/Makefile index b9cdf2525f9..cd453b0347e 100644 --- a/js/src/Ice/Makefile +++ b/js/src/Ice/Makefile @@ -96,6 +96,7 @@ COMMON_SRCS = \ ProxyFactory.js \ Reference.js \ ReferenceMode.js \ + RequestHandlerFactory.js \ RetryException.js \ RetryQueue.js \ RouterInfo.js \ diff --git a/js/src/Ice/Makefile.mak b/js/src/Ice/Makefile.mak index 5c1138f3870..c12fa2d9e72 100644 --- a/js/src/Ice/Makefile.mak +++ b/js/src/Ice/Makefile.mak @@ -92,6 +92,7 @@ COMMON_SRCS = \ ProxyFactory.js \ Reference.js \ ReferenceMode.js \ + RequestHandlerFactory.js \ RetryException.js \ RetryQueue.js \ RouterInfo.js \ diff --git a/js/src/Ice/ObjectPrx.js b/js/src/Ice/ObjectPrx.js index 455f10bcbca..ef653e7d5a1 100644 --- a/js/src/Ice/ObjectPrx.js +++ b/js/src/Ice/ObjectPrx.js @@ -13,7 +13,6 @@ Ice.__M.require(module, "../Ice/Class", "../Ice/ArrayUtil", "../Ice/AsyncResult", - "../Ice/ConnectRequestHandler", "../Ice/Debug", "../Ice/FormatType", "../Ice/HashMap", @@ -29,7 +28,6 @@ Ice.__M.require(module, var ArrayUtil = Ice.ArrayUtil; var AsyncResultBase = Ice.AsyncResultBase; var AsyncResult = Ice.AsyncResult; -var ConnectRequestHandler = Ice.ConnectRequestHandler; var Debug = Ice.Debug; var FormatType = Ice.FormatType; var HashMap = Ice.HashMap; @@ -540,14 +538,14 @@ var ObjectPrx = Ice.Class({ { return this._requestHandler; } - this._requestHandler = new ConnectRequestHandler(this._reference, this); - handler = this._requestHandler; + handler = this._reference.getInstance().requestHandlerFactory().getRequestHandler(this._reference, this); + this._requestHandler = handler; } else { - handler = new ConnectRequestHandler(this._reference, this); + handler = this._reference.getInstance().requestHandlerFactory().getRequestHandler(this._reference, this); } - return handler.connect(); + return handler.connect(this); }, __setRequestHandler: function(previous, handler) { diff --git a/js/src/Ice/Operation.js b/js/src/Ice/Operation.js index 0fc4db50335..e11f377ada1 100644 --- a/js/src/Ice/Operation.js +++ b/js/src/Ice/Operation.js @@ -376,7 +376,6 @@ var __dispatchImpl = function(servant, op, incomingAsync, current) var comm = current.adapter.getCommunicator(); var msg = "servant for identity " + comm.identityToString(current.id) + " does not define operation `" + op.servantMethod + "'"; - console.log(msg); throw new Ice.UnknownException(msg); } diff --git a/js/src/Ice/Promise.js b/js/src/Ice/Promise.js index de0a2ce2bfe..326611c0f57 100644 --- a/js/src/Ice/Promise.js +++ b/js/src/Ice/Promise.js @@ -142,13 +142,10 @@ var Promise = Ice.Class({ }; }; - setTimeout( - function() - { - self.then(delayHandler(p, p.succeed), - delayHandler(p, p.fail)); - }); - + setTimeout(function() + { + self.then(delayHandler(p, p.succeed), delayHandler(p, p.fail)); + }); return p; }, resolve: function() diff --git a/js/src/Ice/RequestHandlerFactory.js b/js/src/Ice/RequestHandlerFactory.js new file mode 100644 index 00000000000..58c63aa339b --- /dev/null +++ b/js/src/Ice/RequestHandlerFactory.js @@ -0,0 +1,60 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +var Ice = require("../Ice/ModuleRegistry").Ice; +Ice.__M.require(module, + [ + "../Ice/Class", + "../Ice/Debug", + "../Ice/HashMap", + "../Ice/Reference", + "../Ice/ConnectRequestHandler" + ]); + +var Debug = Ice.Debug; +var HashMap = Ice.HashMap; +var ConnectRequestHandler = Ice.ConnectRequestHandler; + +var RequestHandlerFactory = Ice.Class({ + __init__: function(instance) + { + this._instance = instance; + this._handlers = new HashMap(); + this._handlers.keyComparator = HashMap.compareEquals; + }, + getRequestHandler: function(ref, proxy) + { + if(ref.getCacheConnection()) + { + var handler = this._handlers.get(ref); + if(handler) + { + return handler; + } + handler = new ConnectRequestHandler(ref, proxy); + this._handlers.set(ref, handler); + return handler; + } + else + { + return new ConnectRequestHandler(ref, proxy); + } + }, + removeRequestHandler: function(ref, handler) + { + if(ref.getCacheConnection()) + { + var h = this._handlers.delete(ref); + Debug.assert(h === handler); + } + } +}); + +Ice.RequestHandlerFactory = RequestHandlerFactory; +module.exports.Ice = Ice; diff --git a/js/test/Common/TestSuite.js b/js/test/Common/TestSuite.js index 04f10bf864c..a3e9e5fd85e 100644 --- a/js/test/Common/TestSuite.js +++ b/js/test/Common/TestSuite.js @@ -89,7 +89,7 @@ $(document).foundation(); }, function(ex) { - out.writeLine("failed! (" + ex.ice_name() + ")"); + out.writeLine("failed! (" + ex + ")"); return __test__(out, id); } ).then( diff --git a/js/test/Common/index.html b/js/test/Common/index.html index 976452c9b31..80bb50eec7f 100644 --- a/js/test/Common/index.html +++ b/js/test/Common/index.html @@ -120,6 +120,7 @@ "../exceptions/Client.js", "Client.js"], "Ice/facets": ["Test.js", "Client.js"], "Ice/facetsBidir": ["Test.js", "TestI.js", "../facets/Client.js", "Client.js"], + "Ice/hold": ["Test.js", "Client.js"], "Ice/inheritance": ["Test.js", "Client.js"], "Ice/inheritanceBidir": ["Test.js", "InitialI.js", "../inheritance/Client.js", "Client.js"], "Ice/operations": ["Test.js", "Twoways.js", "Oneways.js", "BatchOneways.js", "Client.js"], diff --git a/js/test/Ice/Makefile b/js/test/Ice/Makefile index 69dc0d5a4f2..fd3d9c6a5d2 100644 --- a/js/test/Ice/Makefile +++ b/js/test/Ice/Makefile @@ -21,6 +21,7 @@ SUBDIRS = \ exceptionsBidir \ facets \ facetsBidir \ + hold \ inheritance \ inheritanceBidir \ location \ diff --git a/js/test/Ice/Makefile.mak b/js/test/Ice/Makefile.mak index 16475a3cd81..7738c58073b 100644 --- a/js/test/Ice/Makefile.mak +++ b/js/test/Ice/Makefile.mak @@ -21,6 +21,7 @@ SUBDIRS = \ exceptionsBidir \ facets \ facetsBidir \ + hold \ inheritance \ inheritanceBidir \ location \ diff --git a/js/test/Ice/hold/.depend.mak b/js/test/Ice/hold/.depend.mak new file mode 100644 index 00000000000..a3b320ad9c2 --- /dev/null +++ b/js/test/Ice/hold/.depend.mak @@ -0,0 +1,3 @@ + +Test.js: \ + .\Test.ice diff --git a/js/test/Ice/hold/.gitignore b/js/test/Ice/hold/.gitignore new file mode 100644 index 00000000000..d158d9308ba --- /dev/null +++ b/js/test/Ice/hold/.gitignore @@ -0,0 +1,2 @@ +Test.js +index.html diff --git a/js/test/Ice/hold/Client.js b/js/test/Ice/hold/Client.js new file mode 100644 index 00000000000..dd8ca258cc9 --- /dev/null +++ b/js/test/Ice/hold/Client.js @@ -0,0 +1,307 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +(function(module, require, exports) +{ + var Ice = require("icejs").Ice; + var Test = require("Test").Test; + var Promise = Ice.Promise; + + function loop(fn, repetitions, condition) + { + var i = 0; + var next = function() + { + while(i++ < repetitions && (!condition || condition.value)) + { + var r = fn.call(i); + if(r) + { + return r.then(next); + } + } + }; + return next(); + } + + var allTests = function(out, communicator) + { + var failCB = function() { test(false); }; + var hold, holdOneway, holdSerialized, holdSerializedOneway; + var condition = { value: true }; + var value = 0; + + var p = new Promise(); + var test = function(b) + { + if(!b) + { + try + { + throw new Error("test failed"); + } + catch(err) + { + p.fail(err); + throw err; + } + } + }; + + var seq; + Promise.try( + function() + { + var ref = "hold:default -p 12010"; + return Test.HoldPrx.checkedCast(communicator.stringToProxy(ref)); + } + ).then( + function(obj) + { + test(obj !== null); + hold = obj; + holdOneway = Test.HoldPrx.uncheckedCast(hold.ice_oneway()); + + var refSerialized = "hold:default -p 12011"; + return Test.HoldPrx.checkedCast(communicator.stringToProxy(refSerialized)); + } + ).then( + function(obj) + { + test(obj !== null); + holdSerialized = obj; + holdSerializedOneway = Test.HoldPrx.uncheckedCast(holdSerialized.ice_oneway()); + + out.write("changing state between active and hold rapidly... "); + + var i; + var r = new Ice.Promise().succeed(); + for(i = 0; i < 1; ++i) + { + r = r.then(function() { return hold.putOnHold(0); }); + } + for(i = 0; i < 1; ++i) + { + r = r.then(function() { return holdOneway.putOnHold(0); }); + } + for(i = 0; i < 1; ++i) + { + r = r.then(function() { return holdSerialized.putOnHold(0); }); + } + for(i = 0; i < 1; ++i) + { + r = r.then(function() { return holdSerializedOneway.putOnHold(0); }); + } + return r; + } + ).then( + function() + { + out.writeLine("ok"); + + out.write("testing without serialize mode... "); + var result = null; + condition.value = true; + return loop(function() + { + var expected = value; + var result = hold.set(value + 1, 3).then(function(v) + { + condition.value = (v == expected); + }); + ++value; + if(value % 100 === 0) + { + return result; + } + return null; + }, 100000, condition); + } + ).then( + function() + { + test(!condition.value); + out.writeLine("ok"); + + out.write("testing with serialize mode... "); + + condition.value = true; + var result; + return loop( + function() + { + var expected = value; + result = holdSerialized.set(value + 1, 1).then(function(v) + { + condition.value = (v == expected); + }); + ++value; + if(value % 100 === 0) + { + return result; + } + return null; + }, 1000, condition); + } + ).then( + function() + { + test(condition.value); + return loop(function() + { + holdSerializedOneway.setOneway(value + 1, value); + ++value; + if((value % 100) === 0) + { + holdSerializedOneway.putOnHold(1); + } + }, 3000); + } + ).then( + function() + { + out.writeLine("ok"); + + out.write("testing serialization... "); + + condition.value = true; + value = 0; + return holdSerialized.set(value, 0); + } + ).then( + function() + { + return loop( + function() + { + // Create a new proxy for each request + var result = holdSerialized.ice_oneway().setOneway(value + 1, value); + ++value; + if((value % 100) === 0) + { + return result.then(function() + { + return holdSerialized.ice_getConnection().then( + function(con) + { + return con.close(false); + }); + }); + } + return null; + }, 1000); + } + ).then( + function() + { + out.writeLine("ok"); + + out.write("testing waitForHold... "); + + return hold.waitForHold().then( + function() + { + return hold.waitForHold(); + } + ).then( + function() + { + return loop(function(i) + { + var r = hold.ice_oneway().ice_ping(); + if((i % 20) == 0) + { + r = r.then(function() { return hold.putOnHold(0); }) + } + return r; + }, 100); + } + ).then( + function() + { + return hold.putOnHold(-1); + } + ).then( + function() + { + return hold.ice_ping(); + } + ).then( + function() + { + return hold.putOnHold(-1); + } + ).then( + function() + { + return hold.ice_ping(); + } + ); + } + ).then( + function() + { + out.writeLine("ok"); + + out.write("changing state to hold and shutting down server... "); + return hold.shutdown(); + } + ).then( + function() + { + out.writeLine("ok"); + p.succeed(); + }, + function(ex) + { + out.writeLine("failed!"); + p.fail(ex); + }); + return p; + }; + + var run = function(out, id) + { + // + // For this test, we want to disable retries. + // + id.properties.setProperty("Ice.RetryIntervals", "-1"); + + // + // We don't want connection warnings because of the timeout + // + id.properties.setProperty("Ice.Warn.Connections", "0"); + + // + // We need to send messages large enough to cause the transport + // buffers to fill up. + // + id.properties.setProperty("Ice.MessageSizeMax", "10000"); + + id.properties.setProperty("Ice.RetryIntervals", "-1"); + + var c = Ice.initialize(id); + return Promise.try( + function() + { + return allTests(out, c); + } + ).finally( + function() + { + return c.destroy(); + } + ); + }; + exports.__test__ = run; + exports.__runServer__ = true; +} +(typeof(global) !== "undefined" && typeof(global.process) !== "undefined" ? module : undefined, + typeof(global) !== "undefined" && typeof(global.process) !== "undefined" ? require : window.Ice.__require, + typeof(global) !== "undefined" && typeof(global.process) !== "undefined" ? exports : window)); diff --git a/js/test/Ice/hold/Makefile b/js/test/Ice/hold/Makefile new file mode 100644 index 00000000000..8f81ea44140 --- /dev/null +++ b/js/test/Ice/hold/Makefile @@ -0,0 +1,23 @@ +# ********************************************************************** +# +# Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +# +# This copy of Ice is licensed to you under the terms described in the +# ICE_LICENSE file included in this distribution. +# +# ********************************************************************** + +top_srcdir = ../../.. + +TARGETS = index.html + +SLICES = Test.ice + +GEN_SRCS = $(patsubst %.ice, %.js, $(SLICES)) + +SRCS = Client.js + +include $(top_srcdir)/config/Make.rules.js + +SLICE2JSFLAGS := $(SLICE2JSFLAGS) -I$(slicedir) + diff --git a/js/test/Ice/hold/Makefile.mak b/js/test/Ice/hold/Makefile.mak new file mode 100644 index 00000000000..61a62482429 --- /dev/null +++ b/js/test/Ice/hold/Makefile.mak @@ -0,0 +1,20 @@ +# ********************************************************************** +# +# Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +# +# This copy of Ice is licensed to you under the terms described in the +# ICE_LICENSE file included in this distribution. +# +# ********************************************************************** + +top_srcdir = ..\..\.. + +TARGETS = index.html + +GEN_SRCS = Test.js + +SRCS = Client.js + +!include $(top_srcdir)\config\Make.rules.mak.js + +SLICE2JSFLAGS = $(SLICE2JSFLAGS) -I"$(slicedir)" diff --git a/js/test/Ice/hold/Test.ice b/js/test/Ice/hold/Test.ice new file mode 100644 index 00000000000..dc7f1dbe67f --- /dev/null +++ b/js/test/Ice/hold/Test.ice @@ -0,0 +1,24 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#pragma once + +module Test +{ + +interface Hold +{ + void putOnHold(int seconds); + void waitForHold(); + void setOneway(int value, int expected); + int set(int value, int delay); + void shutdown(); +}; + +}; diff --git a/js/test/Ice/hold/run.js b/js/test/Ice/hold/run.js new file mode 100644 index 00000000000..8a86aa5886f --- /dev/null +++ b/js/test/Ice/hold/run.js @@ -0,0 +1,10 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +require("../../Common/Common").run(module); diff --git a/js/test/Ice/hold/run.py b/js/test/Ice/hold/run.py new file mode 100755 index 00000000000..ee3bc52bf23 --- /dev/null +++ b/js/test/Ice/hold/run.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ********************************************************************** +# +# Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +# +# This copy of Ice is licensed to you under the terms described in the +# ICE_LICENSE file included in this distribution. +# +# ********************************************************************** + +import os, sys + +path = [ ".", "..", "../..", "../../..", "../../../.." ] +head = os.path.dirname(sys.argv[0]) +if len(head) > 0: + path = [os.path.join(head, p) for p in path] +path = [os.path.abspath(p) for p in path if os.path.exists(os.path.join(p, "scripts", "TestUtil.py")) ] +if len(path) == 0: + raise RuntimeError("can't find toplevel directory!") +sys.path.append(os.path.join(path[0], "scripts")) +import TestUtil + +TestUtil.clientServerTest() |