10 #include <sys/ioctl.h>
14 #include <event2/event.h>
15 #include <event2/thread.h>
18 #include "evhtp/thread.h"
32 #ifdef EVTHR_SHARED_PIPE
53 #ifdef EVTHR_SHARED_PIPE
55 struct event * shared_pool_ev;
57 TAILQ_ENTRY(
evthr) next;
60 #define _evthr_read(thr, cmd, sock) \
61 (recv(sock, cmd, sizeof(evthr_cmd_t), 0) == sizeof(evthr_cmd_t)) ? 1 : 0
70 if (!(thread = (evthr_t *)
args)) {
80 (cmd.cb)(thread, cmd.args, thread->arg);
85 event_base_loopbreak(thread->evbase);
96 if (!(thread = (evthr_t *)
args)) {
100 if (thread == NULL || thread->thr == NULL) {
104 thread->evbase = event_base_new();
105 thread->event = event_new(thread->evbase, thread->rdr,
108 event_add(thread->event, NULL);
110 #ifdef EVTHR_SHARED_PIPE
111 if (thread->pool_rdr > 0) {
112 thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
114 event_add(thread->shared_pool_ev, NULL);
119 pthread_mutex_lock(&thread->lock);
120 if (thread->init_cb != NULL) {
121 (thread->init_cb)(thread, thread->arg);
124 pthread_mutex_unlock(&thread->lock);
126 event_base_loop(thread->evbase, 0);
128 pthread_mutex_lock(&thread->lock);
129 if (thread->exit_cb != NULL) {
130 (thread->exit_cb)(thread, thread->arg);
133 pthread_mutex_unlock(&thread->lock);
135 if (thread->err == 1) {
136 fprintf(stderr,
"FATAL ERROR!\n");
151 if (send(thread->wdr, &cmd,
sizeof(cmd), 0) <= 0) {
152 return EVTHR_RES_RETRY;
167 if (send(thread->wdr, &cmd,
sizeof(evthr_cmd_t), 0) < 0) {
168 return EVTHR_RES_RETRY;
171 pthread_join(*thread->thr, NULL);
178 return thr ? thr->evbase : NULL;
192 return thr ? thr->aux : NULL;
225 if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
229 evutil_make_socket_nonblocking(fds[0]);
230 evutil_make_socket_nonblocking(fds[1]);
232 if (!(thread = calloc(
sizeof(evthr_t), 1))) {
236 thread->thr = malloc(
sizeof(pthread_t));
238 thread->rdr = fds[0];
239 thread->wdr = fds[1];
241 thread->init_cb = init_cb;
242 thread->exit_cb = exit_cb;
244 if (pthread_mutex_init(&thread->lock, NULL)) {
267 if (thread == NULL || thread->thr == NULL) {
271 if (pthread_create(thread->thr, NULL,
_evthr_loop, (
void *)thread)) {
281 if (thread == NULL) {
285 if (thread->rdr > 0) {
289 if (thread->wdr > 0) {
298 event_free(thread->event);
301 #ifdef EVTHR_SHARED_PIPE
302 if (thread->shared_pool_ev) {
303 event_free(thread->shared_pool_ev);
308 if (thread->evbase) {
309 event_base_free(thread->evbase);
326 TAILQ_REMOVE(&pool->threads, thread, next);
341 return EVTHR_RES_FATAL;
356 ioctl(thread->rdr, FIONREAD, &backlog);
358 return (
int)(backlog /
sizeof(evthr_cmd_t));
364 #ifdef EVTHR_SHARED_PIPE
371 if (
evhtp_unlikely(send(pool->wdr, &cmd,
sizeof(cmd), 0) == -1)) {
372 return EVTHR_RES_RETRY;
377 evthr_t * thread = NULL;
378 evthr_t * min_thread = NULL;
382 return EVTHR_RES_FATAL;
386 return EVTHR_RES_NOCB;
390 TAILQ_FOREACH(thread, &pool->threads, next) {
398 if (min_thread == NULL || backlog < min_backlog) {
400 min_backlog = backlog;
407 static evthr_pool_t *
409 evthr_init_cb init_cb,
410 evthr_exit_cb exit_cb,
416 #ifdef EVTHR_SHARED_PIPE
424 if (!(pool = calloc(
sizeof(evthr_pool_t), 1))) {
428 pool->nthreads = nthreads;
429 TAILQ_INIT(&pool->threads);
431 #ifdef EVTHR_SHARED_PIPE
432 if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
436 evutil_make_socket_nonblocking(fds[0]);
437 evutil_make_socket_nonblocking(fds[1]);
443 for (i = 0; i < nthreads; i++) {
451 #ifdef EVTHR_SHARED_PIPE
452 thread->pool_rdr = fds[0];
455 TAILQ_INSERT_TAIL(&pool->threads, thread, next);
469 evthr_init_cb init_cb,
470 evthr_exit_cb exit_cb,
void * shared)
478 evthr_t *
evthr = NULL;
484 TAILQ_FOREACH(
evthr, &pool->threads, next) {