Libevhtp  1.2.13
thread.c
Go to the documentation of this file.
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <stdint.h>
5 #include <limits.h>
6 #ifndef WIN32
7 #include <sys/queue.h>
8 #endif
9 
10 #include <sys/ioctl.h>
11 #include <unistd.h>
12 #include <pthread.h>
13 
14 #include <event2/event.h>
15 #include <event2/thread.h>
16 
17 #include "internal.h"
18 #include "evhtp/thread.h"
19 
20 typedef struct evthr_cmd evthr_cmd_t;
21 typedef struct evthr_pool_slist evthr_pool_slist_t;
22 
23 struct evthr_cmd {
24  uint8_t stop;
25  void * args;
26  evthr_cb cb;
27 } __attribute__((packed));
28 
29 TAILQ_HEAD(evthr_pool_slist, evthr);
30 
31 struct evthr_pool {
32 #ifdef EVTHR_SHARED_PIPE
33  int rdr;
34  int wdr;
35 #endif
36  int nthreads;
38 };
39 
40 struct evthr {
41  int rdr;
42  int wdr;
43  char err;
44  ev_t * event;
45  evbase_t * evbase;
46  pthread_mutex_t lock;
47  pthread_t * thr;
48  evthr_init_cb init_cb;
49  evthr_exit_cb exit_cb;
50  void * arg;
51  void * aux;
52 
53 #ifdef EVTHR_SHARED_PIPE
54  int pool_rdr;
55  struct event * shared_pool_ev;
56 #endif
57  TAILQ_ENTRY(evthr) next;
58 };
59 
60 #define _evthr_read(thr, cmd, sock) \
61  (recv(sock, cmd, sizeof(evthr_cmd_t), 0) == sizeof(evthr_cmd_t)) ? 1 : 0
62 
63 static void
64 _evthr_read_cmd(evutil_socket_t sock, short which, void * args)
65 {
66  evthr_t * thread;
67  evthr_cmd_t cmd;
68  int stopped;
69 
70  if (!(thread = (evthr_t *)args)) {
71  return;
72  }
73 
74  stopped = 0;
75 
76  if (evhtp_likely(_evthr_read(thread, &cmd, sock) == 1)) {
77  stopped = cmd.stop;
78 
79  if (evhtp_likely(cmd.cb != NULL)) {
80  (cmd.cb)(thread, cmd.args, thread->arg);
81  }
82  }
83 
84  if (evhtp_unlikely(stopped == 1)) {
85  event_base_loopbreak(thread->evbase);
86  }
87 
88  return;
89 } /* _evthr_read_cmd */
90 
91 static void *
93 {
94  evthr_t * thread;
95 
96  if (!(thread = (evthr_t *)args)) {
97  return NULL;
98  }
99 
100  if (thread == NULL || thread->thr == NULL) {
101  pthread_exit(NULL);
102  }
103 
104  thread->evbase = event_base_new();
105  thread->event = event_new(thread->evbase, thread->rdr,
106  EV_READ | EV_PERSIST, _evthr_read_cmd, args);
107 
108  event_add(thread->event, NULL);
109 
110 #ifdef EVTHR_SHARED_PIPE
111  if (thread->pool_rdr > 0) {
112  thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
113  EV_READ | EV_PERSIST, _evthr_read_cmd, args);
114  event_add(thread->shared_pool_ev, NULL);
115  }
116 
117 #endif
118 
119  pthread_mutex_lock(&thread->lock);
120  if (thread->init_cb != NULL) {
121  (thread->init_cb)(thread, thread->arg);
122  }
123 
124  pthread_mutex_unlock(&thread->lock);
125 
126  event_base_loop(thread->evbase, 0);
127 
128  pthread_mutex_lock(&thread->lock);
129  if (thread->exit_cb != NULL) {
130  (thread->exit_cb)(thread, thread->arg);
131  }
132 
133  pthread_mutex_unlock(&thread->lock);
134 
135  if (thread->err == 1) {
136  fprintf(stderr, "FATAL ERROR!\n");
137  }
138 
139  pthread_exit(NULL);
140 } /* _evthr_loop */
141 
142 evthr_res
143 evthr_defer(evthr_t * thread, evthr_cb cb, void * arg)
144 {
145  evthr_cmd_t cmd = {
146  .cb = cb,
147  .args = arg,
148  .stop = 0
149  };
150 
151  if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
152  return EVTHR_RES_RETRY;
153  }
154 
155  return EVTHR_RES_OK;
156 }
157 
158 evthr_res
159 evthr_stop(evthr_t * thread)
160 {
161  evthr_cmd_t cmd = {
162  .cb = NULL,
163  .args = NULL,
164  .stop = 1
165  };
166 
167  if (send(thread->wdr, &cmd, sizeof(evthr_cmd_t), 0) < 0) {
168  return EVTHR_RES_RETRY;
169  }
170 
171  pthread_join(*thread->thr, NULL);
172  return EVTHR_RES_OK;
173 }
174 
175 evbase_t *
176 evthr_get_base(evthr_t * thr)
177 {
178  return thr ? thr->evbase : NULL;
179 }
180 
181 void
182 evthr_set_aux(evthr_t * thr, void * aux)
183 {
184  if (thr) {
185  thr->aux = aux;
186  }
187 }
188 
189 void *
190 evthr_get_aux(evthr_t * thr)
191 {
192  return thr ? thr->aux : NULL;
193 }
194 
195 int
196 evthr_set_initcb(evthr_t * thr, evthr_init_cb cb)
197 {
198  if (thr == NULL) {
199  return -1;
200  }
201 
202  thr->init_cb = cb;
203 
204  return 0;
205 }
206 
207 int
208 evthr_set_exitcb(evthr_t * thr, evthr_exit_cb cb)
209 {
210  if (thr == NULL) {
211  return -1;
212  }
213 
214  thr->exit_cb = cb;
215 
216  return 0;
217 }
218 
219 static evthr_t *
220 _evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void * args)
221 {
222  evthr_t * thread;
223  int fds[2];
224 
225  if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
226  return NULL;
227  }
228 
229  evutil_make_socket_nonblocking(fds[0]);
230  evutil_make_socket_nonblocking(fds[1]);
231 
232  if (!(thread = calloc(sizeof(evthr_t), 1))) {
233  return NULL;
234  }
235 
236  thread->thr = malloc(sizeof(pthread_t));
237  thread->arg = args;
238  thread->rdr = fds[0];
239  thread->wdr = fds[1];
240 
241  thread->init_cb = init_cb;
242  thread->exit_cb = exit_cb;
243 
244  if (pthread_mutex_init(&thread->lock, NULL)) {
245  evthr_free(thread);
246  return NULL;
247  }
248 
249  return thread;
250 } /* evthr_new */
251 
252 evthr_t *
253 evthr_new(evthr_init_cb init_cb, void * args)
254 {
255  return _evthr_new(init_cb, NULL, args);
256 }
257 
258 evthr_t *
259 evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void * args)
260 {
261  return _evthr_new(init_cb, exit_cb, args);
262 }
263 
264 int
265 evthr_start(evthr_t * thread)
266 {
267  if (thread == NULL || thread->thr == NULL) {
268  return -1;
269  }
270 
271  if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) {
272  return -1;
273  }
274 
275  return 0;
276 }
277 
278 void
279 evthr_free(evthr_t * thread)
280 {
281  if (thread == NULL) {
282  return;
283  }
284 
285  if (thread->rdr > 0) {
286  close(thread->rdr);
287  }
288 
289  if (thread->wdr > 0) {
290  close(thread->wdr);
291  }
292 
293  if (thread->thr) {
294  free(thread->thr);
295  }
296 
297  if (thread->event) {
298  event_free(thread->event);
299  }
300 
301 #ifdef EVTHR_SHARED_PIPE
302  if (thread->shared_pool_ev) {
303  event_free(thread->shared_pool_ev);
304  }
305 
306 #endif
307 
308  if (thread->evbase) {
309  event_base_free(thread->evbase);
310  }
311 
312  free(thread);
313 } /* evthr_free */
314 
315 void
316 evthr_pool_free(evthr_pool_t * pool)
317 {
318  evthr_t * thread;
319  evthr_t * save;
320 
321  if (pool == NULL) {
322  return;
323  }
324 
325  TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
326  TAILQ_REMOVE(&pool->threads, thread, next);
327 
328  evthr_free(thread);
329  }
330 
331  free(pool);
332 }
333 
334 evthr_res
335 evthr_pool_stop(evthr_pool_t * pool)
336 {
337  evthr_t * thr;
338  evthr_t * save;
339 
340  if (pool == NULL) {
341  return EVTHR_RES_FATAL;
342  }
343 
344  TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) {
345  evthr_stop(thr);
346  }
347 
348  return EVTHR_RES_OK;
349 }
350 
351 static inline int
352 get_backlog_(evthr_t * thread)
353 {
354  int backlog = 0;
355 
356  ioctl(thread->rdr, FIONREAD, &backlog);
357 
358  return (int)(backlog / sizeof(evthr_cmd_t));
359 }
360 
361 evthr_res
362 evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg)
363 {
364 #ifdef EVTHR_SHARED_PIPE
365  evthr_cmd_t cmd = {
366  .cb = cb,
367  .args = arg,
368  .stop = 0
369  };
370 
371  if (evhtp_unlikely(send(pool->wdr, &cmd, sizeof(cmd), 0) == -1)) {
372  return EVTHR_RES_RETRY;
373  }
374 
375  return EVTHR_RES_OK;
376 #endif
377  evthr_t * thread = NULL;
378  evthr_t * min_thread = NULL;
379  int min_backlog = 0;
380 
381  if (pool == NULL) {
382  return EVTHR_RES_FATAL;
383  }
384 
385  if (cb == NULL) {
386  return EVTHR_RES_NOCB;
387  }
388 
389 
390  TAILQ_FOREACH(thread, &pool->threads, next) {
391  int backlog = get_backlog_(thread);
392 
393  if (backlog == 0) {
394  min_thread = thread;
395  break;
396  }
397 
398  if (min_thread == NULL || backlog < min_backlog) {
399  min_thread = thread;
400  min_backlog = backlog;
401  }
402  }
403 
404  return evthr_defer(min_thread, cb, arg);
405 } /* evthr_pool_defer */
406 
407 static evthr_pool_t *
408 _evthr_pool_new(int nthreads,
409  evthr_init_cb init_cb,
410  evthr_exit_cb exit_cb,
411  void * shared)
412 {
413  evthr_pool_t * pool;
414  int i;
415 
416 #ifdef EVTHR_SHARED_PIPE
417  int fds[2];
418 #endif
419 
420  if (nthreads == 0) {
421  return NULL;
422  }
423 
424  if (!(pool = calloc(sizeof(evthr_pool_t), 1))) {
425  return NULL;
426  }
427 
428  pool->nthreads = nthreads;
429  TAILQ_INIT(&pool->threads);
430 
431 #ifdef EVTHR_SHARED_PIPE
432  if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
433  return NULL;
434  }
435 
436  evutil_make_socket_nonblocking(fds[0]);
437  evutil_make_socket_nonblocking(fds[1]);
438 
439  pool->rdr = fds[0];
440  pool->wdr = fds[1];
441 #endif
442 
443  for (i = 0; i < nthreads; i++) {
444  evthr_t * thread;
445 
446  if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
447  evthr_pool_free(pool);
448  return NULL;
449  }
450 
451 #ifdef EVTHR_SHARED_PIPE
452  thread->pool_rdr = fds[0];
453 #endif
454 
455  TAILQ_INSERT_TAIL(&pool->threads, thread, next);
456  }
457 
458  return pool;
459 } /* _evthr_pool_new */
460 
461 evthr_pool_t *
462 evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared)
463 {
464  return _evthr_pool_new(nthreads, init_cb, NULL, shared);
465 }
466 
467 evthr_pool_t *
468 evthr_pool_wexit_new(int nthreads,
469  evthr_init_cb init_cb,
470  evthr_exit_cb exit_cb, void * shared)
471 {
472  return _evthr_pool_new(nthreads, init_cb, exit_cb, shared);
473 }
474 
475 int
476 evthr_pool_start(evthr_pool_t * pool)
477 {
478  evthr_t * evthr = NULL;
479 
480  if (pool == NULL) {
481  return -1;
482  }
483 
484  TAILQ_FOREACH(evthr, &pool->threads, next) {
485  if (evthr_start(evthr) < 0) {
486  return -1;
487  }
488 
489  usleep(5000);
490  }
491 
492  return 0;
493 }
evthr_set_exitcb
int evthr_set_exitcb(evthr_t *thr, evthr_exit_cb cb)
Definition: thread.c:208
evthr_pool_start
int evthr_pool_start(evthr_pool_t *pool)
Definition: thread.c:476
evthr::evbase
evbase_t * evbase
Definition: thread.c:45
evthr_pool_free
void evthr_pool_free(evthr_pool_t *pool)
Definition: thread.c:316
evthr_stop
evthr_res evthr_stop(evthr_t *thread)
Definition: thread.c:159
evthr_get_base
evbase_t * evthr_get_base(evthr_t *thr)
Definition: thread.c:176
evthr_start
int evthr_start(evthr_t *thread)
Definition: thread.c:265
evthr::init_cb
evthr_init_cb init_cb
Definition: thread.c:48
evthr_cmd::args
void * args
Definition: thread.c:25
evthr::arg
void * arg
Definition: thread.c:50
evhtp_likely
#define evhtp_likely(x)
Definition: internal.h:17
cb
evthr_cb cb
Definition: thread.c:4
_evthr_pool_new
static evthr_pool_t * _evthr_pool_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
Definition: thread.c:408
evthr_set_aux
void evthr_set_aux(evthr_t *thr, void *aux)
Definition: thread.c:182
evthr_defer
evthr_res evthr_defer(evthr_t *thread, evthr_cb cb, void *arg)
Definition: thread.c:143
TAILQ_FOREACH_SAFE
#define TAILQ_FOREACH_SAFE(var, head, field, tvar)
Definition: internal.h:22
evthr_pool::threads
evthr_pool_slist_t threads
Definition: thread.c:37
evthr_cmd::cb
evthr_cb cb
Definition: thread.c:26
evthr_pool_stop
evthr_res evthr_pool_stop(evthr_pool_t *pool)
Definition: thread.c:335
evthr
Definition: thread.c:40
evhtp_unlikely
#define evhtp_unlikely(x)
Definition: internal.h:18
evthr_pool_new
evthr_pool_t * evthr_pool_new(int nthreads, evthr_init_cb init_cb, void *shared)
Definition: thread.c:462
evthr::err
char err
Definition: thread.c:43
get_backlog_
static int get_backlog_(evthr_t *thread)
Definition: thread.c:352
evthr_free
void evthr_free(evthr_t *thread)
Definition: thread.c:279
evthr::rdr
int rdr
Definition: thread.c:41
evthr_cmd
Definition: thread.c:23
__attribute__
struct evthr_pool __attribute__
_evthr_new
static evthr_t * _evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
Definition: thread.c:220
TAILQ_HEAD
TAILQ_HEAD(evthr_pool_slist, evthr)
evthr_pool
Definition: thread.c:31
evthr_cmd::stop
uint8_t stop
Definition: thread.c:24
_evthr_read_cmd
static void _evthr_read_cmd(evutil_socket_t sock, short which, void *args)
Definition: thread.c:64
evthr::lock
pthread_mutex_t lock
Definition: thread.c:46
internal.h
evthr::aux
void * aux
Definition: thread.c:51
evthr_get_aux
void * evthr_get_aux(evthr_t *thr)
Definition: thread.c:190
evthr_pool_defer
evthr_res evthr_pool_defer(evthr_pool_t *pool, evthr_cb cb, void *arg)
Definition: thread.c:362
evthr_pool_wexit_new
evthr_pool_t * evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
Definition: thread.c:468
evthr::exit_cb
evthr_exit_cb exit_cb
Definition: thread.c:49
evthr::thr
pthread_t * thr
Definition: thread.c:47
_evthr_loop
static void * _evthr_loop(void *args)
Definition: thread.c:92
_evthr_read
#define _evthr_read(thr, cmd, sock)
Definition: thread.c:60
args
void * args
Definition: thread.c:3
evthr_pool::nthreads
int nthreads
Definition: thread.c:36
evthr::event
ev_t * event
Definition: thread.c:44
evthr_pool_slist_t
struct evthr_pool_slist evthr_pool_slist_t
Definition: thread.c:21
evthr_new
evthr_t * evthr_new(evthr_init_cb init_cb, void *args)
Definition: thread.c:253
evthr::wdr
int wdr
Definition: thread.c:42
evthr_wexit_new
evthr_t * evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
Definition: thread.c:259
evthr_set_initcb
int evthr_set_initcb(evthr_t *thr, evthr_init_cb cb)
Definition: thread.c:196