From c50f53f1282cd00b03e45e00869bbf32e7f20793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Fri, 28 Jun 2024 16:48:27 -0400 Subject: [PATCH] add --cursor-table and --history-table to allow multiple concurrent sinks on the same database --- CHANGELOG.md | 5 +++++ cmd/substreams-sink-sql/main.go | 7 +++++++ db/db.go | 4 ++-- db/dialect_postgres.go | 6 +++--- go.mod | 14 +++++++------- go.sum | 28 ++++++++++++++-------------- 6 files changed, 38 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa4e472..146dd4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## v4.2.0 + +* Added the --cursor-table and --history-table flags to allow running to sinks on the same database (be careful that you have no collision in table names) +* bumped substreams to v1.8.2, add some default network endpoints + ## v4.1.0 * Bumped substreams to v1.7.3 diff --git a/cmd/substreams-sink-sql/main.go b/cmd/substreams-sink-sql/main.go index 50c2fa2..edb0bb6 100644 --- a/cmd/substreams-sink-sql/main.go +++ b/cmd/substreams-sink-sql/main.go @@ -11,6 +11,7 @@ import ( . "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" "github.com/streamingfast/dmetrics" + "github.com/streamingfast/substreams-sink-sql/db" "go.uber.org/zap" ) @@ -33,6 +34,8 @@ func main() { flags.Duration("delay-before-start", 0, "[Operator] Amount of time to wait before starting any internal processes, can be used to perform to maintenance on the pod before actually letting it starts") flags.String("metrics-listen-addr", "localhost:9102", "[Operator] If non-empty, the process will listen on this address for Prometheus metrics request(s)") flags.String("pprof-listen-addr", "localhost:6060", "[Operator] If non-empty, the process will listen on this address for pprof analysis (see https://golang.org/pkg/net/http/pprof/)") + flags.String("cursors-table", "cursors", "[Operator] Name of the table to use for storing cursors") + flags.String("history-table", "substreams_history", "[Operator] Name of the table to use for storing block history, used to handle reorgs") }), AfterAllHook(func(cmd *cobra.Command) { cmd.PersistentPreRun = preStart @@ -41,6 +44,10 @@ func main() { } func preStart(cmd *cobra.Command, _ []string) { + + db.CURSORS_TABLE = sflags.MustGetString(cmd, "cursors-table") + db.HISTORY_TABLE = sflags.MustGetString(cmd, "history-table") + delay := sflags.MustGetDuration(cmd, "delay-before-start") if delay > 0 { zlog.Info("sleeping to respect delay before start setting", zap.Duration("delay", delay)) diff --git a/db/db.go b/db/db.go index 0a45630..d03eb65 100644 --- a/db/db.go +++ b/db/db.go @@ -13,8 +13,8 @@ import ( "go.uber.org/zap/zapcore" ) -const CURSORS_TABLE = "cursors" -const HISTORY_TABLE = "substreams_history" +var CURSORS_TABLE = "cursors" +var HISTORY_TABLE = "substreams_history" // Make the typing a bit easier type OrderedMap[K comparable, V any] struct { diff --git a/db/dialect_postgres.go b/db/dialect_postgres.go index 43152df..a591b6d 100644 --- a/db/dialect_postgres.go +++ b/db/dialect_postgres.go @@ -187,12 +187,12 @@ func (d postgresDialect) GetCreateCursorQuery(schema string, withPostgraphile bo out := fmt.Sprintf(cli.Dedent(` create table if not exists %s.%s ( - id text not null constraint cursor_pk primary key, + id text not null constraint %s primary key, cursor text, block_num bigint, block_id text ); - `), EscapeIdentifier(schema), EscapeIdentifier(CURSORS_TABLE)) + `), EscapeIdentifier(schema), EscapeIdentifier(CURSORS_TABLE), EscapeIdentifier(CURSORS_TABLE+"_pk")) if withPostgraphile { out += fmt.Sprintf("COMMENT ON TABLE %s.%s IS E'@omit';", EscapeIdentifier(schema), EscapeIdentifier(CURSORS_TABLE)) @@ -270,7 +270,7 @@ func (d postgresDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, usern } func (d postgresDialect) historyTable(schema string) string { - return fmt.Sprintf("%s.%s", EscapeIdentifier(schema), EscapeIdentifier("substreams_history")) + return fmt.Sprintf("%s.%s", EscapeIdentifier(schema), EscapeIdentifier(HISTORY_TABLE)) } func (d postgresDialect) saveInsert(schema string, table string, primaryKey map[string]string, blockNum uint64) string { diff --git a/go.mod b/go.mod index a69068c..a1fba1d 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 - github.com/streamingfast/substreams v1.7.3 + github.com/streamingfast/substreams v1.8.3-0.20240628134338-5c9843eeef98 github.com/streamingfast/substreams-sink v0.4.0 github.com/streamingfast/substreams-sink-database-changes v1.1.3 github.com/stretchr/testify v1.9.0 @@ -99,13 +99,13 @@ require ( github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 // indirect github.com/subosito/gotenv v1.4.2 // indirect github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect - github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c // indirect + github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel v1.26.0 // indirect go.opentelemetry.io/otel/metric v1.26.0 // indirect go.opentelemetry.io/otel/trace v1.26.0 // indirect - golang.org/x/sync v0.6.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect @@ -159,13 +159,13 @@ require ( go.opencensus.io v0.24.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.21.0 // indirect + golang.org/x/crypto v0.23.0 // indirect golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.22.0 // indirect golang.org/x/oauth2 v0.18.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/term v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/term v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect google.golang.org/api v0.172.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect diff --git a/go.sum b/go.sum index 3af11cb..95451ad 100644 --- a/go.sum +++ b/go.sum @@ -534,8 +534,8 @@ github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd h1:P96NMUr1jD github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd/go.mod h1:XuHkKh98QevgA9M3oWB5Y5Tm6w7iNJ5P3a3ao7UnnfI= github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAtyaTOgs= github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= -github.com/streamingfast/substreams v1.7.3 h1:V4YOJt4tAzivaLpC7nSVDdULMuczj7QfJUTyHAT2qkM= -github.com/streamingfast/substreams v1.7.3/go.mod h1:ao5xpjglDohCmwzRUlJtTNGUn+nXGR2AXBnYQEbvQaI= +github.com/streamingfast/substreams v1.8.3-0.20240628134338-5c9843eeef98 h1:fDo2+v07xEbR0J0sRe8m/W7pnPDYq7dFIAdcHElMNDg= +github.com/streamingfast/substreams v1.8.3-0.20240628134338-5c9843eeef98/go.mod h1:XtL4RgQawes9/a9iM9d6bAABacfIuekY+jceszF7u2c= github.com/streamingfast/substreams-sink v0.4.0 h1:sU/E9Q4zXTfKxaGUgQ4OMFC+/NBH5VdZx6SOcBVp7P0= github.com/streamingfast/substreams-sink v0.4.0/go.mod h1:wlF6pAQTBXQGA9k5R1yKg6enHdtXjz1pMlAs4lhZOac= github.com/streamingfast/substreams-sink-database-changes v1.1.3 h1:rXeGb/V2mjC8FftumRkMQxG2jtdLfHdLx9UQVUtAqS8= @@ -564,8 +564,8 @@ github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tsenart/deadcode v0.0.0-20160724212837-210d2dc333e9/go.mod h1:q+QjxYvZ+fpjMXqs+XEriussHjSYqeXVnAdSV1tkMYk= -github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c h1:GGsyl0dZ2jJgVT+VvWBf/cNijrHRhkrTjkmp5wg7li0= -github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c/go.mod h1:xxcJeBb7SIUl/Wzkz1eVKJE/CB34YNrqX2TQI6jY9zs= +github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b h1:wA3QeTsaAXybLL2kb2cKhCAQTHgYTMwuI8lBlJSv5V8= +github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b/go.mod h1:xT1Y5p2JR2PfSZihE0s4mjdJaRGp1waCTf5JzhQLBck= github.com/wk8/go-ordered-map/v2 v2.1.7 h1:aUZ1xBMdbvY8wnNt77qqo4nyT3y0pX4Usat48Vm+hik= github.com/wk8/go-ordered-map/v2 v2.1.7/go.mod h1:9Xvgm2mV2kSq2SAm0Y608tBmu8akTzI7c2bz7/G7ZN4= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= @@ -645,8 +645,8 @@ golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -750,8 +750,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -809,15 +809,15 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= -golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -829,8 +829,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=