diff --git a/doc/connectivity/networking/api/zperf.rst b/doc/connectivity/networking/api/zperf.rst index 058f816cf3b..14744236634 100644 --- a/doc/connectivity/networking/api/zperf.rst +++ b/doc/connectivity/networking/api/zperf.rst @@ -96,3 +96,79 @@ and this if you are testing TCP: iPerf output can be limited by using the -b option if Zephyr is not able to receive all the packets in orderly manner. + +Session Management +****************** + +If :kconfig:option:`CONFIG_ZPERF_SESSION_PER_THREAD` option is set, then +multiple upload sessions can be done at the same time if user supplies ``-a`` +option when starting the upload. Each session will have their own work queue +to run the test. The session test results can be viewed also after the tests +have finished. + +Following zperf shell commands are available for session management: + +.. csv-table:: + :header: "zperf shell command", "Description" + :widths: auto + + "``jobs``", "Show currently active or finished sessions" + "``jobs all``", "Show statistics of finished sessions" + "``jobs clear``", "Clear finished session statistics" + +Example: + +.. code-block:: console + + uart:~$ zperf udp upload -a -t 5 192.0.2.2 5001 10 1K 1M + Remote port is 5001 + Connecting to 192.0.2.2 + Duration: 10.00 s + Packet size: 1000 bytes + Rate: 1000 kbps + Starting... + Rate: 1.00 Mbps + Packet duration 7 ms + + uart:~$ zperf jobs all + No sessions sessions found + uart:~$ zperf jobs + Thread Remaining + Id Proto Priority time (sec) + [1] UDP 5 4 + + Active sessions have not yet finished + - + Upload completed! + Statistics: server (client) + Duration: 30.01 s (30.01 s) + Num packets: 3799 (3799) + Num packets out order: 0 + Num packets lost: 0 + Jitter: 63 us + Rate: 1.01 Mbps (1.01 Mbps) + Thread priority: 5 + Protocol: UDP + Session id: 1 + + uart:~$ zperf jobs all + - + Upload completed! + Statistics: server (client) + Duration: 30.01 s (30.01 s) + Num packets: 3799 (3799) + Num packets out order: 0 + Num packets lost: 0 + Jitter: 63 us + Rate: 1.01 Mbps (1.01 Mbps) + Thread priority: 5 + Protocol: UDP + Session id: 1 + Total 1 sessions done + + uart:~$ zperf jobs clear + Cleared data from 1 sessions + + uart:~$ zperf jobs + No active upload sessions + No finished sessions found diff --git a/include/zephyr/net/zperf.h b/include/zephyr/net/zperf.h index 4b6c50f4f61..3001d9419df 100644 --- a/include/zephyr/net/zperf.h +++ b/include/zephyr/net/zperf.h @@ -45,6 +45,9 @@ struct zperf_upload_params { uint8_t tos; int tcp_nodelay; int priority; +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + int thread_priority; +#endif uint32_t report_interval_ms; } options; }; diff --git a/subsys/net/lib/zperf/Kconfig b/subsys/net/lib/zperf/Kconfig index 2d63026814c..a20244ea3fb 100644 --- a/subsys/net/lib/zperf/Kconfig +++ b/subsys/net/lib/zperf/Kconfig @@ -22,6 +22,16 @@ config NET_ZPERF_LEGACY_HEADER_COMPAT detected. This option reverts the header format for use with iperf version 2.0.9 and earlier. +config ZPERF_SESSION_PER_THREAD + bool "Run each session in a separate thread" + help + Each session is started in its own thread. This means + that the system will use more memory because multiple + stack frames are needed, so this is not enabled by default. + User is also able to set each thread priority separately and + the ZPERF_WORK_Q_THREAD_PRIORITY is a default value if thread + priority is not set when starting the session. + config ZPERF_WORK_Q_THREAD_PRIORITY int "zperf work queue thread priority" default NUM_PREEMPT_PRIORITIES diff --git a/subsys/net/lib/zperf/zperf_common.c b/subsys/net/lib/zperf/zperf_common.c index 7992e9236c7..66b55942f92 100644 --- a/subsys/net/lib/zperf/zperf_common.c +++ b/subsys/net/lib/zperf/zperf_common.c @@ -41,13 +41,70 @@ struct sockaddr_in *zperf_get_sin(void) return &in4_addr_my; } -#define ZPERF_WORK_Q_THREAD_PRIORITY \ - CLAMP(CONFIG_ZPERF_WORK_Q_THREAD_PRIORITY, K_HIGHEST_APPLICATION_THREAD_PRIO, \ +#define ZPERF_WORK_Q_THREAD_PRIORITY \ + CLAMP(CONFIG_ZPERF_WORK_Q_THREAD_PRIORITY, \ + K_HIGHEST_APPLICATION_THREAD_PRIO, \ K_LOWEST_APPLICATION_THREAD_PRIO) -K_THREAD_STACK_DEFINE(zperf_work_q_stack, CONFIG_ZPERF_WORK_Q_STACK_SIZE); +#if defined(CONFIG_ZPERF_SESSION_PER_THREAD) +struct zperf_work { + struct k_work_q *queue; + struct z_thread_stack_element *stack; + size_t stack_size; +}; + +#define CREATE_WORK_Q(i, _) \ + static struct k_work_q zperf_work_q_##i; \ + static K_KERNEL_STACK_DEFINE(zperf_work_q_stack_##i, \ + CONFIG_ZPERF_WORK_Q_STACK_SIZE) + +/* Both UDP and TCP can have separate sessions so multiply by 2 */ +#if defined(CONFIG_NET_UDP) && defined(CONFIG_NET_TCP) +#define MAX_SESSION_COUNT UTIL_X2(CONFIG_NET_ZPERF_MAX_SESSIONS) +#define SESSION_INDEX CONFIG_NET_ZPERF_MAX_SESSIONS +#else +#define MAX_SESSION_COUNT CONFIG_NET_ZPERF_MAX_SESSIONS +#define SESSION_INDEX 0 +#endif + +LISTIFY(MAX_SESSION_COUNT, CREATE_WORK_Q, (;), _); + +#define SET_WORK_Q(i, _) \ + [i] = { \ + .queue = &zperf_work_q_##i, \ + .stack = zperf_work_q_stack_##i, \ + .stack_size = K_THREAD_STACK_SIZEOF(zperf_work_q_stack_##i), \ + } + +static struct zperf_work zperf_work_q[] = { + LISTIFY(MAX_SESSION_COUNT, SET_WORK_Q, (,), _) +}; + +struct k_work_q *get_queue(enum session_proto proto, int session_id) +{ + if (session_id < 0 || session_id >= CONFIG_NET_ZPERF_MAX_SESSIONS) { + return NULL; + } + + if (proto < 0 || proto >= SESSION_PROTO_END) { + return NULL; + } + + NET_DBG("%s using queue %d for session %d\n", + proto == SESSION_UDP ? "UDP" : "TCP", + proto * SESSION_INDEX + session_id, + session_id); + + return zperf_work_q[proto * SESSION_INDEX + session_id].queue; +} + +#else /* CONFIG_ZPERF_SESSION_PER_THREAD */ + +K_THREAD_STACK_DEFINE(zperf_work_q_stack, CONFIG_ZPERF_WORK_Q_STACK_SIZE); static struct k_work_q zperf_work_q; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + int zperf_get_ipv6_addr(char *host, char *prefix_str, struct in6_addr *addr) { struct net_if_ipv6_prefix *prefix; @@ -220,20 +277,53 @@ uint32_t zperf_packet_duration(uint32_t packet_size, uint32_t rate_in_kbps) (rate_in_kbps * 1024U)); } -void zperf_async_work_submit(struct k_work *work) +void zperf_async_work_submit(enum session_proto proto, int session_id, struct k_work *work) { +#if defined(CONFIG_ZPERF_SESSION_PER_THREAD) + k_work_submit_to_queue(zperf_work_q[proto * SESSION_INDEX + session_id].queue, work); +#else + ARG_UNUSED(proto); + ARG_UNUSED(session_id); + k_work_submit_to_queue(&zperf_work_q, work); +#endif } static int zperf_init(void) { +#if defined(CONFIG_ZPERF_SESSION_PER_THREAD) + + ARRAY_FOR_EACH(zperf_work_q, i) { + struct k_work_queue_config cfg = { + .no_yield = false, + }; + +#define MAX_NAME_LEN sizeof("zperf_work_q[xxx]") + char name[MAX_NAME_LEN]; + + snprintk(name, sizeof(name), "zperf_work_q[%d]", i); + cfg.name = name; + + k_work_queue_init(zperf_work_q[i].queue); + + k_work_queue_start(zperf_work_q[i].queue, + zperf_work_q[i].stack, + zperf_work_q[i].stack_size, + ZPERF_WORK_Q_THREAD_PRIORITY, + &cfg); + } + +#else /* CONFIG_ZPERF_SESSION_PER_THREAD */ k_work_queue_init(&zperf_work_q); k_work_queue_start(&zperf_work_q, zperf_work_q_stack, - K_THREAD_STACK_SIZEOF(zperf_work_q_stack), ZPERF_WORK_Q_THREAD_PRIORITY, + K_THREAD_STACK_SIZEOF(zperf_work_q_stack), + ZPERF_WORK_Q_THREAD_PRIORITY, NULL); k_thread_name_set(&zperf_work_q.thread, "zperf_work_q"); +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + if (IS_ENABLED(CONFIG_NET_UDP)) { zperf_udp_uploader_init(); } @@ -241,7 +331,8 @@ static int zperf_init(void) zperf_tcp_uploader_init(); } - if (IS_ENABLED(CONFIG_NET_ZPERF_SERVER)) { + if (IS_ENABLED(CONFIG_NET_ZPERF_SERVER) || + IS_ENABLED(CONFIG_ZPERF_SESSION_PER_THREAD)) { zperf_session_init(); } diff --git a/subsys/net/lib/zperf/zperf_internal.h b/subsys/net/lib/zperf/zperf_internal.h index ed2e8fb58c5..e2d7b48271f 100644 --- a/subsys/net/lib/zperf/zperf_internal.h +++ b/subsys/net/lib/zperf/zperf_internal.h @@ -53,6 +53,12 @@ #define ZPERF_VERSION "1.1" +enum session_proto { + SESSION_UDP = 0, + SESSION_TCP = 1, + SESSION_PROTO_END +}; + struct zperf_udp_datagram { uint32_t id; uint32_t tv_sec; @@ -110,13 +116,14 @@ int zperf_get_ipv4_addr(char *host, struct in_addr *addr); struct sockaddr_in *zperf_get_sin(void); extern void connect_ap(char *ssid); +extern struct k_work_q *get_queue(enum session_proto proto, int session_id); int zperf_prepare_upload_sock(const struct sockaddr *peer_addr, uint8_t tos, int priority, int tcp_nodelay, int proto); uint32_t zperf_packet_duration(uint32_t packet_size, uint32_t rate_in_kbps); -void zperf_async_work_submit(struct k_work *work); +void zperf_async_work_submit(enum session_proto proto, int session_id, struct k_work *work); void zperf_udp_uploader_init(void); void zperf_tcp_uploader_init(void); diff --git a/subsys/net/lib/zperf/zperf_session.c b/subsys/net/lib/zperf/zperf_session.c index 6fff4abf9cc..d824057931d 100644 --- a/subsys/net/lib/zperf/zperf_session.c +++ b/subsys/net/lib/zperf/zperf_session.c @@ -19,6 +19,70 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL); static struct session sessions[SESSION_PROTO_END][SESSION_MAX]; +struct session *get_free_session(const struct sockaddr *addr, + enum session_proto proto) +{ + struct session *ptr; + uint64_t oldest = 0ULL; + int oldest_completed_index = -1, oldest_free_index = -1; + int i = 0; + + const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr; + const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr; + + /* Check whether we already have an active session */ + while (i < SESSION_MAX) { + ptr = &sessions[proto][i]; + + if (ptr->state == STATE_NULL || + ptr->state == STATE_COMPLETED) { + + if (oldest == 0ULL || ptr->last_time < oldest) { + oldest = ptr->last_time; + + if (ptr->state == STATE_COMPLETED) { + if (oldest_completed_index < 0) { + oldest_completed_index = i; + } + } else { + /* Free session */ + if (oldest_free_index < 0) { + oldest_free_index = i; + } + } + } + } + + i++; + } + + ptr = NULL; + + if (oldest_free_index >= 0) { + ptr = &sessions[proto][oldest_free_index]; + } else if (oldest_completed_index >= 0) { + ptr = &sessions[proto][oldest_completed_index]; + } + + if (ptr != NULL) { + if (IS_ENABLED(CONFIG_NET_IPV4) && + addr->sa_family == AF_INET) { + ptr->port = addr4->sin_port; + ptr->ip.family = AF_INET; + net_ipaddr_copy(&ptr->ip.in_addr, &addr4->sin_addr); + } else if (IS_ENABLED(CONFIG_NET_IPV6) && + addr->sa_family == AF_INET6) { + ptr->port = addr6->sin6_port; + ptr->ip.family = AF_INET6; + net_ipaddr_copy(&ptr->ip.in6_addr, &addr6->sin6_addr); + } + + ptr->state = STATE_STARTING; + } + + return ptr; +} + /* Get session from a given packet */ struct session *get_session(const struct sockaddr *addr, enum session_proto proto) @@ -102,6 +166,14 @@ void zperf_reset_session_stats(struct session *session) session->last_transit_time = 0; } +void zperf_session_foreach(enum session_proto proto, session_cb_t cb, + void *user_data) +{ + ARRAY_FOR_EACH(sessions[proto], i) { + cb(&sessions[proto][i], proto, user_data); + } +} + void zperf_session_reset(enum session_proto proto) { int i, j; @@ -114,6 +186,7 @@ void zperf_session_reset(enum session_proto proto) for (j = 0; j < SESSION_MAX; j++) { sessions[i][j].state = STATE_NULL; + sessions[i][j].id = j; zperf_reset_session_stats(&(sessions[i][j])); } } diff --git a/subsys/net/lib/zperf/zperf_session.h b/subsys/net/lib/zperf/zperf_session.h index f01bc66b71a..ead4f40fae5 100644 --- a/subsys/net/lib/zperf/zperf_session.h +++ b/subsys/net/lib/zperf/zperf_session.h @@ -21,23 +21,21 @@ /* Type definition */ enum state { STATE_NULL, /* Session has not yet started */ + STATE_STARTING, /* Session is starting */ STATE_ONGOING, /* 1st packet has been received, last packet not yet */ STATE_LAST_PACKET_RECEIVED, /* Last packet has been received */ STATE_COMPLETED /* Session completed, stats pkt can be sent if needed */ }; -enum session_proto { - SESSION_UDP = 0, - SESSION_TCP = 1, - SESSION_PROTO_END -}; - struct session { + int id; + /* Tuple for UDP */ uint16_t port; struct net_addr ip; enum state state; + enum session_proto proto; /* Stat data */ uint32_t counter; @@ -52,13 +50,26 @@ struct session { /* Stats packet*/ struct zperf_server_hdr stat; + +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct zperf_results result; + struct zperf_async_upload_context async_upload_ctx; + bool in_progress; /* is this session finished or not */ +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ }; +typedef void (*session_cb_t)(struct session *ses, enum session_proto proto, + void *user_data); + struct session *get_session(const struct sockaddr *addr, enum session_proto proto); +struct session *get_free_session(const struct sockaddr *addr, + enum session_proto proto); void zperf_session_init(void); void zperf_reset_session_stats(struct session *session); /* Reset all sessions for a given protocol. */ void zperf_session_reset(enum session_proto proto); +void zperf_session_foreach(enum session_proto proto, session_cb_t cb, + void *user_data); #endif /* __ZPERF_SESSION_H */ diff --git a/subsys/net/lib/zperf/zperf_shell.c b/subsys/net/lib/zperf/zperf_shell.c index 8bbcb222541..899e8525a4d 100644 --- a/subsys/net/lib/zperf/zperf_shell.c +++ b/subsys/net/lib/zperf/zperf_shell.c @@ -19,6 +19,7 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL); #include #include #include +#include #include "zperf_internal.h" #include "zperf_session.h" @@ -484,7 +485,8 @@ static int cmd_udp_download(const struct shell *sh, size_t argc, #endif static void shell_udp_upload_print_stats(const struct shell *sh, - struct zperf_results *results) + struct zperf_results *results, + bool is_async) { if (IS_ENABLED(CONFIG_NET_UDP)) { uint64_t rate_in_kbps, client_rate_in_kbps; @@ -544,11 +546,35 @@ static void shell_udp_upload_print_stats(const struct shell *sh, shell_fprintf(sh, SHELL_NORMAL, "\t("); print_number(sh, client_rate_in_kbps, KBPS, KBPS_UNIT); shell_fprintf(sh, SHELL_NORMAL, ")\n"); + +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + if (is_async) { + struct session *ses = CONTAINER_OF(results, + struct session, + result); + +#ifdef CONFIG_NET_CONTEXT_PRIORITY + shell_fprintf(sh, SHELL_NORMAL, + "Packet priority:\t%d\n", + ses->async_upload_ctx.param.options.priority); +#endif /* CONFIG_NET_CONTEXT_PRIORITY */ + + shell_fprintf(sh, SHELL_NORMAL, + "Thread priority:\t%d\n", + ses->async_upload_ctx.param.options.thread_priority); + shell_fprintf(sh, SHELL_NORMAL, + "Protocol:\t\t%s\n", + ses->proto == SESSION_UDP ? "UDP" : "TCP"); + shell_fprintf(sh, SHELL_NORMAL, + "Session id:\t\t%d\n", ses->id); + } +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ } } static void shell_tcp_upload_print_stats(const struct shell *sh, - struct zperf_results *results) + struct zperf_results *results, + bool is_async) { if (IS_ENABLED(CONFIG_NET_TCP)) { uint64_t client_rate_in_kbps; @@ -565,18 +591,41 @@ static void shell_tcp_upload_print_stats(const struct shell *sh, client_rate_in_kbps = 0U; } - shell_fprintf(sh, SHELL_NORMAL, "Duration:\t"); + shell_fprintf(sh, SHELL_NORMAL, "Duration:\t\t"); print_number_64(sh, results->client_time_in_us, TIME_US, TIME_US_UNIT); shell_fprintf(sh, SHELL_NORMAL, "\n"); - shell_fprintf(sh, SHELL_NORMAL, "Num packets:\t%u\n", + shell_fprintf(sh, SHELL_NORMAL, "Num packets:\t\t%u\n", results->nb_packets_sent); shell_fprintf(sh, SHELL_NORMAL, - "Num errors:\t%u (retry or fail)\n", + "Num errors:\t\t%u (retry or fail)\n", results->nb_packets_errors); - shell_fprintf(sh, SHELL_NORMAL, "Rate:\t\t"); + shell_fprintf(sh, SHELL_NORMAL, "Rate:\t\t\t"); print_number(sh, client_rate_in_kbps, KBPS, KBPS_UNIT); shell_fprintf(sh, SHELL_NORMAL, "\n"); + +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + if (is_async) { + struct session *ses = CONTAINER_OF(results, + struct session, + result); + +#ifdef CONFIG_NET_CONTEXT_PRIORITY + shell_fprintf(sh, SHELL_NORMAL, + "Packet priority:\t%d\n", + ses->async_upload_ctx.param.options.priority); +#endif /* CONFIG_NET_CONTEXT_PRIORITY */ + + shell_fprintf(sh, SHELL_NORMAL, + "Thread priority:\t%d\n", + ses->async_upload_ctx.param.options.thread_priority); + shell_fprintf(sh, SHELL_NORMAL, + "Protocol:\t\t%s\n", + ses->proto == SESSION_UDP ? "UDP" : "TCP"); + shell_fprintf(sh, SHELL_NORMAL, + "Session id:\t\t%d\n", ses->id); + } +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ } } @@ -617,17 +666,32 @@ static void udp_upload_cb(enum zperf_status status, { const struct shell *sh = user_data; +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct session *ses = CONTAINER_OF(result, struct session, result); +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + switch (status) { case ZPERF_SESSION_STARTED: break; case ZPERF_SESSION_FINISHED: { - shell_udp_upload_print_stats(sh, result); + shell_udp_upload_print_stats(sh, result, true); + +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + ses->in_progress = false; + ses->state = STATE_COMPLETED; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + break; } case ZPERF_SESSION_ERROR: shell_fprintf(sh, SHELL_ERROR, "UDP upload failed\n"); + +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + ses->in_progress = false; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + break; default: @@ -641,6 +705,10 @@ static void tcp_upload_cb(enum zperf_status status, { const struct shell *sh = user_data; +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct session *ses = CONTAINER_OF(result, struct session, result); +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + switch (status) { case ZPERF_SESSION_STARTED: break; @@ -650,12 +718,23 @@ static void tcp_upload_cb(enum zperf_status status, break; case ZPERF_SESSION_FINISHED: { - shell_tcp_upload_print_stats(sh, result); + shell_tcp_upload_print_stats(sh, result, true); + +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + ses->in_progress = false; + ses->state = STATE_COMPLETED; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + break; } case ZPERF_SESSION_ERROR: shell_fprintf(sh, SHELL_ERROR, "TCP upload failed\n"); + +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + ses->in_progress = false; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + break; } } @@ -771,7 +850,7 @@ static int execute_upload(const struct shell *sh, return ret; } - shell_udp_upload_print_stats(sh, &results); + shell_udp_upload_print_stats(sh, &results, false); } } else { if (is_udp && !IS_ENABLED(CONFIG_NET_UDP)) { @@ -797,7 +876,7 @@ static int execute_upload(const struct shell *sh, return ret; } - shell_tcp_upload_print_stats(sh, &results); + shell_tcp_upload_print_stats(sh, &results, false); } } else { if (!is_udp && !IS_ENABLED(CONFIG_NET_TCP)) { @@ -838,6 +917,25 @@ static int parse_arg(size_t *i, size_t argc, char *argv[]) return res; } +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD +static bool check_priority(const struct shell *sh, int priority) +{ + if (!((priority >= -CONFIG_NUM_COOP_PRIORITIES && priority <= -1) || + (priority >= 0 && priority <= (CONFIG_NUM_PREEMPT_PRIORITIES - 1)))) { + shell_fprintf(sh, SHELL_WARNING, + "Invalid priority: %d\n" + "Valid values are [%d, %d] for co-operative " + "and [%d, %d] for pre-emptive threads\n", + priority, + -CONFIG_NUM_COOP_PRIORITIES, -1, + 0, CONFIG_NUM_PREEMPT_PRIORITIES - 1); + return false; + } + + return true; +} +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + static int shell_cmd_upload(const struct shell *sh, size_t argc, char *argv[], enum net_ip_protocol proto) { @@ -891,6 +989,19 @@ static int shell_cmd_upload(const struct shell *sh, size_t argc, opt_cnt += 1; break; +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + case 't': + param.options.thread_priority = parse_arg(&i, argc, argv); + if (!check_priority(sh, param.options.thread_priority)) { + shell_fprintf(sh, SHELL_WARNING, + "Parse error: %s\n", argv[i]); + return -ENOEXEC; + } + opt_cnt += 2; + async = true; + break; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + #ifdef CONFIG_NET_CONTEXT_PRIORITY case 'p': param.options.priority = parse_arg(&i, argc, argv); @@ -1125,6 +1236,19 @@ static int shell_cmd_upload2(const struct shell *sh, size_t argc, opt_cnt += 1; break; +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + case 't': + param.options.thread_priority = parse_arg(&i, argc, argv); + if (!check_priority(sh, param.options.thread_priority)) { + shell_fprintf(sh, SHELL_WARNING, + "Parse error: %s\n", argv[i]); + return -ENOEXEC; + } + opt_cnt += 2; + async = true; + break; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + #ifdef CONFIG_NET_CONTEXT_PRIORITY case 'p': param.options.priority = parse_arg(&i, argc, argv); @@ -1294,7 +1418,7 @@ static int cmd_udp(const struct shell *sh, size_t argc, char *argv[]) static int cmd_connectap(const struct shell *sh, size_t argc, char *argv[]) { shell_fprintf(sh, SHELL_INFO, - "Zephyr has not been built with Wi-Fi support.\n"); + "Zephyr has not been built with %s support.\n", "Wi-Fi"); return 0; } @@ -1415,6 +1539,251 @@ static int cmd_version(const struct shell *sh, size_t argc, char *argv[]) return 0; } +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD +struct zperf_shell_user_data { + const struct shell *sh; + void *user_data; + int in_progress_count; + int finalized_count; + bool active; +}; + +static void session_cb(struct session *ses, + enum session_proto proto, + void *user_data) +{ + struct zperf_shell_user_data *data = user_data; + const struct shell *sh = data->sh; + bool active = data->active; + + if (ses->state == STATE_NULL) { + return; + } + + if (active) { + if (ses->in_progress) { + uint32_t remaining; + + if (ses->state != STATE_STARTING && ses->state != STATE_ONGOING) { + return; + } + + if (ses->proto != proto) { + return; + } + + if (data->in_progress_count == 0) { + shell_fprintf(sh, SHELL_NORMAL, + " Thread Remaining\n" + "Id Proto Priority time (sec)\n"); + } + + remaining = (uint32_t) + (((uint64_t)ses->async_upload_ctx.param.duration_ms - + (k_uptime_get() - + k_ticks_to_ms_ceil64(ses->start_time))) / MSEC_PER_SEC); + + shell_fprintf(sh, SHELL_NORMAL, + "[%d] %s %d\t\t%d\n", + ses->id, ses->proto == SESSION_UDP ? "UDP" : "TCP", + ses->async_upload_ctx.param.options.thread_priority, + remaining); + + data->in_progress_count++; + } + + return; + } + + if (!ses->in_progress) { + if (ses->state != STATE_COMPLETED) { + return; + } + + if (ses->proto != proto) { + return; + } + + if (data->finalized_count == 0) { + shell_fprintf(sh, SHELL_NORMAL, + " Thread\n" + "Id Proto Priority \tDuration\tRate\n"); + } + + shell_fprintf(sh, SHELL_NORMAL, + "[%d] %s %d\t\t", + ses->id, ses->proto == SESSION_UDP ? "UDP" : "TCP", + ses->async_upload_ctx.param.options.thread_priority); + print_number_64(sh, + (uint64_t)ses->async_upload_ctx.param.duration_ms * USEC_PER_MSEC, + TIME_US, TIME_US_UNIT); + shell_fprintf(sh, SHELL_NORMAL, "\t\t%u kbps\n", + ses->async_upload_ctx.param.rate_kbps); + + data->finalized_count++; + } +} + +static void session_all_cb(struct session *ses, + enum session_proto proto, + void *user_data) +{ + struct zperf_shell_user_data *data = user_data; + const struct shell *sh = data->sh; + + if (ses->state == STATE_NULL) { + return; + } + + if (!ses->in_progress) { + if (ses->state != STATE_COMPLETED) { + return; + } + + if (ses->proto != proto) { + return; + } + + if (proto == SESSION_UDP) { + shell_udp_upload_print_stats(sh, &ses->result, true); + } else { + shell_tcp_upload_print_stats(sh, &ses->result, true); + } + + data->finalized_count++; + } +} + +static void session_clear_cb(struct session *ses, + enum session_proto proto, + void *user_data) +{ + struct zperf_shell_user_data *data = user_data; + + if (ses->state == STATE_NULL) { + return; + } + + if (!ses->in_progress) { + if (ses->state == STATE_COMPLETED) { + ses->state = STATE_NULL; + data->finalized_count++; + } + } else { + if (ses->state == STATE_STARTING || ses->state == STATE_ONGOING) { + data->in_progress_count++; + } + } +} +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + +static int cmd_jobs(const struct shell *sh, size_t argc, char *argv[]) +{ +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct zperf_shell_user_data user_data; + + user_data.sh = sh; + user_data.in_progress_count = 0; + user_data.finalized_count = 0; + user_data.active = true; + + zperf_session_foreach(SESSION_UDP, session_cb, &user_data); + zperf_session_foreach(SESSION_TCP, session_cb, &user_data); + + if (user_data.in_progress_count == 0) { + shell_fprintf(sh, SHELL_NORMAL, + "No active upload sessions\n"); + } + + shell_fprintf(sh, SHELL_NORMAL, "\n"); + + user_data.active = false; + + zperf_session_foreach(SESSION_UDP, session_cb, &user_data); + zperf_session_foreach(SESSION_TCP, session_cb, &user_data); + + if (user_data.finalized_count == 0 && user_data.in_progress_count > 0) { + shell_fprintf(sh, SHELL_NORMAL, + "Active sessions have not yet finished\n"); + } else if (user_data.finalized_count == 0) { + shell_fprintf(sh, SHELL_NORMAL, + "No finished sessions found\n"); + } else { + shell_fprintf(sh, SHELL_NORMAL, + "Total %d sessions done\n", + user_data.finalized_count); + } +#else + shell_fprintf(sh, SHELL_INFO, + "Zephyr has not been built with %s support.\n", + "CONFIG_ZPERF_SESSION_PER_THREAD"); +#endif + return 0; +} + +static int cmd_jobs_all(const struct shell *sh, size_t argc, char *argv[]) +{ +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct zperf_shell_user_data user_data; + + user_data.sh = sh; + user_data.in_progress_count = 0; + user_data.finalized_count = 0; + user_data.active = false; + + zperf_session_foreach(SESSION_UDP, session_all_cb, &user_data); + zperf_session_foreach(SESSION_TCP, session_all_cb, &user_data); + + if (user_data.finalized_count == 0 && user_data.in_progress_count > 0) { + shell_fprintf(sh, SHELL_NORMAL, + "Active sessions have not yet finished\n"); + } else if (user_data.finalized_count == 0) { + shell_fprintf(sh, SHELL_NORMAL, + "No finished sessions found\n"); + } else { + shell_fprintf(sh, SHELL_NORMAL, + "Total %d sessions done\n", + user_data.finalized_count); + } +#else + shell_fprintf(sh, SHELL_INFO, + "Zephyr has not been built with %s support.\n", + "CONFIG_ZPERF_SESSION_PER_THREAD"); +#endif + return 0; +} + +static int cmd_jobs_clear(const struct shell *sh, size_t argc, char *argv[]) +{ +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct zperf_shell_user_data user_data; + + user_data.sh = sh; + user_data.in_progress_count = 0; + user_data.finalized_count = 0; + user_data.active = false; + + zperf_session_foreach(SESSION_UDP, session_clear_cb, &user_data); + zperf_session_foreach(SESSION_TCP, session_clear_cb, &user_data); + + if (user_data.finalized_count == 0 && user_data.in_progress_count > 0) { + shell_fprintf(sh, SHELL_NORMAL, + "Active sessions have not yet finished, not clearing\n"); + } else if (user_data.finalized_count == 0) { + shell_fprintf(sh, SHELL_NORMAL, "All sessions already cleared\n"); + } else { + shell_fprintf(sh, SHELL_NORMAL, + "Cleared data from %d sessions\n", + user_data.finalized_count); + } +#else + shell_fprintf(sh, SHELL_INFO, + "Zephyr has not been built with %s support.\n", + "CONFIG_ZPERF_SESSION_PER_THREAD"); +#endif + return 0; +} + void zperf_shell_init(void) { int ret; @@ -1491,6 +1860,9 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_tcp, "-a: Asynchronous call (shell will not block for the upload)\n" "-i sec: Periodic reporting interval in seconds (async only)\n" "-n: Disable Nagle's algorithm\n" +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + "-t: Specify custom thread priority\n" +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ #ifdef CONFIG_NET_CONTEXT_PRIORITY "-p: Specify custom packet priority\n" #endif /* CONFIG_NET_CONTEXT_PRIORITY */ @@ -1511,6 +1883,9 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_tcp, "-a: Asynchronous call (shell will not block for the upload)\n" "-i sec: Periodic reporting interval in seconds (async only)\n" "-n: Disable Nagle's algorithm\n" +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + "-t: Specify custom thread priority\n" +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ #ifdef CONFIG_NET_CONTEXT_PRIORITY "-p: Specify custom packet priority\n" #endif /* CONFIG_NET_CONTEXT_PRIORITY */ @@ -1560,6 +1935,9 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_udp, "Available options:\n" "-S tos: Specify IPv4/6 type of service\n" "-a: Asynchronous call (shell will not block for the upload)\n" +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + "-t: Specify custom thread priority\n" +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ #ifdef CONFIG_NET_CONTEXT_PRIORITY "-p: Specify custom packet priority\n" #endif /* CONFIG_NET_CONTEXT_PRIORITY */ @@ -1581,6 +1959,9 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_udp, "Available options:\n" "-S tos: Specify IPv4/6 type of service\n" "-a: Asynchronous call (shell will not block for the upload)\n" +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + "-t: Specify custom thread priority\n" +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ #ifdef CONFIG_NET_CONTEXT_PRIORITY "-p: Specify custom packet priority\n" #endif /* CONFIG_NET_CONTEXT_PRIORITY */ @@ -1610,10 +1991,18 @@ SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_udp, SHELL_SUBCMD_SET_END ); +SHELL_STATIC_SUBCMD_SET_CREATE(zperf_cmd_jobs, + SHELL_CMD(all, NULL, "Show all statistics", cmd_jobs_all), + SHELL_CMD(clear, NULL, "Clear all statistics", cmd_jobs_clear), +); + SHELL_STATIC_SUBCMD_SET_CREATE(zperf_commands, SHELL_CMD(connectap, NULL, "Connect to AP", cmd_connectap), + SHELL_CMD(jobs, &zperf_cmd_jobs, + "Show currently active tests", + cmd_jobs), SHELL_CMD(setip, NULL, "Set IP address\n" " \n" diff --git a/subsys/net/lib/zperf/zperf_tcp_uploader.c b/subsys/net/lib/zperf/zperf_tcp_uploader.c index 33602aa26aa..79a154be9ad 100644 --- a/subsys/net/lib/zperf/zperf_tcp_uploader.c +++ b/subsys/net/lib/zperf/zperf_tcp_uploader.c @@ -15,10 +15,13 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL); #include #include "zperf_internal.h" +#include "zperf_session.h" static char sample_packet[PACKET_SIZE_MAX]; +#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD) static struct zperf_async_upload_context tcp_async_upload_ctx; +#endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */ static ssize_t sendall(int sock, const void *buf, size_t len) { @@ -149,9 +152,27 @@ int zperf_tcp_upload(const struct zperf_upload_params *param, static void tcp_upload_async_work(struct k_work *work) { - struct zperf_async_upload_context *upload_ctx = - CONTAINER_OF(work, struct zperf_async_upload_context, work); - struct zperf_results result = { 0 }; +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct session *ses; + struct zperf_async_upload_context *upload_ctx; + struct zperf_results *result; + + ses = CONTAINER_OF(work, struct session, async_upload_ctx.work); + upload_ctx = &ses->async_upload_ctx; + + NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(), + k_thread_priority_get(k_current_get()), + k_thread_name_get(k_current_get())); + + result = &ses->result; + + ses->in_progress = true; +#else + struct zperf_async_upload_context *upload_ctx = &tcp_async_upload_ctx; + struct zperf_results result_storage = { 0 }; + struct zperf_results *result = &result_storage; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + int ret; struct zperf_upload_params param = upload_ctx->param; int sock; @@ -198,15 +219,15 @@ static void tcp_upload_async_work(struct k_work *work) upload_ctx->callback(ZPERF_SESSION_PERIODIC_RESULT, &periodic_result, upload_ctx->user_data); - result.nb_packets_sent += periodic_result.nb_packets_sent; - result.client_time_in_us += periodic_result.client_time_in_us; - result.nb_packets_errors += periodic_result.nb_packets_errors; + result->nb_packets_sent += periodic_result.nb_packets_sent; + result->client_time_in_us += periodic_result.client_time_in_us; + result->nb_packets_errors += periodic_result.nb_packets_errors; } - result.packet_size = periodic_result.packet_size; + result->packet_size = periodic_result.packet_size; } else { - ret = tcp_upload(sock, param.duration_ms, param.packet_size, &result); + ret = tcp_upload(sock, param.duration_ms, param.packet_size, result); if (ret < 0) { upload_ctx->callback(ZPERF_SESSION_ERROR, NULL, upload_ctx->user_data); @@ -214,7 +235,7 @@ static void tcp_upload_async_work(struct k_work *work) } } - upload_ctx->callback(ZPERF_SESSION_FINISHED, &result, + upload_ctx->callback(ZPERF_SESSION_FINISHED, result, upload_ctx->user_data); cleanup: zsock_close(sock); @@ -227,6 +248,49 @@ int zperf_tcp_upload_async(const struct zperf_upload_params *param, return -EINVAL; } +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct k_work_q *queue; + struct session *ses; + k_tid_t tid; + + ses = get_free_session(¶m->peer_addr, SESSION_TCP); + if (ses == NULL) { + NET_ERR("Cannot get a session!"); + return -ENOENT; + } + + if (k_work_is_pending(&ses->async_upload_ctx.work)) { + NET_ERR("[%d] upload already in progress", ses->id); + return -EBUSY; + } + + memcpy(&ses->async_upload_ctx.param, param, sizeof(*param)); + + ses->proto = SESSION_TCP; + ses->async_upload_ctx.callback = callback; + ses->async_upload_ctx.user_data = user_data; + + queue = get_queue(SESSION_TCP, ses->id); + if (queue == NULL) { + NET_ERR("Cannot get a work queue!"); + return -ENOENT; + } + + tid = k_work_queue_thread_get(queue); + k_thread_priority_set(tid, ses->async_upload_ctx.param.options.thread_priority); + + k_work_init(&ses->async_upload_ctx.work, tcp_upload_async_work); + + ses->start_time = k_uptime_ticks(); + + zperf_async_work_submit(SESSION_TCP, ses->id, &ses->async_upload_ctx.work); + + NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(), + k_thread_priority_get(k_current_get()), + k_thread_name_get(k_current_get())); + +#else /* CONFIG_ZPERF_SESSION_PER_THREAD */ + if (k_work_is_pending(&tcp_async_upload_ctx.work)) { return -EBUSY; } @@ -235,12 +299,16 @@ int zperf_tcp_upload_async(const struct zperf_upload_params *param, tcp_async_upload_ctx.callback = callback; tcp_async_upload_ctx.user_data = user_data; - zperf_async_work_submit(&tcp_async_upload_ctx.work); + zperf_async_work_submit(SESSION_TCP, -1, &tcp_async_upload_ctx.work); + +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ return 0; } void zperf_tcp_uploader_init(void) { +#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD) k_work_init(&tcp_async_upload_ctx.work, tcp_upload_async_work); +#endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */ } diff --git a/subsys/net/lib/zperf/zperf_udp_uploader.c b/subsys/net/lib/zperf/zperf_udp_uploader.c index bcb858eedd1..36b8ec51e0a 100644 --- a/subsys/net/lib/zperf/zperf_udp_uploader.c +++ b/subsys/net/lib/zperf/zperf_udp_uploader.c @@ -13,12 +13,15 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL); #include #include "zperf_internal.h" +#include "zperf_session.h" static uint8_t sample_packet[sizeof(struct zperf_udp_datagram) + sizeof(struct zperf_client_hdr_v1) + PACKET_SIZE_MAX]; +#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD) static struct zperf_async_upload_context udp_async_upload_ctx; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ static inline void zperf_upload_decode_stat(const uint8_t *data, size_t datalen, @@ -335,20 +338,38 @@ int zperf_udp_upload(const struct zperf_upload_params *param, static void udp_upload_async_work(struct k_work *work) { - struct zperf_async_upload_context *upload_ctx = - &udp_async_upload_ctx; - struct zperf_results result; +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct session *ses; + struct zperf_async_upload_context *upload_ctx; + struct zperf_results *result; + + ses = CONTAINER_OF(work, struct session, async_upload_ctx.work); + upload_ctx = &ses->async_upload_ctx; + + NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(), + k_thread_priority_get(k_current_get()), + k_thread_name_get(k_current_get())); + + result = &ses->result; + + ses->in_progress = true; +#else + struct zperf_async_upload_context *upload_ctx = &udp_async_upload_ctx; + struct zperf_results result_storage = { 0 }; + struct zperf_results *result = &result_storage; +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ + int ret; upload_ctx->callback(ZPERF_SESSION_STARTED, NULL, upload_ctx->user_data); - ret = zperf_udp_upload(&upload_ctx->param, &result); + ret = zperf_udp_upload(&upload_ctx->param, result); if (ret < 0) { upload_ctx->callback(ZPERF_SESSION_ERROR, NULL, upload_ctx->user_data); } else { - upload_ctx->callback(ZPERF_SESSION_FINISHED, &result, + upload_ctx->callback(ZPERF_SESSION_FINISHED, result, upload_ctx->user_data); } } @@ -360,6 +381,49 @@ int zperf_udp_upload_async(const struct zperf_upload_params *param, return -EINVAL; } +#ifdef CONFIG_ZPERF_SESSION_PER_THREAD + struct k_work_q *queue; + struct session *ses; + k_tid_t tid; + + ses = get_free_session(¶m->peer_addr, SESSION_UDP); + if (ses == NULL) { + NET_ERR("Cannot get a session!"); + return -ENOENT; + } + + if (k_work_is_pending(&ses->async_upload_ctx.work)) { + NET_ERR("[%d] upload already in progress", ses->id); + return -EBUSY; + } + + memcpy(&ses->async_upload_ctx.param, param, sizeof(*param)); + + ses->proto = SESSION_UDP; + ses->async_upload_ctx.callback = callback; + ses->async_upload_ctx.user_data = user_data; + + queue = get_queue(SESSION_UDP, ses->id); + if (queue == NULL) { + NET_ERR("Cannot get a work queue!"); + return -ENOENT; + } + + tid = k_work_queue_thread_get(queue); + k_thread_priority_set(tid, ses->async_upload_ctx.param.options.thread_priority); + + k_work_init(&ses->async_upload_ctx.work, udp_upload_async_work); + + ses->start_time = k_uptime_ticks(); + + zperf_async_work_submit(SESSION_UDP, ses->id, &ses->async_upload_ctx.work); + + NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(), + k_thread_priority_get(k_current_get()), + k_thread_name_get(k_current_get())); + +#else /* CONFIG_ZPERF_SESSION_PER_THREAD */ + if (k_work_is_pending(&udp_async_upload_ctx.work)) { return -EBUSY; } @@ -368,12 +432,16 @@ int zperf_udp_upload_async(const struct zperf_upload_params *param, udp_async_upload_ctx.callback = callback; udp_async_upload_ctx.user_data = user_data; - zperf_async_work_submit(&udp_async_upload_ctx.work); + zperf_async_work_submit(SESSION_UDP, -1, &udp_async_upload_ctx.work); + +#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */ return 0; } void zperf_udp_uploader_init(void) { +#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD) k_work_init(&udp_async_upload_ctx.work, udp_upload_async_work); +#endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */ }