Skip to content

Commit

Permalink
fix: handle sources with same id from diff ingests (#95)
Browse files Browse the repository at this point in the history
* fix: handle sources with same id from diff ingests

* fix: correct exist-check
  • Loading branch information
Saelmala authored Nov 13, 2024
1 parent 5f5be89 commit bcb2bc3
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 19 deletions.
4 changes: 3 additions & 1 deletion src/api/ateliereLive/pipelines/streams/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ export async function createStream(
);

const pipelineSource = pipeline.sources?.find(
(source) => source.source_id === sourceId
(s) =>
s.source_id === sourceId &&
s.settings.ingest_name === source.ingest_name
);

const stream: PipelineStreamSettings = {
Expand Down
35 changes: 26 additions & 9 deletions src/api/manager/productions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ function deleteMonitoring(db: Db, productionId: string) {
export async function getProductionPipelineSourceAlignment(
productionId: string,
pipelineId: string,
sourceId: number
sourceId: number,
sourceIngestName: string
) {
const production = await getProduction(productionId);

Expand All @@ -118,7 +119,9 @@ export async function getProductionPipelineSourceAlignment(
);

const source = pipeline?.sources?.find(
(source) => String(source.source_id) === String(sourceId)
(source) =>
String(source.source_id) === String(sourceId) &&
source.settings.ingest_name === sourceIngestName
);

const alignment =
Expand All @@ -133,6 +136,7 @@ export async function setProductionPipelineSourceAlignment(
productionId: string,
pipelineId: string,
sourceId: number,
sourceIngestName: string,
alignment_ms: number
) {
const db = await getDatabase();
Expand All @@ -142,7 +146,9 @@ export async function setProductionPipelineSourceAlignment(
{
_id: new ObjectId(productionId),
'production_settings.pipelines.pipeline_id': pipelineId,
'production_settings.pipelines.sources.source_id': sourceId
'production_settings.pipelines.sources.source_id': sourceId,
'production_settings.pipelines.sources.settings.ingest_name':
sourceIngestName
},
{
$set: {
Expand All @@ -153,7 +159,10 @@ export async function setProductionPipelineSourceAlignment(
{
arrayFilters: [
{ 'p.pipeline_id': pipelineId },
{ 's.source_id': sourceId }
{
's.source_id': sourceId,
's.settings.ingest_name': sourceIngestName
}
]
}
);
Expand All @@ -173,7 +182,8 @@ export async function setProductionPipelineSourceAlignment(
export async function getProductionSourceLatency(
productionId: string,
pipelineId: string,
sourceId: number
sourceId: number,
sourceIngestName: string
) {
const production = await getProduction(productionId);

Expand All @@ -187,7 +197,9 @@ export async function getProductionSourceLatency(
);

const source = pipeline?.sources?.find(
(source) => String(source.source_id) === String(sourceId)
(source) =>
String(source.source_id) === String(sourceId) &&
source.settings.ingest_name === sourceIngestName
);

const latency =
Expand All @@ -197,11 +209,11 @@ export async function getProductionSourceLatency(

return latency;
}

export async function setProductionPipelineSourceLatency(
productionId: string,
pipelineId: string,
sourceId: number,
sourceIngestName: string,
max_network_latency_ms: number
) {
const db = await getDatabase();
Expand All @@ -211,7 +223,9 @@ export async function setProductionPipelineSourceLatency(
{
_id: new ObjectId(productionId),
'production_settings.pipelines.pipeline_id': pipelineId,
'production_settings.pipelines.sources.source_id': sourceId
'production_settings.pipelines.sources.source_id': sourceId,
'production_settings.pipelines.sources.settings.ingest_name':
sourceIngestName
},
{
$set: {
Expand All @@ -222,7 +236,10 @@ export async function setProductionPipelineSourceLatency(
{
arrayFilters: [
{ 'p.pipeline_id': pipelineId },
{ 's.source_id': sourceId }
{
's.source_id': sourceId,
's.settings.ingest_name': sourceIngestName
}
]
}
);
Expand Down
9 changes: 7 additions & 2 deletions src/api/manager/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ async function connectIngestSources(
);

const pipelineSource = pipeline.sources?.find(
(source) => source.source_id === sourceId
(s) =>
s.source_id === sourceId &&
s.settings.ingest_name === source.ingest_name
);

const stream: PipelineStreamSettings = {
Expand Down Expand Up @@ -874,12 +876,15 @@ export async function startProduction(
);

const currentSettings = pipeline.sources?.find(
(s) => s.source_id === sourceId
(s) =>
s.source_id === sourceId &&
s.settings.ingest_name === source.ingest_name
)?.settings;

return {
source_id: sourceId || 0,
settings: {
ingest_name: source.ingest_name,
alignment_ms:
currentSettings?.alignment_ms ?? pipeline.alignment_ms,
max_network_latency_ms:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ export async function GET(
? await getProductionPipelineSourceAlignment(
params.id,
params.pipeline_id,
sourceId
sourceId,
params.ingest_name
)
: 0;

Expand All @@ -49,7 +50,8 @@ export async function GET(
? await getProductionSourceLatency(
params.id,
params.pipeline_id,
sourceId
sourceId,
params.ingest_name
)
: 0;

Expand Down Expand Up @@ -93,12 +95,14 @@ export async function PUT(
params.id,
params.pipeline_id,
sourceId,
params.ingest_name,
alignment
);
const latencyResult = await setProductionPipelineSourceLatency(
params.id,
params.pipeline_id,
sourceId,
params.ingest_name,
latency
);

Expand Down
17 changes: 12 additions & 5 deletions src/app/production/[id]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,13 @@ export default function ProductionConfiguration({ params }: PageProps) {
pipelines: productionSetup.production_settings.pipelines.map(
(pipeline) => {
if (pipeline.pipeline_id === pipeline_uuid) {
pipeline.sources?.map((source) => {
if (source.source_id === sourceId) {
source.settings.alignment_ms = alignment;
source.settings.max_network_latency_ms = latency;
pipeline.sources?.map((s) => {
if (
s.source_id === sourceId &&
s.settings.ingest_name === source.ingest_name
) {
s.settings.alignment_ms = alignment;
s.settings.max_network_latency_ms = latency;
}
});
}
Expand Down Expand Up @@ -598,13 +601,16 @@ export default function ProductionConfiguration({ params }: PageProps) {
source.ingest_source_name
),
settings: {
ingest_name: source.ingest_name,
alignment_ms: pipeline.alignment_ms,
max_network_latency_ms: pipeline.max_network_latency_ms
}
};

const exists = pipeline.sources?.some(
(s) => s.source_id === newSource.source_id
(s) =>
s.source_id === newSource.source_id &&
s.settings.ingest_name === newSource.settings.ingest_name
);

const updatedSources = exists
Expand Down Expand Up @@ -730,6 +736,7 @@ export default function ProductionConfiguration({ params }: PageProps) {
selectedSource.ingest_source_name
),
settings: {
ingest_name: selectedSource.ingest_name,
alignment_ms: pipeline.alignment_ms,
max_network_latency_ms: pipeline.max_network_latency_ms
}
Expand Down
1 change: 1 addition & 0 deletions src/interfaces/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface SrtOutput {
export interface PipelineSource {
source_id: number;
settings: {
ingest_name: string;
alignment_ms?: number;
max_network_latency_ms?: number;
};
Expand Down

0 comments on commit bcb2bc3

Please sign in to comment.