summaryrefslogtreecommitdiff
path: root/py/demo/Ice/async/Server.py
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2007-01-17 18:11:11 +0000
committerDwayne Boone <dwayne@zeroc.com>2007-01-17 18:11:11 +0000
commite124e693704faa56ecf32ce2a243d0c3fa3a5e6e (patch)
tree071dcaa6edc458f9371289976a4537880e5bb005 /py/demo/Ice/async/Server.py
parentChanged async demo (diff)
downloadice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.tar.bz2
ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.tar.xz
ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.zip
Changed demo
Diffstat (limited to 'py/demo/Ice/async/Server.py')
-rwxr-xr-xpy/demo/Ice/async/Server.py160
1 files changed, 86 insertions, 74 deletions
diff --git a/py/demo/Ice/async/Server.py b/py/demo/Ice/async/Server.py
index c8f8dd29988..386dd77bb59 100755
--- a/py/demo/Ice/async/Server.py
+++ b/py/demo/Ice/async/Server.py
@@ -10,92 +10,104 @@
import sys, os, traceback, threading, Ice
-slice_dir = os.getenv('ICEPY_HOME', '')
-if len(slice_dir) == 0 or not os.path.exists(os.path.join(slice_dir, 'slice')):
- slice_dir = os.getenv('ICE_HOME', '')
-if len(slice_dir) == 0 or not os.path.exists(os.path.join(slice_dir, 'slice')):
- slice_dir = os.path.join('/', 'usr', 'share')
-if not os.path.exists(os.path.join(slice_dir, 'slice')):
- print sys.argv[0] + ': Slice directory not found. Define ICEPY_HOME or ICE_HOME.'
- sys.exit(1)
-
-Ice.loadSlice('-I' + slice_dir + '/slice Queue.ice')
+Ice.loadSlice('Hello.ice')
import Demo
-class Request(object):
- def __init__(self, id, cb):
- self.id = id
+class CallbackEntry(object):
+ def __init__(self, cb, delay):
self.cb = cb
+ self.delay = delay
-class QueueI(Demo.Queue):
+class WorkQueue(threading.Thread):
def __init__(self):
- self._messageQueue = []
- self._requestQueue = []
- self._lock = threading.Lock()
-
- def get_async(self, cb, id, current=None):
- #
- # If there is already a message in the message queue, send the
- # response immediately. Otherwise add the callback to the
- # request queue.
- #
- self._lock.acquire()
- if len(self._messageQueue) != 0:
- try:
- cb.ice_response(self._messageQueue[0])
- del self._messageQueue[0]
- except Ice.Exception, ex:
- print ex
- else:
- request = Request(id, cb)
- self._requestQueue.append(request)
- self._lock.release()
-
- def add(self, message, current=None):
- #
- # If there is an outstanding request in the request queue,
- # send a response. Otherwise add the message to the message
- # queue.
- #
- self._lock.acquire()
- if len(self._requestQueue) != 0:
- try:
- self._requestQueue[0].cb.ice_response(message)
- except Ice.Exception, ex:
- print ex
- del self._requestQueue[0]
+ threading.Thread.__init__(self)
+ self._callbacks = []
+ self._done = False
+ self._cond = threading.Condition()
+
+ def run(self):
+ self._cond.acquire()
+
+ try:
+ while not self._done:
+ if len(self._callbacks) == 0:
+ self._cond.wait()
+
+ if not len(self._callbacks) == 0:
+ self._cond.wait(self._callbacks[0].delay / 1000.0)
+
+ if not self._done:
+ print "Belated Hello World!"
+ self._callbacks[0].cb.ice_response()
+ del self._callbacks[0]
+
+ for i in range(0, len(self._callbacks)):
+ self._callbacks[i].cb.ice_exception(Demo.RequestCanceledException())
+ finally:
+ self._cond.release()
+
+ def add(self, cb, delay):
+ self._cond.acquire()
+
+ try:
+ if not self._done:
+ entry = CallbackEntry(cb, delay)
+ if len(self._callbacks) == 0:
+ self._cond.notify()
+ self._callbacks.append(entry)
+ else:
+ cb.ice_exception(Demo.RequestCanceledException())
+ finally:
+ self._cond.release()
+
+ def destroy(self):
+ self._cond.acquire()
+
+ try:
+ self._done = True
+ self._cond.notify()
+ finally:
+ self._cond.release()
+
+class HelloI(Demo.Hello):
+ def __init__(self, workQueue):
+ self._workQueue = workQueue
+
+ def sayHello_async(self, cb, delay, current=None):
+ if delay == 0:
+ print "Hello World!"
+ cb.ice_response()
else:
- self._messageQueue.append(message)
- self._lock.release()
-
- def cancel_async(self, cb, ids, current=None):
- #
- # We send immediate response so that later call to ice_exception
- # on queued requests will not cause deadlocks.
- #
- cb.ice_response()
-
- self._lock.acquire()
- for p in ids:
- for i in range(0, len(self._requestQueue)):
- if self._requestQueue[i].id == p:
- try:
- self._requestQueue[i].cb.ice_exception(Demo.RequestCanceledException())
- except Ice.Exception, ex:
- # Ignore
- pass
-
- del self._requestQueue[i]
- break
- self._lock.release()
+ self._workQueue.add(cb, delay)
+
+ def shutdown(self, current=None):
+ self._workQueue.destroy()
+ self._workQueue.join()
+
+ current.adpater.getCommunicator().shutdown();
class Server(Ice.Application):
def run(self, args):
- adapter = self.communicator().createObjectAdapter("Queue")
- adapter.add(QueueI(), self.communicator().stringToIdentity("queue"))
+ self.callbackOnInterrupt()
+
+ adapter = self.communicator().createObjectAdapter("Hello")
+ self._workQueue = WorkQueue()
+ adapter.add(HelloI(self._workQueue), self.communicator().stringToIdentity("hello"))
+
+ self._workQueue.start()
adapter.activate()
+
self.communicator().waitForShutdown()
return True
+ def interruptCallback(self, sig):
+ self._workQueue.destroy()
+ self._workQueue.join()
+
+ try:
+ self._communicator.destroy()
+ except:
+ traceback.print_exc()
+
app = Server()
sys.exit(app.main(sys.argv, "config.server"))