Skip to content

Commit

Permalink
backend: Fixed bug with recording location added in 1.8.4 egress version
Browse files Browse the repository at this point in the history
  • Loading branch information
CSantosM committed Oct 16, 2024
1 parent 41b087d commit aca15ee
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 75 deletions.
40 changes: 17 additions & 23 deletions backend/src/helpers/recording.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export class RecordingHelper {
const startedAt = RecordingHelper.extractCreatedAt(egressInfo);
const endTimeInMilliseconds = RecordingHelper.extractEndedAt(egressInfo);
const filename = RecordingHelper.extractFilename(egressInfo);
const location = RecordingHelper.extractLocation(egressInfo);
return {
id: egressInfo.egressId,
roomName: egressInfo.roomName,
Expand All @@ -23,8 +22,7 @@ export class RecordingHelper {
startedAt,
endedAt: endTimeInMilliseconds,
duration,
size,
location
size
};
}

Expand Down Expand Up @@ -91,30 +89,26 @@ export class RecordingHelper {
}
}

static extractFilename(egressInfo: EgressInfo) {
return egressInfo.fileResults?.[0]?.filename.split('/').pop();
}
static extractFilename(recordingInfo: RecordingInfo): string | undefined;

static extractLocation(egressInfo: EgressInfo) {
// TODO: Implement this method
return egressInfo.fileResults?.[0]?.location;
}
static extractFilename(egressInfo: EgressInfo): string | undefined;

