Browse Source

sys: Add lockfree mpsc and spsc queues

Moves the rtio_ prefixed lockfree queues to sys alongside existing
mpsc/spsc pbuf, ringbuf, and similar queue-like data structures.

Signed-off-by: Tom Burdick <thomas.burdick@intel.com>
pull/73830/head
Tom Burdick 2 years ago committed by Carles Cufí
parent
commit
d95caa51a4
  1. 2
      doc/kernel/data_structures/index.rst
  2. 14
      doc/kernel/data_structures/mpsc_lockfree.rst
  3. 12
      doc/kernel/data_structures/spsc_lockfree.rst
  4. 10
      doc/services/rtio/index.rst
  5. 10
      drivers/i2c/i2c_rtio.c
  6. 6
      drivers/spi/spi_mcux_lpspi.c
  7. 6
      drivers/spi/spi_sam.c
  8. 2
      include/zephyr/drivers/i2c/rtio.h
  9. 56
      include/zephyr/rtio/rtio.h
  10. 96
      include/zephyr/sys/mpsc_lockfree.h
  11. 136
      include/zephyr/sys/spsc_lockfree.h
  12. 6
      samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c
  13. 8
      subsys/rtio/rtio_executor.c
  14. 5
      subsys/rtio/rtio_init.c
  15. 4
      tests/lib/cpp/cxx/src/main.cpp
  16. 12
      tests/lib/lockfree/CMakeLists.txt
  17. 2
      tests/lib/lockfree/prj.conf
  18. 100
      tests/lib/lockfree/src/test_mpsc.c
  19. 112
      tests/lib/lockfree/src/test_spsc.c
  20. 7
      tests/lib/lockfree/testcase.yaml
  21. 2
      tests/subsys/rtio/rtio_api/CMakeLists.txt
  22. 19
      tests/subsys/rtio/rtio_api/src/rtio_api.h
  23. 12
      tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h
  24. 1
      tests/subsys/rtio/rtio_api/src/test_rtio_api.c

2
doc/kernel/data_structures/index.rst

@ -34,3 +34,5 @@ needed will be provided by the user.
spsc_pbuf.rst spsc_pbuf.rst
rbtree.rst rbtree.rst
ring_buffers.rst ring_buffers.rst
mpsc_lockfree.rst
spsc_lockfree.rst

14
doc/kernel/data_structures/mpsc_lockfree.rst

@ -0,0 +1,14 @@
.. _mpsc_lockfree:
Multi Producer Single Consumer Lock Free Queue
==============================================
A :dfn:`Multi Producer Single Consumer Lock Free Queue (MPSC)` is a lockfree
intrusive queue based on atomic pointer swaps as described by Dmitry Vyukov
at `1024cores <https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue>`_.
API Reference
*************
.. doxygengroup:: mpsc_lockfree

12
doc/kernel/data_structures/spsc_lockfree.rst

@ -0,0 +1,12 @@
.. _spsc_lockfree:
Single Producer Single Consumer Lock Free Queue
===============================================
A :dfn:`Single Producer Single Consumer Lock Free Queue (SPSC)` is a lock free
atomic ring buffer based queue.
API Reference
*************
.. doxygengroup:: spsc_lockfree

10
doc/services/rtio/index.rst

@ -216,13 +216,3 @@ API Reference
************* *************
.. doxygengroup:: rtio .. doxygengroup:: rtio
MPSC Lock-free Queue API
========================
.. doxygengroup:: rtio_mpsc
SPSC Lock-free Queue API
========================
.. doxygengroup:: rtio_spsc

10
drivers/i2c/i2c_rtio.c

@ -7,7 +7,7 @@
#include <zephyr/drivers/i2c.h> #include <zephyr/drivers/i2c.h>
#include <zephyr/drivers/i2c/rtio.h> #include <zephyr/drivers/i2c/rtio.h>
#include <zephyr/rtio/rtio.h> #include <zephyr/rtio/rtio.h>
#include <zephyr/rtio/rtio_mpsc.h> #include <zephyr/sys/mpsc_lockfree.h>
#include <zephyr/sys/__assert.h> #include <zephyr/sys/__assert.h>
#define LOG_LEVEL CONFIG_I2C_LOG_LEVEL #define LOG_LEVEL CONFIG_I2C_LOG_LEVEL
@ -55,14 +55,14 @@ struct rtio_sqe *i2c_rtio_copy(struct rtio *r, struct rtio_iodev *iodev, const s
void i2c_rtio_init(struct i2c_rtio *ctx, const struct device *dev) void i2c_rtio_init(struct i2c_rtio *ctx, const struct device *dev)
{ {
k_sem_init(&ctx->lock, 1, 1); k_sem_init(&ctx->lock, 1, 1);
rtio_mpsc_init(&ctx->io_q); mpsc_init(&ctx->io_q);
ctx->txn_curr = NULL; ctx->txn_curr = NULL;
ctx->txn_head = NULL; ctx->txn_head = NULL;
ctx->dt_spec.bus = dev; ctx->dt_spec.bus = dev;
ctx->iodev.data = &ctx->dt_spec; ctx->iodev.data = &ctx->dt_spec;
ctx->iodev.api = &i2c_iodev_api; ctx->iodev.api = &i2c_iodev_api;
/* TODO drop the builtin submission queue? */ /* TODO drop the builtin submission queue? */
rtio_mpsc_init(&ctx->iodev.iodev_sq); mpsc_init(&ctx->iodev.iodev_sq);
} }
/** /**
@ -82,7 +82,7 @@ static bool i2c_rtio_next(struct i2c_rtio *ctx, bool completion)
return false; return false;
} }
struct rtio_mpsc_node *next = rtio_mpsc_pop(&ctx->io_q); struct mpsc_node *next = mpsc_pop(&ctx->io_q);
/* Nothing left to do */ /* Nothing left to do */
if (next == NULL) { if (next == NULL) {
@ -119,7 +119,7 @@ bool i2c_rtio_complete(struct i2c_rtio *ctx, int status)
} }
bool i2c_rtio_submit(struct i2c_rtio *ctx, struct rtio_iodev_sqe *iodev_sqe) bool i2c_rtio_submit(struct i2c_rtio *ctx, struct rtio_iodev_sqe *iodev_sqe)
{ {
rtio_mpsc_push(&ctx->io_q, &iodev_sqe->q); mpsc_push(&ctx->io_q, &iodev_sqe->q);
return i2c_rtio_next(ctx, false); return i2c_rtio_next(ctx, false);
} }

6
drivers/spi/spi_mcux_lpspi.c

@ -702,7 +702,7 @@ static int spi_mcux_init(const struct device *dev)
data->dt_spec.bus = dev; data->dt_spec.bus = dev;
data->iodev.api = &spi_iodev_api; data->iodev.api = &spi_iodev_api;
data->iodev.data = &data->dt_spec; data->iodev.data = &data->dt_spec;
rtio_mpsc_init(&data->iodev.iodev_sq); mpsc_init(&data->iodev.iodev_sq);
#endif #endif
err = pinctrl_apply_state(config->pincfg, PINCTRL_STATE_DEFAULT); err = pinctrl_apply_state(config->pincfg, PINCTRL_STATE_DEFAULT);
@ -803,7 +803,7 @@ static void spi_mcux_iodev_next(const struct device *dev, bool completion)
return; return;
} }
struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->iodev.iodev_sq); struct mpsc_node *next = mpsc_pop(&data->iodev.iodev_sq);
if (next != NULL) { if (next != NULL) {
struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q); struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q);
@ -832,7 +832,7 @@ static void spi_mcux_iodev_submit(const struct device *dev,
{ {
struct spi_mcux_data *data = dev->data; struct spi_mcux_data *data = dev->data;
rtio_mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q);
spi_mcux_iodev_next(dev, false); spi_mcux_iodev_next(dev, false);
} }

6
drivers/spi/spi_sam.c

@ -691,7 +691,7 @@ static void spi_sam_iodev_next(const struct device *dev, bool completion)
return; return;
} }
struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->iodev.iodev_sq); struct mpsc_node *next = mpsc_pop(&data->iodev.iodev_sq);
if (next != NULL) { if (next != NULL) {
struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q); struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q);
@ -736,7 +736,7 @@ static void spi_sam_iodev_submit(const struct device *dev,
{ {
struct spi_sam_data *data = dev->data; struct spi_sam_data *data = dev->data;
rtio_mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q);
spi_sam_iodev_next(dev, false); spi_sam_iodev_next(dev, false);
} }
#endif #endif
@ -866,7 +866,7 @@ static int spi_sam_init(const struct device *dev)
data->dt_spec.bus = dev; data->dt_spec.bus = dev;
data->iodev.api = &spi_iodev_api; data->iodev.api = &spi_iodev_api;
data->iodev.data = &data->dt_spec; data->iodev.data = &data->dt_spec;
rtio_mpsc_init(&data->iodev.iodev_sq); mpsc_init(&data->iodev.iodev_sq);
#endif #endif
spi_context_unlock_unconditionally(&data->ctx); spi_context_unlock_unconditionally(&data->ctx);

2
include/zephyr/drivers/i2c/rtio.h

