
Commit d3e0d0d

Added support for multi-collection workloads and upserts
1 parent 0ebaf2a commit d3e0d0d

File tree: 3 files changed (+55, -21 lines)

README.md

Lines changed: 15 additions & 4 deletions
@@ -43,7 +43,7 @@ tar -xzvf plgm-darwin-arm64.tar.gz
 This project includes a `Makefile` to simplify building and packaging.
 
 ```bash
-git clone <repository-url>
+git clone https://github.com/Percona-Lab/percona-load-generator-mongodb.git
 cd plgm
 go mod tidy
 
@@ -182,8 +182,8 @@ You can override any setting in `config.yaml` using environment variables. This
 | `concurrency` | `PERCONALOAD_CONCURRENCY` | Number of active worker goroutines | `50` |
 | `duration` | `PERCONALOAD_DURATION` | Test duration (Go duration string) | `5m`, `60s` |
 | `default_workload` | `PERCONALOAD_DEFAULT_WORKLOAD` | Use built-in "Flights" workload (`true`/`false`) | `false` |
-| `collections_path` | `PERCONALOAD_COLLECTIONS_PATH` | Path to custom collection JSON files | `./schemas` |
-| `queries_path` | `PERCONALOAD_QUERIES_PATH` | Path to custom query JSON files | `./queries` |
+| `collections_path` | `PERCONALOAD_COLLECTIONS_PATH` | Path to custom collection JSON files (supports directories for multi-collection load) | `./schemas` |
+| `queries_path` | `PERCONALOAD_QUERIES_PATH` | Path to custom query JSON files or directory | `./queries` |
 | `documents_count` | `PERCONALOAD_DOCUMENTS_COUNT` | Number of documents to seed initially | `10000` |
 | `drop_collections` | `PERCONALOAD_DROP_COLLECTIONS` | Drop collections before starting (`true`/`false`) | `true` |
 | `skip_seed` | `PERCONALOAD_SKIP_SEED` | Do not seed initial data on start (`true`/`false`) | `true` |
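
For context on the `duration` row above, the value is parsed as a Go duration string. The following is a minimal, hypothetical sketch (not the project's actual config loader) of how a `PERCONALOAD_*` override of that kind can be read and parsed:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	// Simulates `export PERCONALOAD_DURATION=5m` for the sake of the example.
	os.Setenv("PERCONALOAD_DURATION", "5m")

	raw, ok := os.LookupEnv("PERCONALOAD_DURATION")
	if !ok {
		raw = "60s" // hypothetical fallback; the real default lives in config.yaml
	}

	// "Go duration string" per the table above, e.g. 5m, 60s, 1h30m.
	d, err := time.ParseDuration(raw)
	if err != nil {
		fmt.Println("invalid duration:", err)
		return
	}
	fmt.Println("test will run for", d)
}
```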
@@ -225,8 +225,9 @@ When executed, plgm performs the following steps:
 * (Optional) Seeds initial data with the number of documents defined by `documents_count` in the config.
 3. **Workload Execution:**
 * Spawns the configured number of **Active Workers**.
-* Continuously generates and executes queries (Find, Insert, Update, Delete, Aggregate) based on your configured ratios.
+* Continuously generates and executes queries (Find, Insert, Update, Delete, Aggregate, Upsert) based on your configured ratios.
 * Generates realistic BSON data for Inserts and Updates (supports recursion and complex schemas).
+* Workers pick a random collection from the provided list for every operation.
 4. **Reporting:**
 * Outputs a real-time status report every N seconds (configurable).
 * Prints a detailed summary table at the end of the run.
@@ -323,6 +324,14 @@ To run your own workload against your own schema:
     "operation": "find",
     "filter": { "customer_name": "<string>" },
     "limit": 10
+  },
+  {
+    "database": "ecommerce",
+    "collection": "orders",
+    "operation": "updateOne",
+    "filter": { "order_uuid": "<string>" },
+    "update": { "$set": { "status": "processed" } },
+    "upsert": true
   }
 ]
 ```
@@ -383,6 +392,7 @@ Please note:
 * If there are no aggregation queries defined in queries.json, the aggregate_percent value is also ignored.
 * Aggregate operations will only generate activity if at least one query with "operation": "aggregate" is defined in your active JSON query files.
 * The maximum number of operations within a transaction is defined in the config file via `max_transaction_ops` or the env var `PERCONALOAD_MAX_TRANSACTION_OPS`. The number of operations per transaction will be randomized, with the max number being set as explained above.
+* Multi-Collection Load: If multiple collections are defined in your collections_path, each worker will randomly select a collection for every operation. This includes operations within a transaction, allowing for cross-collection atomic updates.
 
 
 #### Concurrency & Workers
@@ -413,6 +423,7 @@ These settings affect the efficiency of individual database operations and memor
 * **`insert_cache_size`**: The buffer size for the document generator channel.
 * *Tip:* This decouples document generation from database insertion. A larger buffer ensures workers rarely wait for data generation logic.
 * *Default:* `1000`
+* **`Upserts`**: Any updateOne or updateMany operation in your query JSON files can include "upsert": true. This will cause MongoDB to create the document if no match is found for the filter.
 
 #### Timeouts & Reliability
 Control how plgm reacts to network lag or database pressure.
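
To illustrate the new `Upserts` bullet above, here is a minimal standalone sketch of what `"upsert": true` translates to at the driver level, assuming mongo-go-driver v2 (which the inline comments in this commit reference). The connection URI and document values are placeholders, not taken from the repo.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
)

func main() {
	ctx := context.Background()

	// Placeholder URI; point this at your own deployment.
	client, err := mongo.Connect(options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	coll := client.Database("ecommerce").Collection("orders")

	// Mirrors the README query example: updateOne with upsert enabled creates
	// the document when no match exists for the filter.
	filter := bson.D{{Key: "order_uuid", Value: "example-uuid"}}
	update := bson.D{{Key: "$set", Value: bson.D{{Key: "status", Value: "processed"}}}}

	res, err := coll.UpdateOne(ctx, filter, update, options.UpdateOne().SetUpsert(true))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("matched=%d modified=%d upsertedID=%v\n",
		res.MatchedCount, res.ModifiedCount, res.UpsertedID)
}
```

When no document matches the filter, `UpsertedID` carries the `_id` of the newly created document.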

internal/config/queries.go

Lines changed: 1 addition & 0 deletions
@@ -17,6 +17,7 @@ type QueryDefinition struct {
 	Projection map[string]interface{} `json:"projection,omitempty" yaml:"projection,omitempty"`
 	Limit      int64                  `json:"limit,omitempty" yaml:"limit,omitempty"`
 	Update     map[string]interface{} `json:"update,omitempty" yaml:"update,omitempty"`
+	Upsert     bool                   `json:"upsert,omitempty" yaml:"upsert,omitempty"`
 }
 
 type QueriesFile struct {
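
As a quick illustration of the new `Upsert` field, the sketch below unmarshals a query definition like the README example into an abridged copy of `QueryDefinition`. Only the fields relevant here are reproduced; the extra field names (`Database`, `Collection`, `Operation`, `Filter`) are assumptions based on the JSON keys used in the README, not copied from the repo.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Abridged stand-in for config.QueryDefinition; fields other than
// Update/Upsert are assumptions matching the README's JSON keys.
type QueryDefinition struct {
	Database   string                 `json:"database,omitempty"`
	Collection string                 `json:"collection,omitempty"`
	Operation  string                 `json:"operation,omitempty"`
	Filter     map[string]interface{} `json:"filter,omitempty"`
	Update     map[string]interface{} `json:"update,omitempty"`
	Upsert     bool                   `json:"upsert,omitempty"`
}

func main() {
	raw := []byte(`[
	  {
	    "database": "ecommerce",
	    "collection": "orders",
	    "operation": "updateOne",
	    "filter": { "order_uuid": "<string>" },
	    "update": { "$set": { "status": "processed" } },
	    "upsert": true
	  }
	]`)

	var queries []QueryDefinition
	if err := json.Unmarshal(raw, &queries); err != nil {
		log.Fatal(err)
	}

	// A missing "upsert" key leaves the field at its zero value (false),
	// so existing query files keep their previous, non-upsert behavior.
	q := queries[0]
	fmt.Printf("%s on %s.%s, upsert=%v\n", q.Operation, q.Database, q.Collection, q.Upsert)
}
```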

internal/mongo/runner.go

Lines changed: 39 additions & 17 deletions
@@ -171,14 +171,14 @@ func runTransaction(ctx context.Context, id int, wCfg workloadConfig, rng *rand.
 	}
 	defer session.EndSession(ctx)
 
-	mainCol := wCfg.collections[0]
 	start := time.Now()
 
 	_, err = session.WithTransaction(ctx, func(sessCtx context.Context) (interface{}, error) {
-		// generates a random number between 1 and MaxTransactionOps, this controls the number of random operations per transaction
 		numOps := rng.Intn(wCfg.appConfig.MaxTransactionOps) + 1
 		for i := 0; i < numOps; i++ {
-			// Select standard CRUD
+			// Select random collection for this step of the transaction
+			currentCol := wCfg.collections[rng.Intn(len(wCfg.collections))]
+
 			innerOp := selectOperation(wCfg.percentages, rng)
 			if innerOp == "aggregate" || innerOp == "transaction" {
 				innerOp = "find"
@@ -188,10 +188,10 @@ func runTransaction(ctx context.Context, id int, wCfg workloadConfig, rng *rand.
 			var run bool
 
 			if innerOp == "insert" {
-				q = generateInsertQuery(mainCol, rng, wCfg.appConfig)
+				q = generateInsertQuery(currentCol, rng, wCfg.appConfig)
 				run = true
 			} else {
-				q, run = selectRandomQueryByType(sessCtx, wCfg.database, innerOp, wCfg.queryMap, mainCol, wCfg.debug, rng, wCfg.primaryFilterField, wCfg.appConfig)
+				q, run = selectRandomQueryByType(sessCtx, wCfg.database, innerOp, wCfg.queryMap, currentCol, wCfg.debug, rng, wCfg.primaryFilterField, wCfg.appConfig)
 			}
 
 			if !run {
@@ -210,8 +210,12 @@ func runTransaction(ctx context.Context, id int, wCfg workloadConfig, rng *rand.
 					}
 					_ = cursor.Close(sessCtx)
 				}
-			case "updateOne", "updateMany":
-				_, err = coll.UpdateOne(sessCtx, filter, q.Update)
+			case "updateOne":
+				opts := options.UpdateOne().SetUpsert(q.Upsert)
+				_, err = coll.UpdateOne(sessCtx, filter, q.Update, opts)
+			case "updateMany":
+				opts := options.UpdateMany().SetUpsert(q.Upsert)
+				_, err = coll.UpdateMany(sessCtx, filter, q.Update, opts)
 			case "deleteOne", "deleteMany":
 				_, err = coll.DeleteOne(sessCtx, filter)
 			case "insert":
@@ -238,7 +242,6 @@ func runTransaction(ctx context.Context, id int, wCfg workloadConfig, rng *rand.
 func independentWorker(ctx context.Context, id int, wg *sync.WaitGroup, wCfg workloadConfig, rng *rand.Rand) {
 	defer wg.Done()
 	dbOpCtx := context.Background()
-	mainCol := wCfg.collections[0]
 
 	for {
 		select {
@@ -247,23 +250,28 @@ func independentWorker(ctx context.Context, id int, wg *sync.WaitGroup, wCfg wor
 		default:
 		}
 
+		// Pick random collection for this operation
+		currentCol := wCfg.collections[rng.Intn(len(wCfg.collections))]
+
 		opType := selectOperation(wCfg.percentages, rng)
 
-		// Handle Transaction Block
 		if opType == "transaction" {
-			runTransaction(ctx, id, wCfg, rng)
-			continue
+			if wCfg.appConfig.UseTransactions {
+				runTransaction(ctx, id, wCfg, rng)
+				continue
+			}
+			opType = "find"
 		}
 
 		var q config.QueryDefinition
 		var run bool
 
 		switch opType {
 		case "insert":
-			q = generateInsertQuery(mainCol, rng, wCfg.appConfig)
+			q = generateInsertQuery(currentCol, rng, wCfg.appConfig)
 			run = true
 		case "find", "updateOne", "updateMany", "deleteOne", "deleteMany", "aggregate":
-			q, run = selectRandomQueryByType(dbOpCtx, wCfg.database, opType, wCfg.queryMap, mainCol, wCfg.debug, rng, wCfg.primaryFilterField, wCfg.appConfig)
+			q, run = selectRandomQueryByType(dbOpCtx, wCfg.database, opType, wCfg.queryMap, currentCol, wCfg.debug, rng, wCfg.primaryFilterField, wCfg.appConfig)
 		default:
 			time.Sleep(100 * time.Microsecond)
 			continue
318326
}
319327
_ = cursor.Close(dbOpCtx)
320328
}
321-
case "updateOne", "updateMany":
322-
coll.UpdateOne(dbOpCtx, filter, q.Update)
329+
case "updateOne":
330+
// Use UpdateOne() for single document updates in v2
331+
opts := options.UpdateOne().SetUpsert(q.Upsert)
332+
_, err := coll.UpdateOne(dbOpCtx, filter, q.Update, opts)
333+
if err != nil && wCfg.debug {
334+
log.Printf("[Worker %d] UpdateOne error: %v", id, err)
335+
}
336+
case "updateMany":
337+
// Use UpdateMany() for multiple document updates in v2
338+
opts := options.UpdateMany().SetUpsert(q.Upsert)
339+
_, err := coll.UpdateMany(dbOpCtx, filter, q.Update, opts)
340+
if err != nil && wCfg.debug {
341+
log.Printf("[Worker %d] UpdateMany error: %v", id, err)
342+
}
323343
case "deleteOne", "deleteMany":
324344
coll.DeleteOne(dbOpCtx, filter)
325345
case "insert":
@@ -435,8 +455,10 @@ func runContinuousWorkload(ctx context.Context, wCfg workloadConfig) error {
435455
workloadCtx, cancel := context.WithTimeout(ctx, wCfg.duration)
436456
defer cancel()
437457

438-
mainCol := wCfg.collections[0]
439-
go insertDocumentProducer(workloadCtx, mainCol, wCfg.maxInsertCache, wCfg.appConfig)
458+
// Handle initial document producer for multiple collections
459+
for _, col := range wCfg.collections {
460+
go insertDocumentProducer(workloadCtx, col, wCfg.maxInsertCache, wCfg.appConfig)
461+
}
440462

441463
monitorDone := make(chan struct{})
442464
go func() {
