Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

all: Be more stringent with timer resets/stops (ref #9417) #9422

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/stdiscosrv/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ loop:

case <-statisticsDone:
// The statistics routine is done with one iteratation, schedule
// the next.
// the next. Guaranteed to happen after a read from t.C above.
t.Reset(databaseStatisticsInterval)

case <-ctx.Done():
Expand Down
13 changes: 7 additions & 6 deletions cmd/strelaysrv/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

syncthingprotocol "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/timeutil"
"github.com/syncthing/syncthing/lib/tlsutil"

"github.com/syncthing/syncthing/lib/relay/protocol"
Expand Down Expand Up @@ -111,14 +112,14 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config, token strin

pingTicker := time.NewTicker(pingInterval)
defer pingTicker.Stop()
timeoutTicker := time.NewTimer(networkTimeout)
defer timeoutTicker.Stop()
timeoutTimer := time.NewTimer(networkTimeout)
defer timeoutTimer.Stop()
joined := false

for {
select {
case message := <-messages:
timeoutTicker.Reset(networkTimeout)
timeutil.ResetTimer(timeoutTimer, networkTimeout)
if debug {
log.Printf("Message %T from %s", message, id)
}
Expand All @@ -140,7 +141,7 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config, token strin
log.Println("Refusing join request from", id, "due to being over limits")
}
conn.Close()
limitCheckTimer.Reset(time.Second)
timeutil.ResetTimer(limitCheckTimer, time.Second)
continue
}

Expand Down Expand Up @@ -280,10 +281,10 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config, token strin
protocol.WriteMessage(conn, protocol.RelayFull{})
conn.Close()

limitCheckTimer.Reset(time.Second)
timeutil.ResetTimer(limitCheckTimer, time.Second)
}

case <-timeoutTicker.C:
case <-timeoutTimer.C:
// We should receive a error from the reader loop, which will cause
// us to quit this loop.
if debug {
Expand Down
9 changes: 5 additions & 4 deletions cmd/syncthing/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/svcutil"
"github.com/syncthing/syncthing/lib/syncthing"
"github.com/syncthing/syncthing/lib/timeutil"
"github.com/syncthing/syncthing/lib/upgrade"
)

Expand Down Expand Up @@ -791,7 +792,7 @@ func autoUpgrade(cfg config.Wrapper, app *syncthing.App, evLogger events.Logger)

opts := cfg.Options()
if !opts.AutoUpgradeEnabled() {
timer.Reset(upgradeCheckInterval)
timeutil.ResetTimer(timer, upgradeCheckInterval)
continue
}

Expand All @@ -805,21 +806,21 @@ func autoUpgrade(cfg config.Wrapper, app *syncthing.App, evLogger events.Logger)
// Don't complain too loudly here; we might simply not have
// internet connectivity, or the upgrade server might be down.
l.Infoln("Automatic upgrade:", err)
timer.Reset(checkInterval)
timeutil.ResetTimer(timer, checkInterval)
continue
}

if upgrade.CompareVersions(rel.Tag, build.Version) != upgrade.Newer {
// Skip equal, older or majorly newer (incompatible) versions
timer.Reset(checkInterval)
timeutil.ResetTimer(timer, checkInterval)
continue
}

l.Infof("Automatic upgrade (current %q < latest %q)", build.Version, rel.Tag)
err = upgrade.To(rel)
if err != nil {
l.Warnln("Automatic upgrade:", err)
timer.Reset(checkInterval)
timeutil.ResetTimer(timer, checkInterval)
continue
}
sub.Unsubscribe()
Expand Down
20 changes: 14 additions & 6 deletions cmd/syncthing/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/svcutil"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/timeutil"
)

