forked from ng-book/angular2-rxjs-chat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ThreadsService.ts
84 lines (69 loc) · 2.83 KB
/
ThreadsService.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/// <reference path="../../typings/app.d.ts" />
import {Injectable, bind} from "angular2/angular2";
import * as Rx from "rx";
import {Thread, Message} from "../models";
import {MessagesService} from "./MessagesService";
import * as _ from "underscore";
@Injectable()
export class ThreadsService {
// `threads` is a observable that contains the most up to date list of threads
threads: Rx.Observable<{ [key: string]: Thread }>;
// `orderedThreads` contains a newest-first chronological list of threads
orderedThreads: Rx.Observable<Thread[]>;
// `currentThread` contains the currently selected thread
currentThread: Rx.Subject<Thread> =
new Rx.BehaviorSubject<Thread>(new Thread());
// `currentThreadMessages` contains the set of messages for the currently
// selected thread
currentThreadMessages: Rx.Observable<Message[]>;
constructor(public messagesService: MessagesService) {
this.threads = messagesService.messages
.map( (messages: Message[]) => {
let threads: {[key: string]: Thread} = {};
// Store the message's thread it in our accumulator `threads`
messages.map((message: Message) => {
threads[message.thread.id] = threads[message.thread.id] ||
message.thread;
// Cache the most recent message for each thread
let messagesThread: Thread = threads[message.thread.id];
if (!messagesThread.lastMessage ||
messagesThread.lastMessage.sentAt < message.sentAt) {
messagesThread.lastMessage = message;
}
});
return threads;
})
// share this stream across multiple subscribers and makes sure everyone
// receives the current list of threads when they first subscribe
.shareReplay(1);
this.orderedThreads = this.threads
.map((threadGroups: { [key: string]: Thread }) => {
let threads: Thread[] = _.values(threadGroups);
return _.sortBy(threads, (t: Thread) => t.lastMessage.sentAt).reverse();
})
.shareReplay(1);
this.currentThreadMessages = this.currentThread
.combineLatest(messagesService.messages,
(currentThread: Thread, messages: Message[]) => {
if (currentThread && messages.length > 0) {
return _.chain(messages)
.filter((message: Message) =>
(message.thread.id === currentThread.id))
.map((message: Message) => {
message.isRead = true;
return message; })
.value();
} else {
return [];
}
})
.shareReplay(1);
this.currentThread.subscribe(this.messagesService.markThreadAsRead);
}
setCurrentThread(newThread: Thread): void {
this.currentThread.onNext(newThread);
}
}
export var threadsServiceInjectables: Array<any> = [
bind(ThreadsService).toClass(ThreadsService)
];