Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Long Lived Brach and heavy WIP] DHT Introspection work. Do NOT judge the programmer yet. #590

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"github.com/libp2p/go-libp2p-core/event"
"math"
"sync"
"time"
Expand Down Expand Up @@ -116,6 +117,8 @@ type IpfsDHT struct {
successfulOutboundQueryGracePeriod time.Duration

fixLowPeersChan chan struct{}

eventsEmitter event.Emitter
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -184,6 +187,12 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())

// create event emitters
dht.eventsEmitter, err = h.EventBus().Emitter(new(event.DhtEvent))
if err != nil {
return nil, fmt.Errorf("failed to create emitter for DHT events, err=%s", err)
}

dht.startSelfLookup()
dht.startRefreshing()

Expand Down
2 changes: 1 addition & 1 deletion dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

pb "github.com/libp2p/go-libp2p-kad-dht/pb"
test "github.com/libp2p/go-libp2p-kad-dht/internal/testing"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"

"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
Expand Down
19 changes: 18 additions & 1 deletion events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"context"
"encoding/json"
"github.com/libp2p/go-libp2p-core/event"
"sync"

"github.com/google/uuid"
Expand Down Expand Up @@ -235,7 +236,9 @@ var LookupEventBufferSize = 16

// PublishLookupEvent publishes a query event to the query event channel
// associated with the given context, if any.
func PublishLookupEvent(ctx context.Context, ev *LookupEvent) {
func (dht *IpfsDHT) PublishLookupEvent(ctx context.Context, ev *LookupEvent) {
dht.emitLookupEventToBus(ev)

ich := ctx.Value(routingLookupKey{})
if ich == nil {
return
Expand All @@ -244,4 +247,18 @@ func PublishLookupEvent(ctx context.Context, ev *LookupEvent) {
// We *want* to panic here.
ech := ich.(*lookupEventChannel)
ech.send(ctx, ev)

}

func (dht *IpfsDHT) emitLookupEventToBus(ev *LookupEvent) {
js, err := json.Marshal(ev)
if err != nil {
logger.Errorf("failed to marshal lookup event to Json,err=%s", err)
return
}
// send it on the eventbus
lookUpEvent := event.DhtEvent{EventType: "LookupEvent", EventJson: event.JSString(string(js))}
if err := dht.eventsEmitter.Emit(lookUpEvent); err != nil {
logger.Errorf("failed to emit DhtEvent on the bus, err=%s", err)
}
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ require (
github.com/ipfs/go-ipns v0.0.2
github.com/ipfs/go-log v1.0.3
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.8.0
github.com/libp2p/go-libp2p-core v0.5.1
github.com/libp2p/go-eventbus v0.1.1-0.20200417061519-63254f6c0da4
github.com/libp2p/go-libp2p v0.8.1-0.20200417064624-9fa05d6ef67b
github.com/libp2p/go-libp2p-core v0.5.2-0.20200417081628-b224967ed43f
github.com/libp2p/go-libp2p-kbucket v0.4.1
github.com/libp2p/go-libp2p-peerstore v0.2.3
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-routing-helpers v0.2.1
github.com/libp2p/go-libp2p-swarm v0.2.3
github.com/libp2p/go-libp2p-swarm v0.2.4-0.20200417062831-28eda91e270f
github.com/libp2p/go-libp2p-testing v0.1.1
github.com/libp2p/go-msgio v0.0.4
github.com/libp2p/go-netroute v0.1.2
Expand Down
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo=
github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/go-cid v0.0.1 h1:GBjWPktLnNyX0JiQCNFpUuUSoMw5KMyqrsejHYlILBE=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
Expand Down Expand Up @@ -135,10 +136,12 @@ github.com/ipfs/go-ipns v0.0.2 h1:oq4ErrV4hNQ2Eim257RTYRgfOSV/s8BDaf9iIl4NwFs=
github.com/ipfs/go-ipns v0.0.2/go.mod h1:WChil4e0/m9cIINWLxZe1Jtf77oz5L05rO2ei/uKJ5U=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I=
github.com/ipfs/go-log v1.0.2 h1:s19ZwJxH8rPWzypjcDpqPLIyV7BnbLqvpli3iZoqYK0=
github.com/ipfs/go-log v1.0.2/go.mod h1:1MNjMxe0u6xvJZgeqbJ8vdo2TKaGwZ1a0Bpza+sr2Sk=
github.com/ipfs/go-log v1.0.3 h1:Gg7SUYSZ7BrqaKMwM+hRgcAkKv4QLfzP4XPQt5Sx/OI=
github.com/ipfs/go-log v1.0.3/go.mod h1:OsLySYkwIbiSUR/yBTdv1qPtcE4FW3WPWk/ewz9Ru+A=
github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log/v2 v2.0.2 h1:xguurydRdfKMJjKyxNXNU8lYP0VZH1NUwJRwUorjuEw=
github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log/v2 v2.0.3 h1:Q2gXcBoCALyLN/pUQlz1qgu0x3uFV6FzP9oXhpfyJpc=
Expand Down Expand Up @@ -189,6 +192,8 @@ github.com/libp2p/go-conn-security-multistream v0.1.0 h1:aqGmto+ttL/uJgX0JtQI0tD
github.com/libp2p/go-conn-security-multistream v0.1.0/go.mod h1:aw6eD7LOsHEX7+2hJkDxw1MteijaVcI+/eP2/x3J1xc=
github.com/libp2p/go-eventbus v0.1.0 h1:mlawomSAjjkk97QnYiEmHsLu7E136+2oCWSHRUvMfzQ=
github.com/libp2p/go-eventbus v0.1.0/go.mod h1:vROgu5cs5T7cv7POWlWxBaVLxfSegC5UGQf8A2eEmx4=
github.com/libp2p/go-eventbus v0.1.1-0.20200417061519-63254f6c0da4 h1:m6ax5Jpc3XCVSs5sPRKsNqI7Zo03dz7u6ndUOFiNqnA=
github.com/libp2p/go-eventbus v0.1.1-0.20200417061519-63254f6c0da4/go.mod h1:6I+OKIbCq7cIh7X7wDxGGRcEYnsTJfcqg2i0HEIetqc=
github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atqi3INF5s=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-flow-metrics v0.0.2 h1:U5TvqfoyR6GVRM+bC15Ux1ltar1kbj6Zw6xOVR02CZs=
Expand All @@ -202,6 +207,8 @@ github.com/libp2p/go-libp2p v0.7.4 h1:xVj1oSlN0C+FlxqiLuHC8WruMvq24xxfeVxmNhTG0r
github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniVO7zIHGMw=
github.com/libp2p/go-libp2p v0.8.0 h1:8t8kAJM+o4rR91bfwbgbtykbdqPJv819+CTSPkXDT1A=
github.com/libp2p/go-libp2p v0.8.0/go.mod h1:QRNH9pwdbEBpx5DTJYg+qxcVaDMAz3Ee/qDKwXujH5o=
github.com/libp2p/go-libp2p v0.8.1-0.20200417064624-9fa05d6ef67b h1:PLE8sr+TrNzi6a68UfGyW0ySUJ8pfrmakQHheL/9hWg=
github.com/libp2p/go-libp2p v0.8.1-0.20200417064624-9fa05d6ef67b/go.mod h1:/uwao18FdRe/vHm5eUwfOzzbiYcRzm1dMxlLpD8X5Co=
github.com/libp2p/go-libp2p-autonat v0.1.1 h1:WLBZcIRsjZlWdAZj9CiBSvU2wQXoUOiS1Zk1tM7DTJI=
github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE=
github.com/libp2p/go-libp2p-autonat v0.2.0 h1:Kok+0M/4jiz6TTmxtBqAa5tLyHb/U+G/7o/JEeW7Wok=
Expand Down Expand Up @@ -237,12 +244,18 @@ github.com/libp2p/go-libp2p-core v0.5.0 h1:FBQ1fpq2Fo/ClyjojVJ5AKXlKhvNc/B6U0O+7
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.5.1 h1:6Cu7WljPQtGY2krBlMoD8L/zH3tMUsCbqNFH7cZwCoI=
github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.2-0.20200417060929-6957bf8a421d/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.2-0.20200417080253-365e59346b4b h1:XCVQYu1HFfc1YFBFHSS+buZl1VGIZ2JPlDxQTZBOreM=
github.com/libp2p/go-libp2p-core v0.5.2-0.20200417080253-365e59346b4b/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.2-0.20200417081628-b224967ed43f h1:gOu3iEE2lxXDOucfRkn6dx0S8T9zpYAedSZuO0f81Jc=
github.com/libp2p/go-libp2p-core v0.5.2-0.20200417081628-b224967ed43f/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
github.com/libp2p/go-libp2p-discovery v0.3.0 h1:+JnYBRLzZQtRq0mK3xhyjBwHytLmJXMTZkQfbw+UrGA=
github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw=
github.com/libp2p/go-libp2p-introspector v0.0.5-0.20200417062351-cc3f2009f930/go.mod h1:1HwKb2Qa9wg0T5gKyB5MkNHIs4SOIim8oQ0htYmp5G4=
github.com/libp2p/go-libp2p-kbucket v0.4.1 h1:6FyzbQuGLPzbMv3HiD232zqscIz5iB8ppJwb380+OGI=
github.com/libp2p/go-libp2p-kbucket v0.4.1/go.mod h1:7sCeZx2GkNK1S6lQnGUW5JYZCFPnXzAZCCBBS70lytY=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
Expand Down Expand Up @@ -295,6 +308,8 @@ github.com/libp2p/go-libp2p-swarm v0.2.2 h1:T4hUpgEs2r371PweU3DuH7EOmBIdTBCwWs+F
github.com/libp2p/go-libp2p-swarm v0.2.2/go.mod h1:fvmtQ0T1nErXym1/aa1uJEyN7JzaTNyBcHImCxRpPKU=
github.com/libp2p/go-libp2p-swarm v0.2.3 h1:uVkCb8Blfg7HQ/f30TyHn1g/uCwXsAET7pU0U59gx/A=
github.com/libp2p/go-libp2p-swarm v0.2.3/go.mod h1:P2VO/EpxRyDxtChXz/VPVXyTnszHvokHKRhfkEgFKNM=
github.com/libp2p/go-libp2p-swarm v0.2.4-0.20200417062831-28eda91e270f h1:LX+udu+3auqLiYpHrd1+MXOdMNTfB4PRtIO0++pbadM=
github.com/libp2p/go-libp2p-swarm v0.2.4-0.20200417062831-28eda91e270f/go.mod h1:G6UrNovm3+thMwE2MU8xCw5v2Ov1eZOp+zcmv4aU6aI=
github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
Expand Down Expand Up @@ -478,6 +493,7 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
6 changes: 3 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryU
return
}

PublishLookupEvent(ctx,
q.dht.PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
q.id,
Expand Down Expand Up @@ -356,7 +356,7 @@ func (q *query) terminate(ctx context.Context, cancel context.CancelFunc, reason
return
}

PublishLookupEvent(ctx,
q.dht.PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
q.id,
Expand Down Expand Up @@ -428,7 +428,7 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) {
if q.terminated {
panic("update should not be invoked after the logical lookup termination")
}
PublishLookupEvent(ctx,
q.dht.PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
q.id,
Expand Down