pthread で タイマスレッドの実装
pthread で 指定したタイムアウト時間にコールバック関数を実行するタイマスレッドを実装。 nanosleep で 10ms 毎に起床しタイマアウトをチェックする。
起床タイミングは nanosleep の性能 によるので、古い Linux とかだと正確に 10ms にならない。 10 ms ごとに起床するので、タイマアウト 20ms としても、実際は 20 から 30 ms のタイマ満了になる。タイムアウト 10 ms を指定すると、0ms でタイムアウトしてしまう可能性があるので、10 ms 以下のタイマ値ははじく。
コールバック関数が非常に時間がかる場合も問題になる。主に コールバック関数として snd_msg でスレッドにメッセージを送って、 rcv_msg でタイマアウトを知ることに使う。
pthread で itron ライクな sng_msg と rcv_msg (ソースコード) - l1o0の日記
(POSIX の timer_create のほうが性能がいいのかなぁ)
#if 0 gcc -g -Wall $0 -o a.out -lpthread && ./a.out $1; exit; #endif #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/time.h> #include <time.h> #include <pthread.h> #include <errno.h> #define TRUE 1 #define FALSE 0 #define E_OK 0 /* success */ #define E_NOEXS (-2) /* no objcet */ #define E_SYS (-6) /* system error */ #define E_PAR (-7) /* parameter error */ #define NUM_OF_TIMER 3 #define TIMER_INTERVAL 10 /* msec */ /* macro */ #define NELEMS(array) (sizeof(array) / sizeof(array[0])) typedef struct TimerId { u_int index; u_int sequence_number; } TimerId; typedef void (*timer_callback)(void *, int); typedef struct TimerInfo { int use; int timeout; u_int sequence_number; timer_callback callback; void *data; u_int len; } TimerInfo; typedef struct TimerTable { pthread_mutex_t mutex; u_int sequence_number; TimerInfo timer_info[NUM_OF_TIMER]; } TimerTable; int do_timer_callback(TimerInfo *timer_info, u_int index); int calc_elapsed(struct timeval *tv_before, struct timeval *tv_after); int init_timer(pthread_t *thread); int create_timer(TimerId *timer_id); int start_timer(TimerId *timer_id, int timeout, timer_callback callback, void *data, u_int len); int stop_timer_no_mutex(TimerId *timer_id); int stop_timer(TimerId *timer_id); int dump_timer(FILE *fp); char *test_print_msec(char *buf); void test_callback(void *data, int len); /* global */ TimerTable g_timer_table; int do_timer_callback(TimerInfo *timer_info, u_int index) { TimerId timer_id; int ercd; timer_info->callback(timer_info->data, timer_info->len); timer_id.index = index; timer_id.sequence_number = timer_info->sequence_number; ercd = stop_timer_no_mutex(&timer_id); return ercd; } /* milli sec */ int calc_elapsed(struct timeval *tv_before, struct timeval *tv_after) { int sec; int usec; sec = tv_after->tv_sec - tv_before->tv_sec; usec = tv_after->tv_usec - tv_before->tv_usec; return (sec * 1000) + (usec / 1000); } void *timer_thread(void *arg) { struct timespec ts; struct timespec rem; struct timeval tv_before; struct timeval tv_after; int i; TimerInfo *timer_info; int elapsed; /* milli sec */ /* char buf[256]; */ elapsed = 0; ts.tv_sec = 0; ts.tv_nsec = TIMER_INTERVAL * 1000 * 1000; while(1) { while (EINTR == nanosleep(&ts, &rem)) { ts.tv_sec = rem.tv_sec; ts.tv_nsec = rem.tv_nsec; } gettimeofday(&tv_before, NULL); pthread_mutex_lock(&(g_timer_table.mutex)); for (i = 0; i < NELEMS(g_timer_table.timer_info); i++) { timer_info = &(g_timer_table.timer_info[i]); if (TRUE == timer_info->use) { timer_info->timeout -= TIMER_INTERVAL - elapsed; if (timer_info->timeout <= 0) { do_timer_callback(timer_info, i); } } } pthread_mutex_unlock(&(g_timer_table.mutex)); gettimeofday(&tv_after, NULL); elapsed = calc_elapsed(&tv_before, &tv_after); if (elapsed > TIMER_INTERVAL) { fprintf(stderr, "!!! timer handler elapsed %d over %d !!!\n", elapsed, TIMER_INTERVAL); elapsed = TIMER_INTERVAL; } ts.tv_sec = 0; ts.tv_nsec = (TIMER_INTERVAL - elapsed) * 1000 * 1000; /* printf("timer tick end %s elapsed = %d\n", */ /* test_print_msec(buf), elapsed); */ } } int init_timer(pthread_t *thread) { int i; pthread_attr_t attr; #ifdef __CYGWIN32__ g_timer_table.mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; #else pthread_mutexattr_t mutex_attr; pthread_mutexattr_init(&mutex_attr); pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&(g_timer_table.mutex), &mutex_attr); #endif g_timer_table.sequence_number = 0; for (i = 0; i < NELEMS(g_timer_table.timer_info); i++) { g_timer_table.timer_info[i].use = FALSE; g_timer_table.timer_info[i].sequence_number = 0; g_timer_table.timer_info[i].callback = NULL; g_timer_table.timer_info[i].data = NULL; g_timer_table.timer_info[i].len = 0; } pthread_attr_init(&attr); pthread_create(thread, &attr, &timer_thread, NULL); return E_OK; } int create_timer(TimerId *timer_id) { int i; int ercd; ercd = E_NOEXS; for (i = 0; i < NELEMS(g_timer_table.timer_info); i++) { if (FALSE == g_timer_table.timer_info[i].use) { timer_id->index = i; timer_id->sequence_number = g_timer_table.sequence_number; g_timer_table.timer_info[i].use = TRUE; g_timer_table.timer_info[i].sequence_number = g_timer_table.sequence_number; g_timer_table.sequence_number++; ercd = E_OK; break; } } return ercd; } int start_timer(TimerId *timer_id, int timeout, timer_callback callback, void *data, u_int len) { int ercd; TimerInfo *timer_info; if (timeout <= TIMER_INTERVAL) { fprintf(stderr, "timeout too small : %d %d\n", timeout, TIMER_INTERVAL); return E_PAR; } pthread_mutex_lock(&(g_timer_table.mutex)); ercd = create_timer(timer_id); if (E_OK == ercd) { timer_info = &(g_timer_table.timer_info[timer_id->index]); timer_info->callback = callback; timer_info->len = len; timer_info->timeout = timeout; if ((len > 0) && (NULL != data)) { timer_info->data = malloc(len); if (NULL == timer_info->data) { stop_timer_no_mutex(timer_id); ercd = E_SYS; } else { memcpy(timer_info->data, data, len); ercd = E_OK; } } } pthread_mutex_unlock(&(g_timer_table.mutex)); return ercd; } int stop_timer_no_mutex(TimerId *timer_id) { int ercd; TimerInfo *timer_info; if (NUM_OF_TIMER <= timer_id->index) { return E_PAR; } timer_info = &(g_timer_table.timer_info[timer_id->index]); if (FALSE == timer_info->use) { ercd = E_NOEXS; } else if (timer_id->sequence_number != timer_info->sequence_number) { ercd = E_NOEXS; } else { if ((timer_info->len > 0) && (NULL != timer_info->data)) { free(timer_info->data); timer_info->data = NULL; } timer_info->use = FALSE; timer_info->timeout = 0; timer_info->callback = NULL; timer_info->len = 0; ercd= E_OK; } return ercd; } int stop_timer(TimerId *timer_id) { int ercd; pthread_mutex_lock(&(g_timer_table.mutex)); ercd = stop_timer_no_mutex(timer_id); pthread_mutex_unlock(&(g_timer_table.mutex)); return ercd; } int dump_timer(FILE *fp) { int i; TimerInfo *timer_info; if (NULL == fp) { fp = stdout; } fprintf(fp, "sequence_number = %d\n", g_timer_table.sequence_number); for (i = 0; i < NELEMS(g_timer_table.timer_info); i++) { fprintf(fp, "----\n"); timer_info = &(g_timer_table.timer_info[i]); fprintf(fp, "use = %d\n", timer_info->use); fprintf(fp, "timeout = %d\n", timer_info->timeout); fprintf(fp, "sequence_number = %d\n", timer_info->sequence_number); fprintf(fp, "callback = %p\n", timer_info->callback); fprintf(fp, "data = %p\n", timer_info->data); fprintf(fp, "len = %d\n", timer_info->len); } return E_OK; } char *test_print_msec(char *buf) { struct timeval tv; struct tm tm; gettimeofday(&tv, NULL); localtime_r(&(tv.tv_sec), &tm); sprintf(buf, "%04d/%02d/%02d %02d:%02d:%02d.%03ld", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, tv.tv_usec / 1000); return buf; } void test_callback(void *data, int len) { char buf[256]; test_print_msec(buf); printf("\nexpired = %s, data = %d, len = %d\n", buf, *(int *)data, len); dump_timer(NULL); } int main(int argc, char *argv[]) { TimerId timer_id[NUM_OF_TIMER + 1]; int data; int ret; pthread_t thread; char buf[256]; int i; printf("#### init_timer\n"); init_timer(&thread); dump_timer(NULL); printf("#### start timer (NUM_OF_TIMER + 1)\n"); for (i = 0; i < NUM_OF_TIMER + 1; i++) { data = i + 100; test_print_msec(buf); ret = start_timer(&(timer_id[i]), 100, test_callback, (void * )&data, sizeof(int)); printf("start = %s, ret = %d\n", buf, ret); } dump_timer(NULL); printf("#### wait to expire\n"); sleep(2); printf("##### start timer (NUM_OF_TIMER)\n"); for (i = 0; i < NELEMS(g_timer_table.timer_info); i++) { data = i + 200; test_print_msec(buf); ret = start_timer(&(timer_id[i]), 100, test_callback, (void * )&data, sizeof(int)); printf("start = %s, ret = %d\n", buf, ret); } dump_timer(NULL); printf("#### stop timer\n"); ret = stop_timer(&(timer_id[1])); printf("stop_timer ret = %d\n", ret); dump_timer(NULL); printf("#### wait to expire\n"); sleep(2); dump_timer(NULL); data = 123; ret = start_timer(&(timer_id[0]), 10, test_callback, (void * )&data, sizeof(int)); printf("start = %s, ret = %d\n", buf, ret); return 0; }