static extractFileNameFromUrl(url: string | undefined): string | null {
if (!url) {
return null;
}
static extractFilename(info: RecordingInfo | EgressInfo): string | undefined {
if (!info) return undefined;

// Use a regular expression to capture the desired part of the URL
const regex = /https:\/\/[^\/]+\/(.+)/;
const match = url.match(regex);
if ('request' in info) {
// EgressInfo
return info.fileResults?.[0]?.filename.split('/').pop();
} else {
// RecordingInfo
const { roomName, filename, roomId } = info;

// Check if there is a match and extract the captured group
if (match && match[1]) {
return match[1];
}
if (!filename) {
return undefined;
}

throw new Error('The URL does not match the expected format.');
return roomName ? `${roomName}-${roomId}/${filename}` : filename;
}
}

/**
Expand Down Expand Up @@ -146,7 +140,7 @@ export class RecordingHelper {
*/
static extractCreatedAt(egressInfo: EgressInfo): number {
const { startedAt, updatedAt } = egressInfo;
const createdAt = startedAt && Number(startedAt) !== 0 ? startedAt : updatedAt ?? 0;
const createdAt = startedAt && Number(startedAt) !== 0 ? startedAt : (updatedAt ?? 0);
return this.toMilliseconds(Number(createdAt));
}

Expand Down
1 change: 0 additions & 1 deletion backend/src/models/recording.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@ export interface RecordingInfo {
endedAt?: number;
duration?: number;
size?: number;
location?: string;
}
1 change: 1 addition & 0 deletions backend/src/services/livekit.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
EncodedFileOutput,
ListEgressOptions,
ParticipantInfo,
Room,
RoomCompositeOptions,
RoomServiceClient,
StreamOutput,
Expand Down
39 changes: 20 additions & 19 deletions backend/src/services/recording.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,21 @@ export class RecordingService {
active: true
};

const activeEgress = await this.livekitService.getEgress(egressOptions);
const [activeEgressResult, roomDataResult] = await Promise.allSettled([
this.livekitService.getEgress(egressOptions),
this.roomService.getRoom(roomName)
]);

if (activeEgress.length > 0) {
// Get the results of the promises
const activeEgress = activeEgressResult.status === 'fulfilled' ? activeEgressResult.value : null;
const roomData = roomDataResult.status === 'fulfilled' ? roomDataResult.value : null;

// If there is an active egress, it means that the recording is already started
if (!activeEgress || activeEgressResult.status === 'rejected') {
throw errorRecordingAlreadyStarted(roomName);
}

const recordingId = `${roomName}-${Date.now()}`;
const recordingId = `${roomName}-${roomData?.sid || Date.now()}`;
const options = this.generateCompositeOptionsFromRequest();
const output = this.generateFileOutputFromRequest(recordingId);
const egressInfo = await this.livekitService.startRoomComposite(roomName, output, options);
Expand Down Expand Up @@ -109,7 +117,7 @@ export class RecordingService {
throw errorRecordingNotStopped(egressId);
}

const recordingPath = RecordingHelper.extractFileNameFromUrl(recordingInfo.location);
const recordingPath = RecordingHelper.extractFilename(recordingInfo);

if (!recordingPath) throw internalError(`Error extracting path from recording ${egressId}`);

Expand Down Expand Up @@ -167,10 +175,12 @@ export class RecordingService {
// Get all recordings that match the room name and room ID from the S3 bucket
const roomNameSanitized = this.sanitizeRegExp(roomName);
const roomIdSanitized = this.sanitizeRegExp(roomId);
const regexPattern = `${roomNameSanitized}.*${roomIdSanitized}\\.json`;
// Match the room name and room ID in any order
const regexPattern = `${roomNameSanitized}.*${roomIdSanitized}|${roomIdSanitized}.*${roomNameSanitized}\\.json`;
const metadatagObject = await this.s3Service.listObjects('.metadata', regexPattern);

if (!metadatagObject.Contents || metadatagObject.Contents.length === 0) {
this.logger.verbose(`No recordings found for room ${roomName}. Returning an empty array.`);
return [];
}

Expand Down Expand Up @@ -205,8 +215,8 @@ export class RecordingService {
range?: string
): Promise<{ fileSize: number | undefined; fileStream: Readable; start?: number; end?: number }> {
const RECORDING_FILE_PORTION_SIZE = 5 * 1024 * 1024; // 5MB
const egressInfo = await this.getRecording(recordingId);
const recordingPath = RecordingHelper.extractFileNameFromUrl(egressInfo.location);
const recordingInfo: RecordingInfo = await this.getRecording(recordingId);
const recordingPath = RecordingHelper.extractFilename(recordingInfo);

if (!recordingPath) throw new Error(`Error extracting path from recording ${recordingId}`);

Expand Down Expand Up @@ -245,18 +255,9 @@ export class RecordingService {
* @param fileName - The name of the file (default is 'recording').
* @returns The generated file output object.
*/
private generateFileOutputFromRequest(recordingId: string, filePathOrFileName?: string): EncodedFileOutput {
let filepath: string;

if (!filePathOrFileName) filePathOrFileName = recordingId;

const isFilePath = filePathOrFileName.includes('/');

if (isFilePath) {
filepath = filePathOrFileName;
} else {
filepath = `${recordingId}/${filePathOrFileName}`;
}
private generateFileOutputFromRequest(recordingId: string): EncodedFileOutput {
// Added unique identifier to the file path for avoiding overwriting
const filepath = `${recordingId}/${recordingId}-${Date.now()}`;

return new EncodedFileOutput({
fileType: EncodedFileType.DEFAULT_FILETYPE,
Expand Down
27 changes: 1 addition & 26 deletions backend/src/services/s3.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {
_Object,
DeleteObjectCommand,
DeleteObjectCommandOutput,
DeleteObjectsCommand,
GetObjectCommand,
GetObjectCommandOutput,
HeadObjectCommand,
Expand Down Expand Up @@ -245,6 +244,7 @@ export class S3Service {
});
return await this.run(headParams);
} catch (error) {
this.logger.error(`Error getting header object from S3 in ${name}: ${error}`);
throw internalError(error);
}
}
Expand All @@ -253,31 +253,6 @@ export class S3Service {
this.s3.destroy();
}

// async uploadStreamToS3(key: string, fileStream: Readable, bucketName: string = CALL_AWS_S3_BUCKET) {
// try {
// const parallelUploads3 = new Upload({
// client: this.s3,
// params: {
// Bucket: bucketName,
// Key: key,
// Body: fileStream
// },
// partSize: 50 * 1024 * 1024, // 50 MB
// queueSize: 10 // 10 parallel uploads
// });

// parallelUploads3.on('httpUploadProgress', (progress: Progress) => {
// const uploadedMB = Math.round((progress.loaded ?? 0) / 1024 / 1024);
// this.logger.log(`Uploading ${progress.Key} file... ${uploadedMB} MB`);
// });

// return parallelUploads3.done();
// } catch (error) {
// this.logger.error('Error putting object in S3:', error);
// throw error;
// }
// }

private async getObject(
name: string,
bucket: string = CALL_S3_BUCKET,
Expand Down
17 changes: 11 additions & 6 deletions backend/src/services/webhook.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ export class WebhookService {
payload = RecordingHelper.toRecordingInfo(egressInfo);

// Add recording metadata
const s3Directory = payload.filename?.split('.')[0];
const path = `.metadata/${s3Directory}/${s3Directory}_${payload.id}_${payload.roomId}.json`;
const metadataPath = this.generateMetadataPath(payload);
await Promise.all([
this.s3Service.uploadObject(path, payload),
this.s3Service.uploadObject(metadataPath, payload),
this.roomService.sendSignal(roomName, payload, { topic })
]);

Expand Down Expand Up @@ -117,10 +116,9 @@ export class WebhookService {
payload = RecordingHelper.toRecordingInfo(egressInfo);

// Update recording metadata
const s3Directory = payload.filename?.split('.')[0];
const path = `.metadata/${s3Directory}/${s3Directory}_${payload.id}_${payload.roomId}.json`;
const metadataPath = this.generateMetadataPath(payload);
await Promise.all([
this.s3Service.uploadObject(path, payload),
this.s3Service.uploadObject(metadataPath, payload),
this.roomService.sendSignal(roomName, payload, { topic })
]);
} else {
Expand Down Expand Up @@ -177,4 +175,11 @@ export class WebhookService {
};
await this.roomService.sendSignal(roomName, payload, signalOptions);
}

private generateMetadataPath(payload: RecordingInfo): string {
const metadataFilename = `${payload.roomName}-${payload.roomId}`;
const recordingFilename = payload.filename?.split('.')[0];
const egressId = payload.id;
return `.metadata/${metadataFilename}/${recordingFilename}_${egressId}.json`;
}
}

0 comments on commit aca15ee

Please sign in to comment.