Skip to content

Commit

Permalink
update lazymap
Browse files Browse the repository at this point in the history
add rate limitting for NotFound
  • Loading branch information
vintikzzz committed Dec 17, 2024
1 parent 4510133 commit b3ca0b3
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 111 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/urfave/cli v1.22.16
github.com/webtor-io/abuse-store v0.0.0-20221220184115-12e6c0615a10
github.com/webtor-io/common-services v0.0.0-20241022160325-d391acd827ab
github.com/webtor-io/lazymap v0.0.0-20221030185154-1799721becef
github.com/webtor-io/lazymap v0.0.0-20241211155941-e81d935cfa1d
golang.org/x/sys v0.26.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.56.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ github.com/webtor-io/common-services v0.0.0-20241022160325-d391acd827ab h1:71AxB
github.com/webtor-io/common-services v0.0.0-20241022160325-d391acd827ab/go.mod h1:6jUeO6R+ytZnEJj7PlcLEQZfWaxw8ovav73BP83MTlI=
github.com/webtor-io/lazymap v0.0.0-20221030185154-1799721becef h1:tSAcIGxgmsxFJnLRyTkhzFhjGAtaOZ5g2osBHG3JYBs=
github.com/webtor-io/lazymap v0.0.0-20221030185154-1799721becef/go.mod h1:za/bioTGK3VjG3+mK7/kpx0TV8++ytZkdOQ1MJ2HTjM=
github.com/webtor-io/lazymap v0.0.0-20241211155941-e81d935cfa1d h1:Xi9E0LCDgK++QliA7ZNFdSI11Bpg5qe7efN3AMWJ3dY=
github.com/webtor-io/lazymap v0.0.0-20241211155941-e81d935cfa1d/go.mod h1:kioEFK4hk8YfHrhg47tGvMG40xawOJM4gcfRQ4EeX4k=
github.com/webtor-io/stoplist v0.0.0-20230128160543-ea87bdc34deb h1:RCjga119RT7hTqYeELSGPTVCYfMUIyaSyc2QCaDk/ik=
github.com/webtor-io/stoplist v0.0.0-20230128160543-ea87bdc34deb/go.mod h1:nlKK64Domln2CfQUQiP2+RcbD0IQPjoBfHDYmiGLuqY=
github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
Expand Down
44 changes: 17 additions & 27 deletions services/abuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
)

type Abuse struct {
lazymap.LazyMap
lazymap.LazyMap[bool]
cl *AbuseClient
}

