Skip to content

Commit

Permalink
fix alpha serve for clickhouse, improve a log line
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Jan 23, 2024
1 parent 89758b3 commit 2692bba
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Unreleased

* Fixed the 'local substreams alpha service serve' clickhouse deployment (was failing with a message regarding fork handling...)
* Added some output-stream info to logs

## v1.3.1

### Server
Expand Down
4 changes: 4 additions & 0 deletions orchestrator/execout/execout_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ func (r *Walker) sendItems(sortedItems []*pboutput.Item) error {
return nil
}

func (r *Walker) Progress() (first, current, last int) {
return r.fileWalker.Progress()
}

func (r *Walker) NextSegment() {
r.fileWalker.Next()
}
Expand Down
15 changes: 14 additions & 1 deletion orchestrator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

func (s *Scheduler) cmdShutdownWhenComplete() loop.Cmd {
if s.outputStreamCompleted && s.storesSyncCompleted {

var fields []zap.Field
if s.ExecOutWalker != nil {
start, current, end := s.ExecOutWalker.Progress()
fields = append(fields, zap.Int("cached_output_start", start), zap.Int("cached_output_current", current), zap.Int("cached_output_end", end))
}
s.logger.Info("scheduler: stores and cached_outputs stream completed, switching to live", fields...)
return func() loop.Msg {
err := s.Stages.WaitAsyncWork()
return loop.Quit(err)()
Expand All @@ -171,7 +178,13 @@ func (s *Scheduler) cmdShutdownWhenComplete() loop.Cmd {
s.logger.Info("scheduler: waiting for output stream and stores to complete")
}
if !s.outputStreamCompleted && s.storesSyncCompleted {
s.logger.Info("scheduler: waiting for output stream to complete, stores ready")

var fields []zap.Field
if s.ExecOutWalker != nil {
start, current, end := s.ExecOutWalker.Progress()
fields = append(fields, zap.Int("cached_output_start", start), zap.Int("cached_output_current", current), zap.Int("cached_output_end", end))
}
s.logger.Info("scheduler: waiting for output stream to complete, stores ready", fields...)
}
if s.outputStreamCompleted && !s.storesSyncCompleted {
s.logger.Info("scheduler: waiting for stores to complete, output stream completed")
Expand Down
8 changes: 6 additions & 2 deletions sink-server/docker/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,19 @@ func (e *DockerEngine) newSink(deploymentID string, dbService string, pkg *pbsub
withEndpoint = "-e " + e.endpoint
}

withBuffer := ""
if serviceName == "clickhouse" {
withBuffer = "--undo-buffer-size=12"
}
startScript := []byte(fmt.Sprintf(`#!/bin/bash
set -xeu
if [ ! -f /opt/subservices/data/setup-complete ]; then
/app/substreams-sink-sql setup $DSN /opt/subservices/config/substreams.spkg %s && touch /opt/subservices/data/setup-complete
fi
/app/substreams-sink-sql run $DSN /opt/subservices/config/substreams.spkg --on-module-hash-mistmatch=warn %s
`, withPostgraphile, withEndpoint))
/app/substreams-sink-sql run $DSN /opt/subservices/config/substreams.spkg --on-module-hash-mistmatch=warn %s %s
`, withPostgraphile, withBuffer, withEndpoint))
if err := os.WriteFile(filepath.Join(configFolder, "start.sh"), startScript, 0755); err != nil {
fmt.Println("")
return conf, motd, fmt.Errorf("writing file: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions storage/execout/filewalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ func (fw *FileWalker) Next() {
func (fw *FileWalker) IsDone() bool {
return fw.segment > fw.segmenter.LastIndex()
}

func (fw *FileWalker) Progress() (first, current, last int) {
return fw.segmenter.FirstIndex(), fw.segment, fw.segmenter.LastIndex()
}

0 comments on commit 2692bba

Please sign in to comment.