blob: 929126330e934bf50747cd3f76a5cfe171dce837 [file] [log] [blame]
import re
from twisted.internet import defer
from buildbot.util import unicode2bytes
from buildbot.www.authz.endpointmatchers import Match
from buildbot.www.authz.endpointmatchers import EndpointMatcherBase
from buildbot.www.authz.roles import RolesFromBase
from buildbot.www.authz.authz import Forbidden
from buildbot.www.authz.authz import Authz
class WorkerOwnerMatch(Match):
    """Endpoint match that carries the e-mail addresses of worker admins.

    Every e-mail address found in the ``workerinfo['admin']`` text of the
    given workers is collected, in order, into ``self.workerOwners``.
    """

    def __init__(self, master, workers, **kwargs):
        """Build the match and extract the admin e-mail addresses.

        Args:
            master: forwarded to ``Match.__init__``.
            workers: iterable of worker records; a record contributes only
                when it has a ``'workerinfo'`` entry that in turn has a
                truthy ``'admin'`` entry.
            **kwargs: forwarded unchanged to ``Match.__init__``.
        """
        super().__init__(master, **kwargs)

        # Address pattern: (local part)@(domain).  Capture groups 1 and 5
        # (tuple indices 0 and 4 in findall results) are the complete local
        # part and the complete domain; the other groups are inner repeats.
        address_re = re.compile(
            r"([-!#-'*+/-9=?A-Z^-~]+(\.[-!#-'*+/-9=?A-Z^-~]+)*|\"([]!#-[^-~ \t]|(\\[\t -~]))+\")@([-!#-'*+/-9=?A-Z^-~]+(\.[-!#-'*+/-9=?A-Z^-~]+)*|\[[\t -Z^-~]*])"
        )

        self.workerOwners = []
        for record in workers:
            if 'workerinfo' not in record:
                continue
            info = record['workerinfo']
            if 'admin' not in info:
                continue
            admin_text = info['admin']
            if not admin_text:
                continue
            # The admin text may hold several addresses; keep all of them.
            self.workerOwners.extend(
                found[0] + '@' + found[4]
                for found in address_re.findall(admin_text)
            )

    def getWorkerOwners(self):
        """Return the list of admin e-mail addresses collected at init."""
        return self.workerOwners
class WorkerEndpointMatcher(EndpointMatcherBase):
    """Endpoint matcher that produces ``WorkerOwnerMatch`` results.

    It handles worker endpoints and build-request cancellation, returning a
    match that exposes the admin e-mail addresses of the workers involved.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to ``EndpointMatcherBase``."""
        super().__init__(**kwargs)

    @defer.inlineCallbacks
    def match_WorkerEndpoint(self, epobject, epdict, options):
        """Match a worker endpoint.

        Args:
            epobject: endpoint object; its ``get({}, epdict)`` result is
                used as the worker record.
            epdict: endpoint arguments passed through to ``epobject.get``.
            options: unused here.

        Returns:
            Deferred firing with a ``WorkerOwnerMatch`` for the fetched
            worker, or ``None`` when no worker record was obtained.
        """
        worker = yield epobject.get({}, epdict)
        # NOTE(review): presumably get() yields None for an unknown worker;
        # without this guard WorkerOwnerMatch would fail on `in None`.
        if worker is None:
            return None
        return WorkerOwnerMatch(self.master, [worker])

    @defer.inlineCallbacks
    def match_BuildRequestEndpoint_cancel(self, epobject, epdict, options):
        """Match the ``cancel`` action on a build-request endpoint.

        The build request's ``builderid`` is used to look up workers via
        ``self.master.db.workers.getWorkers``.

        Args:
            epobject: endpoint object; its ``get({}, epdict)`` result is
                used as the build-request record.
            epdict: endpoint arguments passed through to ``epobject.get``.
            options: unused here.

        Returns:
            Deferred firing with a ``WorkerOwnerMatch`` for the workers
            found, or ``None`` when there is no build request, it has no
            ``builderid``, or no workers were returned.
        """
        buildrequest = yield epobject.get({}, epdict)
        # Guard against a missing build request before the membership test.
        if buildrequest is None or 'builderid' not in buildrequest:
            return None

        workers = yield self.master.db.workers.getWorkers(
            builderid=buildrequest['builderid'])
        if workers:
            return WorkerOwnerMatch(self.master, workers)
        return None
