Skip to content

Commit

Permalink
feat: updated database related api
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Oct 9, 2024
1 parent 4f52936 commit 8a74f30
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 27 deletions.
1 change: 1 addition & 0 deletions core/constants/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package constants
const (
FilterQueryFieldConditions = "conditions"
FilterQueryFieldAll = "all"
FilterQueryFieldFilter = "filter"
)

const (
Expand Down
13 changes: 2 additions & 11 deletions core/constants/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ const (
RunTypeSelectedNodes = "selected-nodes"
)

const (
TaskTypeSpider = "spider"
TaskTypeSystem = "system"
)

type TaskSignal int

const (
Expand All @@ -30,10 +25,6 @@ const (
)

const (
TaskListQueuePrefixPublic = "tasks:public"
TaskListQueuePrefixNodes = "tasks:nodes"
)

const (
TaskKey = "_tid"
TaskKey = "_tid"
SpiderKey = "_sid"
)
2 changes: 1 addition & 1 deletion core/database/interfaces/database_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type DatabaseService interface {
GetMetadataAllDb(id primitive.ObjectID) (m *entity.DatabaseMetadata, err error)
CreateDatabase(id primitive.ObjectID, databaseName string) (err error)
DropDatabase(id primitive.ObjectID, databaseName string) (err error)
GetTableMetadata(id primitive.ObjectID, databaseName, tableName string) (table *entity.DatabaseTable, err error)
GetTableMetadata(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}) (table *entity.DatabaseTable, err error)
CreateTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error)
ModifyTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error)
DropTable(id primitive.ObjectID, databaseName, tableName string) (err error)
Expand Down
12 changes: 1 addition & 11 deletions core/grpc/server/task_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/crawlab-team/crawlab/core/task/stats"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/db/mongo"
grpc "github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
Expand Down Expand Up @@ -216,16 +216,6 @@ func (svr TaskServerV2) handleInsertData(msg *grpc.StreamMessage) (err error) {
}
var records []map[string]interface{}
for _, d := range data.Records {
res, ok := d[constants.TaskKey]
if ok {
switch res.(type) {
case string:
id, err := primitive.ObjectIDFromHex(res.(string))
if err == nil {
d[constants.TaskKey] = id
}
}
}
records = append(records, d)
}
return svr.statsSvc.InsertData(data.TaskId, records...)
Expand Down
26 changes: 22 additions & 4 deletions core/task/stats/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stats

import (
log2 "github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/database"
interfaces2 "github.com/crawlab-team/crawlab/core/database/interfaces"
"github.com/crawlab-team/crawlab/core/interfaces"
Expand All @@ -20,6 +21,7 @@ import (

type databaseServiceItem struct {
taskId primitive.ObjectID
spiderId primitive.ObjectID
dbId primitive.ObjectID
dbSvc interfaces2.DatabaseService
tableName string
Expand Down Expand Up @@ -54,7 +56,7 @@ func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[strin
tableName := item.tableName
if utils.IsPro() && dbSvc != nil {
for _, record := range records {
if err := dbSvc.CreateRow(dbId, "", tableName, record); err != nil {
if err := dbSvc.CreateRow(dbId, "", tableName, svc.normalizeRecord(item, record)); err != nil {
log2.Errorf("failed to insert data: %v", err)
continue
}
Expand All @@ -63,7 +65,7 @@ func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[strin
} else {
var records2 []interface{}
for _, record := range records {
records2 = append(records2, record)
records2 = append(records2, svc.normalizeRecord(item, record))
}
_, err = mongo.GetMongoCol(tableName).InsertMany(records2)
if err != nil {
Expand Down Expand Up @@ -118,15 +120,19 @@ func (svc *ServiceV2) getDatabaseServiceItem(taskId primitive.ObjectID) (item *d
}
}

// store in cache
svc.databaseServiceItems[taskId.Hex()] = &databaseServiceItem{
// item
item = &databaseServiceItem{
taskId: taskId,
spiderId: s.Id,
dbId: s.DataSourceId,
dbSvc: dbSvc,
tableName: s.ColName,
time: time.Now(),
}

// store in cache
svc.databaseServiceItems[taskId.Hex()] = item

return item, nil
}

Expand Down Expand Up @@ -158,6 +164,18 @@ func (svc *ServiceV2) cleanup() {
}
}

func (svc *ServiceV2) normalizeRecord(item *databaseServiceItem, record map[string]interface{}) (res map[string]interface{}) {
res = record

// set task id
res[constants.TaskKey] = item.taskId

// set spider id
res[constants.SpiderKey] = item.spiderId

return res
}

func NewTaskStatsServiceV2() (svc2 *ServiceV2, err error) {
// service
svc := &ServiceV2{
Expand Down

0 comments on commit 8a74f30

Please sign in to comment.