@ -22,7 +22,7 @@ struct i2c_rtio {
struct k_sem lock; struct k_sem lock;
struct k_spinlock slock; struct k_spinlock slock;
struct rtio *r; struct rtio *r;
struct rtio_mpsc io_q; struct mpsc io_q;
struct rtio_iodev iodev; struct rtio_iodev iodev;
struct rtio_iodev_sqe *txn_head; struct rtio_iodev_sqe *txn_head;
struct rtio_iodev_sqe *txn_curr; struct rtio_iodev_sqe *txn_curr;

56
include/zephyr/rtio/rtio.h

@ -31,12 +31,12 @@
#include <zephyr/app_memory/app_memdomain.h> #include <zephyr/app_memory/app_memdomain.h>
#include <zephyr/device.h> #include <zephyr/device.h>
#include <zephyr/kernel.h> #include <zephyr/kernel.h>
#include <zephyr/rtio/rtio_mpsc.h>
#include <zephyr/sys/__assert.h> #include <zephyr/sys/__assert.h>
#include <zephyr/sys/atomic.h> #include <zephyr/sys/atomic.h>
#include <zephyr/sys/mem_blocks.h> #include <zephyr/sys/mem_blocks.h>
#include <zephyr/sys/util.h> #include <zephyr/sys/util.h>
#include <zephyr/sys/iterable_sections.h> #include <zephyr/sys/iterable_sections.h>
#include <zephyr/sys/mpsc_lockfree.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -292,7 +292,7 @@ BUILD_ASSERT(sizeof(struct rtio_sqe) <= 64);
* @brief A completion queue event * @brief A completion queue event
*/ */
struct rtio_cqe { struct rtio_cqe {
struct rtio_mpsc_node q; struct mpsc_node q;
int32_t result; /**< Result from operation */ int32_t result; /**< Result from operation */
void *userdata; /**< Associated userdata with operation */ void *userdata; /**< Associated userdata with operation */
@ -300,14 +300,14 @@ struct rtio_cqe {
}; };
struct rtio_sqe_pool { struct rtio_sqe_pool {
struct rtio_mpsc free_q; struct mpsc free_q;
const uint16_t pool_size; const uint16_t pool_size;
uint16_t pool_free; uint16_t pool_free;
struct rtio_iodev_sqe *pool; struct rtio_iodev_sqe *pool;
}; };
struct rtio_cqe_pool { struct rtio_cqe_pool {
struct rtio_mpsc free_q; struct mpsc free_q;
const uint16_t pool_size; const uint16_t pool_size;
uint16_t pool_free; uint16_t pool_free;
struct rtio_cqe *pool; struct rtio_cqe *pool;
@ -362,10 +362,10 @@ struct rtio {
#endif #endif
/* Submission queue */ /* Submission queue */
struct rtio_mpsc sq; struct mpsc sq;
/* Completion queue */ /* Completion queue */
struct rtio_mpsc cq; struct mpsc cq;
}; };
/** The memory partition associated with all RTIO context information */ /** The memory partition associated with all RTIO context information */
@ -422,7 +422,7 @@ static inline uint16_t __rtio_compute_mempool_block_index(const struct rtio *r,
*/ */
struct rtio_iodev_sqe { struct rtio_iodev_sqe {
struct rtio_sqe sqe; struct rtio_sqe sqe;
struct rtio_mpsc_node q; struct mpsc_node q;
struct rtio_iodev_sqe *next; struct rtio_iodev_sqe *next;
struct rtio *r; struct rtio *r;
}; };
@ -450,7 +450,7 @@ struct rtio_iodev {
const struct rtio_iodev_api *api; const struct rtio_iodev_api *api;
/* Queue of RTIO contexts with requests */ /* Queue of RTIO contexts with requests */
struct rtio_mpsc iodev_sq; struct mpsc iodev_sq;
/* Data associated with this iodev */ /* Data associated with this iodev */
void *data; void *data;
@ -625,7 +625,7 @@ static inline void rtio_sqe_prep_transceive(struct rtio_sqe *sqe,
static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *pool) static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *pool)
{ {
struct rtio_mpsc_node *node = rtio_mpsc_pop(&pool->free_q); struct mpsc_node *node = mpsc_pop(&pool->free_q);
if (node == NULL) { if (node == NULL) {
return NULL; return NULL;
@ -640,14 +640,14 @@ static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *p
static inline void rtio_sqe_pool_free(struct rtio_sqe_pool *pool, struct rtio_iodev_sqe *iodev_sqe) static inline void rtio_sqe_pool_free(struct rtio_sqe_pool *pool, struct rtio_iodev_sqe *iodev_sqe)
{ {
rtio_mpsc_push(&pool->free_q, &iodev_sqe->q); mpsc_push(&pool->free_q, &iodev_sqe->q);
pool->pool_free++; pool->pool_free++;
} }
static inline struct rtio_cqe *rtio_cqe_pool_alloc(struct rtio_cqe_pool *pool) static inline struct rtio_cqe *rtio_cqe_pool_alloc(struct rtio_cqe_pool *pool)
{ {
struct rtio_mpsc_node *node = rtio_mpsc_pop(&pool->free_q); struct mpsc_node *node = mpsc_pop(&pool->free_q);
if (node == NULL) { if (node == NULL) {
return NULL; return NULL;
@ -664,7 +664,7 @@ static inline struct rtio_cqe *rtio_cqe_pool_alloc(struct rtio_cqe_pool *pool)
static inline void rtio_cqe_pool_free(struct rtio_cqe_pool *pool, struct rtio_cqe *cqe) static inline void rtio_cqe_pool_free(struct rtio_cqe_pool *pool, struct rtio_cqe *cqe)
{ {
rtio_mpsc_push(&pool->free_q, &cqe->q); mpsc_push(&pool->free_q, &cqe->q);
pool->pool_free++; pool->pool_free++;
} }
@ -732,14 +732,14 @@ static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_
#define RTIO_IODEV_DEFINE(name, iodev_api, iodev_data) \ #define RTIO_IODEV_DEFINE(name, iodev_api, iodev_data) \
STRUCT_SECTION_ITERABLE(rtio_iodev, name) = { \ STRUCT_SECTION_ITERABLE(rtio_iodev, name) = { \
.api = (iodev_api), \ .api = (iodev_api), \
.iodev_sq = RTIO_MPSC_INIT((name.iodev_sq)), \ .iodev_sq = MPSC_INIT((name.iodev_sq)), \
.data = (iodev_data), \ .data = (iodev_data), \
} }
#define Z_RTIO_SQE_POOL_DEFINE(name, sz) \ #define Z_RTIO_SQE_POOL_DEFINE(name, sz) \
static struct rtio_iodev_sqe CONCAT(_sqe_pool_, name)[sz]; \ static struct rtio_iodev_sqe CONCAT(_sqe_pool_, name)[sz]; \
STRUCT_SECTION_ITERABLE(rtio_sqe_pool, name) = { \ STRUCT_SECTION_ITERABLE(rtio_sqe_pool, name) = { \
.free_q = RTIO_MPSC_INIT((name.free_q)), \ .free_q = MPSC_INIT((name.free_q)), \
.pool_size = sz, \ .pool_size = sz, \
.pool_free = sz, \ .pool_free = sz, \
.pool = CONCAT(_sqe_pool_, name), \ .pool = CONCAT(_sqe_pool_, name), \
@ -749,7 +749,7 @@ static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_
#define Z_RTIO_CQE_POOL_DEFINE(name, sz) \ #define Z_RTIO_CQE_POOL_DEFINE(name, sz) \
static struct rtio_cqe CONCAT(_cqe_pool_, name)[sz]; \ static struct rtio_cqe CONCAT(_cqe_pool_, name)[sz]; \
STRUCT_SECTION_ITERABLE(rtio_cqe_pool, name) = { \ STRUCT_SECTION_ITERABLE(rtio_cqe_pool, name) = { \
.free_q = RTIO_MPSC_INIT((name.free_q)), \ .free_q = MPSC_INIT((name.free_q)), \
.pool_size = sz, \ .pool_size = sz, \
.pool_free = sz, \ .pool_free = sz, \
.pool = CONCAT(_cqe_pool_, name), \ .pool = CONCAT(_cqe_pool_, name), \
@ -797,8 +797,8 @@ static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_
.sqe_pool = _sqe_pool, \ .sqe_pool = _sqe_pool, \
.cqe_pool = _cqe_pool, \ .cqe_pool = _cqe_pool, \
IF_ENABLED(CONFIG_RTIO_SYS_MEM_BLOCKS, (.block_pool = _block_pool,)) \ IF_ENABLED(CONFIG_RTIO_SYS_MEM_BLOCKS, (.block_pool = _block_pool,)) \
.sq = RTIO_MPSC_INIT((name.sq)), \ .sq = MPSC_INIT((name.sq)), \
.cq = RTIO_MPSC_INIT((name.cq)), \ .cq = MPSC_INIT((name.cq)), \
} }
/** /**
@ -910,7 +910,7 @@ static inline struct rtio_sqe *rtio_sqe_acquire(struct rtio *r)
return NULL; return NULL;
} }
rtio_mpsc_push(&r->sq, &iodev_sqe->q); mpsc_push(&r->sq, &iodev_sqe->q);
return &iodev_sqe->sqe; return &iodev_sqe->sqe;
} }
@ -923,12 +923,12 @@ static inline struct rtio_sqe *rtio_sqe_acquire(struct rtio *r)
static inline void rtio_sqe_drop_all(struct rtio *r) static inline void rtio_sqe_drop_all(struct rtio *r)
{ {
struct rtio_iodev_sqe *iodev_sqe; struct rtio_iodev_sqe *iodev_sqe;
struct rtio_mpsc_node *node = rtio_mpsc_pop(&r->sq); struct mpsc_node *node = mpsc_pop(&r->sq);
while (node != NULL) { while (node != NULL) {
iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
rtio_sqe_pool_free(r->sqe_pool, iodev_sqe); rtio_sqe_pool_free(r->sqe_pool, iodev_sqe);
node = rtio_mpsc_pop(&r->sq); node = mpsc_pop(&r->sq);
} }
} }
@ -953,7 +953,7 @@ static inline struct rtio_cqe *rtio_cqe_acquire(struct rtio *r)
*/ */
static inline void rtio_cqe_produce(struct rtio *r, struct rtio_cqe *cqe) static inline void rtio_cqe_produce(struct rtio *r, struct rtio_cqe *cqe)
{ {
rtio_mpsc_push(&r->cq, &cqe->q); mpsc_push(&r->cq, &cqe->q);
} }
/** /**
@ -969,7 +969,7 @@ static inline void rtio_cqe_produce(struct rtio *r, struct rtio_cqe *cqe)
*/ */
static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r) static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
{ {
struct rtio_mpsc_node *node; struct mpsc_node *node;
struct rtio_cqe *cqe = NULL; struct rtio_cqe *cqe = NULL;
#ifdef CONFIG_RTIO_CONSUME_SEM #ifdef CONFIG_RTIO_CONSUME_SEM
@ -978,7 +978,7 @@ static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
} }
#endif #endif
node = rtio_mpsc_pop(&r->cq); node = mpsc_pop(&r->cq);
if (node == NULL) { if (node == NULL) {
return NULL; return NULL;
} }
@ -999,16 +999,16 @@ static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
*/ */
static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r) static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r)
{ {
struct rtio_mpsc_node *node; struct mpsc_node *node;
struct rtio_cqe *cqe; struct rtio_cqe *cqe;
#ifdef CONFIG_RTIO_CONSUME_SEM #ifdef CONFIG_RTIO_CONSUME_SEM
k_sem_take(r->consume_sem, K_FOREVER); k_sem_take(r->consume_sem, K_FOREVER);
#endif #endif
node = rtio_mpsc_pop(&r->cq); node = mpsc_pop(&r->cq);
while (node == NULL) { while (node == NULL) {
Z_SPIN_DELAY(1); Z_SPIN_DELAY(1);
node = rtio_mpsc_pop(&r->cq); node = mpsc_pop(&r->cq);
} }
cqe = CONTAINER_OF(node, struct rtio_cqe, q); cqe = CONTAINER_OF(node, struct rtio_cqe, q);
@ -1136,13 +1136,13 @@ static inline void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int resu
static inline void rtio_iodev_cancel_all(struct rtio_iodev *iodev) static inline void rtio_iodev_cancel_all(struct rtio_iodev *iodev)
{ {
/* Clear pending requests as -ENODATA */ /* Clear pending requests as -ENODATA */
struct rtio_mpsc_node *node = rtio_mpsc_pop(&iodev->iodev_sq); struct mpsc_node *node = mpsc_pop(&iodev->iodev_sq);
while (node != NULL) { while (node != NULL) {
struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
rtio_iodev_sqe_err(iodev_sqe, -ECANCELED); rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
node = rtio_mpsc_pop(&iodev->iodev_sq); node = mpsc_pop(&iodev->iodev_sq);
} }
} }

96
include/zephyr/rtio/rtio_mpsc.h → include/zephyr/sys/mpsc_lockfree.h

@ -1,12 +1,12 @@
/* /*
* Copyright (c) 2010-2011 Dmitry Vyukov * Copyright (c) 2010-2011 Dmitry Vyukov
* Copyright (c) 2022 Intel Corporation * Copyright (c) 2023 Intel Corporation
* *
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
#ifndef ZEPHYR_RTIO_MPSC_H_ #ifndef ZEPHYR_SYS_MPSC_LOCKFREE_H_
#define ZEPHYR_RTIO_MPSC_H_ #define ZEPHYR_SYS_MPSC_LOCKFREE_H_
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
@ -18,12 +18,28 @@ extern "C" {
#endif #endif
/** /**
* @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API * @brief Multiple Producer Single Consumer (MPSC) Lockfree Queue API
* @defgroup rtio_mpsc RTIO MPSC API * @defgroup mpsc_lockfree MPSC Lockfree Queue API
* @ingroup rtio * @ingroup datastructure_apis
* @{ * @{
*/ */
/**
* @file mpsc_lockfree.h
*
* @brief A wait-free intrusive multi producer single consumer (MPSC) queue using
* a singly linked list. Ordering is First-In-First-Out.
*
* Based on the well known and widely used wait-free MPSC queue described by
* Dmitry Vyukov with some slight changes to account for needs of an
* RTOS on a variety of archs. Both consumer and producer are wait free. No CAS
* loop or lock is needed.
*
* An MPSC queue is safe to produce or consume in an ISR with O(1) push/pop.
*
* @warning MPSC is *not* safe to consume in multiple execution contexts.
*/
/* /*
* On single core systems atomics are unnecessary * On single core systems atomics are unnecessary
* and cause a lot of unnecessary cache invalidation * and cause a lot of unnecessary cache invalidation
@ -35,17 +51,17 @@ extern "C" {
* are updated in the correct order and the values are * are updated in the correct order and the values are
* updated core caches correctly. * updated core caches correctly.
*/ */
#if defined(CONFIG_SMP) #if IS_ENABLED(CONFIG_SMP)
typedef atomic_ptr_t mpsc_ptr_t; typedef atomic_ptr_t mpsc_ptr_t;
#define mpsc_ptr_get(ptr) atomic_ptr_get(&(ptr)) #define mpsc_ptr_get(ptr) atomic_ptr_get(&(ptr))
#define mpsc_ptr_set(ptr, val) atomic_ptr_set(&(ptr), val) #define mpsc_ptr_set(ptr, val) atomic_ptr_set(&(ptr), val)
#define mpsc_ptr_set_get(ptr, val) atomic_ptr_set(&(ptr), val) #define mpsc_ptr_set_get(ptr, val) atomic_ptr_set(&(ptr), val)
#else #else /* IS_ENABLED(CONFIG_SMP) */
typedef struct rtio_mpsc_node *mpsc_ptr_t; typedef struct mpsc_node *mpsc_ptr_t;
#define mpsc_ptr_get(ptr) ptr #define mpsc_ptr_get(ptr) ptr
#define mpsc_ptr_set(ptr, val) ptr = val #define mpsc_ptr_set(ptr, val) ptr = val
@ -56,38 +72,22 @@ typedef struct rtio_mpsc_node *mpsc_ptr_t;
tmp; \ tmp; \
}) })
#endif #endif /* IS_ENABLED(CONFIG_SMP) */
/**
* @file rtio_mpsc.h
*
* @brief A wait-free intrusive multi producer single consumer (MPSC) queue using
* a singly linked list. Ordering is First-In-First-Out.
*
* Based on the well known and widely used wait-free MPSC queue described by
* Dmitry Vyukov with some slight changes to account for needs of an
* RTOS on a variety of archs. Both consumer and producer are wait free. No CAS
* loop or lock is needed.
*
* An MPSC queue is safe to produce or consume in an ISR with O(1) push/pop.
*
* @warning MPSC is *not* safe to consume in multiple execution contexts.
*/
/** /**
* @brief Queue member * @brief Queue member
*/ */
struct rtio_mpsc_node { struct mpsc_node {
mpsc_ptr_t next; mpsc_ptr_t next;
}; };
/** /**
* @brief MPSC Queue * @brief MPSC Queue
*/ */
struct rtio_mpsc { struct mpsc {
mpsc_ptr_t head; mpsc_ptr_t head;
struct rtio_mpsc_node *tail; struct mpsc_node *tail;
struct rtio_mpsc_node stub; struct mpsc_node stub;
}; };
/** /**
@ -97,10 +97,10 @@ struct rtio_mpsc {
* *
* @param symbol name of the queue * @param symbol name of the queue
*/ */
#define RTIO_MPSC_INIT(symbol) \ #define MPSC_INIT(symbol) \
{ \ { \
.head = (struct rtio_mpsc_node *)&symbol.stub, \ .head = (struct mpsc_node *)&symbol.stub, \
.tail = (struct rtio_mpsc_node *)&symbol.stub, \ .tail = (struct mpsc_node *)&symbol.stub, \
.stub = { \ .stub = { \
.next = NULL, \ .next = NULL, \
}, \ }, \
@ -111,7 +111,7 @@ struct rtio_mpsc {
* *
* @param q Queue to initialize or reset * @param q Queue to initialize or reset
*/ */
static inline void rtio_mpsc_init(struct rtio_mpsc *q) static inline void mpsc_init(struct mpsc *q)
{ {
mpsc_ptr_set(q->head, &q->stub); mpsc_ptr_set(q->head, &q->stub);
q->tail = &q->stub; q->tail = &q->stub;
@ -124,15 +124,15 @@ static inline void rtio_mpsc_init(struct rtio_mpsc *q)
* @param q Queue to push the node to * @param q Queue to push the node to
* @param n Node to push into the queue * @param n Node to push into the queue
*/ */
static ALWAYS_INLINE void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n) static ALWAYS_INLINE void mpsc_push(struct mpsc *q, struct mpsc_node *n)
{ {
struct rtio_mpsc_node *prev; struct mpsc_node *prev;
int key; int key;
mpsc_ptr_set(n->next, NULL); mpsc_ptr_set(n->next, NULL);
key = arch_irq_lock(); key = arch_irq_lock();
prev = (struct rtio_mpsc_node *)mpsc_ptr_set_get(q->head, n); prev = (struct mpsc_node *)mpsc_ptr_set_get(q->head, n);
mpsc_ptr_set(prev->next, n); mpsc_ptr_set(prev->next, n);
arch_irq_unlock(key); arch_irq_unlock(key);
} }
@ -143,11 +143,11 @@ static ALWAYS_INLINE void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_n
* @retval NULL When no node is available * @retval NULL When no node is available
* @retval node When node is available * @retval node When node is available
*/ */
static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) static inline struct mpsc_node *mpsc_pop(struct mpsc *q)
{ {
struct rtio_mpsc_node *head; struct mpsc_node *head;
struct rtio_mpsc_node *tail = q->tail; struct mpsc_node *tail = q->tail;
struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next); struct mpsc_node *next = (struct mpsc_node *)mpsc_ptr_get(tail->next);
/* Skip over the stub/sentinel */ /* Skip over the stub/sentinel */
if (tail == &q->stub) { if (tail == &q->stub) {
@ -157,7 +157,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
q->tail = next; q->tail = next;
tail = next; tail = next;
next = (struct rtio_mpsc_node *)mpsc_ptr_get(next->next); next = (struct mpsc_node *)mpsc_ptr_get(next->next);
} }
/* If next is non-NULL then a valid node is found, return it */ /* If next is non-NULL then a valid node is found, return it */
@ -166,7 +166,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
return tail; return tail;
} }
head = (struct rtio_mpsc_node *)mpsc_ptr_get(q->head); head = (struct mpsc_node *)mpsc_ptr_get(q->head);
/* If next is NULL, and the tail != HEAD then the queue has pending /* If next is NULL, and the tail != HEAD then the queue has pending
* updates that can't yet be accessed. * updates that can't yet be accessed.
@ -175,9 +175,9 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
return NULL; return NULL;
} }
rtio_mpsc_push(q, &q->stub); mpsc_push(q, &q->stub);
next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next); next = (struct mpsc_node *)mpsc_ptr_get(tail->next);
if (next != NULL) { if (next != NULL) {
q->tail = next; q->tail = next;
@ -195,4 +195,4 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
} }
#endif #endif
#endif /* ZEPHYR_RTIO_MPSC_H_ */ #endif /* ZEPHYR_SYS_MPSC_LOCKFREE_H_ */

136
include/zephyr/rtio/rtio_spsc.h → include/zephyr/sys/spsc_lockfree.h

@ -1,12 +1,11 @@
/* /*
* Copyright (c) 2022 Intel Corporation * Copyright (c) 2023 Intel Corporation
* *
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
#ifndef ZEPHYR_SYS_SPSC_LOCKFREE_H_
#ifndef ZEPHYR_RTIO_SPSC_H_ #define ZEPHYR_SYS_SPSC_LOCKFREE_H_
#define ZEPHYR_RTIO_SPSC_H_
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
@ -15,14 +14,14 @@
#include <zephyr/sys/util_macro.h> #include <zephyr/sys/util_macro.h>
/** /**
* @brief RTIO Single Producer Single Consumer (SPSC) Queue API * @brief Single Producer Single Consumer (SPSC) Lockfree Queue API
* @defgroup rtio_spsc RTIO SPSC API * @defgroup spsc_lockfree SPSC Lockfree Queue API
* @ingroup rtio * @ingroup datastructure_apis
* @{ * @{
*/ */
/** /**
* @file rtio_spsc.h * @file spsc_lockfree.h
* *
* @brief A lock-free and type safe power of 2 fixed sized single producer * @brief A lock-free and type safe power of 2 fixed sized single producer
* single consumer (SPSC) queue using a ringbuffer and atomics to ensure * single consumer (SPSC) queue using a ringbuffer and atomics to ensure
@ -57,7 +56,7 @@
* *
* @warning Not to be manipulated without the macros! * @warning Not to be manipulated without the macros!
*/ */
struct rtio_spsc { struct spsc {
/* private value only the producer thread should mutate */ /* private value only the producer thread should mutate */
unsigned long acquire; unsigned long acquire;
@ -75,53 +74,54 @@ struct rtio_spsc {
}; };
/** /**
* @brief Statically initialize an rtio_spsc * @brief Statically initialize an spsc
* *
* @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8) * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
* @param buf Buffer pointer * @param buf Buffer pointer
*/ */
#define RTIO_SPSC_INITIALIZER(sz, buf) \ #define SPSC_INITIALIZER(sz, buf) \
{ \ { \
._spsc = { \ ._spsc = \
.acquire = 0, \ { \
.consume = 0, \ .acquire = 0, \
.in = ATOMIC_INIT(0), \ .consume = 0, \
.out = ATOMIC_INIT(0), \ .in = ATOMIC_INIT(0), \
.mask = sz - 1, \ .out = ATOMIC_INIT(0), \
}, \ .mask = sz - 1, \
.buffer = buf, \ }, \
.buffer = buf, \
} }
/** /**
* @brief Declare an anonymous struct type for an rtio_spsc * @brief Declare an anonymous struct type for an spsc
* *
* @param name Name of the spsc symbol to be provided * @param name Name of the spsc symbol to be provided
* @param type Type stored in the spsc * @param type Type stored in the spsc
*/ */
#define RTIO_SPSC_DECLARE(name, type) \ #define SPSC_DECLARE(name, type) \
static struct rtio_spsc_##name { \ static struct spsc_##name { \
struct rtio_spsc _spsc; \ struct spsc _spsc; \
type * const buffer; \ type * const buffer; \
} }
/** /**
* @brief Define an rtio_spsc with a fixed size * @brief Define an spsc with a fixed size
* *
* @param name Name of the spsc symbol to be provided * @param name Name of the spsc symbol to be provided
* @param type Type stored in the spsc * @param type Type stored in the spsc
* @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8) * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
*/ */
#define RTIO_SPSC_DEFINE(name, type, sz) \ #define SPSC_DEFINE(name, type, sz) \
BUILD_ASSERT(IS_POWER_OF_TWO(sz)); \ BUILD_ASSERT(IS_POWER_OF_TWO(sz)); \
static type __spsc_buf_##name[sz]; \ static type __spsc_buf_##name[sz]; \
RTIO_SPSC_DECLARE(name, type) name = RTIO_SPSC_INITIALIZER(sz, __spsc_buf_##name); SPSC_DECLARE(name, type) name = SPSC_INITIALIZER(sz, __spsc_buf_##name);
/** /**
* @brief Size of the SPSC queue * @brief Size of the SPSC queue
* *
* @param spsc SPSC reference * @param spsc SPSC reference
*/ */
#define rtio_spsc_size(spsc) ((spsc)->_spsc.mask + 1) #define spsc_size(spsc) ((spsc)->_spsc.mask + 1)
/** /**
* @private * @private
@ -130,20 +130,19 @@ struct rtio_spsc {
* @param spsc SPSC reference * @param spsc SPSC reference
* @param i Value to modulo to the size of the spsc * @param i Value to modulo to the size of the spsc
*/ */
#define z_rtio_spsc_mask(spsc, i) ((i) & (spsc)->_spsc.mask) #define z_spsc_mask(spsc, i) ((i) & (spsc)->_spsc.mask)
/** /**
* @private * @private
* @brief Load the current "in" index from the spsc as an unsigned long * @brief Load the current "in" index from the spsc as an unsigned long
*/ */
#define z_rtio_spsc_in(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.in) #define z_spsc_in(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.in)
/** /**
* @private * @private
* @brief Load the current "out" index from the spsc as an unsigned long * @brief Load the current "out" index from the spsc as an unsigned long
*/ */
#define z_rtio_spsc_out(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.out) #define z_spsc_out(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.out)
/** /**
* @brief Initialize/reset a spsc such that its empty * @brief Initialize/reset a spsc such that its empty
@ -153,7 +152,7 @@ struct rtio_spsc {
* *
* @param spsc SPSC to initialize/reset * @param spsc SPSC to initialize/reset
*/ */
#define rtio_spsc_reset(spsc) \ #define spsc_reset(spsc) \
({ \ ({ \
(spsc)->_spsc.consume = 0; \ (spsc)->_spsc.consume = 0; \
(spsc)->_spsc.acquire = 0; \ (spsc)->_spsc.acquire = 0; \
@ -168,14 +167,14 @@ struct rtio_spsc {
* *
* @return A pointer to the acquired element or null if the spsc is full * @return A pointer to the acquired element or null if the spsc is full
*/ */
#define rtio_spsc_acquire(spsc) \ #define spsc_acquire(spsc) \
({ \ ({ \
unsigned long idx = z_rtio_spsc_in(spsc) + (spsc)->_spsc.acquire; \ unsigned long idx = z_spsc_in(spsc) + (spsc)->_spsc.acquire; \
bool spsc_acq = (idx - z_rtio_spsc_out(spsc)) < rtio_spsc_size(spsc); \ bool spsc_acq = (idx - z_spsc_out(spsc)) < spsc_size(spsc); \
if (spsc_acq) { \ if (spsc_acq) { \
(spsc)->_spsc.acquire += 1; \ (spsc)->_spsc.acquire += 1; \
} \ } \
spsc_acq ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ spsc_acq ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL; \
}) })
/** /**
@ -185,7 +184,7 @@ struct rtio_spsc {
* *
* @param spsc SPSC to produce the previously acquired element or do nothing * @param spsc SPSC to produce the previously acquired element or do nothing
*/ */
#define rtio_spsc_produce(spsc) \ #define spsc_produce(spsc) \
({ \ ({ \
if ((spsc)->_spsc.acquire > 0) { \ if ((spsc)->_spsc.acquire > 0) { \
(spsc)->_spsc.acquire -= 1; \ (spsc)->_spsc.acquire -= 1; \
@ -201,7 +200,7 @@ struct rtio_spsc {
* *
* @param spsc SPSC to produce all previously acquired elements or do nothing * @param spsc SPSC to produce all previously acquired elements or do nothing
*/ */
#define rtio_spsc_produce_all(spsc) \ #define spsc_produce_all(spsc) \
({ \ ({ \
if ((spsc)->_spsc.acquire > 0) { \ if ((spsc)->_spsc.acquire > 0) { \
unsigned long acquired = (spsc)->_spsc.acquire; \ unsigned long acquired = (spsc)->_spsc.acquire; \
@ -217,9 +216,9 @@ struct rtio_spsc {
* *
* @param spsc SPSC to drop all previously acquired elements or do nothing * @param spsc SPSC to drop all previously acquired elements or do nothing
*/ */
#define rtio_spsc_drop_all(spsc) \ #define spsc_drop_all(spsc) \
do { \ do { \
(spsc)->_spsc.acquire = 0; \ (spsc)->_spsc.acquire = 0; \
} while (false) } while (false)
/** /**
@ -229,14 +228,14 @@ struct rtio_spsc {
* *
* @return Pointer to element or null if no consumable elements left * @return Pointer to element or null if no consumable elements left
*/ */
#define rtio_spsc_consume(spsc) \ #define spsc_consume(spsc) \
({ \ ({ \
unsigned long idx = z_rtio_spsc_out(spsc) + (spsc)->_spsc.consume; \ unsigned long idx = z_spsc_out(spsc) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != z_rtio_spsc_in(spsc)); \ bool has_consumable = (idx != z_spsc_in(spsc)); \
if (has_consumable) { \ if (has_consumable) { \
(spsc)->_spsc.consume += 1; \ (spsc)->_spsc.consume += 1; \
} \ } \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ has_consumable ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL; \
}) })
/** /**
@ -244,7 +243,7 @@ struct rtio_spsc {
* *
* @param spsc SPSC to release consumed element or do nothing * @param spsc SPSC to release consumed element or do nothing
*/ */
#define rtio_spsc_release(spsc) \ #define spsc_release(spsc) \
({ \ ({ \
if ((spsc)->_spsc.consume > 0) { \ if ((spsc)->_spsc.consume > 0) { \
(spsc)->_spsc.consume -= 1; \ (spsc)->_spsc.consume -= 1; \
@ -252,13 +251,12 @@ struct rtio_spsc {
} \ } \
}) })
/** /**
* @brief Release all consumed elements * @brief Release all consumed elements
* *
* @param spsc SPSC to release consumed elements or do nothing * @param spsc SPSC to release consumed elements or do nothing
*/ */
#define rtio_spsc_release_all(spsc) \ #define spsc_release_all(spsc) \
({ \ ({ \
if ((spsc)->_spsc.consume > 0) { \ if ((spsc)->_spsc.consume > 0) { \
unsigned long consumed = (spsc)->_spsc.consume; \ unsigned long consumed = (spsc)->_spsc.consume; \
@ -272,19 +270,15 @@ struct rtio_spsc {
* *
* @param spsc SPSC to get item count for * @param spsc SPSC to get item count for
*/ */
#define rtio_spsc_acquirable(spsc) \ #define spsc_acquirable(spsc) \
({ \ ({ (((spsc)->_spsc.in + (spsc)->_spsc.acquire) - (spsc)->_spsc.out) - spsc_size(spsc); })
(((spsc)->_spsc.in + (spsc)->_spsc.acquire) - (spsc)->_spsc.out) - \
rtio_spsc_size(spsc); \
})
/** /**
* @brief Count of consumables in spsc * @brief Count of consumables in spsc
* *
* @param spsc SPSC to get item count for * @param spsc SPSC to get item count for
*/ */
#define rtio_spsc_consumable(spsc) \ #define spsc_consumable(spsc) ({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; })
({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; })
/** /**
* @brief Peek at the first available item in queue * @brief Peek at the first available item in queue
@ -293,11 +287,11 @@ struct rtio_spsc {
* *
* @return Pointer to element or null if no consumable elements left * @return Pointer to element or null if no consumable elements left
*/ */
#define rtio_spsc_peek(spsc) \ #define spsc_peek(spsc) \
({ \ ({ \
unsigned long idx = z_rtio_spsc_out(spsc) + (spsc)->_spsc.consume; \ unsigned long idx = z_spsc_out(spsc) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != z_rtio_spsc_in(spsc)); \ bool has_consumable = (idx != z_spsc_in(spsc)); \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \ has_consumable ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL; \
}) })
/** /**
@ -309,12 +303,12 @@ struct rtio_spsc {
* *
* @return Pointer to element or null if none left * @return Pointer to element or null if none left
*/ */
#define rtio_spsc_next(spsc, item) \ #define spsc_next(spsc, item) \
({ \ ({ \
unsigned long idx = ((item) - (spsc)->buffer); \ unsigned long idx = ((item) - (spsc)->buffer); \
bool has_next = z_rtio_spsc_mask(spsc, (idx + 1)) != \ bool has_next = \
(z_rtio_spsc_mask(spsc, z_rtio_spsc_in(spsc))); \ z_spsc_mask(spsc, (idx + 1)) != (z_spsc_mask(spsc, z_spsc_in(spsc))); \
has_next ? &((spsc)->buffer[z_rtio_spsc_mask((spsc), idx + 1)]) : NULL; \ has_next ? &((spsc)->buffer[z_spsc_mask((spsc), idx + 1)]) : NULL; \
}) })
/** /**
@ -325,15 +319,15 @@ struct rtio_spsc {
* *
* @return Pointer to element or null if none left * @return Pointer to element or null if none left
*/ */
#define rtio_spsc_prev(spsc, item) \ #define spsc_prev(spsc, item) \
({ \ ({ \
unsigned long idx = ((item) - &(spsc)->buffer[0]) / sizeof((spsc)->buffer[0]); \ unsigned long idx = ((item) - &(spsc)->buffer[0]) / sizeof((spsc)->buffer[0]); \
bool has_prev = idx != z_rtio_spsc_mask(spsc, z_rtio_spsc_out(spsc)); \ bool has_prev = idx != z_spsc_mask(spsc, z_spsc_out(spsc)); \
has_prev ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx - 1)]) : NULL; \ has_prev ? &((spsc)->buffer[z_spsc_mask(spsc, idx - 1)]) : NULL; \
}) })
/** /**
* @} * @}
*/ */
#endif /* ZEPHYR_RTIO_SPSC_H_ */ #endif /* ZEPHYR_SYS_SPSC_LOCKFREE_H_ */

6
samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c

@ -83,13 +83,13 @@ static void vnd_sensor_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
{ {
struct vnd_sensor_data *data = (struct vnd_sensor_data *) iodev_sqe->sqe.iodev; struct vnd_sensor_data *data = (struct vnd_sensor_data *) iodev_sqe->sqe.iodev;
rtio_mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q);
} }
static void vnd_sensor_handle_int(const struct device *dev) static void vnd_sensor_handle_int(const struct device *dev)
{ {
struct vnd_sensor_data *data = dev->data; struct vnd_sensor_data *data = dev->data;
struct rtio_mpsc_node *node = rtio_mpsc_pop(&data->iodev.iodev_sq); struct mpsc_node *node = mpsc_pop(&data->iodev.iodev_sq);
if (node != NULL) { if (node != NULL) {
struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
@ -116,7 +116,7 @@ static int vnd_sensor_init(const struct device *dev)
data->dev = dev; data->dev = dev;
rtio_mpsc_init(&data->iodev.iodev_sq); mpsc_init(&data->iodev.iodev_sq);
k_timer_init(&data->timer, vnd_sensor_timer_expiry, NULL); k_timer_init(&data->timer, vnd_sensor_timer_expiry, NULL);

8
subsys/rtio/rtio_executor.c

@ -61,7 +61,7 @@ static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
void rtio_executor_submit(struct rtio *r) void rtio_executor_submit(struct rtio *r)
{ {
const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE); const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE);
struct rtio_mpsc_node *node = rtio_mpsc_pop(&r->sq); struct mpsc_node *node = mpsc_pop(&r->sq);
while (node != NULL) { while (node != NULL) {
struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
@ -83,7 +83,7 @@ void rtio_executor_submit(struct rtio *r)
__ASSERT(transaction != chained, __ASSERT(transaction != chained,
"Expected chained or transaction flag, not both"); "Expected chained or transaction flag, not both");
#endif #endif
node = rtio_mpsc_pop(&iodev_sqe->r->sq); node = mpsc_pop(&iodev_sqe->r->sq);
next = CONTAINER_OF(node, struct rtio_iodev_sqe, q); next = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
/* If the current submission was cancelled before submit, /* If the current submission was cancelled before submit,
@ -106,7 +106,7 @@ void rtio_executor_submit(struct rtio *r)
rtio_iodev_submit(iodev_sqe); rtio_iodev_submit(iodev_sqe);
node = rtio_mpsc_pop(&r->sq); node = mpsc_pop(&r->sq);
} }
} }
@ -134,7 +134,7 @@ static inline void rtio_executor_handle_multishot(struct rtio *r, struct rtio_io
} }
if (!is_canceled) { if (!is_canceled) {
/* Request was not canceled, put the SQE back in the queue */ /* Request was not canceled, put the SQE back in the queue */
rtio_mpsc_push(&r->sq, &curr->q); mpsc_push(&r->sq, &curr->q);
rtio_executor_submit(r); rtio_executor_submit(r);
} }
} }

5
subsys/rtio/rtio_init.c

@ -5,6 +5,7 @@
#include <zephyr/init.h> #include <zephyr/init.h>
#include <zephyr/rtio/rtio.h> #include <zephyr/rtio/rtio.h>
#include <zephyr/sys/mpsc_lockfree.h>
#include <zephyr/sys/util.h> #include <zephyr/sys/util.h>
#include <zephyr/app_memory/app_memdomain.h> #include <zephyr/app_memory/app_memdomain.h>
#include <zephyr/sys/iterable_sections.h> #include <zephyr/sys/iterable_sections.h>
@ -17,13 +18,13 @@ int rtio_init(void)
{ {
STRUCT_SECTION_FOREACH(rtio_sqe_pool, sqe_pool) { STRUCT_SECTION_FOREACH(rtio_sqe_pool, sqe_pool) {
for (int i = 0; i < sqe_pool->pool_size; i++) { for (int i = 0; i < sqe_pool->pool_size; i++) {
rtio_mpsc_push(&sqe_pool->free_q, &sqe_pool->pool[i].q); mpsc_push(&sqe_pool->free_q, &sqe_pool->pool[i].q);
} }
} }
STRUCT_SECTION_FOREACH(rtio_cqe_pool, cqe_pool) { STRUCT_SECTION_FOREACH(rtio_cqe_pool, cqe_pool) {
for (int i = 0; i < cqe_pool->pool_size; i++) { for (int i = 0; i < cqe_pool->pool_size; i++) {
rtio_mpsc_push(&cqe_pool->free_q, &cqe_pool->pool[i].q); mpsc_push(&cqe_pool->free_q, &cqe_pool->pool[i].q);
} }
} }

4
tests/lib/cpp/cxx/src/main.cpp

@ -79,8 +79,8 @@
/* Add RTIO headers to make sure they're CXX compatible */ /* Add RTIO headers to make sure they're CXX compatible */
#include <zephyr/rtio/rtio.h> #include <zephyr/rtio/rtio.h>
#include <zephyr/rtio/rtio_spsc.h> #include <zephyr/sys/spsc_lockfree.h>
#include <zephyr/rtio/rtio_mpsc.h> #include <zephyr/sys/mpsc_lockfree.h>
#include <zephyr/ztest.h> #include <zephyr/ztest.h>

12
tests/lib/lockfree/CMakeLists.txt

@ -0,0 +1,12 @@
# Copyright (c) 2023 Intel Corporation.
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(lockfree_test)
target_sources(app PRIVATE src/test_spsc.c src/test_mpsc.c)
target_include_directories(app PRIVATE
${ZEPHYR_BASE}/include
${ZEPHYR_BASE}/arch/${ARCH}/include)

2
tests/lib/lockfree/prj.conf

@ -0,0 +1,2 @@
CONFIG_ZTEST=y
CONFIG_TIMING_FUNCTIONS=y

100
tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c → tests/lib/lockfree/src/test_mpsc.c

@ -4,32 +4,31 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
#include "zephyr/irq.h"
#include <zephyr/ztest.h> #include <zephyr/ztest.h>
#include <zephyr/kernel.h> #include <zephyr/kernel.h>
#include <zephyr/sys/util_loops.h> #include <zephyr/sys/util_loops.h>
#include <zephyr/timing/timing.h> #include <zephyr/timing/timing.h>
#include <zephyr/rtio/rtio_spsc.h> #include <zephyr/sys/spsc_lockfree.h>
#include <zephyr/rtio/rtio_mpsc.h> #include <zephyr/sys/mpsc_lockfree.h>
#include "rtio_api.h" static struct mpsc push_pop_q;
static struct mpsc_node push_pop_nodes[2];
static struct rtio_mpsc push_pop_q;
static struct rtio_mpsc_node push_pop_nodes[2];
/* /*
* @brief Push and pop one element * @brief Push and pop one element
* *
* @see rtio_mpsc_push(), rtio_mpsc_pop() * @see mpsc_push(), mpsc_pop()
* *
* @ingroup rtio_tests * @ingroup tests
*/ */
ZTEST(rtio_mpsc, test_push_pop) ZTEST(mpsc, test_push_pop)
{ {
mpsc_ptr_t node, head; mpsc_ptr_t node, head;
struct rtio_mpsc_node *stub, *next, *tail; struct mpsc_node *stub, *next, *tail;
rtio_mpsc_init(&push_pop_q); mpsc_init(&push_pop_q);
head = mpsc_ptr_get(push_pop_q.head); head = mpsc_ptr_get(push_pop_q.head);
tail = push_pop_q.tail; tail = push_pop_q.tail;
@ -40,10 +39,10 @@ ZTEST(rtio_mpsc, test_push_pop)
zassert_equal(tail, stub, "Tail should point at stub"); zassert_equal(tail, stub, "Tail should point at stub");
zassert_is_null(next, "Next should be null"); zassert_is_null(next, "Next should be null");
node = rtio_mpsc_pop(&push_pop_q); node = mpsc_pop(&push_pop_q);
zassert_is_null(node, "Pop on empty queue should return null"); zassert_is_null(node, "Pop on empty queue should return null");
rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]); mpsc_push(&push_pop_q, &push_pop_nodes[0]);
head = mpsc_ptr_get(push_pop_q.head); head = mpsc_ptr_get(push_pop_q.head);
@ -56,7 +55,7 @@ ZTEST(rtio_mpsc, test_push_pop)
stub = &push_pop_q.stub; stub = &push_pop_q.stub;
zassert_equal(tail, stub, "Tail should point at stub"); zassert_equal(tail, stub, "Tail should point at stub");
node = rtio_mpsc_pop(&push_pop_q); node = mpsc_pop(&push_pop_q);
stub = &push_pop_q.stub; stub = &push_pop_q.stub;
zassert_not_equal(node, stub, "Pop should not return stub"); zassert_not_equal(node, stub, "Pop should not return stub");
@ -65,7 +64,7 @@ ZTEST(rtio_mpsc, test_push_pop)
"Pop should return push_pop_node %p, instead was %p", "Pop should return push_pop_node %p, instead was %p",
&push_pop_nodes[0], node); &push_pop_nodes[0], node);
node = rtio_mpsc_pop(&push_pop_q); node = mpsc_pop(&push_pop_q);
zassert_is_null(node, "Pop on empty queue should return null"); zassert_is_null(node, "Pop on empty queue should return null");
} }
@ -74,31 +73,38 @@ ZTEST(rtio_mpsc, test_push_pop)
#define MPSC_STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE) #define MPSC_STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE)
#define MPSC_THREADS_NUM 4 #define MPSC_THREADS_NUM 4
struct thread_info {
k_tid_t tid;
int executed;
int priority;
int cpu_id;
};
static struct thread_info mpsc_tinfo[MPSC_THREADS_NUM]; static struct thread_info mpsc_tinfo[MPSC_THREADS_NUM];
static struct k_thread mpsc_thread[MPSC_THREADS_NUM]; static struct k_thread mpsc_thread[MPSC_THREADS_NUM];
static K_THREAD_STACK_ARRAY_DEFINE(mpsc_stack, MPSC_THREADS_NUM, MPSC_STACK_SIZE); static K_THREAD_STACK_ARRAY_DEFINE(mpsc_stack, MPSC_THREADS_NUM, MPSC_STACK_SIZE);
struct mpsc_node { struct test_mpsc_node {
uint32_t id; uint32_t id;
struct rtio_mpsc_node n; struct mpsc_node n;
}; };
struct rtio_spsc_node_sq { struct spsc_node_sq {
struct rtio_spsc _spsc; struct spsc _spsc;
struct mpsc_node *const buffer; struct test_mpsc_node *const buffer;
}; };
#define SPSC_DEFINE(n, sz) RTIO_SPSC_DEFINE(_spsc_##n, struct mpsc_node, sz) #define TEST_SPSC_DEFINE(n, sz) SPSC_DEFINE(_spsc_##n, struct test_mpsc_node, sz)
#define SPSC_NAME(n, _) (struct rtio_spsc_node_sq *)&_spsc_##n #define SPSC_NAME(n, _) (struct spsc_node_sq *)&_spsc_##n
LISTIFY(MPSC_THREADS_NUM, SPSC_DEFINE, (;), MPSC_FREEQ_SZ) LISTIFY(MPSC_THREADS_NUM, TEST_SPSC_DEFINE, (;), MPSC_FREEQ_SZ)
struct rtio_spsc_node_sq *node_q[MPSC_THREADS_NUM] = { struct spsc_node_sq *node_q[MPSC_THREADS_NUM] = {
LISTIFY(MPSC_THREADS_NUM, SPSC_NAME, (,)) LISTIFY(MPSC_THREADS_NUM, SPSC_NAME, (,))
}; };
static struct rtio_mpsc mpsc_q; static struct mpsc mpsc_q;
static void mpsc_consumer(void *p1, void *p2, void *p3) static void mpsc_consumer(void *p1, void *p2, void *p3)
{ {
@ -106,12 +112,12 @@ static void mpsc_consumer(void *p1, void *p2, void *p3)
ARG_UNUSED(p2); ARG_UNUSED(p2);
ARG_UNUSED(p3); ARG_UNUSED(p3);
struct rtio_mpsc_node *n; struct mpsc_node *n;
struct mpsc_node *nn; struct test_mpsc_node *nn;
for (int i = 0; i < (MPSC_ITERATIONS)*(MPSC_THREADS_NUM - 1); i++) { for (int i = 0; i < (MPSC_ITERATIONS)*(MPSC_THREADS_NUM - 1); i++) {
do { do {
n = rtio_mpsc_pop(&mpsc_q); n = mpsc_pop(&mpsc_q);
if (n == NULL) { if (n == NULL) {
k_yield(); k_yield();
} }
@ -119,10 +125,10 @@ static void mpsc_consumer(void *p1, void *p2, void *p3)
zassert_not_equal(n, &mpsc_q.stub, "mpsc should not produce stub"); zassert_not_equal(n, &mpsc_q.stub, "mpsc should not produce stub");
nn = CONTAINER_OF(n, struct mpsc_node, n); nn = CONTAINER_OF(n, struct test_mpsc_node, n);
rtio_spsc_acquire(node_q[nn->id]); spsc_acquire(node_q[nn->id]);
rtio_spsc_produce(node_q[nn->id]); spsc_produce(node_q[nn->id]);
} }
} }
@ -132,20 +138,20 @@ static void mpsc_producer(void *p1, void *p2, void *p3)
ARG_UNUSED(p2); ARG_UNUSED(p2);
ARG_UNUSED(p3); ARG_UNUSED(p3);
struct mpsc_node *n; struct test_mpsc_node *n;
uint32_t id = (uint32_t)(uintptr_t)p1; uint32_t id = (uint32_t)(uintptr_t)p1;
for (int i = 0; i < MPSC_ITERATIONS; i++) { for (int i = 0; i < MPSC_ITERATIONS; i++) {
do { do {
n = rtio_spsc_consume(node_q[id]); n = spsc_consume(node_q[id]);
if (n == NULL) { if (n == NULL) {
k_yield(); k_yield();
} }
} while (n == NULL); } while (n == NULL);
rtio_spsc_release(node_q[id]); spsc_release(node_q[id]);
n->id = id; n->id = id;
rtio_mpsc_push(&mpsc_q, &n->n); mpsc_push(&mpsc_q, &n->n);
} }
} }
@ -155,17 +161,17 @@ static void mpsc_producer(void *p1, void *p2, void *p3)
* This can and should be validated on SMP machines where incoherent * This can and should be validated on SMP machines where incoherent
* memory could cause issues. * memory could cause issues.
*/ */
ZTEST(rtio_mpsc, test_mpsc_threaded) ZTEST(mpsc, test_mpsc_threaded)
{ {
rtio_mpsc_init(&mpsc_q); mpsc_init(&mpsc_q);
TC_PRINT("setting up mpsc producer free queues\n"); TC_PRINT("setting up mpsc producer free queues\n");
/* Setup node free queues */ /* Setup node free queues */
for (int i = 0; i < MPSC_THREADS_NUM; i++) { for (int i = 0; i < MPSC_THREADS_NUM; i++) {
for (int j = 0; j < MPSC_FREEQ_SZ; j++) { for (int j = 0; j < MPSC_FREEQ_SZ; j++) {
rtio_spsc_acquire(node_q[i]); spsc_acquire(node_q[i]);
} }
rtio_spsc_produce_all(node_q[i]); spsc_produce_all(node_q[i]);
} }
TC_PRINT("starting consumer\n"); TC_PRINT("starting consumer\n");
@ -194,23 +200,27 @@ ZTEST(rtio_mpsc, test_mpsc_threaded)
#define THROUGHPUT_ITERS 100000 #define THROUGHPUT_ITERS 100000
ZTEST(rtio_mpsc, test_mpsc_throughput) ZTEST(mpsc, test_mpsc_throughput)
{ {
struct rtio_mpsc_node node; struct mpsc_node node;
timing_t start_time, end_time; timing_t start_time, end_time;
rtio_mpsc_init(&mpsc_q); mpsc_init(&mpsc_q);
timing_init(); timing_init();
timing_start(); timing_start();
start_time = timing_counter_get(); start_time = timing_counter_get();
int key = irq_lock();
for (int i = 0; i < THROUGHPUT_ITERS; i++) { for (int i = 0; i < THROUGHPUT_ITERS; i++) {
rtio_mpsc_push(&mpsc_q, &node); mpsc_push(&mpsc_q, &node);
rtio_mpsc_pop(&mpsc_q); mpsc_pop(&mpsc_q);
} }
irq_unlock(key);
end_time = timing_counter_get(); end_time = timing_counter_get();
uint64_t cycles = timing_cycles_get(&start_time, &end_time); uint64_t cycles = timing_cycles_get(&start_time, &end_time);
@ -220,4 +230,4 @@ ZTEST(rtio_mpsc, test_mpsc_throughput)
THROUGHPUT_ITERS, ns/THROUGHPUT_ITERS); THROUGHPUT_ITERS, ns/THROUGHPUT_ITERS);
} }
ZTEST_SUITE(rtio_mpsc, NULL, NULL, NULL, NULL, NULL); ZTEST_SUITE(mpsc, NULL, NULL, NULL, NULL, NULL);

112
tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c → tests/lib/lockfree/src/test_spsc.c

@ -6,61 +6,60 @@
#include <zephyr/ztest.h> #include <zephyr/ztest.h>
#include <zephyr/timing/timing.h> #include <zephyr/timing/timing.h>
#include <zephyr/rtio/rtio_spsc.h> #include <zephyr/sys/spsc_lockfree.h>
#include "rtio_api.h"
/* /*
* @brief Produce and Consume a single uint32_t in the same execution context * @brief Produce and Consume a single uint32_t in the same execution context
* *
* @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() * @see spsc_acquire(), spsc_produce(), spsc_consume(), spsc_release()
* *
* @ingroup rtio_tests * @ingroup tests
*/ */
ZTEST(rtio_spsc, test_produce_consume_size1) ZTEST(spsc, test_produce_consume_size1)
{ {
RTIO_SPSC_DEFINE(ezspsc, uint32_t, 1); SPSC_DEFINE(ezspsc, uint32_t, 1);
const uint32_t magic = 43219876; const uint32_t magic = 43219876;
uint32_t *acq = rtio_spsc_acquire(&ezspsc); uint32_t *acq = spsc_acquire(&ezspsc);
zassert_not_null(acq, "Acquire should succeed"); zassert_not_null(acq, "Acquire should succeed");
*acq = magic; *acq = magic;
uint32_t *acq2 = rtio_spsc_acquire(&ezspsc); uint32_t *acq2 = spsc_acquire(&ezspsc);
zassert_is_null(acq2, "Acquire should fail"); zassert_is_null(acq2, "Acquire should fail");
uint32_t *cons = rtio_spsc_consume(&ezspsc); uint32_t *cons = spsc_consume(&ezspsc);
zassert_is_null(cons, "Consume should fail"); zassert_is_null(cons, "Consume should fail");
zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); zassert_equal(spsc_consumable(&ezspsc), 0, "Consumables should be 0");
rtio_spsc_produce(&ezspsc); spsc_produce(&ezspsc);
zassert_equal(rtio_spsc_consumable(&ezspsc), 1, "Consumables should be 1"); zassert_equal(spsc_consumable(&ezspsc), 1, "Consumables should be 1");
uint32_t *cons2 = rtio_spsc_consume(&ezspsc); uint32_t *cons2 = spsc_consume(&ezspsc);
zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); zassert_equal(spsc_consumable(&ezspsc), 0, "Consumables should be 0");
zassert_not_null(cons2, "Consume should not fail"); zassert_not_null(cons2, "Consume should not fail");
zassert_equal(*cons2, magic, "Consume value should equal magic"); zassert_equal(*cons2, magic, "Consume value should equal magic");
uint32_t *cons3 = rtio_spsc_consume(&ezspsc); uint32_t *cons3 = spsc_consume(&ezspsc);
zassert_is_null(cons3, "Consume should fail"); zassert_is_null(cons3, "Consume should fail");
uint32_t *acq3 = rtio_spsc_acquire(&ezspsc); uint32_t *acq3 = spsc_acquire(&ezspsc);
zassert_is_null(acq3, "Acquire should not succeed"); zassert_is_null(acq3, "Acquire should not succeed");
rtio_spsc_release(&ezspsc); spsc_release(&ezspsc);
uint32_t *acq4 = rtio_spsc_acquire(&ezspsc); uint32_t *acq4 = spsc_acquire(&ezspsc);
zassert_not_null(acq4, "Acquire should succeed"); zassert_not_null(acq4, "Acquire should succeed");
} }
@ -69,34 +68,34 @@ ZTEST(rtio_spsc, test_produce_consume_size1)
* @brief Produce and Consume 3 items at a time in a spsc of size 4 to validate masking * @brief Produce and Consume 3 items at a time in a spsc of size 4 to validate masking
* and wrap around reads/writes. * and wrap around reads/writes.
* *
* @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() * @see spsc_acquire(), spsc_produce(), spsc_consume(), spsc_release()
* *
* @ingroup rtio_tests * @ingroup tests
*/ */
ZTEST(rtio_spsc, test_produce_consume_wrap_around) ZTEST(spsc, test_produce_consume_wrap_around)
{ {
RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); SPSC_DEFINE(ezspsc, uint32_t, 4);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); zassert_equal(spsc_consumable(&ezspsc), 0, "Consumables should be 0");
for (int j = 0; j < 3; j++) { for (int j = 0; j < 3; j++) {
uint32_t *entry = rtio_spsc_acquire(&ezspsc); uint32_t *entry = spsc_acquire(&ezspsc);
zassert_not_null(entry, "Acquire should succeed"); zassert_not_null(entry, "Acquire should succeed");
*entry = i * 3 + j; *entry = i * 3 + j;
rtio_spsc_produce(&ezspsc); spsc_produce(&ezspsc);
} }
zassert_equal(rtio_spsc_consumable(&ezspsc), 3, "Consumables should be 3"); zassert_equal(spsc_consumable(&ezspsc), 3, "Consumables should be 3");
for (int k = 0; k < 3; k++) { for (int k = 0; k < 3; k++) {
uint32_t *entry = rtio_spsc_consume(&ezspsc); uint32_t *entry = spsc_consume(&ezspsc);
zassert_not_null(entry, "Consume should succeed"); zassert_not_null(entry, "Consume should succeed");
zassert_equal(*entry, i * 3 + k, "Consume value should equal i*3+k"); zassert_equal(*entry, i * 3 + k, "Consume value should equal i*3+k");
rtio_spsc_release(&ezspsc); spsc_release(&ezspsc);
} }
zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); zassert_equal(spsc_consumable(&ezspsc), 0, "Consumables should be 0");
} }
} }
@ -107,28 +106,28 @@ ZTEST(rtio_spsc, test_produce_consume_wrap_around)
* Done by setting all values to UINTPTR_MAX - 2 and writing and reading enough * Done by setting all values to UINTPTR_MAX - 2 and writing and reading enough
* to ensure integer wraps occur. * to ensure integer wraps occur.
*/ */
ZTEST(rtio_spsc, test_int_wrap_around) ZTEST(spsc, test_int_wrap_around)
{ {
RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); SPSC_DEFINE(ezspsc, uint32_t, 4);
ezspsc._spsc.in = ATOMIC_INIT(UINTPTR_MAX - 2); ezspsc._spsc.in = ATOMIC_INIT(UINTPTR_MAX - 2);
ezspsc._spsc.out = ATOMIC_INIT(UINTPTR_MAX - 2); ezspsc._spsc.out = ATOMIC_INIT(UINTPTR_MAX - 2);
for (int j = 0; j < 3; j++) { for (int j = 0; j < 3; j++) {
uint32_t *entry = rtio_spsc_acquire(&ezspsc); uint32_t *entry = spsc_acquire(&ezspsc);
zassert_not_null(entry, "Acquire should succeed"); zassert_not_null(entry, "Acquire should succeed");
*entry = j; *entry = j;
rtio_spsc_produce(&ezspsc); spsc_produce(&ezspsc);
} }
zassert_equal(atomic_get(&ezspsc._spsc.in), UINTPTR_MAX + 1, "Spsc in should wrap"); zassert_equal(atomic_get(&ezspsc._spsc.in), UINTPTR_MAX + 1, "Spsc in should wrap");
for (int k = 0; k < 3; k++) { for (int k = 0; k < 3; k++) {
uint32_t *entry = rtio_spsc_consume(&ezspsc); uint32_t *entry = spsc_consume(&ezspsc);
zassert_not_null(entry, "Consume should succeed"); zassert_not_null(entry, "Consume should succeed");
zassert_equal(*entry, k, "Consume value should equal i*3+k"); zassert_equal(*entry, k, "Consume value should equal i*3+k");
rtio_spsc_release(&ezspsc); spsc_release(&ezspsc);
} }
zassert_equal(atomic_get(&ezspsc._spsc.out), UINTPTR_MAX + 1, "Spsc out should wrap"); zassert_equal(atomic_get(&ezspsc._spsc.out), UINTPTR_MAX + 1, "Spsc out should wrap");
@ -137,14 +136,14 @@ ZTEST(rtio_spsc, test_int_wrap_around)
#define MAX_RETRIES 5 #define MAX_RETRIES 5
#define SMP_ITERATIONS 100 #define SMP_ITERATIONS 100
RTIO_SPSC_DEFINE(spsc, uint32_t, 4); SPSC_DEFINE(spsc, uint32_t, 4);
static void t1_consume(void *p1, void *p2, void *p3) static void t1_consume(void *p1, void *p2, void *p3)
{ {
ARG_UNUSED(p2); ARG_UNUSED(p2);
ARG_UNUSED(p3); ARG_UNUSED(p3);
struct rtio_spsc_spsc *ezspsc = p1; struct spsc_spsc *ezspsc = p1;
uint32_t retries = 0; uint32_t retries = 0;
uint32_t *val = NULL; uint32_t *val = NULL;
@ -152,11 +151,11 @@ static void t1_consume(void *p1, void *p2, void *p3)
val = NULL; val = NULL;
retries = 0; retries = 0;
while (val == NULL && retries < MAX_RETRIES) { while (val == NULL && retries < MAX_RETRIES) {
val = rtio_spsc_consume(ezspsc); val = spsc_consume(ezspsc);
retries++; retries++;
} }
if (val != NULL) { if (val != NULL) {
rtio_spsc_release(ezspsc); spsc_release(ezspsc);
} else { } else {
k_yield(); k_yield();
} }
@ -168,7 +167,7 @@ static void t2_produce(void *p1, void *p2, void *p3)
ARG_UNUSED(p2); ARG_UNUSED(p2);
ARG_UNUSED(p3); ARG_UNUSED(p3);
struct rtio_spsc_spsc *ezspsc = p1; struct spsc_spsc *ezspsc = p1;
uint32_t retries = 0; uint32_t retries = 0;
uint32_t *val = NULL; uint32_t *val = NULL;
@ -176,12 +175,12 @@ static void t2_produce(void *p1, void *p2, void *p3)
val = NULL; val = NULL;
retries = 0; retries = 0;
while (val == NULL && retries < MAX_RETRIES) { while (val == NULL && retries < MAX_RETRIES) {
val = rtio_spsc_acquire(ezspsc); val = spsc_acquire(ezspsc);
retries++; retries++;
} }
if (val != NULL) { if (val != NULL) {
*val = SMP_ITERATIONS; *val = SMP_ITERATIONS;
rtio_spsc_produce(ezspsc); spsc_produce(ezspsc);
} else { } else {
k_yield(); k_yield();
} }
@ -192,6 +191,13 @@ static void t2_produce(void *p1, void *p2, void *p3)
#define STACK_SIZE (384 + CONFIG_TEST_EXTRA_STACK_SIZE) #define STACK_SIZE (384 + CONFIG_TEST_EXTRA_STACK_SIZE)
#define THREADS_NUM 2 #define THREADS_NUM 2
struct thread_info {
k_tid_t tid;
int executed;
int priority;
int cpu_id;
};
static struct thread_info tinfo[THREADS_NUM]; static struct thread_info tinfo[THREADS_NUM];
static struct k_thread tthread[THREADS_NUM]; static struct k_thread tthread[THREADS_NUM];
static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE); static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE);
@ -202,7 +208,7 @@ static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE);
* This can and should be validated on SMP machines where incoherent * This can and should be validated on SMP machines where incoherent
* memory could cause issues. * memory could cause issues.
*/ */
ZTEST(rtio_spsc, test_spsc_threaded) ZTEST(spsc, test_spsc_threaded)
{ {
tinfo[0].tid = tinfo[0].tid =
@ -224,7 +230,7 @@ ZTEST(rtio_spsc, test_spsc_threaded)
#define THROUGHPUT_ITERS 100000 #define THROUGHPUT_ITERS 100000
ZTEST(rtio_spsc, test_spsc_throughput) ZTEST(spsc, test_spsc_throughput)
{ {
timing_t start_time, end_time; timing_t start_time, end_time;
@ -235,15 +241,19 @@ ZTEST(rtio_spsc, test_spsc_throughput)
uint32_t *x, *y; uint32_t *x, *y;
int key = irq_lock();
for (int i = 0; i < THROUGHPUT_ITERS; i++) { for (int i = 0; i < THROUGHPUT_ITERS; i++) {
x = rtio_spsc_acquire(&spsc); x = spsc_acquire(&spsc);
*x = i; *x = i;
rtio_spsc_produce(&spsc); spsc_produce(&spsc);
y = rtio_spsc_consume(&spsc); y = spsc_consume(&spsc);
rtio_spsc_release(&spsc); spsc_release(&spsc);
} }
irq_unlock(key);
end_time = timing_counter_get(); end_time = timing_counter_get();
uint64_t cycles = timing_cycles_get(&start_time, &end_time); uint64_t cycles = timing_cycles_get(&start_time, &end_time);
@ -253,11 +263,11 @@ ZTEST(rtio_spsc, test_spsc_throughput)
THROUGHPUT_ITERS, ns/THROUGHPUT_ITERS); THROUGHPUT_ITERS, ns/THROUGHPUT_ITERS);
} }
static void rtio_spsc_before(void *data) static void spsc_before(void *data)
{ {
ARG_UNUSED(data); ARG_UNUSED(data);
rtio_spsc_reset(&spsc); spsc_reset(&spsc);
} }
ZTEST_SUITE(rtio_spsc, NULL, NULL, rtio_spsc_before, NULL, NULL); ZTEST_SUITE(spsc, NULL, NULL, spsc_before, NULL, NULL);

7
tests/lib/lockfree/testcase.yaml

@ -0,0 +1,7 @@
tests:
libraries.lockfree:
platform_exclude:
- m5stack_core2 # renode times out
- m2gl025_miv # renode times out
tags:
- lockfree

2
tests/subsys/rtio/rtio_api/CMakeLists.txt

@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(rtio_api_test) project(rtio_api_test)
target_sources(app PRIVATE src/test_rtio_spsc.c src/test_rtio_mpsc.c src/test_rtio_api.c) target_sources(app PRIVATE src/test_rtio_api.c)
target_include_directories(app PRIVATE target_include_directories(app PRIVATE
${ZEPHYR_BASE}/include ${ZEPHYR_BASE}/include

19
tests/subsys/rtio/rtio_api/src/rtio_api.h

@ -1,19 +0,0 @@
/*
* Copyright (c) 2023 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef ZEPHYR_TEST_RTIO_API_H_
#define ZEPHYR_TEST_RTIO_API_H_
#include <zephyr/kernel.h>
struct thread_info {
k_tid_t tid;
int executed;
int priority;
int cpu_id;
};
#endif /* ZEPHYR_TEST_RTIO_API_H_ */

12
tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h

@ -5,8 +5,8 @@
*/ */
#include <zephyr/ztest.h> #include <zephyr/ztest.h>
#include <zephyr/rtio/rtio_mpsc.h>
#include <zephyr/rtio/rtio.h> #include <zephyr/rtio/rtio.h>
#include <zephyr/sys/mpsc_lockfree.h>
#include <zephyr/kernel.h> #include <zephyr/kernel.h>
#ifndef RTIO_IODEV_TEST_H_ #ifndef RTIO_IODEV_TEST_H_
@ -17,7 +17,7 @@ struct rtio_iodev_test_data {
struct k_timer timer; struct k_timer timer;
/* Queue of requests */ /* Queue of requests */
struct rtio_mpsc io_q; struct mpsc io_q;
/* Currently executing transaction */ /* Currently executing transaction */
struct rtio_iodev_sqe *txn_head; struct rtio_iodev_sqe *txn_head;
@ -40,7 +40,7 @@ static void rtio_iodev_test_next(struct rtio_iodev_test_data *data, bool complet
goto out; goto out;
} }
struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->io_q); struct mpsc_node *next = mpsc_pop(&data->io_q);
/* Nothing left to do, cleanup */ /* Nothing left to do, cleanup */
if (next == NULL) { if (next == NULL) {
@ -110,8 +110,8 @@ static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe)
atomic_inc(&data->submit_count); atomic_inc(&data->submit_count);
/* The only safe operation is enqueuing without a lock */ /* The only safe operation is enqueuing */
rtio_mpsc_push(&data->io_q, &iodev_sqe->q); mpsc_push(&data->io_q, &iodev_sqe->q);
rtio_iodev_test_next(data, false); rtio_iodev_test_next(data, false);
} }
@ -124,7 +124,7 @@ void rtio_iodev_test_init(struct rtio_iodev *test)
{ {
struct rtio_iodev_test_data *data = test->data; struct rtio_iodev_test_data *data = test->data;
rtio_mpsc_init(&data->io_q); mpsc_init(&data->io_q);
data->txn_head = NULL; data->txn_head = NULL;
data->txn_curr = NULL; data->txn_curr = NULL;
k_timer_init(&data->timer, rtio_iodev_timer_fn, NULL); k_timer_init(&data->timer, rtio_iodev_timer_fn, NULL);

1
tests/subsys/rtio/rtio_api/src/test_rtio_api.c

@ -672,7 +672,6 @@ void rtio_callback_chaining_cb(struct rtio *r, const struct rtio_sqe *sqe, void
*/ */
void test_rtio_callback_chaining_(struct rtio *r) void test_rtio_callback_chaining_(struct rtio *r)
{ {
int res; int res;
int32_t userdata[4] = {0, 1, 2, 3}; int32_t userdata[4] = {0, 1, 2, 3};
int32_t ordering[4] = { -1, -1, -1, -1}; int32_t ordering[4] = { -1, -1, -1, -1};

Loading…
Cancel
Save