Skip to content

Commit

Permalink
Implemented async pgs loading. This enables the renderer to parse the…
Browse files Browse the repository at this point in the history
… subtitle stream while it's still downloading. All display sets in the completed download range can already be rendered and don't have to wait until the whole file is completed.
  • Loading branch information
Arcus92 committed Jul 11, 2024
1 parent 223b9b2 commit ef52682
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 46 deletions.
13 changes: 12 additions & 1 deletion src/pgs/displaySet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {PaletteDefinitionSegment} from "./paletteDefinitionSegment";
import {ObjectDefinitionSegment} from "./objectDefinitionSegment";
import {WindowDefinitionSegment} from "./windowDefinitionSegment";
import {SegmentType} from "./segmentType";
import {AsyncBinaryReader} from "../utils/asyncBinaryReader";

/**
* The PGS display set holds all data for the current subtitle update at a given timestamp.
Expand All @@ -22,7 +23,7 @@ export class DisplaySet {
* @param includeHeader If true, the magic-number and timestamps are read. If false, reading starts at the first
* segment.
*/
public read(reader: BigEndianBinaryReader, includeHeader: boolean) {
public async read(reader: BigEndianBinaryReader, includeHeader: boolean) {

// Clear
this.presentationTimestamp = 0;
Expand All @@ -32,6 +33,12 @@ export class DisplaySet {
this.objectDefinitions = [];
this.windowDefinitions = [];

// Handles async readers
let asyncReader: AsyncBinaryReader | undefined = undefined;
if ('requestData' in reader.baseReader) {
asyncReader = reader.baseReader as AsyncBinaryReader;
}

while (true)
{
let presentationTimestamp: number = 0;
Expand All @@ -40,6 +47,7 @@ export class DisplaySet {
// The header is included before every segment. Even for the end segment.
if (includeHeader)
{
await asyncReader?.requestData(10);
const magicNumber = reader.readUInt16();
if (magicNumber != 0x5047) {
throw new Error("Invalid magic number!");
Expand All @@ -49,8 +57,11 @@ export class DisplaySet {
decodingTimestamp = reader.readUInt32();
}

await asyncReader?.requestData(3);
const type = reader.readUInt8();
const size = reader.readUInt16()

await asyncReader?.requestData(size);
switch (type) {
case SegmentType.paletteDefinition:
const pds = new PaletteDefinitionSegment();
Expand Down
17 changes: 11 additions & 6 deletions src/pgsRenderer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,20 @@ export class PgsRenderer {
public renderAtTimestamp(time: number): void {
time = time * 1000 * 90; // Convert to PGS time

// Find the last subtitle index for the given time stamp
// All position before and after the available timestamps are invalid (-1).
let index = -1;
for (const updateTimestamp of this.updateTimestamps) {
if (this.updateTimestamps.length > 0 && time < this.updateTimestamps[this.updateTimestamps.length - 1]) {

if (updateTimestamp > time) {
break;
// Find the last subtitle index for the given time stamp
for (const updateTimestamp of this.updateTimestamps) {

if (updateTimestamp > time) {
break;
}
index++;
}
index++;
}

// Only tell the worker, if the subtitle index was changed!
if (this.previousTimestampIndex === index) return;
this.previousTimestampIndex = index;
Expand All @@ -163,7 +168,7 @@ export class PgsRenderer {
private onWorkerMessage = (e: MessageEvent) => {
switch (e.data.op) {
// Is called once a subtitle file was loaded.
case 'loaded':
case 'updateTimestamps':
// Stores the update timestamps, so we don't need to push the timestamp to the worker on every tick.
// Instead, we push the timestamp index if it was changed.
this.updateTimestamps = e.data.updateTimestamps;
Expand Down
75 changes: 62 additions & 13 deletions src/pgsRendererInternal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ import {PaletteDefinitionSegment} from "./pgs/paletteDefinitionSegment";
import {ObjectDefinitionSegment} from "./pgs/objectDefinitionSegment";
import {WindowDefinition} from "./pgs/windowDefinitionSegment";
import {Rect} from "./utils/rect";
import {StreamBinaryReader} from "./utils/streamBinaryReader";
import {BinaryReader} from "./utils/binaryReader";
import {ArrayBinaryReader} from "./utils/arrayBinaryReader";

export interface PgsLoadOptions {
/**
* Async pgs streams can return partial updates. When invoked, the `displaySets` and `updateTimestamps` are updated
* to the last available subtitle. There is a minimum threshold of one-second to prevent to many updates.
*/
onProgress?: () => void;
}

/**
* This handles the low-level PGS loading and rendering. This renderer can operate inside the web worker without being
Expand All @@ -29,26 +40,60 @@ export class PgsRendererInternal {
/**
* Loads the subtitle file from the given url.
* @param url The url to the PGS file.
* @param options Optional loading options. Use `onProgress` as callback for partial update while loading.
*/
public async loadFromUrlAsync(url: string): Promise<void> {
const result = await fetch(url);
const buffer = await result.arrayBuffer();
this.loadFromBuffer(buffer);
public async loadFromUrl(url: string, options?: PgsLoadOptions): Promise<void> {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP error: ${response.status}`);
}
const stream = response.body?.getReader()!;
const reader = new StreamBinaryReader(stream)

await this.loadFromReader(reader, options);
}

/**
* Loads the subtitle file from the given buffer.
* @param buffer The PGS data.
* @param options Optional loading options. Use `onProgress` as callback for partial update while loading.
*/
public loadFromBuffer(buffer: ArrayBuffer): void {
public async loadFromBuffer(buffer: ArrayBuffer, options?: PgsLoadOptions): Promise<void> {
await this.loadFromReader(new ArrayBinaryReader(new Uint8Array(buffer)), options);
}

/**
* Loads the subtitle file from the given buffer.
* @param reader The PGS data reader.
* @param options Optional loading options. Use `onProgress` as callback for partial update while loading.
*/
public async loadFromReader(reader: BinaryReader, options?: PgsLoadOptions): Promise<void> {
this.displaySets = [];
this.updateTimestamps = [];
const reader = new BigEndianBinaryReader(new Uint8Array(buffer));
while (reader.position < reader.length) {

let lastUpdateTime = performance.now();

const bigEndianReader = new BigEndianBinaryReader(reader);
while (!reader.eof) {
const displaySet = new DisplaySet();
displaySet.read(reader, true);
await displaySet.read(bigEndianReader, true);
this.displaySets.push(displaySet);
this.updateTimestamps.push(displaySet.presentationTimestamp);

// For async loading, we support frequent progress updates. Sending one update for every new display set
// would be too much. Instead, we use a one-second threshold.
if (options?.onProgress) {
let now = performance.now();
if (now > lastUpdateTime + 1000) {
lastUpdateTime = now;
options.onProgress();
}
}
}

// Call final update.
if (options?.onProgress) {
options.onProgress();
}
}

Expand Down Expand Up @@ -78,14 +123,18 @@ export class PgsRendererInternal {
public renderAtTimestamp(time: number): void {
time = time * 1000 * 90; // Convert to PGS time

// Find the last subtitle index for the given time stamp
// All position before and after the available timestamps are invalid (-1).
let index = -1;
for (const updateTimestamp of this.updateTimestamps) {
if (this.updateTimestamps.length > 0 && time < this.updateTimestamps[this.updateTimestamps.length - 1]) {

if (updateTimestamp > time) {
break;
// Find the last subtitle index for the given time stamp
for (const updateTimestamp of this.updateTimestamps) {

if (updateTimestamp > time) {
break;
}
index++;
}
index++;
}

this.renderAtIndex(index);
Expand Down
4 changes: 4 additions & 0 deletions src/utils/arrayBinaryReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ export class ArrayBinaryReader implements BinaryReader {
return this.array.length;
}

public get eof(): boolean {
return this.position >= this.length;
}

public readByte(): number {
return this.array[this.$position++];
}
Expand Down
11 changes: 11 additions & 0 deletions src/utils/asyncBinaryReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import {BinaryReader} from "./binaryReader";

export interface AsyncBinaryReader extends BinaryReader {
/**
* Ensures that the given number of bytes is available to read synchronously.
* This will wait until the data is ready to read.
* @param count The number of bytes requested.
* @return Returns if the requested number of bytes could be loaded.
*/
requestData(count: number): Promise<boolean>;
}
36 changes: 20 additions & 16 deletions src/utils/bigEndianBinaryReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,55 @@ export class BigEndianBinaryReader {
/**
* The base binary reader.
*/
private readonly reader: BinaryReader;
public readonly baseReader: BinaryReader;

public constructor(buffer: BinaryReader | Uint8Array) {
if (buffer instanceof Uint8Array) {
this.reader = new ArrayBinaryReader(buffer);
this.baseReader = new ArrayBinaryReader(buffer);
}
else {
this.reader = buffer;
this.baseReader = buffer;
}
}

public get position(): number {
return this.reader.position;
return this.baseReader.position;
}

public get length(): number {
return this.reader.length;
return this.baseReader.length;
}

public get eof(): boolean {
return this.baseReader.eof;
}

public readUInt8(): number {
return this.reader.readByte();
return this.baseReader.readByte();
}

public readUInt16(): number {
const b1 = this.reader.readByte();
const b2 = this.reader.readByte();
const b1 = this.baseReader.readByte();
const b2 = this.baseReader.readByte();
return (b1 << 8) + b2;
}

public readUInt24(): number {
const b1 = this.reader.readByte();
const b2 = this.reader.readByte();
const b3 = this.reader.readByte();
const b1 = this.baseReader.readByte();
const b2 = this.baseReader.readByte();
const b3 = this.baseReader.readByte();
return (b1 << 16) + (b2 << 8) + b3;
}

public readUInt32(): number {
const b1 = this.reader.readByte();
const b2 = this.reader.readByte();
const b3 = this.reader.readByte();
const b4 = this.reader.readByte();
const b1 = this.baseReader.readByte();
const b2 = this.baseReader.readByte();
const b3 = this.baseReader.readByte();
const b4 = this.baseReader.readByte();
return (b1 << 24) + (b2 << 16) + (b3 << 8) + b4;
}

public readBytes(count: number): Uint8Array {
return this.reader.readBytes(count);
return this.baseReader.readBytes(count);
}
}
5 changes: 5 additions & 0 deletions src/utils/binaryReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ export interface BinaryReader {
*/
get length(): number;

/**
* Gets if the binary reader has reached the end of the data.
*/
get eof(): boolean;

/**
* Reads a single byte from this buffer.
*/
Expand Down
20 changes: 19 additions & 1 deletion src/utils/combinedBinaryReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {ArrayBinaryReader} from "./arrayBinaryReader";
*/
export class CombinedBinaryReader implements BinaryReader {
private readonly subReaders: BinaryReader[];
private readonly $length: number;
private $length: number;

private $position: number = 0;
private subReaderIndex: number = 0;
Expand All @@ -26,6 +26,20 @@ export class CombinedBinaryReader implements BinaryReader {
this.$length = length;
}

/**
* Adding another sub-reader to the collection.
* @param subReader The new sub-reader to add.
*/
public push(subReader: BinaryReader | Uint8Array) {
if (subReader instanceof Uint8Array) {
this.subReaders.push(new ArrayBinaryReader(subReader));
} else {
this.subReaders.push(subReader);
}

this.$length += subReader.length;
}

public get position(): number {
return this.$position;
}
Expand All @@ -34,6 +48,10 @@ export class CombinedBinaryReader implements BinaryReader {
return this.$length;
}

public get eof(): boolean {
return this.position >= this.length;
}

public readByte(): number {
while (this.subReaders[this.subReaderIndex].position >= this.subReaders[this.subReaderIndex].length) {
this.subReaderIndex++;
Expand Down
54 changes: 54 additions & 0 deletions src/utils/streamBinaryReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import {AsyncBinaryReader} from "./asyncBinaryReader";
import {CombinedBinaryReader} from "./combinedBinaryReader";

/**
* A binary reader based on a readable stream. This can read a partially loaded stream - for example a download.
*/
export class StreamBinaryReader implements AsyncBinaryReader {
private readonly stream: ReadableStreamDefaultReader<Uint8Array>;
private readonly reader: CombinedBinaryReader;
private $eof: boolean = false;

public constructor(stream: ReadableStreamDefaultReader<Uint8Array>) {
this.stream = stream;
this.reader = new CombinedBinaryReader([]);
}

public get position(): number {
return this.reader.position;
}

public get length(): number {
return this.reader.length;
}

public get eof(): boolean {
return this.$eof;
}

public readByte(): number {
return this.reader.readByte();
}

public readBytes(count: number): Uint8Array {
return this.reader.readBytes(count);
}

public async requestData(count: number = 0): Promise<boolean> {
// Always try to peak one byte ahead to detect end-of-file early
while (this.reader.position + count + 1 > this.reader.length && !this.$eof) {
let { value, done } = await this.stream.read();

if (value) {
this.reader.push(value);
}

if (done) {
this.$eof = true;
}
}

// Returns if all data could be requested
return this.reader.position + count <= this.reader.length;
}
}
Loading

0 comments on commit ef52682

Please sign in to comment.