-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathntp_check.py
116 lines (91 loc) · 4.09 KB
/
ntp_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import argparse
import ntplib
import time
from datetime import datetime, timezone
from service_check_summarizer import summarize_service_check_output
from tts_utils import speak_text
from summary_utils import add_to_combined_summaries
# Public NTP hosts that this script probes, in the order they are checked.
ntp_servers = [
    # Google
    "time.google.com",
    "time1.google.com",
    "time2.google.com",
    "time3.google.com",
    "time4.google.com",
    # Other widely used public time sources
    "time.nist.gov",
    "time.windows.com",
    "pool.ntp.org",
    "time.apple.com",  # Apple NTP server
    # US Navy / US Naval Observatory (USNO) servers
    "ntp2.usno.navy.mil",
    "tick.usno.navy.mil",
    "tock.usno.navy.mil",
]
def check_ntp_server(server, *, version=3, timeout=5):
    """Query a single NTP server and report whether it answered.

    Args:
        server: Host name or address of the NTP server to query.
        version: NTP protocol version to use for the request.
        timeout: Seconds to wait for a reply before giving up.

    Returns:
        A ``(status, detail)`` tuple. When the server answers, ``status`` is
        ``"reachable"`` and ``detail`` is the server's transmit time as a
        timezone-aware UTC ``datetime``. On any failure, ``status`` is
        ``"unreachable"`` and ``detail`` is the error message as a string.
    """
    client = ntplib.NTPClient()
    try:
        response = client.request(server, version=version, timeout=timeout)
        # Any reply counts as "reachable"; the server clock is NOT compared
        # against the local clock. The conversion stays inside the ``try`` so
        # a reply carrying an unusable timestamp is reported, not raised.
        server_time = datetime.fromtimestamp(response.tx_time, tz=timezone.utc)
    except Exception as exc:
        # Deliberately broad: this is a best-effort probe, and DNS failures,
        # timeouts and malformed replies should all just mark the server as
        # unreachable while keeping the reason for the report.
        return "unreachable", str(exc)
    return "reachable", server_time
def _probe_ntp_servers(servers):
    """Check each server once and split the results into (reachable, unreachable)."""
    reachable, unreachable = [], []
    for host in servers:
        status, detail = check_ntp_server(host)
        bucket = reachable if status == "reachable" else unreachable
        bucket.append((host, detail))
    return reachable, unreachable


def check_ntp_servers(servers, retry_delay=5):
    """Check every server, retrying the failures once after a short pause.

    Args:
        servers: Iterable of NTP host names to check.
        retry_delay: Seconds to wait before the single retry pass. A value
            of zero or less retries immediately.

    Returns:
        A ``(reachable, unreachable)`` tuple of lists. ``reachable`` holds
        ``(server, server_time)`` pairs, first-pass successes followed by
        servers that recovered on retry. ``unreachable`` holds
        ``(server, error_message)`` pairs for servers that failed both
        attempts, carrying the error from the retry.
    """
    reachable_servers, unreachable_servers = _probe_ntp_servers(servers)

    # Give servers that failed the first pass one more chance after a pause.
    if unreachable_servers:
        print("\nRetrying unreachable servers...\n")
        if retry_delay > 0:
            time.sleep(retry_delay)
        recovered, unreachable_servers = _probe_ntp_servers(
            [host for host, _ in unreachable_servers]
        )
        reachable_servers.extend(recovered)

    return reachable_servers, unreachable_servers
def _ntp_report_lines(reachable, unreachable):
    """Return the per-server report as a list of lines without trailing newlines."""
    lines = ["Reachable NTP Servers:"]
    lines.extend(
        f"- {server}: Server Time: {server_time}" for server, server_time in reachable
    )
    if not unreachable:
        lines.append("\nAll NTP servers are reachable.")
    else:
        lines.append("\nUnreachable NTP Servers:")
        lines.extend(f"- {server}: {error}" for server, error in unreachable)
    return lines


def main(silent=False, polite=False):
    """Check the configured NTP servers, then print, summarize and announce the result.

    The per-server report is printed, prefixed with a short background
    paragraph and passed to ``summarize_service_check_output``. The returned
    summary is printed and handed to ``add_to_combined_summaries``.

    Args:
        silent: When true, skip every ``speak_text`` announcement.
        polite: Accepted for interface compatibility; not used by this
            function.
    """
    if not silent:
        speak_text("Starting NTP server monitoring.")

    print("This script will check the reachability of several of the most commonly used NTP servers around the "
          "western world.\n")

    # Background paragraph that is sent to the summarizer but never printed.
    background = ("NTP servers function as authoritative time sources, employing a hierarchical system of "
                  "stratum layers and precision algorithms to ensure synchronized time across distributed "
                  "computing systems. Their importance lies in the strict temporal alignment they provide, "
                  "enabling coordinated actions, event correlation, and the prevention of cascading errors in "
                  "networked environments.\n\n")

    reachable, unreachable = check_ntp_servers(ntp_servers)

    # The same lines go to the console and, newline-terminated, to the summarizer.
    report_lines = _ntp_report_lines(reachable, unreachable)
    for line in report_lines:
        print(line)
    ntp_check_results = background + "".join(f"{line}\n" for line in report_lines)

    ntp_summary = summarize_service_check_output(ntp_check_results)
    print(ntp_summary)

    # Add the summary to the combined summaries
    add_to_combined_summaries(ntp_summary)

    if not silent:
        speak_text("The NTP server monitoring report is as follows:")
        speak_text(f"{ntp_summary}")
# When executed as a script, run the check with main()'s default arguments
# (silent=False, so the spoken announcements are enabled).
if __name__ == "__main__":
    main()