pthread で itron ライクな snd_msg と rcv_msg (ソースコード)
pthread で itron ライクなメッセージ機能の実装の続き
http://d.hatena.ne.jp/l1o0/20100118/1263812821
メモリブロックは malloc 使用。リングバッファをメールキューとして、リングバッファのアクセスを mutex でガード。 pthread_cond_signal で送信通知
使用例は
http://d.hatena.ne.jp/l1o0/20100122/1264171647
#include <mbox.h>

#define DEBUG_DUMP_RBUF

/* global */
/* Held by create_mbox()/delete_mbox() while they scan or update g_mbox[]. */
pthread_mutex_t g_mbox_mutex;
Mbox g_mbox[NUM_OF_MBOX];

/*
 * Write data to the ring buffer.
 *
 * A private copy of `data` (`len` bytes) is allocated with malloc() and
 * stored at the current write position. Positions wrap modulo
 * NUM_OF_MSG + 1 and one position is always left unused, which is how a
 * full buffer is told apart from an empty one.
 *
 * Returns E_OK on success, E_QOVR when the buffer is full, E_PAR when
 * `data` is NULL although `len` is non-zero, E_SYS when allocation fails.
 */
int write_rbuf(MsgRingBuf *rbuf, void const *data, u_int len)
{
    int next;
    void *copy;

    if (NULL == data && 0 != len) {
        return E_PAR;
    }

    next = (rbuf->write_pos + 1) % (NUM_OF_MSG + 1);
    if (next == rbuf->read_pos) {
        return E_QOVR;
    }

    /*
     * malloc(0) is allowed to return NULL, which would be reported as an
     * allocation failure. Always request at least one byte so that a
     * zero-length message is stored with a non-NULL, free()-able buffer.
     */
    copy = malloc((0 != len) ? len : 1);
    if (NULL == copy) {
        return E_SYS;
    }
    if (0 != len) {
        memcpy(copy, data, len);
    }

    rbuf->msg[rbuf->write_pos].data = copy;
    rbuf->msg[rbuf->write_pos].len = len;
    rbuf->write_pos = next;

    return E_OK;
}

/*
 * Read data from the ring buffer.
 *
 * Hands the oldest stored buffer to the caller through `*data` / `*len`
 * and clears the slot. The buffer was allocated with malloc() by
 * write_rbuf(); after this call the ring buffer no longer references it,
 * so the caller is the one who has to free() it.
 *
 * Returns E_OK on success, E_NOEXS when the buffer is empty (in which
 * case `*data` and `*len` are left untouched).
 */
int read_rbuf(MsgRingBuf *rbuf, void **data, u_int *len)
{
    if (rbuf->write_pos == rbuf->read_pos) {
        return E_NOEXS;
    }

    *len = rbuf->msg[rbuf->read_pos].len;
    *data = rbuf->msg[rbuf->read_pos].data;

    rbuf->msg[rbuf->read_pos].data = NULL;
    rbuf->msg[rbuf->read_pos].len = 0;
    rbuf->read_pos = (rbuf->read_pos + 1) % (NUM_OF_MSG + 1);

    return E_OK;
}

/*
 * Dump the ring buffer to `fp` (stdout when `fp` is NULL).
 *
 * With DEBUG_DUMP_RBUF defined every slot is printed as well, with its
 * payload shown as text.
 */
