当前位置 博文首页 > Linux C线程池简单实现实例

    Linux C线程池简单实现实例

    作者:admin 时间:2021-02-20 12:41

    Linux C线程池

    三个文件 

    1 tpool.h

    typedef struct tpool_work { 
      void        (*routine)(void *); 
      void        *arg; 
      struct tpool_work  *next; 
    } tpool_work_t; 
     
    typedef struct tpool { 
      /* pool characteristics */ 
      int         num_threads; 
      int         max_queue_size; 
      /* pool state */ 
      pthread_t      *tpid; 
      tpool_work_t    *queue; 
      int         front, rear; 
      /* 剩下的任务可以做完, 但不能再加新的任务 */ 
      int         queue_closed;   
      /* 剩下的任务都不做了, 直接关闭 */ 
      int         shutdown;     
      /* pool synchronization */ 
      pthread_mutex_t   queue_lock; 
      pthread_cond_t   queue_has_task; 
      pthread_cond_t   queue_has_space; 
      pthread_cond_t   queue_empty; 
    } *tpool_t; 
     
    void tpool_init(tpool_t *tpoolp,int num_threads, int max_queue_size); 
     
    int tpool_add_work(tpool_t tpool,void(*routine)(void *), void *arg); 
     
    int tpool_destroy(tpool_t tpool,int finish); 
    

     2 tpool.c

    #include <stdlib.h> 
    #include <stdio.h> 
    #include <unistd.h> 
    #include <sys/types.h> 
    #include <string.h> 
    #include <pthread.h> 
    #include "tpool.h" 
     
    #define DEBUG 
     
    #if defined(DEBUG) 
    #define debug(...) do { \ 
      flockfile(stdout); \ 
      printf("###%p.%s: ", (void *)pthread_self(), __func__); \ 
      printf(__VA_ARGS__); \ 
      putchar('\n'); \ 
      fflush(stdout); \ 
      funlockfile(stdout); \ 
    } while (0) 
    #else 
    #define debug(...) 
    #endif 
     
    void *tpool_thread(void *); 
     
    void tpool_init(tpool_t *tpoolp, int num_worker_threads, int max_queue_size) 
    { 
      int i; 
      tpool_t pool; 
     
      pool = (tpool_t)malloc(sizeof(struct tpool)); 
      if (pool == NULL) { 
        perror("malloc"); 
        exit(0); 
      } 
     
      pool->num_threads = 0; 
      pool->max_queue_size = max_queue_size + 1; 
      pool->num_threads = num_worker_threads; 
      pool->tpid = NULL; 
      pool->front = 0; 
      pool->rear = 0; 
      pool->queue_closed = 0; 
      pool->shutdown = 0; 
     
      if (pthread_mutex_init(&pool->queue_lock, NULL) == -1) { 
        perror("pthread_mutex_init"); 
        free(pool); 
        exit(0); 
      } 
      if (pthread_cond_init(&pool->queue_has_space, NULL) == -1) { 
        perror("pthread_mutex_init"); 
        free(pool); 
        exit(0); 
      } 
      if (pthread_cond_init(&pool->queue_has_task, NULL) == -1) { 
        perror("pthread_mutex_init"); 
        free(pool); 
        exit(0); 
      } 
      if (pthread_cond_init(&pool->queue_empty, NULL) == -1) { 
        perror("pthread_mutex_init"); 
        free(pool); 
        exit(0); 
      } 
     
      if ((pool->queue = malloc(sizeof(struct tpool_work) *  
              pool->max_queue_size)) == NULL) { 
        perror("malloc"); 
        free(pool); 
        exit(0); 
      } 
     
      if ((pool->tpid = malloc(sizeof(pthread_t) * num_worker_threads)) == NULL) { 
        perror("malloc"); 
        free(pool); 
        free(pool->queue); 
        exit(0); 
      } 
     
      for (i = 0; i < num_worker_threads; i++) { 
        if (pthread_create(&pool->tpid[i], NULL, tpool_thread,  
              (void *)pool) != 0) { 
          perror("pthread_create"); 
          exit(0); 
        } 
      } 
     
      *tpoolp = pool; 
    } 
     
     
    int empty(tpool_t pool) 
    { 
      return pool->front == pool->rear; 
    } 
     
    int full(tpool_t pool) 
    { 
      return ((pool->rear + 1) % pool->max_queue_size == pool->front); 
    } 
     
    int size(tpool_t pool) 
    { 
      return (pool->rear + pool->max_queue_size - 
            pool->front) % pool->max_queue_size; 
    } 
     
    int tpool_add_work(tpool_t tpool, void(*routine)(void *), void *arg) 
    { 
      tpool_work_t *temp; 
     
      pthread_mutex_lock(&tpool->queue_lock); 
     
      while (full(tpool) && !tpool->shutdown && !tpool->queue_closed) { 
        pthread_cond_wait(&tpool->queue_has_space, &tpool->queue_lock); 
      } 
     
      if (tpool->shutdown || tpool->queue_closed) { 
        pthread_mutex_unlock(&tpool->queue_lock); 
        return -1; 
      } 
     
      int is_empty = empty(tpool); 
     
      temp = tpool->queue + tpool->rear; 
      temp->routine = routine; 
      temp->arg = arg; 
      tpool->rear = (tpool->rear + 1) % tpool->max_queue_size; 
     
      if (is_empty) { 
        debug("signal has task"); 
        pthread_cond_broadcast(&tpool->queue_has_task); 
      } 
     
      pthread_mutex_unlock(&tpool->queue_lock);   
     
      return 0; 
    } 
     
    void *tpool_thread(void *arg) 
    { 
      tpool_t pool = (tpool_t)(arg); 
      tpool_work_t *work; 
     
      for (;;) { 
        pthread_mutex_lock(&pool->queue_lock); 
     
        while (empty(pool) && !pool->shutdown) { 
          debug("I'm sleep"); 
          pthread_cond_wait(&pool->queue_has_task, &pool->queue_lock); 
        } 
        debug("I'm awake"); 
     
        if (pool->shutdown == 1) { 
          debug("exit"); 
          pthread_mutex_unlock(&pool->queue_lock); 
          pthread_exit(NULL); 
        } 
     
        int is_full = full(pool); 
        work = pool->queue + pool->front; 
        pool->front = (pool->front + 1) % pool->max_queue_size; 
     
        if (is_full) { 
          pthread_cond_broadcast(&pool->queue_has_space); 
        } 
     
        if (empty(pool)) { 
          pthread_cond_signal(&pool->queue_empty); 
        } 
     
        pthread_mutex_unlock(&pool->queue_lock);   
     
        (*(work->routine))(work->arg); 
      } 
    } 
     
    int tpool_destroy(tpool_t tpool, int finish) 
    { 
      int   i; 
     
      pthread_mutex_lock(&tpool->queue_lock); 
     
      tpool->queue_closed = 1; 
     
      if (finish == 1) { 
        debug("wait all work done"); 
        while (!empty(tpool)) { 
          pthread_cond_wait(&tpool->queue_empty, &tpool->queue_lock); 
        } 
      } 
      tpool->shutdown = 1; 
     
      pthread_mutex_unlock(&tpool->queue_lock); 
     
      pthread_cond_broadcast(&tpool->queue_has_task); 
     
      debug("wait worker thread exit"); 
      for (i = 0; i < tpool->num_threads; i++) { 
        pthread_join(tpool->tpid[i], NULL); 
      } 
     
      debug("free thread pool"); 
      free(tpool->tpid); 
      free(tpool->queue); 
      free(tpool); 
    } 
     
    

    3 tpooltest.c

    #include <stdio.h> 
    #include <pthread.h> 
    #include "tpool.h" 
     
    char *str[]={"string 0", "string 1", "string 2",  
            "string 3", "string 4", "string 5"}; 
     
    void job(void * jobstr) 
    { 
      long i, x; 
     
      for (i = 0; i < 100000000; i++) { 
        x = x +i; 
      } 
      printf("%s\n", (char *)jobstr); 
    } 
     
    int main(void) 
    { 
      int i;  
      tpool_t test_pool; 
     
      tpool_init(&test_pool, 8, 20); 
     
      for ( i = 0; i < 5; i++) { 
        tpool_add_work(test_pool, job, str[i]); 
      } 
     
      tpool_destroy(test_pool, 1); 
     
      return 0; 
    } 
    

    感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

    js
    下一篇:没有了