summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorJoe George <joe@zeroc.com>2014-07-23 11:45:18 -0230
committerJoe George <joe@zeroc.com>2014-07-23 11:45:18 -0230
commita030c8fb975f34941812bff8b883dc1ed07ecf61 (patch)
treed36299f6cadd6146cb7f45427d98fe682c133f5f /java/src
parentICE-5580 - port connection ACM functionality to scripting languages (diff)
downloadice-a030c8fb975f34941812bff8b883dc1ed07ecf61.tar.bz2
ice-a030c8fb975f34941812bff8b883dc1ed07ecf61.tar.xz
ice-a030c8fb975f34941812bff8b883dc1ed07ecf61.zip
Java and C# IceGrid Discovery Plugins
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/ObjectPrx.java84
-rw-r--r--java/src/IceDiscovery/LookupI.java52
-rw-r--r--java/src/IceGrid/DiscoveryPluginFactoryI.java19
-rw-r--r--java/src/IceGrid/DiscoveryPluginI.java464
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java73
5 files changed, 587 insertions, 105 deletions
diff --git a/java/src/Ice/ObjectPrx.java b/java/src/Ice/ObjectPrx.java
index 113addd7daf..b14dfd7ec83 100644
--- a/java/src/Ice/ObjectPrx.java
+++ b/java/src/Ice/ObjectPrx.java
@@ -96,7 +96,7 @@ public interface ObjectPrx
* @return The asynchronous result object.
**/
AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context, Callback_Object_ice_isA __cb);
-
+
/**
* Tests whether this proxy supports a given interface.
*
@@ -105,9 +105,9 @@ public interface ObjectPrx
* @param __exceptionCb The asynchronous exception callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_isA(String __id, IceInternal.Functional_BoolCallback __responseCb,
+ AsyncResult begin_ice_isA(String __id, IceInternal.Functional_BoolCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb);
-
+
/**
* Tests whether this proxy supports a given interface.
*
@@ -117,7 +117,7 @@ public interface ObjectPrx
* @param __sentCb The asynchronous sent callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_isA(String __id, IceInternal.Functional_BoolCallback __responseCb,
+ AsyncResult begin_ice_isA(String __id, IceInternal.Functional_BoolCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
@@ -130,10 +130,10 @@ public interface ObjectPrx
* @param __exceptionCb The asynchronous exception callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context,
- IceInternal.Functional_BoolCallback __responseCb,
+ AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context,
+ IceInternal.Functional_BoolCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb);
-
+
/**
* Tests whether this proxy supports a given interface.
*
@@ -144,8 +144,8 @@ public interface ObjectPrx
* @param __sentCb The asynchronous sent callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context,
- IceInternal.Functional_BoolCallback __responseCb,
+ AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context,
+ IceInternal.Functional_BoolCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
@@ -217,7 +217,7 @@ public interface ObjectPrx
* @return The asynchronous result object.
**/
AsyncResult begin_ice_ping(java.util.Map<String, String> __context, Callback_Object_ice_ping __cb);
-
+
/**
* Tests whether the target object of this proxy can be reached.
*
@@ -225,9 +225,9 @@ public interface ObjectPrx
* @param __exceptionCb The asynchronous exception callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_ping(IceInternal.Functional_VoidCallback __responseCb,
+ AsyncResult begin_ice_ping(IceInternal.Functional_VoidCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb);
-
+
/**
* Tests whether the target object of this proxy can be reached.
*
@@ -236,10 +236,10 @@ public interface ObjectPrx
* @param __sentCb The asynchronous sent callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_ping(IceInternal.Functional_VoidCallback __responseCb,
+ AsyncResult begin_ice_ping(IceInternal.Functional_VoidCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
-
+
/**
* Tests whether the target object of this proxy can be reached.
*
@@ -248,10 +248,10 @@ public interface ObjectPrx
* @param __exceptionCb The asynchronous exception callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_ping(java.util.Map<String, String> __context,
- IceInternal.Functional_VoidCallback __responseCb,
+ AsyncResult begin_ice_ping(java.util.Map<String, String> __context,
+ IceInternal.Functional_VoidCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb);
-
+
/**
* Tests whether the target object of this proxy can be reached.
*
@@ -261,8 +261,8 @@ public interface ObjectPrx
* @param __sentCb The asynchronous sent callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_ping(java.util.Map<String, String> __context,
- IceInternal.Functional_VoidCallback __responseCb,
+ AsyncResult begin_ice_ping(java.util.Map<String, String> __context,
+ IceInternal.Functional_VoidCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
@@ -338,7 +338,7 @@ public interface ObjectPrx
* @return The asynchronous result object.
**/
AsyncResult begin_ice_ids(java.util.Map<String, String> __context, Callback_Object_ice_ids __cb);
-
+
/**
* Returns the Slice type IDs of the interfaces supported by the target object of this proxy.
*
@@ -346,9 +346,9 @@ public interface ObjectPrx
* @param __exceptionCb The asynchronous exception callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_ids(IceInternal.Functional_GenericCallback1<String[]> __responseCb,
+ AsyncResult begin_ice_ids(IceInternal.Functional_GenericCallback1<String[]> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb);
-
+
/**
* Returns the Slice type IDs of the interfaces supported by the target object of this proxy.
*
@@ -357,10 +357,10 @@ public interface ObjectPrx
* @param __sentCb The asynchronous sent callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_ids(IceInternal.Functional_GenericCallback1<String[]> __responseCb,
+ AsyncResult begin_ice_ids(IceInternal.Functional_GenericCallback1<String[]> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
-
+
/**
* Returns the Slice type IDs of the interfaces supported by the target object of this proxy.
*
@@ -370,9 +370,9 @@ public interface ObjectPrx
* @return The asynchronous result object.
**/
AsyncResult begin_ice_ids(java.util.Map<String, String> __context,
- IceInternal.Functional_GenericCallback1<String[]> __responseCb,
+ IceInternal.Functional_GenericCallback1<String[]> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb);
-
+
/**
* Returns the Slice type IDs of the interfaces supported by the target object of this proxy.
*
@@ -383,7 +383,7 @@ public interface ObjectPrx
* @return The asynchronous result object.
**/
AsyncResult begin_ice_ids(java.util.Map<String, String> __context,
- IceInternal.Functional_GenericCallback1<String[]> __responseCb,
+ IceInternal.Functional_GenericCallback1<String[]> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
@@ -459,7 +459,7 @@ public interface ObjectPrx
* @return The asynchronous result object.
**/
AsyncResult begin_ice_id(java.util.Map<String, String> __context, Callback_Object_ice_id __cb);
-
+
/**
* Returns the Slice type ID of the most-derived interface supported by the target object of this proxy.
*
@@ -467,9 +467,9 @@ public interface ObjectPrx
* @param __exceptionCb The asynchronous exception callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_id(IceInternal.Functional_GenericCallback1<String> __responseCb,
+ AsyncResult begin_ice_id(IceInternal.Functional_GenericCallback1<String> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb);
-
+
/**
* Returns the Slice type ID of the most-derived interface supported by the target object of this proxy.
*
@@ -478,10 +478,10 @@ public interface ObjectPrx
* @param __sentCb The asynchronous sent callback object.
* @return The asynchronous result object.
**/
- AsyncResult begin_ice_id(IceInternal.Functional_GenericCallback1<String> __responseCb,
+ AsyncResult begin_ice_id(IceInternal.Functional_GenericCallback1<String> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
-
+
/**
* Returns the Slice type ID of the most-derived interface supported by the target object of this proxy.
*
@@ -493,7 +493,7 @@ public interface ObjectPrx
AsyncResult begin_ice_id(java.util.Map<String, String> __context,
IceInternal.Functional_GenericCallback1<String> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb);
-
+
/**
* Returns the Slice type ID of the most-derived interface supported by the target object of this proxy.
*
@@ -504,7 +504,7 @@ public interface ObjectPrx
* @return The asynchronous result object.
**/
AsyncResult begin_ice_id(java.util.Map<String, String> __context,
- IceInternal.Functional_GenericCallback1<String> __responseCb,
+ IceInternal.Functional_GenericCallback1<String> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
@@ -649,12 +649,12 @@ public interface ObjectPrx
**/
AsyncResult begin_ice_invoke(String operation, OperationMode mode, byte[] inParams,
java.util.Map<String, String> __context, Callback_Object_ice_invoke __cb);
-
+
public interface FunctionalCallback_Object_ice_invoke_Response
{
void apply(boolean result, byte[] outArgs);
}
-
+
/**
* Invokes an operation dynamically and asynchronously.
*
@@ -674,7 +674,7 @@ public interface ObjectPrx
FunctionalCallback_Object_ice_invoke_Response __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
-
+
/**
* Invokes an operation dynamically and asynchronously.
*
@@ -692,7 +692,7 @@ public interface ObjectPrx
AsyncResult begin_ice_invoke(String operation, OperationMode mode, byte[] inParams,
FunctionalCallback_Object_ice_invoke_Response __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb);
-
+
/**
* Invokes an operation dynamically and asynchronously.
*
@@ -714,7 +714,7 @@ public interface ObjectPrx
FunctionalCallback_Object_ice_invoke_Response __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb);
-
+
/**
* Invokes an operation dynamically and asynchronously.
*
@@ -818,7 +818,7 @@ public interface ObjectPrx
*
* @return The facet for this proxy. If the proxy uses the default facet, the return value is the empty string.
**/
- String ice_getFacet();
+ String ice_getFacet();
/**
* Creates a new proxy that is identical to this proxy, except for the facet.
@@ -1118,7 +1118,7 @@ public interface ObjectPrx
*
* @param connectionId The connection ID for the new proxy. An empty string removes the
* connection ID.
- *
+ *
* @return A new proxy with the specified connection ID.
**/
ObjectPrx ice_connectionId(String connectionId);
@@ -1183,7 +1183,7 @@ public interface ObjectPrx
* @return The asynchronous result object.
**/
AsyncResult begin_ice_flushBatchRequests(Callback_Object_ice_flushBatchRequests __cb);
-
+
/**
* Asynchronously flushes any pending batched requests for this communicator. The call does not block.
*
diff --git a/java/src/IceDiscovery/LookupI.java b/java/src/IceDiscovery/LookupI.java
index d71a4e9a217..b7027ffadd7 100644
--- a/java/src/IceDiscovery/LookupI.java
+++ b/java/src/IceDiscovery/LookupI.java
@@ -24,12 +24,12 @@ class LookupI extends _LookupDisp
_nRetry = retryCount;
}
- public T
+ public T
getId()
{
return _id;
}
-
+
public boolean
addCallback(AmdCB cb)
{
@@ -62,7 +62,7 @@ class LookupI extends _LookupDisp
{
return _proxies.size() == 0 && --_nRetry >= 0;
}
-
+
public boolean
response(Ice.ObjectPrx proxy, boolean isReplicaGroup)
{
@@ -111,14 +111,14 @@ class LookupI extends _LookupDisp
}
sendResponse(result.ice_endpoints(endpoints.toArray(new Ice.Endpoint[endpoints.size()])));
}
-
- public void
+
+ public void
runTimerTask()
{
adapterRequestTimedOut(this);
}
- private void
+ private void
sendResponse(Ice.ObjectPrx proxy)
{
for(Ice.AMD_Locator_findAdapterById cb : _callbacks)
@@ -127,7 +127,7 @@ class LookupI extends _LookupDisp
}
_callbacks.clear();
}
-
+
private List<Ice.ObjectPrx> _proxies = new ArrayList<Ice.ObjectPrx>();
private long _start;
private long _latency;
@@ -135,19 +135,19 @@ class LookupI extends _LookupDisp
private class ObjectRequest extends Request<Ice.Identity, Ice.AMD_Locator_findObjectById>
{
- public
+ public
ObjectRequest(Ice.Identity id, int retryCount)
{
super(id, retryCount);
}
- public void
+ public void
response(Ice.ObjectPrx proxy)
{
finished(proxy);
}
-
- public void
+
+ public void
finished(Ice.ObjectPrx proxy)
{
for(Ice.AMD_Locator_findObjectById cb : _callbacks)
@@ -164,7 +164,7 @@ class LookupI extends _LookupDisp
};
public LookupI(LocatorRegistryI registry, LookupPrx lookup, Ice.Properties properties)
- {
+ {
_registry = registry;
_lookup = lookup;
_timeout = properties.getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300);
@@ -174,20 +174,20 @@ class LookupI extends _LookupDisp
_timer = IceInternal.Util.getInstance(lookup.ice_getCommunicator()).timer();
}
- void
+ void
setLookupReply(LookupReplyPrx lookupReply)
{
_lookupReply = lookupReply;
}
- public void
+ public void
findObjectById(String domainId, Ice.Identity id, IceDiscovery.LookupReplyPrx reply, Ice.Current c)
{
if(!domainId.equals(_domainId))
{
return; // Ignore.
}
-
+
Ice.ObjectPrx proxy = _registry.findObject(id);
if(proxy != null)
{
@@ -198,14 +198,14 @@ class LookupI extends _LookupDisp
}
}
- public void
+ public void
findAdapterById(String domainId, String adapterId, IceDiscovery.LookupReplyPrx reply, Ice.Current c)
{
if(!domainId.equals(_domainId))
{
return; // Ignore.
}
-
+
Ice.BooleanHolder isReplicaGroup = new Ice.BooleanHolder();
Ice.ObjectPrx proxy = _registry.findAdapter(adapterId, isReplicaGroup);
if(proxy != null)
@@ -217,7 +217,7 @@ class LookupI extends _LookupDisp
}
}
- synchronized void
+ synchronized void
findObject(Ice.AMD_Locator_findObjectById cb, Ice.Identity id)
{
ObjectRequest request = _objectRequests.get(id);
@@ -226,15 +226,15 @@ class LookupI extends _LookupDisp
request = new ObjectRequest(id, _retryCount);
_objectRequests.put(id, request);
}
-
+
if(request.addCallback(cb))
{
_lookup.begin_findObjectById(_domainId, id, _lookupReply);
_timer.schedule(request, _timeout);
}
}
-
- synchronized void
+
+ synchronized void
findAdapter(Ice.AMD_Locator_findAdapterById cb, String adapterId)
{
AdapterRequest request = _adapterRequests.get(adapterId);
@@ -243,7 +243,7 @@ class LookupI extends _LookupDisp
request = new AdapterRequest(adapterId, _retryCount);
_adapterRequests.put(adapterId, request);
}
-
+
if(request.addCallback(cb))
{
_lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply);
@@ -259,7 +259,7 @@ class LookupI extends _LookupDisp
{
return;
}
-
+
request.response(proxy);
_timer.cancel(request);
_objectRequests.remove(id);
@@ -273,7 +273,7 @@ class LookupI extends _LookupDisp
{
return;
}
-
+
if(request.response(proxy, isReplicaGroup))
{
_timer.cancel(request);
@@ -289,7 +289,7 @@ class LookupI extends _LookupDisp
{
return;
}
-
+
if(request.retry())
{
_lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply);
@@ -310,7 +310,7 @@ class LookupI extends _LookupDisp
{
return;
}
-
+
if(request.retry())
{
_lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply);
diff --git a/java/src/IceGrid/DiscoveryPluginFactoryI.java b/java/src/IceGrid/DiscoveryPluginFactoryI.java
new file mode 100644
index 00000000000..4ac13e8b470
--- /dev/null
+++ b/java/src/IceGrid/DiscoveryPluginFactoryI.java
@@ -0,0 +1,19 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceGrid;
+
+public class DiscoveryPluginFactoryI implements Ice.PluginFactory
+{
+ public Ice.Plugin
+ create(Ice.Communicator communicator, String name, String[] args)
+ {
+ return new DiscoveryPluginI(communicator);
+ }
+}
diff --git a/java/src/IceGrid/DiscoveryPluginI.java b/java/src/IceGrid/DiscoveryPluginI.java
new file mode 100644
index 00000000000..cf9cfa66a29
--- /dev/null
+++ b/java/src/IceGrid/DiscoveryPluginI.java
@@ -0,0 +1,464 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceGrid;
+
+import java.util.List;
+import java.util.Arrays;
+import java.util.ArrayList;
+
+class DiscoveryPluginI implements Ice.Plugin
+{
+
+ abstract private class Request
+ {
+ public
+ Request(LocatorI locator)
+ {
+ _locator = locator;
+ }
+
+ abstract public void
+ invoke(Ice.LocatorPrx locator);
+
+ abstract public void
+ response(Ice.ObjectPrx locator);
+
+ protected LocatorI _locator;
+ protected Ice.LocatorPrx _locatorPrx;
+ };
+
+ private class LocatorI extends Ice._LocatorDisp implements IceInternal.TimerTask
+ {
+ public
+ LocatorI(LookupPrx lookup, Ice.Properties properties)
+ {
+ _lookup = lookup;
+ _timeout = properties.getPropertyAsIntWithDefault("IceGridDiscovery.Timeout", 300) * 1000;
+ _retryCount = properties.getPropertyAsIntWithDefault("IceGridDiscovery.RetryCount",3);
+ _timer = IceInternal.Util.getInstance(lookup.ice_getCommunicator()).timer();
+ _instanceName = properties.getProperty("IceGridDiscovery.InstanceName");
+ _warned = false;
+ _locator = lookup.ice_getCommunicator().getDefaultLocator();
+ _pendingRetryCount = 0;
+ }
+
+ public void
+ setLookupReply(LookupReplyPrx lookupReply)
+ {
+ _lookupReply = lookupReply;
+ }
+
+ public synchronized void
+ findObjectById_async(Ice.AMD_Locator_findObjectById amdCB, Ice.Identity id, Ice.Current curr)
+ {
+ ((LocatorI)this).invoke(null, new ObjectRequest((LocatorI)this, id, amdCB));
+ }
+
+ public synchronized void
+ findAdapterById_async(Ice.AMD_Locator_findAdapterById amdCB, String adapterId, Ice.Current curr)
+ {
+ ((LocatorI)this).invoke(null, new AdapterRequest((LocatorI)this, adapterId, amdCB));
+ }
+
+
+ public synchronized Ice.LocatorRegistryPrx
+ getRegistry(Ice.Current current)
+ {
+ Ice.LocatorPrx locator;
+ if(_locator != null)
+ {
+ ((LocatorI)this).queueRequest(null); // Search for locator if not already doing so.
+ while(_pendingRetryCount > 0)
+ {
+ try
+ {
+ wait();
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+ }
+ }
+ locator = _locator;
+ return locator != null ? locator.getRegistry() : null;
+ }
+
+ public synchronized void
+ foundLocator(LocatorPrx locator)
+ {
+ if(locator == null)
+ {
+ return;
+ }
+
+ //
+ // If we already have a locator assigned, ensure the given locator
+ // has the same identity, otherwise ignore it.
+ //
+ if(_locator != null && !locator.ice_getIdentity().category.equals(_locator.ice_getIdentity().category))
+ {
+ if(!_warned)
+ {
+ _warned = true; // Only warn once
+
+ locator.ice_getCommunicator().getLogger().warning(
+ "received IceGrid locator with different instance name:\n" +
+ "using = `" + _locator.ice_getIdentity().category + "'\n" +
+ "received = `" + locator.ice_getIdentity().category + "'\n" +
+ "This is typically the case if multiple IceGrid registries with different " +
+ "nstance names are deployed and the property `IceGridDiscovery.InstanceName'" +
+ "is not set.");
+ }
+ return;
+ }
+
+ if(_pendingRetryCount > 0) // No need to retry, we found a locator
+ {
+ _timer.cancel(this);
+ _pendingRetryCount = 0;
+ }
+
+ if(_locator != null)
+ {
+ //
+ // We found another locator replica, append its endpoints to the
+ // current locator proxy endpoints.
+ //
+ List<Ice.Endpoint> newEndpoints = new ArrayList<Ice.Endpoint>(Arrays.asList(_locator.ice_getEndpoints()));
+ for(Ice.Endpoint p : locator.ice_getEndpoints())
+ {
+ //
+ // Only add endpoints if not already in the locator proxy endpoints
+ //
+ boolean found = false;
+ for(Ice.Endpoint q : newEndpoints)
+ {
+ if (p.equals(q))
+ {
+ found = true;
+ break;
+ }
+ }
+ if(!found)
+ {
+ newEndpoints.add(p);
+ }
+
+ }
+ _locator = (LocatorPrx)_locator.ice_endpoints(newEndpoints.toArray(new Ice.Endpoint[newEndpoints.size()]));
+ }
+ else
+ {
+ _locator = locator;
+ if(_instanceName.isEmpty())
+ {
+ _instanceName = _locator.ice_getIdentity().category;
+ }
+ }
+
+ //
+ // Send pending requests if any.
+ //
+ for(Request req : _pendingRequests)
+ {
+ req.invoke(_locator);
+ }
+ _pendingRequests.clear();
+ notifyAll();
+ }
+
+
+ public synchronized void
+ invoke(Ice.LocatorPrx locator, Request request)
+ {
+
+ if(_locator != null && !(_locator.equals(locator)))
+ {
+
+ request.invoke(_locator);
+ }
+ else
+ {
+
+ _locator = null;
+ queueRequest(request);
+ }
+ }
+
+ public void runTimerTask()
+ {
+ synchronized(this)
+ {
+
+ if(--_pendingRetryCount > 0)
+ {
+
+ _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ _timer.schedule(this, _timeout);
+ }
+ else
+ {
+ assert !_pendingRequests.isEmpty();
+ for(Request req : _pendingRequests)
+ {
+ req.response(null);
+ }
+ _pendingRequests.clear();
+ notifyAll();
+
+ }
+ }
+ }
+
+ private void
+ queueRequest(Request request)
+ {
+ if(request != null)
+ {
+
+ _pendingRequests.add(request);
+ }
+
+ if(_pendingRetryCount == 0) // No request in progress
+ {
+
+ _pendingRetryCount = _retryCount;
+ _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ _timer.schedule(this, _timeout);
+ }
+ }
+
+ private final LookupPrx _lookup;
+ private final int _timeout;
+ private final IceInternal.Timer _timer;
+ private final int _retryCount;
+
+ private String _instanceName;
+ private boolean _warned;
+ private LookupReplyPrx _lookupReply;
+ private Ice.LocatorPrx _locator;
+
+ private int _pendingRetryCount;
+ private List<Request> _pendingRequests = new ArrayList<Request>();;
+ };
+
+ private class LookupReplyI extends _LookupReplyDisp
+ {
+ public LookupReplyI(LocatorI locator)
+ {
+ _locator = locator;
+ }
+
+ public void
+ foundLocator(LocatorPrx locator, Ice.Current curr)
+ {
+ _locator.foundLocator(locator);
+ }
+
+ private final LocatorI _locator;
+ };
+
+ class ObjectRequest extends Request
+ {
+ public
+ ObjectRequest(LocatorI locator, Ice.Identity id, Ice.AMD_Locator_findObjectById amdCB)
+ {
+ super(locator);
+ _id = id;
+ _amdCB = amdCB;
+ }
+
+ public void
+ invoke(Ice.LocatorPrx l)
+ {
+ _locatorPrx = l;
+ l.begin_findObjectById(_id,
+ new Ice.Callback_Locator_findObjectById() {
+ public void
+ response(Ice.ObjectPrx proxy)
+ {
+ ObjectRequest.this.response(proxy);
+ }
+
+ public void
+ exception(Ice.UserException ex)
+ {
+ ObjectRequest.this.exception(ex);
+ }
+
+ public void
+ exception(Ice.LocalException ex)
+ {
+ ObjectRequest.this.exception(ex);
+ }
+ });
+ }
+
+ public void
+ response(Ice.ObjectPrx prx)
+ {
+ _amdCB.ice_response(prx);
+ }
+
+ public void
+ exception(Exception ex)
+ {
+ _locator.invoke(_locatorPrx, this);
+ }
+
+ private final Ice.Identity _id;
+ private final Ice.AMD_Locator_findObjectById _amdCB;
+ };
+
+ class AdapterRequest extends Request {
+
+ public
+ AdapterRequest(LocatorI locator, String adapterId, Ice.AMD_Locator_findAdapterById amdCB) {
+ super(locator);
+ _adapterId = adapterId;
+ _amdCB = amdCB;
+ }
+
+ public void
+ invoke(Ice.LocatorPrx l)
+ {
+ _locatorPrx = l;
+ l.begin_findAdapterById(_adapterId,
+ new Ice.Callback_Locator_findAdapterById()
+ {
+ public void
+ response(Ice.ObjectPrx proxy)
+ {
+ AdapterRequest.this.response(proxy);
+ }
+
+ public void
+ exception(Ice.UserException ex)
+ {
+ AdapterRequest.this.exception(ex);
+ }
+
+ public void
+ exception(Ice.LocalException ex)
+ {
+ AdapterRequest.this.exception(ex);
+ }
+ });
+ }
+
+ public void
+ response(Ice.ObjectPrx prx)
+ {
+ _amdCB.ice_response(prx);
+ }
+
+ public void
+ exception(Exception ex)
+ {
+ _locator.invoke(_locatorPrx, this); // Retry with new locator proxy.
+ }
+
+ private final String _adapterId;
+ private final Ice.AMD_Locator_findAdapterById _amdCB;
+ };
+
+ public
+ DiscoveryPluginI(Ice.Communicator communicator)
+ {
+ _communicator = communicator;
+ }
+
+ public void
+ initialize()
+ {
+ Ice.Properties properties = _communicator.getProperties();
+
+ boolean ipv4 = properties.getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0;
+ String address;
+ if(ipv4)
+ {
+ address = properties.getPropertyWithDefault("IceGridDiscovery.Address", "239.255.0.1");
+ }
+ else
+ {
+ address = properties.getPropertyWithDefault("IceGridDiscovery.Address", "ff15::1");
+ }
+ int port = properties.getPropertyAsIntWithDefault("IceGridDiscovery.Port", 4061);
+ String intf = properties.getProperty("IceGridDiscovery.Interface");
+
+ if(properties.getProperty("IceGridDiscovery.Reply.Endpoints").isEmpty())
+ {
+ StringBuilder s = new StringBuilder();
+ s.append("udp");
+ if(!intf.isEmpty())
+ {
+ s.append(" -h \"").append(intf).append("\"");
+ }
+ properties.setProperty("IceGridDiscovery.Reply.Endpoints", s.toString());
+ }
+ if(properties.getProperty("IceGridDiscovery.Locator.Endpoints").isEmpty())
+ {
+ properties.setProperty("IceGridDiscovery.Locator.AdapterId", java.util.UUID.randomUUID().toString());
+ }
+
+ _replyAdapter = _communicator.createObjectAdapter("IceGridDiscovery.Reply");
+ _locatorAdapter = _communicator.createObjectAdapter("IceGridDiscovery.Locator");
+
+ // We don't want those adapters to be registered with the locator so clear their locator.
+ _replyAdapter.setLocator(null);
+ _locatorAdapter.setLocator(null);
+
+ String lookupEndpoints = properties.getProperty("IceGridDiscovery.Lookup");
+ if(lookupEndpoints.isEmpty())
+ {
+ StringBuilder s = new StringBuilder();
+ s.append("udp -h \"").append(address).append("\" -p ").append(port);
+ if(!intf.isEmpty())
+ {
+ s.append(" --interface \"").append(intf).append("\"");
+ }
+ lookupEndpoints = s.toString();
+ }
+
+ Ice.ObjectPrx lookupPrx = _communicator.stringToProxy("IceGridDiscovery/Lookup -d:" + lookupEndpoints);
+ lookupPrx = lookupPrx.ice_collocationOptimized(false); // No collocation optimization for the multicast proxy!
+ try
+ {
+ lookupPrx.ice_getConnection(); // Ensure we can establish a connection to the multicast proxy
+ }
+ catch(Ice.LocalException ex)
+ {
+ StringBuilder s = new StringBuilder();
+ s.append("unable to establish multicast connection, IceGrid discovery will be disabled:\n");
+ s.append("proxy = ").append(lookupPrx.toString()).append("\n");
+ throw new Ice.PluginInitializationException(s.toString());
+ }
+
+ LocatorI locator = new LocatorI(LookupPrxHelper.uncheckedCast(lookupPrx), properties);
+ _communicator.setDefaultLocator(Ice.LocatorPrxHelper.uncheckedCast(_locatorAdapter.addWithUUID(locator)));
+
+ Ice.ObjectPrx lookupReply = _replyAdapter.addWithUUID(new LookupReplyI(locator)).ice_datagram();
+ locator.setLookupReply(LookupReplyPrxHelper.uncheckedCast(lookupReply));
+
+ _replyAdapter.activate();
+ _locatorAdapter.activate();
+ }
+
+ public void
+ destroy()
+ {
+ _replyAdapter.destroy();
+ _locatorAdapter.destroy();
+ }
+
+ private Ice.Communicator _communicator;
+ private Ice.ObjectAdapter _locatorAdapter;
+ private Ice.ObjectAdapter _replyAdapter;
+}
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index d857b443475..1cae4d80b32 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -15,7 +15,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
class InvokeAll extends DispatchWorkItem
{
- public
+ public
InvokeAll(OutgoingMessageCallback out, BasicStream os, int requestId, int invokeNum, boolean batch)
{
_out = out;
@@ -33,7 +33,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
invokeAll(_os, _requestId, _invokeNum, _batch);
}
}
-
+
private final OutgoingMessageCallback _out;
private final BasicStream _os;
private final int _requestId;
@@ -43,13 +43,13 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
class InvokeAllAsync extends DispatchWorkItem
{
- public InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum,
+ public InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum,
boolean batch)
{
_outAsync = outAsync;
- _os = os;
- _requestId = requestId;
- _invokeNum = invokeNum;
+ _os = os;
+ _requestId = requestId;
+ _invokeNum = invokeNum;
_batch = batch;
}
@@ -61,7 +61,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
invokeAll(_os, _requestId, _invokeNum, _batch);
}
}
-
+
private final OutgoingAsyncMessageCallback _outAsync;
private final BasicStream _os;
private final int _requestId;
@@ -72,11 +72,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
private void
fillInValue(BasicStream os, int pos, int value)
{
- os.pos(pos);
- os.writeInt(value);
+ os.rewriteInt(pos, value);
}
-
- public
+
+ public
CollocatedRequestHandler(Reference ref, Ice.ObjectAdapter adapter)
{
_reference = ref;
@@ -133,7 +132,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
synchronized(this)
{
_batchStream.swap(os);
-
+
if(_batchAutoFlush & (_batchStream.size() > _reference.getInstance().messageSizeMax()))
{
//
@@ -166,7 +165,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
//
_batchRequestNum = 0;
_batchMarker = 0;
-
+
//
// Check again if the last request doesn't exceed what we can send with the auto flush
//
@@ -182,7 +181,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
_batchStream.writeBlob(Protocol.requestBatchHdr);
_batchStream.writeBlob(lastRequest);
}
-
+
//
// Increment the number of requests in the batch.
//
@@ -219,14 +218,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
out.invokeCollocated(this);
return !_response && _reference.getInvocationTimeout() == 0;
}
-
+
public int
sendAsyncRequest(OutgoingAsyncMessageCallback outAsync)
{
return outAsync.__invokeCollocated(this);
}
-
- synchronized public void
+
+ synchronized public void
requestTimedOut(OutgoingMessageCallback out)
{
Integer requestId = _sendRequests.get(out);
@@ -367,19 +366,19 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
}
}
-
+
invokeNum = _batchRequestNum;
-
+
if(_batchRequestNum > 0)
{
if(_reference.getInvocationTimeout() > 0)
{
_sendRequests.put(out, 0);
}
-
+
assert(!_batchStream.isEmpty());
_batchStream.swap(out.os());
-
+
//
// Reset the batch stream.
//
@@ -390,7 +389,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
_batchMarker = 0;
}
}
-
+
out.attachCollocatedObserver(_adapter, 0);
if(invokeNum > 0)
@@ -431,7 +430,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
}
}
-
+
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
{
@@ -442,7 +441,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
assert(!_batchStream.isEmpty());
_batchStream.swap(outAsync.__getOs());
-
+
//
// Reset the batch stream.
//
@@ -455,7 +454,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
outAsync.__attachCollocatedObserver(_adapter, 0);
-
+
if(invokeNum > 0)
{
_adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), 0, invokeNum, true));
@@ -471,7 +470,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
- public void
+ public void
sendResponse(int requestId, BasicStream os, byte status)
{
OutgoingAsync outAsync = null;
@@ -515,8 +514,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_adapter.decDirectCount();
}
-
- public void
+
+ public void
invokeException(int requestId, Ice.LocalException ex, int invokeNum)
{
if(requestId > 0)
@@ -553,7 +552,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
return null;
}
-
+
boolean
sent(OutgoingMessageCallback out)
{
@@ -570,7 +569,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
out.sent();
return true;
}
-
+
boolean
sentAsync(OutgoingAsyncMessageCallback outAsync)
{
@@ -590,7 +589,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
return true;
}
-
+
void
invokeAll(BasicStream os, int requestId, int invokeNum, boolean batch)
{
@@ -602,7 +601,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
os.pos(Protocol.requestHdr.length);
}
-
+
if(_traceLevels.protocol >= 1)
{
fillInValue(os, 10, os.size());
@@ -621,7 +620,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
try
{
while(invokeNum > 0)
- {
+ {
try
{
_adapter.incDirectCount();
@@ -632,7 +631,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
return;
}
- Incoming in = new Incoming(_reference.getInstance(), this, null, _adapter, _response, (byte)0,
+ Incoming in = new Incoming(_reference.getInstance(), this, null, _adapter, _response, (byte)0,
requestId);
try
{
@@ -716,10 +715,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
private boolean _batchAutoFlush;
private int _requestId;
-
- private java.util.Map<OutgoingMessageCallback, Integer> _sendRequests =
+
+ private java.util.Map<OutgoingMessageCallback, Integer> _sendRequests =
new java.util.HashMap<OutgoingMessageCallback, Integer>();
- private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests =
+ private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests =
new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>();
private java.util.Map<Integer, Outgoing> _requests = new java.util.HashMap<Integer, Outgoing>();