summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/Selector.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/Selector.java')
-rw-r--r--java/src/IceInternal/Selector.java316
1 files changed, 127 insertions, 189 deletions
diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java
index 2ab11238b66..b23fbacb1b1 100644
--- a/java/src/IceInternal/Selector.java
+++ b/java/src/IceInternal/Selector.java
@@ -15,10 +15,11 @@ public final class Selector
{
}
- public
- Selector(Instance instance)
+ public Selector(Instance instance)
{
_instance = instance;
+ _selecting = false;
+ _interrupted = false;
try
{
@@ -36,8 +37,7 @@ public final class Selector
_keys = _selector.selectedKeys();
}
- public void
- destroy()
+ public void destroy()
{
try
{
@@ -49,14 +49,12 @@ public final class Selector
_selector = null;
}
- public void
- initialize(EventHandler handler)
+ public void initialize(EventHandler handler)
{
updateImpl(handler);
}
- public void
- update(EventHandler handler, int remove, int add)
+ public void update(EventHandler handler, int remove, int add)
{
int previous = handler._registered;
handler._registered = handler._registered & ~remove;
@@ -65,23 +63,11 @@ public final class Selector
{
return;
}
- updateImpl(handler);
- if(handler.hasMoreData() && (handler._disabled & SocketOperation.Read) == 0)
- {
- if((add & SocketOperation.Read) != 0)
- {
- _pendingHandlers.add(handler);
- }
- if((remove & SocketOperation.Read) != 0)
- {
- _pendingHandlers.remove(handler);
- }
- }
+ updateImpl(handler);
}
- public void
- enable(EventHandler handler, int status)
+ public void enable(EventHandler handler, int status)
{
if((handler._disabled & status) == 0)
{
@@ -92,180 +78,85 @@ public final class Selector
if((handler._registered & status) != 0)
{
updateImpl(handler);
-
- if((status & SocketOperation.Read) != 0 && handler.hasMoreData())
- {
- // Add back the pending handler if reads are enabled.
- _pendingHandlers.add(handler);
- }
}
}
- public void
- disable(EventHandler handler, int status)
+ public void disable(EventHandler handler, int status)
{
if((handler._disabled & status) != 0)
{
return;
}
handler._disabled = handler._disabled | status;
-
+
if((handler._registered & status) != 0)
{
updateImpl(handler);
-
- if((status & SocketOperation.Read) != 0 && handler.hasMoreData())
- {
- // Remove the pending handler if reads are disabled.
- _pendingHandlers.remove(handler);
- }
}
}
- public void
- finish(EventHandler handler)
+ public void finish(EventHandler handler)
{
- handler._registered = 0;
-
- if(handler._key != null)
+ if(handler._registered != 0)
{
- handler._key.cancel();
- handler._key = null;
- }
-
- _changes.remove(handler);
- _pendingHandlers.remove(handler);
- }
+ if(handler._key != null)
+ {
+ handler._key.cancel();
+ handler._key = null;
+ }
- public void
- startSelect()
- {
- assert(_changes.isEmpty());
+ _changes.remove(handler);
- //
- // Don't set _selecting = true if there are pending handlers, select() won't block
- // and will just call selectNow().
- //
- if(_pendingHandlers.isEmpty())
- {
- _selecting = true;
+ update(handler, handler._registered, SocketOperation.None);
}
}
- public void
- finishSelect(java.util.List<EventHandler> handlers, long timeout)
+ public void startSelect()
{
- _selecting = false;
- handlers.clear();
-
- if(!_changes.isEmpty())
- {
- for(EventHandler h : _changes)
- {
- updateImpl(h);
- }
- _changes.clear();
- }
- else if(_keys.isEmpty() && _pendingHandlers.isEmpty() && timeout <= 0)
+ if(_interrupted)
{
- //
- // This is necessary to prevent a busy loop in case of a spurious wake-up which
- // sometime occurs in the client thread pool when the communicator is destroyed.
- // If there are too many successive spurious wake-ups, we log an error.
- //
- try
- {
- Thread.sleep(1);
- }
- catch(java.lang.InterruptedException ex)
- {
- }
-
- if(++_spuriousWakeUp > 100)
- {
- _spuriousWakeUp = 0;
- _instance.initializationData().logger.warning("spurious selector wake up");
- }
- return;
- }
-
- _spuriousWakeUp = 0;
+ _interrupted = false;
- for(java.nio.channels.SelectionKey key : _keys)
- {
- EventHandler handler = (EventHandler)key.attachment();
- try
- {
- //
- // It's important to check for interestOps here because the event handler
- // registration might have changed above when _changes was processed. We
- // don't want to return event handlers which aren't interested anymore in
- // a given operation.
- //
- handler._ready = fromJavaOps(key.readyOps() & key.interestOps());
- if(handler.hasMoreData() && _pendingHandlers.remove(handler))
- {
- handler._ready |= SocketOperation.Read;
- }
- handlers.add(handler);
- }
- catch(java.nio.channels.CancelledKeyException ex)
+ if(!_changes.isEmpty())
{
- assert(handler._registered == 0);
+ updateSelector();
}
}
- _keys.clear();
+ _selecting = true;
+ }
- for(EventHandler handler : _pendingHandlers)
- {
- if(handler.hasMoreData())
- {
- handler._ready = SocketOperation.Read;
- handlers.add(handler);
- }
- }
- _pendingHandlers.clear();
+ public void finishSelect()
+ {
+ _selecting = false;
}
- public void
- select(long timeout)
+ public void select(java.util.List<EventHandlerOpPair> handlers, long timeout)
throws TimeoutException
{
while(true)
{
try
{
- //
- // Only block if _selecting = true, otherwise we call selectNow() to retrieve new
- // ready handlers and process handlers from _pendingHandlers.
- //
- if(_selecting)
+ if(timeout > 0)
{
- if(timeout > 0)
+ //
+ // NOTE: On some platforms, select() sometime returns slightly before
+ // the timeout (at least according to our monotonic time). To make sure
+ // timeouts are correctly detected, we wait for a little longer than
+ // the configured timeout (10ms).
+ //
+ long before = IceInternal.Time.currentMonotonicTimeMillis();
+ if(_selector.select(timeout * 1000 + 10) == 0)
{
- //
- // NOTE: On some platforms, select() sometime returns slightly before
- // the timeout (at least according to our monotonic time). To make sure
- // timeouts are correctly detected, we wait for a little longer than
- // the configured timeout (10ms).
- //
- long before = IceInternal.Time.currentMonotonicTimeMillis();
- if(_selector.select(timeout * 1000 + 10) == 0)
+ if(IceInternal.Time.currentMonotonicTimeMillis() - before >= timeout * 1000)
{
- if(IceInternal.Time.currentMonotonicTimeMillis() - before >= timeout * 1000)
- {
- throw new TimeoutException();
- }
+ throw new TimeoutException();
}
}
- else
- {
- _selector.select();
- }
}
else
{
- _selector.selectNow();
+ _selector.select();
}
}
catch(java.nio.channels.CancelledKeyException ex)
@@ -299,63 +190,111 @@ public final class Selector
break;
}
- }
- public void
- hasMoreData(EventHandler handler)
- {
- assert(!_selecting && handler.hasMoreData());
+ handlers.clear();
- //
- // Only add the handler if read is still registered and enabled.
- //
- if((handler._registered & ~handler._disabled & SocketOperation.Read) != 0)
+ if(_interrupted) // Interrupted, we have to process the interrupt before returning any handlers
+ {
+ return;
+ }
+
+ if(_keys.isEmpty() && timeout <= 0)
+ {
+ //
+ // This is necessary to prevent a busy loop in case of a spurious wake-up which
+ // sometime occurs in the client thread pool when the communicator is destroyed.
+ // If there are too many successive spurious wake-ups, we log an error.
+ //
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+
+ if(++_spuriousWakeUp > 100)
+ {
+ _spuriousWakeUp = 0;
+ _instance.initializationData().logger.warning("spurious selector wake up");
+ }
+ return;
+ }
+
+ _spuriousWakeUp = 0;
+
+ for(java.nio.channels.SelectionKey key : _keys)
{
- _pendingHandlers.add(handler);
+ EventHandler handler = (EventHandler)key.attachment();
+ try
+ {
+ //
+ // Use the intersection of readyOps and interestOps because we only want to
+ // report the operations in which the handler is still interested.
+ //
+ final int op = fromJavaOps(key.readyOps() & key.interestOps());
+ handlers.add(new EventHandlerOpPair(handler, op));
+ }
+ catch(java.nio.channels.CancelledKeyException ex)
+ {
+ assert(handler._registered == 0);
+ }
}
+ _keys.clear();
}
- private void
- updateImpl(EventHandler handler)
+ private void updateImpl(EventHandler handler)
{
+ _changes.add(handler);
if(_selecting)
{
- //
- // Queue the change since we can't change the selection key interest ops while a select
- // operation is in progress (it could block depending on the underlying implementaiton
- // of the Java selector).
- //
- if(_changes.isEmpty())
+ if(!_interrupted)
{
+ //
+ // We can't change the selection key interest ops while a select operation is in progress
+ // (it could block depending on the underlying implementation of the Java selector).
+ //
+ // Wake up the selector if necessary.
+ //
_selector.wakeup();
+ _interrupted = true;
}
- _changes.add(handler);
- return;
}
+ else
+ {
+ updateSelector();
+ }
+ }
- int ops = toJavaOps(handler, handler._registered & ~handler._disabled);
- if(handler._key == null)
+ private void updateSelector()
+ {
+ for(EventHandler handler : _changes)
{
- if(handler._registered != 0)
+ int status = handler._registered & ~handler._disabled;
+ int ops = toJavaOps(handler, status);
+ if(handler._key == null)
{
- try
+ if(handler._registered != 0)
{
- handler._key = handler.fd().register(_selector, ops, handler);
- }
- catch(java.nio.channels.ClosedChannelException ex)
- {
- assert(false);
+ try
+ {
+ handler._key = handler.fd().register(_selector, ops, handler);
+ }
+ catch(java.nio.channels.ClosedChannelException ex)
+ {
+ assert(false);
+ }
}
}
+ else
+ {
+ handler._key.interestOps(ops);
+ }
}
- else
- {
- handler._key.interestOps(ops);
- }
+ _changes.clear();
}
- int
- toJavaOps(EventHandler handler, int o)
+ int toJavaOps(EventHandler handler, int o)
{
int op = 0;
if((o & SocketOperation.Read) != 0)
@@ -380,8 +319,7 @@ public final class Selector
return op;
}
- int
- fromJavaOps(int o)
+ int fromJavaOps(int o)
{
int op = 0;
if((o & (java.nio.channels.SelectionKey.OP_READ | java.nio.channels.SelectionKey.OP_ACCEPT)) != 0)
@@ -399,7 +337,6 @@ public final class Selector
return op;
}
-
final private Instance _instance;
private java.nio.channels.Selector _selector;
@@ -408,5 +345,6 @@ public final class Selector
private java.util.HashSet<EventHandler> _changes = new java.util.HashSet<EventHandler>();
private java.util.HashSet<EventHandler> _pendingHandlers = new java.util.HashSet<EventHandler>();
private boolean _selecting;
+ private boolean _interrupted;
private int _spuriousWakeUp;
}