Skip to content

Commit

Permalink
Add etcd service discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
labkode committed Jan 22, 2017
1 parent 7ba6a52 commit ed98755
Show file tree
Hide file tree
Showing 12 changed files with 902 additions and 374 deletions.
102 changes: 92 additions & 10 deletions cmd/clawiod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/clawio/clawiod/root/corsmiddleware"
"github.com/clawio/clawiod/root/datawebservice"
"github.com/clawio/clawiod/root/datawebserviceclient"
"github.com/clawio/clawiod/root/etcdregistrydriver"
"github.com/clawio/clawiod/root/fileconfigurationsource"
"github.com/clawio/clawiod/root/fsdatadriver"
"github.com/clawio/clawiod/root/fsmdatadriver"
Expand All @@ -38,6 +39,8 @@ import (
"io/ioutil"
"net/http"
"os"
"runtime"
"strconv"
"strings"
)

Expand Down Expand Up @@ -70,6 +73,14 @@ func main() {
}

mainLogger := logger.With("pkg", "main")

// Set CPU capacity
err = setCPU(config.GetCPU())
if err != nil {
mainLogger.Crit().Log("msg", "error tweaking cpu", "error", err)
os.Exit(1)
}

server, err := newServer(config)
if err != nil {
mainLogger.Error().Log("error", err)
Expand All @@ -82,8 +93,20 @@ func main() {
os.Exit(1)
}

mainLogger.Info().Log("msg", "server is listening", "port", config.GetPort(), "url", fmt.Sprintf("http://%s:%d", hostname, config.GetPort()))
mainLogger.Error().Log("error", http.ListenAndServe(fmt.Sprintf(":%d", config.GetPort()), server))
addr := fmt.Sprintf(":%d", config.GetPort())
if config.IsTLSEnabled() {
mainLogger.Info().Log("msg", "serving secure client requests", "addr", fmt.Sprintf("https://%s:%d", hostname, config.GetPort()))
mainLogger.Error().Log("error", http.ListenAndServeTLS(
addr,
config.GetTLSCertificate(),
config.GetTLSPrivateKey(),
server))
} else {
mainLogger.Warn().Log("msg", "serving insecure client requests", "addr", fmt.Sprintf("http://%s:%d", hostname, config.GetPort()))
mainLogger.Error().Log("error", http.ListenAndServe(
addr,
server))
}
}

func getUserDriver(config root.Configuration) (root.UserDriver, error) {
Expand Down Expand Up @@ -149,7 +172,8 @@ func getDataDriver(config root.Configuration) (root.DataDriver, error) {
}
return ocfsdatadriver.New(logger,
config.GetOCFSDataDriverDataFolder(),
config.GetOCFSDataDriverDataFolder(),
config.GetOCFSDataDriverTemporaryFolder(),
config.GetOCFSDataDriverChunksFolder(),
config.GetOCFSDataDriverChecksum(),
config.GetOCFSDataDriverVerifyClientChecksum(),
metaDataDriver)
Expand Down Expand Up @@ -315,7 +339,8 @@ func getAuthenticationWebService(config root.Configuration) (root.WebService, er
userDriver,
tokenDriver,
authenticationMiddleware,
webErrorConverter), nil
webErrorConverter,
false), nil
case "proxied":
logger, err := getLogger(config)
if err != nil {
Expand Down Expand Up @@ -449,8 +474,7 @@ func getOCWebService(config root.Configuration) (root.WebService, error) {
basicAuthMiddleware,
webErrorConverter,
mimeGuesser,
config.GetOCWebServiceMaxUploadFileSize(),
config.GetOCWebServiceChunksFolder()), nil
config.GetOCWebServiceMaxUploadFileSize()), nil
case "proxied":
logger, err := getLogger(config)
if err != nil {
Expand Down Expand Up @@ -478,17 +502,20 @@ func getOCWebService(config root.Configuration) (root.WebService, error) {
if err != nil {
return nil, err
}
dataWebServiceClient := datawebserviceclient.New(logger, cm, config.GetRemoteOCWebServiceDataURL())
metaDataWebServiceClient := metadatawebserviceclient.New(logger, cm, config.GetRemoteOCWebServiceMetaDataURL())
registryDriver, err := getRegistryDriver(config)
if err != nil {
return nil, err
}
dataWebServiceClient := datawebserviceclient.New(logger, cm, registryDriver)
metaDataWebServiceClient := metadatawebserviceclient.New(logger, cm, registryDriver)
return remoteocwebservice.New(cm,
logger,
dataWebServiceClient,
metaDataWebServiceClient,
basicAuthMiddleware,
webErrorConverter,
mimeGuesser,
config.GetRemoteOCWebServiceMaxUploadFileSize(),
config.GetRemoteOCWebServiceChunksFolder()), nil
config.GetRemoteOCWebServiceMaxUploadFileSize()), nil
default:
return nil, errors.New("configured oc webservice does not exist")

Expand Down Expand Up @@ -521,6 +548,26 @@ func getConfigurationSource(source string) (root.ConfigurationSource, error) {

}

func getRegistryDriver(config root.Configuration) (root.RegistryDriver, error) {
logger, err := getLogger(config)
if err != nil {
return nil, err
}
logger = logger.With("pkg", "etcdregistrydriver")

switch config.GetRegistryDriver() {
case "etcd":
return etcdregistrydriver.New(
logger,
config.GetETCDRegistryDriverUrls(),
config.GetETCDRegistryDriverKey(),
config.GetETCDRegistryDriverUsername(),
config.GetETCDRegistryDriverPassword())
default:
return nil, fmt.Errorf("registry driver does not exist")
}
}

func getWebErrorConverter(config root.Configuration) (root.WebErrorConverter, error) {
return weberrorconverter.New(), nil
}
Expand Down Expand Up @@ -583,3 +630,38 @@ func getWebServices(config root.Configuration) (map[string]root.WebService, erro
}
return webServices, nil
}

