mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-03 15:17:25 +00:00
parent
91922b6dc8
commit
16f4921c50
@ -30,9 +30,10 @@ type Service struct {
|
||||
|
||||
id protocol.DeviceID
|
||||
cfg config.Wrapper
|
||||
processScheduled chan struct{}
|
||||
|
||||
mappings []*Mapping
|
||||
timer *time.Timer
|
||||
enabled bool
|
||||
mut sync.RWMutex
|
||||
}
|
||||
|
||||
@ -40,12 +41,14 @@ func NewService(id protocol.DeviceID, cfg config.Wrapper) *Service {
|
||||
s := &Service{
|
||||
id: id,
|
||||
cfg: cfg,
|
||||
processScheduled: make(chan struct{}, 1),
|
||||
|
||||
timer: time.NewTimer(0),
|
||||
mut: sync.NewRWMutex(),
|
||||
}
|
||||
s.Service = util.AsService(s.serve, s.String())
|
||||
cfg.Subscribe(s)
|
||||
cfgCopy := cfg.RawCopy()
|
||||
s.CommitConfiguration(cfgCopy, cfgCopy)
|
||||
return s
|
||||
}
|
||||
|
||||
@ -54,19 +57,16 @@ func (s *Service) VerifyConfiguration(from, to config.Configuration) error {
|
||||
}
|
||||
|
||||
func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
|
||||
if !from.Options.NATEnabled && to.Options.NATEnabled {
|
||||
s.mut.Lock()
|
||||
if !s.enabled && to.Options.NATEnabled {
|
||||
l.Debugln("Starting NAT service")
|
||||
s.timer.Reset(0)
|
||||
s.mut.Unlock()
|
||||
} else if from.Options.NATEnabled && !to.Options.NATEnabled {
|
||||
s.mut.Lock()
|
||||
s.enabled = true
|
||||
s.scheduleProcess()
|
||||
} else if s.enabled && !to.Options.NATEnabled {
|
||||
l.Debugln("Stopping NAT service")
|
||||
if !s.timer.Stop() {
|
||||
<-s.timer.C
|
||||
s.enabled = false
|
||||
}
|
||||
s.mut.Unlock()
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@ -78,14 +78,33 @@ func (s *Service) Stop() {
|
||||
func (s *Service) serve(ctx context.Context) {
|
||||
announce := stdsync.Once{}
|
||||
|
||||
s.mut.Lock()
|
||||
s.timer.Reset(0)
|
||||
s.mut.Unlock()
|
||||
timer := time.NewTimer(0)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.timer.C:
|
||||
if found := s.process(ctx); found != -1 {
|
||||
case <-timer.C:
|
||||
case <-s.processScheduled:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
s.mut.RLock()
|
||||
for _, mapping := range s.mappings {
|
||||
mapping.clearAddresses()
|
||||
}
|
||||
s.mut.RUnlock()
|
||||
return
|
||||
}
|
||||
s.mut.RLock()
|
||||
enabled := s.enabled
|
||||
s.mut.RUnlock()
|
||||
if !enabled {
|
||||
continue
|
||||
}
|
||||
found, renewIn := s.process(ctx)
|
||||
timer.Reset(renewIn)
|
||||
if found != -1 {
|
||||
announce.Do(func() {
|
||||
suffix := "s"
|
||||
if found == 1 {
|
||||
@ -94,19 +113,10 @@ func (s *Service) serve(ctx context.Context) {
|
||||
l.Infoln("Detected", found, "NAT service"+suffix)
|
||||
})
|
||||
}
|
||||
case <-ctx.Done():
|
||||
s.timer.Stop()
|
||||
s.mut.RLock()
|
||||
for _, mapping := range s.mappings {
|
||||
mapping.clearAddresses()
|
||||
}
|
||||
s.mut.RUnlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) process(ctx context.Context) int {
|
||||
func (s *Service) process(ctx context.Context) (int, time.Duration) {
|
||||
// toRenew are mappings which are due for renewal
|
||||
// toUpdate are the remaining mappings, which will only be updated if one of
|
||||
// the old IGDs has gone away, or a new IGD has appeared, but only if we
|
||||
@ -131,21 +141,11 @@ func (s *Service) process(ctx context.Context) int {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Reset the timer while holding the lock, because of the following race:
|
||||
// T1: process acquires lock
|
||||
// T1: process checks the mappings and gets next renewal time in 30m
|
||||
// T2: process releases the lock
|
||||
// T2: NewMapping acquires the lock
|
||||
// T2: NewMapping adds mapping
|
||||
// T2: NewMapping releases the lock
|
||||
// T2: NewMapping resets timer to 1s
|
||||
// T1: process resets timer to 30
|
||||
s.timer.Reset(renewIn)
|
||||
s.mut.RUnlock()
|
||||
|
||||
// Don't do anything, unless we really need to renew
|
||||
if len(toRenew) == 0 {
|
||||
return -1
|
||||
return -1, renewIn
|
||||
}
|
||||
|
||||
nats := discoverAll(ctx, time.Duration(s.cfg.Options().NATRenewalM)*time.Minute, time.Duration(s.cfg.Options().NATTimeoutS)*time.Second)
|
||||
@ -158,7 +158,14 @@ func (s *Service) process(ctx context.Context) int {
|
||||
s.updateMapping(ctx, mapping, nats, false)
|
||||
}
|
||||
|
||||
return len(nats)
|
||||
return len(nats), renewIn
|
||||
}
|
||||
|
||||
func (s *Service) scheduleProcess() {
|
||||
select {
|
||||
case s.processScheduled <- struct{}{}: // 1-buffered
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) NewMapping(protocol Protocol, ip net.IP, port int) *Mapping {
|
||||
@ -174,9 +181,8 @@ func (s *Service) NewMapping(protocol Protocol, ip net.IP, port int) *Mapping {
|
||||
|
||||
s.mut.Lock()
|
||||
s.mappings = append(s.mappings, mapping)
|
||||
// Reset the timer while holding the lock, see process() for explanation
|
||||
s.timer.Reset(time.Second)
|
||||
s.mut.Unlock()
|
||||
s.scheduleProcess()
|
||||
|
||||
return mapping
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user