Skip to content
This repository has been archived by the owner on Oct 14, 2023. It is now read-only.

add persistance #16

Draft
wants to merge 15 commits into
base: develop
Choose a base branch
from
22 changes: 12 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id 'org.springframework.boot' version '2.7.1'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'org.springframework.boot' version '2.7.2'
id 'io.spring.dependency-management' version '1.0.12.RELEASE'
id 'java'
}

Expand All @@ -20,17 +20,19 @@ repositories {
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.1'
implementation 'org.springframework.boot:spring-boot-configuration-processor:2.7.1'
implementation 'org.springframework.boot:spring-boot-starter-validation:2.7.1'
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive:2.7.1'
runtimeOnly 'org.springframework.boot:spring-boot-devtools:2.7.1'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-configuration-processor'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
runtimeOnly 'org.postgresql:postgresql'
runtimeOnly 'org.springframework.boot:spring-boot-devtools'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.1'
testImplementation 'io.projectreactor:reactor-test:3.4.19'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
runtimeOnly 'com.h2database:h2'
}

tasks.named('test') {
useJUnitPlatform()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.kikawet.reactiveMediaServer.beans;

import java.util.concurrent.Executors;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Configuration
public class SchedulerBean {
@Bean
Scheduler jdbcScheduler(@Value("${spring.task.scheduling.pool.size}") int maxThreads) {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(maxThreads));
}
}
27 changes: 22 additions & 5 deletions src/main/java/com/kikawet/reactiveMediaServer/beans/StartUp.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,58 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import com.kikawet.reactiveMediaServer.model.User;
import com.kikawet.reactiveMediaServer.model.Video;
import com.kikawet.reactiveMediaServer.model.WatchedVideo;
import com.kikawet.reactiveMediaServer.repository.UserRepository;
import com.kikawet.reactiveMediaServer.repository.VideoRepository;
import com.kikawet.reactiveMediaServer.service.VideoService;

