Commit ed093db

committed
conn & memlim
1 parent 8cb4933 commit ed093db

4 files changed, +95 -45 lines changed

MemLimit.go

Lines changed: 19 additions & 2 deletions

@@ -71,6 +71,13 @@ func (m *MemLimiter) MemAvail() (retbool bool) {
 }
 
 func (m *MemLimiter) MemLockWait(item *segmentChanItem, who string) {
+	item.mux.Lock()
+	if item.memlocked > 0 {
+		item.mux.Unlock()
+		dlog(always, "MemLockWait called on already memlocked item seg.Id='%s' who='%s'", item.segment.Id, who)
+		return
+	}
+	item.mux.Unlock()
 
 	GCounter.Incr("MemLockWait")
 	defer GCounter.Decr("MemLockWait")
@@ -102,6 +109,9 @@ func (m *MemLimiter) MemLockWait(item *segmentChanItem, who string) {
 	} // end for waithere
 	*/
 	<-m.memchan // infinite wait to get a slot from chan
+	item.mux.Lock()
+	item.memlocked++
+	item.mux.Unlock()
 
 	m.mux.Lock()
 	m.waiting--
@@ -115,7 +125,12 @@ func (m *MemLimiter) MemLockWait(item *segmentChanItem, who string) {
 func (m *MemLimiter) MemReturn(who string, item *segmentChanItem) {
 	//dlog(cfg.opt.DebugMemlim, "MemReturn free seg.Id='%s' who='%s'", item.segment.Id, who)
 	defer GCounter.Incr("TOTAL_MemReturned")
-
+	item.mux.RLock()
+	if item.memlocked == 0 {
+		dlog(always, "MemReturn called on non-memlocked item seg.Id='%s' who='%s'", item.segment.Id, who)
+		return // not memlocked, nothing to do
+	}
+	item.mux.RUnlock()
 	// remove map entry from mem
 	m.mux.Lock()
 	delete(m.memdata, item)
@@ -130,6 +145,8 @@ func (m *MemLimiter) MemReturn(who string, item *segmentChanItem) {
 		dlog(always, "ERROR on MemReturn chan is full seg.Id='%s' who='%s'", item.segment.Id, who)
 		os.Exit(1) // this is a bug! we should never return a slot to a full chan!
 	}
-
+	item.mux.Lock()
+	item.memlocked--
+	item.mux.Unlock()
 	dlog(cfg.opt.DebugMemlim, "MemReturned seg.Id='%s' who='%s'", item.segment.Id, who)
 } // end func memlim.MemReturn
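
The limiter here is a buffered channel used as a counting semaphore: receiving from m.memchan claims a memory slot, sending returns it, and the new per-item memlocked counter guards against claiming or returning a slot twice for the same segment. A minimal, self-contained sketch of that pattern (hypothetical names, without the project's dlog/GCounter plumbing); note that the early-return path in the release function gives the read lock back before returning:

```go
package main

import (
	"fmt"
	"sync"
)

// item mirrors the relevant parts of segmentChanItem: a mutex plus a
// counter recording whether this item currently holds a memory slot.
type item struct {
	mux       sync.RWMutex
	memlocked int
}

// limiter is a channel-backed counting semaphore: the buffered channel is
// pre-filled with one token per allowed in-memory segment.
type limiter struct {
	memchan chan struct{}
}

func newLimiter(slots int) *limiter {
	l := &limiter{memchan: make(chan struct{}, slots)}
	for i := 0; i < slots; i++ {
		l.memchan <- struct{}{} // every slot starts out available
	}
	return l
}

// lockWait blocks until a slot is free, unless the item already holds one.
func (l *limiter) lockWait(it *item) {
	it.mux.Lock()
	already := it.memlocked > 0
	it.mux.Unlock()
	if already {
		return // don't claim a second slot for the same item
	}

	<-l.memchan // wait for a free slot
	it.mux.Lock()
	it.memlocked++
	it.mux.Unlock()
}

// memReturn hands the slot back, but only if the item actually holds one.
func (l *limiter) memReturn(it *item) {
	it.mux.RLock()
	held := it.memlocked > 0
	it.mux.RUnlock() // release the read lock before any early return
	if !held {
		return // never acquired: nothing to give back
	}

	l.memchan <- struct{}{} // return the slot
	it.mux.Lock()
	it.memlocked--
	it.mux.Unlock()
}

func main() {
	l := newLimiter(2)
	it := &item{}
	l.lockWait(it)
	l.lockWait(it) // second call is a no-op thanks to the memlocked guard
	l.memReturn(it)
	fmt.Println("memlocked:", it.memlocked) // 0
}
```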

Routines.go

Lines changed: 16 additions & 7 deletions

@@ -40,10 +40,10 @@ func (s *SESSION) GoCheckRoutine(wid int, provider *Provider, item *segmentChanI
 		connitem, err = provider.ConnPool.GetConn()
 	}
 	if err != nil {
-		return 0, fmt.Errorf("ERROR in GoCheckRoutine: ConnGet '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
+		return 0, fmt.Errorf("error in GoCheckRoutine: ConnGet '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
 	}
 	if connitem == nil || connitem.conn == nil {
-		return 0, fmt.Errorf("ERROR in GoCheckRoutine: ConnGet got nil item or conn '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
+		return 0, fmt.Errorf("error in GoCheckRoutine: ConnGet got nil item or conn '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
 	}
 
 	code, msg, err := CMD_STAT(connitem, item)
@@ -165,10 +165,10 @@ func (s *SESSION) GoDownsRoutine(wid int, provider *Provider, item *segmentChanI
 		connitem, err = provider.ConnPool.GetConn()
 	}
 	if err != nil {
-		return 0, fmt.Errorf("ERROR in GoDownsRoutine: ConnGet '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
+		return 0, fmt.Errorf("error in GoDownsRoutine: ConnGet '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
 	}
 	if connitem == nil || connitem.conn == nil {
-		return 0, fmt.Errorf("ERROR in GoDownsRoutine: ConnGet got nil item or conn '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
+		return 0, fmt.Errorf("error in GoDownsRoutine: ConnGet got nil item or conn '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
 	}
 	dlog(cfg.opt.DebugWorker, "GoDownsRoutine got connitem='%v' sharedCC='%v' --> CMD_ARTICLE seg.Id='%s'", connitem, sharedCC, item.segment.Id)
 
@@ -326,6 +326,15 @@ func (s *SESSION) GoReupsRoutine(wid int, provider *Provider, item *segmentChanI
 	defer GCounter.Decr("GoReupsRoutines")
 	//who := fmt.Sprintf("UR=%d@'%s' seg.Id='%s'", wid, provider.Name, item.segment.Id) // DISABLED MEMRETURN
 
+	item.mux.Lock()
+	memlocked := item.memlocked > 0
+	item.mux.Unlock()
+
+	if !memlocked {
+		dlog(cfg.opt.DebugWorker, "GoReupsRoutine: item not memlocked seg.Id='%s' @ '%s'#'%s'", item.segment.Id, provider.Name, provider.Group)
+		memlim.MemLockWait(item, "GoUR") // gets memlock here
+	}
+
 	var err error
 	var connitem *ConnItem
 	if sharedCC != nil {
@@ -334,10 +343,10 @@ func (s *SESSION) GoReupsRoutine(wid int, provider *Provider, item *segmentChanI
 		connitem, err = provider.ConnPool.GetConn()
 	}
 	if err != nil {
-		return 0, fmt.Errorf("ERROR in GoReupsRoutine: ConnGet '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
+		return 0, fmt.Errorf("error in GoReupsRoutine: ConnGet '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
 	}
 	if connitem == nil || connitem.conn == nil || connitem.srvtp == nil {
-		return 0, fmt.Errorf("ERROR in GoReupsRoutine: ConnGet got nil item or conn '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
+		return 0, fmt.Errorf("error in GoReupsRoutine: ConnGet got nil item or conn '%s' connitem='%v' sharedCC='%v' err='%v'", provider.Name, connitem, sharedCC, err)
 	}
 
 	var uploaded, unwanted, retry bool
@@ -523,7 +532,7 @@ func (s *SESSION) StopRoutines() {
 	}
 	// pushing nil into the segment chans will stop the routines
 	for _, provider := range s.providerList {
-		closeSegmentChannel(s.segmentChansCheck[provider.Group])
+		//closeSegmentChannel(s.segmentChansCheck[provider.Group])
 		closeSegmentChannel(s.segmentChansDowns[provider.Group])
 		closeSegmentChannel(s.segmentChansReups[provider.Group])
 
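
Two things change here: the fmt.Errorf strings are lower-cased, matching the Go convention that error strings are not capitalized because callers usually wrap or prefix them, and GoReupsRoutine now claims a memory slot itself when the item is not already memlocked (for example when a segment reaches re-upload without passing through the download path). A small sketch of that guard, reusing the limiter and item types from the MemLimit.go sketch above:

```go
// ensureMemLocked claims a slot only if this item does not already hold one,
// mirroring the check GoReupsRoutine performs before re-uploading.
// (Illustrative helper, not part of the repository.)
func ensureMemLocked(l *limiter, it *item) {
	it.mux.Lock()
	memlocked := it.memlocked > 0
	it.mux.Unlock()

	if !memlocked {
		l.lockWait(it) // blocks until the limiter hands out a slot
	}
}
```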

Session.go

Lines changed: 1 addition & 1 deletion

@@ -258,7 +258,7 @@ func (p *PROCESSOR) LaunchSession(s *SESSION, nzbfilepath string, waitSession *s
 			make(map[int]bool, len(s.providerList)),
 			[]string{}, []string{}, []string{}, // []string fields
 			false, false, false, false, false, false, false, false, false, // bool fields
-			0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} // int fields
+			0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} // int fields
 		if len(segment.Id) < 3 { // a@a
 			dlog(always, "ERROR LaunchSession: segment.Id='%s' is too short! file='%s'", segment.Id, file.Filename)
 			continue
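
The extra trailing 0 keeps the positional segmentChanItem literal in step with a newly added int field, presumably the memlocked counter used by MemLimit.go above. Positional literals must supply a value for every field, which is why a one-field change shows up here; a tiny illustration with hypothetical fields:

```go
package main

// Hypothetical struct: adding the memlocked field forces every positional
// literal to grow by one value, while a named literal would pick up the
// zero value automatically.
type segItem struct {
	flagA, flagB               bool
	fails, pushedDL, memlocked int
}

var byPosition = segItem{false, false, 0, 0, 0} // trailing 0 added for memlocked
var byName = segItem{fails: 0}                  // memlocked defaults to 0

func main() { _, _ = byPosition, byName }
```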

Workers.go

Lines changed: 59 additions & 35 deletions

@@ -104,6 +104,7 @@ func (s *SESSION) GoBootWorkers(waitDivider *sync.WaitGroup, workerWGconnReady *
 			// the check routine may still be activly checking!
 			s.checkFeedDone = true
 			s.mux.Unlock()
+			close(segmentChanCheck) // close the chan to signal no more items will come
 		}(s.segmentChansCheck[provider.Group])
 	}
 	globalmux.Unlock()
@@ -185,6 +186,7 @@ func (s *SESSION) GoWorker(wid int, provider *Provider, waitWorker *sync.WaitGro
 	segCC := s.segmentChansCheck[provider.Group]
 	go func(wid int, provider *Provider, waitWorker *sync.WaitGroup, sharedCC chan *ConnItem, segCC chan *segmentChanItem) {
 		defer waitWorker.Done()
+		defer dlog(always, "CheckRoutine: wid=%d provider='%s' exiting", wid, provider.Name)
 	forGoCheckRoutine:
 		for {
 			item, ok := <-segCC
@@ -216,6 +218,7 @@ func (s *SESSION) GoWorker(wid int, provider *Provider, waitWorker *sync.WaitGro
 	segCD := s.segmentChansDowns[provider.Group]
 	go func(wid int, provider *Provider, waitWorker *sync.WaitGroup, sharedCC chan *ConnItem, segCD chan *segmentChanItem) {
 		defer waitWorker.Done()
+		defer dlog(always, "GoDownsRoutine: wid=%d provider='%s' exiting", wid, provider.Name)
 	forGoDownsRoutine:
 		for {
 			dlog(cfg.opt.DebugWorker, "GoDownsRoutine: wid=%d provider='%s' wait on segCD len=%d", wid, provider.Name, len(segCD))
@@ -232,7 +235,7 @@ func (s *SESSION) GoWorker(wid int, provider *Provider, waitWorker *sync.WaitGro
 
 			start := time.Now()
 			who := fmt.Sprintf("DR=%d@'%s'#'%s' seg.Id='%s'", wid, provider.Name, provider.Group, item.segment.Id)
-			memlim.MemLockWait(item, who)
+			memlim.MemLockWait(item, who) // gets memlock here
 			dlog(cfg.opt.DebugWorker && cfg.opt.DebugMemlim, "GoDownsRoutine got MemCheckWait who='%s' waited=(%d ms)", who, time.Since(start).Milliseconds())
 			errStr := ""
 			StartDowns := time.Now()
@@ -274,6 +277,7 @@ func (s *SESSION) GoWorker(wid int, provider *Provider, waitWorker *sync.WaitGro
 	segCR := s.segmentChansReups[provider.Group]
 	go func(wid int, provider *Provider, waitWorker *sync.WaitGroup, sharedCC chan *ConnItem, segCR chan *segmentChanItem) {
 		defer waitWorker.Done()
+		defer dlog(always, "ReupsRoutine: wid=%d provider='%s' exiting", wid, provider.Name)
 	forGoReupsRoutine:
 		for {
 			item, ok := <-segCR
@@ -304,7 +308,7 @@ func (s *SESSION) GoWorker(wid int, provider *Provider, waitWorker *sync.WaitGro
 			item.PrintItemFlags(cfg.opt.DebugFlags, true, fmt.Sprintf("post-GoReupsRoutine: code=%d", code))
 
 			DecreaseUPQueueCnt()
-			if err != nil {
+			if err != nil || code == 0 {
 				errStr := fmt.Sprintf("ERROR in GoReupsRoutine code='%d' err='%v' no retry", code, err)
 				dlog(always, "%s", errStr)
 				memlim.MemReturn("MemRetOnERR: "+errStr, item) // memfree GoReupsRoutine on error
@@ -362,9 +366,17 @@ func matchThisDL(item *segmentChanItem) bool {
 } // end func matchThisDL
 
 func (item *segmentChanItem) FlagError(providerId int) {
+	log.Printf("FlagError called for providerId=%d on segment.Id='%s' waiting to lock item mutex", providerId, item.segment.Id)
 	item.mux.Lock()
 	defer item.mux.Unlock()
+	item.ignoreDlOn[providerId] = true
+	item.ignoreUlOn[providerId] = true
 	item.errorOn[providerId] = true
+	item.flaginUP = false
+	item.flaginDL = false
+	item.fails++
+	delete(item.availableOn, providerId)
+	dlog(always, "Flagged item error, will not retry '%s' on providerId=%d", item.segment.Id, providerId)
 }
 
 func (item *segmentChanItem) FlagDLFailed(providerList []*Provider, providerGroup string) {
@@ -377,6 +389,7 @@ func (item *segmentChanItem) FlagDLFailed(providerList []*Provider, providerGrou
 		item.ignoreDlOn[id] = true
 		item.missingOn[id] = true
 		item.errorOn[id] = true
+		delete(item.availableOn, id)
 	}
 }
 
@@ -418,7 +431,7 @@ func (s *SESSION) pushDL(allowDl bool, item *segmentChanItem) (pushed bool, nodl
 		return false, 1, nil // not a match, item is already in DL or UP or has article
 	}
 	if !memlim.MemAvail() {
-		return
+		return false, 1, nil // not enough memory available to download
 	}
 	// if we are here, we are allowed to push the item to download queue
 	// loop over the availableOn map and check if we can push the item to download
@@ -436,6 +449,10 @@ providerDl:
 			dlog(cfg.opt.DebugWorker, " | [DV-pushDL] (nodl) item missingOn but should be avail!? seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
 			continue providerDl
 		}
+		if item.errorOn[pid] {
+			dlog(cfg.opt.DebugWorker, " | [DV-pushDL] (nodl) item errorOn seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
+			continue providerDl
+		}
 		if s.providerList[pid].NoDownload {
 			nodl++
 			item.ignoreDlOn[pid] = true
@@ -464,7 +481,7 @@ providerDl:
 			item.pushedDL++ // mark as pushed to download queue (in pushDL)
 			IncreaseDLQueueCnt()
 			dlog(cfg.opt.DebugWorker, " | [DV-pushDL] pushed to dlchan seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
-			return // return after 1st push!
+			return pushed, 0, nil // return after 1st push!
 		default:
 			dlog(cfg.opt.BUG, "DEBUG SPAM pushDL: chan is full @ #'%s', retry next", s.providerList[pid].Group)
 			// chan is full means we cannot push the item to the download queue to this provider group
@@ -475,19 +492,19 @@ providerDl:
 		} // end select
 		//}
 	} // end for providerDl
-	return
+	return false, 1, nil
 } // end func pushDL
 
 func (s *SESSION) pushUP(allowUp bool, item *segmentChanItem) (pushed bool, noup uint64, inretry uint64, err error) {
 	if !allowUp {
-		return
+		return false, 1, 0, fmt.Errorf("pushUP not allowed")
 	}
 
 	s.mux.RLock()
 	segcheckdone := s.segcheckdone // get the segcheckdone state
 	s.mux.RUnlock()
 	if cfg.opt.CheckFirst && !segcheckdone {
-		return
+		return false, 1, 0, nil // still checking, do not push to upload yet
 	}
 
 	item.mux.Lock() // LOCK item for the duration of this function
@@ -514,6 +531,11 @@ providerUp:
 			dlog(cfg.opt.DebugWorker, " | [DV-pushUP] (noup) flagNoUp seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
 			continue providerUp
 		}
+		if item.ignoreUlOn[pid] {
+			noup++
+			dlog(cfg.opt.DebugWorker, " | [DV-pushUP] (noup) ignoreUlOn seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
+			continue providerUp
+		}
 		if item.uploadedTo[pid] {
 			noup++
 			dlog(cfg.opt.DebugWorker, " | [DV-pushUP] (noup) uploadedTo seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
@@ -534,6 +556,11 @@ providerUp:
 			dlog(cfg.opt.DebugWorker, " | [DV-pushUP] (noup) dmcaOn seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
 			continue providerUp
 		}
+		if item.errorOn[pid] {
+			noup++
+			dlog(cfg.opt.DebugWorker, " | [DV-pushUP] (noup) errorOn seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
+			continue providerUp
+		}
 		if item.retryOn[pid] {
 			if item.retryIn > time.Now().Unix() {
 				inretry++
@@ -557,7 +584,7 @@ providerUp:
 			IncreaseUPQueueCnt() // increment upQueueCnt counter
 			//GCounter.IncrMax("upQueueCnt", uint64(len(s.segmentList)), "pushUP")
 			//GCounter.IncrMax("TOTAL_upQueueCnt", uint64(len(s.segmentList)), "pushUP")
-			dlog(cfg.opt.DebugWorker, " | pushUP: in chan seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
+			dlog(always, " | pushUP: in chan seg.Id='%s' @ #'%s'", item.segment.Id, s.providerList[pid].Group)
 			return true, 0, 0, nil // return after 1st push to a group!
 		default:
 			//dlog(cfg.opt.BUG "DEBUG SPAM pushUP: chan is full @ '%s'", s.providerList[pid].Name)
@@ -692,36 +719,33 @@ forever:
 			}
 			item.mux.RUnlock() // RUNLOCKS HERE #824d
 
-			if !testing {
-				pushedUp, nNoUp, nInRetry, a1err := s.pushUP(allowUp, item)
-				if a1err != nil {
-					dlog(always, "ERROR pushUP err='%v' (seg.Id='%s')", a1err, item.segment.Id)
+			pushedUp, nNoUp, nInRetry, a1err := s.pushUP(allowUp, item)
+			if a1err != nil {
+				dlog(always, "ERROR pushUP err='%v' (seg.Id='%s')", a1err, item.segment.Id)
+				continue forsegmentList
+			}
+			if pushedUp {
+				inup++
+				dlog(cfg.opt.DebugWorker, " | [DV] PUSHEDup seg.Id='%s' pushedUp=%t inup=%d", item.segment.Id, pushedUp, inup)
+			}
+			noup += nNoUp
+			inretry += nInRetry
+
+			if !pushedUp && allowDl {
+				pushedDl, nNoDl, b1err := s.pushDL(allowDl, item)
+				if b1err != nil {
+					dlog(always, "ERROR pushDL err='%v' (seg.Id='%s')", b1err, item.segment.Id)
 					continue forsegmentList
 				}
-				if pushedUp {
-					inup++
-					dlog(cfg.opt.DebugWorker, " | [DV] PUSHEDup seg.Id='%s' pushedUp=%t inup=%d", item.segment.Id, pushedUp, inup)
+				nodl += nNoDl
+				Tnodl += uint64(len(item.ignoreDlOn))
+				if pushedDl {
+					indl++
+					//if cfg.opt.BUG {
+					dlog(cfg.opt.DebugWorker, " | [DV] PUSHEDdl seg.Id='%s' pushedDl=%t indl=%d", item.segment.Id, pushedDl, indl)
+					//}
 				}
-				noup += nNoUp
-				//Tnoup += len(item.ignoreDlOn)
-				inretry += nInRetry
-
-				if !pushedUp && allowDl {
-					pushedDl, nNoDl, b1err := s.pushDL(allowDl, item)
-					if b1err != nil {
-						dlog(always, "ERROR pushDL err='%v' (seg.Id='%s')", b1err, item.segment.Id)
-						continue forsegmentList
-					}
-					nodl += nNoDl
-					Tnodl += uint64(len(item.ignoreDlOn))
-					if pushedDl {
-						indl++
-						//if cfg.opt.BUG {
-						dlog(cfg.opt.DebugWorker, " | [DV] PUSHEDdl seg.Id='%s' pushedDl=%t indl=%d", item.segment.Id, pushedDl, indl)
-						//}
-					}
-				} // end pushDL
-			} // if !testing {
+			} // end pushDL
 
 		} // end for forsegmentList
 		//dlog(cfg.opt.DebugWorker, " | [DV] lastRunTook='%d ms' '%v", lastRunTook.Milliseconds(), lastRunTook)
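
pushDL and pushUP rely on a non-blocking send: a select with a default branch offers the item to one provider group's channel and falls through to the next group when that channel is full, and the explicit return values introduced here (pushed, nodl/noup, err) replace bare returns so the divider loop can tell "pushed", "skipped this round", and "hard error" apart. A minimal sketch of that push pattern with illustrative types:

```go
package main

import "fmt"

type workItem struct{ id string }

// pushToAnyGroup tries each provider group's queue in turn with a
// non-blocking send and stops after the first successful push, roughly the
// shape of pushDL/pushUP. Names are illustrative, not the project's.
func pushToAnyGroup(queues map[string]chan *workItem, it *workItem) (pushed bool, skipped uint64, err error) {
	for group, q := range queues {
		select {
		case q <- it:
			fmt.Printf("pushed %s to group %s\n", it.id, group)
			return true, skipped, nil // return after the first push
		default:
			skipped++ // channel full: don't block, try the next group
		}
	}
	return false, skipped, nil // no group accepted the item this round
}

func main() {
	queues := map[string]chan *workItem{
		"groupA": make(chan *workItem, 1),
		"groupB": make(chan *workItem, 1),
	}
	if ok, _, _ := pushToAnyGroup(queues, &workItem{id: "seg-1"}); ok {
		fmt.Println("item queued")
	}
}
```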
