From 8fa355f0672c14238a1ac9de2e9dbb6bd04dc140 Mon Sep 17 00:00:00 2001 From: Haris Khan Date: Wed, 25 Sep 2024 20:08:23 -0400 Subject: [PATCH 1/3] !WIP! router_timestamper --- go.mod | 2 + go.sum | 4 + lib/util/time/sntp/continents.txt | 249 ++++++++++++++ lib/util/time/sntp/doc.md | 128 +++++++ lib/util/time/sntp/router_timestamper.go | 322 ++++++++++++++++++ lib/util/time/sntp/router_timestamper_test.go | 202 +++++++++++ lib/util/time/sntp/update_listener.go | 8 + lib/util/time/sntp/zones.go | 90 +++++ 8 files changed, 1005 insertions(+) create mode 100644 lib/util/time/sntp/continents.txt create mode 100644 lib/util/time/sntp/doc.md create mode 100644 lib/util/time/sntp/router_timestamper.go create mode 100644 lib/util/time/sntp/router_timestamper_test.go create mode 100644 lib/util/time/sntp/update_listener.go create mode 100644 lib/util/time/sntp/zones.go diff --git a/go.mod b/go.mod index 504fcc6..82f3d04 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22 toolchain go1.22.5 require ( + github.com/beevik/ntp v1.4.3 github.com/eyedeekay/go-unzip v0.0.0-20240201194209-560d8225b50e github.com/flynn/noise v1.1.0 github.com/sirupsen/logrus v1.9.3 @@ -17,6 +18,7 @@ require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/sys v0.24.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 794a04b..2c4c76d 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho= +github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -26,6 +28,8 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= diff --git a/lib/util/time/sntp/continents.txt b/lib/util/time/sntp/continents.txt new file mode 100644 index 0000000..13034bc --- /dev/null +++ b/lib/util/time/sntp/continents.txt @@ -0,0 +1,249 @@ +AD,EU +AE,AS +AF,AS +AG,NA +AI,NA +AL,EU +AM,AS +AO,AF +AQ,AN +AR,SA +AS,OC +AT,EU +AU,OC +AW,NA +AX,EU +AZ,AS +BA,EU +BB,NA +BD,AS +BE,EU +BF,AF +BG,EU +BH,AS +BI,AF +BJ,AF +BL,NA +BM,NA +BN,AS +BO,SA +BQ,NA +BR,SA +BS,NA +BT,AS +BV,AN +BW,AF +BY,EU +BZ,NA +CA,NA +CC,AS +CD,AF +CF,AF +CG,AF +CH,EU +CI,AF +CK,OC +CL,SA +CM,AF +CN,AS +CO,SA +CR,NA +CU,NA +CV,AF +CW,NA +CX,AS +CY,AS +CZ,EU +DE,EU +DJ,AF +DK,EU +DM,NA +DO,NA +DZ,AF +EC,SA +EE,EU +EG,AF +EH,AF +ER,AF +ES,EU +ET,AF +FI,EU +FJ,OC +FK,SA +FM,OC +FO,EU +FR,EU +GA,AF +GB,EU +GD,NA +GE,AS +GF,SA +GG,EU +GH,AF +GI,EU +GL,NA +GM,AF +GN,AF +GP,NA +GQ,AF +GR,EU +GS,AN +GT,NA +GU,OC +GW,AF +GY,SA +HK,AS +HM,AN +HN,NA +HR,EU +HT,NA +HU,EU +ID,AS +IE,EU +IL,AS +IM,EU +IN,AS +IO,AS +IQ,AS +IR,AS +IS,EU +IT,EU +JE,EU +JM,NA +JO,AS +JP,AS +KE,AF +KG,AS +KH,AS +KI,OC +KM,AF +KN,NA +KP,AS +KR,AS +KW,AS +KY,NA +KZ,AS +LA,AS +LB,AS +LC,NA +LI,EU +LK,AS +LR,AF +LS,AF +LT,EU +LU,EU +LV,EU +LY,AF +MA,AF +MC,EU +MD,EU +ME,EU +MF,NA +MG,AF +MH,OC +MK,EU +ML,AF +MM,AS +MN,AS +MO,AS +MP,OC +MQ,NA +MR,AF +MS,NA +MT,EU +MU,AF +MV,AS +MW,AF +MX,NA +MY,AS +MZ,AF +NA,AF +NC,OC +NE,AF +NF,OC +NG,AF +NI,NA +NL,EU +NO,EU +NP,AS +NR,OC +NU,OC +NZ,OC +OM,AS +PA,NA +PE,SA +PF,OC +PG,OC +PH,AS +PK,AS +PL,EU +PM,NA +PN,OC +PR,NA +PS,AS +PT,EU +PW,OC +PY,SA +QA,AS +RE,AF +RO,EU +RS,EU +RU,EU +RW,AF +SA,AS +SB,OC +SC,AF +SD,AF +SE,EU +SG,AS +SH,AF +SI,EU +SJ,EU +SK,EU +SL,AF +SM,EU +SN,AF +SO,AF +SR,SA +SS,AF +ST,AF +SV,NA +SX,NA +SY,AS +SZ,AF +TC,NA +TD,AF +TF,AN +TG,AF +TH,AS +TJ,AS +TK,OC +TL,AS +TM,AS +TN,AF +TO,OC +TR,AS +TT,NA +TV,OC +TW,AS +TZ,AF +UA,EU +UG,AF +UM,OC +US,NA +UY,SA +UZ,AS +VA,EU +VC,NA +VE,SA +VG,NA +VI,NA +VN,AS +VU,OC +WF,OC +WS,OC +YE,AS +YT,AF +ZA,AF +ZM,AF +ZW,AF \ No newline at end of file diff --git a/lib/util/time/sntp/doc.md b/lib/util/time/sntp/doc.md new file mode 100644 index 0000000..6b8224f --- /dev/null +++ b/lib/util/time/sntp/doc.md @@ -0,0 +1,128 @@ +# sntp +-- + import "github.com/go-i2p/go-i2p/lib/util/sntp" + +## Usage + +```go +import "github.com/go-i2p/go-i2p/lib/util/sntp" +``` + +## Types + +### type RouterTimestamper + +```go +type RouterTimestamper struct { + servers []string + priorityServers [][]string + listeners []UpdateListener + queryFrequency time.Duration + concurringServers int + consecutiveFails int + disabled bool + initialized bool + wellSynced bool + isRunning bool + mutex sync.Mutex + zones *Zones + stopChan chan struct{} + waitGroup sync.WaitGroup + ntpClient NTPClient +} +``` + +RouterTimestamper is responsible for querying NTP servers and managing time synchronization. + +#### func NewRouterTimestamper + +```go +func NewRouterTimestamper(client NTPClient) *RouterTimestamper +``` + +NewRouterTimestamper creates a new RouterTimestamper instance. + +#### func (*RouterTimestamper) Start + +```go +func (rt *RouterTimestamper) Start() +``` + +Start begins the time synchronization process. + +#### func (*RouterTimestamper) Stop + +```go +func (rt *RouterTimestamper) Stop() +``` + +Stop halts the time synchronization process. + +#### func (*RouterTimestamper) AddListener + +```go +func (rt *RouterTimestamper) AddListener(listener UpdateListener) +``` + +AddListener adds a new listener for time updates. + +#### func (*RouterTimestamper) RemoveListener + +```go +func (rt *RouterTimestamper) RemoveListener(listener UpdateListener) +``` + +RemoveListener removes a listener from receiving time updates. + +#### func (*RouterTimestamper) WaitForInitialization + +```go +func (rt *RouterTimestamper) WaitForInitialization() +``` + +WaitForInitialization blocks until the RouterTimestamper is initialized or a timeout occurs. + +#### func (*RouterTimestamper) TimestampNow + +```go +func (rt *RouterTimestamper) TimestampNow() +``` + +TimestampNow triggers an immediate time synchronization. + +### type UpdateListener + +```go +type UpdateListener interface { + SetNow(now time.Time, stratum uint8) +} +``` + +UpdateListener is an interface that listeners must implement to receive time updates. + +### type Zones + +```go +type Zones struct { + countryToZone map[string]string + continentToZone map[string]string +} +``` + +Zones manages mappings between country codes, continent codes, and NTP zones. + +#### func NewZones + +```go +func NewZones() *Zones +``` + +NewZones creates a new Zones instance and initializes it with data. + +#### func (*Zones) GetZone + +```go +func (z *Zones) GetZone(countryCode string) string +``` + +GetZone returns the NTP zone for a given country code. \ No newline at end of file diff --git a/lib/util/time/sntp/router_timestamper.go b/lib/util/time/sntp/router_timestamper.go new file mode 100644 index 0000000..c445b0d --- /dev/null +++ b/lib/util/time/sntp/router_timestamper.go @@ -0,0 +1,322 @@ +package sntp + +import ( + "fmt" + "math/rand" + "net" + "strings" + "sync" + "time" + + "github.com/beevik/ntp" +) + +type NTPClient interface { + QueryWithOptions(host string, options ntp.QueryOptions) (*ntp.Response, error) +} + +type DefaultNTPClient struct{} + +func (c *DefaultNTPClient) QueryWithOptions(host string, options ntp.QueryOptions) (*ntp.Response, error) { + return ntp.QueryWithOptions(host, options) +} + +type RouterTimestamper struct { + servers []string + priorityServers [][]string + listeners []UpdateListener + queryFrequency time.Duration + concurringServers int + consecutiveFails int + disabled bool + initialized bool + wellSynced bool + isRunning bool + mutex sync.Mutex + zones *Zones + stopChan chan struct{} + waitGroup sync.WaitGroup + ntpClient NTPClient +} + +const ( + minQueryFrequency = 5 * time.Minute + defaultQueryFrequency = 11 * time.Minute + defaultServerList = "0.pool.ntp.org,1.pool.ntp.org,2.pool.ntp.org" + defaultDisabled = false + defaultConcurring = 3 + maxConsecutiveFails = 10 + defaultTimeout = 10 * time.Second + shortTimeout = 5 * time.Second + maxWaitInitialization = 45 * time.Second + maxVariance = 10 * time.Second +) + +func NewRouterTimestamper(client NTPClient) *RouterTimestamper { + rt := &RouterTimestamper{ + listeners: []UpdateListener{}, + disabled: defaultDisabled, + queryFrequency: defaultQueryFrequency, + concurringServers: defaultConcurring, + zones: NewZones(), + stopChan: make(chan struct{}), + ntpClient: client, + } + rt.updateConfig() + return rt +} + +func (rt *RouterTimestamper) Start() { + if rt.disabled || rt.initialized { + return + } + rt.isRunning = true + rt.waitGroup.Add(1) + go rt.run() +} + +func (rt *RouterTimestamper) Stop() { + if rt.isRunning { + rt.isRunning = false + close(rt.stopChan) + rt.waitGroup.Wait() + } +} + +func (rt *RouterTimestamper) AddListener(listener UpdateListener) { + rt.mutex.Lock() + defer rt.mutex.Unlock() + rt.listeners = append(rt.listeners, listener) +} + +func (rt *RouterTimestamper) RemoveListener(listener UpdateListener) { + rt.mutex.Lock() + defer rt.mutex.Unlock() + for i, l := range rt.listeners { + if l == listener { + rt.listeners = append(rt.listeners[:i], rt.listeners[i+1:]...) + break + } + } +} + +func (rt *RouterTimestamper) WaitForInitialization() { + start := time.Now() + for { + rt.mutex.Lock() + initialized := rt.initialized + rt.mutex.Unlock() + if initialized { + return + } + if time.Since(start) > maxWaitInitialization { + return + } + time.Sleep(100 * time.Millisecond) + } +} + +func (rt *RouterTimestamper) TimestampNow() { + if rt.initialized && rt.isRunning && !rt.disabled { + go rt.runOnce() + } +} + +func (rt *RouterTimestamper) run() { + defer rt.waitGroup.Done() + lastFailed := false + for rt.isRunning { + rt.updateConfig() + preferIPv6 := checkIPv6Connectivity() + if !rt.disabled { + if rt.priorityServers != nil { + for _, servers := range rt.priorityServers { + lastFailed = !rt.queryTime(servers, shortTimeout, preferIPv6) + if !lastFailed { + break + } + } + } + if rt.priorityServers == nil || lastFailed { + prefIPv6 := preferIPv6 && !lastFailed && rand.Intn(4) != 0 + lastFailed = !rt.queryTime(rt.servers, defaultTimeout, prefIPv6) + } + } + + rt.mutex.Lock() + if !rt.initialized { + rt.initialized = true + } + rt.mutex.Unlock() + + var sleepTime time.Duration + if lastFailed { + rt.consecutiveFails++ + if rt.consecutiveFails >= maxConsecutiveFails { + sleepTime = 30 * time.Minute + } else { + sleepTime = 30 * time.Second + } + } else { + rt.consecutiveFails = 0 + randomDelay := time.Duration(rand.Int63n(int64(rt.queryFrequency / 2))) + sleepTime = rt.queryFrequency + randomDelay + if rt.wellSynced { + sleepTime *= 3 + } + } + + select { + case <-time.After(sleepTime): + case <-rt.stopChan: + return + } + } +} + +func (rt *RouterTimestamper) runOnce() { + lastFailed := false + rt.updateConfig() + preferIPv6 := checkIPv6Connectivity() + if !rt.disabled { + if rt.priorityServers != nil { + for _, servers := range rt.priorityServers { + lastFailed = !rt.queryTime(servers, shortTimeout, preferIPv6) + if !lastFailed { + break + } + } + } + if rt.priorityServers == nil || lastFailed { + prefIPv6 := preferIPv6 && !lastFailed && rand.Intn(4) != 0 + lastFailed = !rt.queryTime(rt.servers, defaultTimeout, prefIPv6) + } + } + + rt.mutex.Lock() + if !rt.initialized { + rt.initialized = true + } + rt.mutex.Unlock() +} + +func (rt *RouterTimestamper) queryTime(servers []string, timeout time.Duration, preferIPv6 bool) bool { + found := make([]time.Duration, rt.concurringServers) + var expectedDelta time.Duration + rt.wellSynced = false + + for i := 0; i < rt.concurringServers; i++ { + server := servers[rand.Intn(len(servers))] + options := ntp.QueryOptions{ + Timeout: timeout, + //TTL: 5, + } + + if preferIPv6 { + server = fmt.Sprintf("[%s]:123", server) + } + + response, err := rt.ntpClient.QueryWithOptions(server, options) + if err != nil { + fmt.Printf("NTP query failed: %v\n", err) + return false + } + + now := time.Now().Add(response.ClockOffset) + delta := now.Sub(time.Now()) + found[i] = delta + + if i == 0 { + if absDuration(delta) < maxVariance { + if absDuration(delta) < 500*time.Millisecond { + rt.wellSynced = true + } + break + } else { + expectedDelta = delta + } + } else { + if absDuration(delta-expectedDelta) > maxVariance { + // Variance too high, fail this attempt + return false + } + } + } + + rt.stampTime(time.Now().Add(found[0])) + return true +} + +func (rt *RouterTimestamper) stampTime(now time.Time) { + rt.mutex.Lock() + defer rt.mutex.Unlock() + for _, listener := range rt.listeners { + listener.SetNow(now, 0) + } +} + +func (rt *RouterTimestamper) updateConfig() { + serverList := defaultServerList + rt.servers = strings.Split(serverList, ",") + for i, server := range rt.servers { + rt.servers[i] = strings.TrimSpace(server) + } + + if rt.queryFrequency < minQueryFrequency { + rt.queryFrequency = minQueryFrequency + } + + if rt.concurringServers < 1 { + rt.concurringServers = 1 + } else if rt.concurringServers > 4 { + rt.concurringServers = 4 + } + + country := getLocalCountryCode() + if country != "" && country != "a1" && country != "a2" { + rt.priorityServers = [][]string{} + p1 := []string{ + fmt.Sprintf("0.%s.pool.ntp.org", country), + fmt.Sprintf("1.%s.pool.ntp.org", country), + fmt.Sprintf("2.%s.pool.ntp.org", country), + } + rt.priorityServers = append(rt.priorityServers, p1) + zone := rt.zones.GetZone(country) + if zone != "" { + p2 := []string{ + fmt.Sprintf("0.%s.pool.ntp.org", zone), + fmt.Sprintf("1.%s.pool.ntp.org", zone), + fmt.Sprintf("2.%s.pool.ntp.org", zone), + } + rt.priorityServers = append(rt.priorityServers, p2) + } + } else { + rt.priorityServers = nil + } +} + +func checkIPv6Connectivity() bool { + addrs, err := net.InterfaceAddrs() + if err != nil { + return false + } + for _, addr := range addrs { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + if ipNet.IP.To16() != nil && ipNet.IP.To4() == nil { + return true + } + } + } + return false +} + +func absDuration(d time.Duration) time.Duration { + if d < 0 { + return -d + } + return d +} + +func getLocalCountryCode() string { + return "" +} diff --git a/lib/util/time/sntp/router_timestamper_test.go b/lib/util/time/sntp/router_timestamper_test.go new file mode 100644 index 0000000..3a5da6f --- /dev/null +++ b/lib/util/time/sntp/router_timestamper_test.go @@ -0,0 +1,202 @@ +package sntp + +import ( + "github.com/beevik/ntp" + "sync" + "testing" + "time" +) + +type MockNTPClient struct { + ClockOffset time.Duration + Error error +} + +func (c *MockNTPClient) QueryWithOptions(host string, options ntp.QueryOptions) (*ntp.Response, error) { + if c.Error != nil { + return nil, c.Error + } + return &ntp.Response{ + ClockOffset: c.ClockOffset, + }, nil +} + +type MockListener struct { + mu sync.Mutex + updates []time.Time + stratums []uint8 +} + +func (ml *MockListener) SetNow(now time.Time, stratum uint8) { + ml.mu.Lock() + defer ml.mu.Unlock() + ml.updates = append(ml.updates, now) + ml.stratums = append(ml.stratums, stratum) +} + +func TestRouterTimestamperInitialization(t *testing.T) { + defaultClient := &DefaultNTPClient{} + timestamper := NewRouterTimestamper(defaultClient) + if timestamper == nil { + t.Fatal("Expected RouterTimestamper instance, got nil") + } +} + +func TestAddAndRemoveListener(t *testing.T) { + defaultClient := &DefaultNTPClient{} + timestamper := NewRouterTimestamper(defaultClient) + listener := &MockListener{} + + timestamper.AddListener(listener) + if len(timestamper.listeners) != 1 { + t.Errorf("Expected 1 listener, got %d", len(timestamper.listeners)) + } + + timestamper.RemoveListener(listener) + if len(timestamper.listeners) != 0 { + t.Errorf("Expected 0 listeners, got %d", len(timestamper.listeners)) + } +} + +func TestTimestampNow(t *testing.T) { + defaultClient := &DefaultNTPClient{} + timestamper := NewRouterTimestamper(defaultClient) + listener := &MockListener{} + timestamper.AddListener(listener) + + // Mock Injection + mockNTPClient := &MockNTPClient{ + ClockOffset: 1 * time.Second, + } + timestamper.ntpClient = mockNTPClient + + timestamper.Start() + defer timestamper.Stop() + + timestamper.WaitForInitialization() + + // Trigger update + timestamper.TimestampNow() + + time.Sleep(100 * time.Millisecond) + + listener.mu.Lock() + defer listener.mu.Unlock() + if len(listener.updates) == 0 { + t.Error("Expected at least one time update, got none") + } +} +func TestTimestampNowWithRealNTP(t *testing.T) { + defaultClient := &DefaultNTPClient{} + timestamper := NewRouterTimestamper(defaultClient) + listener := &MockListener{} + timestamper.AddListener(listener) + + timestamper.Start() + defer timestamper.Stop() + + t.Log("Waiting for initialization...") + timestamper.WaitForInitialization() + t.Log("Initialization complete") + + // Trigger an immediate time update + t.Log("Triggering time update...") + timestamper.TimestampNow() + + timeout := time.After(30 * time.Second) + updateReceived := make(chan struct{}) + + go func() { + for { + listener.mu.Lock() + if len(listener.updates) > 0 { + listener.mu.Unlock() + updateReceived <- struct{}{} + return + } + listener.mu.Unlock() + time.Sleep(100 * time.Millisecond) + } + }() + + select { + case <-updateReceived: + t.Log("Update received successfully") + case <-timeout: + t.Error("Timed out waiting for NTP update") + } + + listener.mu.Lock() + defer listener.mu.Unlock() + if len(listener.updates) == 0 { + t.Error("Expected at least one time update, got none") + } else { + t.Logf("Received %d updates", len(listener.updates)) + for i, update := range listener.updates { + t.Logf("Update %d: %v", i, update) + } + } + + t.Logf("NTP Servers: %v", timestamper.servers) + t.Logf("Priority Servers: %v", timestamper.priorityServers) +} +func TestWaitForInitialization(t *testing.T) { + defaultClient := &DefaultNTPClient{} + timestamper := NewRouterTimestamper(defaultClient) + start := time.Now() + go func() { + time.Sleep(1 * time.Second) + timestamper.mutex.Lock() + timestamper.initialized = true + timestamper.mutex.Unlock() + }() + timestamper.WaitForInitialization() + elapsed := time.Since(start) + if elapsed < 1*time.Second { + t.Errorf("Expected to wait at least 1 second, waited %v", elapsed) + } +} + +func TestQueryTime(t *testing.T) { + defaultClient := &DefaultNTPClient{} + timestamper := NewRouterTimestamper(defaultClient) + listener := &MockListener{} + timestamper.AddListener(listener) + + // Mock injection + mockNTPClient := &MockNTPClient{ + ClockOffset: 1 * time.Second, + } + timestamper.ntpClient = mockNTPClient + + servers := []string{"pool.ntp.org"} + success := timestamper.queryTime(servers, 5*time.Second, false) + if !success { + t.Error("Expected queryTime to succeed") + } + + // Ensure that the listener received an update + listener.mu.Lock() + defer listener.mu.Unlock() + if len(listener.updates) == 0 { + t.Error("Expected listener to receive time update") + } +} + +func TestUpdateConfig(t *testing.T) { + defaultClient := &DefaultNTPClient{} + timestamper := NewRouterTimestamper(defaultClient) + + // Modify the default configuration + timestamper.queryFrequency = 1 * time.Minute + timestamper.concurringServers = 5 + + timestamper.updateConfig() + + if timestamper.queryFrequency < minQueryFrequency { + t.Errorf("Expected queryFrequency >= %v, got %v", minQueryFrequency, timestamper.queryFrequency) + } + if timestamper.concurringServers > 4 { + t.Errorf("Expected concurringServers <= 4, got %d", timestamper.concurringServers) + } +} diff --git a/lib/util/time/sntp/update_listener.go b/lib/util/time/sntp/update_listener.go new file mode 100644 index 0000000..5186e4b --- /dev/null +++ b/lib/util/time/sntp/update_listener.go @@ -0,0 +1,8 @@ +package sntp + +import "time" + +// UpdateListener is an interface that listeners must implement to receive time updates. +type UpdateListener interface { + SetNow(now time.Time, stratum uint8) +} diff --git a/lib/util/time/sntp/zones.go b/lib/util/time/sntp/zones.go new file mode 100644 index 0000000..11d271d --- /dev/null +++ b/lib/util/time/sntp/zones.go @@ -0,0 +1,90 @@ +package sntp + +import ( + "bufio" + "embed" + "io" + "log" + "strings" +) + +//go:embed continents.txt +var continentsFS embed.FS + +type Zones struct { + countryToZone map[string]string + continentToZone map[string]string +} + +func NewZones() *Zones { + z := &Zones{ + countryToZone: make(map[string]string), + continentToZone: make(map[string]string), + } + z.initialize() + return z +} + +func (z *Zones) GetZone(countryCode string) string { + countryCode = strings.ToLower(countryCode) + if zone, ok := z.countryToZone[countryCode]; ok { + return zone + } + return "" +} + +func (z *Zones) initialize() { + zones := []string{ + "AF", "africa", + "AN", "antarctica", // Who is living here? + "AS", "asia", + "EU", "europe", + "NA", "north-america", + "OC", "oceania", + "SA", "south-america", + } + + for i := 0; i < len(zones); i += 2 { + z.continentToZone[zones[i]] = zones[i+1] + } + + z.readContinentFile() +} + +func (z *Zones) readContinentFile() { + file, err := continentsFS.Open("continents.txt") + if err != nil { + log.Printf("Error opening continents.txt: %v\n", err) + return + } + defer file.Close() + + reader := bufio.NewReader(file) + for { + line, err := reader.ReadString('\n') + if err != nil && err != io.EOF { + log.Printf("Error reading continents.txt: %v\n", err) + break + } + if err == io.EOF && line == "" { + break + } + + line = strings.TrimSpace(line) + if len(line) == 0 || strings.HasPrefix(line, "#") { + continue + } + + parts := strings.Split(line, ",") + if len(parts) < 2 { + continue + } + + countryCode := strings.ToLower(strings.TrimSpace(parts[0])) + continentCode := strings.ToUpper(strings.TrimSpace(parts[1])) + + if zone, ok := z.continentToZone[continentCode]; ok { + z.countryToZone[countryCode] = zone + } + } +} From f45d3018687782ddbaa5e50eca664a4054a4d615 Mon Sep 17 00:00:00 2001 From: Haris Khan Date: Sat, 19 Oct 2024 22:17:10 -0400 Subject: [PATCH 2/3] proposed refactor -added secureRandBool() and performTimeQuery() --- lib/util/time/sntp/router_timestamper.go | 152 +++++++++++++++++------ 1 file changed, 111 insertions(+), 41 deletions(-) diff --git a/lib/util/time/sntp/router_timestamper.go b/lib/util/time/sntp/router_timestamper.go index c445b0d..b09898d 100644 --- a/lib/util/time/sntp/router_timestamper.go +++ b/lib/util/time/sntp/router_timestamper.go @@ -121,33 +121,99 @@ func (rt *RouterTimestamper) TimestampNow() { go rt.runOnce() } } +func (rt *RouterTimestamper) secureRandBool(probability float64) bool { + return rand.Float64() < probability +} +func (rt *RouterTimestamper) performTimeQuery() bool { + rt.updateConfig() + preferIPv6 := checkIPv6Connectivity() -func (rt *RouterTimestamper) run() { - defer rt.waitGroup.Done() - lastFailed := false - for rt.isRunning { - rt.updateConfig() - preferIPv6 := checkIPv6Connectivity() - if !rt.disabled { - if rt.priorityServers != nil { - for _, servers := range rt.priorityServers { - lastFailed = !rt.queryTime(servers, shortTimeout, preferIPv6) - if !lastFailed { - break + if rt.disabled { + return false + } + + lastFailed := true + + if rt.priorityServers != nil { + for _, servers := range rt.priorityServers { + lastFailed = !rt.queryTime(servers, shortTimeout, preferIPv6) + if !lastFailed { + break + } + } + } + + if rt.priorityServers == nil || lastFailed { + prefIPv6 := preferIPv6 && rt.secureRandBool(0.75) + lastFailed = !rt.queryTime(rt.servers, defaultTimeout, prefIPv6) + } + + rt.mutex.Lock() + if !rt.initialized { + rt.initialized = true + } + rt.mutex.Unlock() + + return lastFailed +} + +/* + func (rt *RouterTimestamper) run() { + defer rt.waitGroup.Done() + lastFailed := false + for rt.isRunning { + rt.updateConfig() + preferIPv6 := checkIPv6Connectivity() + if !rt.disabled { + if rt.priorityServers != nil { + for _, servers := range rt.priorityServers { + lastFailed = !rt.queryTime(servers, shortTimeout, preferIPv6) + if !lastFailed { + break + } } } + if rt.priorityServers == nil || lastFailed { + prefIPv6 := preferIPv6 && !lastFailed && rand.Intn(4) != 0 + lastFailed = !rt.queryTime(rt.servers, defaultTimeout, prefIPv6) + } } - if rt.priorityServers == nil || lastFailed { - prefIPv6 := preferIPv6 && !lastFailed && rand.Intn(4) != 0 - lastFailed = !rt.queryTime(rt.servers, defaultTimeout, prefIPv6) - } - } - rt.mutex.Lock() - if !rt.initialized { - rt.initialized = true + rt.mutex.Lock() + if !rt.initialized { + rt.initialized = true + } + rt.mutex.Unlock() + + var sleepTime time.Duration + if lastFailed { + rt.consecutiveFails++ + if rt.consecutiveFails >= maxConsecutiveFails { + sleepTime = 30 * time.Minute + } else { + sleepTime = 30 * time.Second + } + } else { + rt.consecutiveFails = 0 + randomDelay := time.Duration(rand.Int63n(int64(rt.queryFrequency / 2))) + sleepTime = rt.queryFrequency + randomDelay + if rt.wellSynced { + sleepTime *= 3 + } + } + + select { + case <-time.After(sleepTime): + case <-rt.stopChan: + return + } } - rt.mutex.Unlock() + } +*/ +func (rt *RouterTimestamper) run() { + defer rt.waitGroup.Done() + for rt.isRunning { + lastFailed := rt.performTimeQuery() var sleepTime time.Duration if lastFailed { @@ -174,32 +240,36 @@ func (rt *RouterTimestamper) run() { } } -func (rt *RouterTimestamper) runOnce() { - lastFailed := false - rt.updateConfig() - preferIPv6 := checkIPv6Connectivity() - if !rt.disabled { - if rt.priorityServers != nil { - for _, servers := range rt.priorityServers { - lastFailed = !rt.queryTime(servers, shortTimeout, preferIPv6) - if !lastFailed { - break +/* + func (rt *RouterTimestamper) runOnce() { + lastFailed := false + rt.updateConfig() + preferIPv6 := checkIPv6Connectivity() + if !rt.disabled { + if rt.priorityServers != nil { + for _, servers := range rt.priorityServers { + lastFailed = !rt.queryTime(servers, shortTimeout, preferIPv6) + if !lastFailed { + break + } } } + if rt.priorityServers == nil || lastFailed { + prefIPv6 := preferIPv6 && !lastFailed && rand.Intn(4) != 0 + lastFailed = !rt.queryTime(rt.servers, defaultTimeout, prefIPv6) + } } - if rt.priorityServers == nil || lastFailed { - prefIPv6 := preferIPv6 && !lastFailed && rand.Intn(4) != 0 - lastFailed = !rt.queryTime(rt.servers, defaultTimeout, prefIPv6) - } - } - rt.mutex.Lock() - if !rt.initialized { - rt.initialized = true + rt.mutex.Lock() + if !rt.initialized { + rt.initialized = true + } + rt.mutex.Unlock() } - rt.mutex.Unlock() +*/ +func (rt *RouterTimestamper) runOnce() { + rt.performTimeQuery() } - func (rt *RouterTimestamper) queryTime(servers []string, timeout time.Duration, preferIPv6 bool) bool { found := make([]time.Duration, rt.concurringServers) var expectedDelta time.Duration From ade80e577cea0f3e89e073282cecad84e2294b1d Mon Sep 17 00:00:00 2001 From: Haris Khan Date: Sun, 20 Oct 2024 00:07:40 -0400 Subject: [PATCH 3/3] added sntp verification -Leap Indicator -Stratum level check -Round-trip Delay -Clock offset -simple non-zero time -Root Dispersion and Root Delay --- lib/util/time/sntp/verification.go | 56 +++++++++++++++++++++++++ lib/util/time/sntp/verification_test.go | 48 +++++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 lib/util/time/sntp/verification.go create mode 100644 lib/util/time/sntp/verification_test.go diff --git a/lib/util/time/sntp/verification.go b/lib/util/time/sntp/verification.go new file mode 100644 index 0000000..814a0f3 --- /dev/null +++ b/lib/util/time/sntp/verification.go @@ -0,0 +1,56 @@ +package sntp + +import ( + "fmt" + "github.com/beevik/ntp" + "time" +) + +func (rt *RouterTimestamper) validateResponse(response *ntp.Response) bool { + // Check Leap Indicator + if response.Leap == ntp.LeapNotInSync { + fmt.Println("Invalid response: Server clock not synchronized (Leap Indicator)") + return false + } + + // Check Stratum Level + if response.Stratum == 0 || response.Stratum > 15 { + fmt.Printf("Invalid response: Stratum level %d is out of valid range\n", response.Stratum) + return false + } + + // Round-Trip Delay and Clock Offset Sanity Checks + if response.RTT < 0 || response.RTT > maxRTT { + fmt.Printf("Invalid response: Round-trip delay %v is out of bounds\n", response.RTT) + return false + } + if absDuration(response.ClockOffset) > maxClockOffset { + fmt.Printf("Invalid response: Clock offset %v is out of bounds\n", response.ClockOffset) + return false + } + + // Non-zero Time + if response.Time.IsZero() { + fmt.Println("Invalid response: Received zero time") + return false + } + + // Root Dispersion and Root Delay + if response.RootDispersion > maxRootDispersion { + fmt.Printf("Invalid response: Root dispersion %v is too high\n", response.RootDispersion) + return false + } + if response.RootDelay > maxRootDelay { + fmt.Printf("Invalid response: Root delay %v is too high\n", response.RootDelay) + return false + } + + return true +} + +const ( + maxRTT = 2 * time.Second // Max acceptable round-trip time + maxClockOffset = 10 * time.Second // Max acceptable clock offset + maxRootDispersion = 1 * time.Second // Max acceptable root dispersion + maxRootDelay = 1 * time.Second // Maxi acceptable root delay +) diff --git a/lib/util/time/sntp/verification_test.go b/lib/util/time/sntp/verification_test.go new file mode 100644 index 0000000..2334d40 --- /dev/null +++ b/lib/util/time/sntp/verification_test.go @@ -0,0 +1,48 @@ +package sntp + +import ( + "github.com/beevik/ntp" + "testing" + "time" +) + +func TestValidateResponse(t *testing.T) { + rt := &RouterTimestamper{} + + // Valid response + validResponse := &ntp.Response{ + Leap: ntp.LeapNoWarning, + Stratum: 2, + RTT: 50 * time.Millisecond, + ClockOffset: 100 * time.Millisecond, + Time: time.Now(), + RootDispersion: 500 * time.Millisecond, + RootDelay: 10 * time.Millisecond, + KissCode: "", + } + + if !rt.validateResponse(validResponse) { + t.Error("Expected valid response to pass validation") + } + + // Invalid Leap Indicator + invalidLeapResponse := *validResponse + invalidLeapResponse.Leap = ntp.LeapNotInSync + if rt.validateResponse(&invalidLeapResponse) { + t.Error("Expected response with invalid leap indicator to fail validation") + } + + // Invalid Stratum + invalidStratumResponse := *validResponse + invalidStratumResponse.Stratum = 0 + if rt.validateResponse(&invalidStratumResponse) { + t.Error("Expected response with invalid stratum to fail validation") + } + + // High Root Dispersion + highRootDispersionResponse := *validResponse + highRootDispersionResponse.RootDispersion = 2 * time.Second + if rt.validateResponse(&highRootDispersionResponse) { + t.Error("Expected response with high root dispersion to fail validation") + } +}