当前位置:网站首页>utlis thread pool
utils thread pool
2022-08-03 22:26:00 【Zip-List】
utils thread pool
The thread pool structure must be mastered
The fundamentals never change: this is still the familiar producer-consumer pattern, still the familiar mutex + condition variable used to implement semaphore-like semantics that notify worker threads to fetch tasks from the task queue, and still the familiar flag that tells a worker thread to exit. All very familiar! The core fields are:
struct NWORKER *workers; //执行队列
struct NJOB *waiting_jobs;//任务队列
pthread_mutex_t jobs_mtx; //互斥锁
pthread_cond_t jobs_cond; //条件变量
The worker threads and the main thread interact through the task queue. The full implementation follows:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
// Why macros? A macro works like a template: the same list code can operate on different node types.
/*
 * LL_ADD - insert `item` at the head of the doubly linked list `list`.
 *
 * `item` must point to a struct with `prev` and `next` members; `list` must
 * be an lvalue holding the current head (NULL for an empty list). The old
 * head's `prev` is linked back to `item` so that LL_REMOVE works on any node,
 * not only on the head. Both arguments are evaluated more than once, so they
 * must not have side effects.
 */
#define LL_ADD(item, list) do {                         \
    (item)->prev = NULL;                                \
    (item)->next = (list);                              \
    if ((list) != NULL) (list)->prev = (item);          \
    (list) = (item);                                    \
} while (0)
/*
 * LL_REMOVE - unlink `item` from the doubly linked list `list`.
 *
 * Neighbours are re-linked around `item`; if `item` is the current head,
 * `list` is advanced to the next node. `item`'s own links are cleared
 * afterwards. Both arguments are evaluated more than once, so they must not
 * have side effects.
 */
#define LL_REMOVE(item, list) do {                                      \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;        \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;        \
    if ((list) == (item)) (list) = (item)->next;                        \
    (item)->prev = (item)->next = NULL;                                 \
} while (0)
// Worker: one list node per pool thread.
typedef struct NWORKER {
pthread_t thread; // thread handle filled in by pthread_create
int terminate; // set to 1 by ntyThreadPoolShutdown; the worker loop exits when it sees it
struct NWORKQUEUE *workqueue; // back pointer to the work queue shared by all workers
struct NWORKER *prev; // list linkage (maintained by LL_ADD / LL_REMOVE)
struct NWORKER *next;
} nWorker;
// Job: one unit of work queued for the pool.
typedef struct NJOB {
void (*job_function)(struct NJOB *job); // callback run by a worker thread; receives this job
void *user_data; // opaque payload for job_function; the pool never touches it
struct NJOB *prev; // list linkage (maintained by LL_ADD / LL_REMOVE)
struct NJOB *next;
} nJob;
// Pool management structure: ties the worker list, the pending-job list and
// the synchronization primitives together.
typedef struct NWORKQUEUE {
struct NWORKER *workers; // list of worker threads
struct NJOB *waiting_jobs;// list of jobs waiting to be picked up
pthread_mutex_t jobs_mtx; // mutex protecting waiting_jobs
pthread_cond_t jobs_cond; // condition variable signalled when a job is queued or on shutdown
} nWorkQueue;
// The thread pool handle is simply the work queue.
typedef nWorkQueue nThreadPool;
/*
 * Thread entry point for one pool worker.
 *
 * ptr is the worker's own nWorker. The thread sleeps on the queue's condition
 * variable until a job is available or its terminate flag is set, pops the
 * job at the head of the waiting list, and runs it with the mutex released.
 * When terminate is observed the thread frees its nWorker and exits, even if
 * jobs are still pending.
 */
static void *ntyWorkerThread(void *ptr) {
    nWorker *worker = (nWorker*)ptr;
    pthread_mutex_t *lock = &worker->workqueue->jobs_mtx;
    pthread_cond_t *wakeup = &worker->workqueue->jobs_cond;

    for (;;) {
        pthread_mutex_lock(lock);

        /* Sleep until there is work to do or we are asked to stop. */
        while (worker->workqueue->waiting_jobs == NULL && !worker->terminate) {
            pthread_cond_wait(wakeup, lock);
        }

        if (worker->terminate) {
            pthread_mutex_unlock(lock);
            break;
        }

        /* Take the job at the head of the list while still holding the lock. */
        nJob *job = worker->workqueue->waiting_jobs;
        if (job != NULL) {
            LL_REMOVE(job, worker->workqueue->waiting_jobs);
        }
        pthread_mutex_unlock(lock);

        /* Run the job outside the lock so other workers can proceed. */
        if (job != NULL) {
            job->job_function(job);
        }
    }

    free(worker);
    return NULL;
}
/*
 * Initialize a thread pool and start its worker threads.
 *
 * workqueue:  caller-owned pool object; it is zeroed and fully re-initialized.
 * numWorkers: number of worker threads to start; values below 1 are treated
 *             as 1.
 *
 * Returns 0 on success and 1 on failure. If a failure happens part-way
 * through, the workers already started keep running and stay linked in
 * workqueue->workers.
 */
