You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
434 lines
9.1 KiB
434 lines
9.1 KiB
/* |
|
* Copyright (c) 2018 Intel Corporation |
|
* |
|
* SPDX-License-Identifier: Apache-2.0 |
|
*/ |
|
#include <kernel.h> |
|
#include <errno.h> |
|
#include <string.h> |
|
#include <sys/atomic.h> |
|
#include <posix/time.h> |
|
#include <posix/mqueue.h> |
|
|
|
typedef struct mqueue_object { |
|
sys_snode_t snode; |
|
char *mem_buffer; |
|
char *mem_obj; |
|
struct k_msgq queue; |
|
atomic_t ref_count; |
|
char *name; |
|
} mqueue_object; |
|
|
|
typedef struct mqueue_desc { |
|
char *mem_desc; |
|
mqueue_object *mqueue; |
|
u32_t flags; |
|
} mqueue_desc; |
|
|
|
K_SEM_DEFINE(mq_sem, 1, 1); |
|
|
|
/* Initialize the list */ |
|
sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list); |
|
|
|
s64_t timespec_to_timeoutms(const struct timespec *abstime); |
|
static mqueue_object *find_in_list(const char *name); |
|
static s32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len, |
|
s32_t timeout); |
|
static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len, |
|
s32_t timeout); |
|
static void remove_mq(mqueue_object *msg_queue); |
|
|
|
/** |
|
* @brief Open a message queue. |
|
* |
|
* Number of message queue and descriptor to message queue are limited by |
|
* heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE. |
|
* |
|
* See IEEE 1003.1 |
|
*/ |
|
mqd_t mq_open(const char *name, int oflags, ...) |
|
{ |
|
va_list va; |
|
mode_t mode; |
|
mq_attr *attrs = NULL; |
|
long msg_size = 0U, max_msgs = 0U; |
|
mqueue_object *msg_queue; |
|
mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1); |
|
char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr; |
|
|
|
va_start(va, oflags); |
|
if ((oflags & O_CREAT) != 0) { |
|
mode = va_arg(va, mode_t); |
|
attrs = va_arg(va, mq_attr*); |
|
} |
|
va_end(va); |
|
|
|
if (attrs != NULL) { |
|
msg_size = attrs->mq_msgsize; |
|
max_msgs = attrs->mq_maxmsg; |
|
} |
|
|
|
if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 || |
|
max_msgs <= 0))) { |
|
errno = EINVAL; |
|
return (mqd_t)mqd; |
|
} |
|
|
|
if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) { |
|
errno = ENAMETOOLONG; |
|
return (mqd_t)mqd; |
|
} |
|
|
|
/* Check if queue already exists */ |
|
k_sem_take(&mq_sem, K_FOREVER); |
|
msg_queue = find_in_list(name); |
|
k_sem_give(&mq_sem); |
|
|
|
if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 && |
|
(oflags & O_EXCL) != 0) { |
|
/* Message queue has alreadey been opened and O_EXCL is set */ |
|
errno = EEXIST; |
|
return (mqd_t)mqd; |
|
} |
|
|
|
if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) { |
|
errno = ENOENT; |
|
return (mqd_t)mqd; |
|
} |
|
|
|
mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc)); |
|
if (mq_desc_ptr != NULL) { |
|
(void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc)); |
|
msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr; |
|
msg_queue_desc->mem_desc = mq_desc_ptr; |
|
} else { |
|
goto free_mq_desc; |
|
} |
|
|
|
|
|
/* Allocate mqueue object for new message queue */ |
|
if (msg_queue == NULL) { |
|
|
|
/* Check for message quantity and size in message queue */ |
|
if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX && |
|
attrs->mq_maxmsg > CONFIG_MSG_COUNT_MAX) { |
|
goto free_mq_desc; |
|
} |
|
|
|
mq_obj_ptr = k_malloc(sizeof(mqueue_object)); |
|
if (mq_obj_ptr != NULL) { |
|
(void)memset(mq_obj_ptr, 0, sizeof(mqueue_object)); |
|
msg_queue = (mqueue_object *)mq_obj_ptr; |
|
msg_queue->mem_obj = mq_obj_ptr; |
|
|
|
} else { |
|
goto free_mq_object; |
|
} |
|
|
|
mq_name_ptr = k_malloc(strlen(name) + 1); |
|
if (mq_name_ptr != NULL) { |
|
(void)memset(mq_name_ptr, 0, strlen(name) + 1); |
|
msg_queue->name = mq_name_ptr; |
|
|
|
} else { |
|
goto free_mq_name; |
|
} |
|
|
|
strcpy(msg_queue->name, name); |
|
|
|
mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(u8_t)); |
|
if (mq_buf_ptr != NULL) { |
|
(void)memset(mq_buf_ptr, 0, |
|
msg_size * max_msgs * sizeof(u8_t)); |
|
msg_queue->mem_buffer = mq_buf_ptr; |
|
} else { |
|
goto free_mq_buffer; |
|
} |
|
|
|
(void)atomic_set(&msg_queue->ref_count, 1); |
|
/* initialize zephyr message queue */ |
|
k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size, |
|
max_msgs); |
|
k_sem_take(&mq_sem, K_FOREVER); |
|
sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode)); |
|
k_sem_give(&mq_sem); |
|
|
|
} else { |
|
atomic_inc(&msg_queue->ref_count); |
|
} |
|
|
|
msg_queue_desc->mqueue = msg_queue; |
|
msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0; |
|
return (mqd_t)msg_queue_desc; |
|
|
|
free_mq_buffer: |
|
k_free(mq_name_ptr); |
|
free_mq_name: |
|
k_free(mq_obj_ptr); |
|
free_mq_object: |
|
k_free(mq_desc_ptr); |
|
free_mq_desc: |
|
errno = ENOSPC; |
|
return (mqd_t)mqd; |
|
} |
|
|
|
/** |
|
* @brief Close a message queue descriptor. |
|
* |
|
* See IEEE 1003.1 |
|
*/ |
|
int mq_close(mqd_t mqdes) |
|
{ |
|
mqueue_desc *mqd = (mqueue_desc *)mqdes; |
|
|
|
if (mqd == NULL) { |
|
errno = EBADF; |
|
return -1; |
|
} |
|
|
|
atomic_dec(&mqd->mqueue->ref_count); |
|
|
|
/* remove mq if marked for unlink */ |
|
if (mqd->mqueue->name == NULL) { |
|
remove_mq(mqd->mqueue); |
|
} |
|
|
|
k_free(mqd->mem_desc); |
|
return 0; |
|
} |
|
|
|
/** |
|
* @brief Remove a message queue. |
|
* |
|
* See IEEE 1003.1 |
|
*/ |
|
int mq_unlink(const char *name) |
|
{ |
|
mqueue_object *msg_queue; |
|
|
|
k_sem_take(&mq_sem, K_FOREVER); |
|
msg_queue = find_in_list(name); |
|
|
|
if (msg_queue == NULL) { |
|
k_sem_give(&mq_sem); |
|
errno = EBADF; |
|
return -1; |
|
} |
|
|
|
k_free(msg_queue->name); |
|
msg_queue->name = NULL; |
|
k_sem_give(&mq_sem); |
|
remove_mq(msg_queue); |
|
return 0; |
|
} |
|
|
|
/** |
|
* @brief Send a message to a message queue. |
|
* |
|
* All messages in message queue are of equal priority. |
|
* |
|
* See IEEE 1003.1 |
|
*/ |
|
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, |
|
unsigned int msg_prio) |
|
{ |
|
mqueue_desc *mqd = (mqueue_desc *)mqdes; |
|
s32_t timeout = K_FOREVER; |
|
|
|
return send_message(mqd, msg_ptr, msg_len, timeout); |
|
} |
|
|
|
/** |
|
* @brief Send message to a message queue within abstime time. |
|
* |
|
* All messages in message queue are of equal priority. |
|
* |
|
* See IEEE 1003.1 |
|
*/ |
|
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, |
|
unsigned int msg_prio, const struct timespec *abstime) |
|
{ |
|
mqueue_desc *mqd = (mqueue_desc *)mqdes; |
|
s32_t timeout; |
|
|
|
timeout = (s32_t) timespec_to_timeoutms(abstime); |
|
return send_message(mqd, msg_ptr, msg_len, timeout); |
|
} |
|
|
|
/** |
|
* @brief Receive a message from a message queue. |
|
* |
|
* All messages in message queue are of equal priority. |
|
* |
|
* See IEEE 1003.1 |
|
*/ |
|
int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, |
|
unsigned int *msg_prio) |
|
{ |
|
mqueue_desc *mqd = (mqueue_desc *)mqdes; |
|
s32_t timeout = K_FOREVER; |
|
|
|
return receive_message(mqd, msg_ptr, msg_len, timeout); |
|
|
|
} |
|
|
|
/** |
|
* @brief Receive message from a message queue within abstime time. |
|
* |
|
* All messages in message queue are of equal priority. |
|
* |
|
* See IEEE 1003.1 |
|
*/ |
|
int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, |
|
unsigned int *msg_prio, const struct timespec *abstime) |
|
{ |
|
mqueue_desc *mqd = (mqueue_desc *)mqdes; |
|
s32_t timeout = K_NO_WAIT; |
|
|
|
timeout = (s32_t) timespec_to_timeoutms(abstime); |
|
return receive_message(mqd, msg_ptr, msg_len, timeout); |
|
} |
|
|
|
/** |
|
* @brief Get message queue attributes. |
|
* |
|
* See IEEE 1003.1 |
|
*/ |
|
int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) |
|
{ |
|
mqueue_desc *mqd = (mqueue_desc *)mqdes; |
|
struct k_msgq_attrs attrs; |
|
|
|
if (mqd == NULL) { |
|
errno = EBADF; |
|
return -1; |
|
} |
|
|
|
k_sem_take(&mq_sem, K_FOREVER); |
|
k_msgq_get_attrs(&mqd->mqueue->queue, &attrs); |
|
mqstat->mq_flags = mqd->flags; |
|
mqstat->mq_maxmsg = attrs.max_msgs; |
|
mqstat->mq_msgsize = attrs.msg_size; |
|
mqstat->mq_curmsgs = attrs.used_msgs; |
|
k_sem_give(&mq_sem); |
|
return 0; |
|
} |
|
|
|
/** |
|
* @brief Set message queue attributes. |
|
* |
|
* See IEEE 1003.1 |
|
*/ |
|
int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, |
|
struct mq_attr *omqstat) |
|
{ |
|
mqueue_desc *mqd = (mqueue_desc *)mqdes; |
|
|
|
if (mqd == NULL) { |
|
errno = EBADF; |
|
return -1; |
|
} |
|
|
|
if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) { |
|
errno = EINVAL; |
|
return -1; |
|
} |
|
|
|
if (omqstat != NULL) { |
|
mq_getattr(mqdes, omqstat); |
|
} |
|
|
|
k_sem_take(&mq_sem, K_FOREVER); |
|
mqd->flags = mqstat->mq_flags; |
|
k_sem_give(&mq_sem); |
|
|
|
return 0; |
|
} |
|
|
|
/* Internal functions */ |
|
static mqueue_object *find_in_list(const char *name) |
|
{ |
|
sys_snode_t *mq; |
|
mqueue_object *msg_queue; |
|
|
|
mq = mq_list.head; |
|
|
|
while (mq != NULL) { |
|
msg_queue = (mqueue_object *)mq; |
|
if (strcmp(msg_queue->name, name) == 0) { |
|
return msg_queue; |
|
} |
|
|
|
mq = mq->next; |
|
} |
|
|
|
return NULL; |
|
} |
|
|
|
/*
 * Common implementation of mq_send() and mq_timedsend().
 *
 * A descriptor opened with O_NONBLOCK overrides the timeout with
 * K_NO_WAIT. Returns 0 on success, or -1 with errno set to EBADF (NULL
 * descriptor), EMSGSIZE (msg_len larger than the queue's message size),
 * EAGAIN (put failed with K_NO_WAIT) or ETIMEDOUT (put failed otherwise).
 *
 * NOTE(review): msg_len is only checked against the upper bound; confirm
 * how many bytes k_msgq_put() reads from msg_ptr when msg_len is smaller
 * than the queue's message size.
 */
