Skip to content

Commit

Permalink
sink: add ipfs
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Oct 12, 2023
1 parent 6edb1cb commit d79f16b
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 49 deletions.
100 changes: 51 additions & 49 deletions sink-server/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,19 @@ func (e *DockerEngine) createSQLManifest(deploymentID string, token string, pkg
services[postgraphile.Name] = motd
config.Services = append(config.Services, postgraphile)
}
// clickhouse:
// container_name: clickhouse-ssp
// image: clickhouse/clickhouse-server
// user: "101:101"
// hostname: clickhouse
// volumes:
// - ${PWD}/devel/clickhouse-server/config.xml:/etc/clickhouse-server/config.d/config.xml
// - ${PWD}/devel/clickhouse-server/users.xml:/etc/clickhouse-server/users.d/users.xml
// ports:
// - "8123:8123"
// - "9000:9000"
// - "9005:9005"
// sink:

for _, svc := range config.Services {
for _, port := range svc.Ports {
Expand All @@ -542,58 +555,59 @@ func (e *DockerEngine) createSQLManifest(deploymentID string, token string, pkg

content, err = yaml.Marshal(config)
return

}

func (e *DockerEngine) createSubgraphManifest(deploymentID string, token string, pkg *pbsubstreams.Package) (content []byte, usedPorts []uint32, services map[string]string, runMeFirst []string, err error) {

//
subgraphSvc := &pbsubgraph.Service{}

if err := pkg.SinkConfig.UnmarshalTo(sqlSvc); err != nil {
return nil, nil, nil, nil, fmt.Errorf("cannot unmarshal sinkconfig: %w", err)
}

services = make(map[string]string)

pg, pgMotd, err := e.newPostgres(deploymentID, pkg)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("creating postgres deployment: %w", err)
}
services[pg.Name] = pgMotd

// FIXME: need IPFS
//ipfs, ipfsMotd, err := e.newIPFS()
ipfs, ipfsMotd, err := e.newIPFS(deploymentID)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("creating ipfs deployment: %w", err)
}
services[ipfs.Name] = ipfsMotd

runMeFirst = append(runMeFirst, pg.Name, ipfs.Name)

// note: graphnode is the sink
graphnode, graphnodeMotd, err := e.newGraphnode(deploymentID, pkg, pg.Name, ipfs.Name)
//pg, pgMotd, err := e.newPostgres(deploymentID, pkg)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("creating postgres deployment: %w", err)
}
services[graphnode.Name] = graphnodeMotd
// subgraphSvc := &pbsubgraph.Service{}
// if err := pkg.SinkConfig.UnmarshalTo(subgraphSvc); err != nil {
// return nil, nil, nil, nil, fmt.Errorf("cannot unmarshal sinkconfig: %w", err)
// }

//// note: graphnode is the sink
//graphnode, graphnodeMotd, err := e.newGraphnode(deploymentID, pkg, pg.Name, ipfs.Name)
////pg, pgMotd, err := e.newPostgres(deploymentID, pkg)
//if err != nil {
// return nil, nil, nil, nil, fmt.Errorf("creating postgres deployment: %w", err)
//}
//services[graphnode.Name] = graphnodeMotd

config := types.Config{
Version: "3",
Services: []types.ServiceConfig{
pg,
ipfs,
graphnode,
//graphnode,
},
}

if sqlSvc.PgwebFrontend != nil && sqlSvc.PgwebFrontend.Enabled {
pgweb, motd := e.newPGWeb(deploymentID, pg.Name)
services[pgweb.Name] = motd
config.Services = append(config.Services, pgweb)
}
//if sqlSvc.PgwebFrontend != nil && sqlSvc.PgwebFrontend.Enabled {
// pgweb, motd := e.newPGWeb(deploymentID, pg.Name)
// services[pgweb.Name] = motd
// config.Services = append(config.Services, pgweb)
//}

