# -*- Mode: Python; tab-width: 4 -*-

import coro_rtsig
import sys

# linux real-time signal test

# from /usr/include/bits/poll.h

# Maps each poll event name to its bit value; used by untwiddle() to turn a
# flags word into readable names.
poll_bits = {
    'POLLIN'    : 0x001,    # There is data to read.
    'POLLPRI'   : 0x002,    # There is urgent data to read.
    'POLLOUT'   : 0x004,    # Writing now will not block.

    'POLLRDNORM': 0x040,    # Normal data may be read.
    'POLLRDBAND': 0x080,    # Priority data may be read.
    'POLLWRNORM': 0x100,    # Writing now will not block.
    'POLLWRBAND': 0x200,    # Priority data may be written.

    # Event types always implicitly polled for.  These bits need not be
    # set in `events', but they will appear in `revents' to indicate the
    # status of the file descriptor.
    'POLLERR'   : 0x008,    # Error condition.
    'POLLHUP'   : 0x010,    # Hung up.
    'POLLNVAL'  : 0x020,    # Invalid polling request.
    }

def untwiddle (flags):
    """Return the sorted list of poll_bits names whose bit is set in `flags`.

    Bits in `flags` that have no entry in poll_bits are ignored; a value
    with no known bits set yields an empty list.
    """
    return sorted ([name for name, val in poll_bits.items() if flags & val])

def describe_next_event():
    """Fetch one event from coro_rtsig and write it to stderr.

    The output has the form '(<fd> [<flag names>])' followed by a newline.
    """
    # The opening paren is written before the event is fetched, so it is
    # already on stderr while get_next_event() runs.
    # NOTE(review): presumably get_next_event() blocks until an event
    # arrives -- confirm against the coro_rtsig extension.
    sys.stderr.write ('(')
    fd, flags = coro_rtsig.get_next_event()
    sys.stderr.write ('%d %s)\n' % (fd, untwiddle (flags)))

def test():
    """Run a short SMTP exchange against 127.0.0.1:25, reporting each event.

    The socket's descriptor is handed to coro_rtsig.attach() before
    connecting, and describe_next_event() is called after the connect and
    after each command sent.  Anything the peer sends back is read and
    discarded.  The socket is always closed, even if a step raises.
    """
    import socket
    s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
    try:
        coro_rtsig.attach (s.fileno())
        s.connect (('127.0.0.1', 25))
        describe_next_event()
        s.recv (8192)
        # Byte literals: socket.send() rejects str on Python 3, and b'...'
        # is identical to '...' on Python 2.6+.
        s.send (b'HELO there\r\n')
        describe_next_event()
        s.recv (8192)
        s.send (b'QUIT\r\n')
        describe_next_event()
        s.recv (8192)
        describe_next_event()
    finally:
        # Release the descriptor regardless of how the exchange ended.
        s.close()

if __name__ == '__main__':
    test()

# $ python test_coro_rtsig.py
# (4 ['POLLOUT', 'POLLWRBAND', 'POLLWRNORM'])
# (4 ['POLLIN', 'POLLRDNORM'])
# (4 ['POLLIN', 'POLLRDNORM'])
# (4 ['POLLIN', 'POLLRDNORM'])
# $