Fix: segment not found issues (SOFIE-3626) #1325

Merged · 6 commits · Nov 21, 2024

Changes from all commits
2 changes: 1 addition & 1 deletion packages/corelib/src/dataModel/Segment.ts
@@ -5,7 +5,7 @@ import { SegmentNote } from './Notes'
export enum SegmentOrphanedReason {
/** Segment is deleted from the NRCS but we still need it */
DELETED = 'deleted',
/** Segment should be hidden, but it is still playing */
/** Blueprints want the Segment to be hidden, but it is still playing, so it must not be hidden right now. */
HIDDEN = 'hidden',
/** Segment is owned by playout, and is for AdlibTesting in its rundown */
ADLIB_TESTING = 'adlib-testing',
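As an illustration (not part of this diff), a minimal sketch of the semantics this comment describes; `resolveHiddenState` and `isOnAir` are hypothetical names, and the real decision logic lives in `removeSegments` in commit.ts below:

	import { SegmentOrphanedReason } from '@sofie-automation/corelib/dist/dataModel/Segment'

	// Sketch: when Blueprints request that a Segment be hidden while it is still
	// playing, it must stay visible for now and is only marked as orphaned.
	function resolveHiddenState(
		requestedHidden: boolean,
		isOnAir: boolean
	): { hidden: boolean; orphaned: SegmentOrphanedReason | undefined } {
		if (requestedHidden && isOnAir) {
			// Keep the Segment visible, but remember it should become hidden later
			return { hidden: false, orphaned: SegmentOrphanedReason.HIDDEN }
		}
		return { hidden: requestedHidden, orphaned: undefined }
	}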
133 changes: 102 additions & 31 deletions packages/job-worker/src/ingest/commit.ts
@@ -43,6 +43,7 @@ import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'
import { DatabasePersistedModel } from '../modelBase'
import { updateSegmentIdsForAdlibbedPartInstances } from './commit/updateSegmentIdsForAdlibbedPartInstances'
import { stringifyError } from '@sofie-automation/shared-lib/dist/lib/stringifyError'
import { AnyBulkWriteOperation } from 'mongodb'

export type BeforePartMapItem = { id: PartId; rank: number }
export type BeforeIngestOperationPartMap = ReadonlyMap<SegmentId, Array<BeforePartMapItem>>
@@ -177,6 +178,9 @@ export async function CommitIngestOperation(
// Ensure any adlibbed parts are updated to follow the segmentId of the previous part
await updateSegmentIdsForAdlibbedPartInstances(context, ingestModel, beforePartMap)

if (data.renamedSegments && data.renamedSegments.size > 0) {
logger.debug(`Renamed segments: ${JSON.stringify(Array.from(data.renamedSegments.entries()))}`)
}
// ensure instances have matching segmentIds with the parts
await updatePartInstancesSegmentIds(context, ingestModel, data.renamedSegments, beforePartMap)

@@ -259,10 +263,16 @@ export async function CommitIngestOperation(
}

function canRemoveSegment(
prevPartInstance: ReadonlyDeep<DBPartInstance> | undefined,
currentPartInstance: ReadonlyDeep<DBPartInstance> | undefined,
nextPartInstance: ReadonlyDeep<DBPartInstance> | undefined,
segmentId: SegmentId
): boolean {
if (prevPartInstance?.segmentId === segmentId) {
// Don't allow removing the segment that the previous PartInstance is in
logger.warn(`Not allowing removal of previous playing segment "${segmentId}", making segment unsynced instead`)
return false
}
if (
currentPartInstance?.segmentId === segmentId ||
(nextPartInstance?.segmentId === segmentId && isTooCloseToAutonext(currentPartInstance, false))
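Illustrative usage of the updated guard (not part of the diff; the `declare`s stand in for real playlist state, and the import paths follow the ones used elsewhere in this repo):

	import { ReadonlyDeep } from 'type-fest'
	import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
	import { SegmentId } from '@sofie-automation/corelib/dist/dataModel/Ids'

	// A Segment may only be purged when it holds neither the previous, current,
	// nor (when an autonext is imminent) the next PartInstance.
	declare const previousPartInstance: ReadonlyDeep<DBPartInstance> | undefined
	declare const currentPartInstance: ReadonlyDeep<DBPartInstance> | undefined
	declare const nextPartInstance: ReadonlyDeep<DBPartInstance> | undefined
	declare const removedSegmentId: SegmentId

	if (canRemoveSegment(previousPartInstance, currentPartInstance, nextPartInstance, removedSegmentId)) {
		// Safe to delete the Segment outright
	} else {
		// Keep the Segment and mark it as orphaned/unsynced instead
	}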
@@ -295,26 +305,32 @@ async function updatePartInstancesSegmentIds(
renamedSegments: ReadonlyMap<SegmentId, SegmentId> | null,
beforePartMap: BeforeIngestOperationPartMap
) {
// A set of rules which can be translated to mongo queries for PartInstances to update
/**
* Maps new SegmentId ->
* A set of rules which can be translated to mongo queries for PartInstances to update
*/
const renameRules = new Map<
SegmentId,
{
/** Parts that have been moved to the new SegmentId */
partIds: PartId[]
fromSegmentId: SegmentId | null
/** Segments that have been renamed to the new SegmentId */
fromSegmentIds: SegmentId[]
}
>()

// Add whole segment renames to the set of rules
if (renamedSegments) {
for (const [fromSegmentId, toSegmentId] of renamedSegments) {
const rule = renameRules.get(toSegmentId) ?? { partIds: [], fromSegmentId: null }
const rule = renameRules.get(toSegmentId) ?? { partIds: [], fromSegmentIds: [] }
renameRules.set(toSegmentId, rule)

rule.fromSegmentId = fromSegmentId
rule.fromSegmentIds.push(fromSegmentId)
}
}

// Reverse the structure
// Reverse the Map structure
/** Maps Part -> SegmentId-of-the-part-before-ingest-changes */
const beforePartSegmentIdMap = new Map<PartId, SegmentId>()
for (const [segmentId, partItems] of beforePartMap.entries()) {
for (const partItem of partItems) {
@@ -325,8 +341,11 @@
// Some parts may have been given a different segmentId than the base rule, so track those separately in the rules
for (const partModel of ingestModel.getAllOrderedParts()) {
const oldSegmentId = beforePartSegmentIdMap.get(partModel.part._id)

if (oldSegmentId && oldSegmentId !== partModel.part.segmentId) {
const rule = renameRules.get(partModel.part.segmentId) ?? { partIds: [], fromSegmentId: null }
// The part has moved to another segment, add a rule to update its corresponding PartInstances:

const rule = renameRules.get(partModel.part.segmentId) ?? { partIds: [], fromSegmentIds: [] }
renameRules.set(partModel.part.segmentId, rule)

rule.partIds.push(partModel.part._id)
@@ -335,30 +354,80 @@

// Perform a mongo update to modify the PartInstances
if (renameRules.size > 0) {
		await context.directCollections.PartInstances.bulkWrite(
			Array.from(renameRules.entries()).map(([newSegmentId, rule]) => ({
				updateMany: {
					filter: {
						$or: _.compact([
							rule.fromSegmentId
								? {
										segmentId: rule.fromSegmentId,
								  }
								: undefined,
							{
								'part._id': { $in: rule.partIds },
							},
						]),
					},
					update: {
						$set: {
							segmentId: newSegmentId,
							'part.segmentId': newSegmentId,
						},
					},
				},
			}))
		)
		const rulesInOrder = Array.from(renameRules.entries()).sort((a, b) => {
			// Ensure that the ones with partIds are processed last,
			// as that should take precedence.
			if (a[1].partIds.length && !b[1].partIds.length) return 1
			if (!a[1].partIds.length && b[1].partIds.length) return -1
			return 0
		})

		const writeOps: AnyBulkWriteOperation<DBPartInstance>[] = []

		for (const [newSegmentId, rule] of rulesInOrder) {
			if (rule.fromSegmentIds.length) {
				writeOps.push({
					updateMany: {
						filter: {
							rundownId: ingestModel.rundownId,
							segmentId: { $in: rule.fromSegmentIds },
						},
						update: {
							$set: {
								segmentId: newSegmentId,
								'part.segmentId': newSegmentId,
							},
						},
					},
				})
			}
			if (rule.partIds.length) {
				writeOps.push({
					updateMany: {
						filter: {
							rundownId: ingestModel.rundownId,
							'part._id': { $in: rule.partIds },
						},
						update: {
							$set: {
								segmentId: newSegmentId,
								'part.segmentId': newSegmentId,
							},
						},
					},
				})
			}
		}
if (writeOps.length) await context.directCollections.PartInstances.bulkWrite(writeOps)

// Double check that there are no parts using the old segment ids:
const oldSegmentIds = Array.from(renameRules.keys())
const [badPartInstances, badParts] = await Promise.all([
await context.directCollections.PartInstances.findFetch({
rundownId: ingestModel.rundownId,
segmentId: { $in: oldSegmentIds },
}),
await context.directCollections.Parts.findFetch({
rundownId: ingestModel.rundownId,
segmentId: { $in: oldSegmentIds },
}),
])
if (badPartInstances.length > 0) {
logger.error(
`updatePartInstancesSegmentIds: Failed to update all PartInstances using old SegmentIds "${JSON.stringify(
oldSegmentIds
)}": ${JSON.stringify(badPartInstances)}, writeOps: ${JSON.stringify(writeOps)}`
)
}

if (badParts.length > 0) {
logger.error(
`updatePartInstancesSegmentIds: Failed to update all Parts using old SegmentIds "${JSON.stringify(
oldSegmentIds
)}": ${JSON.stringify(badParts)}, writeOps: ${JSON.stringify(writeOps)}`
)
}
}
}
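To see why the rules with partIds must be applied last, consider a small self-contained sketch (not part of the diff; the ids "A", "B" and "p3" are invented): segment "A" is renamed to "B" while part "p3", previously in "B", moves into "A".

	import { PartId, SegmentId } from '@sofie-automation/corelib/dist/dataModel/Ids'
	import { protectString } from '@sofie-automation/corelib/dist/protectedString'

	type RenameRule = { partIds: PartId[]; fromSegmentIds: SegmentId[] }

	const exampleRules = new Map<SegmentId, RenameRule>([
		// Whole-segment rename: everything that was in "A" now lives in "B"
		[protectString('B'), { partIds: [], fromSegmentIds: [protectString('A')] }],
		// Part-level move: part "p3" specifically ended up in "A"
		[protectString('A'), { partIds: [protectString('p3')], fromSegmentIds: [] }],
	])

	// Applied in this order, p3's PartInstances are untouched by the "A" -> "B"
	// update (their segmentId is still "B", p3's old segment), and the later
	// part-level update then pins them to "A".
	// If the part-level rule ran first, p3's PartInstances would be moved to "A"
	// and the subsequent segmentId "A" -> "B" update would drag them back out to
	// "B" - which is exactly what the sort above prevents.
	void exampleRules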

@@ -662,7 +731,7 @@ async function removeSegments(
_changedSegmentIds: ReadonlyDeep<SegmentId[]>,
removedSegmentIds: ReadonlyDeep<SegmentId[]>
) {
const { currentPartInstance, nextPartInstance } = await getSelectedPartInstances(
const { previousPartInstance, currentPartInstance, nextPartInstance } = await getSelectedPartInstances(
context,
newPlaylist,
rundownsInPlaylist.map((r) => r._id)
@@ -672,7 +741,7 @@
const orphanDeletedSegmentIds = new Set<SegmentId>()
const orphanHiddenSegmentIds = new Set<SegmentId>()
for (const segmentId of removedSegmentIds) {
if (canRemoveSegment(currentPartInstance, nextPartInstance, segmentId)) {
if (canRemoveSegment(previousPartInstance, currentPartInstance, nextPartInstance, segmentId)) {
purgeSegmentIds.add(segmentId)
} else {
logger.warn(
@@ -685,8 +754,10 @@
for (const segment of ingestModel.getAllSegments()) {
const segmentId = segment.segment._id
if (segment.segment.isHidden) {
if (!canRemoveSegment(currentPartInstance, nextPartInstance, segmentId)) {
// Protect live segment from being hidden
// Blueprints want to hide the Segment

if (!canRemoveSegment(previousPartInstance, currentPartInstance, nextPartInstance, segmentId)) {
// The Segment is live, so we need to protect it from being hidden
logger.warn(`Cannot hide live segment ${segmentId}, it will be orphaned`)
switch (segment.segment.orphaned) {
case SegmentOrphanedReason.DELETED:
@@ -706,7 +777,7 @@
} else if (!orphanDeletedSegmentIds.has(segmentId) && segment.parts.length === 0) {
// No parts in segment

if (!canRemoveSegment(currentPartInstance, nextPartInstance, segmentId)) {
if (!canRemoveSegment(previousPartInstance, currentPartInstance, nextPartInstance, segmentId)) {
// Protect live segment from being hidden
logger.warn(`Cannot hide live segment ${segmentId}, it will be orphaned`)
orphanHiddenSegmentIds.add(segmentId)
@@ -67,7 +67,7 @@ export interface IngestSegmentModel extends IngestSegmentModelReadonly {
setOrphaned(orphaned: SegmentOrphanedReason | undefined): void

/**
* Mark this Part as being hidden
* Mark this Segment as being hidden
* @param hidden New hidden state
*/
setHidden(hidden: boolean): void
@@ -94,6 +94,10 @@ export class DocumentChangeTracker<TDoc extends { _id: ProtectedString<any> }> {
}
}

getDeletedIds(): TDoc['_id'][] {
return Array.from(this.#deletedIds.values())
}

/**
* Generate the mongodb BulkWrite operations for the documents known to this tracker
* @returns mongodb BulkWrite operations
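Illustrative usage of the new accessor (not part of the diff; `MyDoc` is a made-up document type, and `logger` is declared rather than imported to keep the sketch self-contained):

	import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'

	interface MyDoc {
		_id: ProtectedString<'MyDocId'>
	}

	declare const tracker: DocumentChangeTracker<MyDoc>
	declare const logger: { debug: (msg: string) => void }
	// ...the save logic registers added/changed/deleted documents on the tracker...

	// The deleted ids can now be inspected before the write ops are generated,
	// which is what SaveIngestModelHelper.commit() below uses for debug logging:
	const deletedIds = tracker.getDeletedIds()
	if (deletedIds.length > 0) {
		logger.debug(`Deleted MyDoc: ${JSON.stringify(deletedIds)}`)
	}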
@@ -11,6 +11,8 @@ import { JobContext } from '../../../jobs'
import { ExpectedPackagesStore } from './ExpectedPackagesStore'
import { IngestSegmentModelImpl } from './IngestSegmentModelImpl'
import { DocumentChangeTracker } from './DocumentChangeTracker'
import { logger } from '../../../logging'
import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'

export class SaveIngestModelHelper {
#expectedPackages = new DocumentChangeTracker<ExpectedPackageDB>()
@@ -55,6 +57,23 @@
}

commit(context: JobContext): Array<Promise<unknown>> {
// Log deleted ids:
const deletedIds: { [key: string]: ProtectedString<any>[] } = {
expectedPackages: this.#expectedPackages.getDeletedIds(),
expectedPlayoutItems: this.#expectedPlayoutItems.getDeletedIds(),
expectedMediaItems: this.#expectedMediaItems.getDeletedIds(),
segments: this.#segments.getDeletedIds(),
parts: this.#parts.getDeletedIds(),
pieces: this.#pieces.getDeletedIds(),
adLibPieces: this.#adLibPieces.getDeletedIds(),
adLibActions: this.#adLibActions.getDeletedIds(),
}
for (const [key, ids] of Object.entries<ProtectedString<any>[]>(deletedIds)) {
if (ids.length > 0) {
logger.debug(`Deleted ${key}: ${JSON.stringify(ids)} `)
}
}

return [
context.directCollections.ExpectedPackages.bulkWrite(this.#expectedPackages.generateWriteOps()),
context.directCollections.ExpectedPlayoutItems.bulkWrite(this.#expectedPlayoutItems.generateWriteOps()),
22 changes: 21 additions & 1 deletion packages/job-worker/src/playout/infinites.ts
@@ -23,6 +23,7 @@ import { SegmentOrphanedReason } from '@sofie-automation/corelib/dist/dataModel/
import { sortRundownIDsInPlaylist } from '@sofie-automation/corelib/dist/playout/playlist'
import { mongoWhere } from '@sofie-automation/corelib/dist/mongo'
import { PlayoutRundownModel } from './model/PlayoutRundownModel'
import { logger } from '../logging'

/** When we crop a piece, set the piece as "it has definitely ended" this far into the future. */
export const DEFINITELY_ENDED_FUTURE_DURATION = 1 * 1000
@@ -330,7 +331,26 @@ export function getPieceInstancesForPart(
if (!playingRundown) throw new Error(`Rundown "${playingPartInstance.partInstance.rundownId}" not found!`)

playingSegment = playingRundown.getSegment(playingPartInstance.partInstance.segmentId)
if (!playingSegment) throw new Error(`Segment "${playingPartInstance.partInstance.segmentId}" not found!`)
if (!playingSegment) {
const rundownId = playingRundown.rundown._id
context.directCollections.Segments.findFetch({
rundownId: rundownId,
})
			.then((segments) => {
				logger.error(
					`TROUBLESHOOT: Segment not found, rundown "${rundownId}", segments in db: ${JSON.stringify(
						segments.map((s) => s._id)
)}`
)
})
.catch((e) => logger.error(e))

throw new Error(
`Segment "${playingPartInstance.partInstance.segmentId}" in Rundown "${
playingRundown.rundown._id
}" not found! (other segments: ${JSON.stringify(playingRundown.segments.map((s) => s.segment._id))})`
)
}
}

const segment = rundown.getSegment(part.segmentId)
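A note on the pattern above: getPieceInstancesForPart is synchronous, so the extra database lookup cannot be awaited; it is fired off purely for logging and the error is thrown immediately. In generic form (a sketch, with placeholder names):

	declare const logger: { error: (msg: unknown) => void }

	// Fire-and-forget diagnostics from a synchronous code path: start an async
	// lookup for extra context, log (or swallow) its outcome later, and throw
	// right away so the caller is not left waiting on the lookup.
	function throwWithDeferredContext(fetchContext: () => Promise<unknown>, message: string): never {
		fetchContext()
			.then((extra) => logger.error(`TROUBLESHOOT: ${message}, context: ${JSON.stringify(extra)}`))
			.catch((e) => logger.error(e))

		throw new Error(message)
	}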
@@ -23,6 +23,7 @@ import { PlayoutModel, PlayoutModelPreInit } from '../PlayoutModel'
import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'
import { RundownBaselineObj } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineObj'
import { sortRundownsWithinPlaylist } from '@sofie-automation/corelib/dist/playout/playlist'
import { logger } from '../../../logging'

/**
* Load a PlayoutModelPreInit for the given RundownPlaylist
@@ -188,7 +189,7 @@ async function loadRundowns(
context.directCollections.Segments.findFetch({
$or: [
{
// In a different rundown
// Either in any rundown (when ingestModel === null), or in a rundown not covered by the ingestModel
rundownId: { $in: loadRundownIds },
},
{
Expand Down Expand Up @@ -233,14 +234,25 @@ async function loadRundowns(
}
}

	return rundowns.map(
		(rundown) =>
			new PlayoutRundownModelImpl(
				rundown,
				groupedSegmentsWithParts.get(rundown._id) ?? [],
				groupedBaselineObjects.get(rundown._id) ?? []
			)
	)
	return rundowns.map((rundown) => {
		const groupedSegmentsWithPartsForRundown = groupedSegmentsWithParts.get(rundown._id)
		if (!groupedSegmentsWithPartsForRundown) {
			logger.debug(
				`groupedSegmentsWithPartsForRundown for Rundown "${rundown._id}" is undefined (does the rundown have no segments?)`
			)
		}
		const groupedBaselineObjectsForRundown = groupedBaselineObjects.get(rundown._id)
		if (!groupedBaselineObjectsForRundown)
			logger.debug(
				`groupedBaselineObjectsForRundown for Rundown "${rundown._id}" is undefined (does the rundown have no baseline objects?)`
			)

		return new PlayoutRundownModelImpl(
			rundown,
			groupedSegmentsWithPartsForRundown ?? [],
			groupedBaselineObjectsForRundown ?? []
		)
	})
}

async function loadPartInstances(