summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2001-11-26 22:56:03 +0000
committerMark Spruiell <mes@zeroc.com>2001-11-26 22:56:03 +0000
commit2c65289df827070be8a9f2f8e106edf8022134e4 (patch)
tree62cc5003d314e72545b4665b330c3eef300e9054 /java/src
parentminor fix (diff)
downloadice-2c65289df827070be8a9f2f8e106edf8022134e4.tar.bz2
ice-2c65289df827070be8a9f2f8e106edf8022134e4.tar.xz
ice-2c65289df827070be8a9f2f8e106edf8022134e4.zip
initial emitter impl
Diffstat (limited to 'java/src')
-rw-r--r--java/src/IceInternal/BasicStream.java18
-rw-r--r--java/src/IceInternal/Emitter.java466
-rw-r--r--java/src/IceInternal/EmitterFactory.java149
3 files changed, 633 insertions, 0 deletions
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java
index 8086ef10b20..62b0ff3e899 100644
--- a/java/src/IceInternal/BasicStream.java
+++ b/java/src/IceInternal/BasicStream.java
@@ -986,6 +986,24 @@ public class BasicStream
throw new Ice.NoUserExceptionFactoryException();
}
+ int
+ pos()
+ {
+ return _buf.pos;
+ }
+
+ void
+ pos(int p)
+ {
+ _buf.pos = p;
+ }
+
+ int
+ size()
+ {
+ return _buf.len;
+ }
+
private IceInternal.Instance _instance;
private Buffer _buf = new Buffer();
diff --git a/java/src/IceInternal/Emitter.java b/java/src/IceInternal/Emitter.java
new file mode 100644
index 00000000000..a83c57a7daf
--- /dev/null
+++ b/java/src/IceInternal/Emitter.java
@@ -0,0 +1,466 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+package IceInternal;
+
+public final class Emitter extends EventHandler
+{
+ public void
+ destroy()
+ {
+ _mutex.lock();
+ try
+ {
+ setState(StateClosed, new Ice.CommunicatorDestroyedException());
+ }
+ finally
+ {
+ _mutex.unlock();
+ }
+ }
+
+ public boolean
+ destroyed()
+ {
+ _mutex.lock();
+ try
+ {
+ return _state >= StateClosing;
+ }
+ finally
+ {
+ _mutex.unlock();
+ }
+ }
+
+ public void
+ prepareRequest(Outgoing out)
+ {
+ BasicStream os = out.os();
+ os.writeByte(Protocol.protocolVersion);
+ os.writeByte(Protocol.encodingVersion);
+ os.writeByte(Protocol.requestMsg);
+ os.writeInt(0); // Message size (placeholder)
+ os.writeInt(0); // Request ID (placeholder)
+ }
+
+ public void
+ sendRequest(Outgoing out, boolean oneway)
+ {
+ _mutex.lock();
+ try
+ {
+ if (_exception != null)
+ {
+ throw _exception;
+ }
+ assert(_state == StateActive);
+
+ int requestId = 0;
+
+ try
+ {
+ BasicStream os = out.os();
+ os.pos(3);
+
+ //
+ // Fill in the message size and request ID
+ //
+ os.writeInt(os.size());
+ if (!_endpoint.oneway() && !oneway)
+ {
+ requestId = _nextRequestId++;
+ if (requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+ os.writeInt(requestId);
+ }
+ TraceUtil.traceRequest("sending request", os, _logger,
+ _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ }
+ catch (Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ throw ex;
+ }
+
+ //
+ // Only add to the request map if there was no exception, and if
+ // the operation is not oneway.
+ //
+ if (!_endpoint.oneway() && !oneway)
+ {
+ _requests.put(new Integer(requestId), out);
+ }
+ }
+ finally
+ {
+ _mutex.unlock();
+ }
+ }
+
+ public void
+ prepareBatchRequest(Outgoing out)
+ {
+ lock();
+
+ if (_exception != null)
+ {
+ unlock();
+ throw _exception;
+ }
+ assert(_state == StateActive);
+
+ //
+ // The Emitter now belongs to `out', until finishBatchRequest() is
+ // called.
+ //
+
+ if (_batchStream.size() == 0)
+ {
+ _batchStream.writeByte(Protocol.protocolVersion);
+ _batchStream.writeByte(Protocol.encodingVersion);
+ _batchStream.writeByte(Protocol.requestMsg);
+ _batchStream.writeInt(0); // Message size (placeholder)
+ }
+
+ _batchStream.startWriteEncaps();
+
+ //
+ // Give the batch stream to `out', until finishBatchRequest() is
+ // called.
+ //
+ _batchStream.swap(out.os());
+ }
+
+ public void
+ finishBatchRequest(Outgoing out)
+ {
+ if (_exception != null)
+ {
+ unlock();
+ throw _exception;
+ }
+ assert(_state == StateActive);
+
+ _batchStream.swap(out.os()); // Get the batch stream back
+ unlock(); // Give the Emitter back
+
+ _batchStream.endWriteEncaps();
+ }
+
+ public void
+ abortBatchRequest()
+ {
+ state(StateClosed, new Ice.AbortBatchRequestException());
+ unlock();
+ }
+
+ public void
+ flushBatchRequest()
+ {
+ _mutex.lock();
+ try
+ {
+ if (_exception != null)
+ {
+ throw _exception;
+ }
+ assert(_state == StateActive);
+
+ try
+ {
+ if (_batch.size() == 0)
+ {
+ return; // Nothing to send
+ }
+
+ _batchStream.pos(3);
+
+ //
+ // Fill in the message size
+ //
+ _batchStream.writeInt(_batchStream.size());
+ TraceUtil.traceBatchRequest("sending batch request",
+ _batchStream, _logger,
+ _traceLevels);
+ _transceiver.write(_batchStream, _endpoint.timeout());
+
+ //
+ // Reset _batchStream so that new batch messages can be sent.
+ //
+ _batchStream = new BasicStream(_instance);
+ }
+ catch (Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ throw ex;
+ }
+ }
+ finally
+ {
+ _mutex.unlock();
+ }
+ }
+
+ public int
+ timeout()
+ {
+ return _endpoint.timeout();
+ }
+
+ //
+ // Operations from EventHandler
+ //
+ public boolean
+ server()
+ {
+ return true;
+ }
+
+ public boolean
+ readable()
+ {
+ return true;
+ }
+
+ public void
+ read(BasicStream is)
+ {
+ _transceiver.read(stream, 0);
+ }
+
+ public void
+ message(BasicStream stream)
+ {
+ _mutex.lock();
+ try
+ {
+ _threadPool.promoteFollower();
+
+ if (_state != StateActive)
+ {
+ return;
+ }
+
+ try
+ {
+ assert(stream.pos() == stream.size());
+ stream.pos(2);
+ byte messageType = stream.readByte();
+ stream.pos(Protocol.headerSize);
+
+ switch (messageType)
+ {
+ case Protocol.requestMsg:
+ {
+ TraceUtil.traceRequest("received request on " +
+ "the client side\n(invalid, " +
+ "closing connection)",
+ stream, _logger, _traceLevels);
+ throw new Ice.InvalidMessageException();
+ }
+
+ case Protocol.requestBatchMsg:
+ {
+ TraceUtil.traceRequest("received batch request on " +
+ "the client side\n(invalid, " +
+ "closing connection)",
+ stream, _logger, _traceLevels);
+ throw new Ice.InvalidMessageException();
+ }
+
+ case Protocol.replyMsg:
+ {
+ TraceUtil.traceReply("received reply", stream,
+ _logger, _traceLevels);
+ int requestId = stream.readInt();
+ Outgoing out =
+ (Outgoing)_requests.remove(new Integer(requestId));
+ if (out == null)
+ {
+ throw new Ice.UnknownRequestIdException();
+ }
+ out.finished(stream);
+ break;
+ }
+
+ case Protocol.closeConnectionMsg:
+ {
+ TraceUtil.traceHeader("received close connection",
+ stream, _logger, _traceLevels);
+ throw new Ice.CloseConnectionException();
+ }
+
+ default:
+ {
+ TraceUtil.traceHeader("received unknown message\n" +
+ "(invalid, closing connection)",
+ stream, _logger, _traceLevels);
+ throw new Ice.UnknownMessageException();
+ }
+ }
+ }
+ catch (Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ return;
+ }
+ }
+ finally
+ {
+ _mutex.unlock();
+ }
+ }
+
+ public void
+ exception(Ice.LocalException ex)
+ {
+ _mutex.lock();
+ try
+ {
+ setState(StateClosed, ex);
+ }
+ finally
+ {
+ _mutex.unlock();
+ }
+ }
+
+ public void
+ finished()
+ {
+ _mutex.lock();
+ try
+ {
+ _transceiver.close();
+ }
+ finally
+ {
+ _mutex.unlock();
+ }
+ }
+
+ public boolean
+ tryDestroy()
+ {
+ boolean isLocked = _mutex.trylock();
+ if (!isLocked)
+ {
+ return false;
+ }
+
+ _threadPool.promoteFollower();
+
+ try
+ {
+ setState(StateClosed, new Ice.CommunicatorDestroyedException());
+ return true;
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
+ Emitter(Instance instance, Transceiver transceiver, Endpoint endpoint)
+ {
+ super(instance);
+ _transceiver = transceiver;
+ _endpoint = endpoint;
+ _nextRequestId = 1;
+ _batchStream = new BasicStream(instance);
+ _state = StateActive;
+
+ _traceLevels = _instance.traceLevels();
+ _logger = _instance.logger();
+
+ if (!_endpoint.oneway())
+ {
+ _threadPool = _instance.threadPool();
+ _threadPool._register(_transceiver.fd(), this);
+ }
+ }
+
+ protected void
+ finalize()
+ throws Throwable
+ {
+ assert(_state == StateClosed);
+
+ super.finalize();
+ }
+
+ private static final int StateActive = 0;
+ private static final int StateHolding = 1;
+ private static final int StateClosing = 2;
+ private static final int StateClosed = 3;
+
+ private void
+ setState(int state, Ice.LocalException ex)
+ {
+ if (_state == state) // Don't switch twice
+ {
+ return;
+ }
+
+ switch (state)
+ {
+ case StateActive:
+ {
+ return; // Can't switch back to holding state
+ }
+
+ case StateClosed:
+ {
+ if (_threadPool != null)
+ {
+ _threadPool.unregister(_transceiver.fd());
+ }
+ else
+ {
+ _transceiver.close();
+ }
+ break;
+ }
+ }
+
+ if (_exception == null)
+ {
+ _exception = ex;
+ }
+
+ java.util.Set entries = _requests.entrySet();
+ java.util.Iterator p = entries.iterator();
+ while (p.hasNext())
+ {
+ Outgoing out = (Outgoing)p.next();
+ out.finished(_exception);
+ }
+ _requests.clear();
+
+ _state = state;
+ }
+
+ private Transceiver _transceiver;
+ private Endpoint _endpoint;
+ private TraceLevels _traceLevels;
+ private Ice.Logger _logger;
+ private ThreadPool _threadPool;
+ private int _nextRequestId;
+ private java.util.HashMap _requests = new java.util.HashMap();
+ private Ice.LocalException _exception;
+ private BasicStream _batchStream;
+ private int _state;
+ private RecursiveMutex _mutex = new RecursiveMutex();
+}
diff --git a/java/src/IceInternal/EmitterFactory.java b/java/src/IceInternal/EmitterFactory.java
new file mode 100644
index 00000000000..eb445502ba6
--- /dev/null
+++ b/java/src/IceInternal/EmitterFactory.java
@@ -0,0 +1,149 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+package IceInternal;
+
+public final class EmitterFactory
+{
+ public synchronized Emitter
+ create(Endpoint[] endpoints)
+ {
+ if (_instance == null)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
+ assert(endpoints.length > 0);
+
+ //
+ // First reap destroyed emitters
+ //
+ java.util.Set entries = _emitters.entrySet();
+ java.util.Iterator p = entries.iterator();
+ while (p.hasNext())
+ {
+ Emitter emitter = (Emitter)p.next();
+ if (emitter.destroyed())
+ {
+ p.remove();
+ }
+ }
+
+ for (int i = 0; i < endpoints.length; i++)
+ {
+ Emitter emitter = (Emitter)_emitters.get(endpoints[i]);
+ if (emitter != null)
+ {
+ return emitter;
+ }
+ }
+
+ //
+ // No emitters exist, try to create one
+ //
+ TraceLevels traceLevels = _instance.traceLevels();
+ Ice.Logger logger = _instance.logger();
+
+ Emitter emitter = null;
+ Ice.LocalException exception = null;
+ for (int i = 0; i < endpoints.length; i++)
+ {
+ try
+ {
+ Transceiver transceiver =
+ endpoints[i].clientTransceiver(_instance);
+ if (transceiver == null)
+ {
+ Connector connector = endpoints[i].connector(_instance);
+ assert(connector != null);
+ transceiver = connector.connect(endpoints[i].timeout);
+ assert(transceiver != null);
+ }
+ emitter = new Emitter(_instance, transceiver, endpoints[i]);
+ _emitters.put(endpoints[i], emitter);
+ break;
+ }
+ catch (Ice.SocketException ex)
+ {
+ exception = ex;
+ }
+ catch (Ice.DNSException ex)
+ {
+ exception = ex;
+ }
+ catch (Ice.TimeoutException ex)
+ {
+ exception = ex;
+ }
+
+ if (traceLevels.retry >= 2)
+ {
+ StringBuffer s = new StringBuffer();
+ s.append("connection to endpoint failed");
+ if (i < endpoints.length - 1)
+ {
+ s.append(", trying next endpoint\n");
+ }
+ else
+ {
+ s.append(" and no more endpoints to try\n");
+ }
+ s.append(exception.toString());
+ logger.trace(traceLevels.retryCat, s.toString());
+ }
+ }
+
+ if (emitter == null)
+ {
+ assert(exception != null);
+ throw exception;
+ }
+
+ return emitter;
+ }
+
+ //
+ // For use by Instance
+ //
+ EmitterFactory(Instance instance)
+ {
+ _instance = instance;
+ }
+
+ protected void
+ finalize()
+ throws Throwable
+ {
+ assert(_instance == null);
+
+ super.finalize();
+ }
+
+ synchronized void
+ destroy()
+ {
+ if (_instance == null)
+ {
+ return;
+ }
+
+ java.util.Set entries = _emitters.entrySet();
+ java.util.Iterator p = entries.iterator();
+ while (p.hasNext())
+ {
+ Emitter emitter = (Emitter)p.next();
+ emitter.destroy();
+ }
+ _emitters.clear();
+ }
+
+ private Instance _instance;
+ private java.util.HashMap _emitters = new java.util.HashMap();
+}