This repository has been archived by the owner on Feb 7, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice.go
102 lines (86 loc) · 1.95 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package metric
import (
"fmt"
)
// DataProvider is a source of metric items.
type DataProvider interface {
// ReadData sends the provider's items to ch one at a time and returns an
// error if reading fails. The service in this file keeps receiving from ch
// until it is closed, so an implementation must close ch after its last send.
ReadData(ch chan *Item) error
}
// Closer is implemented by repos that can be closed.
type Closer interface {
// Close closes the repo and reports any error from doing so.
Close() error
}
// Inserter writes logs to a backend.
type Inserter interface {
// Insert writes the given batch of logs to the backend and returns an
// error if the write fails.
Insert(logs []Log) error
}
// Repo is the storage backend used by the service. It combines Inserter
// (to write logs) and Closer (to close the backend).
type Repo interface {
Inserter
Closer
}
// Service is the metrics item service.
type Service interface {
// Save reads the data from the data provider and stores it in the repo.
Save() error
// Read returns the items read from the data provider.
Read() ([]*Item, error)
}
// service is the Service implementation returned by NewService.
type service struct {
// provider supplies the items consumed by Save and Read.
provider DataProvider
// repo receives the logs written by Save and is closed when Save returns.
repo Repo
}
// NewService builds a Service that pulls items from p and stores them in r.
// Neither argument is validated here; Save and Read panic later if a
// dependency they need is nil.
func NewService(p DataProvider, r Repo) Service {
	svc := new(service)
	svc.provider = p
	svc.repo = r
	return svc
}
// Save streams every item produced by the data provider into the repo.
//
// Items are received from the provider on an unbuffered channel, their Log
// values are collected into batches of at most 1000 entries, and each batch
// is written with repo.Insert. A final, partially filled batch is written
// once the provider closes the channel. The repo is always closed before
// Save returns.
//
// Save returns a non-nil error when an Insert fails, when the provider's
// ReadData reports an error (logs received up to that point are still
// inserted first), or when closing the repo fails after an otherwise
// successful run.
//
// Save panics if the service was built without a repo or a data provider.
func (ser *service) Save() (err error) {
	if ser.repo == nil {
		panic("the repo backend is nil")
	}
	if ser.provider == nil {
		panic("the data provider is nil")
	}

	const batchSize = 1000

	// Close the repo on every exit path; surface its error only when nothing
	// else has already failed.
	defer func() {
		if cerr := ser.repo.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("service close repo failure. %w", cerr)
		}
	}()

	ch := make(chan *Item)
	// Buffered so the reader goroutine can always deliver its result and
	// exit, even if Save has already returned.
	readErr := make(chan error, 1)
	go func() {
		readErr <- ser.provider.ReadData(ch)
	}()

	logs := make([]Log, 0, batchSize)
	// flush writes the pending batch, if any, and starts a fresh one. A new
	// slice is allocated each time because the repo may keep a reference to
	// the slice it was handed.
	flush := func() error {
		if len(logs) == 0 {
			return nil
		}
		if ierr := ser.repo.Insert(logs); ierr != nil {
			return fmt.Errorf("service save data failure. %w", ierr)
		}
		logs = make([]Log, 0, batchSize)
		return nil
	}

	// pending is set to nil once the provider's result has been consumed,
	// which disables that select case.
	pending := readErr
	for {
		select {
		case item, isOpen := <-ch:
			if !isOpen {
				// The provider is done: write what is left, then collect
				// its result if we have not seen it yet.
				if ferr := flush(); ferr != nil {
					return ferr
				}
				if pending != nil {
					if rerr := <-pending; rerr != nil {
						return fmt.Errorf("service read data failure. %w", rerr)
					}
				}
				return nil
			}
			if item == nil {
				// Nothing to store for a nil item.
				continue
			}
			logs = append(logs, item.Log)
			if len(logs) >= batchSize {
				if ferr := flush(); ferr != nil {
					// Keep receiving in the background so the provider is
					// not left blocked forever on its next send.
					go func() {
						for range ch {
						}
					}()
					return ferr
				}
			}
		case rerr := <-pending:
			pending = nil
			if rerr != nil {
				// The provider failed, possibly without closing ch. Persist
				// what was received and report the failure instead of
				// waiting for a close that may never come.
				if ferr := flush(); ferr != nil {
					return ferr
				}
				return fmt.Errorf("service read data failure. %w", rerr)
			}
			// A nil result only means ReadData returned; keep receiving
			// until the channel is closed.
		}
	}
}
// Read collects every item produced by the data provider and returns them
// in the order they were received.
//
// Items are received on an unbuffered channel until the provider closes it.
// If the provider's ReadData returns an error, Read returns nil items and
// that error (wrapped), even when the provider never closed the channel.
//
// Read panics if the service was built without a data provider.
func (ser *service) Read() ([]*Item, error) {
	// A service without a provider cannot read anything; treat it as a
	// programming error.
	if ser.provider == nil {
		panic("data provider is null. cant not read data")
	}

	ch := make(chan *Item)
	// Buffered so the reader goroutine can always deliver its result and
	// exit, even if Read has already returned.
	readErr := make(chan error, 1)
	go func() {
		readErr <- ser.provider.ReadData(ch)
	}()

	// pending is set to nil once the provider's result has been consumed,
	// which disables that select case.
	pending := readErr
	var items []*Item
	for {
		select {
		case item, isOpen := <-ch:
			if isOpen {
				items = append(items, item)
				continue
			}
			// The channel is closed: collect the provider's result if we
			// have not seen it yet.
			if pending != nil {
				if err := <-pending; err != nil {
					return nil, fmt.Errorf("service read data failure. %w", err)
				}
			}
			return items, nil
		case err := <-pending:
			pending = nil
			if err != nil {
				return nil, fmt.Errorf("service read data failure. %w", err)
			}
			// A nil result only means ReadData returned; keep receiving
			// until the channel is closed.
		}
	}
}