From bec031a7a38f145537362a37d80bc558aa8e5968 Mon Sep 17 00:00:00 2001 From: Marcin Szkudlinski Date: Wed, 16 Aug 2023 08:32:28 +0200 Subject: [PATCH] dp: introduce dp_queue DP queue is a lockless circular buffer providing safe consumer/producer cached operations cross cores prerequisites: 1) incoming and outgoing data rate MUST be the same 2) Both data consumer and data producer declare max chunk sizes they want to use (IBS/OBS) required Buffer size: - 2*MAX(IBS,OBS) if the larger of IBS/OBS is multiplication of smaller - 3*MAX(IBS,OBS) otherwise The queue may work in 2 modes 1) local mode in case both receiver and sender are located on the same core and cache coherency does not matter. dp_queue structure is located in cached memory In this case DP Queue is a simple ring buffer 2) shared mode In this case we need to writeback cache when new data arrive and invalidate cache on secondary core. dp_queue structure is located in shared memory dpQueue is a lockless consumer/producer safe buffer. It is achieved by having only 2 shared variables: write_offset - can be modified by data producer only read_offset - can be modified by data consumer only as 32 bit operations are atomic, it is multi-thread and multi-core save There some explanation needed how free_space and available_data are calculated number of avail data in circular buffer may be calculated as: data_avail = write_offset - read_offset and check for wrap around if (data_avail < 0) data_avail = buffer_size + data_avail The problem is when write_offset == read_offset, !!! it may mean either that the buffer is empty or the buffer is completely filled !!! To solve the above issue having only 2 variables mentioned before: - allow both offsets to point from 0 to DOUBLE buffer_size - when calculating pointers to data, use: data_bufer[offset % buffer_size] - use double buffer size in wrap around check when calculating available data And now: - write_offset == read_offset always means "buffer empty" - write_offset == read_offset + buffer_size always means "buffer full" - data_avail = write_offset - read_offset if (data_avail < 0) data_avail = 2 * buffer_size + data_avail Signed-off-by: Marcin Szkudlinski --- src/audio/dp_queue.c | 300 +++++++++++++++++++++++++++++++ src/include/sof/audio/dp_queue.h | 169 +++++++++++++++++ zephyr/CMakeLists.txt | 3 + 3 files changed, 472 insertions(+) create mode 100644 src/audio/dp_queue.c create mode 100644 src/include/sof/audio/dp_queue.h diff --git a/src/audio/dp_queue.c b/src/audio/dp_queue.c new file mode 100644 index 000000000000..62ca3e85f894 --- /dev/null +++ b/src/audio/dp_queue.c @@ -0,0 +1,300 @@ +// SPDX-License-Identifier: BSD-3-Clause +// +// Copyright(c) 2023 Intel Corporation. All rights reserved. +// + +#include +#include +#include + +#include + +#include +#include + +LOG_MODULE_REGISTER(dp_queue, CONFIG_SOF_LOG_LEVEL); + +/* 393608d8-4188-11ee-be56-0242ac120002 */ +DECLARE_SOF_RT_UUID("dp_queue", dp_queue_uuid, 0x393608d8, 0x4188, 0x11ee, + 0xbe, 0x56, 0x02, 0x42, 0xac, 0x12, 0x20, 0x02); +DECLARE_TR_CTX(dp_queue_tr, SOF_UUID(dp_queue_uuid), LOG_LEVEL_INFO); + +static inline uint8_t __sparse_cache *dp_queue_buffer_end(struct dp_queue *dp_queue) +{ + return dp_queue->_data_buffer + dp_queue->data_buffer_size; +} + +static inline struct dp_queue *dp_queue_from_sink(struct sof_sink *sink) +{ + return container_of(sink, struct dp_queue, _sink_api); +} + +static inline struct dp_queue *dp_queue_from_source(struct sof_source *source) +{ + return container_of(source, struct dp_queue, _source_api); +} + +static inline void dp_queue_invalidate_shared(struct dp_queue *dp_queue, + void __sparse_cache *ptr, size_t size) +{ + /* no cache required in case of not shared queue */ + if (!dp_queue_is_shared(dp_queue)) + return; + + /* wrap-around? */ + if ((uintptr_t)ptr + size > (uintptr_t)dp_queue_buffer_end(dp_queue)) { + /* writeback till the end of circular buffer */ + dcache_invalidate_region + (ptr, (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr); + size -= (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr; + ptr = dp_queue->_data_buffer; + } + /* invalidate rest of data */ + dcache_invalidate_region(ptr, size); +} + +static inline void dp_queue_writeback_shared(struct dp_queue *dp_queue, + void __sparse_cache *ptr, size_t size) +{ + /* no cache required in case of not shared queue */ + if (!dp_queue_is_shared(dp_queue)) + return; + + /* wrap-around? */ + if ((uintptr_t)ptr + size > (uintptr_t)dp_queue_buffer_end(dp_queue)) { + /* writeback till the end of circular buffer */ + dcache_writeback_region + (ptr, (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr); + size -= (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr; + ptr = dp_queue->_data_buffer; + } + /* writeback rest of data */ + dcache_writeback_region(ptr, size); +} + +static inline +uint8_t __sparse_cache *dp_queue_get_pointer(struct dp_queue *dp_queue, uint32_t offset) +{ + /* check if offset is not in "double area" + * lines below do a quicker version of offset %= dp_queue->data_buffer_size; + */ + if (offset >= dp_queue->data_buffer_size) + offset -= dp_queue->data_buffer_size; + return dp_queue->_data_buffer + offset; +} + +static inline +uint32_t dp_queue_inc_offset(struct dp_queue *dp_queue, uint32_t offset, uint32_t inc) +{ + assert(inc <= dp_queue->data_buffer_size); + offset += inc; + /* wrap around ? 2*size because of "double area" */ + if (offset >= 2 * dp_queue->data_buffer_size) + offset -= 2 * dp_queue->data_buffer_size; + return offset; +} + +static inline +size_t _dp_queue_get_data_available(struct dp_queue *dp_queue) +{ + int32_t avail_data = dp_queue->_write_offset - dp_queue->_read_offset; + /* wrap around ? 2*size because of "double area" */ + if (avail_data < 0) + avail_data = 2 * dp_queue->data_buffer_size + avail_data; + + return avail_data; +} + +static size_t dp_queue_get_data_available(struct sof_source *source) +{ + struct dp_queue *dp_queue = dp_queue_from_source(source); + + CORE_CHECK_STRUCT(dp_queue); + return _dp_queue_get_data_available(dp_queue); +} + +static size_t dp_queue_get_free_size(struct sof_sink *sink) +{ + struct dp_queue *dp_queue = dp_queue_from_sink(sink); + + CORE_CHECK_STRUCT(dp_queue); + return dp_queue->data_buffer_size - _dp_queue_get_data_available(dp_queue); +} + +static int dp_queue_get_buffer(struct sof_sink *sink, size_t req_size, + void **data_ptr, void **buffer_start, size_t *buffer_size) +{ + struct dp_queue *dp_queue = dp_queue_from_sink(sink); + + CORE_CHECK_STRUCT(dp_queue); + if (req_size > dp_queue_get_free_size(sink)) + return -ENODATA; + + /* note, __sparse_force is to be removed once sink/src use __sparse_cache for data ptrs */ + *data_ptr = (__sparse_force void *)dp_queue_get_pointer(dp_queue, dp_queue->_write_offset); + *buffer_start = (__sparse_force void *)dp_queue->_data_buffer; + *buffer_size = dp_queue->data_buffer_size; + + /* no need to invalidate cache - buffer is to be written only */ + return 0; +} + +static int dp_queue_commit_buffer(struct sof_sink *sink, size_t commit_size) +{ + struct dp_queue *dp_queue = dp_queue_from_sink(sink); + + CORE_CHECK_STRUCT(dp_queue); + if (commit_size) { + dp_queue_writeback_shared(dp_queue, + dp_queue_get_pointer(dp_queue, dp_queue->_write_offset), + commit_size); + + /* move write pointer */ + dp_queue->_write_offset = + dp_queue_inc_offset(dp_queue, dp_queue->_write_offset, commit_size); + } + + return 0; +} + +static int dp_queue_get_data(struct sof_source *source, size_t req_size, + void const **data_ptr, void const **buffer_start, size_t *buffer_size) +{ + struct dp_queue *dp_queue = dp_queue_from_source(source); + __sparse_cache void *_data_ptr; + + CORE_CHECK_STRUCT(dp_queue); + if (req_size > dp_queue_get_data_available(source)) + return -ENODATA; + + _data_ptr = dp_queue_get_pointer(dp_queue, dp_queue->_read_offset); + + /* clean cache in provided data range */ + dp_queue_invalidate_shared(dp_queue, _data_ptr, req_size); + + *buffer_start = (__sparse_force void *)dp_queue->_data_buffer; + *buffer_size = dp_queue->data_buffer_size; + *data_ptr = (__sparse_force void *)_data_ptr; + + return 0; +} + +static int dp_queue_release_data(struct sof_source *source, size_t free_size) +{ + struct dp_queue *dp_queue = dp_queue_from_source(source); + + CORE_CHECK_STRUCT(dp_queue); + if (free_size) { + /* data consumed, free buffer space, no need for any special cache operations */ + dp_queue->_read_offset = + dp_queue_inc_offset(dp_queue, dp_queue->_read_offset, free_size); + } + + return 0; +} + +static int dp_queue_set_ipc_params(struct dp_queue *dp_queue, + struct sof_ipc_stream_params *params, + bool force_update) +{ + CORE_CHECK_STRUCT(dp_queue); + if (dp_queue->_hw_params_configured && !force_update) + return 0; + + dp_queue->audio_stream_params.frame_fmt = params->frame_fmt; + dp_queue->audio_stream_params.rate = params->rate; + dp_queue->audio_stream_params.channels = params->channels; + dp_queue->audio_stream_params.buffer_fmt = params->buffer_fmt; + + dp_queue->_hw_params_configured = true; + + return 0; +} + +static int dp_queue_set_ipc_params_source(struct sof_source *source, + struct sof_ipc_stream_params *params, + bool force_update) +{ + struct dp_queue *dp_queue = dp_queue_from_source(source); + + CORE_CHECK_STRUCT(dp_queue); + return dp_queue_set_ipc_params(dp_queue, params, force_update); +} + +static int dp_queue_set_ipc_params_sink(struct sof_sink *sink, + struct sof_ipc_stream_params *params, + bool force_update) +{ + struct dp_queue *dp_queue = dp_queue_from_sink(sink); + + CORE_CHECK_STRUCT(dp_queue); + return dp_queue_set_ipc_params(dp_queue, params, force_update); +} + +static const struct source_ops dp_queue_source_ops = { + .get_data_available = dp_queue_get_data_available, + .get_data = dp_queue_get_data, + .release_data = dp_queue_release_data, + .audio_set_ipc_params = dp_queue_set_ipc_params_source, +}; + +static const struct sink_ops dp_queue_sink_ops = { + .get_free_size = dp_queue_get_free_size, + .get_buffer = dp_queue_get_buffer, + .commit_buffer = dp_queue_commit_buffer, + .audio_set_ipc_params = dp_queue_set_ipc_params_sink, +}; + +struct dp_queue *dp_queue_create(size_t ibs, size_t obs, uint32_t flags) +{ + struct dp_queue *dp_queue; + + /* allocate DP structure */ + if (flags & DP_QUEUE_MODE_SHARED) + dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME_SHARED, 0, SOF_MEM_CAPS_RAM, + sizeof(*dp_queue)); + else + dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME, 0, SOF_MEM_CAPS_RAM, sizeof(*dp_queue)); + if (!dp_queue) + return NULL; + + dp_queue->_flags = flags; + + CORE_CHECK_STRUCT_INIT(dp_queue, flags & DP_QUEUE_MODE_SHARED); + + /* initiate sink/source */ + source_init(dp_queue_get_source(dp_queue), &dp_queue_source_ops, + &dp_queue->audio_stream_params); + sink_init(dp_queue_get_sink(dp_queue), &dp_queue_sink_ops, + &dp_queue->audio_stream_params); + + /* set obs/ibs in sink/source interfaces */ + sink_set_obs(&dp_queue->_sink_api, obs); + source_set_ibs(&dp_queue->_source_api, ibs); + + uint32_t max_ibs_obs = MAX(ibs, obs); + uint32_t min_ibs_obs = MIN(ibs, obs); + + /* calculate required buffer size */ + if (max_ibs_obs % min_ibs_obs == 0) + dp_queue->data_buffer_size = 2 * max_ibs_obs; + else + dp_queue->data_buffer_size = 3 * max_ibs_obs; + + /* allocate data buffer - always in cached memory alias */ + dp_queue->data_buffer_size = ALIGN_UP(dp_queue->data_buffer_size, PLATFORM_DCACHE_ALIGN); + dp_queue->_data_buffer = (__sparse_force __sparse_cache void *) + rballoc_align(0, 0, dp_queue->data_buffer_size, PLATFORM_DCACHE_ALIGN); + if (!dp_queue->_data_buffer) + goto err; + + tr_info(&dp_queue_tr, "DpQueue created, shared: %u ibs: %u obs %u, size %u", + dp_queue_is_shared(dp_queue), ibs, obs, dp_queue->data_buffer_size); + + /* return a pointer to allocated structure */ + return dp_queue; +err: + tr_err(&dp_queue_tr, "DpQueue creation failure"); + rfree(dp_queue); + return NULL; +} diff --git a/src/include/sof/audio/dp_queue.h b/src/include/sof/audio/dp_queue.h new file mode 100644 index 000000000000..64409abed6cd --- /dev/null +++ b/src/include/sof/audio/dp_queue.h @@ -0,0 +1,169 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright(c) 2023 Intel Corporation. All rights reserved. + * + */ + +#ifndef __SOF_DP_QUEUE_H__ +#define __SOF_DP_QUEUE_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * DP queue is a lockless circular buffer + * providing safe consumer/producer cached operations cross cores + * + * prerequisites: + * 1) incoming and outgoing data rate MUST be the same + * 2) Both data consumer and data producer declare max chunk sizes they want to use (IBS/OBS) + * + * required Buffer size: + * - 2*MAX(IBS,OBS) if the larger of IBS/OBS is multiplication of smaller + * - 3*MAX(IBS,OBS) otherwise + * + * The queue may work in 2 modes + * 1) local mode + * in case both receiver and sender are located on the same core and cache coherency + * does not matter. dp_queue structure is located in cached memory + * In this case DP Queue is a simple ring buffer + * + * 2) shared mode + * In this case we need to writeback cache when new data arrive and invalidate cache on + * secondary core. dp_queue structure is located in shared memory + * + * + * dpQueue is a lockless consumer/producer safe buffer. It is achieved by having only 2 shared + * variables: + * _write_offset - can be modified by data producer only + * _read_offset - can be modified by data consumer only + * + * as 32 bit operations are atomic, it is multi-thread and multi-core save + * + * There some explanation needed how free_space and available_data are calculated + * + * number of avail data in circular buffer may be calculated as: + * data_avail = _write_offset - _read_offset + * and check for wrap around + * if (data_avail < 0) data_avail = buffer_size - data_avail + * + * The problem is when _write_offset == _read_offset, + * !!! it may mean either that the buffer is empty or the buffer is completely filled !!! + * + * To solve the above issue having only 2 variables mentioned before: + * - allow both offsets to point from 0 to DOUBLE buffer_size + * - when calculating pointers to data, use: data_bufer[offset % buffer_size] + * - use double buffer size in wrap around check when calculating available data + * + * And now: + * - _write_offset == _read_offset + * always means "buffer empty" + * - _write_offset == _read_offset + buffer_size + * always means "buffer full" + */ + +struct dp_queue; +struct sof_audio_stream_params; + +/* DP flags */ +#define DP_QUEUE_MODE_LOCAL 0 +#define DP_QUEUE_MODE_SHARED BIT(1) + +/* the dpQueue structure */ +struct dp_queue { + CORE_CHECK_STRUCT_FIELD; + + /* public: read only */ + struct sof_audio_stream_params audio_stream_params; + size_t data_buffer_size; + + /* private: */ + struct sof_source _source_api; /**< src api handler */ + struct sof_sink _sink_api; /**< sink api handler */ + + uint32_t _flags; /* DP_QUEUE_MODE_* */ + + uint8_t __sparse_cache *_data_buffer; + uint32_t _write_offset; /* private: to be modified by data producer using API */ + uint32_t _read_offset; /* private: to be modified by data consumer using API */ + + bool _hw_params_configured; +}; + +/** + * @param ibs input buffer size + * the size of data to be produced in 1 cycle + * the data producer declares here how much data it will produce in single cycle + * + * @param obs output buffer size + * the size of data to be consumed in 1 cycle + * the data receiver declares here how much data it will consume in single cycle + * + * @param flags a combinatin of DP_QUEUE_MODE_* flags determining working mode + * + */ +struct dp_queue *dp_queue_create(size_t ibs, size_t obs, uint32_t flags); + +/** + * @brief free dp queue memory + */ +static inline +void dp_queue_free(struct dp_queue *dp_queue) +{ + CORE_CHECK_STRUCT(dp_queue); + rfree((__sparse_force void *)dp_queue->_data_buffer); + rfree(dp_queue); +} + +/** + * @brief return a handler to sink API of dp_queue. + * the handler may be used by helper functions defined in sink_api.h + */ +static inline +struct sof_sink *dp_queue_get_sink(struct dp_queue *dp_queue) +{ + CORE_CHECK_STRUCT(dp_queue); + return &dp_queue->_sink_api; +} + +/** + * @brief return a handler to source API of dp_queue + * the handler may be used by helper functions defined in source_api.h + */ +static inline +struct sof_source *dp_queue_get_source(struct dp_queue *dp_queue) +{ + CORE_CHECK_STRUCT(dp_queue); + return &dp_queue->_source_api; +} + +/** + * @brief this is a backdoor to get complete audio params structure from dp_queue + * it is needed till pipeline 2.0 is ready + * + */ +static inline +struct sof_audio_stream_params *dp_queue_get_audio_params(struct dp_queue *dp_queue) +{ + CORE_CHECK_STRUCT(dp_queue); + return &dp_queue->audio_stream_params; +} + +/** + * @brief return true if the queue is shared between 2 cores + */ +static inline +bool dp_queue_is_shared(struct dp_queue *dp_queue) +{ + CORE_CHECK_STRUCT(dp_queue); + return !!(dp_queue->_flags & DP_QUEUE_MODE_SHARED); +} + +#endif /* __SOF_DP_QUEUE_H__ */ diff --git a/zephyr/CMakeLists.txt b/zephyr/CMakeLists.txt index e531eaabb690..1412508147c9 100644 --- a/zephyr/CMakeLists.txt +++ b/zephyr/CMakeLists.txt @@ -362,6 +362,9 @@ zephyr_library_sources( lib.c ) +if(CONFIG_ZEPHYR_DP_SCHEDULER) + zephyr_library_sources(${SOF_AUDIO_PATH}/dp_queue.c) +endif() if(CONFIG_SCHEDULE_DMA_SINGLE_CHANNEL AND NOT(CONFIG_DMA_DOMAIN)) zephyr_library_sources(${SOF_SRC_PATH}/schedule/dma_single_chan_domain.c) endif()