pthread で itron ライクな snd_msg と rcv_msg (ソースコード)

pthread で itron ライクなメッセージ機能の実装の続き
http://d.hatena.ne.jp/l1o0/20100118/1263812821

メモリブロックは malloc で確保。リングバッファをメールキューとして使い、リングバッファへのアクセスは mutex でガード。送信の通知には pthread_cond_signal を使用。

使用例は
http://d.hatena.ne.jp/l1o0/20100122/1264171647

#include <mbox.h>

/* When defined, dump_rbuf() also prints every message slot of the ring. */
#define DEBUG_DUMP_RBUF

/* global */
/* Locked by create_mbox() and delete_mbox() around their access to g_mbox;
 * initialised by init_mbox_table(). */
pthread_mutex_t g_mbox_mutex;
/* Mailbox table; the mailbox id handed out by create_mbox() is an index
 * into this array. */
Mbox            g_mbox[NUM_OF_MBOX];

/*
 * write data to ring buffer
 *
 * Copies len bytes from data into a freshly malloc'ed buffer, stores that
 * buffer in the slot at write_pos and advances write_pos (modulo
 * NUM_OF_MSG + 1).  The ring is treated as full when advancing write_pos
 * would make it equal to read_pos, so one slot always stays unused.
 *
 * The function takes no lock itself.
 *
 * returns: E_OK   message stored
 *          E_QOVR ring buffer is full
 *          E_SYS  malloc failed
 */
int write_rbuf(MsgRingBuf *rbuf, void const *data, u_int len)
{
    int   next;
    void *copy;

    next = (rbuf->write_pos + 1) % (NUM_OF_MSG + 1);
    if (next == rbuf->read_pos) {
        return E_QOVR;
    }

    /* malloc(0) is allowed to return NULL; always ask for at least one
     * byte so that an empty message is not mistaken for an allocation
     * failure. */
    copy = malloc((0 != len) ? len : 1);
    if (NULL == copy) {
        return E_SYS;
    }
    /* Skip the copy for empty messages: memcpy must not be handed a
     * possibly-NULL source even when the length is zero. */
    if (0 != len) {
        memcpy(copy, data, len);
    }

    rbuf->msg[rbuf->write_pos].data = copy;
    rbuf->msg[rbuf->write_pos].len  = len;

    rbuf->write_pos = next;

    return E_OK;
}

/*
 * Take the oldest message out of the ring buffer.
 *
 * On success the stored buffer pointer and its length are passed back
 * through data and len, the slot is cleared, and read_pos moves on
 * (modulo NUM_OF_MSG + 1).  The pointer is the one write_rbuf() obtained
 * from malloc; this function does not free it, so presumably the caller
 * is expected to.
 *
 * returns: E_OK    a message was returned
 *          E_NOEXS the ring is empty (read_pos == write_pos)
 */
int read_rbuf(MsgRingBuf *rbuf, void **data, u_int *len)
{
    int head = rbuf->read_pos;

    /* nothing queued */
    if (head == rbuf->write_pos) {
        return E_NOEXS;
    }

    /* hand the message over to the caller ... */
    *len  = rbuf->msg[head].len;
    *data = rbuf->msg[head].data;

    /* ... and mark the slot as empty */
    rbuf->msg[head].len  = 0;
    rbuf->msg[head].data = NULL;

    rbuf->read_pos = (head + 1) % (NUM_OF_MSG + 1);

    return E_OK;
}

/*
 * dump ring buffer
 *
 * Prints read_pos and write_pos to fp (stdout when fp is NULL).  When
 * DEBUG_DUMP_RBUF is defined, every slot is listed as well, with the
 * payload shown as text.
 *
 * The function takes no lock itself.
 */
void dump_rbuf(MsgRingBuf *rbuf, FILE *fp)
{
    if (NULL == fp) {
        fp = stdout;
    }

    fprintf(fp, "----\n");
    fprintf(fp, "read_pos  = %d\n", rbuf->read_pos);
    fprintf(fp, "write_pos = %d\n", rbuf->write_pos);

#ifdef DEBUG_DUMP_RBUF
    {
        u_int i;
        /* print msg regarded as string */
        for (i = 0; i < NELEMS(rbuf->msg); i++) {
            unsigned int len = (unsigned int)rbuf->msg[i].len;

            if (NULL == rbuf->msg[i].data) {
                fprintf(fp, "%-2u : len = %u, NULL\n", (unsigned int)i, len);
            }
            else {
                /* The payload is a raw copy of len bytes and need not be
                 * NUL-terminated, so bound the output by the stored
                 * length.  A length that does not fit in int becomes a
                 * negative precision, which printf treats as "no
                 * precision" (the previous unbounded behaviour). */
                fprintf(fp, "%-2u : len = %u, %.*s\n",
                        (unsigned int)i, len, (int)len,
                        (char *)(rbuf->msg[i].data));
            }
        }
        fprintf(fp, "\n");
    }
#endif /* DEBUG_DUMP_RBUF */
}

/*
 * Initialise *mutex as a recursive mutex.
 *
 * On Cygwin the static recursive initialiser is assigned; elsewhere a
 * mutex attribute object with type PTHREAD_MUTEX_RECURSIVE_NP is used.
 * Errors from the pthread calls are not reported (the function is void).
 */
void init_mutex(pthread_mutex_t *mutex)
{
#ifdef __CYGWIN32__
    *mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
#else
    pthread_mutexattr_t attr;

    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutex_init(mutex, &attr);
    /* The attribute object is only needed while the mutex is created;
     * release it so implementations that allocate it do not leak. */
    pthread_mutexattr_destroy(&attr);
#endif
}

/*
 * Prepare the mailbox table: initialise the table mutex and mark every
 * entry of g_mbox as unused.
 *
 * returns: E_OK always
 */
int init_mbox_table()
{
    u_int idx = 0;

    init_mutex(&g_mbox_mutex);

    /* no mailbox is in use yet */
    while (idx < NELEMS(g_mbox)) {
        g_mbox[idx].use = FALSE;
        idx++;
    }

    return E_OK;
}

/*
 * Claim the first unused entry of g_mbox, initialise its mutex, condition
 * variable and ring buffer, and store its index in *mbox_id.
 *
 * The table is searched and updated while g_mbox_mutex is held.
 *
 * returns: E_OK    a mailbox was created, *mbox_id is valid
 *          E_NOEXS every entry is already in use (*mbox_id untouched)
 */
int create_mbox(u_int *mbox_id)
{
    u_int slot;
    u_int k;
    int   result = E_NOEXS;

    pthread_mutex_lock(&g_mbox_mutex);

    /* find the first entry that is not in use */
    slot = 0;
    while (slot < NELEMS(g_mbox) && FALSE != g_mbox[slot].use) {
        slot++;
    }

    if (slot < NELEMS(g_mbox)) {
        g_mbox[slot].use = TRUE;

        init_mutex(&(g_mbox[slot].mutex));
        pthread_cond_init(&(g_mbox[slot].cond), NULL);

        /* start with an empty ring buffer */
        g_mbox[slot].rbuf.read_pos  = 0;
        g_mbox[slot].rbuf.write_pos = 0;
        for (k = 0; k < NELEMS(g_mbox[0].rbuf.msg); k++) {
            g_mbox[slot].rbuf.msg[k].data = NULL;
            g_mbox[slot].rbuf.msg[k].len  = 0;
        }

        *mbox_id = slot;
        result   = E_OK;
    }

    pthread_mutex_unlock(&g_mbox_mutex);
    return result;
}

/*
 * Release a mailbox: destroy its mutex and condition variable, free every
 * message still queued in it and mark the table entry as unused.
 *
 * The table entry is examined and updated while g_mbox_mutex is held.
 *
 * returns: E_OK  mailbox deleted
 *          E_PAR mbox_id is out of range or the mailbox is not in use
 *          E_SYS the mutex or the condition variable could not be destroyed
 *
 * NOTE(review): if the mutex is destroyed but destroying the condition
 * variable fails, the entry stays marked as in use with a destroyed
 * mutex -- confirm whether callers can hit this.
 */
int delete_mbox(u_int mbox_id)
{
    u_int i;
    int ercd;

    if (NUM_OF_MBOX <= mbox_id) {
        return E_PAR;
    }

    pthread_mutex_lock(&g_mbox_mutex);

    if (FALSE == g_mbox[mbox_id].use) {
        ercd = E_PAR;
    }
    else if (pthread_mutex_destroy(&(g_mbox[mbox_id].mutex)) != 0) {
        ercd = E_SYS;
    }
    else if (pthread_cond_destroy(&(g_mbox[mbox_id].cond)) != 0) {
        ercd = E_SYS;
    }
    else {
        g_mbox[mbox_id].use            = FALSE;
        g_mbox[mbox_id].rbuf.read_pos  = 0;
        g_mbox[mbox_id].rbuf.write_pos = 0;
        for (i = 0; i < NELEMS(g_mbox[0].rbuf.msg); i++) {
            /* free(NULL) is a no-op, so empty slots need no guard.  Clear
             * the pointer afterwards so the unused entry never exposes
             * freed memory (e.g. to dump_rbuf). */
            free(g_mbox[mbox_id].rbuf.msg[i].data);
            g_mbox[mbox_id].rbuf.msg[i].data = NULL;
            g_mbox[mbox_id].rbuf.msg[i].len  = 0;
        }
        ercd = E_OK;
    }

    pthread_mutex_unlock(&g_mbox_mutex);
    return ercd;
}

/*
 * Queue a copy of msg (len bytes) in mailbox mbox_id and signal the
 * mailbox's condition variable.
 *
 * The ring buffer is written while the mailbox mutex is held.  The
 * condition variable is signalled whatever write_rbuf() returned.
 *
 * returns: E_PAR when mbox_id is out of range, otherwise the result of
 *          write_rbuf().
 */
