blob: 07c9e10b2d6e327225cd216696f4338dd4d1c9e0 [file] [log] [blame]
SF initial configurator15089072022-10-06 13:33:19 +03001# Copyright 2014 OpenStack Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import re
16
17
class ACLError(Exception):
    """Error raised when an ACL operation or ACL pattern is rejected."""
20
21
class ACLEntry(object):
    """An access control list entry.

    :arg str subject: The SSL certificate Subject Common Name to which
        the entry applies.

    :arg str register: A regular expression that matches the jobs that
        connections with this certificate are permitted to register.

    :arg str invoke: A regular expression that matches the jobs that
        connections with this certificate are permitted to invoke.
        Also implies the permission to cancel the same set of jobs in
        the queue.

    :arg boolean grant: A flag indicating whether connections with
        this certificate are permitted to grant access to other
        connections.  Also implies the permission to revoke access
        from other connections.  The ability to self-revoke access is
        always implied.

    :raises ACLError: If ``register`` or ``invoke`` is not a valid
        regular expression.

    Patterns are applied with ``match()``, so they are anchored at the
    start of the function name but not at its end.
    """

    def __init__(self, subject, register=None, invoke=None, grant=False):
        self.subject = subject
        self.setRegister(register)
        self.setInvoke(invoke)
        self.setGrant(grant)

    def __repr__(self):
        return ('<ACLEntry for %s register=%s invoke=%s grant=%s>' %
                (self.subject, self.register, self.invoke, self.grant))

    @staticmethod
    def _compile(pattern):
        """Compile a permission pattern.

        :arg str pattern: The regular expression source, or a false
            value (``None`` or an empty string) for "no permission".
        :returns: The compiled pattern, or None if no pattern was given.
        :raises ACLError: If the pattern is not a valid regular expression.
        """
        if not pattern:
            return None
        try:
            return re.compile(pattern)
        except re.error as e:
            # Format the exception itself: the ``message`` attribute is
            # not available on Python 3 exceptions.
            raise ACLError('Regular expression error: %s' % (e,))

    def isEmpty(self):
        """Checks whether this entry grants any permissions at all.

        :returns: False if any permission is granted, otherwise True.
        """
        return (self.register is None and
                self.invoke is None and
                self.grant is False)

    def canRegister(self, name):
        """Check whether this subject is permitted to register a function.

        :arg str name: The function name to check.
        :returns: A boolean indicating whether the action should be permitted.
        """
        # No pattern (including an empty one) means no permission.
        if self.register is None or self._register is None:
            return False
        return self._register.match(name) is not None

    def canInvoke(self, name):
        """Check whether this subject is permitted to invoke a function.

        :arg str name: The function name to check.
        :returns: A boolean indicating whether the action should be permitted.
        """
        # No pattern (including an empty one) means no permission.
        if self.invoke is None or self._invoke is None:
            return False
        return self._invoke.match(name) is not None

    def setRegister(self, register):
        """Sets the functions that this subject can register.

        :arg str register: A regular expression that matches the jobs that
            connections with this certificate are permitted to register.
        :raises ACLError: If the expression is invalid; the entry is left
            unchanged in that case.
        """
        # Compile before assigning so that a bad pattern cannot leave the
        # public attribute out of sync with the compiled one.
        compiled = self._compile(register)
        self.register = register
        self._register = compiled

    def setInvoke(self, invoke):
        """Sets the functions that this subject can invoke.

        :arg str invoke: A regular expression that matches the jobs that
            connections with this certificate are permitted to invoke.
        :raises ACLError: If the expression is invalid; the entry is left
            unchanged in that case.
        """
        # Compile before assigning so that a bad pattern cannot leave the
        # public attribute out of sync with the compiled one.
        compiled = self._compile(invoke)
        self.invoke = invoke
        self._invoke = compiled

    def setGrant(self, grant):
        """Sets whether this subject can grant ACLs to others.

        :arg boolean grant: A flag indicating whether connections with
            this certificate are permitted to grant access to other
            connections.  Also implies the permission to revoke access
            from other connections.  The ability to self-revoke access is
            always implied.
        """
        self.grant = grant
