;; -*- Mode: Irken -*-
(include "lib/basis.scm")
;; kqueue demo.
;; Should work on modern BSDs, including OS X.
;; See the 'doom' subdirectory for a follow-on.
(cinclude "sys/types.h")
(cinclude "sys/event.h")
;; Thin wrapper over the C call kqueue(); takes no arguments and returns
;; the int that kqueue() returns (per kqueue(2): a descriptor, or -1 on
;; failure -- this wrapper does not check it).
(define (kqueue)
(%%cexp (-> int) "kqueue()"))
;; filters
;; Each definition below captures the value of the like-named C macro
;; from <sys/event.h> as an Irken int.
;; NOTE(review): not every BSD defines all of these (EVFILT_MACHPORT and
;; EVFILT_SESSION look OS X specific) -- confirm against the target's
;; headers before building elsewhere.
(define EVFILT_READ (%%cexp int "EVFILT_READ"))
(define EVFILT_WRITE (%%cexp int "EVFILT_WRITE"))
(define EVFILT_AIO (%%cexp int "EVFILT_AIO"))
(define EVFILT_VNODE (%%cexp int "EVFILT_VNODE"))
(define EVFILT_PROC (%%cexp int "EVFILT_PROC"))
(define EVFILT_SIGNAL (%%cexp int "EVFILT_SIGNAL"))
(define EVFILT_MACHPORT (%%cexp int "EVFILT_MACHPORT"))
(define EVFILT_TIMER (%%cexp int "EVFILT_TIMER"))
(define EVFILT_SESSION (%%cexp int "EVFILT_SESSION"))
;; used by make-poller to size its per-filter table
(define EVFILT_SYSCOUNT (%%cexp int "EVFILT_SYSCOUNT"))
;; flags
;; Values of the kevent action/flag macros from <sys/event.h>, captured
;; as Irken ints.
(define EV_ADD (%%cexp int "EV_ADD"))
(define EV_ENABLE (%%cexp int "EV_ENABLE"))
(define EV_DISABLE (%%cexp int "EV_DISABLE"))
(define EV_DELETE (%%cexp int "EV_DELETE"))
(define EV_RECEIPT (%%cexp int "EV_RECEIPT"))
(define EV_ONESHOT (%%cexp int "EV_ONESHOT"))
(define EV_CLEAR (%%cexp int "EV_CLEAR"))
(define EV_EOF (%%cexp int "EV_EOF"))
(define EV_ERROR (%%cexp int "EV_ERROR"))
;; convenience: the bitwise OR of EV_ADD and EV_ONESHOT, computed in C.
;; This is the flag value poller/wait-for registers events with.
(define EV_ADDONE (%%cexp int "EV_ADD|EV_ONESHOT"))
;; Build a record describing an array of <n> `struct kevent` slots.
;;   size   - capacity of the array (n)
;;   index  - number of slots filled so far; starts at 0 and is advanced
;;            by add-kevent
;;   buffer - the C array itself, obtained from %callocate
(define (make-changelist n)
{size=n
index=0
buffer = (%callocate (struct kevent) n)
})
;; Append one kevent to <changes> (a record from make-changelist).
;; The slot at changes.index is filled via the C macro EV_SET with
;; <ident>, <filter> and <flags>; fflags, data and udata are always 0.
;; changes.index is then advanced by one.  When the list is already
;; full, error1 is called with the current index instead.
(define (add-kevent changes ident filter flags) ;; fflags data udata
  (if (< changes.index changes.size)
      (let ((slot changes.index))
        (%%cexp ((buffer (struct kevent)) int int int int -> undefined)
                "EV_SET (%0+%1, %2, %3, %4, 0, 0, 0)"
                changes.buffer slot ident filter flags)
        (set! changes.index (+ 1 slot)))
      (error1 "changes overflowed" changes.index)))
