From 344a7fd6e0d716f81dc27495e97a7ad9c2ab07b8 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Tue, 1 Jul 2014 17:42:04 +0200 Subject: IceMX and Python support for the new collocation optimization --- java/src/Ice/AsyncResult.java | 17 ++++----- java/src/IceInternal/BatchOutgoing.java | 31 +++++++++-------- java/src/IceInternal/BatchOutgoingAsync.java | 14 ++++---- java/src/IceInternal/CollocatedObserverI.java | 32 +++++++++++++++++ java/src/IceInternal/CollocatedRequestHandler.java | 8 ++--- .../CommunicatorBatchOutgoingAsync.java | 20 +++++------ java/src/IceInternal/CommunicatorObserverI.java | 2 ++ java/src/IceInternal/InvocationObserverI.java | 32 ++++++++--------- java/src/IceInternal/Outgoing.java | 40 +++++++++++----------- java/src/IceInternal/OutgoingAsync.java | 22 ++++++------ 10 files changed, 126 insertions(+), 92 deletions(-) create mode 100644 java/src/IceInternal/CollocatedObserverI.java (limited to 'java/src') diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index bf636fa6872..706616fe32f 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -334,23 +334,24 @@ public class AsyncResult { if(_observer != null) { - _remoteObserver = _observer.getRemoteObserver(info, endpt, requestId, size); - if(_remoteObserver != null) + _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size); + if(_childObserver != null) { - _remoteObserver.attach(); + _childObserver.attach(); } } } - public void __attachCollocatedObserver(int requestId) + public void __attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) { if(_observer != null) { - _remoteObserver = _observer.getCollocatedObserver(requestId, + _childObserver = _observer.getCollocatedObserver(adapter, + requestId, _os.size() - IceInternal.Protocol.headerSize - 4); - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.attach(); + _childObserver.attach(); } } } @@ -535,7 +536,7 @@ public class AsyncResult protected Ice.Exception _exception; protected Ice.Instrumentation.InvocationObserver _observer; - protected Ice.Instrumentation.RemoteObserver _remoteObserver; + protected Ice.Instrumentation.ChildInvocationObserver _childObserver; private IceInternal.CallbackBase _callback; } diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java index b60e29268aa..8647e89ccde 100644 --- a/java/src/IceInternal/BatchOutgoing.java +++ b/java/src/IceInternal/BatchOutgoing.java @@ -172,10 +172,10 @@ public final class BatchOutgoing implements OutgoingMessageCallback synchronized public void sent() { - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.detach(); + _childObserver = null; } _sent = true; notify(); @@ -184,11 +184,11 @@ public final class BatchOutgoing implements OutgoingMessageCallback public synchronized void finished(Ice.Exception ex, boolean sent) { - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.failed(ex.ice_name()); - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + _childObserver = null; } _exception = ex; notify(); @@ -205,22 +205,23 @@ public final class BatchOutgoing implements OutgoingMessageCallback { if(_observer != null) { - _remoteObserver = _observer.getRemoteObserver(info, endpt, 0, size); - if(_remoteObserver != null) + _childObserver = _observer.getRemoteObserver(info, endpt, 0, size); + if(_childObserver != null) { - _remoteObserver.attach(); + _childObserver.attach(); } } } - public void attachCollocatedObserver(int requestId) + public void + attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) { if(_observer != null) { - _remoteObserver = _observer.getCollocatedObserver(requestId, _os.size() - Protocol.headerSize - 4); - if(_remoteObserver != null) + _childObserver = _observer.getCollocatedObserver(adapter, requestId, _os.size() - Protocol.headerSize - 4); + if(_childObserver != null) { - _remoteObserver.attach(); + _childObserver.attach(); } } } @@ -232,6 +233,6 @@ public final class BatchOutgoing implements OutgoingMessageCallback private Ice.Exception _exception; private InvocationObserver _observer; - private Observer _remoteObserver; + private Observer _childObserver; } diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java index 6153624c88e..ad289e019fb 100644 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ b/java/src/IceInternal/BatchOutgoingAsync.java @@ -35,10 +35,10 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync { _state |= Done | OK | Sent; //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.detach(); + _childObserver = null; } if(_timeoutRequestHandler != null) { @@ -59,11 +59,11 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync public void __finished(Ice.Exception exc, boolean sent) { - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.failed(exc.ice_name()); - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); + _childObserver = null; } if(_timeoutRequestHandler != null) { diff --git a/java/src/IceInternal/CollocatedObserverI.java b/java/src/IceInternal/CollocatedObserverI.java new file mode 100644 index 00000000000..e2355935dca --- /dev/null +++ b/java/src/IceInternal/CollocatedObserverI.java @@ -0,0 +1,32 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class CollocatedObserverI + extends IceMX.ObserverWithDelegate + implements Ice.Instrumentation.CollocatedObserver +{ + public void + reply(final int size) + { + forEach(new MetricsUpdate() + { + public void + update(IceMX.CollocatedMetrics v) + { + v.replySize += size; + } + }); + if(_delegate != null) + { + _delegate.reply(size); + } + } +} diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 13965c3c10b..245d714a64b 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -315,7 +315,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } - out.attachCollocatedObserver(requestId); + out.attachCollocatedObserver(_adapter, requestId); if(_reference.getInvocationTimeout() > 0) { @@ -348,7 +348,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } - outAsync.__attachCollocatedObserver(requestId); + outAsync.__attachCollocatedObserver(_adapter, requestId); _adapter.getThreadPool().execute(new InvokeAllAsync(outAsync, outAsync.__getOs(), requestId, 1, false)); @@ -395,7 +395,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } - out.attachCollocatedObserver(0); + out.attachCollocatedObserver(_adapter, 0); if(invokeNum > 0) { @@ -454,7 +454,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } - outAsync.__attachCollocatedObserver(0); + outAsync.__attachCollocatedObserver(_adapter, 0); if(invokeNum > 0) { diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java index ea0a8d26177..8f7f2707a7f 100644 --- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java +++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java @@ -50,10 +50,10 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult public boolean __sent() { - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.detach(); + _childObserver = null; } check(false); return false; @@ -62,11 +62,11 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult public void __finished(Ice.LocalException ex, boolean sent) { - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.failed(ex.ice_name()); - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + _childObserver = null; } check(false); } @@ -76,11 +76,11 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult { if(CommunicatorBatchOutgoingAsync.this._observer != null) { - _remoteObserver = CommunicatorBatchOutgoingAsync.this._observer.getRemoteObserver(info, endpt, + _childObserver = CommunicatorBatchOutgoingAsync.this._observer.getRemoteObserver(info, endpt, requestId, size); - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.attach(); + _childObserver.attach(); } } } diff --git a/java/src/IceInternal/CommunicatorObserverI.java b/java/src/IceInternal/CommunicatorObserverI.java index bf23296294a..d0451465955 100644 --- a/java/src/IceInternal/CommunicatorObserverI.java +++ b/java/src/IceInternal/CommunicatorObserverI.java @@ -617,6 +617,8 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb { _invocations.registerSubMap("Remote", RemoteMetrics.class, InvocationMetrics.class.getDeclaredField("remotes")); + _invocations.registerSubMap("Collocated", CollocatedMetrics.class, + InvocationMetrics.class.getDeclaredField("collocated")); } catch(Exception ex) { diff --git a/java/src/IceInternal/InvocationObserverI.java b/java/src/IceInternal/InvocationObserverI.java index 7997a22d9d0..6745c281ef8 100644 --- a/java/src/IceInternal/InvocationObserverI.java +++ b/java/src/IceInternal/InvocationObserverI.java @@ -114,7 +114,7 @@ public class InvocationObserverI private Ice.EndpointInfo _endpointInfo; }; - static public final class CollocatedInvocationHelper extends MetricsHelper + static public final class CollocatedInvocationHelper extends MetricsHelper { static private final AttributeResolver _attributes = new AttributeResolver() { @@ -134,15 +134,16 @@ public class InvocationObserverI } }; - CollocatedInvocationHelper(int requestId, int size) + CollocatedInvocationHelper(Ice.ObjectAdapter adapter, int requestId, int size) { super(_attributes); + _id = adapter.getName(); _requestId = requestId; _size = size; } public void - initMetrics(RemoteMetrics v) + initMetrics(CollocatedMetrics v) { v.size += _size; } @@ -150,10 +151,6 @@ public class InvocationObserverI public String getId() { - if(_id == null) - { - _id = Integer.toString(_requestId); - } return _id; } @@ -171,7 +168,7 @@ public class InvocationObserverI final private int _requestId; final private int _size; - private String _id; + final private String _id; }; public void @@ -209,19 +206,20 @@ public class InvocationObserverI delegate); } - public Ice.Instrumentation.RemoteObserver - getCollocatedObserver(int requestId, int sz) + public Ice.Instrumentation.CollocatedObserver + getCollocatedObserver(Ice.ObjectAdapter adapter, int requestId, int sz) { - Ice.Instrumentation.RemoteObserver delegate = null; + Ice.Instrumentation.CollocatedObserver delegate = null; if(_delegate != null) { - delegate = _delegate.getCollocatedObserver(requestId, sz); + delegate = _delegate.getCollocatedObserver(adapter, requestId, sz); } - return (Ice.Instrumentation.RemoteObserver)getObserver("Collocated", - new CollocatedInvocationHelper(requestId, sz), - RemoteMetrics.class, - RemoteObserverI.class, - delegate); + return (Ice.Instrumentation.CollocatedObserver)getObserver( + "Collocated", + new CollocatedInvocationHelper(adapter, requestId, sz), + CollocatedMetrics.class, + CollocatedObserverI.class, + delegate); } final MetricsUpdate _incrementRetry = new MetricsUpdate() diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index ac06172ce72..3a14e835110 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -9,7 +9,7 @@ package IceInternal; -import Ice.Instrumentation.RemoteObserver; +import Ice.Instrumentation.ChildInvocationObserver; import Ice.Instrumentation.InvocationObserver; public final class Outgoing implements OutgoingMessageCallback @@ -255,10 +255,10 @@ public final class Outgoing implements OutgoingMessageCallback { if(_proxy.__reference().getMode() != Reference.ModeTwoway) { - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.detach(); + _childObserver = null; } _state = StateOK; } @@ -273,11 +273,11 @@ public final class Outgoing implements OutgoingMessageCallback assert(_state <= StateInProgress); - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.reply(is.size() - Protocol.headerSize - 4); - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.reply(is.size() - Protocol.headerSize - 4); + _childObserver.detach(); + _childObserver = null; } if(_is == null) @@ -418,11 +418,11 @@ public final class Outgoing implements OutgoingMessageCallback finished(Ice.Exception ex, boolean sent) { assert(_state <= StateInProgress); - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.failed(ex.ice_name()); - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + _childObserver = null; } _state = StateFailed; _exception = ex; @@ -520,23 +520,23 @@ public final class Outgoing implements OutgoingMessageCallback { if(_observer != null) { - _remoteObserver = _observer.getRemoteObserver(info, endpt, requestId, size); - if(_remoteObserver != null) + _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size); + if(_childObserver != null) { - _remoteObserver.attach(); + _childObserver.attach(); } } } public void - attachCollocatedObserver(int requestId) + attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) { if(_observer != null) { - _remoteObserver = _observer.getCollocatedObserver(requestId, _os.size() - Protocol.headerSize - 4); - if(_remoteObserver != null) + _childObserver = _observer.getCollocatedObserver(adapter, requestId, _os.size() - Protocol.headerSize - 4); + if(_childObserver != null) { - _remoteObserver.attach(); + _childObserver.attach(); } } } @@ -659,7 +659,7 @@ public final class Outgoing implements OutgoingMessageCallback private int _state; private InvocationObserver _observer; - private RemoteObserver _remoteObserver; + private ChildInvocationObserver _childObserver; public Outgoing next; // For use by Ice.ObjectPrxHelperBase diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 41c56076a94..9d18c397d6a 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -124,10 +124,10 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa if(!_proxy.ice_isTwoway()) { - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.detach(); + _childObserver = null; } if(_timeoutRequestHandler != null) { @@ -154,11 +154,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa synchronized(_monitor) { assert((_state & Done) == 0); - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.failed(exc.ice_name()); - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); + _childObserver = null; } if(_timeoutRequestHandler != null) { @@ -197,11 +197,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa synchronized(_monitor) { assert(_exception == null && (_state & Done) == 0); - if(_remoteObserver != null) + if(_childObserver != null) { - _remoteObserver.reply(is.size() - Protocol.headerSize - 4); - _remoteObserver.detach(); - _remoteObserver = null; + _childObserver.reply(is.size() - Protocol.headerSize - 4); + _childObserver.detach(); + _childObserver = null; } if(_timeoutRequestHandler != null) -- cgit v1.2.3