128
129
class ACL(object):
    """An access control list.

    ACLs are deny-by-default.  The checked actions are only allowed if
    there is an explicit rule in the ACL granting permission for a
    given client (identified by SSL certificate Common Name Subject)
    to perform that action.

    Entries are kept in ``self.subjects``, a dictionary mapping each
    subject to its :py:class:`ACLEntry`.
    """

    def __init__(self):
        self.subjects = {}

    def add(self, entry):
        """Add an ACL entry.

        :arg ACLEntry entry: The :py:class:`ACLEntry` to add.
        :raises ACLError: If there is already an entry for the subject.
        """
        if entry.subject in self.subjects:
            raise ACLError("An ACL entry for %s already exists" %
                           (entry.subject,))
        self.subjects[entry.subject] = entry

    def remove(self, subject):
        """Remove an ACL entry.

        :arg str subject: The SSL certificate Subject Common Name to
            remove from the ACL.
        :raises ACLError: If there is no entry for the subject.
        """
        if subject not in self.subjects:
            raise ACLError("There is no ACL entry for %s" % (subject,))
        del self.subjects[subject]

    def getEntries(self):
        """Return a list of current ACL entries.

        :returns: A list of :py:class:`ACLEntry` objects, ordered by
            subject.
        """
        return [self.subjects[subject] for subject in sorted(self.subjects)]

    def canRegister(self, subject, name):
        """Check whether a subject is permitted to register a function.

        :arg str subject: The SSL certificate Subject Common Name to
            check against.
        :arg str name: The function name to check.
        :returns: A boolean indicating whether the action should be permitted.
        """
        entry = self.subjects.get(subject)
        if entry is None:
            return False
        return entry.canRegister(name)

    def canInvoke(self, subject, name):
        """Check whether a subject is permitted to invoke a function.

        :arg str subject: The SSL certificate Subject Common Name to
            check against.
        :arg str name: The function name to check.
        :returns: A boolean indicating whether the action should be permitted.
        """
        entry = self.subjects.get(subject)
        if entry is None:
            return False
        return entry.canInvoke(name)

    def canGrant(self, subject):
        """Check whether a subject is permitted to grant access to others.

        :arg str subject: The SSL certificate Subject Common Name to
            check against.
        :returns: A boolean indicating whether the action should be permitted.
        """
        entry = self.subjects.get(subject)
        if entry is None:
            return False
        return bool(entry.grant)

    def grantInvoke(self, subject, invoke):
        """Grant permission to invoke certain functions.

        :arg str subject: The SSL certificate Subject Common Name to which
            the entry applies.
        :arg str invoke: A regular expression that matches the jobs
            that connections with this certificate are permitted to
            invoke.  Also implies the permission to cancel the same
            set of jobs in the queue.
        """
        entry = self.subjects.get(subject)
        if not entry:
            # Build the entry with its pattern before adding it, so that
            # a rejected pattern does not leave a stray entry in the ACL.
            self.add(ACLEntry(subject, invoke=invoke))
        else:
            entry.setInvoke(invoke)

    def grantRegister(self, subject, register):
        """Grant permission to register certain functions.

        :arg str subject: The SSL certificate Subject Common Name to which
            the entry applies.
        :arg str register: A regular expression that matches the jobs that
            connections with this certificate are permitted to register.
        """
        entry = self.subjects.get(subject)
        if not entry:
            # Build the entry with its pattern before adding it, so that
            # a rejected pattern does not leave a stray entry in the ACL.
            self.add(ACLEntry(subject, register=register))
        else:
            entry.setRegister(register)

    def grantGrant(self, subject):
        """Grant permission to grant permissions to other connections.

        :arg str subject: The SSL certificate Subject Common Name to which
            the entry applies.
        """
        entry = self.subjects.get(subject)
        if not entry:
            entry = ACLEntry(subject)
            self.add(entry)
        entry.setGrant(True)

    def _removeIfEmpty(self, subject, entry):
        """Drop the subject's entry once it no longer grants anything."""
        if entry.isEmpty():
            self.remove(subject)

    def revokeInvoke(self, subject):
        """Revoke permission to invoke all functions.

        The entry is removed from the ACL if it grants nothing afterwards.
        Nothing happens if there is no entry for the subject.

        :arg str subject: The SSL certificate Subject Common Name to which
            the entry applies.
        """
        entry = self.subjects.get(subject)
        if entry:
            entry.setInvoke(None)
            self._removeIfEmpty(subject, entry)

    def revokeRegister(self, subject):
        """Revoke permission to register all functions.

        The entry is removed from the ACL if it grants nothing afterwards.
        Nothing happens if there is no entry for the subject.

        :arg str subject: The SSL certificate Subject Common Name to which
            the entry applies.
        """
        entry = self.subjects.get(subject)
        if entry:
            entry.setRegister(None)
            self._removeIfEmpty(subject, entry)

    def revokeGrant(self, subject):
        """Revoke permission to grant permissions to other connections.

        The entry is removed from the ACL if it grants nothing afterwards.
        Nothing happens if there is no entry for the subject.

        :arg str subject: The SSL certificate Subject Common Name to which
            the entry applies.
        """
        entry = self.subjects.get(subject)
        if entry:
            entry.setGrant(False)
            self._removeIfEmpty(subject, entry)
289 self.remove(subject)