Skip to content

Commit 4f52936

Browse files
committed
refactor: integrated database services into task data insert
1 parent 0ed2150 commit 4f52936

File tree

12 files changed

+161
-264
lines changed

12 files changed

+161
-264
lines changed

core/controllers/spider_v2.go

+2-58
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package controllers
33
import (
44
"errors"
55
"github.com/apex/log"
6-
"github.com/crawlab-team/crawlab/core/constants"
76
"github.com/crawlab-team/crawlab/core/fs"
87
"github.com/crawlab-team/crawlab/core/interfaces"
98
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
@@ -48,8 +47,8 @@ func GetSpiderById(c *gin.Context) {
4847
}
4948
}
5049

51-
// data collection
52-
if !s.ColId.IsZero() {
50+
// data collection (compatible to old version) # TODO: remove in the future
51+
if s.ColName == "" && !s.ColId.IsZero() {
5352
col, err := service.NewModelServiceV2[models2.DataCollectionV2]().GetById(s.ColId)
5453
if err != nil {
5554
if !errors.Is(err, mongo2.ErrNoDocuments) {
@@ -252,12 +251,6 @@ func PostSpider(c *gin.Context) {
252251
return
253252
}
254253

255-
// upsert data collection
256-
if err := upsertSpiderDataCollection(&s); err != nil {
257-
HandleErrorInternalServerError(c, err)
258-
return
259-
}
260-
261254
// user
262255
u := GetUserFromContextV2(c)
263256

@@ -311,12 +304,6 @@ func PutSpiderById(c *gin.Context) {
311304
return
312305
}
313306

314-
// upsert data collection
315-
if err := upsertSpiderDataCollection(&s); err != nil {
316-
HandleErrorInternalServerError(c, err)
317-
return
318-
}
319-
320307
u := GetUserFromContextV2(c)
321308

322309
modelSvc := service.NewModelServiceV2[models2.SpiderV2]()
@@ -773,49 +760,6 @@ func getSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsServiceV2, err
773760
return getSpiderFsSvc(s)
774761
}
775762

776-
func upsertSpiderDataCollection(s *models2.SpiderV2) (err error) {
777-
modelSvc := service.NewModelServiceV2[models2.DataCollectionV2]()
778-
if s.ColId.IsZero() {
779-
// validate
780-
if s.ColName == "" {
781-
return errors.New("data collection name is required")
782-
}
783-
// no id
784-
dc, err := modelSvc.GetOne(bson.M{"name": s.ColName}, nil)
785-
if err != nil {
786-
if errors.Is(err, mongo2.ErrNoDocuments) {
787-
// not exists, add new
788-
dc = &models2.DataCollectionV2{Name: s.ColName}
789-
dcId, err := modelSvc.InsertOne(*dc)
790-
if err != nil {
791-
return err
792-
}
793-
dc.SetId(dcId)
794-
} else {
795-
// error
796-
return err
797-
}
798-
}
799-
s.ColId = dc.Id
800-
801-
// create index
802-
_ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{constants.TaskKey: 1}})
803-
_ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{constants.HashKey: 1}})
804-
} else {
805-
// with id
806-
dc, err := modelSvc.GetById(s.ColId)
807-
if err != nil {
808-
return err
809-
}
810-
s.ColId = dc.Id
811-
}
812-
return nil
813-
}
814-
815-
func UpsertSpiderDataCollection(s *models2.SpiderV2) (err error) {
816-
return upsertSpiderDataCollection(s)
817-
}
818-
819763
func getSpiderRootPath(c *gin.Context) (rootPath string, err error) {
820764
// spider id
821765
id, err := primitive.ObjectIDFromHex(c.Param("id"))

core/controllers/task_v2.go

+22-22
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
log2 "github.com/apex/log"
66
"github.com/crawlab-team/crawlab/core/constants"
77
"github.com/crawlab-team/crawlab/core/interfaces"
8-
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
8+
"github.com/crawlab-team/crawlab/core/models/models/v2"
99
"github.com/crawlab-team/crawlab/core/models/service"
1010
"github.com/crawlab-team/crawlab/core/result"
1111
"github.com/crawlab-team/crawlab/core/spider/admin"
@@ -34,7 +34,7 @@ func GetTaskById(c *gin.Context) {
3434
}
3535

3636
// task
37-
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
37+
t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id)
3838
if errors.Is(err, mongo2.ErrNoDocuments) {
3939
HandleErrorNotFound(c, err)
4040
return
@@ -45,7 +45,7 @@ func GetTaskById(c *gin.Context) {
4545
}
4646

4747
// spider
48-
t.Spider, _ = service.NewModelServiceV2[models2.SpiderV2]().GetById(t.SpiderId)
48+
t.Spider, _ = service.NewModelServiceV2[models.SpiderV2]().GetById(t.SpiderId)
4949

5050
// skip if task status is pending
5151
if t.Status == constants.TaskStatusPending {
@@ -54,15 +54,15 @@ func GetTaskById(c *gin.Context) {
5454
}
5555

5656
// task stat
57-
t.Stat, _ = service.NewModelServiceV2[models2.TaskStatV2]().GetById(id)
57+
t.Stat, _ = service.NewModelServiceV2[models.TaskStatV2]().GetById(id)
5858

5959
HandleSuccessWithData(c, t)
6060
}
6161

6262
func GetTaskList(c *gin.Context) {
6363
withStats := c.Query("stats")
6464
if withStats == "" {
65-
NewControllerV2[models2.TaskV2]().GetList(c)
65+
NewControllerV2[models.TaskV2]().GetList(c)
6666
return
6767
}
6868

@@ -72,7 +72,7 @@ func GetTaskList(c *gin.Context) {
7272
sort := MustGetSortOption(c)
7373

7474
// get tasks
75-
tasks, err := service.NewModelServiceV2[models2.TaskV2]().GetMany(query, &mongo.FindOptions{
75+
tasks, err := service.NewModelServiceV2[models.TaskV2]().GetMany(query, &mongo.FindOptions{
7676
Sort: sort,
7777
Skip: pagination.Size * (pagination.Page - 1),
7878
Limit: pagination.Size,
@@ -101,14 +101,14 @@ func GetTaskList(c *gin.Context) {
101101
}
102102

103103
// total count
104-
total, err := service.NewModelServiceV2[models2.TaskV2]().Count(query)
104+
total, err := service.NewModelServiceV2[models.TaskV2]().Count(query)
105105
if err != nil {
106106
HandleErrorInternalServerError(c, err)
107107
return
108108
}
109109

110110
// stat list
111-
stats, err := service.NewModelServiceV2[models2.TaskStatV2]().GetMany(bson.M{
111+
stats, err := service.NewModelServiceV2[models.TaskStatV2]().GetMany(bson.M{
112112
"_id": bson.M{
113113
"$in": taskIds,
114114
},
@@ -119,13 +119,13 @@ func GetTaskList(c *gin.Context) {
119119
}
120120

121121
// cache stat list to dict
122-
statsDict := map[primitive.ObjectID]models2.TaskStatV2{}
122+
statsDict := map[primitive.ObjectID]models.TaskStatV2{}
123123
for _, s := range stats {
124124
statsDict[s.Id] = s
125125
}
126126

127127
// spider list
128-
spiders, err := service.NewModelServiceV2[models2.SpiderV2]().GetMany(bson.M{
128+
spiders, err := service.NewModelServiceV2[models.SpiderV2]().GetMany(bson.M{
129129
"_id": bson.M{
130130
"$in": spiderIds,
131131
},
@@ -136,7 +136,7 @@ func GetTaskList(c *gin.Context) {
136136
}
137137

138138
// cache spider list to dict
139-
spiderDict := map[primitive.ObjectID]models2.SpiderV2{}
139+
spiderDict := map[primitive.ObjectID]models.SpiderV2{}
140140
for _, s := range spiders {
141141
spiderDict[s.Id] = s
142142
}
@@ -170,22 +170,22 @@ func DeleteTaskById(c *gin.Context) {
170170
// delete in db
171171
if err := mongo.RunTransaction(func(context mongo2.SessionContext) (err error) {
172172
// delete task
173-
_, err = service.NewModelServiceV2[models2.TaskV2]().GetById(id)
173+
_, err = service.NewModelServiceV2[models.TaskV2]().GetById(id)
174174
if err != nil {
175175
return err
176176
}
177-
err = service.NewModelServiceV2[models2.TaskV2]().DeleteById(id)
177+
err = service.NewModelServiceV2[models.TaskV2]().DeleteById(id)
178178
if err != nil {
179179
return err
180180
}
181181

182182
// delete task stat
183-
_, err = service.NewModelServiceV2[models2.TaskStatV2]().GetById(id)
183+
_, err = service.NewModelServiceV2[models.TaskStatV2]().GetById(id)
184184
if err != nil {
185185
log2.Warnf("delete task stat error: %s", err.Error())
186186
return nil
187187
}
188-
err = service.NewModelServiceV2[models2.TaskStatV2]().DeleteById(id)
188+
err = service.NewModelServiceV2[models.TaskStatV2]().DeleteById(id)
189189
if err != nil {
190190
log2.Warnf("delete task stat error: %s", err.Error())
191191
return nil
@@ -217,7 +217,7 @@ func DeleteList(c *gin.Context) {
217217

218218
if err := mongo.RunTransaction(func(context mongo2.SessionContext) error {
219219
// delete tasks
220-
if err := service.NewModelServiceV2[models2.TaskV2]().DeleteMany(bson.M{
220+
if err := service.NewModelServiceV2[models.TaskV2]().DeleteMany(bson.M{
221221
"_id": bson.M{
222222
"$in": payload.Ids,
223223
},
@@ -226,7 +226,7 @@ func DeleteList(c *gin.Context) {
226226
}
227227

228228
// delete task stats
229-
if err := service.NewModelServiceV2[models2.TaskV2]().DeleteMany(bson.M{
229+
if err := service.NewModelServiceV2[models.TaskV2]().DeleteMany(bson.M{
230230
"_id": bson.M{
231231
"$in": payload.Ids,
232232
},
@@ -261,7 +261,7 @@ func DeleteList(c *gin.Context) {
261261

262262
func PostTaskRun(c *gin.Context) {
263263
// task
264-
var t models2.TaskV2
264+
var t models.TaskV2
265265
if err := c.ShouldBindJSON(&t); err != nil {
266266
HandleErrorBadRequest(c, err)
267267
return
@@ -274,7 +274,7 @@ func PostTaskRun(c *gin.Context) {
274274
}
275275

276276
// spider
277-
s, err := service.NewModelServiceV2[models2.SpiderV2]().GetById(t.SpiderId)
277+
s, err := service.NewModelServiceV2[models.SpiderV2]().GetById(t.SpiderId)
278278
if err != nil {
279279
HandleErrorInternalServerError(c, err)
280280
return
@@ -319,7 +319,7 @@ func PostTaskRestart(c *gin.Context) {
319319
}
320320

321321
// task
322-
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
322+
t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id)
323323
if err != nil {
324324
HandleErrorInternalServerError(c, err)
325325
return
@@ -363,7 +363,7 @@ func PostTaskCancel(c *gin.Context) {
363363
}
364364

365365
// task
366-
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
366+
t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id)
367367
if err != nil {
368368
HandleErrorInternalServerError(c, err)
369369
return
@@ -446,7 +446,7 @@ func GetTaskData(c *gin.Context) {
446446
}
447447

448448
// task
449-
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
449+
t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id)
450450
if err != nil {
451451
HandleErrorInternalServerError(c, err)
452452
return
File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package interfaces
2+
3+
import (
4+
"go.mongodb.org/mongo-driver/bson/primitive"
5+
)
6+
7+
type DatabaseRegistryService interface {
8+
Start()
9+
CheckStatus()
10+
GetDatabaseService(id primitive.ObjectID) (res DatabaseService, err error)
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package interfaces
2+
3+
import (
4+
"github.com/crawlab-team/crawlab/core/database/entity"
5+
"github.com/crawlab-team/crawlab/core/models/models/v2"
6+
"go.mongodb.org/mongo-driver/bson/primitive"
7+
)
8+
9+
type DatabaseService interface {
10+
TestConnection(id primitive.ObjectID) (err error)
11+
GetMetadata(id primitive.ObjectID) (m *entity.DatabaseMetadata, err error)
12+
GetMetadataAllDb(id primitive.ObjectID) (m *entity.DatabaseMetadata, err error)
13+
CreateDatabase(id primitive.ObjectID, databaseName string) (err error)
14+
DropDatabase(id primitive.ObjectID, databaseName string) (err error)
15+
GetTableMetadata(id primitive.ObjectID, databaseName, tableName string) (table *entity.DatabaseTable, err error)
16+
CreateTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error)
17+
ModifyTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error)
18+
DropTable(id primitive.ObjectID, databaseName, tableName string) (err error)
19+
RenameTable(id primitive.ObjectID, databaseName, oldTableName, newTableName string) (err error)
20+
GetColumnTypes(query string) (types []string)
21+
ReadRows(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}, skip, limit int) ([]map[string]interface{}, int64, error)
22+
CreateRow(id primitive.ObjectID, databaseName, tableName string, row map[string]interface{}) error
23+
UpdateRow(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}, update map[string]interface{}) error
24+
DeleteRow(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}) error
25+
Query(id primitive.ObjectID, databaseName, query string) (results *entity.DatabaseQueryResults, err error)
26+
GetCurrentMetric(id primitive.ObjectID) (m *models.DatabaseMetricV2, err error)
27+
}

core/database/registry_service.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package database
2+
3+
import (
4+
"github.com/crawlab-team/crawlab/core/database/interfaces"
5+
)
6+
7+
var serviceInstance interfaces.DatabaseRegistryService
8+
9+
func SetDatabaseRegistryService(svc interfaces.DatabaseRegistryService) {
10+
serviceInstance = svc
11+
}
12+
13+
func GetDatabaseRegistryService() interfaces.DatabaseRegistryService {
14+
return serviceInstance
15+
}

core/grpc/server/task_server_v2.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func (svr TaskServerV2) handleInsertData(msg *grpc.StreamMessage) (err error) {
214214
if err != nil {
215215
return err
216216
}
217-
var records []interface{}
217+
var records []map[string]interface{}
218218
for _, d := range data.Records {
219219
res, ok := d[constants.TaskKey]
220220
if ok {

core/models/models/v2/spider_v2.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ type SpiderV2 struct {
88
any `collection:"spiders"`
99
BaseModelV2[SpiderV2] `bson:",inline"`
1010
Name string `json:"name" bson:"name"` // spider name
11-
ColId primitive.ObjectID `json:"col_id" bson:"col_id"` // data collection id
12-
ColName string `json:"col_name,omitempty" bson:"-"` // data collection name
11+
ColId primitive.ObjectID `json:"col_id" bson:"col_id"` // data collection id (deprecated) # TODO: remove this field in the future
12+
ColName string `json:"col_name,omitempty" bson:"col_name"` // data collection name
1313
DataSourceId primitive.ObjectID `json:"data_source_id" bson:"data_source_id"` // data source id
1414
DataSource *DatabaseV2 `json:"data_source,omitempty" bson:"-"` // data source
1515
Description string `json:"description" bson:"description"` // description

core/result/service.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func NewResultService(registryKey string, s *models.Spider) (svc2 interfaces.Res
3838

3939
var store = sync.Map{}
4040

41-
func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfaces.ResultService, err error) {
41+
func GetResultService(spiderId primitive.ObjectID) (svc2 interfaces.ResultService, err error) {
4242
// model service
4343
modelSvc, err := service.GetService()
4444
if err != nil {
@@ -51,12 +51,6 @@ func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfa
5151
return nil, trace.TraceError(err)
5252
}
5353

54-
// apply options
55-
_opts := &Options{}
56-
for _, opt := range opts {
57-
opt(_opts)
58-
}
59-
6054
// store key
6155
storeKey := s.ColId.Hex() + ":" + s.DataSourceId.Hex()
6256

0 commit comments

Comments
 (0)