int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers) {
    if (workqueue == NULL) return 1;
    if (numWorkers < 1) numWorkers = 1;

    memset(workqueue, 0, sizeof(nThreadPool));

    /* Initialize in place rather than copying a statically initialized
     * object: POSIX does not define the behavior of copied mutexes/condvars. */
    int ret = pthread_mutex_init(&workqueue->jobs_mtx, NULL);
    if (ret != 0) {
        fprintf(stderr, "pthread_mutex_init: %s\n", strerror(ret));
        return 1;
    }
    ret = pthread_cond_init(&workqueue->jobs_cond, NULL);
    if (ret != 0) {
        fprintf(stderr, "pthread_cond_init: %s\n", strerror(ret));
        pthread_mutex_destroy(&workqueue->jobs_mtx);
        return 1;
    }

    for (int i = 0; i < numWorkers; i++) {
        nWorker *worker = calloc(1, sizeof(nWorker));
        if (worker == NULL) {
            perror("malloc");
            return 1;
        }
        worker->workqueue = workqueue;

        /* pthread_create reports errors through its return value, not errno,
         * so perror() would print an unrelated message here. */
        ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
        if (ret != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(ret));
            free(worker);
            return 1;
        }

        LL_ADD(worker, worker->workqueue->workers);
    }

    return 0;
}
/*
 * Ask every worker thread of the pool to exit.
 *
 * The terminate flags are set while holding jobs_mtx. Workers read the flag
 * under the same mutex and free their own nWorker once they see it, so
 * setting it under the lock avoids both a data race on the flag and walking
 * into a worker node that has already been freed.
 *
 * Jobs still waiting in the queue are dropped: the list is detached without
 * running or freeing them. This function does not join the worker threads;
 * it returns as soon as they have been notified.
 */
void ntyThreadPoolShutdown(nThreadPool *workqueue) {
    if (workqueue == NULL) return;

    pthread_mutex_lock(&workqueue->jobs_mtx);

    for (nWorker *worker = workqueue->workers; worker != NULL; worker = worker->next) {
        worker->terminate = 1;
    }

    workqueue->workers = NULL;
    workqueue->waiting_jobs = NULL;

    /* Wake every sleeping worker so it can observe its terminate flag. */
    pthread_cond_broadcast(&workqueue->jobs_cond);
    pthread_mutex_unlock(&workqueue->jobs_mtx);
}
/*
 * Submit a job to the pool.
 *
 * The job is pushed onto the head of the waiting list under jobs_mtx and one
 * sleeping worker is signalled. The pool does not copy the job; it hands the
 * same pointer to job->job_function when a worker picks it up.
 */
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) {
    pthread_mutex_t *lock = &workqueue->jobs_mtx;
    pthread_cond_t *wakeup = &workqueue->jobs_cond;

    pthread_mutex_lock(lock);
    LL_ADD(job, workqueue->waiting_jobs);
    pthread_cond_signal(wakeup);
    pthread_mutex_unlock(lock);
}
/************************** debug thread pool **************************/
// SDK --> software development kit
// The pool above is meant to be provided as an SDK for other developers to use.
#if 1
#define KING_MAX_THREAD 80
#define KING_COUNTER_SIZE 1000
/*
 * Demo job callback: prints the integer stored in job->user_data together
 * with the id of the thread running it, then releases both the payload and
 * the job itself (both are heap-allocated by main).
 */
void king_counter(nJob *job) {
    int index = *(int*)job->user_data;

    /* pthread_t is an opaque type; cast explicitly so the argument matches
     * the %lu conversion. */
    printf("index : %d, selfid : %lu\n", index, (unsigned long)pthread_self());

    free(job->user_data);
    free(job);
}
/*
 * Demo driver: starts a pool of KING_MAX_THREAD workers, queues
 * KING_COUNTER_SIZE jobs that each print their index, then waits for a key
 * press so the workers have time to drain the queue before the process exits.
 */
int main(int argc, char *argv[]) {
    (void)argc;
    (void)argv;

    nThreadPool pool;
    if (ntyThreadPoolCreate(&pool, KING_MAX_THREAD) != 0) {
        fprintf(stderr, "failed to create thread pool\n");
        return 1;
    }

    for (int i = 0; i < KING_COUNTER_SIZE; i++) {
        nJob *job = (nJob*)malloc(sizeof(nJob));
        if (job == NULL) {
            perror("malloc");
            exit(1);
        }

        /* The payload is owned by the job; king_counter frees both. */
        int *index = malloc(sizeof *index);
        if (index == NULL) {
            perror("malloc");
            free(job);
            exit(1);
        }
        *index = i;

        job->job_function = king_counter;
        job->user_data = index;

        ntyThreadPoolQueue(&pool, job);
    }

    getchar();
    printf("\n");
    return 0;
}
#endif
边栏推荐
猜你喜欢

pikachu Over permission

IO thread process -> thread synchronization mutual exclusion mechanism -> day6

Go开发工具GoLand V2022.2 来了——Go 工作区重大升级

.NET6之MiniAPI(十四):跨域CORS(上)

node连接mysql数据库报错:Client does not support authentication protocol requested by server

HCIP第十四天

LabVIEW代码生成错误 61056

深度学习和机器学习有什么区别?

Network basic learning series four (network layer, data link layer and some other important protocols or technologies)

获国际权威认可 | 云扩科技入选《RPA全球市场格局报告,Q3 2022》
随机推荐
Canvas App中点击图标生成PDF并保存到Dataverse中
Recognized by International Authorities | Yunzhuang Technology was selected in "RPA Global Market Pattern Report, Q3 2022"
PowerMockup 4.3.4::::Crack
win10系统下yolov5-V6.1版本的tensorrt部署细节教程及bug修改
Flutter 桌面探索 | 自定义可拖拽导航栏
October 2019 Twice SQL Injection
老板:公司系统太多,能不能实现账号互通?
Data_web(八)mysql增量同步到mongodb
趣链的产品构架
Teach a Man How to Fish - How to Query the Properties of Any SAP UI5 Control by Yourself Documentation and Technical Implementation Details Demo
for循环练习题
[N1CTF 2018] eating_cms
一个函数有多少种调用方式?
Testng监听器
CAS:908007-17-0_Biotin-azide_Biotin azide
What is Adobe?
[b01lers2020]Life on Mars
CAS:122567-66-2_DSPE-Biotin_DSPE-Biotin
DO280管理和监控OpenShift平台--资源限制
HCIP第十四天