-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpacket_dump_handler.go
125 lines (107 loc) · 2.79 KB
/
packet_dump_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package suzu
import (
"context"
"encoding/json"
"io"
"os"
"sync"
"time"
)
func init() {
NewServiceHandlerFuncs.register("dump", NewPacketDumpHandler)
}
type PacketDumpHandler struct {
Config Config
ChannelID string
ConnectionID string
SampleRate uint32
ChannelCount uint16
LanguageCode string
RetryCount int
mu sync.Mutex
OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error
}
func NewPacketDumpHandler(config Config, channelID, connectionID string, sampleRate uint32, channelCount uint16, languageCode string, onResultFunc any) serviceHandlerInterface {
return &PacketDumpHandler{
Config: config,
ChannelID: channelID,
ConnectionID: connectionID,
SampleRate: sampleRate,
ChannelCount: channelCount,
LanguageCode: languageCode,
OnResultFunc: onResultFunc.(func(context.Context, io.WriteCloser, string, string, string, any) error),
}
}
type PacketDumpResult struct {
Timestamp int64 `json:"timestamp"`
ChannelID string `json:"channel_id"`
ConnectionID string `json:"connection_id"`
LanguageCode string `json:"language_code"`
SampleRate uint32 `json:"sample_rate"`
ChannelCount uint16 `json:"channel_count"`
Payload []byte `json:"payload"`
}
func (h *PacketDumpHandler) UpdateRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount++
return h.RetryCount
}
func (h *PacketDumpHandler) GetRetryCount() int {
return h.RetryCount
}
func (h *PacketDumpHandler) ResetRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount = 0
return h.RetryCount
}
func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan opusChannel, header soraHeader) (*io.PipeReader, error) {
c := h.Config
filename := c.DumpFile
channelID := h.ChannelID
connectionID := h.ConnectionID
r, w := io.Pipe()
reader := opusChannelToIOReadCloser(ctx, opusCh)
go func() {
f, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
w.CloseWithError(err)
return
}
defer f.Close()
defer w.Close()
mw := io.MultiWriter(f, w)
encoder := json.NewEncoder(mw)
for {
buf := make([]byte, FrameSize)
n, err := reader.Read(buf)
if err != nil {
return
}
if n > 0 {
dump := &PacketDumpResult{
Timestamp: time.Now().UnixMilli(),
ChannelID: channelID,
ConnectionID: connectionID,
LanguageCode: h.LanguageCode,
SampleRate: h.SampleRate,
ChannelCount: h.ChannelCount,
Payload: buf[:n],
}
if h.OnResultFunc != nil {
if err := h.OnResultFunc(ctx, w, h.ChannelID, h.ConnectionID, h.LanguageCode, dump); err != nil {
w.CloseWithError(err)
return
}
} else {
if err := encoder.Encode(dump); err != nil {
w.CloseWithError(err)
return
}
}
}
}
}()
return r, nil
}