Expand All @@ -39,36 +39,26 @@ func NewAbuse(c *cli.Context, cl *AbuseClient) *Abuse {
}
return &Abuse{
cl: cl,
LazyMap: lazymap.New(&lazymap.Config{
LazyMap: lazymap.New[bool](&lazymap.Config{
Expire: time.Minute,
ErrorExpire: 10 * time.Second,
StoreErrors: false,
}),
}
}

func (s *Abuse) get(h string) error {
cl, err := s.cl.Get()
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
r, err := cl.Check(ctx, &as.CheckRequest{Infohash: h})
if err != nil {
return err
}
if r.GetExists() {
return ErrAbuse
}
return nil
}

func (s *Abuse) Get(h string) error {
_, err := s.LazyMap.Get(h, func() (interface{}, error) {
return nil, s.get(h)
func (s *Abuse) Get(ctx context.Context, h string) (bool, error) {
return s.LazyMap.Get(h, func() (bool, error) {
cl, err := s.cl.Get()
if err != nil {
return false, err
}
r, err := cl.Check(ctx, &as.CheckRequest{Infohash: h})
if err != nil {
return false, err
}
if r.GetExists() {
return true, nil
}
return false, nil
})
if err != nil {
return err
}
return nil
}
15 changes: 10 additions & 5 deletions services/providers/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ func (s *Badger) Name() string {
return "badger"
}

func (s *Badger) Touch(_ context.Context, h string) (err error) {
func (s *Badger) Touch(_ context.Context, h string) (ok bool, err error) {
err = s.db.Update(func(txn *badger.Txn) error {
i, err := txn.Get([]byte(h))
if errors.Is(err, badger.ErrKeyNotFound) {
return ss.ErrNotFound

} else {
err = i.Value(func(val []byte) error {
e := badger.NewEntry([]byte(h), val).WithTTL(s.exp)
Expand All @@ -66,15 +65,21 @@ func (s *Badger) Touch(_ context.Context, h string) (err error) {
return err
}
})
return
if err != nil {
return false, err
}
return true, nil
}

func (s *Badger) Push(_ context.Context, h string, torrent []byte) (err error) {
func (s *Badger) Push(_ context.Context, h string, torrent []byte) (ok bool, err error) {
err = s.db.Update(func(txn *badger.Txn) error {
e := badger.NewEntry([]byte(h), torrent).WithTTL(s.exp)
return txn.SetEntry(e)
})
return
if err != nil {
return false, err
}
return true, nil
}

func (s *Badger) Pull(_ context.Context, h string) (torrent []byte, err error) {
Expand Down
16 changes: 10 additions & 6 deletions services/providers/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,27 @@ func (s *Redis) Name() string {
return "redis"
}

func (s *Redis) Touch(ctx context.Context, h string) (err error) {
func (s *Redis) Touch(ctx context.Context, h string) (ok bool, err error) {
cl := s.cl.Get()

res, err := cl.Expire(ctx, h, s.exp).Result()

if err != nil {
return err
return false, err
}
if !res {
return ss.ErrNotFound
return false, ss.ErrNotFound
}
return nil
return true, nil
}

func (s *Redis) Push(ctx context.Context, h string, torrent []byte) (err error) {
func (s *Redis) Push(ctx context.Context, h string, torrent []byte) (ok bool, err error) {
cl := s.cl.Get()
return cl.Set(ctx, h, torrent, s.exp).Err()
err = cl.Set(ctx, h, torrent, s.exp).Err()
if err != nil {
return false, err
}
return true, nil
}

func (s *Redis) Pull(ctx context.Context, h string) (torrent []byte, err error) {
Expand Down
39 changes: 21 additions & 18 deletions services/providers/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,22 @@ func (s *S3) Name() string {
return "s3"
}

func (s *S3) Touch(ctx context.Context, h string) (err error) {
//cl := s.cl.Get()
//r, err := cl.GetObjectWithContext(ctx, &s3.GetObjectInput{
// Bucket: aws.String(s.bucket),
// Key: aws.String(h),
//})
//if err != nil {
// if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == s3.ErrCodeNoSuchKey {
// return ss.ErrNotFound
// }
// return err
//}
//defer func(Body io.ReadCloser) {
// _ = Body.Close()
//}(r.Body)
return nil
func (s *S3) Touch(ctx context.Context, h string) (ok bool, err error) {
cl := s.cl.Get()
r, err := cl.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(h),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == s3.ErrCodeNoSuchKey {
return false, ss.ErrNotFound
}
return false, err
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(r.Body)
return true, nil
}

func (s *S3) makeAWSMD5(b []byte) *string {
Expand All @@ -78,7 +78,7 @@ func (s *S3) makeAWSMD5(b []byte) *string {
return aws.String(m)
}

func (s *S3) Push(ctx context.Context, h string, torrent []byte) (err error) {
func (s *S3) Push(ctx context.Context, h string, torrent []byte) (ok bool, err error) {
cl := s.cl.Get()
_, err = cl.PutObjectWithContext(ctx,
&s3.PutObjectInput{
Expand All @@ -87,7 +87,10 @@ func (s *S3) Push(ctx context.Context, h string, torrent []byte) (err error) {
Body: bytes.NewReader(torrent),
ContentMD5: s.makeAWSMD5(torrent),
})
return
if err != nil {
return false, err
}
return true, nil
}

func (s *S3) Pull(ctx context.Context, h string) (torrent []byte, err error) {
Expand Down
46 changes: 23 additions & 23 deletions services/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,23 @@ func NewServer(s *Store, a *Abuse, sl *Stoplist) *Server {

func (s *Server) Pull(ctx context.Context, in *pb.PullRequest) (*pb.PullReply, error) {
t := time.Now()
hLog := log.WithField("infoHash", in.GetInfoHash())

hLog := log.WithField("infoHash", in.GetInfoHash()).WithField("method", "pull")
hLog.Info("pull torrent request")

err := s.isAbused(in.GetInfoHash())
if errors.Is(err, ErrAbuse) {
hLog.WithField("duration", time.Since(t)).Warn("abused")
return nil, status.Errorf(codes.PermissionDenied, "Restricted by the rightholder infoHash=%v", in.GetInfoHash())
} else if err != nil {
abused, err := s.isAbused(ctx, in.GetInfoHash())
if err != nil {
hLog.WithField("duration", time.Since(t)).WithError(err).Error("failed to check abuse")
return nil, errors.Wrapf(err, "failed to check abuse infoHash=%v", in.GetInfoHash())
}

if abused {
hLog.WithField("duration", time.Since(t)).Warn("abused")
return nil, status.Errorf(codes.PermissionDenied, "restricted by the rightholder infoHash=%v", in.GetInfoHash())
}
torrent, err := s.s.Pull(ctx, in.GetInfoHash())
if errors.Is(err, ErrNotFound) {
hLog.WithField("duration", time.Since(t)).Info("torrent not found")
return nil, status.Errorf(codes.NotFound, "Unable to find torrent for infoHash=%v", in.GetInfoHash())
return nil, status.Errorf(codes.NotFound, "unable to find torrent for infoHash=%v", in.GetInfoHash())
} else if err != nil {
hLog.WithField("duration", time.Since(t)).WithError(err).Error("failed to pull")
return nil, errors.Wrapf(err, "failed to pull torrent infoHash=%v", in.GetInfoHash())
Expand All @@ -69,7 +70,7 @@ func (s *Server) checkStoplist(torrent []byte, log *log.Entry, t time.Time, hash
}
if cr.Found {
log.WithField("duration", time.Since(t)).Warnf("found in stoplist %v", cr.String())
return status.Errorf(codes.PermissionDenied, "Found in stoplist infoHash=%v", hash)
return status.Errorf(codes.PermissionDenied, "found in stoplist infoHash=%v", hash)
}
return nil
}
Expand All @@ -83,23 +84,25 @@ func (s *Server) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushReply, e
return nil, err
}
infoHash := mi.HashInfoBytes().HexString()
hLog := log.WithField("infoHash", infoHash)
hLog := log.WithField("infoHash", infoHash).WithField("method", "push")
hLog.Info("push torrent request")

err = s.checkStoplist(in.GetTorrent(), hLog, t, infoHash)
if err != nil {
return nil, err
}

err = s.isAbused(infoHash)
if errors.Is(err, ErrAbuse) {
hLog.WithField("duration", time.Since(t)).Warn("abused")
return nil, status.Errorf(codes.PermissionDenied, "Restricted by the rightholder infoHash=%v", infoHash)
} else if err != nil {
abused, err := s.isAbused(ctx, infoHash)
if err != nil {
hLog.WithField("duration", time.Since(t)).WithError(err).Error("failed to check abuse")
return nil, errors.Wrapf(err, "failed to check abuse infoHash=%v", infoHash)
}
err = s.s.Push(ctx, infoHash, in.GetTorrent())
if abused {
hLog.WithField("duration", time.Since(t)).Warn("abused")
return nil, status.Errorf(codes.PermissionDenied, "restricted by the rightholder infoHash=%v", infoHash)
}

_, err = s.s.Push(ctx, infoHash, in.GetTorrent())
if err != nil {
hLog.WithField("duration", time.Since(t)).WithError(err).Error("failed to push")
return nil, errors.Wrapf(err, "failed to push torrent infoHash=%v", infoHash)
Expand All @@ -109,20 +112,17 @@ func (s *Server) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushReply, e
return &pb.PushReply{InfoHash: infoHash}, nil
}

func (s *Server) isAbused(h string) error {
if s.a == nil {
return nil
}
return s.a.Get(h)
func (s *Server) isAbused(ctx context.Context, h string) (bool, error) {
return s.a.Get(ctx, h)
}

func (s *Server) Touch(ctx context.Context, in *pb.TouchRequest) (*pb.TouchReply, error) {
t := time.Now()
infoHash := in.GetInfoHash()
hLog := log.WithField("infoHash", infoHash)
hLog := log.WithField("infoHash", infoHash).WithField("method", "touch")
hLog.Info("touch torrent request")

err := s.s.Touch(ctx, infoHash)
_, err := s.s.Touch(ctx, infoHash)
if errors.Is(err, ErrNotFound) {
hLog.WithField("duration", time.Since(t)).Info("torrent not found")
return nil, status.Errorf(codes.NotFound, "torrent not found infoHash=%v", infoHash)
Expand Down
Loading

0 comments on commit b3ca0b3

Please sign in to comment.