Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/ipfs refactor #1137

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func main() {

// Storage service for Gateway
if conf.Mode == types.ModeGateway || conf.Mode == types.ModeCensus {
srv.Storage, err = srv.IPFS(conf.Ipfs)
srv.Storage, err = service.IPFS(conf.Ipfs)
if err != nil {
log.Fatal(err)
}
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type IPFSCfg struct {
ConnectKey string
// ConnectPeers is the list of ipfsConnect peers
ConnectPeers []string
// LocalDiscovery enables IPFS to communicate with other nodes in local networks (192.168.0.0/16 and such).
// Disabled by default since it creates issues in production deployments, but needed in test environments
LocalDiscovery bool
}

// VochainCfg includes all possible config params needed by the Vochain
Expand Down
44 changes: 1 addition & 43 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@ package data

import (
"context"
"fmt"
"io"

"go.vocdoni.io/dvote/data/ipfs"
"go.vocdoni.io/dvote/types"
)

// Storage is the interface that wraps the basic methods for a distributed data storage provider.
type Storage interface {
Init(d *types.DataStore) error
Init() error
Publish(ctx context.Context, data []byte) (string, error)
PublishReader(ctx context.Context, data io.Reader) (string, error)
Retrieve(ctx context.Context, id string, maxSize int64) ([]byte, error)
Expand All @@ -24,41 +20,3 @@ type Storage interface {
Stats() map[string]any
Stop() error
}

// StorageID is the type for the different storage providers.
// Currently only IPFS is supported.
type StorageID int

const (
// IPFS is the InterPlanetary File System.
IPFS StorageID = iota + 1
)

// StorageIDFromString returns the Storage identifier from a string.
func StorageIDFromString(i string) StorageID {
switch i {
case "IPFS":
return IPFS
default:
return -1
}
}

// IPFSNewConfig returns a new DataStore configuration for IPFS.
func IPFSNewConfig(path string) *types.DataStore {
datastore := new(types.DataStore)
datastore.Datadir = path
return datastore
}

// Init returns a new Storage instance of type `t`.
func Init(t StorageID, d *types.DataStore) (Storage, error) {
switch t {
case IPFS:
s := new(ipfs.Handler)
err := s.Init(d)
return s, err
default:
return nil, fmt.Errorf("bad storage type or DataStore specification")
}
}
8 changes: 5 additions & 3 deletions data/datamocktest.go → data/datamock/datamock.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package data
package datamock

import (
"context"
Expand All @@ -8,11 +8,13 @@ import (
"sync"
"time"

"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/data/ipfs"
"go.vocdoni.io/dvote/test/testcommon/testutil"
"go.vocdoni.io/dvote/types"
)

var _ data.Storage = &DataMockTest{}

// DataMockTest is a mock data provider for testing purposes.
type DataMockTest struct {
files map[string]string
Expand All @@ -21,7 +23,7 @@ type DataMockTest struct {
rnd testutil.Random
}

func (d *DataMockTest) Init(_ *types.DataStore) error {
func (d *DataMockTest) Init() error {
d.files = make(map[string]string)
d.prefix = "ipfs://"
d.rnd = testutil.NewRandom(0)
Expand Down
6 changes: 3 additions & 3 deletions data/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"testing"

qt "github.com/frankban/quicktest"
"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/data/datamock"
)

func TestDownloader(t *testing.T) {
stg := data.DataMockTest{}
stg.Init(nil)
stg := datamock.DataMockTest{}
stg.Init()
d := NewDownloader(&stg)
d.Start()
qt.Assert(t, d.QueueSize(), qt.Equals, int32(0))
Expand Down
30 changes: 24 additions & 6 deletions data/ipfs/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func init() {
}

// Init initializes the IPFS node and repository.
func initRepository() error {
func initRepository(enableLocalDiscovery bool) error {
daemonLocked, err := fsrepo.LockedByOtherProcess(ConfigRoot)
if err != nil {
return err
Expand All @@ -66,13 +66,21 @@ func initRepository() error {
if err := installDatabasePlugins(); err != nil {
return err
}
_, err = doInit(io.Discard, ConfigRoot, 2048)
_, err = doInit(io.Discard, ConfigRoot, 2048, enableLocalDiscovery)
return err
}

// StartNode starts the IPFS node.
func startNode() (*ipfscore.IpfsNode, coreiface.CoreAPI, error) {
func (i *Handler) startNode() (*ipfscore.IpfsNode, coreiface.CoreAPI, error) {
log.Infow("starting IPFS node", "config", ConfigRoot)

// check if needs init
if !fsrepo.IsInitialized(ConfigRoot) {
if err := initRepository(i.EnableLocalDiscovery); err != nil {
return nil, nil, err
}
}

r, err := fsrepo.Open(ConfigRoot)
if errors.Is(err, fsrepo.ErrNeedMigration) {
log.Warn("Found outdated ipfs repo, migrations need to be run.")
Expand Down Expand Up @@ -180,7 +188,7 @@ var installDatabasePlugins = sync.OnceValue(func() error {
return nil
})

func doInit(out io.Writer, repoRoot string, nBitsForKeypair int) (*config.Config, error) {
func doInit(out io.Writer, repoRoot string, nBitsForKeypair int, enableLocalDiscovery bool) (*config.Config, error) {
log.Infow("initializing new IPFS repository", "root", repoRoot)
if err := checkWritable(repoRoot); err != nil {
return nil, err
Expand All @@ -195,10 +203,12 @@ func doInit(out io.Writer, repoRoot string, nBitsForKeypair int) (*config.Config
return nil, err
}

conf.Discovery.MDNS.Enabled = false

// Apply `server` configuration profile:
// Disables local host discovery, recommended when running IPFS on machines with public IPv4 addresses
// Prevent from scanning local networks which can trigger netscan alerts.
// See: https://github.com/ipfs/kubo/issues/7985
conf.Discovery.MDNS.Enabled = false
conf.Swarm.DisableNatPortMap = true
conf.Swarm.AddrFilters = []string{
"/ip4/10.0.0.0/ipcidr/8",
"/ip4/100.64.0.0/ipcidr/10",
Expand All @@ -217,6 +227,14 @@ func doInit(out io.Writer, repoRoot string, nBitsForKeypair int) (*config.Config
"/ip6/fc00::/ipcidr/7",
"/ip6/fe80::/ipcidr/10",
}
conf.Addresses.NoAnnounce = conf.Swarm.AddrFilters

if enableLocalDiscovery {
conf.Discovery.MDNS.Enabled = true
conf.Swarm.DisableNatPortMap = false
conf.Swarm.AddrFilters = []string{}
conf.Addresses.NoAnnounce = []string{}
}

if err := fsrepo.Init(repoRoot, conf); err != nil {
return nil, err
Expand Down
28 changes: 15 additions & 13 deletions data/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ import (
"github.com/ipfs/kubo/core/corehttp"
"github.com/ipfs/kubo/core/corerepo"
"github.com/ipfs/kubo/core/coreunix"
"github.com/ipfs/kubo/repo/fsrepo"
ipfscrypto "github.com/libp2p/go-libp2p/core/crypto"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/types"
)

var _ data.Storage = &Handler{}

const (
// MaxFileSizeBytes is the maximum size of a file to be published to IPFS
MaxFileSizeBytes = 1024 * 1024 * 100 // 100 MB
Expand All @@ -48,6 +49,9 @@ type Handler struct {
DataDir string
LogLevel string

EnableLocalDiscovery bool
// TODO: Replace DataDir, LogLevel and EnableLocalDiscovery with a config.IPFSCfg?

retrieveCache *lru.Cache[string, []byte]

// cancel helps us stop extra goroutines and listeners which complement
Expand All @@ -56,25 +60,24 @@ type Handler struct {
maddr ma.Multiaddr
}

// New returns a Handler
func New() *Handler {
return &Handler{}
}

// Init initializes the IPFS node handler and repository.
func (i *Handler) Init(d *types.DataStore) error {
func (i *Handler) Init() error {
if i.LogLevel == "" {
i.LogLevel = "ERROR"
}
ipfslog.SetLogLevel("*", i.LogLevel)
if err := installDatabasePlugins(); err != nil {
return err
}
ConfigRoot = d.Datadir
ConfigRoot = i.DataDir
os.Setenv("IPFS_FD_MAX", "4096")

// check if needs init
if !fsrepo.IsInitialized(ConfigRoot) {
if err := initRepository(); err != nil {
return err
}
}
node, coreAPI, err := startNode()
node, coreAPI, err := i.startNode()
if err != nil {
return err
}
Expand All @@ -93,7 +96,7 @@ func (i *Handler) Init(d *types.DataStore) error {
"pubKey", node.PrivateKey.GetPublic(),
)
// start http
cctx := cmdCtx(node, d.Datadir)
cctx := cmdCtx(node, i.DataDir)
cctx.ReqLog = &ipfscmds.ReqLog{}

gatewayOpt := corehttp.GatewayOption(corehttp.WebUIPaths...)
Expand Down Expand Up @@ -126,7 +129,6 @@ func (i *Handler) Init(d *types.DataStore) error {

i.Node = node
i.CoreAPI = coreAPI
i.DataDir = d.Datadir
i.retrieveCache, err = lru.New[string, []byte](RetrievedFileCacheSize)
if err != nil {
return err
Expand Down
14 changes: 9 additions & 5 deletions service/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ import (
)

// IPFS starts the IPFS service
func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage, err error) {
func IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage, err error) {
log.Info("creating ipfs service")
os.Setenv("IPFS_FD_MAX", "1024")
ipfsStore := data.IPFSNewConfig(ipfsconfig.ConfigPath)
storage, err = data.Init(data.StorageIDFromString("IPFS"), ipfsStore)

ipfsStore := ipfs.New()
ipfsStore.DataDir = ipfsconfig.ConfigPath
ipfsStore.EnableLocalDiscovery = ipfsconfig.LocalDiscovery
err = ipfsStore.Init()
if err != nil {
return
return nil, err
}
storage = ipfsStore

go func() {
for {
Expand All @@ -40,5 +44,5 @@ func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage
}
ipfsconn.Start()
}
return
return storage, nil
}
6 changes: 3 additions & 3 deletions test/testcommon/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"go.vocdoni.io/dvote/api/censusdb"
"go.vocdoni.io/dvote/crypto/ethereum"
"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/data/datamock"
"go.vocdoni.io/dvote/db"
"go.vocdoni.io/dvote/db/metadb"
"go.vocdoni.io/dvote/httprouter"
"go.vocdoni.io/dvote/types"
"go.vocdoni.io/dvote/vochain"
"go.vocdoni.io/dvote/vochain/indexer"
"go.vocdoni.io/dvote/vochain/vochaininfo"
Expand Down Expand Up @@ -44,8 +44,8 @@ func (d *APIserver) Start(t testing.TB, apis ...string) {
}

// create the IPFS storage
d.Storage = &data.DataMockTest{}
d.Storage.Init(&types.DataStore{Datadir: t.TempDir()})
d.Storage = &datamock.DataMockTest{}
d.Storage.Init()

// create the API router
router := httprouter.HTTProuter{}
Expand Down
4 changes: 0 additions & 4 deletions types/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package types

type DataStore struct {
Datadir string
}

// TODO: use an array, and possibly declare methods to encode/decode as hex.

type ProcessID = []byte
Expand Down
9 changes: 5 additions & 4 deletions vocone/vocone.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ func NewVocone(dataDir string, keymanager *ethereum.SignKeys, disableIPFS bool,

// Create the IPFS storage layer
if !disableIPFS {
vc.Storage, err = vc.IPFS(&config.IPFSCfg{
ConfigPath: filepath.Join(dataDir, "ipfs"),
ConnectKey: connectKey,
ConnectPeers: connectPeers,
vc.Storage, err = service.IPFS(&config.IPFSCfg{
ConfigPath: filepath.Join(dataDir, "ipfs"),
ConnectKey: connectKey,
ConnectPeers: connectPeers,
LocalDiscovery: true, // vocone is not intended to be deployed in server env. if needed we could turn this into a flag
})
if err != nil {
return nil, err
Expand Down