From be9e96bd5a3362d1e27da35230edd70dfbcb164b Mon Sep 17 00:00:00 2001 From: clouless Date: Sun, 30 Dec 2018 12:21:59 +0100 Subject: [PATCH] single ws connection per client and max 5 concurrent compress jobs --- package.json | 2 +- src/app/services/backend.service.ts | 52 ++++++++++++------ src/app/types/kartoffelstampf-server.ts | 7 +-- src/app/upload-page/upload-page.component.ts | 55 +++++++++++--------- 4 files changed, 69 insertions(+), 47 deletions(-) diff --git a/package.json b/package.json index 83be591..7533bfd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "kartoffelstampf-client", - "version": "2.1.2", + "version": "2.3.0", "license": "MIT", "scripts": { "ng": "ng", diff --git a/src/app/services/backend.service.ts b/src/app/services/backend.service.ts index f6c31e5..b3f831e 100644 --- a/src/app/services/backend.service.ts +++ b/src/app/services/backend.service.ts @@ -7,7 +7,7 @@ import { } from '../types/kartoffelstampf-server'; import { Observable, Subject, throwError } from 'rxjs'; import { HttpClient, HttpHeaders, HttpErrorResponse } from '@angular/common/http'; -import { catchError } from 'rxjs/operators'; +import { catchError, filter, takeWhile } from 'rxjs/operators'; const httpOptions = { headers: new HttpHeaders({ @@ -22,8 +22,14 @@ export class BackendService { private restApiBaseUrl: string; private webSocketBaseUrl: string; + private ws: WebSocket; + private subject = new Subject(); + constructor(private http: HttpClient) { + const self = this; + // // Autodetect URLs + // const hostname = window.location.hostname; const protocol = window.location.protocol; const port = window.location.port; @@ -41,6 +47,22 @@ export class BackendService { this.restApiBaseUrl = `http://localhost:9999`; this.webSocketBaseUrl = `ws://localhost:9999`; } + // + // Connect + // + self.ws = new WebSocket(`${this.webSocketBaseUrl}/`); + self.ws.onclose = function(event: CloseEvent) { + console.log('websocket onclose', event); + self.subject.complete(); + }; + self.ws.onmessage = function(event: MessageEvent) { + const kartoffelstampfTerminalOutputEntry: KartoffelstampfTerminalOutputEntry = JSON.parse(event.data); + self.subject.next(kartoffelstampfTerminalOutputEntry); + }; + self.ws.onerror = function(event: ErrorEvent) { + console.log('websocket onerror', event); + self.subject.complete(); + }; } getDownloadUrl(temporaryFileName: string, originalFileName: string) { @@ -55,22 +77,18 @@ export class BackendService { } runCompressCommand(compressInstruction: KartoffelstampfCompressInstruction): Observable { - const ws = new WebSocket(`${this.webSocketBaseUrl}/`); - const subject = new Subject(); - ws.onopen = function (event) { - ws.send(JSON.stringify(compressInstruction)); - }; - ws.onmessage = function(event: MessageEvent) { - const kartoffelstampfTerminalOutputEntry: KartoffelstampfTerminalOutputEntry = JSON.parse(event.data); - subject.next(kartoffelstampfTerminalOutputEntry); - }; - ws.onerror = function (event) { - console.log('websocket onerror', event); - }; - ws.onclose = function (event) { - subject.complete(); - }; - return subject.asObservable(); + this.ws.send(JSON.stringify(compressInstruction)); + // Use single websocket connection and distinguish messages by compressInstruction + // The last message sent by the server per compressJob should be type=DONE. This is where we unsubscribe. + return this.subject + .asObservable() + .pipe( + filter(e => + e.compressInstruction.compressType === compressInstruction.compressType && + e.compressInstruction.temporaryFileName === compressInstruction.temporaryFileName + ), + takeWhile(data => data.type !== 'DONE'), + ); } } diff --git a/src/app/types/kartoffelstampf-server.ts b/src/app/types/kartoffelstampf-server.ts index 4b3d0ac..6290214 100644 --- a/src/app/types/kartoffelstampf-server.ts +++ b/src/app/types/kartoffelstampf-server.ts @@ -2,13 +2,10 @@ // Typings: https://github.com/codeclou/karteoffelstampf-server // -export interface KartoffelstampfTerminalOutputEntryPayload { - text: string; -} - export interface KartoffelstampfTerminalOutputEntry { - payload: KartoffelstampfTerminalOutputEntryPayload; + payload: any; type: string; + compressInstruction: KartoffelstampfCompressInstruction; } export interface KartoffelstampfImageUploadRequest { diff --git a/src/app/upload-page/upload-page.component.ts b/src/app/upload-page/upload-page.component.ts index 0fe76c5..21aff2a 100644 --- a/src/app/upload-page/upload-page.component.ts +++ b/src/app/upload-page/upload-page.component.ts @@ -2,7 +2,7 @@ import { Component, OnInit, OnDestroy } from '@angular/core'; import { BackendService } from '../services/backend.service'; import { KartoffelstampfTerminalOutputEntry, KartoffelstampfCompressInstruction } from '../types/kartoffelstampf-server'; import { TerminalLine, CompressImageJobItem } from '../types/kartoffelstampf-client'; -import { finalize, takeUntil } from 'rxjs/operators'; +import { finalize, takeUntil, takeWhile, endWith } from 'rxjs/operators'; import { Subject, throwError, of, EMPTY } from 'rxjs'; import { catchError } from 'rxjs/operators'; import { HttpErrorResponse } from '@angular/common/http'; @@ -57,6 +57,8 @@ export class UploadPageComponent implements OnInit, OnDestroy { uiStateDragLeave = true; activeStep = 1; + concurrentJobLimit = 5; + concurrentJobCount = 0; constructor(private backendService: BackendService) { } @@ -146,30 +148,35 @@ export class UploadPageComponent implements OnInit, OnDestroy { runCompressCommand(job: CompressImageJobItem) { const self = this; - self.backendService.runCompressCommand({ - compressType: KartoffelstampfCompressInstruction.COMPRESS_TYPE_LOSSLESS, - temporaryFileName: job.temporaryFileName, - }) - .pipe( - finalize(() => { - job.compressDone = true; - }), - takeUntil(self.preDestroy) - ) - .subscribe(data => { - if (data.type === 'compressResult') { - job.compressedSize = data.payload['compressedSize']; - } else { - const terminalLine = new TerminalLine(data); - const previousTerminalLine = job.terminalLines[job.terminalLines.length - 1]; - if (previousTerminalLine !== undefined && - previousTerminalLine.clearLine === true && - terminalLine.clearLine === true) { - job.terminalLines.pop(); - } - job.terminalLines.push(terminalLine); + const intervallId = setInterval(function() { + if (self.concurrentJobCount < self.concurrentJobLimit) { + clearInterval(intervallId); + self.concurrentJobCount = self.concurrentJobCount + 1; + self.backendService.runCompressCommand({ + compressType: KartoffelstampfCompressInstruction.COMPRESS_TYPE_LOSSLESS, + temporaryFileName: job.temporaryFileName, + }) + .pipe( + finalize(() => self.concurrentJobCount = self.concurrentJobCount - 1), + takeUntil(self.preDestroy), + ) + .subscribe(data => { + if (data.type === 'compressResult') { + job.compressedSize = data.payload['compressedSize']; + job.compressDone = true; + } else { + const terminalLine = new TerminalLine(data); + const previousTerminalLine = job.terminalLines[job.terminalLines.length - 1]; + if (previousTerminalLine !== undefined && + previousTerminalLine.clearLine === true && + terminalLine.clearLine === true) { + job.terminalLines.pop(); + } + job.terminalLines.push(terminalLine); + } + }); } - }); + }, 300); } }