int snd_msg(u_int mbox_id, void const *msg, u_int len)
{
    pthread_mutex_t *lock;
    int              result;

    if (mbox_id >= NUM_OF_MBOX) {
        return E_PAR;
    }

    lock = &(g_mbox[mbox_id].mutex);

    pthread_mutex_lock(lock);
    result = write_rbuf(&(g_mbox[mbox_id].rbuf), msg, len);
    pthread_cond_signal(&(g_mbox[mbox_id].cond));
    pthread_mutex_unlock(lock);

    return result;
}

/*
 * Polling receive: take one message from mailbox mbox_id without waiting.
 *
 * The ring buffer is read while the mailbox mutex is held.
 *
 * returns: E_PAR when mbox_id is out of range, otherwise the result of
 *          read_rbuf() (E_NOEXS when the mailbox is empty).
 */
int prcv_msg(u_int mbox_id, void **msg, u_int *len)
{
    pthread_mutex_t *lock;
    int              result;

    if (mbox_id >= NUM_OF_MBOX) {
        return E_PAR;
    }

    lock = &(g_mbox[mbox_id].mutex);

    pthread_mutex_lock(lock);
    result = read_rbuf(&(g_mbox[mbox_id].rbuf), msg, len);
    pthread_mutex_unlock(lock);

    return result;
}

/*
 * Blocking receive: take one message from mailbox mbox_id, waiting on the
 * mailbox's condition variable for as long as the ring buffer is empty.
 *
 * returns: E_PAR when mbox_id is out of range, otherwise the first
 *          read_rbuf() result that is not E_NOEXS.
 */
int rcv_msg(u_int mbox_id, void **msg, u_int *len)
{
    pthread_mutex_t *lock;
    pthread_cond_t  *wake;
    int              result;

    if (mbox_id >= NUM_OF_MBOX) {
        return E_PAR;
    }

    lock = &(g_mbox[mbox_id].mutex);
    wake = &(g_mbox[mbox_id].cond);

    pthread_mutex_lock(lock);
    for (;;) {
        result = read_rbuf(&(g_mbox[mbox_id].rbuf), msg, len);
        if (E_NOEXS != result) {
            break;
        }
        /* empty: sleep until signalled, then look again */
        pthread_cond_wait(wake, lock);
    }
    pthread_mutex_unlock(lock);

    return result;
}

/*
 * Store "current time + msec milliseconds" in *ts.
 *
 * The current time is taken from gettimeofday().  The result is always
 * normalised so that 0 <= tv_nsec < 1000000000, including when msec is
 * negative (the result then lies in the past).
 *
 * returns: E_OK always
 */
int add_msec(struct timespec *ts, int msec)
{
    struct timeval tv;
    time_t         sec;
    long           nsec;

    gettimeofday(&tv, NULL);

    /* Keep seconds in time_t so the value is not truncated to int. */
    sec  = tv.tv_sec + msec / 1000;
    /* |nsec| stays below 2 * 10^9, which fits in a long. */
    nsec = (long)tv.tv_usec * 1000L + (long)(msec % 1000) * 1000000L;

    /* move up (or borrow) and store */
    if (nsec >= 1000000000L) {
        sec  += 1;
        nsec -= 1000000000L;
    }
    else if (nsec < 0) {
        sec  -= 1;
        nsec += 1000000000L;
    }

    ts->tv_sec  = sec;
    ts->tv_nsec = nsec;

    return E_OK;
}

/*
 * Receive with timeout: take one message from mailbox mbox_id, waiting at
 * most timeout milliseconds for one to arrive.
 *
 * The deadline is computed once with add_msec() after the mailbox mutex
 * has been taken, and the same deadline is reused for every wait.
 *
 * returns: E_PAR   mbox_id is out of range
 *          E_TMOUT no message arrived before the deadline
 *          E_SYS   pthread_cond_timedwait failed
 *          otherwise the result of read_rbuf()
 */
int trcv_msg(u_int mbox_id, void **msg, u_int *len, int timeout)
{
    int             ercd;
    int             ret;
    struct timespec ts;

    if (NUM_OF_MBOX <= mbox_id) {
        return E_PAR;
    }

    pthread_mutex_lock(&(g_mbox[mbox_id].mutex));

    add_msec(&ts, timeout);
    while (E_NOEXS == (ercd = read_rbuf(&(g_mbox[mbox_id].rbuf), msg, len))) {
        ret = pthread_cond_timedwait(&(g_mbox[mbox_id].cond),
                                     &(g_mbox[mbox_id].mutex), &ts);
        if (ETIMEDOUT == ret) {
            /* A sender may have queued a message just as the wait timed
             * out; look once more before reporting a timeout. */
            ercd = read_rbuf(&(g_mbox[mbox_id].rbuf), msg, len);
            if (E_NOEXS == ercd) {
                ercd = E_TMOUT;
            }
            break;
        }
        else if (0 != ret && EINTR != ret) {
            ercd = E_SYS;
            break;
        }
        else {
            /* woken up or interrupted: continue (check ring buffer) */
        }
    }

    pthread_mutex_unlock(&(g_mbox[mbox_id].mutex));
    return ercd;
}