diff options
Diffstat (limited to 'java/src/IceInternal/IncomingConnectionFactory.java')
-rw-r--r-- | java/src/IceInternal/IncomingConnectionFactory.java | 1014 |
1 files changed, 507 insertions, 507 deletions
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 4c57ff4c51a..68fa8177fb3 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -32,110 +32,110 @@ public final class IncomingConnectionFactory extends EventHandler public void waitUntilHolding() { - java.util.LinkedList connections; - - synchronized(this) - { - // - // First we wait until the connection factory itself is in holding - // state. - // - while(_state < StateHolding) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - // - // We want to wait until all connections are in holding state - // outside the thread synchronization. - // - connections = (java.util.LinkedList)_connections.clone(); - } - - // - // Now we wait until each connection is in holding state. - // - java.util.ListIterator p = connections.listIterator(); - while(p.hasNext()) - { - Ice.ConnectionI connection = (Ice.ConnectionI)p.next(); - connection.waitUntilHolding(); - } + java.util.LinkedList connections; + + synchronized(this) + { + // + // First we wait until the connection factory itself is in holding + // state. + // + while(_state < StateHolding) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + // + // We want to wait until all connections are in holding state + // outside the thread synchronization. + // + connections = (java.util.LinkedList)_connections.clone(); + } + + // + // Now we wait until each connection is in holding state. + // + java.util.ListIterator p = connections.listIterator(); + while(p.hasNext()) + { + Ice.ConnectionI connection = (Ice.ConnectionI)p.next(); + connection.waitUntilHolding(); + } } public void waitUntilFinished() { - Thread threadPerIncomingConnectionFactory = null; - java.util.LinkedList connections; - - synchronized(this) - { - // - // First we wait until the factory is destroyed. If we are using - // an acceptor, we also wait for it to be closed. - // - while(_state != StateClosed || _acceptor != null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory; - _threadPerIncomingConnectionFactory = null; + Thread threadPerIncomingConnectionFactory = null; + java.util.LinkedList connections; + + synchronized(this) + { + // + // First we wait until the factory is destroyed. If we are using + // an acceptor, we also wait for it to be closed. + // + while(_state != StateClosed || _acceptor != null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory; + _threadPerIncomingConnectionFactory = null; // // Clear the OA. See bug 1673 for the details of why this is necessary. // _adapter = null; - // - // We want to wait until all connections are finished outside the - // thread synchronization. - // - // For consistency with C#, we set _connections to null rather than to a - // new empty list so that our finalizer does not try to invoke any - // methods on member objects. - // - connections = _connections; - _connections = null; - } - - if(threadPerIncomingConnectionFactory != null) - { - while(true) - { - try - { - threadPerIncomingConnectionFactory.join(); - break; - } - catch(InterruptedException ex) - { - } - } - } - - if(connections != null) - { - java.util.ListIterator p = connections.listIterator(); - while(p.hasNext()) - { - Ice.ConnectionI connection = (Ice.ConnectionI)p.next(); - connection.waitUntilFinished(); - } - } + // + // We want to wait until all connections are finished outside the + // thread synchronization. + // + // For consistency with C#, we set _connections to null rather than to a + // new empty list so that our finalizer does not try to invoke any + // methods on member objects. + // + connections = _connections; + _connections = null; + } + + if(threadPerIncomingConnectionFactory != null) + { + while(true) + { + try + { + threadPerIncomingConnectionFactory.join(); + break; + } + catch(InterruptedException ex) + { + } + } + } + + if(connections != null) + { + java.util.ListIterator p = connections.listIterator(); + while(p.hasNext()) + { + Ice.ConnectionI connection = (Ice.ConnectionI)p.next(); + connection.waitUntilFinished(); + } + } } public EndpointI @@ -160,11 +160,11 @@ public final class IncomingConnectionFactory extends EventHandler public synchronized Ice.ConnectionI[] connections() { - java.util.LinkedList connections = new java.util.LinkedList(); + java.util.LinkedList connections = new java.util.LinkedList(); - // - // Only copy connections which have not been destroyed. - // + // + // Only copy connections which have not been destroyed. + // java.util.ListIterator p = _connections.listIterator(); while(p.hasNext()) { @@ -184,17 +184,17 @@ public final class IncomingConnectionFactory extends EventHandler flushBatchRequests() { Ice.ConnectionI[] c = connections(); // connections() is synchronized, so no need to synchronize here. - for(int i = 0; i < c.length; i++) - { - try - { - c[i].flushBatchRequests(); - } - catch(Ice.LocalException ex) - { - // Ignore. - } - } + for(int i = 0; i < c.length; i++) + { + try + { + c[i].flushBatchRequests(); + } + catch(Ice.LocalException ex) + { + // Ignore. + } + } } // @@ -204,141 +204,141 @@ public final class IncomingConnectionFactory extends EventHandler public boolean datagram() { - assert(!_threadPerConnection); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return _endpoint.datagram(); } public boolean readable() { - assert(!_threadPerConnection); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return false; } public boolean read(BasicStream unused) { - assert(!_threadPerConnection); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. assert(false); // Must not be called. - return false; + return false; } public void message(BasicStream unused, ThreadPool threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - - Ice.ConnectionI connection = null; - - synchronized(this) - { - try - { - if(_state != StateActive) - { - Thread.yield(); - return; - } - - // - // Reap connections for which destruction has completed. - // - java.util.ListIterator p = _connections.listIterator(); - while(p.hasNext()) - { - Ice.ConnectionI con = (Ice.ConnectionI)p.next(); - if(con.isFinished()) - { - p.remove(); - } - } - - // - // Now accept a new connection. - // - Transceiver transceiver; - try - { - transceiver = _acceptor.accept(0); - } - catch(Ice.TimeoutException ex) - { - // Ignore timeouts. - return; - } - catch(Ice.LocalException ex) - { - // Warn about other Ice local exceptions. - if(_warn) - { - warning(ex); - } - return; - } - - assert(transceiver != null); - - try - { - connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection); - } - catch(Ice.LocalException ex) - { - return; - } - - _connections.add(connection); - } - finally - { - // - // This makes sure that we promote a follower before - // we leave the scope of the mutex above, but after we - // call accept() (if we call it). - // - threadPool.promoteFollower(); - } - } - - assert(connection != null); - - // - // We validate and activate outside the thread - // synchronization, to not block the factory. - // - try - { - connection.validate(); - } + assert(!_threadPerConnection); // Only for use with a thread pool. + + Ice.ConnectionI connection = null; + + synchronized(this) + { + try + { + if(_state != StateActive) + { + Thread.yield(); + return; + } + + // + // Reap connections for which destruction has completed. + // + java.util.ListIterator p = _connections.listIterator(); + while(p.hasNext()) + { + Ice.ConnectionI con = (Ice.ConnectionI)p.next(); + if(con.isFinished()) + { + p.remove(); + } + } + + // + // Now accept a new connection. + // + Transceiver transceiver; + try + { + transceiver = _acceptor.accept(0); + } + catch(Ice.TimeoutException ex) + { + // Ignore timeouts. + return; + } + catch(Ice.LocalException ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + warning(ex); + } + return; + } + + assert(transceiver != null); + + try + { + connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection); + } + catch(Ice.LocalException ex) + { + return; + } + + _connections.add(connection); + } + finally + { + // + // This makes sure that we promote a follower before + // we leave the scope of the mutex above, but after we + // call accept() (if we call it). + // + threadPool.promoteFollower(); + } + } + + assert(connection != null); + + // + // We validate and activate outside the thread + // synchronization, to not block the factory. + // + try + { + connection.validate(); + } catch(Ice.LocalException ex) - { - synchronized(this) - { - connection.waitUntilFinished(); // We must call waitUntilFinished() for cleanup. - _connections.remove(connection); - return; - } - } - - connection.activate(); + { + synchronized(this) + { + connection.waitUntilFinished(); // We must call waitUntilFinished() for cleanup. + _connections.remove(connection); + return; + } + } + + connection.activate(); } public synchronized void finished(ThreadPool threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. threadPool.promoteFollower(); assert(threadPool == ((Ice.ObjectAdapterI)_adapter).getThreadPool()); - --_finishedCount; + --_finishedCount; - if(_finishedCount == 0 && _state == StateClosed) - { - _acceptor.close(); - _acceptor = null; - notifyAll(); - } + if(_finishedCount == 0 && _state == StateClosed) + { + _acceptor.close(); + _acceptor = null; + notifyAll(); + } } public void @@ -352,141 +352,141 @@ public final class IncomingConnectionFactory extends EventHandler { if(_transceiver != null) { - return _transceiver.toString(); + return _transceiver.toString(); } - assert(_acceptor != null); - return _acceptor.toString(); + assert(_acceptor != null); + return _acceptor.toString(); } public IncomingConnectionFactory(Instance instance, EndpointI endpoint, Ice.ObjectAdapter adapter, - String adapterName) + String adapterName) { super(instance); _endpoint = endpoint; _adapter = adapter; - _registeredWithPool = false; - _finishedCount = 0; - _warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; + _registeredWithPool = false; + _finishedCount = 0; + _warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; _state = StateHolding; - DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideTimeout) - { - _endpoint = _endpoint.timeout(defaultsAndOverrides.overrideTimeoutValue); - } + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideTimeout) + { + _endpoint = _endpoint.timeout(defaultsAndOverrides.overrideTimeoutValue); + } - if(defaultsAndOverrides.overrideCompress) - { - _endpoint = _endpoint.compress(defaultsAndOverrides.overrideCompressValue); - } + if(defaultsAndOverrides.overrideCompress) + { + _endpoint = _endpoint.compress(defaultsAndOverrides.overrideCompressValue); + } Ice.ObjectAdapterI adapterImpl = (Ice.ObjectAdapterI)_adapter; _threadPerConnection = adapterImpl.getThreadPerConnection(); - EndpointIHolder h = new EndpointIHolder(); - h.value = _endpoint; - _transceiver = _endpoint.serverTransceiver(h); - - try - { - if(_transceiver != null) - { - _endpoint = h.value; - - Ice.ConnectionI connection = null; - - try - { - connection = new Ice.ConnectionI(_instance, _transceiver, _endpoint, _adapter, + EndpointIHolder h = new EndpointIHolder(); + h.value = _endpoint; + _transceiver = _endpoint.serverTransceiver(h); + + try + { + if(_transceiver != null) + { + _endpoint = h.value; + + Ice.ConnectionI connection = null; + + try + { + connection = new Ice.ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection); - connection.validate(); - } - catch(Ice.LocalException ex) - { - // - // If a connection object was constructed, then - // validate() must have raised the exception. - // - if(connection != null) - { - connection.waitUntilFinished(); // We must call waitUntilFinished() for cleanup. - } - - return; - } - - _connections.add(connection); - } - else - { - h.value = _endpoint; - _acceptor = _endpoint.acceptor(h, adapterName); - _endpoint = h.value; - assert(_acceptor != null); - _acceptor.listen(); - - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we also use - // one thread per incoming connection factory, that - // accepts new connections on this endpoint. - // - try - { - _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(); - _threadPerIncomingConnectionFactory.start(); - } - catch(java.lang.Exception ex) - { - error("cannot create thread for incoming connection factory", ex); - throw ex; - } - } - } - } - catch(java.lang.Exception ex) - { - // - // Clean up for finalizer. - // - - if(_acceptor != null) - { - try - { - _acceptor.close(); - } - catch(Ice.LocalException e) - { - // Here we ignore any exceptions in close(). - } - } - - synchronized(this) - { - _state = StateClosed; - _acceptor = null; - _connections = null; - _threadPerIncomingConnectionFactory = null; - } - - Ice.SyscallException e = new Ice.SyscallException(); - e.initCause(ex); - throw e; - } + connection.validate(); + } + catch(Ice.LocalException ex) + { + // + // If a connection object was constructed, then + // validate() must have raised the exception. + // + if(connection != null) + { + connection.waitUntilFinished(); // We must call waitUntilFinished() for cleanup. + } + + return; + } + + _connections.add(connection); + } + else + { + h.value = _endpoint; + _acceptor = _endpoint.acceptor(h, adapterName); + _endpoint = h.value; + assert(_acceptor != null); + _acceptor.listen(); + + if(_threadPerConnection) + { + // + // If we are in thread per connection mode, we also use + // one thread per incoming connection factory, that + // accepts new connections on this endpoint. + // + try + { + _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(); + _threadPerIncomingConnectionFactory.start(); + } + catch(java.lang.Exception ex) + { + error("cannot create thread for incoming connection factory", ex); + throw ex; + } + } + } + } + catch(java.lang.Exception ex) + { + // + // Clean up for finalizer. + // + + if(_acceptor != null) + { + try + { + _acceptor.close(); + } + catch(Ice.LocalException e) + { + // Here we ignore any exceptions in close(). + } + } + + synchronized(this) + { + _state = StateClosed; + _acceptor = null; + _connections = null; + _threadPerIncomingConnectionFactory = null; + } + + Ice.SyscallException e = new Ice.SyscallException(); + e.initCause(ex); + throw e; + } } protected synchronized void finalize() throws Throwable { - IceUtil.Assert.FinalizerAssert(_state == StateClosed); - IceUtil.Assert.FinalizerAssert(_acceptor == null); - IceUtil.Assert.FinalizerAssert(_connections == null); - IceUtil.Assert.FinalizerAssert(_threadPerIncomingConnectionFactory == null); + IceUtil.Assert.FinalizerAssert(_state == StateClosed); + IceUtil.Assert.FinalizerAssert(_acceptor == null); + IceUtil.Assert.FinalizerAssert(_connections == null); + IceUtil.Assert.FinalizerAssert(_threadPerIncomingConnectionFactory == null); super.finalize(); } @@ -511,10 +511,10 @@ public final class IncomingConnectionFactory extends EventHandler { return; } - if(!_threadPerConnection && _acceptor != null) - { - registerWithPool(); - } + if(!_threadPerConnection && _acceptor != null) + { + registerWithPool(); + } java.util.ListIterator p = _connections.listIterator(); while(p.hasNext()) @@ -531,10 +531,10 @@ public final class IncomingConnectionFactory extends EventHandler { return; } - if(!_threadPerConnection && _acceptor != null) - { - unregisterWithPool(); - } + if(!_threadPerConnection && _acceptor != null) + { + unregisterWithPool(); + } java.util.ListIterator p = _connections.listIterator(); while(p.hasNext()) @@ -547,28 +547,28 @@ public final class IncomingConnectionFactory extends EventHandler case StateClosed: { - if(_acceptor != null) - { - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we connect - // to our own acceptor, which unblocks our thread per - // incoming connection factory stuck in accept(). - // - _acceptor.connectToSelf(); - } - else - { - // - // Otherwise we first must make sure that we are - // registered, then we unregister, and let finished() - // do the close. - // - registerWithPool(); - unregisterWithPool(); - } - } + if(_acceptor != null) + { + if(_threadPerConnection) + { + // + // If we are in thread per connection mode, we connect + // to our own acceptor, which unblocks our thread per + // incoming connection factory stuck in accept(). + // + _acceptor.connectToSelf(); + } + else + { + // + // Otherwise we first must make sure that we are + // registered, then we unregister, and let finished() + // do the close. + // + registerWithPool(); + unregisterWithPool(); + } + } java.util.ListIterator p = _connections.listIterator(); while(p.hasNext()) @@ -576,38 +576,38 @@ public final class IncomingConnectionFactory extends EventHandler Ice.ConnectionI connection = (Ice.ConnectionI)p.next(); connection.destroy(Ice.ConnectionI.ObjectAdapterDeactivated); } - break; + break; } } _state = state; - notifyAll(); + notifyAll(); } private void registerWithPool() { - assert(!_threadPerConnection); // Only for use with a thread pool. - assert(_acceptor != null); + assert(!_threadPerConnection); // Only for use with a thread pool. + assert(_acceptor != null); if(!_registeredWithPool) - { - ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(_acceptor.fd(), this); - _registeredWithPool = true; + { + ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(_acceptor.fd(), this); + _registeredWithPool = true; } } private void unregisterWithPool() { - assert(!_threadPerConnection); // Only for use with a thread pool. - assert(_acceptor != null); + assert(!_threadPerConnection); // Only for use with a thread pool. + assert(_acceptor != null); if(_registeredWithPool) - { - ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(_acceptor.fd()); - _registeredWithPool = false; - ++_finishedCount; // For each unregistration, finished() is called once. + { + ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(_acceptor.fd()); + _registeredWithPool = false; + ++_finishedCount; // For each unregistration, finished() is called once. } } @@ -625,155 +625,155 @@ public final class IncomingConnectionFactory extends EventHandler private void error(String msg, Exception ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = msg + ":\n" + toString() + "\n" + sw.toString(); - _instance.initializationData().logger.error(s); + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = msg + ":\n" + toString() + "\n" + sw.toString(); + _instance.initializationData().logger.error(s); } private void run() { - assert(_acceptor != null); - - while(true) - { - // - // We must accept new connections outside the thread - // synchronization, because we use blocking accept. - // - Transceiver transceiver = null; - try - { - transceiver = _acceptor.accept(-1); - } - catch(Ice.SocketException ex) - { - // Do not ignore SocketException in Java. - throw ex; - } - catch(Ice.TimeoutException ex) - { - // Ignore timeouts. - } - catch(Ice.LocalException ex) - { - // Warn about other Ice local exceptions. - if(_warn) - { - warning(ex); - } - } - - Ice.ConnectionI connection = null; - - synchronized(this) - { - while(_state == StateHolding) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_state == StateClosed) - { - if(transceiver != null) - { - try - { - transceiver.close(); - } - catch(Ice.LocalException ex) - { - // Here we ignore any exceptions in close(). - } - } - - try - { - _acceptor.close(); - } - catch(Ice.LocalException ex) - { - _acceptor = null; - notifyAll(); - throw ex; - } - - _acceptor = null; - notifyAll(); - return; - } - - assert(_state == StateActive); - - // - // Reap connections for which destruction has completed. - // - java.util.ListIterator p = _connections.listIterator(); - while(p.hasNext()) - { - Ice.ConnectionI con = (Ice.ConnectionI)p.next(); - if(con.isFinished()) - { - p.remove(); - } - } - - // - // Create a connection object for the connection. - // - if(transceiver != null) - { - try - { - connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter, + assert(_acceptor != null); + + while(true) + { + // + // We must accept new connections outside the thread + // synchronization, because we use blocking accept. + // + Transceiver transceiver = null; + try + { + transceiver = _acceptor.accept(-1); + } + catch(Ice.SocketException ex) + { + // Do not ignore SocketException in Java. + throw ex; + } + catch(Ice.TimeoutException ex) + { + // Ignore timeouts. + } + catch(Ice.LocalException ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + warning(ex); + } + } + + Ice.ConnectionI connection = null; + + synchronized(this) + { + while(_state == StateHolding) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + if(_state == StateClosed) + { + if(transceiver != null) + { + try + { + transceiver.close(); + } + catch(Ice.LocalException ex) + { + // Here we ignore any exceptions in close(). + } + } + + try + { + _acceptor.close(); + } + catch(Ice.LocalException ex) + { + _acceptor = null; + notifyAll(); + throw ex; + } + + _acceptor = null; + notifyAll(); + return; + } + + assert(_state == StateActive); + + // + // Reap connections for which destruction has completed. + // + java.util.ListIterator p = _connections.listIterator(); + while(p.hasNext()) + { + Ice.ConnectionI con = (Ice.ConnectionI)p.next(); + if(con.isFinished()) + { + p.remove(); + } + } + + // + // Create a connection object for the connection. + // + if(transceiver != null) + { + try + { + connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection); - } - catch(Ice.LocalException ex) - { - return; - } - - _connections.add(connection); - } - } - - // - // In thread per connection mode, the connection's thread - // will take care of connection validation and activation - // (for non-datagram connections). We don't want to block - // this thread waiting until validation is complete, - // because in contrast to thread pool mode, it is the only - // thread that can accept connections with this factory's - // acceptor. Therefore we don't call validate() and - // activate() from the connection factory in thread per - // connection mode. - // - } + } + catch(Ice.LocalException ex) + { + return; + } + + _connections.add(connection); + } + } + + // + // In thread per connection mode, the connection's thread + // will take care of connection validation and activation + // (for non-datagram connections). We don't want to block + // this thread waiting until validation is complete, + // because in contrast to thread pool mode, it is the only + // thread that can accept connections with this factory's + // acceptor. Therefore we don't call validate() and + // activate() from the connection factory in thread per + // connection mode. + // + } } private class ThreadPerIncomingConnectionFactory extends Thread { - public void - run() - { - try - { - IncomingConnectionFactory.this.run(); - } - catch(Exception ex) - { - IncomingConnectionFactory.this.error("exception in thread per incoming connection factory", ex); - } - } + public void + run() + { + try + { + IncomingConnectionFactory.this.run(); + } + catch(Exception ex) + { + IncomingConnectionFactory.this.error("exception in thread per incoming connection factory", ex); + } + } } private Thread _threadPerIncomingConnectionFactory; |