diff options
Diffstat (limited to 'java/src/IceInternal/Selector.java')
-rw-r--r-- | java/src/IceInternal/Selector.java | 316 |
1 files changed, 127 insertions, 189 deletions
diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java index 2ab11238b66..b23fbacb1b1 100644 --- a/java/src/IceInternal/Selector.java +++ b/java/src/IceInternal/Selector.java @@ -15,10 +15,11 @@ public final class Selector { } - public - Selector(Instance instance) + public Selector(Instance instance) { _instance = instance; + _selecting = false; + _interrupted = false; try { @@ -36,8 +37,7 @@ public final class Selector _keys = _selector.selectedKeys(); } - public void - destroy() + public void destroy() { try { @@ -49,14 +49,12 @@ public final class Selector _selector = null; } - public void - initialize(EventHandler handler) + public void initialize(EventHandler handler) { updateImpl(handler); } - public void - update(EventHandler handler, int remove, int add) + public void update(EventHandler handler, int remove, int add) { int previous = handler._registered; handler._registered = handler._registered & ~remove; @@ -65,23 +63,11 @@ public final class Selector { return; } - updateImpl(handler); - if(handler.hasMoreData() && (handler._disabled & SocketOperation.Read) == 0) - { - if((add & SocketOperation.Read) != 0) - { - _pendingHandlers.add(handler); - } - if((remove & SocketOperation.Read) != 0) - { - _pendingHandlers.remove(handler); - } - } + updateImpl(handler); } - public void - enable(EventHandler handler, int status) + public void enable(EventHandler handler, int status) { if((handler._disabled & status) == 0) { @@ -92,180 +78,85 @@ public final class Selector if((handler._registered & status) != 0) { updateImpl(handler); - - if((status & SocketOperation.Read) != 0 && handler.hasMoreData()) - { - // Add back the pending handler if reads are enabled. - _pendingHandlers.add(handler); - } } } - public void - disable(EventHandler handler, int status) + public void disable(EventHandler handler, int status) { if((handler._disabled & status) != 0) { return; } handler._disabled = handler._disabled | status; - + if((handler._registered & status) != 0) { updateImpl(handler); - - if((status & SocketOperation.Read) != 0 && handler.hasMoreData()) - { - // Remove the pending handler if reads are disabled. - _pendingHandlers.remove(handler); - } } } - public void - finish(EventHandler handler) + public void finish(EventHandler handler) { - handler._registered = 0; - - if(handler._key != null) + if(handler._registered != 0) { - handler._key.cancel(); - handler._key = null; - } - - _changes.remove(handler); - _pendingHandlers.remove(handler); - } + if(handler._key != null) + { + handler._key.cancel(); + handler._key = null; + } - public void - startSelect() - { - assert(_changes.isEmpty()); + _changes.remove(handler); - // - // Don't set _selecting = true if there are pending handlers, select() won't block - // and will just call selectNow(). - // - if(_pendingHandlers.isEmpty()) - { - _selecting = true; + update(handler, handler._registered, SocketOperation.None); } } - public void - finishSelect(java.util.List<EventHandler> handlers, long timeout) + public void startSelect() { - _selecting = false; - handlers.clear(); - - if(!_changes.isEmpty()) - { - for(EventHandler h : _changes) - { - updateImpl(h); - } - _changes.clear(); - } - else if(_keys.isEmpty() && _pendingHandlers.isEmpty() && timeout <= 0) + if(_interrupted) { - // - // This is necessary to prevent a busy loop in case of a spurious wake-up which - // sometime occurs in the client thread pool when the communicator is destroyed. - // If there are too many successive spurious wake-ups, we log an error. - // - try - { - Thread.sleep(1); - } - catch(java.lang.InterruptedException ex) - { - } - - if(++_spuriousWakeUp > 100) - { - _spuriousWakeUp = 0; - _instance.initializationData().logger.warning("spurious selector wake up"); - } - return; - } - - _spuriousWakeUp = 0; + _interrupted = false; - for(java.nio.channels.SelectionKey key : _keys) - { - EventHandler handler = (EventHandler)key.attachment(); - try - { - // - // It's important to check for interestOps here because the event handler - // registration might have changed above when _changes was processed. We - // don't want to return event handlers which aren't interested anymore in - // a given operation. - // - handler._ready = fromJavaOps(key.readyOps() & key.interestOps()); - if(handler.hasMoreData() && _pendingHandlers.remove(handler)) - { - handler._ready |= SocketOperation.Read; - } - handlers.add(handler); - } - catch(java.nio.channels.CancelledKeyException ex) + if(!_changes.isEmpty()) { - assert(handler._registered == 0); + updateSelector(); } } - _keys.clear(); + _selecting = true; + } - for(EventHandler handler : _pendingHandlers) - { - if(handler.hasMoreData()) - { - handler._ready = SocketOperation.Read; - handlers.add(handler); - } - } - _pendingHandlers.clear(); + public void finishSelect() + { + _selecting = false; } - public void - select(long timeout) + public void select(java.util.List<EventHandlerOpPair> handlers, long timeout) throws TimeoutException { while(true) { try { - // - // Only block if _selecting = true, otherwise we call selectNow() to retrieve new - // ready handlers and process handlers from _pendingHandlers. - // - if(_selecting) + if(timeout > 0) { - if(timeout > 0) + // + // NOTE: On some platforms, select() sometime returns slightly before + // the timeout (at least according to our monotonic time). To make sure + // timeouts are correctly detected, we wait for a little longer than + // the configured timeout (10ms). + // + long before = IceInternal.Time.currentMonotonicTimeMillis(); + if(_selector.select(timeout * 1000 + 10) == 0) { - // - // NOTE: On some platforms, select() sometime returns slightly before - // the timeout (at least according to our monotonic time). To make sure - // timeouts are correctly detected, we wait for a little longer than - // the configured timeout (10ms). - // - long before = IceInternal.Time.currentMonotonicTimeMillis(); - if(_selector.select(timeout * 1000 + 10) == 0) + if(IceInternal.Time.currentMonotonicTimeMillis() - before >= timeout * 1000) { - if(IceInternal.Time.currentMonotonicTimeMillis() - before >= timeout * 1000) - { - throw new TimeoutException(); - } + throw new TimeoutException(); } } - else - { - _selector.select(); - } } else { - _selector.selectNow(); + _selector.select(); } } catch(java.nio.channels.CancelledKeyException ex) @@ -299,63 +190,111 @@ public final class Selector break; } - } - public void - hasMoreData(EventHandler handler) - { - assert(!_selecting && handler.hasMoreData()); + handlers.clear(); - // - // Only add the handler if read is still registered and enabled. - // - if((handler._registered & ~handler._disabled & SocketOperation.Read) != 0) + if(_interrupted) // Interrupted, we have to process the interrupt before returning any handlers + { + return; + } + + if(_keys.isEmpty() && timeout <= 0) + { + // + // This is necessary to prevent a busy loop in case of a spurious wake-up which + // sometime occurs in the client thread pool when the communicator is destroyed. + // If there are too many successive spurious wake-ups, we log an error. + // + try + { + Thread.sleep(1); + } + catch(java.lang.InterruptedException ex) + { + } + + if(++_spuriousWakeUp > 100) + { + _spuriousWakeUp = 0; + _instance.initializationData().logger.warning("spurious selector wake up"); + } + return; + } + + _spuriousWakeUp = 0; + + for(java.nio.channels.SelectionKey key : _keys) { - _pendingHandlers.add(handler); + EventHandler handler = (EventHandler)key.attachment(); + try + { + // + // Use the intersection of readyOps and interestOps because we only want to + // report the operations in which the handler is still interested. + // + final int op = fromJavaOps(key.readyOps() & key.interestOps()); + handlers.add(new EventHandlerOpPair(handler, op)); + } + catch(java.nio.channels.CancelledKeyException ex) + { + assert(handler._registered == 0); + } } + _keys.clear(); } - private void - updateImpl(EventHandler handler) + private void updateImpl(EventHandler handler) { + _changes.add(handler); if(_selecting) { - // - // Queue the change since we can't change the selection key interest ops while a select - // operation is in progress (it could block depending on the underlying implementaiton - // of the Java selector). - // - if(_changes.isEmpty()) + if(!_interrupted) { + // + // We can't change the selection key interest ops while a select operation is in progress + // (it could block depending on the underlying implementation of the Java selector). + // + // Wake up the selector if necessary. + // _selector.wakeup(); + _interrupted = true; } - _changes.add(handler); - return; } + else + { + updateSelector(); + } + } - int ops = toJavaOps(handler, handler._registered & ~handler._disabled); - if(handler._key == null) + private void updateSelector() + { + for(EventHandler handler : _changes) { - if(handler._registered != 0) + int status = handler._registered & ~handler._disabled; + int ops = toJavaOps(handler, status); + if(handler._key == null) { - try + if(handler._registered != 0) { - handler._key = handler.fd().register(_selector, ops, handler); - } - catch(java.nio.channels.ClosedChannelException ex) - { - assert(false); + try + { + handler._key = handler.fd().register(_selector, ops, handler); + } + catch(java.nio.channels.ClosedChannelException ex) + { + assert(false); + } } } + else + { + handler._key.interestOps(ops); + } } - else - { - handler._key.interestOps(ops); - } + _changes.clear(); } - int - toJavaOps(EventHandler handler, int o) + int toJavaOps(EventHandler handler, int o) { int op = 0; if((o & SocketOperation.Read) != 0) @@ -380,8 +319,7 @@ public final class Selector return op; } - int - fromJavaOps(int o) + int fromJavaOps(int o) { int op = 0; if((o & (java.nio.channels.SelectionKey.OP_READ | java.nio.channels.SelectionKey.OP_ACCEPT)) != 0) @@ -399,7 +337,6 @@ public final class Selector return op; } - final private Instance _instance; private java.nio.channels.Selector _selector; @@ -408,5 +345,6 @@ public final class Selector private java.util.HashSet<EventHandler> _changes = new java.util.HashSet<EventHandler>(); private java.util.HashSet<EventHandler> _pendingHandlers = new java.util.HashSet<EventHandler>(); private boolean _selecting; + private boolean _interrupted; private int _spuriousWakeUp; } |