@@ -20,6 +20,14 @@ class DomainsNotInConfig(Error):
20
20
"""If no domains exist in configuration file."""
21
21
22
22
23
+ class PrefixesNotInConfig (Error ):
24
+ """If no prefixes exist in configuration file."""
25
+
26
+
27
+ class DomainsAreNotUnique (Error ):
28
+ """If non-unique domains exist in configuration file."""
29
+
30
+
23
31
def flush_workers (domain : Text ) -> None :
24
32
"""Calls peer flush every _CLEANUP_TIME interval."""
25
33
while True :
@@ -35,31 +43,73 @@ def clean_up_worker(domains: List[Text]) -> None:
35
43
domains: list of domains.
36
44
"""
37
45
logger .debug ("Cleaning up the following domains: %s" , domains )
38
- prefix = config .load_config ().get ("domain_prefix" )
46
+ prefixes = config .load_config ().get ("domain_prefixes" )
47
+ cleanup_counter = 0
48
+ # ToDo: do we need a check if every domain got gleaned?
49
+ for prefix in prefixes :
50
+ for domain in domains :
51
+ if prefix in domain :
52
+ logger .info ("Scheduling cleanup task for %s, " , domain )
53
+ try :
54
+ cleaned_domain = domain .split (prefix )[1 ]
55
+ cleanup_counter += 1
56
+ except IndexError :
57
+ logger .error (
58
+ "Cannot strip domain with prefix %s from passed value %s. Skipping cleanup operation" ,
59
+ prefix ,
60
+ domain ,
61
+ )
62
+ continue
63
+ thread = threading .Thread (target = flush_workers , args = (cleaned_domain ,))
64
+ thread .start ()
65
+ if cleanup_counter < len (domains ):
66
+ logger .error (
67
+ "Not every domain got cleaned. Check domains for missing prefixes" ,
68
+ repr (domains ),
69
+ repr (prefixes ),
70
+ )
71
+
72
+
73
+ def check_all_domains_unique (domains , prefixes ):
74
+ """strips off prefixes and checks if domains are unique
75
+
76
+ Args:
77
+ domains: [str]
78
+ Returns:
79
+ boolean
80
+ """
81
+ if not prefixes :
82
+ raise PrefixesNotInConfig ("Could not locate prefixes in configuration." )
83
+ if not isinstance (prefixes , list ):
84
+ raise TypeError ("prefixes is not a list" )
85
+ unique_domains = []
39
86
for domain in domains :
40
- logger .info ("Scheduling cleanup task for %s, " , domain )
41
- try :
42
- cleaned_domain = domain .split (prefix )[1 ]
43
- except IndexError :
44
- logger .error (
45
- "Cannot strip domain with prefix %s from passed value %s. Skipping cleanup operation" ,
46
- prefix ,
47
- domain ,
48
- )
49
- continue
50
- thread = threading .Thread (target = flush_workers , args = (cleaned_domain ,))
51
- thread .start ()
87
+ for prefix in prefixes :
88
+ if prefix in domain :
89
+ stripped_domain = domain .split (prefix )[1 ]
90
+ if stripped_domain in unique_domains :
91
+ logger .error (
92
+ "We have a non-unique domain here" ,
93
+ domain ,
94
+ )
95
+ return False
96
+ unique_domains .append (stripped_domain )
97
+ return True
52
98
53
99
54
100
def main ():
55
101
"""Starts MQTT listener.
56
102
57
103
Raises:
58
104
DomainsNotInConfig: If no domains were found in configuration file.
105
+ DomainsAreNotUnique: If there were non-unique domains after stripping prefix
59
106
"""
60
107
domains = config .load_config ().get ("domains" )
108
+ prefixes = config .load_config ().get ("domain_prefixes" )
61
109
if not domains :
62
110
raise DomainsNotInConfig ("Could not locate domains in configuration." )
111
+ if not check_all_domains_unique (domains , prefixes ):
112
+ raise DomainsAreNotUnique ("There are non-unique domains! Check config." )
63
113
clean_up_worker (domains )
64
114
watch_queue ()
65
115
mqtt .connect ()
0 commit comments