void dump_rbuf(MsgRingBuf *rbuf, FILE *fp)
{
    if (NULL == fp) {
        fp = stdout;
    }

    fprintf(fp, "----\n");
    fprintf(fp, "read_pos = %d\n", rbuf->read_pos);
    fprintf(fp, "write_pos = %d\n", rbuf->write_pos);

#ifdef DEBUG_DUMP_RBUF
    {
        u_int i;

        /* print msg regarded as string */
        for (i = 0; i < NELEMS(rbuf->msg); i++) {
            if (NULL == rbuf->msg[i].data) {
                fprintf(fp, "%-2u : len = %u, NULL\n",
                        (unsigned int)i, (unsigned int)rbuf->msg[i].len);
            } else {
                /*
                 * The payload is an arbitrary byte buffer of exactly `len`
                 * bytes and is not guaranteed to be NUL-terminated, so the
                 * output is bounded by `len` instead of using a bare %s.
                 */
                fprintf(fp, "%-2u : len = %u, %.*s\n",
                        (unsigned int)i, (unsigned int)rbuf->msg[i].len,
                        (int)rbuf->msg[i].len,
                        (char *)(rbuf->msg[i].data));
            }
        }
        fprintf(fp, "\n");
    }
#endif /* DEBUG_DUMP_RBUF */
}

/*
 * Initialise `mutex` as a recursive mutex.
 */
void init_mutex(pthread_mutex_t *mutex)
{
#ifdef __CYGWIN32__
    *mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
#else
    pthread_mutexattr_t attr;

    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutex_init(mutex, &attr);
    /* The attribute object is no longer needed once the mutex exists. */
    pthread_mutexattr_destroy(&attr);
#endif
}

/*
 * Initialise the global mailbox table: set up g_mbox_mutex and mark every
 * mailbox slot as unused. Always returns E_OK.
 */
int init_mbox_table(void)
{
    u_int i;

    init_mutex(&g_mbox_mutex);

    for (i = 0; i < NELEMS(g_mbox); i++) {
        g_mbox[i].use = FALSE;
    }

    return E_OK;
}

/*
 * Claim the first unused mailbox slot, initialise its mutex, condition
 * variable and ring buffer, and store its index in `*mbox_id`.
 *
 * Returns E_OK on success, E_PAR when `mbox_id` is NULL, E_NOEXS when
 * every slot is already in use.
 */
int create_mbox(u_int *mbox_id)
{
    u_int i, j;
    int ercd;

    if (NULL == mbox_id) {
        return E_PAR;
    }

    pthread_mutex_lock(&g_mbox_mutex);

    ercd = E_NOEXS;
    for (i = 0; i < NELEMS(g_mbox); i++) {
        if (FALSE == g_mbox[i].use) {
            g_mbox[i].use = TRUE;
            init_mutex(&(g_mbox[i].mutex));
            pthread_cond_init(&(g_mbox[i].cond), NULL);

            g_mbox[i].rbuf.read_pos = 0;
            g_mbox[i].rbuf.write_pos = 0;
            for (j = 0; j < NELEMS(g_mbox[0].rbuf.msg); j++) {
                g_mbox[i].rbuf.msg[j].data = NULL;
                g_mbox[i].rbuf.msg[j].len = 0;
            }

            *mbox_id = i;
            ercd = E_OK;
            break;
        }
    }

    pthread_mutex_unlock(&g_mbox_mutex);

    return ercd;
}

/*
 * Validate a mailbox id for the send/receive entry points.
 *
 * Returns E_OK when `mbox_id` is in range and the slot is marked in use,
 * E_PAR otherwise (the same code delete_mbox() reports for these cases).
 * The `use` flag is read under g_mbox_mutex because create_mbox() and
 * delete_mbox() modify it under that lock. The check cannot protect
 * against a mailbox being deleted right after it returns.
 */
static int mbox_check_id(u_int mbox_id)
{
    int ercd;

    if (NUM_OF_MBOX <= mbox_id) {
        return E_PAR;
    }

    pthread_mutex_lock(&g_mbox_mutex);
    ercd = (FALSE == g_mbox[mbox_id].use) ? E_PAR : E_OK;
    pthread_mutex_unlock(&g_mbox_mutex);

    return ercd;
}

/*
 * Destroy a mailbox: tear down its mutex and condition variable, free any
 * messages still queued and mark the slot unused.
 *
 * Returns E_OK on success, E_PAR when the id is out of range or the slot
 * is not in use, E_SYS when the mutex or condition variable cannot be
 * destroyed.
 */
