summaryrefslogtreecommitdiff
path: root/java/src/Ice/ConnectionI.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2012-09-28 10:40:14 +0200
committerBenoit Foucher <benoit@zeroc.com>2012-09-28 10:40:14 +0200
commit8527be5894d0e0ba90db306b8ab124c04144ab44 (patch)
tree5edbe2c2104764f9b3ba8721e573b32fe32f9baf /java/src/Ice/ConnectionI.java
parentminor fix to build IceGridGUI in OsX (diff)
downloadice-8527be5894d0e0ba90db306b8ab124c04144ab44.tar.bz2
ice-8527be5894d0e0ba90db306b8ab124c04144ab44.tar.xz
ice-8527be5894d0e0ba90db306b8ab124c04144ab44.zip
Java & C# port
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r--java/src/Ice/ConnectionI.java213
1 files changed, 199 insertions, 14 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 1feeb7b34e0..8e0c99e9761 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -9,6 +9,8 @@
package Ice;
+import Ice.Instrumentation.InvocationObserver;
+
public final class ConnectionI extends IceInternal.EventHandler implements Connection
{
public interface StartCallback
@@ -248,6 +250,25 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
synchronized public void
+ updateObserver()
+ {
+ if(_state < StateNotValidated || _state > StateClosed)
+ {
+ return;
+ }
+
+ assert(_instance.initializationData().observer != null);
+ _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ toConnectionState(_state),
+ _observer);
+ if(_observer != null)
+ {
+ _observer.attach();
+ }
+ }
+
+ synchronized public void
monitor(long now)
{
if(_state != StateActive)
@@ -291,6 +312,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ out.attachRemoteObserver(initConnectionInfo(), _endpoint);
+
//
// Ensure the message isn't bigger than what we can send with the
// transport.
@@ -365,6 +388,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ out.__attachRemoteObserver(initConnectionInfo(), _endpoint);
+
//
// Ensure the message isn't bigger than what we can send with the
// transport.
@@ -615,7 +640,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public void
flushBatchRequests()
{
- IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance);
+ final InvocationObserver observer = IceInternal.ObserverHelper.get(_instance, __flushBatchRequests_name);
+ IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, observer);
out.invoke();
}
@@ -681,6 +707,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throw (Ice.LocalException)_exception.fillInStackTrace();
}
+ out.attachRemoteObserver(initConnectionInfo(), _endpoint);
+
if(_batchRequestNum == 0)
{
out.sent(false);
@@ -738,6 +766,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throw (Ice.LocalException)_exception.fillInStackTrace();
}
+ outAsync.__attachRemoteObserver(initConnectionInfo(), _endpoint);
+
if(_batchRequestNum == 0)
{
int status = IceInternal.AsyncStatus.Sent;
@@ -791,7 +821,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_state == StateFinished)
{
- _reaper.add(this);
+ _reaper.add(this, _observer);
}
notifyAll();
}
@@ -825,7 +855,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_state == StateFinished)
{
- _reaper.add(this);
+ _reaper.add(this, _observer);
}
notifyAll();
}
@@ -934,12 +964,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
unscheduleTimeout(current.operation);
if((current.operation & IceInternal.SocketOperation.Write) != 0 && !_writeStream.isEmpty())
{
+ if(_observer != null)
+ {
+ observerStartWrite(_writeStream.pos());
+ }
if(!_transceiver.write(_writeStream.getBuffer()))
{
assert(!_writeStream.isEmpty());
scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout());
return;
}
+ if(_observer != null)
+ {
+ observerFinishWrite(_writeStream.pos());
+ }
assert(!_writeStream.getBuffer().b.hasRemaining());
}
if((current.operation & IceInternal.SocketOperation.Read) != 0 && !_readStream.isEmpty())
@@ -953,6 +991,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(!_readStream.getBuffer().b.hasRemaining());
_readHeader = false;
+ if(_observer != null)
+ {
+ _observer.receivedBytes(IceInternal.Protocol.headerSize);
+ }
+
int pos = _readStream.pos();
if(pos < IceInternal.Protocol.headerSize)
{
@@ -1008,12 +1051,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
else
{
+ if(_observer != null)
+ {
+ observerStartRead(_readStream.pos());
+ }
if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData))
{
assert(!_readStream.isEmpty());
scheduleTimeout(IceInternal.SocketOperation.Read, _endpoint.timeout());
return;
}
+ if(_observer != null)
+ {
+ observerFinishRead(_readStream.pos());
+ }
assert(!_readStream.getBuffer().b.hasRemaining());
}
}
@@ -1235,7 +1286,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
else if(_state == StateFinished)
{
- _reaper.add(this);
+ _reaper.add(this, _observer);
}
notifyAll();
}
@@ -1361,7 +1412,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
setState(StateFinished);
if(_dispatchCount == 0)
{
- _reaper.add(this);
+ _reaper.add(this, _observer);
}
}
}
@@ -1420,10 +1471,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
throw (Ice.LocalException)_exception.fillInStackTrace();
}
- ConnectionInfo info = _transceiver.getInfo();
- info.adapterName = _adapter != null ? _adapter.getName() : "";
- info.incoming = _connector == null;
- return info;
+ return initConnectionInfo();
}
public String
@@ -1457,7 +1505,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_state == StateFinished)
{
- _reaper.add(this);
+ _reaper.add(this, _observer);
}
notifyAll();
}
@@ -1781,6 +1829,34 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
+ if(_instance.initializationData().observer != null)
+ {
+ Ice.Instrumentation.ConnectionState oldState = toConnectionState(_state);
+ Ice.Instrumentation.ConnectionState newState = toConnectionState(state);
+ if(oldState != newState)
+ {
+ _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ newState,
+ _observer);
+ if(_observer != null)
+ {
+ _observer.attach();
+ }
+ }
+ if(_observer != null && state == StateClosed && _exception != null)
+ {
+ if(!(_exception instanceof CloseConnectionException ||
+ _exception instanceof ForcedCloseConnectionException ||
+ _exception instanceof ConnectionTimeoutException ||
+ _exception instanceof CommunicatorDestroyedException ||
+ _exception instanceof ObjectAdapterDeactivatedException ||
+ (_exception instanceof ConnectionLostException && _state == StateClosing)))
+ {
+ _observer.failed(_exception.ice_name());
+ }
+ }
+ }
_state = state;
notifyAll();
@@ -1882,12 +1958,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_writeStream.prepareWrite();
}
+ if(_observer != null)
+ {
+ observerStartWrite(_writeStream.pos());
+ }
if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer()))
{
scheduleTimeout(IceInternal.SocketOperation.Write, connectTimeout());
_threadPool.update(this, operation, IceInternal.SocketOperation.Write);
return false;
}
+ if(_observer != null)
+ {
+ observerFinishWrite(_writeStream.pos());
+ }
}
else // The client side has the passive role for connection validation.
{
@@ -1897,12 +1981,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_readStream.pos(0);
}
+ if(_observer != null)
+ {
+ observerStartRead(_readStream.pos());
+ }
if(_readStream.pos() != _readStream.size() && !_transceiver.read(_readStream.getBuffer(), _hasMoreData))
{
scheduleTimeout(IceInternal.SocketOperation.Read, connectTimeout());
_threadPool.update(this, operation, IceInternal.SocketOperation.Read);
return false;
}
+ if(_observer != null)
+ {
+ observerFinishRead(_readStream.pos());
+ }
assert(_readStream.pos() == IceInternal.Protocol.headerSize);
_readStream.pos(0);
@@ -2008,12 +2100,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Send the message.
//
+ if(_observer != null)
+ {
+ observerStartWrite(_writeStream.pos());
+ }
if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer()))
{
assert(!_writeStream.isEmpty());
scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout());
return callbacks;
}
+ if(_observer != null)
+ {
+ observerFinishWrite(_writeStream.pos());
+ }
}
}
catch(Ice.LocalException ex)
@@ -2071,8 +2171,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
}
+ if(_observer != null)
+ {
+ observerStartWrite(message.stream.pos());
+ }
if(_transceiver.write(message.stream.getBuffer()))
{
+ if(_observer != null)
+ {
+ observerFinishWrite(message.stream.pos());
+ }
int status = IceInternal.AsyncStatus.Sent;
if(message.sent(this, false))
{
@@ -2459,6 +2567,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
+ private ConnectionInfo
+ initConnectionInfo()
+ {
+ if(_info != null)
+ {
+ return _info;
+ }
+
+ ConnectionInfo info = _transceiver.getInfo();
+ info.connectionId = _endpoint.connectionId();
+ info.adapterName = _adapter != null ? _adapter.getName() : "";
+ info.incoming = _connector == null;
+ if(_state > StateNotInitialized)
+ {
+ _info = info; // Cache the connection information only if initialized.
+ }
+ return info;
+ }
+
+ private Ice.Instrumentation.ConnectionState
+ toConnectionState(int state)
+ {
+ return connectionStateMap[state];
+ }
+
private void
warning(String msg, Exception ex)
{
@@ -2470,6 +2603,42 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_logger.warning(s);
}
+ private void
+ observerStartRead(int pos)
+ {
+ if(_readStreamPos > 0)
+ {
+ _observer.receivedBytes(pos - _readStreamPos);
+ }
+ _readStreamPos = pos;
+ }
+
+ private void
+ observerFinishRead(int pos)
+ {
+ assert(pos >= _readStreamPos);
+ _observer.receivedBytes(pos - _readStreamPos);
+ _readStreamPos = 0;
+ }
+
+ private void
+ observerStartWrite(int pos)
+ {
+ if(_writeStreamPos > 0)
+ {
+ _observer.sentBytes(pos - _writeStreamPos);
+ }
+ _writeStreamPos = pos;
+ }
+
+ private void
+ observerFinishWrite(int pos)
+ {
+ assert(pos >= _writeStreamPos);
+ _observer.sentBytes(pos - _writeStreamPos);
+ _writeStreamPos = 0;
+ }
+
private IceInternal.Incoming
getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId)
{
@@ -2519,7 +2688,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public IceInternal.Outgoing
getOutgoing(IceInternal.RequestHandler handler, String operation, OperationMode mode,
- java.util.Map<String, String> context)
+ java.util.Map<String, String> context, InvocationObserver observer)
throws IceInternal.LocalExceptionWrapper
{
IceInternal.Outgoing out = null;
@@ -2530,20 +2699,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_outgoingCache == null)
{
- out = new IceInternal.Outgoing(handler, operation, mode, context);
+ out = new IceInternal.Outgoing(handler, operation, mode, context, observer);
}
else
{
out = _outgoingCache;
_outgoingCache = _outgoingCache.next;
- out.reset(handler, operation, mode, context);
+ out.reset(handler, operation, mode, context, observer);
out.next = null;
}
}
}
else
{
- out = new IceInternal.Outgoing(handler, operation, mode, context);
+ out = new IceInternal.Outgoing(handler, operation, mode, context, observer);
}
return out;
@@ -2709,6 +2878,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private boolean _readHeader;
private IceInternal.BasicStream _writeStream;
+ private Ice.Instrumentation.ConnectionObserver _observer;
+ private int _readStreamPos;
+ private int _writeStreamPos;
+
private int _dispatchCount;
private int _state; // The current state.
@@ -2724,4 +2897,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private Ice.EncodingVersion _readProtocolEncoding = new Ice.EncodingVersion();
private int _cacheBuffers;
+
+ private Ice.ConnectionInfo _info;
+
+ private static Ice.Instrumentation.ConnectionState connectionStateMap[] = {
+ Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized
+ Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated
+ Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive
+ Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished
+ };
}