summaryrefslogtreecommitdiff
path: root/py/demo/Ice/async/Server.py
diff options
context:
space:
mode:
Diffstat (limited to 'py/demo/Ice/async/Server.py')
-rwxr-xr-xpy/demo/Ice/async/Server.py63
1 files changed, 57 insertions, 6 deletions
diff --git a/py/demo/Ice/async/Server.py b/py/demo/Ice/async/Server.py
index d2970424731..5f09f1c7845 100755
--- a/py/demo/Ice/async/Server.py
+++ b/py/demo/Ice/async/Server.py
@@ -8,35 +8,86 @@
#
# **********************************************************************
-import sys, traceback, Ice
+import sys, os, traceback, threading, Ice
-Ice.loadSlice('Queue.ice')
+slice_dir = os.getenv('ICEPY_HOME', '')
+if len(slice_dir) == 0 or not os.path.exists(os.path.join(slice_dir, 'slice')):
+ slice_dir = os.getenv('ICE_HOME', '')
+if len(slice_dir) == 0 or not os.path.exists(os.path.join(slice_dir, 'slice')):
+ slice_dir = os.path.join('/', 'usr', 'share')
+if not os.path.exists(os.path.join(slice_dir, 'slice')):
+ print sys.argv[0] + ': Slice directory not found. Define ICEPY_HOME or ICE_HOME.'
+ sys.exit(1)
+
+Ice.loadSlice('-I' + slice_dir + '/slice Queue.ice')
import Demo
+class Request(object):
+ def __init__(self, id, cb):
+ self.id = id
+ self.cb = cb
+
class QueueI(Demo.Queue):
def __init__(self):
self._messageQueue = []
self._requestQueue = []
+ self._lock = threading.Lock()
- def get_async(self, getCB, current=None):
+ def get_async(self, cb, id, current=None):
+ #
+ # If there is already a message in the message queue, send the
+ # response immediately. Otherwise add the callback to the
+ # request queue.
+ #
+ self._lock.acquire()
if len(self._messageQueue) != 0:
try:
- getCB.ice_response(self._messageQueue[0])
+ cb.ice_response(self._messageQueue[0])
del self._messageQueue[0]
except Ice.Exception, ex:
print ex
else:
- self._requestQueue.append(getCB)
+ request = Request(id, cb)
+ self._requestQueue.append(request)
+ self._lock.release()
def add(self, message, current=None):
+ #
+ # If there is an outstanding request in the request queue,
+ # send a response. Otherwise add the message to the message
+ # queue.
+ #
+ self._lock.acquire()
if len(self._requestQueue) != 0:
try:
- self._requestQueue[0].ice_response(message)
+ self._requestQueue[0].cb.ice_response(message)
except Ice.Exception, ex:
print ex
del self._requestQueue[0]
else:
self._messageQueue.append(message)
+ self._lock.release()
+
+ def cancel_async(self, cb, ids, current=None):
+ #
+ # We send immediate response so that later call to ice_exception
+ # on queued requests will not cause deadlocks.
+ #
+ cb.ice_response()
+
+ self._lock.acquire()
+ for p in ids:
+ for i in range(0, len(self._requestQueue)):
+ if self._requestQueue[i].id == p:
+ try:
+ self._requestQueue[i].cb.ice_exception(Demo.RequestCanceledException())
+ except Ice.Exception, ex:
+ # Ignore
+ pass
+
+ del self._requestQueue[i]
+ break
+ self._lock.release()
class Server(Ice.Application):
def run(self, args):