Skip to content

Commit

Permalink
add context & waitgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
dezren39 committed Feb 25, 2024
1 parent dc9b239 commit ef78b9f
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 116 deletions.
279 changes: 197 additions & 82 deletions sources/identity/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/developing-today/code/src/identity/auth"
"github.com/developing-today/code/src/identity/configuration"
"github.com/developing-today/code/src/identity/observability"
"github.com/developing-today/code/src/identity/stream"
"github.com/developing-today/code/src/identity/web"
"github.com/knadh/koanf"
"github.com/muesli/reflow/wordwrap"
Expand All @@ -56,10 +57,9 @@ remote config (
nats ->
etc) (
dont do all this, just this is the direction eventually as things become available if)
// todo: dependency injection / remove globals and inits ???
*/

// todo: put these into configuration
var Separator = "."
var ConfigurationFilePath = "config.kdl"
var EmbeddedConfigurationFilePath = "embed/config.kdl"
Expand Down Expand Up @@ -87,8 +87,31 @@ func NewConfiguration() *configuration.IdentityServerConfiguration {
}
}

func StartCharmCmd(config *configuration.IdentityServerConfiguration) *cobra.Command {
result := charmcmd.ServeCmd
func WrappedCharmFromContext(ctx context.Context, config *configuration.IdentityServerConfiguration) *cobra.Command {
cmd := charmcmd.RootCmd

go func() {
<-ctx.Done()

p, err := os.FindProcess(os.Getpid())
if err != nil {
log.Error("could not find process", "error", err)
return
}
if err := p.Signal(syscall.SIGINT); err != nil {
log.Error("could not send interrupt signal", "error", err)
}
}()

return cmd
}

func CharmCmd(ctx context.Context, config *configuration.IdentityServerConfiguration) *cobra.Command {
return WrappedCharmFromContext(ctx, config)
}

func StartCharmCmd(ctx context.Context, config *configuration.IdentityServerConfiguration) *cobra.Command {
result := WrappedCharmFromContext(ctx, config)
result.Use = "charm"
result.Aliases = []string{"ch", "c"}
return result
Expand All @@ -108,145 +131,237 @@ func LoadDefaultConfiguration() *configuration.IdentityServerConfiguration {
return config
}

func RootCmd(config *configuration.IdentityServerConfiguration) *cobra.Command {
func RootCmd(ctx context.Context, config *configuration.IdentityServerConfiguration) *cobra.Command {
result := &cobra.Command{
Use: "identity",
Short: "publish your identity",
Long: `publish your identity and allow others to connect to you.`,
}
result.AddCommand(charmcmd.RootCmd, StartAllCmd(config))
result.AddCommand(charmcmd.RootCmd, StartAllCmd(ctx, config))
return result
}

func StartAllCmd(config *configuration.IdentityServerConfiguration) *cobra.Command {
func StartAllCmd(ctx context.Context, config *configuration.IdentityServerConfiguration) *cobra.Command {
result := &cobra.Command{
Use: "start",
Short: "Starts the identity and charm servers",
Run: StartAll(config),
Run: StartAllTasks(ctx, config),
Aliases: []string{"s", "run", "serve", "publish", "pub", "p", "i", "y", "u", "o", "p", "q", "w", "e", "r", "t", "a", "s", "d", "f", "g", "h", "j", "k", "l", "z", "x", "c", "v", "b"},
}
result.AddCommand(StartCharmCmd(config), StartIdentityCmd(config), StartStreamCmd(config), StartAllAltCmd(*result))
result.AddCommand(StartCharmCmd(ctx, config), StartIdentityCmd(ctx, config), StartStreamCmd(ctx, config))
result.AddCommand(StartAllAltCmd(*result))
return result
}

func StartIdentityCmd(config *configuration.IdentityServerConfiguration) *cobra.Command {
func StartIdentityCmd(ctx context.Context, config *configuration.IdentityServerConfiguration) *cobra.Command {
return &cobra.Command{
Use: "identity",
Short: "Starts only the identity server",
Run: StartIdentity(config),
Run: StartIdentityFromContext(ctx, config),
Aliases: []string{"id", "i"},
}
}

func StartStreamCmd(config *configuration.IdentityServerConfiguration) *cobra.Command {
func StartStreamCmd(ctx context.Context, config *configuration.IdentityServerConfiguration) *cobra.Command {
return &cobra.Command{
Use: "stream",
Short: "Starts only the stream server",
Run: StartStream(config),
Run: StartStreamFromContext(ctx, config),
Aliases: []string{"tr", "t"},
}
}

func StartAll(config *configuration.IdentityServerConfiguration) func(*cobra.Command, []string) {
func GetTasks(ctx context.Context, config *configuration.IdentityServerConfiguration) []func(context.Context, *sync.WaitGroup) func(*cobra.Command, []string) {
return []func(ictx context.Context, wg *sync.WaitGroup) func(*cobra.Command, []string){
StartStream(config),
StartCharm(config),
StartIdentity(config),
}
}

func StartAllTasks(ctx context.Context, config *configuration.IdentityServerConfiguration) func(*cobra.Command, []string) {
return func(cmd *cobra.Command, args []string) {
tasks := []func(*cobra.Command, []string){
StartCharm(config),
StartIdentity(config),
StartStream(config),
if ctx == nil {
ctx = context.Background()
}
StartAll(ctx, GetTasks(ctx, config)...)(cmd, args)
}
}

func StartAll(ctx context.Context, tasks ...func(context.Context, *sync.WaitGroup) func(*cobra.Command, []string)) func(*cobra.Command, []string) {
return func(cmd *cobra.Command, args []string) {
if ctx == nil {
ctx = context.Background()
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var wg sync.WaitGroup
wg.Add(len(tasks))

for _, task := range tasks {
go RunTask(&wg, task)(cmd, args)
go func(task func(context.Context, *sync.WaitGroup) func(*cobra.Command, []string)) {
defer wg.Done()
defer cancel()
task(ctx, &wg)(cmd, args)
}(task)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, os.Interrupt, os.Kill)
defer signal.Stop(c)

select {
case <-c:
log.Info("SIGINT received, cancelling tasks.")
cancel()
case <-ctx.Done():
log.Info("Context cancelled, ensuring all tasks complete.")
}

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
<-done
fmt.Println("All tasks completed. Proceeding to cleanup and shutdown.")
wg.Wait()
log.Info("All tasks have completed or been cancelled. Proceeding to cleanup and shutdown.")
FinalShutdown(ctx, cmd, args, tasks...)
}
}

func RunTask(wg *sync.WaitGroup, taskFunc func(*cobra.Command, []string)) func(*cobra.Command, []string) {
func CleanupAndShutdown(cancel context.CancelFunc, done chan struct{}) {
cancel()
<-done
}

func FinalShutdown(ctx context.Context, cmd *cobra.Command, args []string, tasks ...func(context.Context, *sync.WaitGroup) func(*cobra.Command, []string)) {
log.Info("All tasks cleaned up. Shutting down.", "len(tasks)", len(tasks), "command", cmd.Name(), "args", args)
log.Info("Bye!", "time", time.Now())
}

func RunTask(ctx context.Context, wg *sync.WaitGroup, taskFunc func(context.Context, *cobra.Command, []string)) func(*cobra.Command, []string) {
return func(cmd *cobra.Command, args []string) {
if ctx == nil {
ctx = context.Background()
}
defer wg.Done()
taskFunc(cmd, args)
taskFunc(ctx, cmd, args)
}
}

func StartStream(config *configuration.IdentityServerConfiguration) func(*cobra.Command, []string) {
func StartStreamFromContext(ctx context.Context, config *configuration.IdentityServerConfiguration) func(*cobra.Command, []string) {
return func(cmd *cobra.Command, args []string) {
log.Info("Starting stream server")
if ctx == nil {
ctx = context.Background()
}
StartStream(config)(ctx, nil)(cmd, args)
}
}

func StartCharm(config *configuration.IdentityServerConfiguration) func(*cobra.Command, []string) {
func StartStream(config *configuration.IdentityServerConfiguration) func(context.Context, *sync.WaitGroup) func(*cobra.Command, []string) {
return func(ctx context.Context, wg *sync.WaitGroup) func(*cobra.Command, []string) {
return func(cmd *cobra.Command, args []string) {
// todo: do something with the wait group ! don't increment for stream itself, but for the tasks it runs !! todo: is this the wrong way around? fuck
log.Info("Starting stream server")
if ctx == nil {
ctx = context.Background()
}
stream.RunStreamServer(ctx, config, cmd, args)
}
}
}
func StartCharmFromContext(ctx context.Context, config *configuration.IdentityServerConfiguration) func(*cobra.Command, []string) {
if ctx == nil {
ctx = context.Background()
}
return func(cmd *cobra.Command, args []string) {
log.Info("Starting charm server")
charmcmd.ServeCmdRunE(cmd, args)
StartCharm(config)(ctx, nil)(cmd, args)
}
}

func StartIdentity(config *configuration.IdentityServerConfiguration) func(*cobra.Command, []string) {
return func(cmd *cobra.Command, args []string) {
log.Info("Starting identity server")
// todo split web and ssh into separate functions
connections := auth.NewSafeConnectionMap()
web.GoRunWebServer(connections, config)
handler := scp.NewFileSystemHandler(ScpFileSystemDirPath)
s, err := wish.NewServer(
wish.WithMiddleware(
scp.Middleware(handler, handler),
bubbletea.Middleware(TeaHandler), // todo: before bubbletea, use non-fullscreen teahandler to accept TOS if not this verion accepted in DB. check connection for previous tos, but this might need to be a charm user column? // separate todo: add tos table in database and pulldown latest tos on boot?
comment.Middleware("Thanks, have a nice day!"),
elapsed.Middleware(),
promwish.Middleware("0.0.0.0:9222", "identity"),
logging.Middleware(),
observability.Middleware(connections),
),
wish.WithPasswordAuth(func(ctx ssh.Context, password string) bool {
log.Info("Accepting password", "password", password, "len", len(password))
return Connect(ctx, nil, &password, nil, connections)
}),
wish.WithKeyboardInteractiveAuth(func(ctx ssh.Context, challenge gossh.KeyboardInteractiveChallenge) bool {
log.Info("Accepting keyboard interactive")
return Connect(ctx, nil, nil, challenge, connections)
}),
wish.WithPublicKeyAuth(func(ctx ssh.Context, key ssh.PublicKey) bool {
log.Info("Accepting public key", "publicKeyType", key.Type(), "publicKeyString", base64.StdEncoding.EncodeToString(key.Marshal()))
return Connect(ctx, key, nil, nil, connections)
}),
wish.WithBannerHandler(Banner(config)),
wish.WithAddress(fmt.Sprintf("%s:%d", config.Configuration.String("identity.server.host"), config.Configuration.Int("identity.server.ssh.port"))),
wish.WithHostKeyPath(HostKeyPath),
)
if err != nil {
log.Error("could not start server", "error", err)
return
func StartCharm(config *configuration.IdentityServerConfiguration) func(context.Context, *sync.WaitGroup) func(*cobra.Command, []string) {
return func(ctx context.Context, wg *sync.WaitGroup) func(*cobra.Command, []string) {
return func(cmd *cobra.Command, args []string) {
// todo: do something with the wait group ! don't increment for stream itself, but for the tasks it runs !! todo: is this the wrong way around? fuck
log.Info("Starting charm server")
if err := charmcmd.ServeCmdRunEWithContext(ctx, cmd, args); err != nil {
log.Error("Error running charm server command", "error", err)
}
}
}
}

done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
log.Info("Starting ssh server", "identity.server.host", config.Configuration.String("identity.server.host"), "identity.server.ssh.port", config.Configuration.Int("identity.server.ssh.port"), "address", fmt.Sprintf("%s:%d", config.Configuration.String("identity.server.host"), config.Configuration.Int("identity.server.ssh.port")))
go func() {
if err := s.ListenAndServe(); err != nil && !errors.Is(err, ssh.ErrServerClosed) {
func StartIdentityFromContext(ctx context.Context, config *configuration.IdentityServerConfiguration) func(*cobra.Command, []string) {
if ctx == nil {
ctx = context.Background()
}
return func(cmd *cobra.Command, args []string) {
StartIdentity(config)(ctx, nil)(cmd, args)
}
}
func StartIdentity(config *configuration.IdentityServerConfiguration) func(context.Context, *sync.WaitGroup) func(*cobra.Command, []string) {
return func(goctx context.Context, wg *sync.WaitGroup) func(*cobra.Command, []string) {
return func(cmd *cobra.Command, args []string) {
// todo: do something with the wait group ! don't increment for stream itself, but for the tasks it runs !! todo: is this the wrong way around? fuck
log.Info("Starting identity server")
if goctx == nil {
goctx = context.Background()
}
connections := auth.NewSafeConnectionMap()
web.GoRunWebServer(goctx, connections, config)
handler := scp.NewFileSystemHandler(ScpFileSystemDirPath)
s, err := wish.NewServer(
wish.WithMiddleware(
scp.Middleware(handler, handler),
bubbletea.Middleware(TeaHandler),
comment.Middleware("Thanks, have a nice day!"),
elapsed.Middleware(),
promwish.MiddlewareWithContext(goctx, "0.0.0.0:9222", "identity", promwish.SkipDefaultDoneSignals(), promwish.WithWaitGroup(wg)),
logging.Middleware(),
observability.Middleware(connections),
),
wish.WithPasswordAuth(func(ctx ssh.Context, password string) bool {
log.Info("Accepting password", "password", password, "len", len(password))
return Connect(ctx, nil, &password, nil, connections)
}),
wish.WithKeyboardInteractiveAuth(func(ctx ssh.Context, challenge gossh.KeyboardInteractiveChallenge) bool {
log.Info("Accepting keyboard interactive")
return Connect(ctx, nil, nil, challenge, connections)
}),
wish.WithPublicKeyAuth(func(ctx ssh.Context, key ssh.PublicKey) bool {
log.Info("Accepting public key", "publicKeyType", key.Type(), "publicKeyString", base64.StdEncoding.EncodeToString(key.Marshal()))
return Connect(ctx, key, nil, nil, connections)
}),
wish.WithBannerHandler(Banner(config)),
wish.WithAddress(fmt.Sprintf("%s:%d", config.Configuration.String("identity.server.host"), config.Configuration.Int("identity.server.ssh.port"))),
wish.WithHostKeyPath(HostKeyPath),
)
if err != nil {
log.Error("could not start server", "error", err)
done <- os.Interrupt
return
}
}()

<-done
log.Info("Stopping ssh server", "identity.server.host", config.Configuration.String("identity.server.host"), "identity.server.ssh.port", config.Configuration.Int("identity.server.ssh.port"), "address", fmt.Sprintf("%s:%d", config.Configuration.String("identity.server.host"), config.Configuration.Int("identity.server.ssh.port")))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
log.Info("Starting ssh server", "identity.server.host", config.Configuration.String("identity.server.host"), "identity.server.ssh.port", config.Configuration.Int("identity.server.ssh.port"), "address", fmt.Sprintf("%s:%d", config.Configuration.String("identity.server.host"), config.Configuration.Int("identity.server.ssh.port")))
go func() {
if err := s.ListenAndServe(); err != nil && !errors.Is(err, ssh.ErrServerClosed) {
log.Error("could not start server", "error", err)
done <- os.Interrupt
}
}()

if err := s.Shutdown(ctx); err != nil && !errors.Is(err, ssh.ErrServerClosed) {
log.Error("could not stop server", "error", err)
go func() {
select {
case <-goctx.Done():
done <- os.Interrupt
case <-done:
}
}()

<-done
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := s.Shutdown(ctx); err != nil && !errors.Is(err, ssh.ErrServerClosed) {
log.Error("could not stop server", "error", err)
}
log.Info("Stopping ssh server", "identity.server.host", config.Configuration.String("identity.server.host"), "identity.server.ssh.port", config.Configuration.Int("identity.server.ssh.port"), "address", fmt.Sprintf("%s:%d", config.Configuration.String("identity.server.host"), config.Configuration.Int("identity.server.ssh.port")))
}
}
}
Expand Down
Loading

0 comments on commit ef78b9f

Please sign in to comment.