Commit 91db3e3

support bulk load time limits
1 parent 97aed20 commit 91db3e3

1 file changed: +20 -11 lines changed
cmd/bulk_load_influx/main.go: +20 -11

@@ -25,14 +25,15 @@ import (
 
 // Program option vars:
 var (
-	csvDaemonUrls string
-	daemonUrls []string
+	csvDaemonUrls     string
+	daemonUrls        []string
 	dbName            string
 	replicationFactor int
 	workers           int
 	lineLimit         int64
 	batchSize         int
 	backoff           time.Duration
+	timeLimit         time.Duration
 	doLoad            bool
 	doDBCreate        bool
 	useGzip           bool
@@ -42,10 +43,10 @@ var (
 
 // Global vars
 var (
-	bufPool sync.Pool
-	batchChan chan *bytes.Buffer
-	inputDone chan struct{}
-	workersGroup sync.WaitGroup
+	bufPool         sync.Pool
+	batchChan       chan *bytes.Buffer
+	inputDone       chan struct{}
+	workersGroup    sync.WaitGroup
 	backingOffChans []chan bool
 	backingOffDones []chan struct{}
 )
@@ -59,6 +60,7 @@ func init() {
 	flag.IntVar(&workers, "workers", 1, "Number of parallel requests to make.")
 	flag.Int64Var(&lineLimit, "line-limit", -1, "Number of lines to read from stdin before quitting.")
 	flag.DurationVar(&backoff, "backoff", time.Second, "Time to sleep between requests when server indicates backpressure is needed.")
+	flag.DurationVar(&timeLimit, "time-limit", -1, "Maximum duration to run (-1 is the default: no limit).")
 	flag.BoolVar(&useGzip, "gzip", false, "Whether to gzip encode requests.")
 	flag.BoolVar(&doLoad, "do-load", true, "Whether to write data. Set this flag to false to check input read speed.")
 	flag.BoolVar(&doDBCreate, "do-db-create", true, "Whether to create the database.")
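
With the flag above registered, a time-capped run might be invoked like this; the binary name, the input file, and the other flag value are assumptions for illustration, not part of this commit:

	./bulk_load_influx -workers=4 -time-limit=60s < data.line-protocol

Omitting -time-limit keeps the default of -1, which disables the limit.
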
@@ -99,7 +101,7 @@ func main() {
 			if err != nil {
 				log.Fatal(err)
 			}
-			time.Sleep(2000*time.Millisecond)
+			time.Sleep(1000 * time.Millisecond)
 		}
 	}
 
@@ -121,9 +123,9 @@ func main() {
 		backingOffDones[i] = make(chan struct{})
 		workersGroup.Add(1)
 		cfg := HTTPWriterConfig{
-			DebugInfo: fmt.Sprintf("worker #%d, dest url: %s", i, daemonUrl),
-			Host: daemonUrl,
-			Database: dbName,
+			DebugInfo:      fmt.Sprintf("worker #%d, dest url: %s", i, daemonUrl),
+			Host:           daemonUrl,
+			Database:       dbName,
 			BackingOffChan: backingOffChans[i],
 			BackingOffDone: backingOffDones[i],
 		}
@@ -159,8 +161,13 @@ func scan(linesPerBatch int) int64 {
 	var n int
 	var itemsRead int64
 	newline := []byte("\n")
+	var deadline time.Time
+	if timeLimit >= 0 {
+		deadline = time.Now().Add(timeLimit)
+	}
 
 	scanner := bufio.NewScanner(bufio.NewReaderSize(os.Stdin, 4*1024*1024))
+outer:
 	for scanner.Scan() {
 		if itemsRead == lineLimit {
 			break
@@ -173,6 +180,9 @@ func scan(linesPerBatch int) int64 {
 
 		n++
 		if n >= linesPerBatch {
+			if timeLimit >= 0 && time.Now().After(deadline) {
+				break outer
+			}
 			batchChan <- buf
 			buf = bufPool.Get().(*bytes.Buffer)
 			n = 0
@@ -212,7 +222,6 @@ func processBatches(w *HTTPWriter, backoffSrc chan bool, backoffDst chan struct{
 		_, err = w.WriteLineProtocol(batch.Bytes(), false)
 	}
 
-
 	if err == BackoffError {
 		backoffSrc <- true
 		time.Sleep(backoff)