diff options
author | Mark Spruiell <mes@zeroc.com> | 2001-11-26 22:56:03 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2001-11-26 22:56:03 +0000 |
commit | 2c65289df827070be8a9f2f8e106edf8022134e4 (patch) | |
tree | 62cc5003d314e72545b4665b330c3eef300e9054 /java/src | |
parent | minor fix (diff) | |
download | ice-2c65289df827070be8a9f2f8e106edf8022134e4.tar.bz2 ice-2c65289df827070be8a9f2f8e106edf8022134e4.tar.xz ice-2c65289df827070be8a9f2f8e106edf8022134e4.zip |
initial emitter impl
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/IceInternal/BasicStream.java | 18 | ||||
-rw-r--r-- | java/src/IceInternal/Emitter.java | 466 | ||||
-rw-r--r-- | java/src/IceInternal/EmitterFactory.java | 149 |
3 files changed, 633 insertions, 0 deletions
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java index 8086ef10b20..62b0ff3e899 100644 --- a/java/src/IceInternal/BasicStream.java +++ b/java/src/IceInternal/BasicStream.java @@ -986,6 +986,24 @@ public class BasicStream throw new Ice.NoUserExceptionFactoryException(); } + int + pos() + { + return _buf.pos; + } + + void + pos(int p) + { + _buf.pos = p; + } + + int + size() + { + return _buf.len; + } + private IceInternal.Instance _instance; private Buffer _buf = new Buffer(); diff --git a/java/src/IceInternal/Emitter.java b/java/src/IceInternal/Emitter.java new file mode 100644 index 00000000000..a83c57a7daf --- /dev/null +++ b/java/src/IceInternal/Emitter.java @@ -0,0 +1,466 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +package IceInternal; + +public final class Emitter extends EventHandler +{ + public void + destroy() + { + _mutex.lock(); + try + { + setState(StateClosed, new Ice.CommunicatorDestroyedException()); + } + finally + { + _mutex.unlock(); + } + } + + public boolean + destroyed() + { + _mutex.lock(); + try + { + return _state >= StateClosing; + } + finally + { + _mutex.unlock(); + } + } + + public void + prepareRequest(Outgoing out) + { + BasicStream os = out.os(); + os.writeByte(Protocol.protocolVersion); + os.writeByte(Protocol.encodingVersion); + os.writeByte(Protocol.requestMsg); + os.writeInt(0); // Message size (placeholder) + os.writeInt(0); // Request ID (placeholder) + } + + public void + sendRequest(Outgoing out, boolean oneway) + { + _mutex.lock(); + try + { + if (_exception != null) + { + throw _exception; + } + assert(_state == StateActive); + + int requestId = 0; + + try + { + BasicStream os = out.os(); + os.pos(3); + + // + // Fill in the message size and request ID + // + os.writeInt(os.size()); + if (!_endpoint.oneway() && !oneway) + { + requestId = _nextRequestId++; + if (requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + os.writeInt(requestId); + } + TraceUtil.traceRequest("sending request", os, _logger, + _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + } + catch (Ice.LocalException ex) + { + setState(StateClosed, ex); + throw ex; + } + + // + // Only add to the request map if there was no exception, and if + // the operation is not oneway. + // + if (!_endpoint.oneway() && !oneway) + { + _requests.put(new Integer(requestId), out); + } + } + finally + { + _mutex.unlock(); + } + } + + public void + prepareBatchRequest(Outgoing out) + { + lock(); + + if (_exception != null) + { + unlock(); + throw _exception; + } + assert(_state == StateActive); + + // + // The Emitter now belongs to `out', until finishBatchRequest() is + // called. + // + + if (_batchStream.size() == 0) + { + _batchStream.writeByte(Protocol.protocolVersion); + _batchStream.writeByte(Protocol.encodingVersion); + _batchStream.writeByte(Protocol.requestMsg); + _batchStream.writeInt(0); // Message size (placeholder) + } + + _batchStream.startWriteEncaps(); + + // + // Give the batch stream to `out', until finishBatchRequest() is + // called. + // + _batchStream.swap(out.os()); + } + + public void + finishBatchRequest(Outgoing out) + { + if (_exception != null) + { + unlock(); + throw _exception; + } + assert(_state == StateActive); + + _batchStream.swap(out.os()); // Get the batch stream back + unlock(); // Give the Emitter back + + _batchStream.endWriteEncaps(); + } + + public void + abortBatchRequest() + { + state(StateClosed, new Ice.AbortBatchRequestException()); + unlock(); + } + + public void + flushBatchRequest() + { + _mutex.lock(); + try + { + if (_exception != null) + { + throw _exception; + } + assert(_state == StateActive); + + try + { + if (_batch.size() == 0) + { + return; // Nothing to send + } + + _batchStream.pos(3); + + // + // Fill in the message size + // + _batchStream.writeInt(_batchStream.size()); + TraceUtil.traceBatchRequest("sending batch request", + _batchStream, _logger, + _traceLevels); + _transceiver.write(_batchStream, _endpoint.timeout()); + + // + // Reset _batchStream so that new batch messages can be sent. + // + _batchStream = new BasicStream(_instance); + } + catch (Ice.LocalException ex) + { + setState(StateClosed, ex); + throw ex; + } + } + finally + { + _mutex.unlock(); + } + } + + public int + timeout() + { + return _endpoint.timeout(); + } + + // + // Operations from EventHandler + // + public boolean + server() + { + return true; + } + + public boolean + readable() + { + return true; + } + + public void + read(BasicStream is) + { + _transceiver.read(stream, 0); + } + + public void + message(BasicStream stream) + { + _mutex.lock(); + try + { + _threadPool.promoteFollower(); + + if (_state != StateActive) + { + return; + } + + try + { + assert(stream.pos() == stream.size()); + stream.pos(2); + byte messageType = stream.readByte(); + stream.pos(Protocol.headerSize); + + switch (messageType) + { + case Protocol.requestMsg: + { + TraceUtil.traceRequest("received request on " + + "the client side\n(invalid, " + + "closing connection)", + stream, _logger, _traceLevels); + throw new Ice.InvalidMessageException(); + } + + case Protocol.requestBatchMsg: + { + TraceUtil.traceRequest("received batch request on " + + "the client side\n(invalid, " + + "closing connection)", + stream, _logger, _traceLevels); + throw new Ice.InvalidMessageException(); + } + + case Protocol.replyMsg: + { + TraceUtil.traceReply("received reply", stream, + _logger, _traceLevels); + int requestId = stream.readInt(); + Outgoing out = + (Outgoing)_requests.remove(new Integer(requestId)); + if (out == null) + { + throw new Ice.UnknownRequestIdException(); + } + out.finished(stream); + break; + } + + case Protocol.closeConnectionMsg: + { + TraceUtil.traceHeader("received close connection", + stream, _logger, _traceLevels); + throw new Ice.CloseConnectionException(); + } + + default: + { + TraceUtil.traceHeader("received unknown message\n" + + "(invalid, closing connection)", + stream, _logger, _traceLevels); + throw new Ice.UnknownMessageException(); + } + } + } + catch (Ice.LocalException ex) + { + setState(StateClosed, ex); + return; + } + } + finally + { + _mutex.unlock(); + } + } + + public void + exception(Ice.LocalException ex) + { + _mutex.lock(); + try + { + setState(StateClosed, ex); + } + finally + { + _mutex.unlock(); + } + } + + public void + finished() + { + _mutex.lock(); + try + { + _transceiver.close(); + } + finally + { + _mutex.unlock(); + } + } + + public boolean + tryDestroy() + { + boolean isLocked = _mutex.trylock(); + if (!isLocked) + { + return false; + } + + _threadPool.promoteFollower(); + + try + { + setState(StateClosed, new Ice.CommunicatorDestroyedException()); + return true; + } + finally + { + unlock(); + } + } + + Emitter(Instance instance, Transceiver transceiver, Endpoint endpoint) + { + super(instance); + _transceiver = transceiver; + _endpoint = endpoint; + _nextRequestId = 1; + _batchStream = new BasicStream(instance); + _state = StateActive; + + _traceLevels = _instance.traceLevels(); + _logger = _instance.logger(); + + if (!_endpoint.oneway()) + { + _threadPool = _instance.threadPool(); + _threadPool._register(_transceiver.fd(), this); + } + } + + protected void + finalize() + throws Throwable + { + assert(_state == StateClosed); + + super.finalize(); + } + + private static final int StateActive = 0; + private static final int StateHolding = 1; + private static final int StateClosing = 2; + private static final int StateClosed = 3; + + private void + setState(int state, Ice.LocalException ex) + { + if (_state == state) // Don't switch twice + { + return; + } + + switch (state) + { + case StateActive: + { + return; // Can't switch back to holding state + } + + case StateClosed: + { + if (_threadPool != null) + { + _threadPool.unregister(_transceiver.fd()); + } + else + { + _transceiver.close(); + } + break; + } + } + + if (_exception == null) + { + _exception = ex; + } + + java.util.Set entries = _requests.entrySet(); + java.util.Iterator p = entries.iterator(); + while (p.hasNext()) + { + Outgoing out = (Outgoing)p.next(); + out.finished(_exception); + } + _requests.clear(); + + _state = state; + } + + private Transceiver _transceiver; + private Endpoint _endpoint; + private TraceLevels _traceLevels; + private Ice.Logger _logger; + private ThreadPool _threadPool; + private int _nextRequestId; + private java.util.HashMap _requests = new java.util.HashMap(); + private Ice.LocalException _exception; + private BasicStream _batchStream; + private int _state; + private RecursiveMutex _mutex = new RecursiveMutex(); +} diff --git a/java/src/IceInternal/EmitterFactory.java b/java/src/IceInternal/EmitterFactory.java new file mode 100644 index 00000000000..eb445502ba6 --- /dev/null +++ b/java/src/IceInternal/EmitterFactory.java @@ -0,0 +1,149 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +package IceInternal; + +public final class EmitterFactory +{ + public synchronized Emitter + create(Endpoint[] endpoints) + { + if (_instance == null) + { + throw new Ice.CommunicatorDestroyedException(); + } + + assert(endpoints.length > 0); + + // + // First reap destroyed emitters + // + java.util.Set entries = _emitters.entrySet(); + java.util.Iterator p = entries.iterator(); + while (p.hasNext()) + { + Emitter emitter = (Emitter)p.next(); + if (emitter.destroyed()) + { + p.remove(); + } + } + + for (int i = 0; i < endpoints.length; i++) + { + Emitter emitter = (Emitter)_emitters.get(endpoints[i]); + if (emitter != null) + { + return emitter; + } + } + + // + // No emitters exist, try to create one + // + TraceLevels traceLevels = _instance.traceLevels(); + Ice.Logger logger = _instance.logger(); + + Emitter emitter = null; + Ice.LocalException exception = null; + for (int i = 0; i < endpoints.length; i++) + { + try + { + Transceiver transceiver = + endpoints[i].clientTransceiver(_instance); + if (transceiver == null) + { + Connector connector = endpoints[i].connector(_instance); + assert(connector != null); + transceiver = connector.connect(endpoints[i].timeout); + assert(transceiver != null); + } + emitter = new Emitter(_instance, transceiver, endpoints[i]); + _emitters.put(endpoints[i], emitter); + break; + } + catch (Ice.SocketException ex) + { + exception = ex; + } + catch (Ice.DNSException ex) + { + exception = ex; + } + catch (Ice.TimeoutException ex) + { + exception = ex; + } + + if (traceLevels.retry >= 2) + { + StringBuffer s = new StringBuffer(); + s.append("connection to endpoint failed"); + if (i < endpoints.length - 1) + { + s.append(", trying next endpoint\n"); + } + else + { + s.append(" and no more endpoints to try\n"); + } + s.append(exception.toString()); + logger.trace(traceLevels.retryCat, s.toString()); + } + } + + if (emitter == null) + { + assert(exception != null); + throw exception; + } + + return emitter; + } + + // + // For use by Instance + // + EmitterFactory(Instance instance) + { + _instance = instance; + } + + protected void + finalize() + throws Throwable + { + assert(_instance == null); + + super.finalize(); + } + + synchronized void + destroy() + { + if (_instance == null) + { + return; + } + + java.util.Set entries = _emitters.entrySet(); + java.util.Iterator p = entries.iterator(); + while (p.hasNext()) + { + Emitter emitter = (Emitter)p.next(); + emitter.destroy(); + } + _emitters.clear(); + } + + private Instance _instance; + private java.util.HashMap _emitters = new java.util.HashMap(); +} |