var (
Expand Down Expand Up @@ -454,7 +455,7 @@ type autoclosedFile struct {
fd io.WriteCloser // underlying WriteCloser
opened time.Time // timestamp when the file was last opened
closed chan struct{} // closed on Close(), stops the closerLoop
closeTimer *time.Timer // fires closeDelay after a write
delayClose chan struct{} // delays the close for another interval

mut sync.Mutex
}
Expand All @@ -466,7 +467,7 @@ func newAutoclosedFile(name string, closeDelay, maxOpenTime time.Duration) (*aut
maxOpenTime: maxOpenTime,
mut: sync.NewMutex(),
closed: make(chan struct{}),
closeTimer: time.NewTimer(time.Minute),
delayClose: make(chan struct{}, 1),
}
f.mut.Lock()
defer f.mut.Unlock()
Expand All @@ -489,7 +490,10 @@ func (f *autoclosedFile) Write(bs []byte) (int, error) {
// If we haven't run into the maxOpenTime, postpone close for another
// closeDelay
if time.Since(f.opened) < f.maxOpenTime {
f.closeTimer.Reset(f.closeDelay)
select {
case f.delayClose <- struct{}{}:
default:
}
}

return f.fd.Write(bs)
Expand All @@ -499,8 +503,6 @@ func (f *autoclosedFile) Close() error {
f.mut.Lock()
defer f.mut.Unlock()

// Stop the timer and closerLoop() routine
f.closeTimer.Stop()
close(f.closed)

// Close the file, if it's open
Expand Down Expand Up @@ -532,9 +534,15 @@ func (f *autoclosedFile) ensureOpenLocked() error {
}

func (f *autoclosedFile) closerLoop() {
closeTimer := time.NewTimer(time.Minute)
defer closeTimer.Stop()

for {
select {
case <-f.closeTimer.C:
case <-f.delayClose:
timeutil.ResetTimer(closeTimer, f.closeDelay)

case <-closeTimer.C:
// Close the file when the timer expires.
f.mut.Lock()
if f.fd != nil {
Expand Down
11 changes: 10 additions & 1 deletion lib/api/tokenmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/timeutil"
)

type tokenManager struct {
Expand Down Expand Up @@ -122,7 +123,15 @@ func (m *tokenManager) saveLocked() {
if m.saveTimer == nil {
m.saveTimer = time.AfterFunc(time.Second, m.scheduledSave)
} else {
m.saveTimer.Reset(time.Second)
// Since we are under a lock and the scheduled function takes the
// same lock to nil out the timer, we know the function hasn't run
// yet. Hence it's safe to reset the timer with one of two possible
// outcomes: either we were safely in the waiting period and the
// call gets postponed as it should be, or the timer has triggered
// but not yet run the function in which case it will run now (when
// we release the lock) and then again in a second after the reset,
// which is not a problem.
timeutil.ResetTimer(m.saveTimer, time.Second)
}
}

Expand Down
1 change: 1 addition & 0 deletions lib/config/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (w *wrapper) Serve(ctx context.Context) error {

var e modifyEntry
saveTimer := time.NewTimer(0)
defer saveTimer.Stop()
<-saveTimer.C
saveTimerRunning := false
for {
Expand Down
1 change: 1 addition & 0 deletions lib/connections/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ func (s *service) connect(ctx context.Context) error {
timeout.Stop()
case <-timeout.C:
case <-ctx.Done():
timeout.Stop()
return ctx.Err()
}
}
Expand Down
17 changes: 9 additions & 8 deletions lib/discover/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/timeutil"
"golang.org/x/net/http2"
)

Expand Down Expand Up @@ -231,15 +232,15 @@ func (c *globalClient) Serve(ctx context.Context) error {
if timerResetCount < maxAddressChangesBetweenAnnouncements {
// Defer announcement by 2 seconds, essentially debouncing
// if we have a stream of events incoming in quick succession.
timer.Reset(2 * time.Second)
timeutil.ResetTimer(timer, 2*time.Second)
} else if timerResetCount == maxAddressChangesBetweenAnnouncements {
// Yet only do it if we haven't had to reset maxAddressChangesBetweenAnnouncements times in a row,
// so if something is flip-flopping within 2 seconds, we don't end up in a permanent reset loop.
l.Warnf("Detected a flip-flopping listener")
c.setError(errors.New("flip flopping listener"))
// Incrementing the count above 10 will prevent us from warning or setting the error again
// It will also suppress event based resets until we've had a proper round after announceErrorRetryInterval
timer.Reset(announceErrorRetryInterval)
timeutil.ResetTimer(timer, announceErrorRetryInterval)
}
timerResetCount++
case <-timer.C:
Expand All @@ -263,7 +264,7 @@ func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer)
// yet still using global discovery for lookups. Do not error out
// here.
c.setError(nil)
timer.Reset(announceErrorRetryInterval)
timeutil.ResetTimer(timer, announceErrorRetryInterval)
return
}

Expand All @@ -276,7 +277,7 @@ func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer)
if err != nil {
l.Debugln(c, "announce POST:", err)
c.setError(err)
timer.Reset(announceErrorRetryInterval)
timeutil.ResetTimer(timer, announceErrorRetryInterval)
return
}
l.Debugln(c, "announce POST:", resp.Status)
Expand All @@ -291,12 +292,12 @@ func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer)
// retry. Follow it.
if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
l.Debugln(c, "announce Retry-After:", secs, err)
timer.Reset(time.Duration(secs) * time.Second)
timeutil.ResetTimer(timer, time.Duration(secs)*time.Second)
return
}
}

timer.Reset(announceErrorRetryInterval)
timeutil.ResetTimer(timer, announceErrorRetryInterval)
return
}

Expand All @@ -307,12 +308,12 @@ func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer)
// reannounce. Follow it.
if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
l.Debugln(c, "announce Reannounce-After:", secs, err)
timer.Reset(time.Duration(secs) * time.Second)
timeutil.ResetTimer(timer, time.Duration(secs)*time.Second)
return
}
}

timer.Reset(defaultReannounceInterval)
timeutil.ResetTimer(timer, defaultReannounceInterval)
}

func (*globalClient) Cache() map[protocol.DeviceID]CacheEntry {
Expand Down