int delete_mbox(u_int mbox_id)
{
    u_int i;
    int ercd;

    if (NUM_OF_MBOX <= mbox_id) {
        return E_PAR;
    }

    pthread_mutex_lock(&g_mbox_mutex);

    if (FALSE == g_mbox[mbox_id].use) {
        ercd = E_PAR;
    } else if (pthread_mutex_destroy(&(g_mbox[mbox_id].mutex)) != 0) {
        ercd = E_SYS;
    } else if (pthread_cond_destroy(&(g_mbox[mbox_id].cond)) != 0) {
        /*
         * NOTE(review): at this point the mutex has already been destroyed
         * while the slot is still marked in use - confirm how callers are
         * expected to recover from this error.
         */
        ercd = E_SYS;
    } else {
        g_mbox[mbox_id].use = FALSE;
        g_mbox[mbox_id].rbuf.read_pos = 0;
        g_mbox[mbox_id].rbuf.write_pos = 0;
        for (i = 0; i < NELEMS(g_mbox[0].rbuf.msg); i++) {
            /* free(NULL) is a no-op; clear the slot so no dangling
             * pointer is left behind for dump_rbuf() or a later free. */
            free(g_mbox[mbox_id].rbuf.msg[i].data);
            g_mbox[mbox_id].rbuf.msg[i].data = NULL;
            g_mbox[mbox_id].rbuf.msg[i].len = 0;
        }
        ercd = E_OK;
    }

    pthread_mutex_unlock(&g_mbox_mutex);

    return ercd;
}

/*
 * Send a message: queue a copy of `msg` (`len` bytes) on the mailbox and
 * wake one waiting receiver.
 *
 * Returns E_OK on success, E_PAR for an invalid/unused mailbox id or a
 * NULL `msg` with non-zero `len`, E_QOVR when the queue is full, E_SYS
 * when the copy cannot be allocated.
 */
int snd_msg(u_int mbox_id, void const *msg, u_int len)
{
    int ercd;

    ercd = mbox_check_id(mbox_id);
    if (E_OK != ercd) {
        return ercd;
    }

    pthread_mutex_lock(&(g_mbox[mbox_id].mutex));

    ercd = write_rbuf(&(g_mbox[mbox_id].rbuf), msg, len);
    if (E_OK == ercd) {
        /* Only wake a receiver when there is actually something to read. */
        pthread_cond_signal(&(g_mbox[mbox_id].cond));
    }

    pthread_mutex_unlock(&(g_mbox[mbox_id].mutex));

    return ercd;
}

/*
 * Polling receive: take the oldest message without blocking.
 *
 * On E_OK, `*msg` is the malloc()-allocated message buffer (to be released
 * with free() by the caller) and `*len` its length.
 *
 * Returns E_OK on success, E_PAR for an invalid/unused mailbox id or NULL
 * output pointers, E_NOEXS when the queue is empty.
 */
int prcv_msg(u_int mbox_id, void **msg, u_int *len)
{
    int ercd;

    if (NULL == msg || NULL == len) {
        return E_PAR;
    }
    ercd = mbox_check_id(mbox_id);
    if (E_OK != ercd) {
        return ercd;
    }

    pthread_mutex_lock(&(g_mbox[mbox_id].mutex));
    ercd = read_rbuf(&(g_mbox[mbox_id].rbuf), msg, len);
    pthread_mutex_unlock(&(g_mbox[mbox_id].mutex));

    return ercd;
}

/*
 * Blocking receive: wait until a message is available and take it.
 *
 * On E_OK, `*msg` is the malloc()-allocated message buffer (to be released
 * with free() by the caller) and `*len` its length.
 *
 * Returns E_OK on success, E_PAR for an invalid/unused mailbox id or NULL
 * output pointers.
 */
