Skip to content

Commit

Permalink
add replay commands (#370)
Browse files Browse the repository at this point in the history
* add replay commands

* capitalize
  • Loading branch information
frostbyte73 authored Jul 11, 2024
1 parent bfbbc18 commit cc3b0c5
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/lk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func main() {
app.Commands = append(app.Commands, JoinCommands...)
app.Commands = append(app.Commands, EgressCommands...)
app.Commands = append(app.Commands, IngressCommands...)
app.Commands = append(app.Commands, ReplayCommands...)
app.Commands = append(app.Commands, LoadTestCommands...)
app.Commands = append(app.Commands, ProjectCommands...)
app.Commands = append(app.Commands, SIPCommands...)
Expand Down
223 changes: 223 additions & 0 deletions cmd/lk/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package main

import (
"context"
"fmt"
"net/http"
"os"

"github.com/olekukonko/tablewriter"
"github.com/twitchtv/twirp"
"github.com/urfave/cli/v3"

"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/replay"
lksdk "github.com/livekit/server-sdk-go/v2"
)

var (
ReplayCommands = []*cli.Command{
{
Name: "replay",
Usage: "experimental (not yet available)",
Category: "I/O",
Hidden: true,
HideHelpCommand: true,
Commands: []*cli.Command{
{
Name: "list",
Before: createReplayClient,
Action: listReplays,
},
{
Name: "load",
Before: createReplayClient,
Action: loadReplay,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "id",
Usage: "Replay `ID`",
Required: true,
},
&cli.StringFlag{
Name: "room",
Usage: "Playback room name",
Required: true,
},
&cli.IntFlag{
Name: "pts",
Usage: "Playback start time",
Required: false,
},
},
},
{
Name: "seek",
Before: createReplayClient,
Action: seek,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "id",
Usage: "Playback `ID`",
Required: true,
},
&cli.IntFlag{
Name: "pts",
Usage: "Playback start time",
Required: true,
},
},
},
{
Name: "close",
Before: createReplayClient,
Action: closeReplay,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "id",
Usage: "Playback `ID`",
Required: true,
},
},
},
{
Name: "delete",
Before: createReplayClient,
Action: deleteReplay,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "id",
Usage: "Replay `ID`",
Required: true,
},
},
},
},
},
}

replayClient *replayServiceClient
)

func createReplayClient(ctx context.Context, cmd *cli.Command) error {
pc, err := loadProjectDetails(cmd)
if err != nil {
return err
}

url := lksdk.ToHttpURL(pc.URL)
client := replay.NewReplayProtobufClient(url, &http.Client{}, withDefaultClientOpts(pc)...)
replayClient = &replayServiceClient{
Replay: client,
apiKey: pc.APIKey,
apiSecret: pc.APISecret,
}
return nil
}

func listReplays(ctx context.Context, _ *cli.Command) error {
ctx, err := replayClient.withAuth(ctx)
if err != nil {
return err
}

req := &replay.ListReplaysRequest{}
res, err := replayClient.ListReplays(ctx, req)
if err != nil {
return err
}

table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"ReplayID"})
for _, info := range res.Replays {
table.Append([]string{
info.ReplayId,
})
}
table.Render()

return nil
}

func loadReplay(ctx context.Context, cmd *cli.Command) error {
ctx, err := replayClient.withAuth(ctx)
if err != nil {
return err
}

req := &replay.LoadReplayRequest{
ReplayId: cmd.String("id"),
RoomName: cmd.String("room"),
StartingPts: cmd.Int("pts"),
}
res, err := replayClient.LoadReplay(ctx, req)
if err != nil {
return err
}
fmt.Println("PlaybackID:", res.PlaybackId)

return nil
}

func seek(ctx context.Context, cmd *cli.Command) error {
ctx, err := replayClient.withAuth(ctx)
if err != nil {
return err
}

req := &replay.RoomSeekRequest{
PlaybackId: cmd.String("id"),
Pts: cmd.Int("pts"),
}
_, err = replayClient.SeekForRoom(ctx, req)
return err
}

func closeReplay(ctx context.Context, cmd *cli.Command) error {
ctx, err := replayClient.withAuth(ctx)
if err != nil {
return err
}

req := &replay.CloseReplayRequest{
PlaybackId: cmd.String("id"),
}
_, err = replayClient.CloseReplay(ctx, req)
return err
}

func deleteReplay(ctx context.Context, cmd *cli.Command) error {
ctx, err := replayClient.withAuth(ctx)
if err != nil {
return err
}

req := &replay.DeleteReplayRequest{
ReplayId: cmd.String("id"),
}
_, err = replayClient.DeleteReplay(ctx, req)
return err
}

// temporary replay service client - will eventually move to go SDK
type replayServiceClient struct {
replay.Replay
apiKey string
apiSecret string
}

func (c *replayServiceClient) withAuth(ctx context.Context) (context.Context, error) {
at := auth.NewAccessToken(c.apiKey, c.apiSecret)
token, err := at.ToJWT()
if err != nil {
return nil, err
}

return twirp.WithHTTPRequestHeaders(ctx, newHeaderWithToken(token))
}

func newHeaderWithToken(token string) http.Header {
header := make(http.Header)
header.Set("Authorization", "Bearer "+token)
return header
}
15 changes: 15 additions & 0 deletions cmd/lk/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ var (
Name: "departure-timeout",
Usage: "Number of `SECS` to keep the room open after the last participant leaves",
},
&cli.BoolFlag{
Name: "replay-enabled",
Usage: "experimental (not yet available)",
Hidden: true,
},
},
},
{
Expand Down Expand Up @@ -341,6 +346,11 @@ var (
Name: "departure-timeout",
Usage: "number of seconds to keep the room open after the last participant leaves",
},
&cli.BoolFlag{
Name: "replay-enabled",
Usage: "experimental (not yet available)",
Hidden: true,
},
},
},
{
Expand Down Expand Up @@ -607,6 +617,11 @@ func createRoom(ctx context.Context, cmd *cli.Command) error {
req.DepartureTimeout = uint32(departureTimeout)
}

if replayEnabled := cmd.Bool("replay-enabled"); replayEnabled {
fmt.Printf("setting replay enabled: %t\n", replayEnabled)
req.ReplayEnabled = replayEnabled
}

room, err := roomClient.CreateRoom(ctx, req)
if err != nil {
return err
Expand Down

0 comments on commit cc3b0c5

Please sign in to comment.