Skip to content

Commit

Permalink
DP: connect dp_queue to processing DP modules
Browse files Browse the repository at this point in the history
DP component need to work flawlessly on different cores
than the main pipeline processing data independently
from the modules before and after them in the pipeline

To achieve this, a cross core producer-consumer safe
dp_queue should be used

DP queue can only be connected to modules that use
sink/src interface, so DP modules need to use it
obligatory.

To connect dp_queue into modules chain, double buffering
method is used:
 - in LL task DP module is processed as an LL module, but the
   copy method is copying data from/to audio_streams
   to/from dp_queues
 - the main DP module processing takes place in DP task
   (in separate Zephyr thread). The tread may be bind to
   separate core

Signed-off-by: Marcin Szkudlinski <[email protected]>
  • Loading branch information
marcinszkudlinski committed Aug 31, 2023
1 parent 60e6db7 commit dcbc7fa
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 91 deletions.
13 changes: 10 additions & 3 deletions src/audio/component.c
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,16 @@ int comp_copy(struct comp_dev *dev)

assert(dev->drv->ops.copy);

/* copy only if we are the owner of the LL component */
if (dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_LL &&
cpu_is_me(dev->ipc_config.core)) {
/* copy only if we are the owner of component OR this is DP component
*
* DP components (modules) require two stage processing:
* - in first step the pipeline (this) must copy data to/from module DP queues
* - second step will be performed by a thread specific to the DP module
*
* to be removed when pipeline 2.0 is ready
*/
if (cpu_is_me(dev->ipc_config.core) ||
dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_DP) {
#if CONFIG_PERFORMANCE_COUNTERS
perf_cnt_init(&dev->pcd);
#endif
Expand Down
228 changes: 224 additions & 4 deletions src/audio/module_adapter/module_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <sof/audio/module_adapter/module/generic.h>
#include <sof/audio/sink_api.h>
#include <sof/audio/source_api.h>
#include <sof/audio/sink_source_utils.h>
#include <sof/audio/dp_queue.h>
#include <sof/audio/pipeline.h>
#include <sof/common.h>
#include <sof/platform.h>
Expand Down Expand Up @@ -71,7 +73,16 @@ struct comp_dev *module_adapter_new(const struct comp_driver *drv,
dev->ipc_config = *config;
dev->drv = drv;

mod = rzalloc(SOF_MEM_ZONE_RUNTIME, 0, SOF_MEM_CAPS_RAM, sizeof(*mod));
/* allocate module information.
* for DP shared modules this struct must be accessible from all cores
* Unfortunalely at this point there's no information of components the module
* will be binded to. So we need to allocate shared memory for each DP module
* To be removed when pipeline 2.0 is ready
*/
enum mem_zone zone = config->proc_domain == COMP_PROCESSING_DOMAIN_DP ?
SOF_MEM_ZONE_RUNTIME_SHARED : SOF_MEM_ZONE_RUNTIME;

mod = rzalloc(zone, 0, SOF_MEM_CAPS_RAM, sizeof(*mod));
if (!mod) {
comp_err(dev, "module_adapter_new(), failed to allocate memory for module");
rfree(dev);
Expand Down Expand Up @@ -161,6 +172,12 @@ struct comp_dev *module_adapter_new(const struct comp_driver *drv,
goto err;
}

#if CONFIG_ZEPHYR_DP_SCHEDULER
/* create a task for DP processing */
if (config->proc_domain == COMP_PROCESSING_DOMAIN_DP)
pipeline_comp_dp_task_init(dev);
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */

#if CONFIG_IPC_MAJOR_4
dst->init_data = NULL;
#endif
Expand Down Expand Up @@ -207,6 +224,118 @@ static int module_adapter_sink_src_prepare(struct comp_dev *dev)
return ret;
}

#if CONFIG_ZEPHYR_DP_SCHEDULER
static int module_adapter_dp_queue_prepare(struct comp_dev *dev)
{
struct list_item *blist;
struct processing_module *mod = comp_get_drvdata(dev);
int i;
int ret;
struct dp_queue *dp_queue;
int dp_mode = dev->is_shared ? DP_QUEUE_MODE_SHARED : DP_QUEUE_MODE_LOCAL;

/* for DP processing we need to create a DP QUEUE for each module input/output
* till pipeline2.0 is ready, DP processing requires double buffering
*
* first, set all parameters by calling "module prepare" with pointers to
* "main" audio_stream buffers
*/
ret = module_adapter_sink_src_prepare(dev);
if (ret)
return ret;

/*
* second step - create a "shadow" cross-core DpQueue for existing buffers
* and copy stream parameters to shadow buffers
*/
i = 0;
list_for_item(blist, &dev->bsource_list) {
struct comp_buffer *source_buffer =
container_of(blist, struct comp_buffer, sink_list);

/* copy IBS & OBS from buffer to be shadowed */
size_t ibs = source_get_ibs(audio_stream_get_source(&source_buffer->stream));
size_t obs = sink_get_obs(audio_stream_get_sink(&source_buffer->stream));

/* create a shadow dp queue */
dp_queue = dp_queue_create(ibs, obs, dp_mode);

if (!dp_queue)
goto err;

mod->dp_queue_ll_to_dp[i] = dp_queue;
/* it will override source pointers set by module_adapter_sink_src_prepare
* module will use shadow dpQueue for processing
*/
mod->sources[i] = dp_queue_get_source(dp_queue);

/* copy parameters from buffer to be shadowed */
memcpy_s(&dp_queue->audio_stream_params,
sizeof(dp_queue->audio_stream_params),
&source_buffer->stream.runtime_stream_params,
sizeof(source_buffer->stream.runtime_stream_params));
i++;
}
mod->num_of_sources = i;

i = 0;
list_for_item(blist, &dev->bsink_list) {
struct comp_buffer *sink_buffer =
container_of(blist, struct comp_buffer, source_list);

/* copy IBS & OBS from buffer to be shadowed */
size_t ibs = source_get_ibs(audio_stream_get_source(&sink_buffer->stream));
size_t obs = sink_get_obs(audio_stream_get_sink(&sink_buffer->stream));

/* create a shadow dp queue */
dp_queue = dp_queue_create(ibs, obs, dp_mode);

if (!dp_queue)
goto err;

mod->dp_queue_dp_to_ll[i] = dp_queue;
/* it will override sink pointers set by module_adapter_sink_src_prepare
* module will use shadow dpQueue for processing
*/
mod->sinks[i] = dp_queue_get_sink(dp_queue);

/* copy parameters from buffer to be shadowed */
memcpy_s(&dp_queue->audio_stream_params,
sizeof(dp_queue->audio_stream_params),
&sink_buffer->stream.runtime_stream_params,
sizeof(sink_buffer->stream.runtime_stream_params));

i++;
}
mod->num_of_sinks = i;

return 0;

err:
for (int i = 0; i < mod->num_of_sources; i++)
if (mod->dp_queue_ll_to_dp[i]) {
dp_queue_free(mod->dp_queue_ll_to_dp[i]);
mod->dp_queue_ll_to_dp[i] = NULL;
mod->sources[i] = NULL;
}
mod->num_of_sources = 0;

for (int i = 0; i < mod->num_of_sinks; i++)
if (mod->dp_queue_dp_to_ll[i]) {
dp_queue_free(mod->dp_queue_dp_to_ll[i]);
mod->dp_queue_dp_to_ll[i] = NULL;
mod->sinks[i] = NULL;
}
mod->num_of_sinks = 0;
return -ENOMEM;
}
#else
static int module_adapter_dp_queue_prepare(struct comp_dev *dev)
{
return -EINVAL;
}
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */

/*
* \brief Prepare the module
* \param[in] dev - component device pointer.
Expand All @@ -229,11 +358,21 @@ int module_adapter_prepare(struct comp_dev *dev)
comp_dbg(dev, "module_adapter_prepare() start");

/* Prepare module */
if (IS_PROCESSING_MODE_SINK_SOURCE(mod))
if (IS_PROCESSING_MODE_SINK_SOURCE(mod) &&
mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_DP)
ret = module_adapter_dp_queue_prepare(dev);

else if (IS_PROCESSING_MODE_SINK_SOURCE(mod) &&
mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_LL)
ret = module_adapter_sink_src_prepare(dev);
else

else if ((IS_PROCESSING_MODE_RAW_DATA(mod) || IS_PROCESSING_MODE_AUDIO_STREAM(mod)) &&
mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_LL)
ret = module_prepare(mod, NULL, 0, NULL, 0);

else
ret = -EINVAL;

if (ret) {
if (ret != PPL_STATUS_PATH_STOP)
comp_err(dev, "module_adapter_prepare() error %x: module prepare failed",
Expand Down Expand Up @@ -272,6 +411,10 @@ int module_adapter_prepare(struct comp_dev *dev)
mod->period_bytes = audio_stream_period_bytes(&sink->stream, dev->frames);
comp_dbg(dev, "module_adapter_prepare(): got period_bytes = %u", mod->period_bytes);

/* no more to do for sink/source mode */
if (IS_PROCESSING_MODE_SINK_SOURCE(mod))
return 0;

mod->num_input_buffers = 0;
mod->num_output_buffers = 0;

Expand Down Expand Up @@ -972,6 +1115,61 @@ static int module_adapter_audio_stream_type_copy(struct comp_dev *dev)
return ret;
}

#if CONFIG_ZEPHYR_DP_SCHEDULER
static int module_adapter_copy_dp_queues(struct comp_dev *dev)
{
/*
* copy data from component audio streams to dp_queue
* DP module processing itself will take place in DP thread
* This is an adapter, to be removed when pipeline2.0 is ready
*/
struct processing_module *mod = comp_get_drvdata(dev);
int i = 0;
struct list_item *blist;

list_for_item(blist, &dev->bsource_list) {
/* input - we need to copy data from audio_stream (as source)
* to dp_queue (as sink)
*/
struct comp_buffer *buffer =
container_of(blist, struct comp_buffer, sink_list);
struct sof_source *data_src = audio_stream_get_source(&buffer->stream);

struct sof_sink *data_sink = dp_queue_get_sink(mod->dp_queue_ll_to_dp[i]);
uint32_t to_copy = MIN(sink_get_free_size(data_sink),
source_get_data_available(data_src));

source_to_sink_copy(data_src, data_sink, true, to_copy);

i++;
}

i = 0;
list_for_item(blist, &dev->bsink_list) {
/* output - we need to copy data from dp_queue (as source)
* to audio_stream (as sink)
*/
struct sof_source *data_src = dp_queue_get_source(mod->dp_queue_dp_to_ll[i]);
struct comp_buffer *buffer =
container_of(blist, struct comp_buffer, source_list);
struct sof_sink *data_sink = audio_stream_get_sink(&buffer->stream);

uint32_t to_copy = MIN(sink_get_free_size(data_sink),
source_get_data_available(data_src));

source_to_sink_copy(data_src, data_sink, true, to_copy);

i++;
}
return 0;
}
#else
static int module_adapter_copy_dp_queues(struct comp_dev *dev)
{
return -ENOTSUP;
}
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */

static int module_adapter_sink_source_copy(struct comp_dev *dev)
{
struct processing_module *mod = comp_get_drvdata(dev);
Expand Down Expand Up @@ -1106,6 +1304,8 @@ int module_adapter_copy(struct comp_dev *dev)
comp_dbg(dev, "module_adapter_copy(): start");

struct processing_module *mod = comp_get_drvdata(dev);
if (mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_DP)
return module_adapter_copy_dp_queues(dev);

if (IS_PROCESSING_MODE_AUDIO_STREAM(mod))
return module_adapter_audio_stream_type_copy(dev);
Expand Down Expand Up @@ -1344,7 +1544,27 @@ int module_adapter_reset(struct comp_dev *dev)
mod->num_output_buffers = 0;
}

if (IS_PROCESSING_MODE_SINK_SOURCE(mod)) {
#if CONFIG_ZEPHYR_DP_SCHEDULER
if (IS_PROCESSING_MODE_SINK_SOURCE(mod) &&
mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_DP) {
for (int i = 0; i < mod->num_of_sources; i++) {
mod->sources[i] = NULL;
if (mod->dp_queue_ll_to_dp[i])
dp_queue_free(mod->dp_queue_ll_to_dp[i]);
mod->dp_queue_ll_to_dp[i] = NULL;
}
for (int i = 0; i < mod->num_of_sinks; i++) {
mod->sinks[i] = NULL;
if (mod->dp_queue_dp_to_ll[i])
dp_queue_free(mod->dp_queue_dp_to_ll[i]);
mod->dp_queue_ll_to_dp[i] = NULL;
}
}

#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */

if (IS_PROCESSING_MODE_SINK_SOURCE(mod) &&
mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_LL) {
for (int i = 0; i < mod->num_of_sources; i++)
mod->sources[i] = NULL;
for (int i = 0; i < mod->num_of_sinks; i++)
Expand Down
30 changes: 3 additions & 27 deletions src/audio/pipeline/pipeline-params.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ static int pipeline_comp_prepare(struct comp_dev *current,
struct pipeline_data *ppl_data = ctx->comp_data;
int stream_direction = dir;
int end_type;
int err;
int err = 0;

pipe_dbg(current->pipeline, "pipeline_comp_prepare(), current->comp.id = %u, dir = %u",
dev_comp_id(current), dir);
Expand Down Expand Up @@ -311,33 +311,9 @@ static int pipeline_comp_prepare(struct comp_dev *current,
}
}

switch (current->ipc_config.proc_domain) {
case COMP_PROCESSING_DOMAIN_LL:
/* this is a LL scheduled module */
if (current->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_LL)
/* init a task for LL module, DP task has been created in during init_instance */
err = pipeline_comp_ll_task_init(current->pipeline);
break;

#if CONFIG_ZEPHYR_DP_SCHEDULER
case COMP_PROCESSING_DOMAIN_DP:
/* this is a DP scheduled module */

/*
* workaround - because of some issues with cache, currently we can allow DP
* modules to run on the same core as LL pipeline only.
* to be removed once buffering is fixed
*/
if (current->pipeline->core != current->ipc_config.core)
err = -EINVAL;
else
err = pipeline_comp_dp_task_init(current);

break;
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */

default:
err = -EINVAL;
}

if (err < 0)
return err;

Expand Down
10 changes: 7 additions & 3 deletions src/audio/pipeline/pipeline-schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <sof/schedule/ll_schedule.h>
#include <sof/schedule/dp_schedule.h>
#include <sof/schedule/schedule.h>
#include <sof/audio/module_adapter/module/generic.h>
#include <rtos/task.h>
#include <rtos/spinlock.h>
#include <rtos/string.h>
Expand Down Expand Up @@ -378,15 +379,18 @@ int pipeline_comp_ll_task_init(struct pipeline *p)
#if CONFIG_ZEPHYR_DP_SCHEDULER
static enum task_state dp_task_run(void *data)
{
struct comp_dev *comp = data;
struct processing_module *mod = data;

module_process_sink_src(mod, mod->sources, mod->num_input_buffers,
mod->sinks, mod->num_output_buffers);

comp->drv->ops.copy(comp);
return SOF_TASK_STATE_RESCHEDULE;
}

int pipeline_comp_dp_task_init(struct comp_dev *comp)
{
int ret;
struct processing_module *mod = comp_get_drvdata(comp);
struct task_ops ops = {
.run = dp_task_run,
.get_deadline = NULL,
Expand All @@ -397,7 +401,7 @@ int pipeline_comp_dp_task_init(struct comp_dev *comp)
ret = scheduler_dp_task_init(&comp->task,
SOF_UUID(dp_task_uuid),
&ops,
comp,
mod,
comp->ipc_config.core,
TASK_DP_STACK_SIZE,
ZEPHYR_DP_THREAD_PRIORITY);
Expand Down
Loading

0 comments on commit dcbc7fa

Please sign in to comment.