Skip to content

Commit ba3538c

Browse files
committed
conn timeouts
1 parent 793f153 commit ba3538c

File tree

6 files changed

+281
-137
lines changed

6 files changed

+281
-137
lines changed

Config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ type Provider struct {
132132
Username string // username for authentication
133133
Password string // password for authentication
134134
MaxConns int // maximum number of connections to the provider
135+
OpenConns int // current number of open connections
135136
TCPMode string // TCP mode to use (tcp, tcp4, tcp6)
136137
PreferIHAVE bool // if true, prefer IHAVE over POST method
137138
MaxConnErrors int // maximum number of errors before giving up on a connection
@@ -164,6 +165,18 @@ type Provider struct {
164165
}
165166
} // end Provider struct
166167

168+
func (p *Provider) IncrementOpenConns() {
169+
p.mux.Lock()
170+
p.OpenConns++
171+
p.mux.Unlock()
172+
}
173+
174+
func (p *Provider) DecrementOpenConns() {
175+
p.mux.Lock()
176+
p.OpenConns--
177+
p.mux.Unlock()
178+
}
179+
167180
type segmentChanItem struct {
168181
// segmentChanItem is used to store information about a segment/article
169182
// that is being processed in the segment channel.

MemLimit.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package main
1515
*/
1616

1717
import (
18+
"log"
1819
"os"
1920

2021
"github.com/go-while/go-loggedrwmutex"
@@ -131,7 +132,6 @@ func (m *MemLimiter) MemReturn(who string, item *segmentChanItem) {
131132
return // not memlocked, nothing to do
132133
}
133134
item.mux.RUnlock()
134-
defer GCounter.Incr("TOTAL_MemReturned")
135135

136136
// remove map entry from mem
137137
m.mux.Lock()
@@ -141,11 +141,11 @@ func (m *MemLimiter) MemReturn(who string, item *segmentChanItem) {
141141
// return the slot
142142
select {
143143
case m.memchan <- struct{}{}: // return mem slot into chan
144-
//pass
144+
GCounter.Incr("TOTAL_MemReturned")
145145
default:
146146
// wtf chan is full?? that's a bug!
147-
dlog(always, "ERROR on MemReturn chan is full seg.Id='%s' who='%s'", item.segment.Id, who)
148-
os.Exit(1) // this is a bug! we should never return a slot to a full chan!
147+
// this is a bug! we should never return a slot to a full chan!
148+
log.Fatalf("ERROR on MemReturn chan is full seg.Id='%s' who='%s'", item.segment.Id, who)
149149
}
150150
item.mux.Lock()
151151
item.memlocked--

Routines.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ func (s *SESSION) GoReupsRoutine(wid int, provider *Provider, item *segmentChanI
447447
}
448448
item.flaginUP = false
449449
item.flagisUP = true
450+
item.fails = 0
450451
item.mux.Unlock()
451452
// update provider statistics
452453
provider.mux.Lock() // mutex #87c9 articles.refreshed++

Session.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type SESSION struct {
8383
checkFeedDone bool // checkDone is set to true when the segment feeder has finished feeding to channel, check may still be activly running!
8484
segcheckdone bool // segcheckdone is set to true when the segment check is done
8585
proxy bool // flag to signal proxy is used in LaunchSession
86+
bootedWorkers int // counts how many workers have been booted
8687
} // end type SESSION struct
8788

8889
func (p *PROCESSOR) NewProcessor() error {

0 commit comments

Comments
 (0)