class RolesFromWorkerOwner(RolesFromBase):
    """Role matcher that grants one fixed role to worker owners."""

    def __init__(self, role):
        """Remember the role to grant.

        Args:
            role: the role returned when the user is a worker owner.
        """
        super().__init__()
        self.role = role

    def getRolesFromUser(self, userDetails, workerOwners):
        """Return ``[self.role]`` if the user's e-mail is a worker owner.

        Args:
            userDetails: mapping describing the user; only ``'email'`` is
                consulted.
            workerOwners: collection of owner e-mail addresses (may be
                empty or falsy).

        Returns:
            ``[self.role]`` when ``userDetails['email']`` is contained in a
            non-empty ``workerOwners``; otherwise an empty list.
        """
        if 'email' not in userDetails:
            return []
        is_owner = bool(workerOwners) and userDetails['email'] in workerOwners
        return [self.role] if is_owner else []
class WorkerOwnerAuthz(Authz):
    """``Authz`` variant that also grants roles based on worker ownership.

    ``RolesFromWorkerOwner`` matchers are separated from the ordinary role
    matchers and are consulted only for rules whose match is a
    ``WorkerOwnerMatch``.
    """

    def __init__(self, roleMatchers=None, **kwargs):
        """Split worker-owner role matchers from the regular ones.

        Args:
            roleMatchers: optional iterable of role matchers; ``None`` is
                treated as "no matchers".
            **kwargs: forwarded unchanged to ``Authz.__init__``.
        """
        super().__init__(roleMatchers=roleMatchers, **kwargs)
        # The declared default is None, which must not be iterated directly.
        given_matchers = () if roleMatchers is None else roleMatchers
        self.workerOwnerRoleMatchers = [
            r for r in given_matchers if isinstance(r, RolesFromWorkerOwner)]
        # Remove workerOwnerRoleMatchers from roleMatchers: they take an
        # extra workerOwners argument and are called separately below.
        self.roleMatchers = [
            r for r in self.roleMatchers
            if r not in self.workerOwnerRoleMatchers]

    def setMaster(self, master):
        """Set the master and attach this authz to the worker-owner matchers.

        The worker-owner matchers were removed from ``self.roleMatchers`` in
        ``__init__``, so ``setAuthz`` is called on them explicitly here.
        """
        super().setMaster(master)
        for r in self.workerOwnerRoleMatchers:
            r.setAuthz(self)

    @defer.inlineCallbacks
    def assertUserAllowed(self, ep, action, options, userDetails):
        """Check the allow rules for this request.

        Args:
            ep: endpoint passed to each rule's ``match``.
            action: action passed to each rule's ``match``.
            options: options passed to each rule's ``match``.
            userDetails: mapping describing the requesting user.

        Returns:
            Deferred firing with ``None`` when the user is allowed, either
            because a matching rule's role is held or because no matching
            rule denies by default.

        Raises:
            Forbidden: a matching rule with ``defaultDeny`` set requires a
                role the user does not hold.
        """
        # Sticky across rules: once any rule produced a WorkerOwnerMatch the
        # worker-owner wording is used for a later denial.
        IsWorkerOwnerMatch = False
        roles = self.getRolesFromUser(userDetails)
        for rule in self.allowRules:
            match = yield rule.match(ep, action, options)
            if match is None:
                continue

            # only try to get owner if there are owner Matchers
            if self.ownerRoleMatchers:
                owner = yield match.getOwner()
                if owner:
                    for r in self.ownerRoleMatchers:
                        roles.update(r.getRolesFromUser(userDetails, owner))

            # NOTE(review): evaluated independently of ownerRoleMatchers;
            # the original indentation was lost - confirm this nesting.
            if self.workerOwnerRoleMatchers and isinstance(match, WorkerOwnerMatch):
                IsWorkerOwnerMatch = True
                workerOwners = match.getWorkerOwners()
                if workerOwners:
                    for r in self.workerOwnerRoleMatchers:
                        roles.update(
                            r.getRolesFromUser(userDetails, workerOwners))

            for role in roles:
                if self.match(role, rule.role):  # fnmatch or re.match
                    return None

            if not rule.defaultDeny:
                continue  # check next suitable rule if not denied

            # Fixed texts are used instead of a message naming rule.role.
            error_msg = unicode2bytes(
                " You must be the worker owner." if IsWorkerOwnerMatch else
                " You must be an author of the commit triggered the build.")
            raise Forbidden(error_msg)
        return None