// setCPU parses string cpu and sets GOMAXPROCS
// according to its value. It accepts either
// a number (e.g. 3) or a percent (e.g. 50%).
func setCPU(cpu string) error {
var numCPU int

availCPU := runtime.NumCPU()

if strings.HasSuffix(cpu, "%") {
// Percent
var percent float32
pctStr := cpu[:len(cpu)-1]
pctInt, err := strconv.Atoi(pctStr)
if err != nil || pctInt < 1 || pctInt > 100 {
return errors.New("invalid CPU value: percentage must be between 1-100")
}
percent = float32(pctInt) / 100
numCPU = int(float32(availCPU) * percent)
} else {
// Number
num, err := strconv.Atoi(cpu)
if err != nil || num < 1 {
return errors.New("invalid CPU value: provide a number or percent greater than 0")
}
numCPU = num
}

if numCPU > availCPU {
numCPU = availCPU
}

runtime.GOMAXPROCS(numCPU)
return nil
}
103 changes: 96 additions & 7 deletions cmd/clawiod/server.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,94 @@
package main

import (
"context"
"fmt"
"github.com/clawio/clawiod/root"
"github.com/go-kit/kit/log/levels"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"io"
"net/http"
"os"
"time"
)

type server struct {
logger levels.Levels
router http.Handler
config root.Configuration
httpLogger io.Writer
logger levels.Levels
router http.Handler
config root.Configuration
httpLogger io.Writer
registryDriver root.RegistryDriver
webServices map[string]root.WebService
}

func newServer(config root.Configuration) (*server, error) {
logger, err := getLogger(config)
if err != nil {
return nil, err
}
s := &server{logger: logger, config: config}
registryDriver, err := getRegistryDriver(config)
if err != nil {
return nil, err
}
s := &server{logger: logger, config: config, registryDriver: registryDriver}
err = s.configureRouter()
if err != nil {
return nil, err
}
// do the register in other routine repeatedly to avoid the node
// being removed by the TTL constraint
go func() {
err = s.registerNode()
if err != nil {
s.logger.Error().Log("error", "error registering node")
}
for range time.Tick(time.Second * 5) {
s.logger.Info().Log("msg", "keep alive is issued every 5 seconds: re-registering node")
err = s.registerNode()
if err != nil {
s.logger.Error().Log("error", "error registering node")
}
}
}()

return s, nil
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handlers.CombinedLoggingHandler(s.httpLogger, s.router).ServeHTTP(w, r)
}

func (s *server) registerNode() error {
hostname, err := os.Hostname()
if err != nil {
s.logger.Error().Log("error", err)
return err
}
for key := range s.webServices {
rol := key + "-node"
url := fmt.Sprintf("%s:%d", hostname, s.config.GetPort())
if s.config.IsTLSEnabled() {
url = fmt.Sprintf("https://%s", url)
} else {
url = fmt.Sprintf("http://%s", url)
}
node := &node{
xhost: fmt.Sprintf("%s:%d", hostname, s.config.GetPort()),
xid: fmt.Sprintf("%s:%d", hostname, s.config.GetPort()),
xrol: rol,
xurl: url,
xversion: "TODO"}
err := s.registryDriver.Register(context.Background(), node)
if err != nil {
s.logger.Error().Log("error", err)
return err
}
}
return nil
}

func (s *server) configureRouter() error {
config := s.config

Expand Down Expand Up @@ -62,6 +117,7 @@ func (s *server) configureRouter() error {
return err
}
s.logger.Info().Log("msg", "web services enabled", "webservices", config.GetEnabledWebServices())
s.webServices = webServices

router := mux.NewRouter()
router.Handle("/metrics", prometheus.Handler()).Methods("GET")
Expand All @@ -76,14 +132,23 @@ func (s *server) configureRouter() error {
if config.IsCORSMiddlewareEnabled() {
handler = handlerFunc
handler = corsMiddleware.Handler(handler)
router.Handle(path, handler).Methods(method)
if method == "*" {
router.Handle(path, handler)
} else {
router.Handle(path, handler).Methods(method)
}

prometheus.InstrumentHandler(path, handler)
s.logger.Info().Log("method", method, "endpoint", path, "msg", "endpoint available")
router.Handle(path, handler).Methods("OPTIONS")
s.logger.Info().Log("method", "OPTIONS", "endpoint", path, "msg", "endpoint available - created by corsmiddleware")
} else {
handler = handlerFunc
router.Handle(path, handler).Methods(method)
if method == "*" {
router.Handle(path, handler)
} else {
router.Handle(path, handler).Methods(method)
}
prometheus.InstrumentHandler(path, handler)
s.logger.Info().Log("method", method, "endpoint", path, "msg", "endpoint available")
}
Expand All @@ -93,3 +158,27 @@ func (s *server) configureRouter() error {
s.router = router
return nil
}

type node struct {
xid string
xrol string
xhost string
xversion string
xurl string
}

func (n *node) ID() string {
return n.xid
}
func (n *node) Rol() string {
return n.xrol
}
func (n *node) Host() string {
return n.xhost
}
func (n *node) Version() string {
return n.xversion
}
func (n *node) URL() string {
return n.xurl
}
Loading

0 comments on commit ed98755

Please sign in to comment.