diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2007-01-17 18:11:11 +0000 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2007-01-17 18:11:11 +0000 |
commit | e124e693704faa56ecf32ce2a243d0c3fa3a5e6e (patch) | |
tree | 071dcaa6edc458f9371289976a4537880e5bb005 /py/demo/Ice/async/Server.py | |
parent | Changed async demo (diff) | |
download | ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.tar.bz2 ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.tar.xz ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.zip |
Changed demo
Diffstat (limited to 'py/demo/Ice/async/Server.py')
-rwxr-xr-x | py/demo/Ice/async/Server.py | 160 |
1 files changed, 86 insertions, 74 deletions
diff --git a/py/demo/Ice/async/Server.py b/py/demo/Ice/async/Server.py index c8f8dd29988..386dd77bb59 100755 --- a/py/demo/Ice/async/Server.py +++ b/py/demo/Ice/async/Server.py @@ -10,92 +10,104 @@ import sys, os, traceback, threading, Ice -slice_dir = os.getenv('ICEPY_HOME', '') -if len(slice_dir) == 0 or not os.path.exists(os.path.join(slice_dir, 'slice')): - slice_dir = os.getenv('ICE_HOME', '') -if len(slice_dir) == 0 or not os.path.exists(os.path.join(slice_dir, 'slice')): - slice_dir = os.path.join('/', 'usr', 'share') -if not os.path.exists(os.path.join(slice_dir, 'slice')): - print sys.argv[0] + ': Slice directory not found. Define ICEPY_HOME or ICE_HOME.' - sys.exit(1) - -Ice.loadSlice('-I' + slice_dir + '/slice Queue.ice') +Ice.loadSlice('Hello.ice') import Demo -class Request(object): - def __init__(self, id, cb): - self.id = id +class CallbackEntry(object): + def __init__(self, cb, delay): self.cb = cb + self.delay = delay -class QueueI(Demo.Queue): +class WorkQueue(threading.Thread): def __init__(self): - self._messageQueue = [] - self._requestQueue = [] - self._lock = threading.Lock() - - def get_async(self, cb, id, current=None): - # - # If there is already a message in the message queue, send the - # response immediately. Otherwise add the callback to the - # request queue. - # - self._lock.acquire() - if len(self._messageQueue) != 0: - try: - cb.ice_response(self._messageQueue[0]) - del self._messageQueue[0] - except Ice.Exception, ex: - print ex - else: - request = Request(id, cb) - self._requestQueue.append(request) - self._lock.release() - - def add(self, message, current=None): - # - # If there is an outstanding request in the request queue, - # send a response. Otherwise add the message to the message - # queue. - # - self._lock.acquire() - if len(self._requestQueue) != 0: - try: - self._requestQueue[0].cb.ice_response(message) - except Ice.Exception, ex: - print ex - del self._requestQueue[0] + threading.Thread.__init__(self) + self._callbacks = [] + self._done = False + self._cond = threading.Condition() + + def run(self): + self._cond.acquire() + + try: + while not self._done: + if len(self._callbacks) == 0: + self._cond.wait() + + if not len(self._callbacks) == 0: + self._cond.wait(self._callbacks[0].delay / 1000.0) + + if not self._done: + print "Belated Hello World!" + self._callbacks[0].cb.ice_response() + del self._callbacks[0] + + for i in range(0, len(self._callbacks)): + self._callbacks[i].cb.ice_exception(Demo.RequestCanceledException()) + finally: + self._cond.release() + + def add(self, cb, delay): + self._cond.acquire() + + try: + if not self._done: + entry = CallbackEntry(cb, delay) + if len(self._callbacks) == 0: + self._cond.notify() + self._callbacks.append(entry) + else: + cb.ice_exception(Demo.RequestCanceledException()) + finally: + self._cond.release() + + def destroy(self): + self._cond.acquire() + + try: + self._done = True + self._cond.notify() + finally: + self._cond.release() + +class HelloI(Demo.Hello): + def __init__(self, workQueue): + self._workQueue = workQueue + + def sayHello_async(self, cb, delay, current=None): + if delay == 0: + print "Hello World!" + cb.ice_response() else: - self._messageQueue.append(message) - self._lock.release() - - def cancel_async(self, cb, ids, current=None): - # - # We send immediate response so that later call to ice_exception - # on queued requests will not cause deadlocks. - # - cb.ice_response() - - self._lock.acquire() - for p in ids: - for i in range(0, len(self._requestQueue)): - if self._requestQueue[i].id == p: - try: - self._requestQueue[i].cb.ice_exception(Demo.RequestCanceledException()) - except Ice.Exception, ex: - # Ignore - pass - - del self._requestQueue[i] - break - self._lock.release() + self._workQueue.add(cb, delay) + + def shutdown(self, current=None): + self._workQueue.destroy() + self._workQueue.join() + + current.adpater.getCommunicator().shutdown(); class Server(Ice.Application): def run(self, args): - adapter = self.communicator().createObjectAdapter("Queue") - adapter.add(QueueI(), self.communicator().stringToIdentity("queue")) + self.callbackOnInterrupt() + + adapter = self.communicator().createObjectAdapter("Hello") + self._workQueue = WorkQueue() + adapter.add(HelloI(self._workQueue), self.communicator().stringToIdentity("hello")) + + self._workQueue.start() adapter.activate() + self.communicator().waitForShutdown() return True + def interruptCallback(self, sig): + self._workQueue.destroy() + self._workQueue.join() + + try: + self._communicator.destroy() + except: + traceback.print_exc() + app = Server() sys.exit(app.main(sys.argv, "config.server")) |