Skip to content

Commit

Permalink
Refactor Actor api
Browse files Browse the repository at this point in the history
Use new Actor api, based on
github.com/jbrodriguez/actor (1.0.0)
github.com/jbrodriguez/pubsub (1.0.0)

Related to #23
  • Loading branch information
jbrodriguez committed Apr 12, 2017
1 parent 7b7ecf1 commit 5a5475d
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 143 deletions.
3 changes: 2 additions & 1 deletion server/Gomfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
gom 'github.com/namsral/flag', :commit => 'v1.7.4-alpha'
gom 'github.com/stretchr/testify', :tag => 'v1.0'
gom 'github.com/jbrodriguez/mlog', :commit => '006dc6db226a30dddd9e8eab3cce1e44f651ed8a'
gom 'github.com/jbrodriguez/pubsub', :commit => 'adb0f108803dfb17dc292f377492a3de391fa077'
gom 'github.com/jbrodriguez/pubsub', :tag => '1.0.0'
gom 'github.com/jbrodriguez/actor', :tag => '1.0.0'
gom 'github.com/mattn/go-sqlite3', :commit => 'eac1dfa2a61ebccaa117538a5bb12044f6700cd0'
gom 'github.com/jbrodriguez/go-tmdb', :commit => 'c1c9b4a6a2a857d1b86f9d2a163fc1a154b493ed'
gom 'github.com/nfnt/resize', :commit => '891127d8d1b52734debe1b3c3d7e747502b6c366'
Expand Down
23 changes: 9 additions & 14 deletions server/src/services/cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package services