int rcv_msg(u_int mbox_id, void **msg, u_int *len)
{
    int ercd;

    if (NULL == msg || NULL == len) {
        return E_PAR;
    }
    ercd = mbox_check_id(mbox_id);
    if (E_OK != ercd) {
        return ercd;
    }

    pthread_mutex_lock(&(g_mbox[mbox_id].mutex));

    /* Re-check the queue after every wake-up. */
    while (E_NOEXS == (ercd = read_rbuf(&(g_mbox[mbox_id].rbuf), msg, len))) {
        pthread_cond_wait(&(g_mbox[mbox_id].cond), &(g_mbox[mbox_id].mutex));
    }

    pthread_mutex_unlock(&(g_mbox[mbox_id].mutex));

    return ercd;
}

/*
 * Store "current time + msec milliseconds" in `*ts`, using gettimeofday()
 * for the current time.
 *
 * The result is always normalised (0 <= tv_nsec < 1e9), including for a
 * negative `msec`, which simply yields a point in the past.
 *
 * Returns E_OK on success, E_PAR when `ts` is NULL, E_SYS when the current
 * time cannot be obtained.
 */
int add_msec(struct timespec *ts, int msec)
{
    const long nsec_per_sec = 1000L * 1000L * 1000L;
    struct timeval tv;
    time_t sec;
    long nsec;

    if (NULL == ts) {
        return E_PAR;
    }
    if (0 != gettimeofday(&tv, NULL)) {
        return E_SYS;
    }

    /* Keep seconds in time_t: squeezing tv_sec into an int truncates it
     * on platforms where time_t is wider than int. */
    sec = tv.tv_sec + (time_t)(msec / 1000);

    /* |(msec % 1000) * 1e6| < 1e9 and tv_usec * 1000 < 1e9, so the sum
     * lies in (-1e9, 2e9) and a single carry or borrow normalises it. */
    nsec = (long)tv.tv_usec * 1000L + (long)(msec % 1000) * 1000L * 1000L;
    if (nsec >= nsec_per_sec) {
        sec += 1;
        nsec -= nsec_per_sec;
    } else if (nsec < 0) {
        sec -= 1;
        nsec += nsec_per_sec;
    }

    ts->tv_sec = sec;
    ts->tv_nsec = nsec;

    return E_OK;
}

/*
 * Receive with timeout: wait up to `timeout` milliseconds for a message.
 *
 * On E_OK, `*msg` is the malloc()-allocated message buffer (to be released
 * with free() by the caller) and `*len` its length.
 *
 * Returns E_OK on success, E_PAR for an invalid/unused mailbox id or NULL
 * output pointers, E_TMOUT when no message arrived in time, E_SYS when the
 * deadline cannot be computed or the wait fails.
 */
int trcv_msg(u_int mbox_id, void **msg, u_int *len, int timeout)
{
    int ercd;
    int ret;
    struct timespec ts;

    if (NULL == msg || NULL == len) {
        return E_PAR;
    }
    ercd = mbox_check_id(mbox_id);
    if (E_OK != ercd) {
        return ercd;
    }

    pthread_mutex_lock(&(g_mbox[mbox_id].mutex));

    if (E_OK != add_msec(&ts, timeout)) {
        pthread_mutex_unlock(&(g_mbox[mbox_id].mutex));
        return E_SYS;
    }

    while (E_NOEXS == (ercd = read_rbuf(&(g_mbox[mbox_id].rbuf), msg, len))) {
        ret = pthread_cond_timedwait(&(g_mbox[mbox_id].cond),
                                     &(g_mbox[mbox_id].mutex), &ts);
        if (ETIMEDOUT == ret) {
            /*
             * The mutex is held again here. Look at the queue one last
             * time so a message that arrived right at the deadline is
             * delivered instead of being reported as a timeout.
             */
            ercd = read_rbuf(&(g_mbox[mbox_id].rbuf), msg, len);
            if (E_NOEXS == ercd) {
                ercd = E_TMOUT;
            }
            break;
        }
        if (0 != ret && EINTR != ret) {
            ercd = E_SYS;
            break;
        }
        /* woken up or interrupted: continue (check ring buffer) */
    }

    pthread_mutex_unlock(&(g_mbox[mbox_id].mutex));

    return ercd;
}