Skip to content

Commit eb173f2

Browse files
author
cfbber
committed
Resolve conflicts with upstream/master
2 parents 59fe680 + c0cd939 commit eb173f2

File tree

3 files changed

+24
-9
lines changed

3 files changed

+24
-9
lines changed

build.sh

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
2020

2121
rm -rf version.go
22+
# Formatting Go code with gofmt
23+
find . -name '*.go' -exec gofmt -w {} \;
2224
go generate
2325
go build
2426
echo "Build success. Output: ${ROOT}/doris-streamloader"

loader/stream_loader.go

+19-5
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
package loader
1919

2020
import (
21+
"doris-streamloader/report"
2122
"encoding/json"
2223
"fmt"
2324
"io"
2425
"io/ioutil"
2526
"net/http"
27+
"strconv"
2628
"strings"
2729
"sync"
2830
"sync/atomic"
@@ -31,7 +33,6 @@ import (
3133

3234
"github.com/pierrec/lz4/v4"
3335
log "github.com/sirupsen/logrus"
34-
"doris-streamloader/report"
3536
)
3637

3738
type StreamLoadOption struct {
@@ -143,7 +144,7 @@ func (s *StreamLoad) createUrl() string {
143144
}
144145

145146
// stream load create http request with string data
146-
func (s *StreamLoad) createRequest(url string, reader io.Reader) (req *http.Request, err error) {
147+
func (s *StreamLoad) createRequest(url string, reader io.Reader, workerIndex int, taskIndex int) (req *http.Request, err error) {
147148
req, err = http.NewRequest("PUT", url, reader)
148149
if err != nil {
149150
return
@@ -155,6 +156,19 @@ func (s *StreamLoad) createRequest(url string, reader io.Reader) (req *http.Requ
155156
req.Header.Set("Content-Type", "text/plain")
156157
for k, v := range s.headers {
157158
req.Header.Set(k, v)
159+
// If a label has already been set in the headers, to prevent conflicts,
160+
//generate a unique label by combining the original label, worker index, and task index.
161+
if k == "label" {
162+
var builder strings.Builder
163+
builder.WriteString(v)
164+
builder.WriteString("_")
165+
builder.WriteString(strconv.Itoa(workerIndex))
166+
builder.WriteString("_")
167+
builder.WriteString(strconv.Itoa(taskIndex))
168+
169+
req.Header.Set("label", builder.String())
170+
}
171+
158172
}
159173

160174
if s.Compress {
@@ -228,10 +242,10 @@ func (s *StreamLoad) readData(isEOS *atomic.Bool, rawWriter *io.PipeWriter, read
228242
}
229243
}
230244

231-
func (s *StreamLoad) send(url string, reader io.Reader) (*http.Response, error) {
245+
func (s *StreamLoad) send(url string, reader io.Reader, workerIndex int, taskIndex int) (*http.Response, error) {
232246
realUrl := url
233247
for {
234-
req, err := s.createRequest(realUrl, reader)
248+
req, err := s.createRequest(realUrl, reader, workerIndex, taskIndex)
235249
if err != nil {
236250
if req == nil {
237251
return nil, err
@@ -347,7 +361,7 @@ func (s *StreamLoad) executeGetAndSend(maxRowsPerTask int, maxBytesPerTask int,
347361
workerIndex: workerIndex,
348362
taskIndex: taskIndex,
349363
})
350-
if resp, err := s.send(url, NopCloser(pr)); err != nil {
364+
if resp, err := s.send(url, NopCloser(pr), workerIndex, taskIndex); err != nil {
351365
s.handleSendError(workerIndex, taskIndex)
352366
log.Errorf("Send error, resp: %v error message: %v", resp, err)
353367
return

reader/reader.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package file
1919

2020
import (
2121
"bufio"
22+
"doris-streamloader/loader"
23+
"doris-streamloader/report"
2224
"io"
2325
"os"
2426
"path/filepath"
@@ -27,9 +29,6 @@ import (
2729
"sync/atomic"
2830
"time"
2931

30-
loader "doris-streamloader/loader"
31-
report "doris-streamloader/report"
32-
3332
log "github.com/sirupsen/logrus"
3433
)
3534

@@ -138,7 +137,7 @@ func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTas
138137
break
139138
} else if err != nil {
140139
log.Errorf("Read file failed, error message: %v, before retrying, we suggest:\n1.Check the input data files and fix if there is any problem.\n2.Do select count(*) to check whether data is partially loaded.\n3.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n4.Otherwise, just retry.\n", err)
141-
if len(line) !=0 {
140+
if len(line) != 0 {
142141
log.Error("5.When using a specified line delimiter, the file must end with that delimiter.")
143142
}
144143
os.Exit(1)

0 commit comments

Comments
 (0)