import (
"github.com/jbrodriguez/actor"
"github.com/jbrodriguez/mlog"
"github.com/jbrodriguez/pubsub"
// "io/ioutil"
Expand All @@ -15,45 +16,39 @@ import (

// Cache -
type Cache struct {
Service

bus *pubsub.PubSub
settings *lib.Settings
pool *lib.Pool

mailbox chan *pubsub.Mailbox
actor *actor.Actor
}

// NewCache -
func NewCache(bus *pubsub.PubSub, settings *lib.Settings) *Cache {
cache := &Cache{bus: bus, settings: settings}
cache.init()
cache := &Cache{
bus: bus,
settings: settings,
actor: actor.NewActor(bus),
}
return cache
}

// Start -
func (c *Cache) Start() {
mlog.Info("Starting service Cache ...")

c.mailbox = c.register(c.bus, "/command/movie/cache", c.cacheMovie)
c.actor.Register("/command/movie/cache", c.cacheMovie)

c.pool = lib.NewPool(4, 2000)

go c.react()
go c.actor.React()
}

// Stop -
func (c *Cache) Stop() {
mlog.Info("Stopped service Cache")
}

func (c *Cache) react() {
for mbox := range c.mailbox {
// mlog.Info("Scraper:Topic: %s", mbox.Topic)
c.dispatch(mbox.Topic, mbox.Content)
}
}

func (c *Cache) cacheMovie(msg *pubsub.Message) {
dto := msg.Payload.(*dto.Scrape)
movie := dto.Movie.(*model.Movie)
Expand Down
49 changes: 22 additions & 27 deletions server/src/services/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/jbrodriguez/actor"
"github.com/jbrodriguez/mlog"
"github.com/jbrodriguez/pubsub"
"github.com/micro/go-micro/client"
Expand All @@ -23,43 +24,44 @@ import (

// Core -
type Core struct {
Service

bus *pubsub.PubSub
settings *lib.Settings

mailbox chan *pubsub.Mailbox
re *regexp.Regexp
maps map[string]bool
actor *actor.Actor
re *regexp.Regexp
maps map[string]bool

wg sync.WaitGroup
}

// NewCore -
func NewCore(bus *pubsub.PubSub, settings *lib.Settings) *Core {
core := &Core{bus: bus, settings: settings}
core.init()
core := &Core{
bus: bus,
settings: settings,
actor: actor.NewActor(bus),
}
return core
}

// Start -
func (c *Core) Start() {
mlog.Info("Starting service Core ...")

c.mailbox = c.register(c.bus, "/get/config", c.getConfig)
c.registerAdditional(c.bus, "/post/import", c.importMovies, c.mailbox)
c.registerAdditional(c.bus, "/post/prune", c.pruneMovies, c.mailbox)
c.registerAdditional(c.bus, "/put/config/folder", c.addMediaFolder, c.mailbox)
c.registerAdditional(c.bus, "/put/movies/fix", c.fixMovie, c.mailbox)
c.actor.Register("/get/config", c.getConfig)
c.actor.Register("/post/import", c.importMovies)
c.actor.Register("/post/prune", c.pruneMovies)
c.actor.Register("/put/config/folder", c.addMediaFolder)
c.actor.Register("/put/movies/fix", c.fixMovie)

c.registerAdditional(c.bus, "/event/movie/found", c.doMovieFound, c.mailbox)
c.registerAdditional(c.bus, "/event/movie/tmdbnotfound", c.doMovieTmdbNotFound, c.mailbox)
c.registerAdditional(c.bus, "/event/movie/scraped", c.doMovieScraped, c.mailbox)
c.registerAdditional(c.bus, "/event/movie/rescraped", c.doMovieReScraped, c.mailbox)
// c.registerAdditional(c.bus, "/event/movie/updated", c.doMovieUpdated, c.mailbox)
// c.registerAdditional(c.bus, "/event/movie/cached/forced", c.doMovieCachedForced, c.mailbox)
c.actor.Register("/event/movie/found", c.doMovieFound)
c.actor.Register("/event/movie/tmdbnotfound", c.doMovieTmdbNotFound)
c.actor.Register("/event/movie/scraped", c.doMovieScraped)
c.actor.Register("/event/movie/rescraped", c.doMovieReScraped)
// c.actor.Register("/event/movie/updated", c.doMovieUpdated)
// c.actor.Register("/event/movie/cached/forced", c.doMovieCachedForced)

c.registerAdditional(c.bus, "/event/workunit/done", c.doWorkUnitDone, c.mailbox)
c.actor.Register("/event/workunit/done", c.doWorkUnitDone)

c.re = regexp.MustCompile(`(?i)/Volumes/(.*?)/.*`)
c.maps = make(map[string]bool)
Expand All @@ -72,21 +74,14 @@ func (c *Core) Start() {
}
}

go c.react()
go c.actor.React()
}

// Stop -
func (c *Core) Stop() {
mlog.Info("Stopped service Core ...")
}

func (c *Core) react() {
for mbox := range c.mailbox {
// mlog.Info("Core:Topic: %s", mbox.Topic)
c.dispatch(mbox.Topic, mbox.Content)
}
}

func (c *Core) getConfig(msg *pubsub.Message) {
msg.Reply <- &c.settings.Config
mlog.Info("Sent config")
Expand Down
45 changes: 20 additions & 25 deletions server/src/services/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/jbrodriguez/actor"
"github.com/jbrodriguez/mlog"
"github.com/jbrodriguez/pubsub"
_ "github.com/mattn/go-sqlite3" // sqlite3 doesn't need to be named
Expand All @@ -20,12 +21,10 @@ import (

// Dal -
type Dal struct {
Service

bus *pubsub.PubSub
settings *lib.Settings

mailbox chan *pubsub.Mailbox
actor *actor.Actor

db *sql.DB
// dbase string
Expand All @@ -44,8 +43,11 @@ type Dal struct {

// NewDal -
func NewDal(bus *pubsub.PubSub, settings *lib.Settings) *Dal {
dal := &Dal{bus: bus, settings: settings}
dal.init()
dal := &Dal{
bus: bus,
settings: settings,
actor: actor.NewActor(bus),
}
return dal
}

Expand All @@ -61,38 +63,31 @@ func (d *Dal) Start() {
mlog.Fatalf("Unable to open database (%s): %s", dbPath, err)
}

d.mailbox = d.register(d.bus, "/get/movies/cover", d.getCover)
d.registerAdditional(d.bus, "/get/movies", d.getMovies, d.mailbox)
d.registerAdditional(d.bus, "/get/movies/duplicates", d.getDuplicates, d.mailbox)
d.registerAdditional(d.bus, "/get/movie", d.getMovie, d.mailbox)
d.registerAdditional(d.bus, "/command/movie/exists", d.checkExists, d.mailbox)
d.registerAdditional(d.bus, "/command/movie/store", d.storeMovie, d.mailbox)
d.registerAdditional(d.bus, "/command/movie/partialstore", d.partialStoreMovie, d.mailbox)
d.registerAdditional(d.bus, "/command/movie/update", d.updateMovie, d.mailbox)
d.registerAdditional(d.bus, "/command/movie/delete", d.deleteMovie, d.mailbox)
d.registerAdditional(d.bus, "/put/movies/score", d.setScore, d.mailbox)
d.registerAdditional(d.bus, "/put/movies/watched", d.setWatched, d.mailbox)
d.registerAdditional(d.bus, "/put/movies/duplicate", d.setDuplicate, d.mailbox)
d.actor.Register("/get/movies/cover", d.getCover)
d.actor.Register("/get/movies", d.getMovies)
d.actor.Register("/get/movies/duplicates", d.getDuplicates)
d.actor.Register("/get/movie", d.getMovie)
d.actor.Register("/command/movie/exists", d.checkExists)
d.actor.Register("/command/movie/store", d.storeMovie)
d.actor.Register("/command/movie/partialstore", d.partialStoreMovie)
d.actor.Register("/command/movie/update", d.updateMovie)
d.actor.Register("/command/movie/delete", d.deleteMovie)
d.actor.Register("/put/movies/score", d.setScore)
d.actor.Register("/put/movies/watched", d.setWatched)
d.actor.Register("/put/movies/duplicate", d.setDuplicate)

d.countRows = d.prepare("select count(*) from movie;")

mlog.Info("Connected to database %s", dbPath)

go d.react()
go d.actor.React()
}

// Stop -
func (d *Dal) Stop() {
mlog.Info("Stopped service Dal ...")
}

func (d *Dal) react() {
for mbox := range d.mailbox {
// mlog.Info("DAL:Topic: %s", mbox.Topic)
d.dispatch(mbox.Topic, mbox.Content)
}
}

func (d *Dal) getCover(msg *pubsub.Message) {
options := lib.Options{
Limit: 60,
Expand Down
25 changes: 10 additions & 15 deletions server/src/services/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,37 @@ import (
"github.com/jbrodriguez/pubsub"
"github.com/micro/go-micro/client"

"github.com/jbrodriguez/actor"
"golang.org/x/net/context"
)

// Scanner -
type Scanner struct {
Service

bus *pubsub.PubSub
settings *lib.Settings

mailbox chan *pubsub.Mailbox
actor *actor.Actor

re []*lib.Rexp
includedMask string
}

// NewScanner -
func NewScanner(bus *pubsub.PubSub, settings *lib.Settings) *Scanner {
scanner := &Scanner{bus: bus, settings: settings}
scanner.init()
scanner := &Scanner{
bus: bus,
settings: settings,
actor: actor.NewActor(bus),
}
return scanner
}

// Start -
func (s *Scanner) Start() {
mlog.Info("Starting service Scanner ...")

s.mailbox = s.register(s.bus, "/command/movie/scan", s.scanMovies)
s.registerAdditional(s.bus, "/event/config/changed", s.configChanged, s.mailbox)
s.actor.Register("/command/movie/scan", s.scanMovies)
s.actor.Register("/event/config/changed", s.configChanged)

re := []string{
`(?i)(.*)/(?P<Resolution>.*?)/(?P<Name>.*?)\s\((?P<Year>\d\d\d\d)\)/(?:.*/)*bdmv/index.(?P<FileType>bdmv)$`,
Expand All @@ -64,21 +66,14 @@ func (s *Scanner) Start() {

// cmd.Init()

go s.react()
go s.actor.React()
}

// Stop -
func (s *Scanner) Stop() {
mlog.Info("Stopped service Scanner ...")
}

func (s *Scanner) react() {
for mbox := range s.mailbox {
// mlog.Info("Scanner:Topic: %s", mbox.Topic)
s.dispatch(mbox.Topic, mbox.Content)
}
}

func (s *Scanner) scanMovies(msg *pubsub.Message) {
defer s.bus.Pub(nil, "/event/workunit/done")

Expand Down
28 changes: 12 additions & 16 deletions server/src/services/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,27 @@ import (
"strconv"
"strings"
"time"

"github.com/jbrodriguez/actor"
)

// Scraper -
type Scraper struct {
Service

bus *pubsub.PubSub
settings *lib.Settings
pool *lib.Pool
tmdb *tmdb.Tmdb

mailbox chan *pubsub.Mailbox
actor *actor.Actor
}

// NewScraper -
func NewScraper(bus *pubsub.PubSub, settings *lib.Settings) *Scraper {
scraper := &Scraper{bus: bus, settings: settings}
scraper.init()
scraper := &Scraper{
bus: bus,
settings: settings,
actor: actor.NewActor(bus),
}
return scraper
}

Expand All @@ -43,27 +46,20 @@ func (s *Scraper) Start() {
mlog.Fatalf("Unable to create tmdb client: %s", err)
}

s.mailbox = s.register(s.bus, "/command/movie/scrape", s.scrapeMovie)
s.registerAdditional(s.bus, "/command/movie/rescrape", s.reScrapeMovie, s.mailbox)
s.registerAdditional(s.bus, "/event/config/changed", s.configChanged, s.mailbox)
s.actor.Register("/command/movie/scrape", s.scrapeMovie)
s.actor.Register("/command/movie/rescrape", s.reScrapeMovie)
s.actor.Register("/event/config/changed", s.configChanged)

s.pool = lib.NewPool(12, 4000)

go s.react()
go s.actor.React()
}

// Stop -
func (s *Scraper) Stop() {
mlog.Info("Stopped service Scraper ...")
}

func (s *Scraper) react() {
for mbox := range s.mailbox {
// mlog.Info("Scraper:Topic: %s", mbox.Topic)
s.dispatch(mbox.Topic, mbox.Content)
}
}

func (s *Scraper) scrapeMovie(msg *pubsub.Message) {
movie := msg.Payload.(*model.Movie)

Expand Down
Loading

0 comments on commit 5a5475d

Please sign in to comment.