Skip to content

Commit

Permalink
dumper concurrent reading (#19)
Browse files Browse the repository at this point in the history
* dumper concurrent reading

* fix

* update ipv6 regexp rule
  • Loading branch information
sjzar authored Nov 21, 2023
1 parent c446a8f commit e6b2afe
Show file tree
Hide file tree
Showing 18 changed files with 791 additions and 35 deletions.
1 change: 1 addition & 0 deletions cmd/ips/cmd_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func init() {
dumpCmd.Flags().StringVarP(&readerOption, "input-option", "", "", UsageReaderOption)
dumpCmd.Flags().StringVarP(&hybridMode, "hybrid-mode", "", "aggregation", UsageHybridMode)
dumpCmd.Flags().StringVarP(&outputFile, "output-file", "o", "", UsageDumpOutputFile)
dumpCmd.Flags().IntVarP(&readerJobs, "reader-jobs", "", 0, UsageReaderJobs)

}

Expand Down
1 change: 1 addition & 0 deletions cmd/ips/cmd_pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func init() {
packCmd.Flags().StringVarP(&outputFile, "output-file", "o", "", UsagePackOutputFile)
packCmd.Flags().StringVarP(&outputFormat, "output-format", "", "", UsagePackOutputFormat)
packCmd.Flags().StringVarP(&writerOption, "output-option", "", "", UsageWriterOption)
packCmd.Flags().IntVarP(&readerJobs, "reader-jobs", "", 0, UsageReaderJobs)

}

Expand Down
7 changes: 7 additions & 0 deletions cmd/ips/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ var (
// writerOption specifies the options for the writer.
writerOption string

// readerJobs specifies the number of concurrent reader jobs.
readerJobs int

// myip
// localAddr specifies the local address (in IP format) that should be used for outbound connections.
// Useful in systems with multiple network interfaces.
Expand Down Expand Up @@ -237,6 +240,10 @@ func GetFlagConfig() *ips.Config {
conf.WriterOption = writerOption
}

if readerJobs != 0 {
conf.ReaderJobs = readerJobs
}

if len(addr) != 0 {
conf.Addr = addr
}
Expand Down
1 change: 1 addition & 0 deletions cmd/ips/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
UsageReaderOption = "Additional options for the database reader, if applicable."
UsageWriterOption = "Additional options for the database writer, if applicable."
UsageHybridMode = "Sets mode for multi-IP source handling; 'comparison' to compare, 'aggregation' to merge data."
UsageReaderJobs = "Set the number of concurrent reader jobs. This parameter controls the parallelism level of reading operations."

// Output Flags

Expand Down
11 changes: 11 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [dp_rewriter_files](#dprewriterfiles)
* [reader_option](#readeroption)
* [writer_option](#writeroption)
* [reader_jobs](#readerjobs)
* [myip_count](#myipcount)
* [myip_timeout_s](#myiptimeouts)
* [addr](#addr)
Expand Down Expand Up @@ -330,6 +331,16 @@ $ ips config set rewrite_files "/path/to/rewrite1.txt,/path/to/rewrite2.txt"

例如 `mmdb` 数据库的 `select_languages` 等,具体功能请查阅数据库文档。

### reader_jobs

`reader_jobs` 参数用于控制读取操作的并发作业数量。它定义了可以同时进行的读取操作的最大数目,从而实现高效的数据处理。

默认情况下无需设置 `reader_jobs`,IPS 将根据系统的 CPU 核心数与输出格式自动设置。

值得注意的是,如果并发数设置过高,可能导致系统资源竞争加剧,进而影响程序的整体性能表现。

在某些情况下,增加读取器的并发数并不会带来性能提升。实际上,任务完成时间取决于读取器和写入器中的较慢的一方,尤其是大部分写入器(例如 IPDB 和 MMDB)目前还不支持并发写入,请根据自身情况选择适合的并发数。

### myip_count

在查询本机 IP 地址时,此参数定义了返回相同 IP 地址的最小探测器数量。默认值为 `3`
Expand Down
11 changes: 11 additions & 0 deletions docs/config_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [dp_rewriter_files](#dprewriterfiles)
* [reader_option](#readeroption)
* [writer_option](#writeroption)
* [reader_jobs](#readerjobs)
* [myip_count](#myipcount)
* [myip_timeout_s](#myiptimeouts)
* [addr](#addr)
Expand Down Expand Up @@ -332,6 +333,16 @@ Some database formats provide additional writing options, which can be set durin

For example, `mmdb` database's `select_languages` and so on, please refer to the database documentation for specific functions.

### reader_jobs

The `reader_jobs` parameter is designed to control the number of concurrent jobs for reading operations. It specifies the maximum number of reading operations that can be performed simultaneously, thereby enhancing the efficiency of data processing.

By default, setting `reader_jobs` is not necessary, as IPS will automatically determine the appropriate number based on the system's CPU core count and the output format.

It's important to note that setting an excessively high number of concurrent jobs may lead to intensified competition for system resources, potentially degrading the overall performance of the program.

In some cases, increasing the concurrency of readers does not result in performance improvement. In fact, the completion time of tasks depends on the slower of the readers and writers. This is particularly relevant as most writers (such as IPDB and MMDB) currently do not support concurrent writing. Therefore, choose a suitable concurrency level based on your specific circumstances.

### myip_count

When querying the local IP address, this parameter defines the minimum number of detectors that return the same IP address. The default value is `3`.
Expand Down
5 changes: 5 additions & 0 deletions format/ipdb/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,8 @@ func IntToBinaryBE(num, length int) []byte {
return []byte{}
}
}

// WriterFormat returns the format of the writer.
func (w *Writer) WriterFormat() string {
return DBFormat
}
5 changes: 5 additions & 0 deletions format/mmdb/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ func (w *Writer) WriteTo(iw io.Writer) (int64, error) {
return w.writer.WriteTo(iw)
}

// WriterFormat returns the format of the writer.
func (w *Writer) WriterFormat() string {
return DBFormat
}

// ConvertMap converts fields and values to a map.
func (w *Writer) ConvertMap(fields, values []string) map[string]interface{} {
ret := make(map[string]interface{})
Expand Down
5 changes: 5 additions & 0 deletions format/plain/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,8 @@ func (w *Writer) Header() error {

return nil
}

// WriterFormat returns the format of the writer.
func (w *Writer) WriterFormat() string {
return DBFormat
}
3 changes: 3 additions & 0 deletions format/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Writer interface {

// WriteTo writes data to io.Writer
WriteTo(w io.Writer) (int64, error)

// WriterFormat returns the format of the writer.
WriterFormat() string
}

// NewWriter creates a Writer based on its format or file name.
Expand Down
186 changes: 155 additions & 31 deletions internal/ipio/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@
package ipio

import (
"context"
"net"
"reflect"
"runtime"
"sync"

log "github.com/sirupsen/logrus"

"github.com/sjzar/ips/format"
"github.com/sjzar/ips/format/plain"
"github.com/sjzar/ips/ipnet"
"github.com/sjzar/ips/pkg/errors"
"github.com/sjzar/ips/pkg/model"
)

// ChannelBufferSize why 1000? I don't know :)
const ChannelBufferSize = 1000

// StandardDumper serves as a standard mechanism to transfer IP database from one format to another.
type StandardDumper struct {
format.Reader
Expand All @@ -49,62 +56,179 @@ func NewStandardDumper(r format.Reader, w format.Writer) *StandardDumper {
// Dump is a convenience method to transfer IP data from a reader to a writer.
// It is equivalent to calling NewStandardDumper(r, w).Dump().
func Dump(r format.Reader, w format.Writer) error {
return NewStandardDumper(r, w).Dump()
return NewStandardDumper(r, w).Dump(0)
}

// Dump transfers IP data from the Reader to the Writer.
func (d *StandardDumper) Dump() error {
for d.Next() {
if err := d.Insert(d.info); err != nil {
return err
func (d *StandardDumper) Dump(readerJobs int) error {
if readerJobs <= 0 {
switch d.WriterFormat() {
case plain.DBFormat:
readerJobs = 1
default:
readerJobs = runtime.NumCPU()
}
}

ipStart, ipEnd := net.IPv4(0, 0, 0, 0), ipnet.LastIPv4
if d.Meta().IsIPv6Support() {
ipStart, ipEnd = make(net.IP, net.IPv6len), ipnet.LastIPv6
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

retChan := make(chan *model.IPInfo, ChannelBufferSize)
errChan := make(chan error, readerJobs)
defer close(errChan)
wg := sync.WaitGroup{}

split := ipnet.SplitIPNet(ipStart, ipEnd, readerJobs)
for i := 0; i < len(split)-1; i++ {
wg.Add(1)
go func(ctx context.Context, start, end net.IP) {
defer wg.Done()
sd := SimpleDumper{
Reader: d.Reader,
ipStart: start,
ipEnd: end,
}
if err := sd.Dump(ctx, retChan); err != nil {
errChan <- err
return
}
}(ctx, split[i], split[i+1])
}

go func() {
wg.Wait()
close(retChan)
}()

for {
select {
case ipInfo, ok := <-retChan:
if !ok {
return nil
}
if err := d.Insert(ipInfo); err != nil {
log.Debug("StandardDumper Insert() failed ", ipInfo, err)
return err
}
case err := <-errChan:
if err != nil {
log.Debug("StandardDumper Dump2() failed ", err)
return err
}
}
}
if d.err != nil {
return d.err
}

// SimpleDumper is a structure that facilitates the extraction of IP information within a specified range.
type SimpleDumper struct {
format.Reader
ipStart net.IP
ipEnd net.IP
}

// Dump iterates over IP addresses in the specified range, sending IP information to retChan.
// The operation is context-aware and will stop if the context is cancelled.
func (d *SimpleDumper) Dump(ctx context.Context, retChan chan<- *model.IPInfo) error {
// Validate the IP range before proceeding.
if d.ipStart == nil || d.ipEnd == nil || ipnet.IPLess(d.ipEnd, d.ipStart) {
return errors.ErrInvalidIPRange
}

marker := d.ipStart
info, err := d.Find(marker)
if err != nil {
return err
}

// Adjust marker if it doesn't match the start of the IP range.
if !marker.Equal(info.IPNet.Start) {
marker = ipnet.NextIP(info.IPNet.End.To16())
}

// Iterate over IP addresses until the end of the range is reached.
for done := d.done(marker, false); !done; done = d.done(marker, true) {
select {
case <-ctx.Done(): // Check if the operation was cancelled.
return nil
default:
}
info, err = d.next(marker)
if err != nil {
return err
}
if info == nil {
break
}
retChan <- info // Send IP information to the channel.
marker = ipnet.NextIP(info.IPNet.End.To16())
}

return nil
}

// Next fetches the next IP information from the Reader.
func (d *StandardDumper) Next() bool {
if d.done {
// done checks whether the end of the IP range has been reached.
func (d *SimpleDumper) done(marker net.IP, started bool) bool {
if marker == nil {
return false
}
if d.marker == nil {
if d.Meta().IsIPv6Support() {
d.marker = make(net.IP, net.IPv6len)
} else {
d.marker = net.IPv4(0, 0, 0, 0)
}

// Check if the current marker is outside the IP range.
if !ipnet.Contains(d.ipStart, d.ipEnd, marker) {
return true
}
d.info = nil

// Continuously fetch IP information until either a change in the data is found or the end of the IP range is reached.
// Check if the end of the range is reached.
return started && d.ipEnd.Equal(ipnet.PrevIP(marker))
}

// next retrieves the next IP information from the Reader based on the marker.
func (d *SimpleDumper) next(marker net.IP) (*model.IPInfo, error) {
if marker == nil {
marker = d.ipStart
}

var currentInfo *model.IPInfo
for {
info, err := d.Find(d.marker)
info, err := d.Find(marker)
if err != nil {
log.Debug("StandardDumper Find() failed ", d.marker, info, err)
d.err = err
return false
return nil, err
}

if d.info == nil {
d.info = info
// Determine if a new IP range is encountered.
if currentInfo == nil {
currentInfo = info
} else {
if !reflect.DeepEqual(d.info.Values(), info.Values()) {
if !Equal(currentInfo.Values(), info.Values()) {
break
}
if ok := d.info.IPNet.Join(info.IPNet); !ok {
if ok := currentInfo.IPNet.Join(info.IPNet); !ok {
break
}
}

// Move to the next IP address in the range.
d.marker = ipnet.NextIP(info.IPNet.End)
if ipnet.IsLastIP(info.IPNet.End, d.Meta().IsIPv6Support()) {
d.done = true
marker = ipnet.NextIP(info.IPNet.End)
if d.done(marker, true) {
break
}
}
return currentInfo, nil
}

// Equal compares two slices of strings for equality.
// FIXME use slices.Equal() when go.mod updates to Go 1.18
func Equal(s1, s2 []string) bool {
if len(s1) != len(s2) {
return false
}
for i := range s1 {
if s1[i] != s2[i] {
return false
}
}
return true
}
4 changes: 4 additions & 0 deletions internal/ips/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ type Config struct {
// WriterOption specifies the options for the writer.
WriterOption string `mapstructure:"writer_option"`

// ReaderJobs specifies the number of concurrent jobs for the reader.
// It controls how many reading operations can be performed in parallel.
ReaderJobs int `mapstructure:"reader_jobs"`

// MyIP
// LocalAddr specifies the local address (in IP format) that should be used for outbound connections.
// Useful in systems with multiple network interfaces.
Expand Down
2 changes: 1 addition & 1 deletion internal/ips/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (m *Manager) Pack(_format, file []string, _outputFormat, outputFile string)

// Dump data using the dumper
dumper := ipio.NewStandardDumper(reader, writer)
if err := dumper.Dump(); err != nil {
if err := dumper.Dump(m.Conf.ReaderJobs); err != nil {
log.Debug("dumper.Dump error: ", err)
return err
}
Expand Down
Loading

0 comments on commit e6b2afe

Please sign in to comment.