;; Read slot <i> of the kevent array in <changes> and return it as a
;; (:kev ident filter) variant.  Only the `ident` and `filter` members
;; of the C struct are extracted.
(define (get-kevent changes i)
  (let ((ev-ident
         (%%cexp ((buffer (struct kevent)) int -> int) "%0[%1].ident" changes.buffer i))
        (ev-filter
         (%%cexp ((buffer (struct kevent)) int -> int) "%0[%1].filter" changes.buffer i)))
    ;; eventually fill in with flags/data/udata
    (:kev ev-ident ev-filter)))
;; Wrapper over the C call kevent().
;;   kqfd        - descriptor passed as the first argument
;;   changes-in  - changelist record; its buffer is the change array and
;;                 its index (number of filled slots) is the change count
;;   changes-out - changelist record; its buffer receives events and its
;;                 size is passed as the maximum number of events
;; The timeout argument is always NULL.  Returns the int result of
;; kevent() unchecked; callers must test for a negative value.
(define (kevent kqfd changes-in changes-out)
(%%cexp (int (buffer (struct kevent)) int (buffer (struct kevent)) int -> int)
"kevent (%0, %1, %2, %3, %4, NULL)"
kqfd
changes-in.buffer changes-in.index
changes-out.buffer changes-out.size
))
(cinclude "sys/types.h")
(cinclude "sys/socket.h")
(cinclude "netinet/in.h")
(cinclude "arpa/inet.h")
;; better than trying to muck about with the variadic (and evil) fcntl.
;; C helper emitted verbatim into the generated code: switches <fd> to
;; non-blocking mode by OR-ing O_NONBLOCK into its file status flags.
;; If the flags cannot be read (fcntl F_GETFL returns -1) the descriptor
;; is left untouched rather than writing the bogus value back.
;; The helper returns nothing, so failures are not reported to the caller.
(cverbatim "
void
set_nonblocking (int fd)
{
  int flags = fcntl (fd, F_GETFL, 0);
  if (flags == -1) {
    /* could not read the flags: do not feed -1 back into F_SETFL */
    return;
  }
  fcntl (fd, F_SETFL, flags | O_NONBLOCK);
}
")
;; Values of the C macros SOCK_STREAM and AF_INET, captured as Irken ints
;; for use as the `type` and `family` arguments of `socket`.
(define SOCK_STREAM (%%cexp int "SOCK_STREAM"))
(define AF_INET (%%cexp int "AF_INET"))
;; Wrapper over the C call socket(family, type, protocol).
;; Returns the int result unchecked (per socket(2), -1 means failure).
(define (socket family type protocol)
(%%cexp (int int int -> int)
"socket (%0, %1, %2)"
family type protocol))
;; Irken entry point for the C helper set_nonblocking() defined in the
;; cverbatim block of this file.  No result is returned.
(define (set-nonblocking fd)
(%%cexp (int -> undefined) "set_nonblocking (%0)" fd))
;; Wrapper over the C call inet_pton().
;;   af    - address family (int)
;;   ascii - textual address
;;   buf   - a `struct sockaddr_in` buffer; the parsed address is written
;;           into its sin_addr member
;; Returns the int result of inet_pton() unchecked.  Per inet_pton(3)
;; that is 1 on success, 0 when <ascii> is not a valid address, and -1
;; on error.
(define (inet_pton af ascii buf)
(%%cexp (int string (buffer (struct sockaddr_in)) -> int)
"inet_pton (%0, %1, &(%2->sin_addr))"
af ascii buf))
;; Format the sin_addr member of <buf> (a `struct sockaddr_in` buffer) as
;; text by calling the C function inet_ntop() into a freshly made
;; 100-character string, and return that string.
;; The whole 100-character string is returned; it is not trimmed to the
;; text that inet_ntop() wrote (see the comment in the body).
;; NOTE(review): the result `r` of the C call is bound but never used, so
;; a failed conversion is not detected.
;; NOTE(review): C's inet_ntop() returns a `const char *`, while the
;; signature here declares `int` -- confirm this is intended.
(define (inet_ntop af buf)
(let ((ascii (make-string 100))
(r (%%cexp (int (buffer (struct sockaddr_in)) string int -> int)
"inet_ntop (%0, &(%1->sin_addr), %2, %3)"
af buf ascii (string-length ascii))))
;; should strip this to NUL
ascii))
;; Allocate and fill in a `struct sockaddr_in` for an IPv4 endpoint.
;;   ip   - dotted-quad address as a string
;;   port - port number in host byte order (converted with htons)
;; Returns the sockaddr_in buffer.
;; Errors: inet_pton() reports an unparsable address by returning 0, not
;; a negative value, so that case is checked explicitly here; a negative
;; result is reported by trysys.
(define (make-in-addr ip port)
  (let ((ss (%callocate (struct sockaddr_in) 1)))
    ;; sin_family holds an address family, hence AF_INET rather than PF_INET
    (%%cexp ((buffer (struct sockaddr_in)) -> undefined)
            "(%0->sin_family = AF_INET, PXLL_UNDEFINED)" ss)
    (%%cexp ((buffer (struct sockaddr_in)) int -> undefined)
            "(%0->sin_port = htons(%1), PXLL_UNDEFINED)" ss port)
    (if (= 0 (trysys (inet_pton AF_INET ip ss)))
        (error1 "make-in-addr: not a valid IPv4 address" ip)
        ss)))
;; Wrapper over the C call bind().  <addr> is a `struct sockaddr_in`
;; buffer (e.g. from make-in-addr); it is cast to `struct sockaddr *` and
;; passed with sizeof(struct sockaddr_in).  Returns the int result
;; unchecked.
(define (bind fd addr)
(%%cexp (int (buffer (struct sockaddr_in)) -> int)
"bind (%0, (struct sockaddr *) %1, sizeof(struct sockaddr_in))"
fd addr))
;; Wrapper over the C call listen(fd, backlog).  Returns the int result
;; unchecked.
(define (listen fd backlog)
(%%cexp (int int -> int) "listen (%0, %1)" fd backlog))
;; Wrapper over the C call accept().
;; A `struct sockaddr_in` and a socklen_t are allocated for the peer
;; address; the length is initialised to sizeof(struct sockaddr_in)
;; before the call.  Only the int result of accept() is returned -- the
;; peer address buffer is not handed back to the caller.
(define (accept fd)
  (let ((peer (%callocate (struct sockaddr_in) 1))
        (peer-len (%callocate socklen_t 1)))
    (%%cexp ((buffer socklen_t) -> undefined)
            "*%0 = sizeof(struct sockaddr_in)"
            peer-len)
    (%%cexp (int (buffer (struct sockaddr_in)) (buffer socklen_t) -> int)
            "accept (%0, (struct sockaddr *) %1, %2)"
            fd peer peer-len)))
;; Wrapper over the C call connect().  <addr> is a `struct sockaddr_in`
;; buffer, cast to `struct sockaddr *` and passed with
;; sizeof(struct sockaddr_in).  Returns the int result unchecked.
(define (connect fd addr)
(%%cexp (int (buffer (struct sockaddr_in)) -> int)
"connect (%0, (struct sockaddr *) %1, sizeof (struct sockaddr_in))"
fd addr))
;; Value of the C errno constant EAGAIN, captured as an Irken int.
;; Not referenced elsewhere in the visible part of this file.
(define EAGAIN (%%cexp int "EAGAIN"))
(cinclude "sys/errno.h")
;; Check the int result of a system call.
;; A negative <retval> is treated as failure: error1 is called with the
;; text of strerror(errno) (copied out of C into an Irken string).
;; Any other value is returned unchanged so the call can be used inline.
(define (trysys retval)
  (if (< retval 0)
      (let ((reason (copy-cstring (%%cexp (-> cstring) "strerror(errno)"))))
        (error1 "system error" reason))
      retval))
;; Global queue of continuations that are ready to run.  Filled by
;; `enqueue`, drained by `dispatch` and `dispatch-kevent`.
(define run-queue (queue/make))
;; Add continuation <k> to the global run-queue.
(define (enqueue k)
(queue/add run-queue k))
;; Run the next ready thread, if any.
;; Pops one continuation off run-queue and resumes it with #u.  When the
;; queue is empty nothing is resumed and #u is returned to the caller.
(define (dispatch)
  (match (queue/pop run-queue) with
    (maybe:no)       -> #u
    (maybe:yes next) -> (putcc next #u)))
;; Start <f> (a thunk) as a new cooperative thread.
;; The continuation obtained from (getcc) is put on the run-queue first,
;; then <f> is called directly, so the new thread runs before the caller
;; continues.
;; Once <f> returns, `dispatch` resumes the next queued continuation; if
;; the run-queue is empty at that point, `dispatch` returns #u and `fork`
;; returns normally.
;; NOTE(review): exactly where the captured continuation resumes depends
;; on the semantics of getcc/putcc, which are defined outside this file.
(define (fork f)
(enqueue (getcc))
(f)
(dispatch))
;; Give other ready threads a turn: queue the continuation obtained from
;; (getcc) at the back of run-queue, then resume whatever is at the front
;; via `dispatch`.
(define (yield)
(enqueue (getcc))
(dispatch))
;; Like `dispatch`, but falls back to the kernel when nothing is ready:
;; if run-queue is empty, poller/wait-and-schedule is called on poller
;; <p>; otherwise the continuation at the front of the queue is resumed
;; with #u.
(define (dispatch-kevent p)
  (match (queue/pop run-queue) with
    (maybe:no)       -> (poller/wait-and-schedule p)
    (maybe:yes next) -> (putcc next #u)))
;; Create the poller record that ties the scheduler to a kqueue.
;;   kqfd    - descriptor from kqueue(); a failure is reported via trysys
;;   nwait   - number of events threads are currently waiting on
;;   filters - vector of trees, one per kevent filter, mapping an event
;;             ident to the continuation waiting on it
;;   ievents - changelist of events to submit on the next kevent() call
;;   oevents - changelist receiving the events kevent() returns
;; The poller/* functions index `filters` with (- 0 filter), i.e. with
;; 1..EVFILT_SYSCOUNT for filter values -1..-EVFILT_SYSCOUNT, so the
;; vector needs EVFILT_SYSCOUNT + 1 slots (slot 0 is unused).
(define (make-poller)
  { kqfd    = (trysys (kqueue))
    nwait   = 0 ;; how many events are waiting?
    filters = (make-vector (+ 1 EVFILT_SYSCOUNT) (tree/empty))
    ievents = (make-changelist 1000)
    oevents = (make-changelist 1000)
    })
;; these funs know that EVFILT values are consecutive small negative ints
;; Look up <ident> in the tree kept for <filter> in poller <p>.
;; Returns whatever tree/member returns; callers match the result against
;; (maybe:yes k) / (maybe:no).
(define (poller/lookup-event p ident filter)
  (let ((slot (- 0 filter)))   ;; filter values are negative
    (tree/member p.filters[slot] < ident)))
;; Record that continuation <k> is waiting for (<ident>, <filter>) on
;; poller <p>: bump the wait count and insert <ident> -> <k> into the
;; tree for that filter.
(define (poller/add-event p ident filter k)
  (set! p.nwait (+ 1 p.nwait))
  (let ((slot (- 0 filter)))   ;; filter values are negative
    (set! p.filters[slot]
          (tree/insert p.filters[slot] < ident k))))
;; Forget the waiter registered for (<ident>, <filter>) on poller <p>:
;; remove <ident> from that filter's tree and decrement the wait count.
(define (poller/delete-event p ident filter)
  (let ((slot (- 0 filter)))   ;; filter values are negative
    (set! p.filters[slot]
          (tree/delete p.filters[slot] ident < =)))
  (set! p.nwait (- p.nwait 1)))
;; Suspend the calling thread until (<ident>, <filter>) fires on poller <p>.
;; The continuation from (getcc) is registered as the waiter, a one-shot
;; kevent (EV_ADDONE) is queued on p.ievents, and control passes to
;; dispatch-kevent so that other threads (or the kernel wait) can run.
;; Only one waiter per (ident, filter) pair is supported: if one is
;; already registered, `error` is called.
(define (poller/wait-for p ident filter)
;; capture the continuation up front; it is stored as the waiter below
(let ((k (getcc)))
(match (poller/lookup-event p ident filter) with
(maybe:no)
-> (begin
(print-string (format "adding kevent: ident=" (int ident) " filter=" (int filter) "\n"))
;; queue the change; it is handed to the kernel by the next kevent() call
(add-kevent p.ievents ident filter EV_ADDONE)
(poller/add-event p ident filter k)
(dispatch-kevent p))
(maybe:yes _) -> (error "poller/wait-for: event already present")
)))
;; Suspend the calling thread until an EVFILT_READ event fires for <fd>.
(define (poller/wait-for-read p fd)
(poller/wait-for p fd EVFILT_READ))
;; Suspend the calling thread until an EVFILT_WRITE event fires for <fd>.
(define (poller/wait-for-write p fd)
(poller/wait-for p fd EVFILT_WRITE))
;; Given poller <p> and a returned event (:kev ident filter), find the
;; continuation waiting on that event, unregister it, and put it on the
;; run-queue.  Calls `error` when no waiter is registered for the event.
(define poller/enqueue-waiting-thread
  p (:kev ident filter)
  -> (match (poller/lookup-event p ident filter) with
       (maybe:no)
       -> (error "poller/get-waiting-thread: no thread")
       (maybe:yes waiter)
       -> (begin
            (poller/delete-event p ident filter)
            (enqueue waiter))))
;; Block in kevent() until at least one registered event fires, then make
;; every thread waiting on a returned event runnable and dispatch.
;; Pending changes in p.ievents are submitted by the same kevent() call;
;; results land in p.oevents.  A negative result from kevent() is
;; reported with `error`.
;; NOTE(review): the flags of returned events are not inspected, so a
;; per-event EV_ERROR would go unnoticed -- confirm this is acceptable.
(define (poller/wait-and-schedule p)
;; all the runnable threads have done their bit, now
;; throw it to kevent().
;; warn only; the kevent() call below is still made (with a NULL timeout)
(if (= p.nwait 0)
(print-string "no events, will wait forever!\n"))
(let ((n (kevent p.kqfd p.ievents p.oevents)))
(if (< n 0)
(error "kevent() failed")
(begin
(print-string (format "poller/wait-and-schedule: got " (int n) " events\n"))
;; the submitted changes have been consumed: reset the input changelist
(set! p.ievents.index 0)
;; the first n slots of p.oevents now hold the returned events
(for-range
i n
(poller/enqueue-waiting-thread
p (get-kevent p.oevents i)))
(dispatch-kevent p)
))))
;; Demo thread body: send an HTTP/1.0 HEAD request to <ip> on port 80 and
;; print the first chunk (up to 1024 bytes) of the reply.
;;   p  - poller record from make-poller, used to suspend this thread
;;        while the socket is not ready
;;   ip - dotted-quad IPv4 address string
;; A failing socket() call is reported through trysys instead of letting
;; the invalid descriptor reach connect() and kevent().
(define (fetch-head p ip)
  (let ((sfd (trysys (socket AF_INET SOCK_STREAM 0)))
        (addr (make-in-addr ip 80)))
    (set-nonblocking sfd)
    ;; The result of connect is deliberately not checked: on a
    ;; non-blocking socket connect() normally returns -1 (EINPROGRESS);
    ;; this thread waits for writability instead.
    (connect sfd addr)
    (poller/wait-for-write p sfd)
    (printn (write sfd "HEAD / HTTP/1.0\r\n\r\n"))
    (print-string "sent request, waiting for read...\n")
    (poller/wait-for-read p sfd)
    (print-string (read sfd 1024))
    (print-string "done!\n")
    (close sfd)
    ))
;; Demo driver: one poller, four concurrent HEAD requests to the same
;; host -- three started with `fork`, the fourth run by the main thread.
(let ((p (make-poller)))
  (let ((ip "72.52.84.226"))
    (let ((job (lambda () (fetch-head p ip))))
      (fork job)
      (fork job)
      (fork job)
      (fetch-head p ip)
      )))