if sqlSvc.PostgraphileFrontend != nil && sqlSvc.PostgraphileFrontend.Enabled {
postgraphile, motd := e.newPostgraphile(deploymentID, pg.Name)
services[postgraphile.Name] = motd
config.Services = append(config.Services, postgraphile)
}
//if sqlSvc.PostgraphileFrontend != nil && sqlSvc.PostgraphileFrontend.Enabled {
// postgraphile, motd := e.newPostgraphile(deploymentID, pg.Name)
// services[postgraphile.Name] = motd
// config.Services = append(config.Services, postgraphile)
//}

for _, svc := range config.Services {
for _, port := range svc.Ports {
Expand All @@ -608,25 +622,13 @@ func (e *DockerEngine) createSubgraphManifest(deploymentID string, token string,

func (e *DockerEngine) createManifest(deploymentID string, token string, pkg *pbsubstreams.Package) (content []byte, usedPorts []uint32, services map[string]string, runMeFirst []string, err error) {

switch pkg.SinkConfig.TypeUrl {
case "sf.substreams.sink.sql.v1.Service":
return e.createSQLManifest(deploymentID, token, pkg)
case "sf.substreams.sink.subgraph.v1.Service":
return e.createSubgraphManifest(deploymentID, token, pkg)
default:
return nil, nil, nil, nil, fmt.Errorf("invalid sinkconfig type: %q. Only sf.substreams.sink.sql.v1.Service and sf.substreams.sink.subgraph.v1.Service are supported for now.", pkg.SinkConfig.TypeUrl)
}
// clickhouse:
// container_name: clickhouse-ssp
// image: clickhouse/clickhouse-server
// user: "101:101"
// hostname: clickhouse
// volumes:
// - ${PWD}/devel/clickhouse-server/config.xml:/etc/clickhouse-server/config.d/config.xml
// - ${PWD}/devel/clickhouse-server/users.xml:/etc/clickhouse-server/users.d/users.xml
// ports:
// - "8123:8123"
// - "9000:9000"
// - "9005:9005"
// sink:
// FIXME
//switch pkg.SinkConfig.TypeUrl {
//case "sf.substreams.sink.sql.v1.Service":
// return e.createSQLManifest(deploymentID, token, pkg)
//case "sf.substreams.sink.subgraph.v1.Service":
return e.createSubgraphManifest(deploymentID, token, pkg)
//default:
// return nil, nil, nil, nil, fmt.Errorf("invalid sinkconfig type: %q. Only sf.substreams.sink.sql.v1.Service and sf.substreams.sink.subgraph.v1.Service are supported for now.", pkg.SinkConfig.TypeUrl)
//}
}
55 changes: 55 additions & 0 deletions sink-server/docker/ipfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package docker

import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/docker/cli/cli/compose/types"
)

func (e *DockerEngine) newIPFS(deploymentID string) (conf types.ServiceConfig, motd string, err error) {

name := fmt.Sprintf("%s-ipfs", deploymentID)
localPort := uint32(5001) // TODO: assign dynamically

dataFolder := filepath.Join(e.dir, deploymentID, "data", "ipfs")
if err := os.MkdirAll(dataFolder, 0755); err != nil {
return types.ServiceConfig{}, "", fmt.Errorf("creating folder %q: %w", dataFolder, err)
}

conf = types.ServiceConfig{
Name: name,
ContainerName: name,
Image: "ipfs/kubo:v0.23.0",
Restart: "on-failure",
Ports: []types.ServicePortConfig{
{
Published: localPort,
Target: 5001,
},
},
Volumes: []types.ServiceVolumeConfig{
{
Type: "bind",
Source: "./data/ipfs",
Target: "/data/ipfs",
},
},
HealthCheck: &types.HealthCheckConfig{
Test: []string{"CMD", "nc", "-z", "localhost:5001"},
Interval: toDuration(time.Second * 3),
Timeout: toDuration(time.Second * 2),
Retries: deref(uint64(10)),
},
}

motd = fmt.Sprintf("IPFS service %q available at URL: 'http://localhost:%d'",
name,
localPort,
)

return conf, motd, nil

}

0 comments on commit d79f16b

Please sign in to comment.