From 8a74f30a460511456fc96622aa03eb15c69024c8 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Wed, 9 Oct 2024 18:28:27 +0800 Subject: [PATCH] feat: updated database related api --- core/constants/filter.go | 1 + core/constants/task.go | 13 ++-------- core/database/interfaces/database_service.go | 2 +- core/grpc/server/task_server_v2.go | 12 +-------- core/task/stats/service_v2.go | 26 +++++++++++++++++--- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/core/constants/filter.go b/core/constants/filter.go index dfbb7c5bc..a8403568c 100644 --- a/core/constants/filter.go +++ b/core/constants/filter.go @@ -3,6 +3,7 @@ package constants const ( FilterQueryFieldConditions = "conditions" FilterQueryFieldAll = "all" + FilterQueryFieldFilter = "filter" ) const ( diff --git a/core/constants/task.go b/core/constants/task.go index 93a7f1a89..aaa71c111 100644 --- a/core/constants/task.go +++ b/core/constants/task.go @@ -15,11 +15,6 @@ const ( RunTypeSelectedNodes = "selected-nodes" ) -const ( - TaskTypeSpider = "spider" - TaskTypeSystem = "system" -) - type TaskSignal int const ( @@ -30,10 +25,6 @@ const ( ) const ( - TaskListQueuePrefixPublic = "tasks:public" - TaskListQueuePrefixNodes = "tasks:nodes" -) - -const ( - TaskKey = "_tid" + TaskKey = "_tid" + SpiderKey = "_sid" ) diff --git a/core/database/interfaces/database_service.go b/core/database/interfaces/database_service.go index 621829c2c..53e9866d7 100644 --- a/core/database/interfaces/database_service.go +++ b/core/database/interfaces/database_service.go @@ -12,7 +12,7 @@ type DatabaseService interface { GetMetadataAllDb(id primitive.ObjectID) (m *entity.DatabaseMetadata, err error) CreateDatabase(id primitive.ObjectID, databaseName string) (err error) DropDatabase(id primitive.ObjectID, databaseName string) (err error) - GetTableMetadata(id primitive.ObjectID, databaseName, tableName string) (table *entity.DatabaseTable, err error) + GetTableMetadata(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}) (table *entity.DatabaseTable, err error) CreateTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error) ModifyTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error) DropTable(id primitive.ObjectID, databaseName, tableName string) (err error) diff --git a/core/grpc/server/task_server_v2.go b/core/grpc/server/task_server_v2.go index 050a5cd15..8703296db 100644 --- a/core/grpc/server/task_server_v2.go +++ b/core/grpc/server/task_server_v2.go @@ -15,7 +15,7 @@ import ( "github.com/crawlab-team/crawlab/core/task/stats" "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/db/mongo" - grpc "github.com/crawlab-team/crawlab/grpc" + "github.com/crawlab-team/crawlab/grpc" "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -216,16 +216,6 @@ func (svr TaskServerV2) handleInsertData(msg *grpc.StreamMessage) (err error) { } var records []map[string]interface{} for _, d := range data.Records { - res, ok := d[constants.TaskKey] - if ok { - switch res.(type) { - case string: - id, err := primitive.ObjectIDFromHex(res.(string)) - if err == nil { - d[constants.TaskKey] = id - } - } - } records = append(records, d) } return svr.statsSvc.InsertData(data.TaskId, records...) diff --git a/core/task/stats/service_v2.go b/core/task/stats/service_v2.go index d5a7d73a5..731ed1951 100644 --- a/core/task/stats/service_v2.go +++ b/core/task/stats/service_v2.go @@ -2,6 +2,7 @@ package stats import ( log2 "github.com/apex/log" + "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/database" interfaces2 "github.com/crawlab-team/crawlab/core/database/interfaces" "github.com/crawlab-team/crawlab/core/interfaces" @@ -20,6 +21,7 @@ import ( type databaseServiceItem struct { taskId primitive.ObjectID + spiderId primitive.ObjectID dbId primitive.ObjectID dbSvc interfaces2.DatabaseService tableName string @@ -54,7 +56,7 @@ func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[strin tableName := item.tableName if utils.IsPro() && dbSvc != nil { for _, record := range records { - if err := dbSvc.CreateRow(dbId, "", tableName, record); err != nil { + if err := dbSvc.CreateRow(dbId, "", tableName, svc.normalizeRecord(item, record)); err != nil { log2.Errorf("failed to insert data: %v", err) continue } @@ -63,7 +65,7 @@ func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[strin } else { var records2 []interface{} for _, record := range records { - records2 = append(records2, record) + records2 = append(records2, svc.normalizeRecord(item, record)) } _, err = mongo.GetMongoCol(tableName).InsertMany(records2) if err != nil { @@ -118,15 +120,19 @@ func (svc *ServiceV2) getDatabaseServiceItem(taskId primitive.ObjectID) (item *d } } - // store in cache - svc.databaseServiceItems[taskId.Hex()] = &databaseServiceItem{ + // item + item = &databaseServiceItem{ taskId: taskId, + spiderId: s.Id, dbId: s.DataSourceId, dbSvc: dbSvc, tableName: s.ColName, time: time.Now(), } + // store in cache + svc.databaseServiceItems[taskId.Hex()] = item + return item, nil } @@ -158,6 +164,18 @@ func (svc *ServiceV2) cleanup() { } } +func (svc *ServiceV2) normalizeRecord(item *databaseServiceItem, record map[string]interface{}) (res map[string]interface{}) { + res = record + + // set task id + res[constants.TaskKey] = item.taskId + + // set spider id + res[constants.SpiderKey] = item.spiderId + + return res +} + func NewTaskStatsServiceV2() (svc2 *ServiceV2, err error) { // service svc := &ServiceV2{