Skip to content

Commit

Permalink
go(feature): gRPC utils (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
solidiquis authored Nov 15, 2024
1 parent d3a1a58 commit 1797b1a
Show file tree
Hide file tree
Showing 102 changed files with 939 additions and 818 deletions.
44 changes: 42 additions & 2 deletions .github/workflows/go_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,50 @@ jobs:
lint-go:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Checkout sources
uses: actions/checkout@v2
with:
fetch-depth: 1
- uses: dominikh/staticcheck-action@v1

- name: staticcheck
uses: dominikh/staticcheck-action@v1
with:
version: "latest"
working-directory: go

- name: go fmt
uses: Jerome1337/[email protected]
with:
gofmt-path: './go'
gofmt-flags: '-l -d'

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.22

- name: Verify dependencies
run: go mod verify
working-directory: go

- name: Build
run: go build -v ./...
working-directory: go

- name: Run go vet
run: go vet ./...
working-directory: go

- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@latest
working-directory: go

- name: Run staticcheck
run: staticcheck ./...
working-directory: go

- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
working-directory: go
version: v1.60
32 changes: 32 additions & 0 deletions .github/workflows/go_release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Go Module Publish

on:
workflow_dispatch:

jobs:
go-ci:
if: github.event_name == 'workflow_dispatch' && startsWith(github.ref, 'refs/tags')
uses: ./.github/workflows/go_ci.yaml

publish-to-pkg-go-dev:
runs-on: ubuntu-latest

steps:
- name: Checkout repository
uses: actions/checkout@v2

- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.22'

- name: Extract version from tag
id: extract_version
run: |
TAG="${GITHUB_REF#refs/tags/}"
VERSION="${TAG#go/}"
echo "version=$VERSION" >> $GITHUB_ENV
- name: Publish Go module
run: |
GOPROXY=proxy.golang.org go list -m github.com/sift-stack/sift/go@$VERSION
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ the [Manual Protobuf Compilation](#manual-protobuf-compilation) section.

### Installation via Package Managers

The following demonstrates how to install the Sift client library for each supported language. Packages that are downloaded
from Github are in the process of being moved to their language's official package repository.
The following demonstrates how to install the Sift client library for each supported language.

#### Python

Expand All @@ -45,7 +44,7 @@ $ cargo add sift_rs
#### Go

```
$ go get github.com/sift-stack/sift/go@main && go mod tidy
$ go get github.com/sift-stack/sift/go
```

### Manual Protobuf Compilation
Expand Down
17 changes: 17 additions & 0 deletions go/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Sift Go

This library offers a Go API on top of Sift's protocol buffers to ergonomically interface with the Sift gRPC API.

## Installation

```
$ go get github.com/sift-stack/sift/go
```

## Examples

Various examples can be found in the [examples](./examples) directory. To run any of those examples clone this repo do the following:

```
$ SIFT_URI=<sift uri> SIFT_API_KEY=<api key> go run examples/ping/main.go
```
8 changes: 4 additions & 4 deletions go/buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ version: v1
managed:
enabled: true
go_package_prefix:
default: "github.com/sift-stack/sift/go/gen/protos/go"
default: "github.com/sift-stack/sift/go/gen"
plugins:
- plugin: buf.build/protocolbuffers/go:v1.28.1
out: gen/protos/go
out: gen
opt: paths=source_relative
- plugin: go-vtproto
out: gen/protos/go
out: gen
opt: paths=source_relative
- plugin: buf.build/grpc-ecosystem/gateway:v2.16.2
out: gen/protos/go
out: gen
opt: paths=source_relative
10 changes: 0 additions & 10 deletions go/examples/.env-example

This file was deleted.

26 changes: 0 additions & 26 deletions go/examples/README.md

This file was deleted.

20 changes: 0 additions & 20 deletions go/examples/go.mod

This file was deleted.

26 changes: 0 additions & 26 deletions go/examples/go.sum

This file was deleted.

169 changes: 169 additions & 0 deletions go/examples/ingestion/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package main

import (
"context"
"fmt"
"log"
"math/rand"
"os"
"time"

"github.com/sift-stack/sift/go/gen/sift/common/type/v1"
ingestv1 "github.com/sift-stack/sift/go/gen/sift/ingest/v1"
"github.com/sift-stack/sift/go/gen/sift/ingestion_configs/v1"
"github.com/sift-stack/sift/go/gen/sift/runs/v2"
"github.com/sift-stack/sift/go/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
assetName = "Sift-LV-426"
clientKey = "sift-lv-426-v1"
)

func main() {
ctx := context.Background()

grpcChannel, err := grpc.UseSiftChannel(ctx, grpc.SiftChannelConfig{
Uri: os.Getenv("SIFT_URI"),
Apikey: os.Getenv("SIFT_API_KEY"),
})
if err != nil {
log.Fatalln(err)
}

ingestionConfig, err := getOrCreateIngestionConfig(ctx, grpcChannel, assetName, clientKey)
if err != nil {
log.Fatalln(err)
}
log.Printf("initialized ingestion config %s\n", ingestionConfig.ClientKey)

run, err := createRun(ctx, grpcChannel, assetName)
if err != nil {
log.Fatalln(err)
}
log.Printf("initialized run %s\n", run.Name)

siftStream, err := ingestv1.NewIngestServiceClient(grpcChannel).IngestWithConfigDataStream(ctx)
if err != nil {
log.Fatalln(err)
}

dataStream := dataSource()

for data := range dataStream {
req := &ingestv1.IngestWithConfigDataStreamRequest{
IngestionConfigId: ingestionConfig.IngestionConfigId,
RunId: run.RunId,
Flow: "velocity_reading",
Timestamp: timestamppb.New(data.Timestamp),
ChannelValues: []*ingestv1.IngestWithConfigDataChannelValue{
{Type: &ingestv1.IngestWithConfigDataChannelValue_Double{Double: data.Value}},
},
// Set this flag to `true` only for debugging purposes to get real-time data validation from
// the Sift API. Do not use in production as it will hurt performance.
EndStreamOnValidationError: false,
}
if err := siftStream.Send(req); err != nil {
log.Fatalln(err)
}
log.Println("ingested a velocity_reading flow")
}

// Close the stream when finished and check if there are any errors
if _, err := siftStream.CloseAndRecv(); err != nil {
log.Fatalln(err)
}

log.Println("done.")
}

type dataPoint struct {
Timestamp time.Time
Value float64
}

func dataSource() <-chan dataPoint {
dataChannel := make(chan dataPoint)
go func() {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
duration := 60 * time.Second
start := time.Now()

for time.Since(start) < duration {
dataChannel <- dataPoint{
Timestamp: time.Now(),
Value: rng.Float64(),
}
time.Sleep(500 * time.Millisecond)
}
}()
return dataChannel
}

// Flow and channel configuration
func config() []*ingestion_configsv1.FlowConfig {
return []*ingestion_configsv1.FlowConfig{
{
Name: "velocity_reading",
Channels: []*ingestion_configsv1.ChannelConfig{
{
Name: "velocity",
Component: "mainmotor",
Unit: "km/hr",
Description: "vehicle speed",
DataType: typev1.ChannelDataType_CHANNEL_DATA_TYPE_DOUBLE,
},
},
},
}
}

// Retrieves an existing ingestion config or create it.
func getOrCreateIngestionConfig(
ctx context.Context,
grpcChannel grpc.SiftChannel,
assetName,
clientKey string,
) (*ingestion_configsv1.IngestionConfig, error) {
svc := ingestion_configsv1.NewIngestionConfigServiceClient(grpcChannel)

listRes, err := svc.ListIngestionConfigs(ctx, &ingestion_configsv1.ListIngestionConfigsRequest{
Filter: fmt.Sprintf("client_key == '%s'", clientKey),
})
if err != nil {
return nil, err
}
if listRes != nil && len(listRes.IngestionConfigs) > 0 {
return listRes.IngestionConfigs[0], nil
}

createRes, err := svc.CreateIngestionConfig(ctx, &ingestion_configsv1.CreateIngestionConfigRequest{
AssetName: assetName,
ClientKey: clientKey,
Flows: config(),
})
if err != nil {
return nil, err
}
return createRes.IngestionConfig, nil
}

// Create a run to use to group all the data ingested during this period.
func createRun(
ctx context.Context,
grpcChannel grpc.SiftChannel,
runName string,
) (*runsv2.Run, error) {
svc := runsv2.NewRunServiceClient(grpcChannel)
ts := timestamppb.Now()

createRes, err := svc.CreateRun(ctx, &runsv2.CreateRunRequest{
Name: fmt.Sprintf("[%s].%d", runName, ts.Seconds),
StartTime: ts,
})
if err != nil {
return nil, err
}
return createRes.Run, nil
}
Loading

0 comments on commit 1797b1a

Please sign in to comment.