diff --git a/server/Gomfile b/server/Gomfile index 890eb14..f6e83a9 100644 --- a/server/Gomfile +++ b/server/Gomfile @@ -1,7 +1,8 @@ gom 'github.com/namsral/flag', :commit => 'v1.7.4-alpha' gom 'github.com/stretchr/testify', :tag => 'v1.0' gom 'github.com/jbrodriguez/mlog', :commit => '006dc6db226a30dddd9e8eab3cce1e44f651ed8a' -gom 'github.com/jbrodriguez/pubsub', :commit => 'adb0f108803dfb17dc292f377492a3de391fa077' +gom 'github.com/jbrodriguez/pubsub', :tag => '1.0.0' +gom 'github.com/jbrodriguez/actor', :tag => '1.0.0' gom 'github.com/mattn/go-sqlite3', :commit => 'eac1dfa2a61ebccaa117538a5bb12044f6700cd0' gom 'github.com/jbrodriguez/go-tmdb', :commit => 'c1c9b4a6a2a857d1b86f9d2a163fc1a154b493ed' gom 'github.com/nfnt/resize', :commit => '891127d8d1b52734debe1b3c3d7e747502b6c366' diff --git a/server/src/services/cache.go b/server/src/services/cache.go index 682b1e9..578a73f 100644 --- a/server/src/services/cache.go +++ b/server/src/services/cache.go @@ -1,6 +1,7 @@ package services import ( + "github.com/jbrodriguez/actor" "github.com/jbrodriguez/mlog" "github.com/jbrodriguez/pubsub" // "io/ioutil" @@ -15,19 +16,20 @@ import ( // Cache - type Cache struct { - Service - bus *pubsub.PubSub settings *lib.Settings pool *lib.Pool - mailbox chan *pubsub.Mailbox + actor *actor.Actor } // NewCache - func NewCache(bus *pubsub.PubSub, settings *lib.Settings) *Cache { - cache := &Cache{bus: bus, settings: settings} - cache.init() + cache := &Cache{ + bus: bus, + settings: settings, + actor: actor.NewActor(bus), + } return cache } @@ -35,11 +37,11 @@ func NewCache(bus *pubsub.PubSub, settings *lib.Settings) *Cache { func (c *Cache) Start() { mlog.Info("Starting service Cache ...") - c.mailbox = c.register(c.bus, "/command/movie/cache", c.cacheMovie) + c.actor.Register("/command/movie/cache", c.cacheMovie) c.pool = lib.NewPool(4, 2000) - go c.react() + go c.actor.React() } // Stop - @@ -47,13 +49,6 @@ func (c *Cache) Stop() { mlog.Info("Stopped service Cache") } -func (c *Cache) react() { - for mbox := range c.mailbox { - // mlog.Info("Scraper:Topic: %s", mbox.Topic) - c.dispatch(mbox.Topic, mbox.Content) - } -} - func (c *Cache) cacheMovie(msg *pubsub.Message) { dto := msg.Payload.(*dto.Scrape) movie := dto.Movie.(*model.Movie) diff --git a/server/src/services/core.go b/server/src/services/core.go index 50db9b9..a5efa8c 100644 --- a/server/src/services/core.go +++ b/server/src/services/core.go @@ -14,6 +14,7 @@ import ( "sync" "time" + "github.com/jbrodriguez/actor" "github.com/jbrodriguez/mlog" "github.com/jbrodriguez/pubsub" "github.com/micro/go-micro/client" @@ -23,22 +24,23 @@ import ( // Core - type Core struct { - Service - bus *pubsub.PubSub settings *lib.Settings - mailbox chan *pubsub.Mailbox - re *regexp.Regexp - maps map[string]bool + actor *actor.Actor + re *regexp.Regexp + maps map[string]bool wg sync.WaitGroup } // NewCore - func NewCore(bus *pubsub.PubSub, settings *lib.Settings) *Core { - core := &Core{bus: bus, settings: settings} - core.init() + core := &Core{ + bus: bus, + settings: settings, + actor: actor.NewActor(bus), + } return core } @@ -46,20 +48,20 @@ func NewCore(bus *pubsub.PubSub, settings *lib.Settings) *Core { func (c *Core) Start() { mlog.Info("Starting service Core ...") - c.mailbox = c.register(c.bus, "/get/config", c.getConfig) - c.registerAdditional(c.bus, "/post/import", c.importMovies, c.mailbox) - c.registerAdditional(c.bus, "/post/prune", c.pruneMovies, c.mailbox) - c.registerAdditional(c.bus, "/put/config/folder", c.addMediaFolder, c.mailbox) - c.registerAdditional(c.bus, "/put/movies/fix", c.fixMovie, c.mailbox) + c.actor.Register("/get/config", c.getConfig) + c.actor.Register("/post/import", c.importMovies) + c.actor.Register("/post/prune", c.pruneMovies) + c.actor.Register("/put/config/folder", c.addMediaFolder) + c.actor.Register("/put/movies/fix", c.fixMovie) - c.registerAdditional(c.bus, "/event/movie/found", c.doMovieFound, c.mailbox) - c.registerAdditional(c.bus, "/event/movie/tmdbnotfound", c.doMovieTmdbNotFound, c.mailbox) - c.registerAdditional(c.bus, "/event/movie/scraped", c.doMovieScraped, c.mailbox) - c.registerAdditional(c.bus, "/event/movie/rescraped", c.doMovieReScraped, c.mailbox) - // c.registerAdditional(c.bus, "/event/movie/updated", c.doMovieUpdated, c.mailbox) - // c.registerAdditional(c.bus, "/event/movie/cached/forced", c.doMovieCachedForced, c.mailbox) + c.actor.Register("/event/movie/found", c.doMovieFound) + c.actor.Register("/event/movie/tmdbnotfound", c.doMovieTmdbNotFound) + c.actor.Register("/event/movie/scraped", c.doMovieScraped) + c.actor.Register("/event/movie/rescraped", c.doMovieReScraped) + // c.actor.Register("/event/movie/updated", c.doMovieUpdated) + // c.actor.Register("/event/movie/cached/forced", c.doMovieCachedForced) - c.registerAdditional(c.bus, "/event/workunit/done", c.doWorkUnitDone, c.mailbox) + c.actor.Register("/event/workunit/done", c.doWorkUnitDone) c.re = regexp.MustCompile(`(?i)/Volumes/(.*?)/.*`) c.maps = make(map[string]bool) @@ -72,7 +74,7 @@ func (c *Core) Start() { } } - go c.react() + go c.actor.React() } // Stop - @@ -80,13 +82,6 @@ func (c *Core) Stop() { mlog.Info("Stopped service Core ...") } -func (c *Core) react() { - for mbox := range c.mailbox { - // mlog.Info("Core:Topic: %s", mbox.Topic) - c.dispatch(mbox.Topic, mbox.Content) - } -} - func (c *Core) getConfig(msg *pubsub.Message) { msg.Reply <- &c.settings.Config mlog.Info("Sent config") diff --git a/server/src/services/dal.go b/server/src/services/dal.go index 280aaf5..1d5f4d9 100644 --- a/server/src/services/dal.go +++ b/server/src/services/dal.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/jbrodriguez/actor" "github.com/jbrodriguez/mlog" "github.com/jbrodriguez/pubsub" _ "github.com/mattn/go-sqlite3" // sqlite3 doesn't need to be named @@ -20,12 +21,10 @@ import ( // Dal - type Dal struct { - Service - bus *pubsub.PubSub settings *lib.Settings - mailbox chan *pubsub.Mailbox + actor *actor.Actor db *sql.DB // dbase string @@ -44,8 +43,11 @@ type Dal struct { // NewDal - func NewDal(bus *pubsub.PubSub, settings *lib.Settings) *Dal { - dal := &Dal{bus: bus, settings: settings} - dal.init() + dal := &Dal{ + bus: bus, + settings: settings, + actor: actor.NewActor(bus), + } return dal } @@ -61,24 +63,24 @@ func (d *Dal) Start() { mlog.Fatalf("Unable to open database (%s): %s", dbPath, err) } - d.mailbox = d.register(d.bus, "/get/movies/cover", d.getCover) - d.registerAdditional(d.bus, "/get/movies", d.getMovies, d.mailbox) - d.registerAdditional(d.bus, "/get/movies/duplicates", d.getDuplicates, d.mailbox) - d.registerAdditional(d.bus, "/get/movie", d.getMovie, d.mailbox) - d.registerAdditional(d.bus, "/command/movie/exists", d.checkExists, d.mailbox) - d.registerAdditional(d.bus, "/command/movie/store", d.storeMovie, d.mailbox) - d.registerAdditional(d.bus, "/command/movie/partialstore", d.partialStoreMovie, d.mailbox) - d.registerAdditional(d.bus, "/command/movie/update", d.updateMovie, d.mailbox) - d.registerAdditional(d.bus, "/command/movie/delete", d.deleteMovie, d.mailbox) - d.registerAdditional(d.bus, "/put/movies/score", d.setScore, d.mailbox) - d.registerAdditional(d.bus, "/put/movies/watched", d.setWatched, d.mailbox) - d.registerAdditional(d.bus, "/put/movies/duplicate", d.setDuplicate, d.mailbox) + d.actor.Register("/get/movies/cover", d.getCover) + d.actor.Register("/get/movies", d.getMovies) + d.actor.Register("/get/movies/duplicates", d.getDuplicates) + d.actor.Register("/get/movie", d.getMovie) + d.actor.Register("/command/movie/exists", d.checkExists) + d.actor.Register("/command/movie/store", d.storeMovie) + d.actor.Register("/command/movie/partialstore", d.partialStoreMovie) + d.actor.Register("/command/movie/update", d.updateMovie) + d.actor.Register("/command/movie/delete", d.deleteMovie) + d.actor.Register("/put/movies/score", d.setScore) + d.actor.Register("/put/movies/watched", d.setWatched) + d.actor.Register("/put/movies/duplicate", d.setDuplicate) d.countRows = d.prepare("select count(*) from movie;") mlog.Info("Connected to database %s", dbPath) - go d.react() + go d.actor.React() } // Stop - @@ -86,13 +88,6 @@ func (d *Dal) Stop() { mlog.Info("Stopped service Dal ...") } -func (d *Dal) react() { - for mbox := range d.mailbox { - // mlog.Info("DAL:Topic: %s", mbox.Topic) - d.dispatch(mbox.Topic, mbox.Content) - } -} - func (d *Dal) getCover(msg *pubsub.Message) { options := lib.Options{ Limit: 60, diff --git a/server/src/services/scanner.go b/server/src/services/scanner.go index 4eb8ea3..4e4b157 100644 --- a/server/src/services/scanner.go +++ b/server/src/services/scanner.go @@ -15,17 +15,16 @@ import ( "github.com/jbrodriguez/pubsub" "github.com/micro/go-micro/client" + "github.com/jbrodriguez/actor" "golang.org/x/net/context" ) // Scanner - type Scanner struct { - Service - bus *pubsub.PubSub settings *lib.Settings - mailbox chan *pubsub.Mailbox + actor *actor.Actor re []*lib.Rexp includedMask string @@ -33,8 +32,11 @@ type Scanner struct { // NewScanner - func NewScanner(bus *pubsub.PubSub, settings *lib.Settings) *Scanner { - scanner := &Scanner{bus: bus, settings: settings} - scanner.init() + scanner := &Scanner{ + bus: bus, + settings: settings, + actor: actor.NewActor(bus), + } return scanner } @@ -42,8 +44,8 @@ func NewScanner(bus *pubsub.PubSub, settings *lib.Settings) *Scanner { func (s *Scanner) Start() { mlog.Info("Starting service Scanner ...") - s.mailbox = s.register(s.bus, "/command/movie/scan", s.scanMovies) - s.registerAdditional(s.bus, "/event/config/changed", s.configChanged, s.mailbox) + s.actor.Register("/command/movie/scan", s.scanMovies) + s.actor.Register("/event/config/changed", s.configChanged) re := []string{ `(?i)(.*)/(?P.*?)/(?P.*?)\s\((?P\d\d\d\d)\)/(?:.*/)*bdmv/index.(?Pbdmv)$`, @@ -64,7 +66,7 @@ func (s *Scanner) Start() { // cmd.Init() - go s.react() + go s.actor.React() } // Stop - @@ -72,13 +74,6 @@ func (s *Scanner) Stop() { mlog.Info("Stopped service Scanner ...") } -func (s *Scanner) react() { - for mbox := range s.mailbox { - // mlog.Info("Scanner:Topic: %s", mbox.Topic) - s.dispatch(mbox.Topic, mbox.Content) - } -} - func (s *Scanner) scanMovies(msg *pubsub.Message) { defer s.bus.Pub(nil, "/event/workunit/done") diff --git a/server/src/services/scraper.go b/server/src/services/scraper.go index 5e17e3b..e0b4f88 100644 --- a/server/src/services/scraper.go +++ b/server/src/services/scraper.go @@ -12,24 +12,27 @@ import ( "strconv" "strings" "time" + + "github.com/jbrodriguez/actor" ) // Scraper - type Scraper struct { - Service - bus *pubsub.PubSub settings *lib.Settings pool *lib.Pool tmdb *tmdb.Tmdb - mailbox chan *pubsub.Mailbox + actor *actor.Actor } // NewScraper - func NewScraper(bus *pubsub.PubSub, settings *lib.Settings) *Scraper { - scraper := &Scraper{bus: bus, settings: settings} - scraper.init() + scraper := &Scraper{ + bus: bus, + settings: settings, + actor: actor.NewActor(bus), + } return scraper } @@ -43,13 +46,13 @@ func (s *Scraper) Start() { mlog.Fatalf("Unable to create tmdb client: %s", err) } - s.mailbox = s.register(s.bus, "/command/movie/scrape", s.scrapeMovie) - s.registerAdditional(s.bus, "/command/movie/rescrape", s.reScrapeMovie, s.mailbox) - s.registerAdditional(s.bus, "/event/config/changed", s.configChanged, s.mailbox) + s.actor.Register("/command/movie/scrape", s.scrapeMovie) + s.actor.Register("/command/movie/rescrape", s.reScrapeMovie) + s.actor.Register("/event/config/changed", s.configChanged) s.pool = lib.NewPool(12, 4000) - go s.react() + go s.actor.React() } // Stop - @@ -57,13 +60,6 @@ func (s *Scraper) Stop() { mlog.Info("Stopped service Scraper ...") } -func (s *Scraper) react() { - for mbox := range s.mailbox { - // mlog.Info("Scraper:Topic: %s", mbox.Topic) - s.dispatch(mbox.Topic, mbox.Content) - } -} - func (s *Scraper) scrapeMovie(msg *pubsub.Message) { movie := msg.Payload.(*model.Movie) diff --git a/server/src/services/server.go b/server/src/services/server.go index 20eb8a0..cc784ad 100644 --- a/server/src/services/server.go +++ b/server/src/services/server.go @@ -13,6 +13,7 @@ import ( "golang.org/x/net/websocket" + "github.com/jbrodriguez/actor" "github.com/jbrodriguez/mlog" "github.com/jbrodriguez/pubsub" "github.com/labstack/echo" @@ -27,12 +28,10 @@ const ( // Server - type Server struct { - Service - bus *pubsub.PubSub settings *lib.Settings router *echo.Echo - mailbox chan *pubsub.Mailbox + actor *actor.Actor pool map[*net.Connection]bool } @@ -42,9 +41,9 @@ func NewServer(bus *pubsub.PubSub, settings *lib.Settings) *Server { server := &Server{ bus: bus, settings: settings, + actor: actor.NewActor(bus), pool: make(map[*net.Connection]bool), } - server.init() return server } @@ -135,8 +134,8 @@ func (s *Server) Start() { port := ":7623" go s.router.Start(port) - s.mailbox = s.register(s.bus, "socket:broadcast", s.broadcast) - go s.react() + s.actor.Register("socket:broadcast", s.broadcast) + go s.actor.React() mlog.Info("Listening on %s", port) } @@ -146,13 +145,6 @@ func (s *Server) Stop() { mlog.Info("Stopped service Server ...") } -func (s *Server) react() { - for mbox := range s.mailbox { - // mlog.Info("Core:Topic: %s", mbox.Topic) - s.dispatch(mbox.Topic, mbox.Content) - } -} - // // Closer - // func Closer() gin.HandlerFunc { // return func(c echo.Context) { diff --git a/server/src/services/service.go b/server/src/services/service.go deleted file mode 100644 index 62df595..0000000 --- a/server/src/services/service.go +++ /dev/null @@ -1,32 +0,0 @@ -package services - -import ( - "github.com/jbrodriguez/pubsub" -) - -// MailboxHandler - -type MailboxHandler func(msg *pubsub.Message) - -// Service - -type Service struct { - registry map[string]MailboxHandler -} - -func (s *Service) init() { - s.registry = make(map[string]MailboxHandler) -} - -func (s *Service) register(bus *pubsub.PubSub, topic string, handler MailboxHandler) (mbox chan *pubsub.Mailbox) { - mbox = bus.Sub(topic) - s.registry[topic] = handler - return mbox -} - -func (s *Service) registerAdditional(bus *pubsub.PubSub, topic string, handler MailboxHandler, mb chan *pubsub.Mailbox) { - bus.AddSub(mb, topic) - s.registry[topic] = handler -} - -func (s *Service) dispatch(topic string, msg *pubsub.Message) { - s.registry[topic](msg) -}