static s32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
			  s32_t timeout)
{
	if (mqd == NULL) {
		errno = EBADF;
		return -1;
	}

	if ((mqd->flags & O_NONBLOCK) != 0U) {
		timeout = K_NO_WAIT;
	}

	if (msg_len > mqd->mqueue->queue.msg_size) {
		errno = EMSGSIZE;
		return -1;
	}

	if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) == 0) {
		return 0;
	}

	errno = (timeout != K_NO_WAIT) ? ETIMEDOUT : EAGAIN;
	return -1;
}
|
|
|
/*
 * Common implementation of mq_receive() and mq_timedreceive().
 *
 * The return type is int to agree with the forward declaration near the
 * top of this file.
 *
 * A descriptor opened with O_NONBLOCK overrides the timeout with
 * K_NO_WAIT. Returns the queue's message size on success, or -1 with
 * errno set to EBADF (NULL descriptor), EMSGSIZE (msg_len smaller than the
 * queue's message size), EAGAIN (get failed with K_NO_WAIT) or ETIMEDOUT
 * (get failed otherwise).
 */
static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
			   s32_t timeout)
{
	int ret = -1;

	if (mqd == NULL) {
		errno = EBADF;
		return ret;
	}

	/* The caller's buffer must be able to hold one whole message. */
	if (msg_len < mqd->mqueue->queue.msg_size) {
		errno = EMSGSIZE;
		return ret;
	}

	if ((mqd->flags & O_NONBLOCK) != 0U) {
		timeout = K_NO_WAIT;
	}

	if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
		errno = (timeout != K_NO_WAIT) ? ETIMEDOUT : EAGAIN;
	} else {
		ret = mqd->mqueue->queue.msg_size;
	}

	return ret;
}
|
|
|
/*
 * Release a queue once nothing references it.
 *
 * Does nothing unless ref_count is 0. Otherwise the queue is taken off
 * mq_list under mq_sem and its message buffer and object storage are
 * freed. The name is not freed here.
 */
static void remove_mq(mqueue_object *msg_queue)
{
	/* atomic_cas(.., 0, 0) succeeds only when ref_count is already 0. */
	if (!atomic_cas(&msg_queue->ref_count, 0, 0)) {
		return;
	}

	k_sem_take(&mq_sem, K_FOREVER);
	sys_slist_find_and_remove(&mq_list, (sys_snode_t *) msg_queue);
	k_sem_give(&mq_sem);

	/* Free mq buffer and object */
	k_free(msg_queue->mem_buffer);
	k_free(msg_queue->mem_obj);
}
|
|
|