@Component
@Profile("!test")
public class StartUp {
// TODO: remove this class when the user repo is done and move everything to sql

@Autowired
Environment env;

@Autowired
private UserRepository users;

@Autowired
private VideoRepository videos;
private VideoService videos;

@PostConstruct
void setUp() {
User u = new User();
Video v = new Video(":D");
Video v = this.videos.createVideo(":D").block();

u.setLogin("tom");

u.setAvailableVideos(Set.of(v));

u.setHistory(new ArrayList<>(
Arrays.asList(
new WatchedVideo(u, v, LocalDateTime.now(), 17),
new WatchedVideo(u, v, LocalDateTime.now(), 69),
new WatchedVideo(u, v, LocalDateTime.now(), 33))));

this.users.put(u.getLogin(), u);
this.videos.put(v.getTitle(), v);
this.users.save(u);

User user = this.users.findByLogin(u.getLogin());
if (user == null) {
this.users.save(u);
}

System.out.println("Saved user " + user.getLogin());
System.out.println("Saved video " + v.getTitle());
}
}
31 changes: 26 additions & 5 deletions src/main/java/com/kikawet/reactiveMediaServer/model/User.java
Original file line number Diff line number Diff line change
@@ -1,33 +1,54 @@
package com.kikawet.reactiveMediaServer.model;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToMany;
import javax.persistence.Table;
import javax.persistence.Transient;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@RequiredArgsConstructor
@Entity
@Table(name = "USERS") // User is a keyword so can't create a table with that name
public class User {
@Id
@NonNull
String login;
List<Video> availableVideos = new ArrayList<>();

@OneToMany
Set<Video> availableVideos = new HashSet<>();

@Transient
List<WatchedVideo> history = new ArrayList<>();

public Stream<Video> getVideos() {
public Stream<Video> getAvailableVideos() {
return availableVideos.stream();
}

public Stream<WatchedVideo> getHistory() {
return history.stream();
}

public boolean appendHistory(Collection<WatchedVideo> history) {
return this.history.addAll(history);
public boolean addWatchVideo(WatchedVideo wv) {
if (!availableVideos.contains(wv.video))
return false;

return this.history.add(wv);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package com.kikawet.reactiveMediaServer.model;

import javax.persistence.Entity;
import javax.persistence.Id;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;

@AllArgsConstructor
@Getter
@Entity
@NoArgsConstructor
public class Video {
@Id
@NonNull
String title;
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,11 @@
package com.kikawet.reactiveMediaServer.repository;

import java.util.HashMap;
import java.util.Map;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import com.kikawet.reactiveMediaServer.model.User;

@Repository
public class UserRepository {
private static final Map<String, User> users = new HashMap<>();

public User findById(String login) {
return users.get(login);
}

public void updateUser(User u) {
users.put(u.getLogin(), u);
}

public void put(String login, User u) {
users.put(login, u);
}
public interface UserRepository extends JpaRepository<User, Long>{
User findByLogin(String login);
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
package com.kikawet.reactiveMediaServer.repository;

import java.util.HashMap;
import java.util.Map;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import com.kikawet.reactiveMediaServer.model.Video;

@Repository
public class VideoRepository {
private static final Map<String, Video> videos = new HashMap<>();

public Video findVideoByTitle(String title) {
return videos.get(title);
}

public void put(String title, Video video) {
videos.put(title, video);
}
public interface VideoRepository extends JpaRepository<Video, Long> {
Video findByTitle(String title);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.web.reactive.function.server.ServerResponse.badRequest;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.web.reactive.function.server.ServerResponse.status;
import static reactor.core.publisher.Flux.fromStream;

import java.util.stream.Stream;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
Expand All @@ -22,13 +21,13 @@
import org.springframework.web.server.ResponseStatusException;

import com.kikawet.reactiveMediaServer.beans.PageableMapper;
import com.kikawet.reactiveMediaServer.dto.WatchedVideoDTO;
import com.kikawet.reactiveMediaServer.exception.ResourceNotFoundException;
import com.kikawet.reactiveMediaServer.exception.UnauthorizedUserException;
import com.kikawet.reactiveMediaServer.model.WatchedVideo;
import com.kikawet.reactiveMediaServer.service.UserService;
import com.kikawet.reactiveMediaServer.validation.WatcherVideoDTOValidationHandler;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
Expand All @@ -47,48 +46,60 @@ RouterFunction<ServerResponse> getHistoryByLogin() {
return route(GET("/user/{login}/history"), this::getHistoryByLoginHandler);
}

@Bean
RouterFunction<ServerResponse> getAvailableVideosByLogin() {
return route(GET("/user/{login}/videos"), this::getAvailableVideosByLoginHandler);
}

@Bean
RouterFunction<ServerResponse> updateHistoryByLogin() {
return route(POST("/user/{login}/watch"), this::updateHistoryByLoginHandler)
.and(route(POST("/user/{login}/history"), this::updateHistoryByLoginHandler));
}

public Mono<ServerResponse> updateHistoryByLoginHandler(ServerRequest serverRequest) {
public Mono<ServerResponse> updateHistoryByLoginHandler(final ServerRequest serverRequest) {
final String login = serverRequest.pathVariable("login");

return watcherValidationHandler.requireValidBodyList(serverRequest,
newWatches -> {
if (newWatches.isEmpty())
return ok().build();

try {
boolean success = users.updateHistoryByLogin(login, newWatches);
return users.updateHistoryByLogin(login, newWatches)
.flatMap(success -> {
if (success)
return status(HttpStatus.CREATED).build();
return badRequest().build();
})
.onErrorResume(UnauthorizedUserException.class,
e -> status(HttpStatus.UNAUTHORIZED).build())
.doOnError(ResourceNotFoundException.class, e -> {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage(), e);
});

if (success)
return status(HttpStatus.CREATED).build();
});
}

return badRequest().build();
public Mono<ServerResponse> getHistoryByLoginHandler(final ServerRequest serverRequest) {
final String login = serverRequest.pathVariable("login");
final Pageable page = pm.getPageable(serverRequest);

} catch (UnauthorizedUserException e) {
return status(HttpStatus.UNAUTHORIZED).build();
} catch (ResourceNotFoundException e) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage(), e);
}
});
return fluxToResponse(users.getHistoryByLogin(login, page)
.map(WatchedVideo::toDTO));
}

public Mono<ServerResponse> getHistoryByLoginHandler(ServerRequest serverRequest) {
public Mono<ServerResponse> getAvailableVideosByLoginHandler(final ServerRequest serverRequest) {
final String login = serverRequest.pathVariable("login");
final Pageable page = pm.getPageable(serverRequest);

try {
Stream<WatchedVideoDTO> history = users.getHistoryByLogin(login, page).map(WatchedVideo::toDTO);
return fluxToResponse(users.getAvailableVideosByLogin(login, page));
}

return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(fromStream(history), WatchedVideoDTO.class);
} catch (UnauthorizedUserException e) {
return status(HttpStatus.UNAUTHORIZED).build();
}
private Mono<ServerResponse> fluxToResponse(Flux<?> objects) {
return objects.collectList()
.flatMap(items -> ok()
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(items), List.class))
.onErrorResume(UnauthorizedUserException.class, e -> status(HttpStatus.UNAUTHORIZED).build());
}
}
Loading