
net: zperf: Add concurrency when doing upload tests

If the user has enabled CONFIG_ZPERF_SESSION_PER_THREAD and gives
the -a (async) option to the upload command, then multiple uploads
can be run simultaneously. Each upload runs in a dedicated work
queue. The work queue thread priority can be set with the -t option.

Signed-off-by: Jukka Rissanen <jukka.rissanen@nordicsemi.no>
Branch: pull/89079/head
Author: Jukka Rissanen, committed by Fabio Baltieri
Parent commit: 563f05eb8b
  1. doc/connectivity/networking/api/zperf.rst (76 lines changed)
  2. include/zephyr/net/zperf.h (3 lines changed)
  3. subsys/net/lib/zperf/Kconfig (10 lines changed)
  4. subsys/net/lib/zperf/zperf_common.c (103 lines changed)
  5. subsys/net/lib/zperf/zperf_internal.h (9 lines changed)
  6. subsys/net/lib/zperf/zperf_session.c (73 lines changed)
  7. subsys/net/lib/zperf/zperf_session.h (23 lines changed)
  8. subsys/net/lib/zperf/zperf_shell.c (411 lines changed)
  9. subsys/net/lib/zperf/zperf_tcp_uploader.c (88 lines changed)
  10. subsys/net/lib/zperf/zperf_udp_uploader.c (80 lines changed)

doc/connectivity/networking/api/zperf.rst (76 lines changed)

@@ -96,3 +96,79 @@ and this if you are testing TCP:
iPerf output can be limited by using the -b option if Zephyr is not
able to receive all the packets in an orderly manner.
Session Management
******************
If the :kconfig:option:`CONFIG_ZPERF_SESSION_PER_THREAD` option is set, then
multiple upload sessions can run at the same time, provided the user supplies
the ``-a`` option when starting the upload. Each session has its own work
queue to run the test. Session test results can also be viewed after the
tests have finished.
The following zperf shell commands are available for session management:
.. csv-table::
:header: "zperf shell command", "Description"
:widths: auto
"``jobs``", "Show currently active or finished sessions"
"``jobs all``", "Show statistics of finished sessions"
"``jobs clear``", "Clear finished session statistics"
Example:
.. code-block:: console
uart:~$ zperf udp upload -a -t 5 192.0.2.2 5001 10 1K 1M
Remote port is 5001
Connecting to 192.0.2.2
Duration: 10.00 s
Packet size: 1000 bytes
Rate: 1000 kbps
Starting...
Rate: 1.00 Mbps
Packet duration 7 ms
uart:~$ zperf jobs all
No finished sessions found
uart:~$ zperf jobs
Thread Remaining
Id Proto Priority time (sec)
[1] UDP 5 4
Active sessions have not yet finished
-
Upload completed!
Statistics: server (client)
Duration: 30.01 s (30.01 s)
Num packets: 3799 (3799)
Num packets out order: 0
Num packets lost: 0
Jitter: 63 us
Rate: 1.01 Mbps (1.01 Mbps)
Thread priority: 5
Protocol: UDP
Session id: 1
uart:~$ zperf jobs all
-
Upload completed!
Statistics: server (client)
Duration: 30.01 s (30.01 s)
Num packets: 3799 (3799)
Num packets out order: 0
Num packets lost: 0
Jitter: 63 us
Rate: 1.01 Mbps (1.01 Mbps)
Thread priority: 5
Protocol: UDP
Session id: 1
Total 1 sessions done
uart:~$ zperf jobs clear
Cleared data from 1 sessions
uart:~$ zperf jobs
No active upload sessions
No finished sessions found
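As an aside, the same kind of concurrent upload can also be started programmatically through the zperf upload API this change extends. Below is a minimal sketch, assuming CONFIG_ZPERF_SESSION_PER_THREAD is enabled; the peer address handling, duration, rate, packet size and priority values are illustrative assumptions, not values mandated by the commit:

/* Illustrative sketch: each async upload gets its own session and
 * work queue, so several uploads can run at the same time.
 */
#include <string.h>
#include <zephyr/kernel.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/zperf.h>

static void upload_cb(enum zperf_status status,
		      struct zperf_results *result, void *user_data)
{
	if (status == ZPERF_SESSION_FINISHED && result != NULL) {
		printk("%s upload done, %u packets sent\n",
		       (const char *)user_data, result->nb_packets_sent);
	} else if (status == ZPERF_SESSION_ERROR) {
		printk("%s upload failed\n", (const char *)user_data);
	}
}

static int start_upload(const char *tag, const struct sockaddr_in *peer,
			int thread_priority)
{
	struct zperf_upload_params param = {
		.duration_ms = 10 * MSEC_PER_SEC,
		.rate_kbps = 1000,
		.packet_size = 1000,
	};

	/* peer_addr is a generic struct sockaddr, so copy the IPv4 peer in */
	memcpy(&param.peer_addr, peer, sizeof(*peer));
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
	/* Same knob as the shell's -t option */
	param.options.thread_priority = thread_priority;
#endif
	return zperf_udp_upload_async(&param, upload_cb, (void *)tag);
}

Calling start_upload() twice with different peers (or tags) then runs two uploads concurrently; without CONFIG_ZPERF_SESSION_PER_THREAD the second call would return -EBUSY while the first is still pending.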

include/zephyr/net/zperf.h (3 lines changed)

@@ -45,6 +45,9 @@ struct zperf_upload_params {
uint8_t tos;
int tcp_nodelay;
int priority;
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
int thread_priority;
#endif
uint32_t report_interval_ms;
} options;
};

subsys/net/lib/zperf/Kconfig (10 lines changed)

@@ -22,6 +22,16 @@ config NET_ZPERF_LEGACY_HEADER_COMPAT
detected. This option reverts the header format for use with
iperf version 2.0.9 and earlier.
config ZPERF_SESSION_PER_THREAD
bool "Run each session in a separate thread"
help
Each session is started in its own thread. This means
that the system will use more memory because multiple
stack frames are needed, so this is not enabled by default.
The user can also set each thread's priority separately;
ZPERF_WORK_Q_THREAD_PRIORITY is used as the default value if
no thread priority is given when starting the session.
config ZPERF_WORK_Q_THREAD_PRIORITY
int "zperf work queue thread priority"
default NUM_PREEMPT_PRIORITIES
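As a rough configuration sketch (the values below are assumptions picked for illustration, not defaults introduced by this commit), a build wanting up to four concurrent uploads per protocol, each in its own work queue thread, could use a prj.conf fragment along these lines:

CONFIG_NET_ZPERF=y
CONFIG_ZPERF_SESSION_PER_THREAD=y
CONFIG_NET_ZPERF_MAX_SESSIONS=4
CONFIG_ZPERF_WORK_Q_THREAD_PRIORITY=5
CONFIG_ZPERF_WORK_Q_STACK_SIZE=2048

With both UDP and TCP enabled this reserves one work queue per session and per protocol, and CONFIG_ZPERF_WORK_Q_THREAD_PRIORITY only acts as the fallback when no -t value is given.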

subsys/net/lib/zperf/zperf_common.c (103 lines changed)

@@ -41,13 +41,70 @@ struct sockaddr_in *zperf_get_sin(void)
return &in4_addr_my;
}
#define ZPERF_WORK_Q_THREAD_PRIORITY \
CLAMP(CONFIG_ZPERF_WORK_Q_THREAD_PRIORITY, K_HIGHEST_APPLICATION_THREAD_PRIO, \
#define ZPERF_WORK_Q_THREAD_PRIORITY \
CLAMP(CONFIG_ZPERF_WORK_Q_THREAD_PRIORITY, \
K_HIGHEST_APPLICATION_THREAD_PRIO, \
K_LOWEST_APPLICATION_THREAD_PRIO)
K_THREAD_STACK_DEFINE(zperf_work_q_stack, CONFIG_ZPERF_WORK_Q_STACK_SIZE);
#if defined(CONFIG_ZPERF_SESSION_PER_THREAD)
struct zperf_work {
struct k_work_q *queue;
struct z_thread_stack_element *stack;
size_t stack_size;
};
#define CREATE_WORK_Q(i, _) \
static struct k_work_q zperf_work_q_##i; \
static K_KERNEL_STACK_DEFINE(zperf_work_q_stack_##i, \
CONFIG_ZPERF_WORK_Q_STACK_SIZE)
/* Both UDP and TCP can have separate sessions so multiply by 2 */
#if defined(CONFIG_NET_UDP) && defined(CONFIG_NET_TCP)
#define MAX_SESSION_COUNT UTIL_X2(CONFIG_NET_ZPERF_MAX_SESSIONS)
#define SESSION_INDEX CONFIG_NET_ZPERF_MAX_SESSIONS
#else
#define MAX_SESSION_COUNT CONFIG_NET_ZPERF_MAX_SESSIONS
#define SESSION_INDEX 0
#endif
LISTIFY(MAX_SESSION_COUNT, CREATE_WORK_Q, (;), _);
#define SET_WORK_Q(i, _) \
[i] = { \
.queue = &zperf_work_q_##i, \
.stack = zperf_work_q_stack_##i, \
.stack_size = K_THREAD_STACK_SIZEOF(zperf_work_q_stack_##i), \
}
static struct zperf_work zperf_work_q[] = {
LISTIFY(MAX_SESSION_COUNT, SET_WORK_Q, (,), _)
};
struct k_work_q *get_queue(enum session_proto proto, int session_id)
{
if (session_id < 0 || session_id >= CONFIG_NET_ZPERF_MAX_SESSIONS) {
return NULL;
}
if (proto < 0 || proto >= SESSION_PROTO_END) {
return NULL;
}
NET_DBG("%s using queue %d for session %d\n",
proto == SESSION_UDP ? "UDP" : "TCP",
proto * SESSION_INDEX + session_id,
session_id);
return zperf_work_q[proto * SESSION_INDEX + session_id].queue;
}
#else /* CONFIG_ZPERF_SESSION_PER_THREAD */
K_THREAD_STACK_DEFINE(zperf_work_q_stack, CONFIG_ZPERF_WORK_Q_STACK_SIZE);
static struct k_work_q zperf_work_q;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
int zperf_get_ipv6_addr(char *host, char *prefix_str, struct in6_addr *addr)
{
struct net_if_ipv6_prefix *prefix;
@@ -220,20 +277,53 @@ uint32_t zperf_packet_duration(uint32_t packet_size, uint32_t rate_in_kbps)
(rate_in_kbps * 1024U));
}
void zperf_async_work_submit(struct k_work *work)
void zperf_async_work_submit(enum session_proto proto, int session_id, struct k_work *work)
{
#if defined(CONFIG_ZPERF_SESSION_PER_THREAD)
k_work_submit_to_queue(zperf_work_q[proto * SESSION_INDEX + session_id].queue, work);
#else
ARG_UNUSED(proto);
ARG_UNUSED(session_id);
k_work_submit_to_queue(&zperf_work_q, work);
#endif
}
static int zperf_init(void)
{
#if defined(CONFIG_ZPERF_SESSION_PER_THREAD)
ARRAY_FOR_EACH(zperf_work_q, i) {
struct k_work_queue_config cfg = {
.no_yield = false,
};
#define MAX_NAME_LEN sizeof("zperf_work_q[xxx]")
char name[MAX_NAME_LEN];
snprintk(name, sizeof(name), "zperf_work_q[%d]", i);
cfg.name = name;
k_work_queue_init(zperf_work_q[i].queue);
k_work_queue_start(zperf_work_q[i].queue,
zperf_work_q[i].stack,
zperf_work_q[i].stack_size,
ZPERF_WORK_Q_THREAD_PRIORITY,
&cfg);
}
#else /* CONFIG_ZPERF_SESSION_PER_THREAD */
k_work_queue_init(&zperf_work_q);
k_work_queue_start(&zperf_work_q, zperf_work_q_stack,
K_THREAD_STACK_SIZEOF(zperf_work_q_stack), ZPERF_WORK_Q_THREAD_PRIORITY,
K_THREAD_STACK_SIZEOF(zperf_work_q_stack),
ZPERF_WORK_Q_THREAD_PRIORITY,
NULL);
k_thread_name_set(&zperf_work_q.thread, "zperf_work_q");
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
if (IS_ENABLED(CONFIG_NET_UDP)) {
zperf_udp_uploader_init();
}
@@ -241,7 +331,8 @@ static int zperf_init(void)
zperf_tcp_uploader_init();
}
if (IS_ENABLED(CONFIG_NET_ZPERF_SERVER)) {
if (IS_ENABLED(CONFIG_NET_ZPERF_SERVER) ||
IS_ENABLED(CONFIG_ZPERF_SESSION_PER_THREAD)) {
zperf_session_init();
}
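A quick worked example of the queue indexing above, assuming CONFIG_NET_ZPERF_MAX_SESSIONS=4 with both UDP and TCP enabled: SESSION_INDEX is 4 and MAX_SESSION_COUNT is 8, so eight work queues are created; get_queue(SESSION_UDP, 2) maps to zperf_work_q[2] while get_queue(SESSION_TCP, 2) maps to zperf_work_q[6], so UDP and TCP sessions never share a work queue thread.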

subsys/net/lib/zperf/zperf_internal.h (9 lines changed)

@@ -53,6 +53,12 @@
#define ZPERF_VERSION "1.1"
enum session_proto {
SESSION_UDP = 0,
SESSION_TCP = 1,
SESSION_PROTO_END
};
struct zperf_udp_datagram {
uint32_t id;
uint32_t tv_sec;
@@ -110,13 +116,14 @@ int zperf_get_ipv4_addr(char *host, struct in_addr *addr);
struct sockaddr_in *zperf_get_sin(void);
extern void connect_ap(char *ssid);
extern struct k_work_q *get_queue(enum session_proto proto, int session_id);
int zperf_prepare_upload_sock(const struct sockaddr *peer_addr, uint8_t tos,
int priority, int tcp_nodelay, int proto);
uint32_t zperf_packet_duration(uint32_t packet_size, uint32_t rate_in_kbps);
void zperf_async_work_submit(struct k_work *work);
void zperf_async_work_submit(enum session_proto proto, int session_id, struct k_work *work);
void zperf_udp_uploader_init(void);
void zperf_tcp_uploader_init(void);

subsys/net/lib/zperf/zperf_session.c (73 lines changed)

@@ -19,6 +19,70 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
static struct session sessions[SESSION_PROTO_END][SESSION_MAX];
struct session *get_free_session(const struct sockaddr *addr,
enum session_proto proto)
{
struct session *ptr;
uint64_t oldest = 0ULL;
int oldest_completed_index = -1, oldest_free_index = -1;
int i = 0;
const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr;
const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
/* Check whether we already have an active session */
while (i < SESSION_MAX) {
ptr = &sessions[proto][i];
if (ptr->state == STATE_NULL ||
ptr->state == STATE_COMPLETED) {
if (oldest == 0ULL || ptr->last_time < oldest) {
oldest = ptr->last_time;
if (ptr->state == STATE_COMPLETED) {
if (oldest_completed_index < 0) {
oldest_completed_index = i;
}
} else {
/* Free session */
if (oldest_free_index < 0) {
oldest_free_index = i;
}
}
}
}
i++;
}
ptr = NULL;
if (oldest_free_index >= 0) {
ptr = &sessions[proto][oldest_free_index];
} else if (oldest_completed_index >= 0) {
ptr = &sessions[proto][oldest_completed_index];
}
if (ptr != NULL) {
if (IS_ENABLED(CONFIG_NET_IPV4) &&
addr->sa_family == AF_INET) {
ptr->port = addr4->sin_port;
ptr->ip.family = AF_INET;
net_ipaddr_copy(&ptr->ip.in_addr, &addr4->sin_addr);
} else if (IS_ENABLED(CONFIG_NET_IPV6) &&
addr->sa_family == AF_INET6) {
ptr->port = addr6->sin6_port;
ptr->ip.family = AF_INET6;
net_ipaddr_copy(&ptr->ip.in6_addr, &addr6->sin6_addr);
}
ptr->state = STATE_STARTING;
}
return ptr;
}
/* Get session from a given packet */
struct session *get_session(const struct sockaddr *addr,
enum session_proto proto)
@@ -102,6 +166,14 @@ void zperf_reset_session_stats(struct session *session)
session->last_transit_time = 0;
}
void zperf_session_foreach(enum session_proto proto, session_cb_t cb,
void *user_data)
{
ARRAY_FOR_EACH(sessions[proto], i) {
cb(&sessions[proto][i], proto, user_data);
}
}
void zperf_session_reset(enum session_proto proto)
{
int i, j;
@@ -114,6 +186,7 @@ void zperf_session_reset(enum session_proto proto)
for (j = 0; j < SESSION_MAX; j++) {
sessions[i][j].state = STATE_NULL;
sessions[i][j].id = j;
zperf_reset_session_stats(&(sessions[i][j]));
}
}

subsys/net/lib/zperf/zperf_session.h (23 lines changed)

@@ -21,23 +21,21 @@
/* Type definition */
enum state {
STATE_NULL, /* Session has not yet started */
STATE_STARTING, /* Session is starting */
STATE_ONGOING, /* 1st packet has been received, last packet not yet */
STATE_LAST_PACKET_RECEIVED, /* Last packet has been received */
STATE_COMPLETED /* Session completed, stats pkt can be sent if needed */
};
enum session_proto {
SESSION_UDP = 0,
SESSION_TCP = 1,
SESSION_PROTO_END
};
struct session {
int id;
/* Tuple for UDP */
uint16_t port;
struct net_addr ip;
enum state state;
enum session_proto proto;
/* Stat data */
uint32_t counter;
@@ -52,13 +50,26 @@ struct session {
/* Stats packet*/
struct zperf_server_hdr stat;
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct zperf_results result;
struct zperf_async_upload_context async_upload_ctx;
bool in_progress; /* is this session finished or not */
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
};
typedef void (*session_cb_t)(struct session *ses, enum session_proto proto,
void *user_data);
struct session *get_session(const struct sockaddr *addr,
enum session_proto proto);
struct session *get_free_session(const struct sockaddr *addr,
enum session_proto proto);
void zperf_session_init(void);
void zperf_reset_session_stats(struct session *session);
/* Reset all sessions for a given protocol. */
void zperf_session_reset(enum session_proto proto);
void zperf_session_foreach(enum session_proto proto, session_cb_t cb,
void *user_data);
#endif /* __ZPERF_SESSION_H */

subsys/net/lib/zperf/zperf_shell.c (411 lines changed)

@@ -19,6 +19,7 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
#include <zephyr/net/net_core.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/zperf.h>
#include <zephyr/sys/util_macro.h>
#include "zperf_internal.h"
#include "zperf_session.h"
@@ -484,7 +485,8 @@ static int cmd_udp_download(const struct shell *sh, size_t argc,
#endif
static void shell_udp_upload_print_stats(const struct shell *sh,
struct zperf_results *results)
struct zperf_results *results,
bool is_async)
{
if (IS_ENABLED(CONFIG_NET_UDP)) {
uint64_t rate_in_kbps, client_rate_in_kbps;
@@ -544,11 +546,35 @@ static void shell_udp_upload_print_stats(const struct shell *sh,
shell_fprintf(sh, SHELL_NORMAL, "\t(");
print_number(sh, client_rate_in_kbps, KBPS, KBPS_UNIT);
shell_fprintf(sh, SHELL_NORMAL, ")\n");
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
if (is_async) {
struct session *ses = CONTAINER_OF(results,
struct session,
result);
#ifdef CONFIG_NET_CONTEXT_PRIORITY
shell_fprintf(sh, SHELL_NORMAL,
"Packet priority:\t%d\n",
ses->async_upload_ctx.param.options.priority);
#endif /* CONFIG_NET_CONTEXT_PRIORITY */
shell_fprintf(sh, SHELL_NORMAL,
"Thread priority:\t%d\n",
ses->async_upload_ctx.param.options.thread_priority);
shell_fprintf(sh, SHELL_NORMAL,
"Protocol:\t\t%s\n",
ses->proto == SESSION_UDP ? "UDP" : "TCP");
shell_fprintf(sh, SHELL_NORMAL,
"Session id:\t\t%d\n", ses->id);
}
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
}
}
static void shell_tcp_upload_print_stats(const struct shell *sh,
struct zperf_results *results)
struct zperf_results *results,
bool is_async)
{
if (IS_ENABLED(CONFIG_NET_TCP)) {
uint64_t client_rate_in_kbps;
@@ -565,18 +591,41 @@ static void shell_tcp_upload_print_stats(const struct shell *sh,
client_rate_in_kbps = 0U;
}
shell_fprintf(sh, SHELL_NORMAL, "Duration:\t");
shell_fprintf(sh, SHELL_NORMAL, "Duration:\t\t");
print_number_64(sh, results->client_time_in_us,
TIME_US, TIME_US_UNIT);
shell_fprintf(sh, SHELL_NORMAL, "\n");
shell_fprintf(sh, SHELL_NORMAL, "Num packets:\t%u\n",
shell_fprintf(sh, SHELL_NORMAL, "Num packets:\t\t%u\n",
results->nb_packets_sent);
shell_fprintf(sh, SHELL_NORMAL,
"Num errors:\t%u (retry or fail)\n",
"Num errors:\t\t%u (retry or fail)\n",
results->nb_packets_errors);
shell_fprintf(sh, SHELL_NORMAL, "Rate:\t\t");
shell_fprintf(sh, SHELL_NORMAL, "Rate:\t\t\t");
print_number(sh, client_rate_in_kbps, KBPS, KBPS_UNIT);
shell_fprintf(sh, SHELL_NORMAL, "\n");
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
if (is_async) {
struct session *ses = CONTAINER_OF(results,
struct session,
result);
#ifdef CONFIG_NET_CONTEXT_PRIORITY
shell_fprintf(sh, SHELL_NORMAL,
"Packet priority:\t%d\n",
ses->async_upload_ctx.param.options.priority);
#endif /* CONFIG_NET_CONTEXT_PRIORITY */
shell_fprintf(sh, SHELL_NORMAL,
"Thread priority:\t%d\n",
ses->async_upload_ctx.param.options.thread_priority);
shell_fprintf(sh, SHELL_NORMAL,
"Protocol:\t\t%s\n",
ses->proto == SESSION_UDP ? "UDP" : "TCP");
shell_fprintf(sh, SHELL_NORMAL,
"Session id:\t\t%d\n", ses->id);
}
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
}
}
@@ -617,17 +666,32 @@ static void udp_upload_cb(enum zperf_status status,
{
const struct shell *sh = user_data;
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct session *ses = CONTAINER_OF(result, struct session, result);
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
switch (status) {
case ZPERF_SESSION_STARTED:
break;
case ZPERF_SESSION_FINISHED: {
shell_udp_upload_print_stats(sh, result);
shell_udp_upload_print_stats(sh, result, true);
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
ses->in_progress = false;
ses->state = STATE_COMPLETED;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
break;
}
case ZPERF_SESSION_ERROR:
shell_fprintf(sh, SHELL_ERROR, "UDP upload failed\n");
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
ses->in_progress = false;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
break;
default:
@@ -641,6 +705,10 @@ static void tcp_upload_cb(enum zperf_status status,
{
const struct shell *sh = user_data;
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct session *ses = CONTAINER_OF(result, struct session, result);
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
switch (status) {
case ZPERF_SESSION_STARTED:
break;
@@ -650,12 +718,23 @@ static void tcp_upload_cb(enum zperf_status status,
break;
case ZPERF_SESSION_FINISHED: {
shell_tcp_upload_print_stats(sh, result);
shell_tcp_upload_print_stats(sh, result, true);
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
ses->in_progress = false;
ses->state = STATE_COMPLETED;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
break;
}
case ZPERF_SESSION_ERROR:
shell_fprintf(sh, SHELL_ERROR, "TCP upload failed\n");
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
ses->in_progress = false;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
break;
}
}
@@ -771,7 +850,7 @@ static int execute_upload(const struct shell *sh,
return ret;
}
shell_udp_upload_print_stats(sh, &results);
shell_udp_upload_print_stats(sh, &results, false);
}
} else {
if (is_udp && !IS_ENABLED(CONFIG_NET_UDP)) {
@@ -797,7 +876,7 @@ static int execute_upload(const struct shell *sh,
return ret;
}
shell_tcp_upload_print_stats(sh, &results);
shell_tcp_upload_print_stats(sh, &results, false);
}
} else {
if (!is_udp && !IS_ENABLED(CONFIG_NET_TCP)) {
@@ -838,6 +917,25 @@ static int parse_arg(size_t *i, size_t argc, char *argv[])
return res;
}
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
static bool check_priority(const struct shell *sh, int priority)
{
if (!((priority >= -CONFIG_NUM_COOP_PRIORITIES && priority <= -1) ||
(priority >= 0 && priority <= (CONFIG_NUM_PREEMPT_PRIORITIES - 1)))) {
shell_fprintf(sh, SHELL_WARNING,
"Invalid priority: %d\n"
"Valid values are [%d, %d] for co-operative "
"and [%d, %d] for pre-emptive threads\n",
priority,
-CONFIG_NUM_COOP_PRIORITIES, -1,
0, CONFIG_NUM_PREEMPT_PRIORITIES - 1);
return false;
}
return true;
}
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
static int shell_cmd_upload(const struct shell *sh, size_t argc,
char *argv[], enum net_ip_protocol proto)
{
@@ -891,6 +989,19 @@ static int shell_cmd_upload(const struct shell *sh, size_t argc,
opt_cnt += 1;
break;
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
case 't':
param.options.thread_priority = parse_arg(&i, argc, argv);
if (!check_priority(sh, param.options.thread_priority)) {
shell_fprintf(sh, SHELL_WARNING,
"Parse error: %s\n", argv[i]);
return -ENOEXEC;
}
opt_cnt += 2;
async = true;
break;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
#ifdef CONFIG_NET_CONTEXT_PRIORITY
case 'p':
param.options.priority = parse_arg(&i, argc, argv);
@@ -1125,6 +1236,19 @@ static int shell_cmd_upload2(const struct shell *sh, size_t argc,
opt_cnt += 1;
break;
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
case 't':
param.options.thread_priority = parse_arg(&i, argc, argv);
if (!check_priority(sh, param.options.thread_priority)) {
shell_fprintf(sh, SHELL_WARNING,
"Parse error: %s\n", argv[i]);
return -ENOEXEC;
}
opt_cnt += 2;
async = true;
break;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
#ifdef CONFIG_NET_CONTEXT_PRIORITY
case 'p':
param.options.priority = parse_arg(&i, argc, argv);
@@ -1294,7 +1418,7 @@ static int cmd_udp(const struct shell *sh, size_t argc, char *argv[])
static int cmd_connectap(const struct shell *sh, size_t argc, char *argv[])
{
shell_fprintf(sh, SHELL_INFO,
"Zephyr has not been built with Wi-Fi support.\n");
"Zephyr has not been built with %s support.\n", "Wi-Fi");
return 0;
}
@@ -1415,6 +1539,251 @@ static int cmd_version(const struct shell *sh, size_t argc, char *argv[])
return 0;
}
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct zperf_shell_user_data {
const struct shell *sh;
void *user_data;
int in_progress_count;
int finalized_count;
bool active;
};
static void session_cb(struct session *ses,
enum session_proto proto,
void *user_data)
{
struct zperf_shell_user_data *data = user_data;
const struct shell *sh = data->sh;
bool active = data->active;
if (ses->state == STATE_NULL) {
return;
}
if (active) {
if (ses->in_progress) {
uint32_t remaining;
if (ses->state != STATE_STARTING && ses->state != STATE_ONGOING) {
return;
}
if (ses->proto != proto) {
return;
}
if (data->in_progress_count == 0) {
shell_fprintf(sh, SHELL_NORMAL,
" Thread Remaining\n"
"Id Proto Priority time (sec)\n");
}
remaining = (uint32_t)
(((uint64_t)ses->async_upload_ctx.param.duration_ms -
(k_uptime_get() -
k_ticks_to_ms_ceil64(ses->start_time))) / MSEC_PER_SEC);
shell_fprintf(sh, SHELL_NORMAL,
"[%d] %s %d\t\t%d\n",
ses->id, ses->proto == SESSION_UDP ? "UDP" : "TCP",
ses->async_upload_ctx.param.options.thread_priority,
remaining);
data->in_progress_count++;
}
return;
}
if (!ses->in_progress) {
if (ses->state != STATE_COMPLETED) {
return;
}
if (ses->proto != proto) {
return;
}
if (data->finalized_count == 0) {
shell_fprintf(sh, SHELL_NORMAL,
" Thread\n"
"Id Proto Priority \tDuration\tRate\n");
}
shell_fprintf(sh, SHELL_NORMAL,
"[%d] %s %d\t\t",
ses->id, ses->proto == SESSION_UDP ? "UDP" : "TCP",
ses->async_upload_ctx.param.options.thread_priority);
print_number_64(sh,
(uint64_t)ses->async_upload_ctx.param.duration_ms * USEC_PER_MSEC,
TIME_US, TIME_US_UNIT);
shell_fprintf(sh, SHELL_NORMAL, "\t\t%u kbps\n",
ses->async_upload_ctx.param.rate_kbps);
data->finalized_count++;
}
}
static void session_all_cb(struct session *ses,
enum session_proto proto,
void *user_data)
{
struct zperf_shell_user_data *data = user_data;
const struct shell *sh = data->sh;
if (ses->state == STATE_NULL) {
return;
}
if (!ses->in_progress) {
if (ses->state != STATE_COMPLETED) {
return;
}
if (ses->proto != proto) {
return;
}
if (proto == SESSION_UDP) {
shell_udp_upload_print_stats(sh, &ses->result, true);
} else {
shell_tcp_upload_print_stats(sh, &ses->result, true);
}
data->finalized_count++;
}
}
static void session_clear_cb(struct session *ses,
enum session_proto proto,
void *user_data)
{
struct zperf_shell_user_data *data = user_data;
if (ses->state == STATE_NULL) {
return;
}
if (!ses->in_progress) {
if (ses->state == STATE_COMPLETED) {
ses->state = STATE_NULL;
data->finalized_count++;
}
} else {
if (ses->state == STATE_STARTING || ses->state == STATE_ONGOING) {
data->in_progress_count++;
}
}
}
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
static int cmd_jobs(const struct shell *sh, size_t argc, char *argv[])
{
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct zperf_shell_user_data user_data;
user_data.sh = sh;
user_data.in_progress_count = 0;
user_data.finalized_count = 0;
user_data.active = true;
zperf_session_foreach(SESSION_UDP, session_cb, &user_data);
zperf_session_foreach(SESSION_TCP, session_cb, &user_data);
if (user_data.in_progress_count == 0) {
shell_fprintf(sh, SHELL_NORMAL,
"No active upload sessions\n");
}
shell_fprintf(sh, SHELL_NORMAL, "\n");
user_data.active = false;
zperf_session_foreach(SESSION_UDP, session_cb, &user_data);
zperf_session_foreach(SESSION_TCP, session_cb, &user_data);
if (user_data.finalized_count == 0 && user_data.in_progress_count > 0) {
shell_fprintf(sh, SHELL_NORMAL,
"Active sessions have not yet finished\n");
} else if (user_data.finalized_count == 0) {
shell_fprintf(sh, SHELL_NORMAL,
"No finished sessions found\n");
} else {
shell_fprintf(sh, SHELL_NORMAL,
"Total %d sessions done\n",
user_data.finalized_count);
}
#else
shell_fprintf(sh, SHELL_INFO,
"Zephyr has not been built with %s support.\n",
"CONFIG_ZPERF_SESSION_PER_THREAD");
#endif
return 0;
}
static int cmd_jobs_all(const struct shell *sh, size_t argc, char *argv[])
{
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct zperf_shell_user_data user_data;
user_data.sh = sh;
user_data.in_progress_count = 0;
user_data.finalized_count = 0;
user_data.active = false;
zperf_session_foreach(SESSION_UDP, session_all_cb, &user_data);
zperf_session_foreach(SESSION_TCP, session_all_cb, &user_data);
if (user_data.finalized_count == 0 && user_data.in_progress_count > 0) {
shell_fprintf(sh, SHELL_NORMAL,
"Active sessions have not yet finished\n");
} else if (user_data.finalized_count == 0) {
shell_fprintf(sh, SHELL_NORMAL,
"No finished sessions found\n");
} else {
shell_fprintf(sh, SHELL_NORMAL,
"Total %d sessions done\n",
user_data.finalized_count);
}
#else
shell_fprintf(sh, SHELL_INFO,
"Zephyr has not been built with %s support.\n",
"CONFIG_ZPERF_SESSION_PER_THREAD");
#endif
return 0;
}
static int cmd_jobs_clear(const struct shell *sh, size_t argc, char *argv[])
{
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct zperf_shell_user_data user_data;
user_data.sh = sh;
user_data.in_progress_count = 0;
user_data.finalized_count = 0;
user_data.active = false;
zperf_session_foreach(SESSION_UDP, session_clear_cb, &user_data);
zperf_session_foreach(SESSION_TCP, session_clear_cb, &user_data);
if (user_data.finalized_count == 0 && user_data.in_progress_count > 0) {
shell_fprintf(sh, SHELL_NORMAL,
"Active sessions have not yet finished, not clearing\n");
} else if (user_data.finalized_count == 0) {
shell_fprintf(sh, SHELL_NORMAL, "All sessions already cleared\n");
} else {
shell_fprintf(sh, SHELL_NORMAL,
"Cleared data from %d sessions\n",
user_data.finalized_count);
}
#else
shell_fprintf(sh, SHELL_INFO,
"Zephyr has not been built with %s support.\n",
"CONFIG_ZPERF_SESSION_PER_THREAD");
#endif
return 0;
}
void zperf_shell_init(void)
{
int ret;
@@ -1491,6 +1860,9 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_tcp,
"-a: Asynchronous call (shell will not block for the upload)\n"
"-i sec: Periodic reporting interval in seconds (async only)\n"
"-n: Disable Nagle's algorithm\n"
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
"-t: Specify custom thread priority\n"
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
#ifdef CONFIG_NET_CONTEXT_PRIORITY
"-p: Specify custom packet priority\n"
#endif /* CONFIG_NET_CONTEXT_PRIORITY */
@@ -1511,6 +1883,9 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_tcp,
"-a: Asynchronous call (shell will not block for the upload)\n"
"-i sec: Periodic reporting interval in seconds (async only)\n"
"-n: Disable Nagle's algorithm\n"
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
"-t: Specify custom thread priority\n"
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
#ifdef CONFIG_NET_CONTEXT_PRIORITY
"-p: Specify custom packet priority\n"
#endif /* CONFIG_NET_CONTEXT_PRIORITY */
@@ -1560,6 +1935,9 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_udp,
"Available options:\n"
"-S tos: Specify IPv4/6 type of service\n"
"-a: Asynchronous call (shell will not block for the upload)\n"
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
"-t: Specify custom thread priority\n"
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
#ifdef CONFIG_NET_CONTEXT_PRIORITY
"-p: Specify custom packet priority\n"
#endif /* CONFIG_NET_CONTEXT_PRIORITY */
@@ -1581,6 +1959,9 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_udp,
"Available options:\n"
"-S tos: Specify IPv4/6 type of service\n"
"-a: Asynchronous call (shell will not block for the upload)\n"
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
"-t: Specify custom thread priority\n"
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
#ifdef CONFIG_NET_CONTEXT_PRIORITY
"-p: Specify custom packet priority\n"
#endif /* CONFIG_NET_CONTEXT_PRIORITY */
@@ -1610,10 +1991,18 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_udp,
SHELL_SUBCMD_SET_END
);
SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_jobs,
SHELL_CMD(all, NULL, "Show all statistics", cmd_jobs_all),
SHELL_CMD(clear, NULL, "Clear all statistics", cmd_jobs_clear),
SHELL_SUBCMD_SET_END
);
SHELL_STATIC_SUBCMD_SET_CREATE(zperf_commands,
SHELL_CMD(connectap, NULL,
"Connect to AP",
cmd_connectap),
SHELL_CMD(jobs, &zperf_cmd_jobs,
"Show currently active tests",
cmd_jobs),
SHELL_CMD(setip, NULL,
"Set IP address\n"
"<my ip> <prefix len>\n"

subsys/net/lib/zperf/zperf_tcp_uploader.c (88 lines changed)

@@ -15,10 +15,13 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
#include <zephyr/net/zperf.h>
#include "zperf_internal.h"
#include "zperf_session.h"
static char sample_packet[PACKET_SIZE_MAX];
#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
static struct zperf_async_upload_context tcp_async_upload_ctx;
#endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */
static ssize_t sendall(int sock, const void *buf, size_t len)
{
@@ -149,9 +152,27 @@ int zperf_tcp_upload(const struct zperf_upload_params *param,
static void tcp_upload_async_work(struct k_work *work)
{
struct zperf_async_upload_context *upload_ctx =
CONTAINER_OF(work, struct zperf_async_upload_context, work);
struct zperf_results result = { 0 };
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct session *ses;
struct zperf_async_upload_context *upload_ctx;
struct zperf_results *result;
ses = CONTAINER_OF(work, struct session, async_upload_ctx.work);
upload_ctx = &ses->async_upload_ctx;
NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
k_thread_priority_get(k_current_get()),
k_thread_name_get(k_current_get()));
result = &ses->result;
ses->in_progress = true;
#else
struct zperf_async_upload_context *upload_ctx = &tcp_async_upload_ctx;
struct zperf_results result_storage = { 0 };
struct zperf_results *result = &result_storage;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
int ret;
struct zperf_upload_params param = upload_ctx->param;
int sock;
@@ -198,15 +219,15 @@ static void tcp_upload_async_work(struct k_work *work)
upload_ctx->callback(ZPERF_SESSION_PERIODIC_RESULT, &periodic_result,
upload_ctx->user_data);
result.nb_packets_sent += periodic_result.nb_packets_sent;
result.client_time_in_us += periodic_result.client_time_in_us;
result.nb_packets_errors += periodic_result.nb_packets_errors;
result->nb_packets_sent += periodic_result.nb_packets_sent;
result->client_time_in_us += periodic_result.client_time_in_us;
result->nb_packets_errors += periodic_result.nb_packets_errors;
}
result.packet_size = periodic_result.packet_size;
result->packet_size = periodic_result.packet_size;
} else {
ret = tcp_upload(sock, param.duration_ms, param.packet_size, &result);
ret = tcp_upload(sock, param.duration_ms, param.packet_size, result);
if (ret < 0) {
upload_ctx->callback(ZPERF_SESSION_ERROR, NULL,
upload_ctx->user_data);
@@ -214,7 +235,7 @@ static void tcp_upload_async_work(struct k_work *work)
}
}
upload_ctx->callback(ZPERF_SESSION_FINISHED, &result,
upload_ctx->callback(ZPERF_SESSION_FINISHED, result,
upload_ctx->user_data);
cleanup:
zsock_close(sock);
@@ -227,6 +248,49 @@ int zperf_tcp_upload_async(const struct zperf_upload_params *param,
return -EINVAL;
}
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct k_work_q *queue;
struct session *ses;
k_tid_t tid;
ses = get_free_session(&param->peer_addr, SESSION_TCP);
if (ses == NULL) {
NET_ERR("Cannot get a session!");
return -ENOENT;
}
if (k_work_is_pending(&ses->async_upload_ctx.work)) {
NET_ERR("[%d] upload already in progress", ses->id);
return -EBUSY;
}
memcpy(&ses->async_upload_ctx.param, param, sizeof(*param));
ses->proto = SESSION_TCP;
ses->async_upload_ctx.callback = callback;
ses->async_upload_ctx.user_data = user_data;
queue = get_queue(SESSION_TCP, ses->id);
if (queue == NULL) {
NET_ERR("Cannot get a work queue!");
return -ENOENT;
}
tid = k_work_queue_thread_get(queue);
k_thread_priority_set(tid, ses->async_upload_ctx.param.options.thread_priority);
k_work_init(&ses->async_upload_ctx.work, tcp_upload_async_work);
ses->start_time = k_uptime_ticks();
zperf_async_work_submit(SESSION_TCP, ses->id, &ses->async_upload_ctx.work);
NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
k_thread_priority_get(k_current_get()),
k_thread_name_get(k_current_get()));
#else /* CONFIG_ZPERF_SESSION_PER_THREAD */
if (k_work_is_pending(&tcp_async_upload_ctx.work)) {
return -EBUSY;
}
@@ -235,12 +299,16 @@ int zperf_tcp_upload_async(const struct zperf_upload_params *param,
tcp_async_upload_ctx.callback = callback;
tcp_async_upload_ctx.user_data = user_data;
zperf_async_work_submit(&tcp_async_upload_ctx.work);
zperf_async_work_submit(SESSION_TCP, -1, &tcp_async_upload_ctx.work);
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
return 0;
}
void zperf_tcp_uploader_init(void)
{
#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
k_work_init(&tcp_async_upload_ctx.work, tcp_upload_async_work);
#endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */
}

subsys/net/lib/zperf/zperf_udp_uploader.c (80 lines changed)

@@ -13,12 +13,15 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
#include <zephyr/net/zperf.h>
#include "zperf_internal.h"
#include "zperf_session.h"
static uint8_t sample_packet[sizeof(struct zperf_udp_datagram) +
sizeof(struct zperf_client_hdr_v1) +
PACKET_SIZE_MAX];
#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
static struct zperf_async_upload_context udp_async_upload_ctx;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
static inline void zperf_upload_decode_stat(const uint8_t *data,
size_t datalen,
@@ -335,20 +338,38 @@ int zperf_udp_upload(const struct zperf_upload_params *param,
static void udp_upload_async_work(struct k_work *work)
{
struct zperf_async_upload_context *upload_ctx =
&udp_async_upload_ctx;
struct zperf_results result;
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct session *ses;
struct zperf_async_upload_context *upload_ctx;
struct zperf_results *result;
ses = CONTAINER_OF(work, struct session, async_upload_ctx.work);
upload_ctx = &ses->async_upload_ctx;
NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
k_thread_priority_get(k_current_get()),
k_thread_name_get(k_current_get()));
result = &ses->result;
ses->in_progress = true;
#else
struct zperf_async_upload_context *upload_ctx = &udp_async_upload_ctx;
struct zperf_results result_storage = { 0 };
struct zperf_results *result = &result_storage;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
int ret;
upload_ctx->callback(ZPERF_SESSION_STARTED, NULL,
upload_ctx->user_data);
ret = zperf_udp_upload(&upload_ctx->param, &result);
ret = zperf_udp_upload(&upload_ctx->param, result);
if (ret < 0) {
upload_ctx->callback(ZPERF_SESSION_ERROR, NULL,
upload_ctx->user_data);
} else {
upload_ctx->callback(ZPERF_SESSION_FINISHED, &result,
upload_ctx->callback(ZPERF_SESSION_FINISHED, result,
upload_ctx->user_data);
}
}
@@ -360,6 +381,49 @@ int zperf_udp_upload_async(const struct zperf_upload_params *param,
return -EINVAL;
}
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
struct k_work_q *queue;
struct session *ses;
k_tid_t tid;
ses = get_free_session(&param->peer_addr, SESSION_UDP);
if (ses == NULL) {
NET_ERR("Cannot get a session!");
return -ENOENT;
}
if (k_work_is_pending(&ses->async_upload_ctx.work)) {
NET_ERR("[%d] upload already in progress", ses->id);
return -EBUSY;
}
memcpy(&ses->async_upload_ctx.param, param, sizeof(*param));
ses->proto = SESSION_UDP;
ses->async_upload_ctx.callback = callback;
ses->async_upload_ctx.user_data = user_data;
queue = get_queue(SESSION_UDP, ses->id);
if (queue == NULL) {
NET_ERR("Cannot get a work queue!");
return -ENOENT;
}
tid = k_work_queue_thread_get(queue);
k_thread_priority_set(tid, ses->async_upload_ctx.param.options.thread_priority);
k_work_init(&ses->async_upload_ctx.work, udp_upload_async_work);
ses->start_time = k_uptime_ticks();
zperf_async_work_submit(SESSION_UDP, ses->id, &ses->async_upload_ctx.work);
NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
k_thread_priority_get(k_current_get()),
k_thread_name_get(k_current_get()));
#else /* CONFIG_ZPERF_SESSION_PER_THREAD */
if (k_work_is_pending(&udp_async_upload_ctx.work)) {
return -EBUSY;
}
@@ -368,12 +432,16 @@ int zperf_udp_upload_async(const struct zperf_upload_params *param,
udp_async_upload_ctx.callback = callback;
udp_async_upload_ctx.user_data = user_data;
zperf_async_work_submit(&udp_async_upload_ctx.work);
zperf_async_work_submit(SESSION_UDP, -1, &udp_async_upload_ctx.work);
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
return 0;
}
void zperf_udp_uploader_init(void)
{
#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
k_work_init(&udp_async_upload_ctx.work, udp_upload_async_work);
#endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */
}
