From d95caa51a4489e3ddacc3ea69e39f997dfbb8245 Mon Sep 17 00:00:00 2001 From: Tom Burdick Date: Tue, 3 Oct 2023 10:01:32 -0500 Subject: [PATCH] sys: Add a lockfree mpsc and spsc queues Moves the rtio_ prefixed lockfree queues to sys alongside existing mpsc/spsc pbuf, ringbuf, and similar queue-like data structures. Signed-off-by: Tom Burdick --- doc/kernel/data_structures/index.rst | 2 + doc/kernel/data_structures/mpsc_lockfree.rst | 14 ++ doc/kernel/data_structures/spsc_lockfree.rst | 12 ++ doc/services/rtio/index.rst | 10 -- drivers/i2c/i2c_rtio.c | 10 +- drivers/spi/spi_mcux_lpspi.c | 6 +- drivers/spi/spi_sam.c | 6 +- include/zephyr/drivers/i2c/rtio.h | 2 +- include/zephyr/rtio/rtio.h | 56 ++++---- .../{rtio/rtio_mpsc.h => sys/mpsc_lockfree.h} | 96 ++++++------- .../{rtio/rtio_spsc.h => sys/spsc_lockfree.h} | 136 +++++++++--------- .../sensor_batch_processing/src/vnd_sensor.c | 6 +- subsys/rtio/rtio_executor.c | 8 +- subsys/rtio/rtio_init.c | 5 +- tests/lib/cpp/cxx/src/main.cpp | 4 +- tests/lib/lockfree/CMakeLists.txt | 12 ++ tests/lib/lockfree/prj.conf | 2 + .../lockfree/src/test_mpsc.c} | 100 +++++++------ .../lockfree/src/test_spsc.c} | 112 ++++++++------- tests/lib/lockfree/testcase.yaml | 7 + tests/subsys/rtio/rtio_api/CMakeLists.txt | 2 +- tests/subsys/rtio/rtio_api/src/rtio_api.h | 19 --- .../rtio/rtio_api/src/rtio_iodev_test.h | 12 +- .../subsys/rtio/rtio_api/src/test_rtio_api.c | 1 - 24 files changed, 337 insertions(+), 303 deletions(-) create mode 100644 doc/kernel/data_structures/mpsc_lockfree.rst create mode 100644 doc/kernel/data_structures/spsc_lockfree.rst rename include/zephyr/{rtio/rtio_mpsc.h => sys/mpsc_lockfree.h} (71%) rename include/zephyr/{rtio/rtio_spsc.h => sys/spsc_lockfree.h} (66%) create mode 100644 tests/lib/lockfree/CMakeLists.txt create mode 100644 tests/lib/lockfree/prj.conf rename tests/{subsys/rtio/rtio_api/src/test_rtio_mpsc.c => lib/lockfree/src/test_mpsc.c} (71%) rename tests/{subsys/rtio/rtio_api/src/test_rtio_spsc.c => lib/lockfree/src/test_spsc.c} (63%) create mode 100644 tests/lib/lockfree/testcase.yaml delete mode 100644 tests/subsys/rtio/rtio_api/src/rtio_api.h diff --git a/doc/kernel/data_structures/index.rst b/doc/kernel/data_structures/index.rst index ccdc2349225..f7e7c5ad354 100644 --- a/doc/kernel/data_structures/index.rst +++ b/doc/kernel/data_structures/index.rst @@ -34,3 +34,5 @@ needed will be provided by the user. spsc_pbuf.rst rbtree.rst ring_buffers.rst + mpsc_lockfree.rst + spsc_lockfree.rst diff --git a/doc/kernel/data_structures/mpsc_lockfree.rst b/doc/kernel/data_structures/mpsc_lockfree.rst new file mode 100644 index 00000000000..6b919125166 --- /dev/null +++ b/doc/kernel/data_structures/mpsc_lockfree.rst @@ -0,0 +1,14 @@ +.. _mpsc_lockfree: + +Multi Producer Single Consumer Lock Free Queue +============================================== + +A :dfn:`Multi Producer Single Consumer Lock Free Queue (MPSC)` is an lockfree +intrusive queue based on atomic pointer swaps as described by Dmitry Vyukov +at `1024cores `_. + + +API Reference +************* + +.. doxygengroup:: mpsc_lockfree diff --git a/doc/kernel/data_structures/spsc_lockfree.rst b/doc/kernel/data_structures/spsc_lockfree.rst new file mode 100644 index 00000000000..dcd9939112c --- /dev/null +++ b/doc/kernel/data_structures/spsc_lockfree.rst @@ -0,0 +1,12 @@ +.. _spsc_lockfree: + +Single Producer Single Consumer Lock Free Queue +=============================================== + +A :dfn:`Single Producer Single Consumer Lock Free Queue (MPSC)` is a lock free +atomic ring buffer based queue. + +API Reference +************* + +.. doxygengroup:: spsc_lockfree diff --git a/doc/services/rtio/index.rst b/doc/services/rtio/index.rst index df6e75d7702..cb33a3ffbeb 100644 --- a/doc/services/rtio/index.rst +++ b/doc/services/rtio/index.rst @@ -216,13 +216,3 @@ API Reference ************* .. doxygengroup:: rtio - -MPSC Lock-free Queue API -======================== - -.. doxygengroup:: rtio_mpsc - -SPSC Lock-free Queue API -======================== - -.. doxygengroup:: rtio_spsc diff --git a/drivers/i2c/i2c_rtio.c b/drivers/i2c/i2c_rtio.c index 7fd90c3b89d..e31c64bedf6 100644 --- a/drivers/i2c/i2c_rtio.c +++ b/drivers/i2c/i2c_rtio.c @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include #define LOG_LEVEL CONFIG_I2C_LOG_LEVEL @@ -55,14 +55,14 @@ struct rtio_sqe *i2c_rtio_copy(struct rtio *r, struct rtio_iodev *iodev, const s void i2c_rtio_init(struct i2c_rtio *ctx, const struct device *dev) { k_sem_init(&ctx->lock, 1, 1); - rtio_mpsc_init(&ctx->io_q); + mpsc_init(&ctx->io_q); ctx->txn_curr = NULL; ctx->txn_head = NULL; ctx->dt_spec.bus = dev; ctx->iodev.data = &ctx->dt_spec; ctx->iodev.api = &i2c_iodev_api; /* TODO drop the builtin submission queue? */ - rtio_mpsc_init(&ctx->iodev.iodev_sq); + mpsc_init(&ctx->iodev.iodev_sq); } /** @@ -82,7 +82,7 @@ static bool i2c_rtio_next(struct i2c_rtio *ctx, bool completion) return false; } - struct rtio_mpsc_node *next = rtio_mpsc_pop(&ctx->io_q); + struct mpsc_node *next = mpsc_pop(&ctx->io_q); /* Nothing left to do */ if (next == NULL) { @@ -119,7 +119,7 @@ bool i2c_rtio_complete(struct i2c_rtio *ctx, int status) } bool i2c_rtio_submit(struct i2c_rtio *ctx, struct rtio_iodev_sqe *iodev_sqe) { - rtio_mpsc_push(&ctx->io_q, &iodev_sqe->q); + mpsc_push(&ctx->io_q, &iodev_sqe->q); return i2c_rtio_next(ctx, false); } diff --git a/drivers/spi/spi_mcux_lpspi.c b/drivers/spi/spi_mcux_lpspi.c index 5432debd68a..b0038fddfd5 100644 --- a/drivers/spi/spi_mcux_lpspi.c +++ b/drivers/spi/spi_mcux_lpspi.c @@ -702,7 +702,7 @@ static int spi_mcux_init(const struct device *dev) data->dt_spec.bus = dev; data->iodev.api = &spi_iodev_api; data->iodev.data = &data->dt_spec; - rtio_mpsc_init(&data->iodev.iodev_sq); + mpsc_init(&data->iodev.iodev_sq); #endif err = pinctrl_apply_state(config->pincfg, PINCTRL_STATE_DEFAULT); @@ -803,7 +803,7 @@ static void spi_mcux_iodev_next(const struct device *dev, bool completion) return; } - struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->iodev.iodev_sq); + struct mpsc_node *next = mpsc_pop(&data->iodev.iodev_sq); if (next != NULL) { struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q); @@ -832,7 +832,7 @@ static void spi_mcux_iodev_submit(const struct device *dev, { struct spi_mcux_data *data = dev->data; - rtio_mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); + mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); spi_mcux_iodev_next(dev, false); } diff --git a/drivers/spi/spi_sam.c b/drivers/spi/spi_sam.c index 7cb6e8dc90c..7b8dd2ae35e 100644 --- a/drivers/spi/spi_sam.c +++ b/drivers/spi/spi_sam.c @@ -691,7 +691,7 @@ static void spi_sam_iodev_next(const struct device *dev, bool completion) return; } - struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->iodev.iodev_sq); + struct mpsc_node *next = mpsc_pop(&data->iodev.iodev_sq); if (next != NULL) { struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q); @@ -736,7 +736,7 @@ static void spi_sam_iodev_submit(const struct device *dev, { struct spi_sam_data *data = dev->data; - rtio_mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); + mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); spi_sam_iodev_next(dev, false); } #endif @@ -866,7 +866,7 @@ static int spi_sam_init(const struct device *dev) data->dt_spec.bus = dev; data->iodev.api = &spi_iodev_api; data->iodev.data = &data->dt_spec; - rtio_mpsc_init(&data->iodev.iodev_sq); + mpsc_init(&data->iodev.iodev_sq); #endif spi_context_unlock_unconditionally(&data->ctx); diff --git a/include/zephyr/drivers/i2c/rtio.h b/include/zephyr/drivers/i2c/rtio.h index d4dc6c32ae2..2f094540f70 100644 --- a/include/zephyr/drivers/i2c/rtio.h +++ b/include/zephyr/drivers/i2c/rtio.h @@ -22,7 +22,7 @@ struct i2c_rtio { struct k_sem lock; struct k_spinlock slock; struct rtio *r; - struct rtio_mpsc io_q; + struct mpsc io_q; struct rtio_iodev iodev; struct rtio_iodev_sqe *txn_head; struct rtio_iodev_sqe *txn_curr; diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h index 75ab879f949..f44c09450b6 100644 --- a/include/zephyr/rtio/rtio.h +++ b/include/zephyr/rtio/rtio.h @@ -31,12 +31,12 @@ #include #include #include -#include #include #include #include #include #include +#include #ifdef __cplusplus extern "C" { @@ -292,7 +292,7 @@ BUILD_ASSERT(sizeof(struct rtio_sqe) <= 64); * @brief A completion queue event */ struct rtio_cqe { - struct rtio_mpsc_node q; + struct mpsc_node q; int32_t result; /**< Result from operation */ void *userdata; /**< Associated userdata with operation */ @@ -300,14 +300,14 @@ struct rtio_cqe { }; struct rtio_sqe_pool { - struct rtio_mpsc free_q; + struct mpsc free_q; const uint16_t pool_size; uint16_t pool_free; struct rtio_iodev_sqe *pool; }; struct rtio_cqe_pool { - struct rtio_mpsc free_q; + struct mpsc free_q; const uint16_t pool_size; uint16_t pool_free; struct rtio_cqe *pool; @@ -362,10 +362,10 @@ struct rtio { #endif /* Submission queue */ - struct rtio_mpsc sq; + struct mpsc sq; /* Completion queue */ - struct rtio_mpsc cq; + struct mpsc cq; }; /** The memory partition associated with all RTIO context information */ @@ -422,7 +422,7 @@ static inline uint16_t __rtio_compute_mempool_block_index(const struct rtio *r, */ struct rtio_iodev_sqe { struct rtio_sqe sqe; - struct rtio_mpsc_node q; + struct mpsc_node q; struct rtio_iodev_sqe *next; struct rtio *r; }; @@ -450,7 +450,7 @@ struct rtio_iodev { const struct rtio_iodev_api *api; /* Queue of RTIO contexts with requests */ - struct rtio_mpsc iodev_sq; + struct mpsc iodev_sq; /* Data associated with this iodev */ void *data; @@ -625,7 +625,7 @@ static inline void rtio_sqe_prep_transceive(struct rtio_sqe *sqe, static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *pool) { - struct rtio_mpsc_node *node = rtio_mpsc_pop(&pool->free_q); + struct mpsc_node *node = mpsc_pop(&pool->free_q); if (node == NULL) { return NULL; @@ -640,14 +640,14 @@ static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *p static inline void rtio_sqe_pool_free(struct rtio_sqe_pool *pool, struct rtio_iodev_sqe *iodev_sqe) { - rtio_mpsc_push(&pool->free_q, &iodev_sqe->q); + mpsc_push(&pool->free_q, &iodev_sqe->q); pool->pool_free++; } static inline struct rtio_cqe *rtio_cqe_pool_alloc(struct rtio_cqe_pool *pool) { - struct rtio_mpsc_node *node = rtio_mpsc_pop(&pool->free_q); + struct mpsc_node *node = mpsc_pop(&pool->free_q); if (node == NULL) { return NULL; @@ -664,7 +664,7 @@ static inline struct rtio_cqe *rtio_cqe_pool_alloc(struct rtio_cqe_pool *pool) static inline void rtio_cqe_pool_free(struct rtio_cqe_pool *pool, struct rtio_cqe *cqe) { - rtio_mpsc_push(&pool->free_q, &cqe->q); + mpsc_push(&pool->free_q, &cqe->q); pool->pool_free++; } @@ -732,14 +732,14 @@ static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_ #define RTIO_IODEV_DEFINE(name, iodev_api, iodev_data) \ STRUCT_SECTION_ITERABLE(rtio_iodev, name) = { \ .api = (iodev_api), \ - .iodev_sq = RTIO_MPSC_INIT((name.iodev_sq)), \ + .iodev_sq = MPSC_INIT((name.iodev_sq)), \ .data = (iodev_data), \ } #define Z_RTIO_SQE_POOL_DEFINE(name, sz) \ static struct rtio_iodev_sqe CONCAT(_sqe_pool_, name)[sz]; \ STRUCT_SECTION_ITERABLE(rtio_sqe_pool, name) = { \ - .free_q = RTIO_MPSC_INIT((name.free_q)), \ + .free_q = MPSC_INIT((name.free_q)), \ .pool_size = sz, \ .pool_free = sz, \ .pool = CONCAT(_sqe_pool_, name), \ @@ -749,7 +749,7 @@ static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_ #define Z_RTIO_CQE_POOL_DEFINE(name, sz) \ static struct rtio_cqe CONCAT(_cqe_pool_, name)[sz]; \ STRUCT_SECTION_ITERABLE(rtio_cqe_pool, name) = { \ - .free_q = RTIO_MPSC_INIT((name.free_q)), \ + .free_q = MPSC_INIT((name.free_q)), \ .pool_size = sz, \ .pool_free = sz, \ .pool = CONCAT(_cqe_pool_, name), \ @@ -797,8 +797,8 @@ static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_ .sqe_pool = _sqe_pool, \ .cqe_pool = _cqe_pool, \ IF_ENABLED(CONFIG_RTIO_SYS_MEM_BLOCKS, (.block_pool = _block_pool,)) \ - .sq = RTIO_MPSC_INIT((name.sq)), \ - .cq = RTIO_MPSC_INIT((name.cq)), \ + .sq = MPSC_INIT((name.sq)), \ + .cq = MPSC_INIT((name.cq)), \ } /** @@ -910,7 +910,7 @@ static inline struct rtio_sqe *rtio_sqe_acquire(struct rtio *r) return NULL; } - rtio_mpsc_push(&r->sq, &iodev_sqe->q); + mpsc_push(&r->sq, &iodev_sqe->q); return &iodev_sqe->sqe; } @@ -923,12 +923,12 @@ static inline struct rtio_sqe *rtio_sqe_acquire(struct rtio *r) static inline void rtio_sqe_drop_all(struct rtio *r) { struct rtio_iodev_sqe *iodev_sqe; - struct rtio_mpsc_node *node = rtio_mpsc_pop(&r->sq); + struct mpsc_node *node = mpsc_pop(&r->sq); while (node != NULL) { iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); rtio_sqe_pool_free(r->sqe_pool, iodev_sqe); - node = rtio_mpsc_pop(&r->sq); + node = mpsc_pop(&r->sq); } } @@ -953,7 +953,7 @@ static inline struct rtio_cqe *rtio_cqe_acquire(struct rtio *r) */ static inline void rtio_cqe_produce(struct rtio *r, struct rtio_cqe *cqe) { - rtio_mpsc_push(&r->cq, &cqe->q); + mpsc_push(&r->cq, &cqe->q); } /** @@ -969,7 +969,7 @@ static inline void rtio_cqe_produce(struct rtio *r, struct rtio_cqe *cqe) */ static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r) { - struct rtio_mpsc_node *node; + struct mpsc_node *node; struct rtio_cqe *cqe = NULL; #ifdef CONFIG_RTIO_CONSUME_SEM @@ -978,7 +978,7 @@ static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r) } #endif - node = rtio_mpsc_pop(&r->cq); + node = mpsc_pop(&r->cq); if (node == NULL) { return NULL; } @@ -999,16 +999,16 @@ static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r) */ static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r) { - struct rtio_mpsc_node *node; + struct mpsc_node *node; struct rtio_cqe *cqe; #ifdef CONFIG_RTIO_CONSUME_SEM k_sem_take(r->consume_sem, K_FOREVER); #endif - node = rtio_mpsc_pop(&r->cq); + node = mpsc_pop(&r->cq); while (node == NULL) { Z_SPIN_DELAY(1); - node = rtio_mpsc_pop(&r->cq); + node = mpsc_pop(&r->cq); } cqe = CONTAINER_OF(node, struct rtio_cqe, q); @@ -1136,13 +1136,13 @@ static inline void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int resu static inline void rtio_iodev_cancel_all(struct rtio_iodev *iodev) { /* Clear pending requests as -ENODATA */ - struct rtio_mpsc_node *node = rtio_mpsc_pop(&iodev->iodev_sq); + struct mpsc_node *node = mpsc_pop(&iodev->iodev_sq); while (node != NULL) { struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); rtio_iodev_sqe_err(iodev_sqe, -ECANCELED); - node = rtio_mpsc_pop(&iodev->iodev_sq); + node = mpsc_pop(&iodev->iodev_sq); } } diff --git a/include/zephyr/rtio/rtio_mpsc.h b/include/zephyr/sys/mpsc_lockfree.h similarity index 71% rename from include/zephyr/rtio/rtio_mpsc.h rename to include/zephyr/sys/mpsc_lockfree.h index 9551d1c5cc4..66f513072e4 100644 --- a/include/zephyr/rtio/rtio_mpsc.h +++ b/include/zephyr/sys/mpsc_lockfree.h @@ -1,12 +1,12 @@ /* * Copyright (c) 2010-2011 Dmitry Vyukov - * Copyright (c) 2022 Intel Corporation + * Copyright (c) 2023 Intel Corporation * * SPDX-License-Identifier: Apache-2.0 */ -#ifndef ZEPHYR_RTIO_MPSC_H_ -#define ZEPHYR_RTIO_MPSC_H_ +#ifndef ZEPHYR_SYS_MPSC_LOCKFREE_H_ +#define ZEPHYR_SYS_MPSC_LOCKFREE_H_ #include #include @@ -18,12 +18,28 @@ extern "C" { #endif /** - * @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API - * @defgroup rtio_mpsc RTIO MPSC API - * @ingroup rtio + * @brief Multiple Producer Single Consumer (MPSC) Lockfree Queue API + * @defgroup mpsc_lockfree MPSC Lockfree Queue API + * @ingroup datastructure_apis * @{ */ +/** + * @file mpsc_lockfree.h + * + * @brief A wait-free intrusive multi producer single consumer (MPSC) queue using + * a singly linked list. Ordering is First-In-First-Out. + * + * Based on the well known and widely used wait-free MPSC queue described by + * Dmitry Vyukov with some slight changes to account for needs of an + * RTOS on a variety of archs. Both consumer and producer are wait free. No CAS + * loop or lock is needed. + * + * An MPSC queue is safe to produce or consume in an ISR with O(1) push/pop. + * + * @warning MPSC is *not* safe to consume in multiple execution contexts. + */ + /* * On single core systems atomics are unnecessary * and cause a lot of unnecessary cache invalidation @@ -35,17 +51,17 @@ extern "C" { * are updated in the correct order and the values are * updated core caches correctly. */ -#if defined(CONFIG_SMP) +#if IS_ENABLED(CONFIG_SMP) typedef atomic_ptr_t mpsc_ptr_t; -#define mpsc_ptr_get(ptr) atomic_ptr_get(&(ptr)) -#define mpsc_ptr_set(ptr, val) atomic_ptr_set(&(ptr), val) +#define mpsc_ptr_get(ptr) atomic_ptr_get(&(ptr)) +#define mpsc_ptr_set(ptr, val) atomic_ptr_set(&(ptr), val) #define mpsc_ptr_set_get(ptr, val) atomic_ptr_set(&(ptr), val) -#else +#else /* IS_ENABLED(CONFIG_SMP) */ -typedef struct rtio_mpsc_node *mpsc_ptr_t; +typedef struct mpsc_node *mpsc_ptr_t; #define mpsc_ptr_get(ptr) ptr #define mpsc_ptr_set(ptr, val) ptr = val @@ -56,38 +72,22 @@ typedef struct rtio_mpsc_node *mpsc_ptr_t; tmp; \ }) -#endif - -/** - * @file rtio_mpsc.h - * - * @brief A wait-free intrusive multi producer single consumer (MPSC) queue using - * a singly linked list. Ordering is First-In-First-Out. - * - * Based on the well known and widely used wait-free MPSC queue described by - * Dmitry Vyukov with some slight changes to account for needs of an - * RTOS on a variety of archs. Both consumer and producer are wait free. No CAS - * loop or lock is needed. - * - * An MPSC queue is safe to produce or consume in an ISR with O(1) push/pop. - * - * @warning MPSC is *not* safe to consume in multiple execution contexts. - */ +#endif /* IS_ENABLED(CONFIG_SMP) */ /** * @brief Queue member */ -struct rtio_mpsc_node { +struct mpsc_node { mpsc_ptr_t next; }; /** * @brief MPSC Queue */ -struct rtio_mpsc { +struct mpsc { mpsc_ptr_t head; - struct rtio_mpsc_node *tail; - struct rtio_mpsc_node stub; + struct mpsc_node *tail; + struct mpsc_node stub; }; /** @@ -97,10 +97,10 @@ struct rtio_mpsc { * * @param symbol name of the queue */ -#define RTIO_MPSC_INIT(symbol) \ +#define MPSC_INIT(symbol) \ { \ - .head = (struct rtio_mpsc_node *)&symbol.stub, \ - .tail = (struct rtio_mpsc_node *)&symbol.stub, \ + .head = (struct mpsc_node *)&symbol.stub, \ + .tail = (struct mpsc_node *)&symbol.stub, \ .stub = { \ .next = NULL, \ }, \ @@ -111,7 +111,7 @@ struct rtio_mpsc { * * @param q Queue to initialize or reset */ -static inline void rtio_mpsc_init(struct rtio_mpsc *q) +static inline void mpsc_init(struct mpsc *q) { mpsc_ptr_set(q->head, &q->stub); q->tail = &q->stub; @@ -124,15 +124,15 @@ static inline void rtio_mpsc_init(struct rtio_mpsc *q) * @param q Queue to push the node to * @param n Node to push into the queue */ -static ALWAYS_INLINE void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n) +static ALWAYS_INLINE void mpsc_push(struct mpsc *q, struct mpsc_node *n) { - struct rtio_mpsc_node *prev; + struct mpsc_node *prev; int key; mpsc_ptr_set(n->next, NULL); key = arch_irq_lock(); - prev = (struct rtio_mpsc_node *)mpsc_ptr_set_get(q->head, n); + prev = (struct mpsc_node *)mpsc_ptr_set_get(q->head, n); mpsc_ptr_set(prev->next, n); arch_irq_unlock(key); } @@ -143,11 +143,11 @@ static ALWAYS_INLINE void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_n * @retval NULL When no node is available * @retval node When node is available */ -static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) +static inline struct mpsc_node *mpsc_pop(struct mpsc *q) { - struct rtio_mpsc_node *head; - struct rtio_mpsc_node *tail = q->tail; - struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next); + struct mpsc_node *head; + struct mpsc_node *tail = q->tail; + struct mpsc_node *next = (struct mpsc_node *)mpsc_ptr_get(tail->next); /* Skip over the stub/sentinel */ if (tail == &q->stub) { @@ -157,7 +157,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) q->tail = next; tail = next; - next = (struct rtio_mpsc_node *)mpsc_ptr_get(next->next); + next = (struct mpsc_node *)mpsc_ptr_get(next->next); } /* If next is non-NULL then a valid node is found, return it */ @@ -166,7 +166,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) return tail; } - head = (struct rtio_mpsc_node *)mpsc_ptr_get(q->head); + head = (struct mpsc_node *)mpsc_ptr_get(q->head); /* If next is NULL, and the tail != HEAD then the queue has pending * updates that can't yet be accessed. @@ -175,9 +175,9 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) return NULL; } - rtio_mpsc_push(q, &q->stub); + mpsc_push(q, &q->stub); - next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next); + next = (struct mpsc_node *)mpsc_ptr_get(tail->next); if (next != NULL) { q->tail = next; @@ -195,4 +195,4 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) } #endif -#endif /* ZEPHYR_RTIO_MPSC_H_ */ +#endif /* ZEPHYR_SYS_MPSC_LOCKFREE_H_ */ diff --git a/include/zephyr/rtio/rtio_spsc.h b/include/zephyr/sys/spsc_lockfree.h similarity index 66% rename from include/zephyr/rtio/rtio_spsc.h rename to include/zephyr/sys/spsc_lockfree.h index 07872e7d4b0..3f769bfbd16 100644 --- a/include/zephyr/rtio/rtio_spsc.h +++ b/include/zephyr/sys/spsc_lockfree.h @@ -1,12 +1,11 @@ /* - * Copyright (c) 2022 Intel Corporation + * Copyright (c) 2023 Intel Corporation * * SPDX-License-Identifier: Apache-2.0 */ - -#ifndef ZEPHYR_RTIO_SPSC_H_ -#define ZEPHYR_RTIO_SPSC_H_ +#ifndef ZEPHYR_SYS_SPSC_LOCKFREE_H_ +#define ZEPHYR_SYS_SPSC_LOCKFREE_H_ #include #include @@ -15,14 +14,14 @@ #include /** - * @brief RTIO Single Producer Single Consumer (SPSC) Queue API - * @defgroup rtio_spsc RTIO SPSC API - * @ingroup rtio + * @brief Single Producer Single Consumer (SPSC) Lockfree Queue API + * @defgroup spsc_lockfree SPSC API + * @ingroup datastructure_apis * @{ */ /** - * @file rtio_spsc.h + * @file spsc_lockfree.h * * @brief A lock-free and type safe power of 2 fixed sized single producer * single consumer (SPSC) queue using a ringbuffer and atomics to ensure @@ -57,7 +56,7 @@ * * @warning Not to be manipulated without the macros! */ -struct rtio_spsc { +struct spsc { /* private value only the producer thread should mutate */ unsigned long acquire; @@ -75,53 +74,54 @@ struct rtio_spsc { }; /** - * @brief Statically initialize an rtio_spsc + * @brief Statically initialize an spsc * * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8) * @param buf Buffer pointer */ -#define RTIO_SPSC_INITIALIZER(sz, buf) \ - { \ - ._spsc = { \ - .acquire = 0, \ - .consume = 0, \ - .in = ATOMIC_INIT(0), \ - .out = ATOMIC_INIT(0), \ - .mask = sz - 1, \ - }, \ - .buffer = buf, \ +#define SPSC_INITIALIZER(sz, buf) \ + { \ + ._spsc = \ + { \ + .acquire = 0, \ + .consume = 0, \ + .in = ATOMIC_INIT(0), \ + .out = ATOMIC_INIT(0), \ + .mask = sz - 1, \ + }, \ + .buffer = buf, \ } /** - * @brief Declare an anonymous struct type for an rtio_spsc + * @brief Declare an anonymous struct type for an spsc * * @param name Name of the spsc symbol to be provided * @param type Type stored in the spsc */ -#define RTIO_SPSC_DECLARE(name, type) \ - static struct rtio_spsc_##name { \ - struct rtio_spsc _spsc; \ - type * const buffer; \ +#define SPSC_DECLARE(name, type) \ + static struct spsc_##name { \ + struct spsc _spsc; \ + type * const buffer; \ } /** - * @brief Define an rtio_spsc with a fixed size + * @brief Define an spsc with a fixed size * * @param name Name of the spsc symbol to be provided * @param type Type stored in the spsc * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8) */ -#define RTIO_SPSC_DEFINE(name, type, sz) \ - BUILD_ASSERT(IS_POWER_OF_TWO(sz)); \ - static type __spsc_buf_##name[sz]; \ - RTIO_SPSC_DECLARE(name, type) name = RTIO_SPSC_INITIALIZER(sz, __spsc_buf_##name); +#define SPSC_DEFINE(name, type, sz) \ + BUILD_ASSERT(IS_POWER_OF_TWO(sz)); \ + static type __spsc_buf_##name[sz]; \ + SPSC_DECLARE(name, type) name = SPSC_INITIALIZER(sz, __spsc_buf_##name); /** * @brief Size of the SPSC queue * * @param spsc SPSC reference */ -#define rtio_spsc_size(spsc) ((spsc)->_spsc.mask + 1) +#define spsc_size(spsc) ((spsc)->_spsc.mask + 1) /** * @private @@ -130,20 +130,19 @@ struct rtio_spsc { * @param spsc SPSC reference * @param i Value to modulo to the size of the spsc */ -#define z_rtio_spsc_mask(spsc, i) ((i) & (spsc)->_spsc.mask) - +#define z_spsc_mask(spsc, i) ((i) & (spsc)->_spsc.mask) /** * @private * @brief Load the current "in" index from the spsc as an unsigned long */ -#define z_rtio_spsc_in(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.in) +#define z_spsc_in(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.in) /** * @private * @brief Load the current "out" index from the spsc as an unsigned long */ -#define z_rtio_spsc_out(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.out) +#define z_spsc_out(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.out) /** * @brief Initialize/reset a spsc such that its empty @@ -153,7 +152,7 @@ struct rtio_spsc { * * @param spsc SPSC to initialize/reset */ -#define rtio_spsc_reset(spsc) \ +#define spsc_reset(spsc) \ ({ \ (spsc)->_spsc.consume = 0; \ (spsc)->_spsc.acquire = 0; \ @@ -168,14 +167,14 @@ struct rtio_spsc { * * @return A pointer to the acquired element or null if the spsc is full */ -#define rtio_spsc_acquire(spsc) \ +#define spsc_acquire(spsc) \ ({ \ - unsigned long idx = z_rtio_spsc_in(spsc) + (spsc)->_spsc.acquire; \ - bool spsc_acq = (idx - z_rtio_spsc_out(spsc)) < rtio_spsc_size(spsc); \ + unsigned long idx = z_spsc_in(spsc) + (spsc)->_spsc.acquire; \ + bool spsc_acq = (idx - z_spsc_out(spsc)) < spsc_size(spsc); \ if (spsc_acq) { \ (spsc)->_spsc.acquire += 1; \ } \ - spsc_acq ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ + spsc_acq ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL; \ }) /** @@ -185,7 +184,7 @@ struct rtio_spsc { * * @param spsc SPSC to produce the previously acquired element or do nothing */ -#define rtio_spsc_produce(spsc) \ +#define spsc_produce(spsc) \ ({ \ if ((spsc)->_spsc.acquire > 0) { \ (spsc)->_spsc.acquire -= 1; \ @@ -201,7 +200,7 @@ struct rtio_spsc { * * @param spsc SPSC to produce all previously acquired elements or do nothing */ -#define rtio_spsc_produce_all(spsc) \ +#define spsc_produce_all(spsc) \ ({ \ if ((spsc)->_spsc.acquire > 0) { \ unsigned long acquired = (spsc)->_spsc.acquire; \ @@ -217,9 +216,9 @@ struct rtio_spsc { * * @param spsc SPSC to drop all previously acquired elements or do nothing */ -#define rtio_spsc_drop_all(spsc) \ - do { \ - (spsc)->_spsc.acquire = 0; \ +#define spsc_drop_all(spsc) \ + do { \ + (spsc)->_spsc.acquire = 0; \ } while (false) /** @@ -229,14 +228,14 @@ struct rtio_spsc { * * @return Pointer to element or null if no consumable elements left */ -#define rtio_spsc_consume(spsc) \ +#define spsc_consume(spsc) \ ({ \ - unsigned long idx = z_rtio_spsc_out(spsc) + (spsc)->_spsc.consume; \ - bool has_consumable = (idx != z_rtio_spsc_in(spsc)); \ + unsigned long idx = z_spsc_out(spsc) + (spsc)->_spsc.consume; \ + bool has_consumable = (idx != z_spsc_in(spsc)); \ if (has_consumable) { \ (spsc)->_spsc.consume += 1; \ } \ - has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ + has_consumable ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL; \ }) /** @@ -244,7 +243,7 @@ struct rtio_spsc { * * @param spsc SPSC to release consumed element or do nothing */ -#define rtio_spsc_release(spsc) \ +#define spsc_release(spsc) \ ({ \ if ((spsc)->_spsc.consume > 0) { \ (spsc)->_spsc.consume -= 1; \ @@ -252,13 +251,12 @@ struct rtio_spsc { } \ }) - /** * @brief Release all consumed elements * * @param spsc SPSC to release consumed elements or do nothing */ -#define rtio_spsc_release_all(spsc) \ +#define spsc_release_all(spsc) \ ({ \ if ((spsc)->_spsc.consume > 0) { \ unsigned long consumed = (spsc)->_spsc.consume; \ @@ -272,19 +270,15 @@ struct rtio_spsc { * * @param spsc SPSC to get item count for */ -#define rtio_spsc_acquirable(spsc) \ - ({ \ - (((spsc)->_spsc.in + (spsc)->_spsc.acquire) - (spsc)->_spsc.out) - \ - rtio_spsc_size(spsc); \ - }) +#define spsc_acquirable(spsc) \ + ({ (((spsc)->_spsc.in + (spsc)->_spsc.acquire) - (spsc)->_spsc.out) - spsc_size(spsc); }) /** * @brief Count of consumables in spsc * * @param spsc SPSC to get item count for */ -#define rtio_spsc_consumable(spsc) \ - ({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; }) +#define spsc_consumable(spsc) ({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; }) /** * @brief Peek at the first available item in queue @@ -293,11 +287,11 @@ struct rtio_spsc { * * @return Pointer to element or null if no consumable elements left */ -#define rtio_spsc_peek(spsc) \ +#define spsc_peek(spsc) \ ({ \ - unsigned long idx = z_rtio_spsc_out(spsc) + (spsc)->_spsc.consume; \ - bool has_consumable = (idx != z_rtio_spsc_in(spsc)); \ - has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ + unsigned long idx = z_spsc_out(spsc) + (spsc)->_spsc.consume; \ + bool has_consumable = (idx != z_spsc_in(spsc)); \ + has_consumable ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL; \ }) /** @@ -309,12 +303,12 @@ struct rtio_spsc { * * @return Pointer to element or null if none left */ -#define rtio_spsc_next(spsc, item) \ +#define spsc_next(spsc, item) \ ({ \ unsigned long idx = ((item) - (spsc)->buffer); \ - bool has_next = z_rtio_spsc_mask(spsc, (idx + 1)) != \ - (z_rtio_spsc_mask(spsc, z_rtio_spsc_in(spsc))); \ - has_next ? &((spsc)->buffer[z_rtio_spsc_mask((spsc), idx + 1)]) : NULL; \ + bool has_next = \ + z_spsc_mask(spsc, (idx + 1)) != (z_spsc_mask(spsc, z_spsc_in(spsc))); \ + has_next ? &((spsc)->buffer[z_spsc_mask((spsc), idx + 1)]) : NULL; \ }) /** @@ -325,15 +319,15 @@ struct rtio_spsc { * * @return Pointer to element or null if none left */ -#define rtio_spsc_prev(spsc, item) \ +#define spsc_prev(spsc, item) \ ({ \ unsigned long idx = ((item) - &(spsc)->buffer[0]) / sizeof((spsc)->buffer[0]); \ - bool has_prev = idx != z_rtio_spsc_mask(spsc, z_rtio_spsc_out(spsc)); \ - has_prev ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx - 1)]) : NULL; \ + bool has_prev = idx != z_spsc_mask(spsc, z_spsc_out(spsc)); \ + has_prev ? &((spsc)->buffer[z_spsc_mask(spsc, idx - 1)]) : NULL; \ }) /** * @} */ -#endif /* ZEPHYR_RTIO_SPSC_H_ */ +#endif /* ZEPHYR_SYS_SPSC_LOCKFREE_H_ */ diff --git a/samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c b/samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c index a8662c04998..ead3c64a648 100644 --- a/samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c +++ b/samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c @@ -83,13 +83,13 @@ static void vnd_sensor_iodev_submit(struct rtio_iodev_sqe *iodev_sqe) { struct vnd_sensor_data *data = (struct vnd_sensor_data *) iodev_sqe->sqe.iodev; - rtio_mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); + mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); } static void vnd_sensor_handle_int(const struct device *dev) { struct vnd_sensor_data *data = dev->data; - struct rtio_mpsc_node *node = rtio_mpsc_pop(&data->iodev.iodev_sq); + struct mpsc_node *node = mpsc_pop(&data->iodev.iodev_sq); if (node != NULL) { struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); @@ -116,7 +116,7 @@ static int vnd_sensor_init(const struct device *dev) data->dev = dev; - rtio_mpsc_init(&data->iodev.iodev_sq); + mpsc_init(&data->iodev.iodev_sq); k_timer_init(&data->timer, vnd_sensor_timer_expiry, NULL); diff --git a/subsys/rtio/rtio_executor.c b/subsys/rtio/rtio_executor.c index 13fd6a7e4dd..651ab64d5d2 100644 --- a/subsys/rtio/rtio_executor.c +++ b/subsys/rtio/rtio_executor.c @@ -61,7 +61,7 @@ static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe) void rtio_executor_submit(struct rtio *r) { const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE); - struct rtio_mpsc_node *node = rtio_mpsc_pop(&r->sq); + struct mpsc_node *node = mpsc_pop(&r->sq); while (node != NULL) { struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); @@ -83,7 +83,7 @@ void rtio_executor_submit(struct rtio *r) __ASSERT(transaction != chained, "Expected chained or transaction flag, not both"); #endif - node = rtio_mpsc_pop(&iodev_sqe->r->sq); + node = mpsc_pop(&iodev_sqe->r->sq); next = CONTAINER_OF(node, struct rtio_iodev_sqe, q); /* If the current submission was cancelled before submit, @@ -106,7 +106,7 @@ void rtio_executor_submit(struct rtio *r) rtio_iodev_submit(iodev_sqe); - node = rtio_mpsc_pop(&r->sq); + node = mpsc_pop(&r->sq); } } @@ -134,7 +134,7 @@ static inline void rtio_executor_handle_multishot(struct rtio *r, struct rtio_io } if (!is_canceled) { /* Request was not canceled, put the SQE back in the queue */ - rtio_mpsc_push(&r->sq, &curr->q); + mpsc_push(&r->sq, &curr->q); rtio_executor_submit(r); } } diff --git a/subsys/rtio/rtio_init.c b/subsys/rtio/rtio_init.c index e4deb14b723..1332529907b 100644 --- a/subsys/rtio/rtio_init.c +++ b/subsys/rtio/rtio_init.c @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -17,13 +18,13 @@ int rtio_init(void) { STRUCT_SECTION_FOREACH(rtio_sqe_pool, sqe_pool) { for (int i = 0; i < sqe_pool->pool_size; i++) { - rtio_mpsc_push(&sqe_pool->free_q, &sqe_pool->pool[i].q); + mpsc_push(&sqe_pool->free_q, &sqe_pool->pool[i].q); } } STRUCT_SECTION_FOREACH(rtio_cqe_pool, cqe_pool) { for (int i = 0; i < cqe_pool->pool_size; i++) { - rtio_mpsc_push(&cqe_pool->free_q, &cqe_pool->pool[i].q); + mpsc_push(&cqe_pool->free_q, &cqe_pool->pool[i].q); } } diff --git a/tests/lib/cpp/cxx/src/main.cpp b/tests/lib/cpp/cxx/src/main.cpp index dd377c015b8..b8b77c6c26a 100644 --- a/tests/lib/cpp/cxx/src/main.cpp +++ b/tests/lib/cpp/cxx/src/main.cpp @@ -79,8 +79,8 @@ /* Add RTIO headers to make sure they're CXX compatible */ #include -#include -#include +#include +#include #include diff --git a/tests/lib/lockfree/CMakeLists.txt b/tests/lib/lockfree/CMakeLists.txt new file mode 100644 index 00000000000..4cc3b1f3d97 --- /dev/null +++ b/tests/lib/lockfree/CMakeLists.txt @@ -0,0 +1,12 @@ +# Copyright (c) 2023 Intel Corporation. +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(lockfree_test) + +target_sources(app PRIVATE src/test_spsc.c src/test_mpsc.c) + +target_include_directories(app PRIVATE + ${ZEPHYR_BASE}/include + ${ZEPHYR_BASE}/arch/${ARCH}/include) diff --git a/tests/lib/lockfree/prj.conf b/tests/lib/lockfree/prj.conf new file mode 100644 index 00000000000..49b37026ea3 --- /dev/null +++ b/tests/lib/lockfree/prj.conf @@ -0,0 +1,2 @@ +CONFIG_ZTEST=y +CONFIG_TIMING_FUNCTIONS=y diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c b/tests/lib/lockfree/src/test_mpsc.c similarity index 71% rename from tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c rename to tests/lib/lockfree/src/test_mpsc.c index a656c119b9e..91381d58d2c 100644 --- a/tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c +++ b/tests/lib/lockfree/src/test_mpsc.c @@ -4,32 +4,31 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include "zephyr/irq.h" #include #include #include #include -#include -#include +#include +#include -#include "rtio_api.h" - -static struct rtio_mpsc push_pop_q; -static struct rtio_mpsc_node push_pop_nodes[2]; +static struct mpsc push_pop_q; +static struct mpsc_node push_pop_nodes[2]; /* * @brief Push and pop one element * - * @see rtio_mpsc_push(), rtio_mpsc_pop() + * @see mpsc_push(), mpsc_pop() * - * @ingroup rtio_tests + * @ingroup tests */ -ZTEST(rtio_mpsc, test_push_pop) +ZTEST(mpsc, test_push_pop) { mpsc_ptr_t node, head; - struct rtio_mpsc_node *stub, *next, *tail; + struct mpsc_node *stub, *next, *tail; - rtio_mpsc_init(&push_pop_q); + mpsc_init(&push_pop_q); head = mpsc_ptr_get(push_pop_q.head); tail = push_pop_q.tail; @@ -40,10 +39,10 @@ ZTEST(rtio_mpsc, test_push_pop) zassert_equal(tail, stub, "Tail should point at stub"); zassert_is_null(next, "Next should be null"); - node = rtio_mpsc_pop(&push_pop_q); + node = mpsc_pop(&push_pop_q); zassert_is_null(node, "Pop on empty queue should return null"); - rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]); + mpsc_push(&push_pop_q, &push_pop_nodes[0]); head = mpsc_ptr_get(push_pop_q.head); @@ -56,7 +55,7 @@ ZTEST(rtio_mpsc, test_push_pop) stub = &push_pop_q.stub; zassert_equal(tail, stub, "Tail should point at stub"); - node = rtio_mpsc_pop(&push_pop_q); + node = mpsc_pop(&push_pop_q); stub = &push_pop_q.stub; zassert_not_equal(node, stub, "Pop should not return stub"); @@ -65,7 +64,7 @@ ZTEST(rtio_mpsc, test_push_pop) "Pop should return push_pop_node %p, instead was %p", &push_pop_nodes[0], node); - node = rtio_mpsc_pop(&push_pop_q); + node = mpsc_pop(&push_pop_q); zassert_is_null(node, "Pop on empty queue should return null"); } @@ -74,31 +73,38 @@ ZTEST(rtio_mpsc, test_push_pop) #define MPSC_STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE) #define MPSC_THREADS_NUM 4 +struct thread_info { + k_tid_t tid; + int executed; + int priority; + int cpu_id; +}; + static struct thread_info mpsc_tinfo[MPSC_THREADS_NUM]; static struct k_thread mpsc_thread[MPSC_THREADS_NUM]; static K_THREAD_STACK_ARRAY_DEFINE(mpsc_stack, MPSC_THREADS_NUM, MPSC_STACK_SIZE); -struct mpsc_node { +struct test_mpsc_node { uint32_t id; - struct rtio_mpsc_node n; + struct mpsc_node n; }; -struct rtio_spsc_node_sq { - struct rtio_spsc _spsc; - struct mpsc_node *const buffer; +struct spsc_node_sq { + struct spsc _spsc; + struct test_mpsc_node *const buffer; }; -#define SPSC_DEFINE(n, sz) RTIO_SPSC_DEFINE(_spsc_##n, struct mpsc_node, sz) -#define SPSC_NAME(n, _) (struct rtio_spsc_node_sq *)&_spsc_##n +#define TEST_SPSC_DEFINE(n, sz) SPSC_DEFINE(_spsc_##n, struct test_mpsc_node, sz) +#define SPSC_NAME(n, _) (struct spsc_node_sq *)&_spsc_##n -LISTIFY(MPSC_THREADS_NUM, SPSC_DEFINE, (;), MPSC_FREEQ_SZ) +LISTIFY(MPSC_THREADS_NUM, TEST_SPSC_DEFINE, (;), MPSC_FREEQ_SZ) -struct rtio_spsc_node_sq *node_q[MPSC_THREADS_NUM] = { +struct spsc_node_sq *node_q[MPSC_THREADS_NUM] = { LISTIFY(MPSC_THREADS_NUM, SPSC_NAME, (,)) }; -static struct rtio_mpsc mpsc_q; +static struct mpsc mpsc_q; static void mpsc_consumer(void *p1, void *p2, void *p3) { @@ -106,12 +112,12 @@ static void mpsc_consumer(void *p1, void *p2, void *p3) ARG_UNUSED(p2); ARG_UNUSED(p3); - struct rtio_mpsc_node *n; - struct mpsc_node *nn; + struct mpsc_node *n; + struct test_mpsc_node *nn; for (int i = 0; i < (MPSC_ITERATIONS)*(MPSC_THREADS_NUM - 1); i++) { do { - n = rtio_mpsc_pop(&mpsc_q); + n = mpsc_pop(&mpsc_q); if (n == NULL) { k_yield(); } @@ -119,10 +125,10 @@ static void mpsc_consumer(void *p1, void *p2, void *p3) zassert_not_equal(n, &mpsc_q.stub, "mpsc should not produce stub"); - nn = CONTAINER_OF(n, struct mpsc_node, n); + nn = CONTAINER_OF(n, struct test_mpsc_node, n); - rtio_spsc_acquire(node_q[nn->id]); - rtio_spsc_produce(node_q[nn->id]); + spsc_acquire(node_q[nn->id]); + spsc_produce(node_q[nn->id]); } } @@ -132,20 +138,20 @@ static void mpsc_producer(void *p1, void *p2, void *p3) ARG_UNUSED(p2); ARG_UNUSED(p3); - struct mpsc_node *n; + struct test_mpsc_node *n; uint32_t id = (uint32_t)(uintptr_t)p1; for (int i = 0; i < MPSC_ITERATIONS; i++) { do { - n = rtio_spsc_consume(node_q[id]); + n = spsc_consume(node_q[id]); if (n == NULL) { k_yield(); } } while (n == NULL); - rtio_spsc_release(node_q[id]); + spsc_release(node_q[id]); n->id = id; - rtio_mpsc_push(&mpsc_q, &n->n); + mpsc_push(&mpsc_q, &n->n); } } @@ -155,17 +161,17 @@ static void mpsc_producer(void *p1, void *p2, void *p3) * This can and should be validated on SMP machines where incoherent * memory could cause issues. */ -ZTEST(rtio_mpsc, test_mpsc_threaded) +ZTEST(mpsc, test_mpsc_threaded) { - rtio_mpsc_init(&mpsc_q); + mpsc_init(&mpsc_q); TC_PRINT("setting up mpsc producer free queues\n"); /* Setup node free queues */ for (int i = 0; i < MPSC_THREADS_NUM; i++) { for (int j = 0; j < MPSC_FREEQ_SZ; j++) { - rtio_spsc_acquire(node_q[i]); + spsc_acquire(node_q[i]); } - rtio_spsc_produce_all(node_q[i]); + spsc_produce_all(node_q[i]); } TC_PRINT("starting consumer\n"); @@ -194,23 +200,27 @@ ZTEST(rtio_mpsc, test_mpsc_threaded) #define THROUGHPUT_ITERS 100000 -ZTEST(rtio_mpsc, test_mpsc_throughput) +ZTEST(mpsc, test_mpsc_throughput) { - struct rtio_mpsc_node node; + struct mpsc_node node; timing_t start_time, end_time; - rtio_mpsc_init(&mpsc_q); + mpsc_init(&mpsc_q); timing_init(); timing_start(); start_time = timing_counter_get(); + int key = irq_lock(); + for (int i = 0; i < THROUGHPUT_ITERS; i++) { - rtio_mpsc_push(&mpsc_q, &node); + mpsc_push(&mpsc_q, &node); - rtio_mpsc_pop(&mpsc_q); + mpsc_pop(&mpsc_q); } + irq_unlock(key); + end_time = timing_counter_get(); uint64_t cycles = timing_cycles_get(&start_time, &end_time); @@ -220,4 +230,4 @@ ZTEST(rtio_mpsc, test_mpsc_throughput) THROUGHPUT_ITERS, ns/THROUGHPUT_ITERS); } -ZTEST_SUITE(rtio_mpsc, NULL, NULL, NULL, NULL, NULL); +ZTEST_SUITE(mpsc, NULL, NULL, NULL, NULL, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c b/tests/lib/lockfree/src/test_spsc.c similarity index 63% rename from tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c rename to tests/lib/lockfree/src/test_spsc.c index 12fe9476f3f..e2539ca6ccd 100644 --- a/tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c +++ b/tests/lib/lockfree/src/test_spsc.c @@ -6,61 +6,60 @@ #include #include -#include -#include "rtio_api.h" +#include /* * @brief Produce and Consume a single uint32_t in the same execution context * - * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() + * @see spsc_acquire(), spsc_produce(), spsc_consume(), spsc_release() * - * @ingroup rtio_tests + * @ingroup tests */ -ZTEST(rtio_spsc, test_produce_consume_size1) +ZTEST(spsc, test_produce_consume_size1) { - RTIO_SPSC_DEFINE(ezspsc, uint32_t, 1); + SPSC_DEFINE(ezspsc, uint32_t, 1); const uint32_t magic = 43219876; - uint32_t *acq = rtio_spsc_acquire(&ezspsc); + uint32_t *acq = spsc_acquire(&ezspsc); zassert_not_null(acq, "Acquire should succeed"); *acq = magic; - uint32_t *acq2 = rtio_spsc_acquire(&ezspsc); + uint32_t *acq2 = spsc_acquire(&ezspsc); zassert_is_null(acq2, "Acquire should fail"); - uint32_t *cons = rtio_spsc_consume(&ezspsc); + uint32_t *cons = spsc_consume(&ezspsc); zassert_is_null(cons, "Consume should fail"); - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + zassert_equal(spsc_consumable(&ezspsc), 0, "Consumables should be 0"); - rtio_spsc_produce(&ezspsc); + spsc_produce(&ezspsc); - zassert_equal(rtio_spsc_consumable(&ezspsc), 1, "Consumables should be 1"); + zassert_equal(spsc_consumable(&ezspsc), 1, "Consumables should be 1"); - uint32_t *cons2 = rtio_spsc_consume(&ezspsc); + uint32_t *cons2 = spsc_consume(&ezspsc); - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + zassert_equal(spsc_consumable(&ezspsc), 0, "Consumables should be 0"); zassert_not_null(cons2, "Consume should not fail"); zassert_equal(*cons2, magic, "Consume value should equal magic"); - uint32_t *cons3 = rtio_spsc_consume(&ezspsc); + uint32_t *cons3 = spsc_consume(&ezspsc); zassert_is_null(cons3, "Consume should fail"); - uint32_t *acq3 = rtio_spsc_acquire(&ezspsc); + uint32_t *acq3 = spsc_acquire(&ezspsc); zassert_is_null(acq3, "Acquire should not succeed"); - rtio_spsc_release(&ezspsc); + spsc_release(&ezspsc); - uint32_t *acq4 = rtio_spsc_acquire(&ezspsc); + uint32_t *acq4 = spsc_acquire(&ezspsc); zassert_not_null(acq4, "Acquire should succeed"); } @@ -69,34 +68,34 @@ ZTEST(rtio_spsc, test_produce_consume_size1) * @brief Produce and Consume 3 items at a time in a spsc of size 4 to validate masking * and wrap around reads/writes. * - * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() + * @see spsc_acquire(), spsc_produce(), spsc_consume(), spsc_release() * - * @ingroup rtio_tests + * @ingroup tests */ -ZTEST(rtio_spsc, test_produce_consume_wrap_around) +ZTEST(spsc, test_produce_consume_wrap_around) { - RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); + SPSC_DEFINE(ezspsc, uint32_t, 4); for (int i = 0; i < 10; i++) { - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + zassert_equal(spsc_consumable(&ezspsc), 0, "Consumables should be 0"); for (int j = 0; j < 3; j++) { - uint32_t *entry = rtio_spsc_acquire(&ezspsc); + uint32_t *entry = spsc_acquire(&ezspsc); zassert_not_null(entry, "Acquire should succeed"); *entry = i * 3 + j; - rtio_spsc_produce(&ezspsc); + spsc_produce(&ezspsc); } - zassert_equal(rtio_spsc_consumable(&ezspsc), 3, "Consumables should be 3"); + zassert_equal(spsc_consumable(&ezspsc), 3, "Consumables should be 3"); for (int k = 0; k < 3; k++) { - uint32_t *entry = rtio_spsc_consume(&ezspsc); + uint32_t *entry = spsc_consume(&ezspsc); zassert_not_null(entry, "Consume should succeed"); zassert_equal(*entry, i * 3 + k, "Consume value should equal i*3+k"); - rtio_spsc_release(&ezspsc); + spsc_release(&ezspsc); } - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + zassert_equal(spsc_consumable(&ezspsc), 0, "Consumables should be 0"); } } @@ -107,28 +106,28 @@ ZTEST(rtio_spsc, test_produce_consume_wrap_around) * Done by setting all values to UINTPTR_MAX - 2 and writing and reading enough * to ensure integer wraps occur. */ -ZTEST(rtio_spsc, test_int_wrap_around) +ZTEST(spsc, test_int_wrap_around) { - RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); + SPSC_DEFINE(ezspsc, uint32_t, 4); ezspsc._spsc.in = ATOMIC_INIT(UINTPTR_MAX - 2); ezspsc._spsc.out = ATOMIC_INIT(UINTPTR_MAX - 2); for (int j = 0; j < 3; j++) { - uint32_t *entry = rtio_spsc_acquire(&ezspsc); + uint32_t *entry = spsc_acquire(&ezspsc); zassert_not_null(entry, "Acquire should succeed"); *entry = j; - rtio_spsc_produce(&ezspsc); + spsc_produce(&ezspsc); } zassert_equal(atomic_get(&ezspsc._spsc.in), UINTPTR_MAX + 1, "Spsc in should wrap"); for (int k = 0; k < 3; k++) { - uint32_t *entry = rtio_spsc_consume(&ezspsc); + uint32_t *entry = spsc_consume(&ezspsc); zassert_not_null(entry, "Consume should succeed"); zassert_equal(*entry, k, "Consume value should equal i*3+k"); - rtio_spsc_release(&ezspsc); + spsc_release(&ezspsc); } zassert_equal(atomic_get(&ezspsc._spsc.out), UINTPTR_MAX + 1, "Spsc out should wrap"); @@ -137,14 +136,14 @@ ZTEST(rtio_spsc, test_int_wrap_around) #define MAX_RETRIES 5 #define SMP_ITERATIONS 100 -RTIO_SPSC_DEFINE(spsc, uint32_t, 4); +SPSC_DEFINE(spsc, uint32_t, 4); static void t1_consume(void *p1, void *p2, void *p3) { ARG_UNUSED(p2); ARG_UNUSED(p3); - struct rtio_spsc_spsc *ezspsc = p1; + struct spsc_spsc *ezspsc = p1; uint32_t retries = 0; uint32_t *val = NULL; @@ -152,11 +151,11 @@ static void t1_consume(void *p1, void *p2, void *p3) val = NULL; retries = 0; while (val == NULL && retries < MAX_RETRIES) { - val = rtio_spsc_consume(ezspsc); + val = spsc_consume(ezspsc); retries++; } if (val != NULL) { - rtio_spsc_release(ezspsc); + spsc_release(ezspsc); } else { k_yield(); } @@ -168,7 +167,7 @@ static void t2_produce(void *p1, void *p2, void *p3) ARG_UNUSED(p2); ARG_UNUSED(p3); - struct rtio_spsc_spsc *ezspsc = p1; + struct spsc_spsc *ezspsc = p1; uint32_t retries = 0; uint32_t *val = NULL; @@ -176,12 +175,12 @@ static void t2_produce(void *p1, void *p2, void *p3) val = NULL; retries = 0; while (val == NULL && retries < MAX_RETRIES) { - val = rtio_spsc_acquire(ezspsc); + val = spsc_acquire(ezspsc); retries++; } if (val != NULL) { *val = SMP_ITERATIONS; - rtio_spsc_produce(ezspsc); + spsc_produce(ezspsc); } else { k_yield(); } @@ -192,6 +191,13 @@ static void t2_produce(void *p1, void *p2, void *p3) #define STACK_SIZE (384 + CONFIG_TEST_EXTRA_STACK_SIZE) #define THREADS_NUM 2 +struct thread_info { + k_tid_t tid; + int executed; + int priority; + int cpu_id; +}; + static struct thread_info tinfo[THREADS_NUM]; static struct k_thread tthread[THREADS_NUM]; static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE); @@ -202,7 +208,7 @@ static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE); * This can and should be validated on SMP machines where incoherent * memory could cause issues. */ -ZTEST(rtio_spsc, test_spsc_threaded) +ZTEST(spsc, test_spsc_threaded) { tinfo[0].tid = @@ -224,7 +230,7 @@ ZTEST(rtio_spsc, test_spsc_threaded) #define THROUGHPUT_ITERS 100000 -ZTEST(rtio_spsc, test_spsc_throughput) +ZTEST(spsc, test_spsc_throughput) { timing_t start_time, end_time; @@ -235,15 +241,19 @@ ZTEST(rtio_spsc, test_spsc_throughput) uint32_t *x, *y; + int key = irq_lock(); + for (int i = 0; i < THROUGHPUT_ITERS; i++) { - x = rtio_spsc_acquire(&spsc); + x = spsc_acquire(&spsc); *x = i; - rtio_spsc_produce(&spsc); + spsc_produce(&spsc); - y = rtio_spsc_consume(&spsc); - rtio_spsc_release(&spsc); + y = spsc_consume(&spsc); + spsc_release(&spsc); } + irq_unlock(key); + end_time = timing_counter_get(); uint64_t cycles = timing_cycles_get(&start_time, &end_time); @@ -253,11 +263,11 @@ ZTEST(rtio_spsc, test_spsc_throughput) THROUGHPUT_ITERS, ns/THROUGHPUT_ITERS); } -static void rtio_spsc_before(void *data) +static void spsc_before(void *data) { ARG_UNUSED(data); - rtio_spsc_reset(&spsc); + spsc_reset(&spsc); } -ZTEST_SUITE(rtio_spsc, NULL, NULL, rtio_spsc_before, NULL, NULL); +ZTEST_SUITE(spsc, NULL, NULL, spsc_before, NULL, NULL); diff --git a/tests/lib/lockfree/testcase.yaml b/tests/lib/lockfree/testcase.yaml new file mode 100644 index 00000000000..e541a6904a7 --- /dev/null +++ b/tests/lib/lockfree/testcase.yaml @@ -0,0 +1,7 @@ +tests: + libraries.lockfree: + platform_exclude: + - m5stack_core2 # renode times out + - m2gl025_miv # renode times out + tags: + - lockfree diff --git a/tests/subsys/rtio/rtio_api/CMakeLists.txt b/tests/subsys/rtio/rtio_api/CMakeLists.txt index 3ece5f0c51e..88d15b5f0da 100644 --- a/tests/subsys/rtio/rtio_api/CMakeLists.txt +++ b/tests/subsys/rtio/rtio_api/CMakeLists.txt @@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.20.0) find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) project(rtio_api_test) -target_sources(app PRIVATE src/test_rtio_spsc.c src/test_rtio_mpsc.c src/test_rtio_api.c) +target_sources(app PRIVATE src/test_rtio_api.c) target_include_directories(app PRIVATE ${ZEPHYR_BASE}/include diff --git a/tests/subsys/rtio/rtio_api/src/rtio_api.h b/tests/subsys/rtio/rtio_api/src/rtio_api.h deleted file mode 100644 index df8d46ae1f7..00000000000 --- a/tests/subsys/rtio/rtio_api/src/rtio_api.h +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2023 Intel Corporation. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#ifndef ZEPHYR_TEST_RTIO_API_H_ -#define ZEPHYR_TEST_RTIO_API_H_ - -#include - -struct thread_info { - k_tid_t tid; - int executed; - int priority; - int cpu_id; -}; - -#endif /* ZEPHYR_TEST_RTIO_API_H_ */ diff --git a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h index f478ec695cc..fab099e7158 100644 --- a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h +++ b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h @@ -5,8 +5,8 @@ */ #include -#include #include +#include #include #ifndef RTIO_IODEV_TEST_H_ @@ -17,7 +17,7 @@ struct rtio_iodev_test_data { struct k_timer timer; /* Queue of requests */ - struct rtio_mpsc io_q; + struct mpsc io_q; /* Currently executing transaction */ struct rtio_iodev_sqe *txn_head; @@ -40,7 +40,7 @@ static void rtio_iodev_test_next(struct rtio_iodev_test_data *data, bool complet goto out; } - struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->io_q); + struct mpsc_node *next = mpsc_pop(&data->io_q); /* Nothing left to do, cleanup */ if (next == NULL) { @@ -110,8 +110,8 @@ static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe) atomic_inc(&data->submit_count); - /* The only safe operation is enqueuing without a lock */ - rtio_mpsc_push(&data->io_q, &iodev_sqe->q); + /* The only safe operation is enqueuing */ + mpsc_push(&data->io_q, &iodev_sqe->q); rtio_iodev_test_next(data, false); } @@ -124,7 +124,7 @@ void rtio_iodev_test_init(struct rtio_iodev *test) { struct rtio_iodev_test_data *data = test->data; - rtio_mpsc_init(&data->io_q); + mpsc_init(&data->io_q); data->txn_head = NULL; data->txn_curr = NULL; k_timer_init(&data->timer, rtio_iodev_timer_fn, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c index 7a92859eba1..d101f3bcc29 100644 --- a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c @@ -672,7 +672,6 @@ void rtio_callback_chaining_cb(struct rtio *r, const struct rtio_sqe *sqe, void */ void test_rtio_callback_chaining_(struct rtio *r) { - int res; int32_t userdata[4] = {0, 1, 2, 3}; int32_t ordering[4] = { -1, -1, -1, -1};