Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add qos_tags and functions factory #76

Merged
merged 6 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,41 @@ In case of database structure upgrade, developers must follow these steps:

For details about the alembic migration tool, see the [Alembic tutorial](https://alembic.sqlalchemy.org/en/latest/tutorial.html).

## Quality of Service rules examples

```
# User limits
# user "Limit for anonymous" (user == "anonymous") : numberOfWorkers
# user "Default per-user limit" (user ~ ".*") : 8

# limits for finished requests in the last hours
# user "Limit for test user 1: 10 finished requests in the last 24 hours" (user == "00000000-0000-4000-a000-000000000000") : 10 - userRequestCount(hour(24))
# user "Limit for users: 10 finished requests in the last 24 hours" (user ~ ".*") : 10 - userRequestCount(hour(24))

# Limits
# limit "Limit for dummy-dataset" (dataset == "test-dummy-adaptor") : numberOfWorkers
# limit "Limit for cads_adaptors:DummyAdaptor" (adaptor == "cads_adaptors:DummyAdaptor") : numberOfWorkers - 6

# Permissions
# permission "anonymous cannot access dummy-dataset" (dataset == "test-dummy-adaptor"): user != 'anonymous'

# Priorities
priority "Priority for test user 1" (user == "00000000-0000-4000-a000-000000000000") : hour(1)
priority "Priority for test user 2" (user == "00000000-0000-3000-abcd-000000000001") : -hour(1)

# Functions examples

# Request contains any of the specified variable
# priority "Priority for temperature and humidity" (request_contains_any("variable", ["temperature", "relative_humidity"])): -hour(1)

# Request contains all the specified months
# limit "Limit for retrieve with all months" (request_contains_all("month", ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12"])): 2

# The adaptor is tagged with "block"
# permission "The adaptor is blocked." (tagged("block")): false

```

## License

```
Expand Down
2 changes: 2 additions & 0 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ def create_request(
adaptor_properties_hash: str,
metadata: dict[str, Any] = {},
resources: dict[str, Any] = {},
qos_tags: list[str] = [],
origin: str = "ui",
request_uid: str | None = None,
) -> dict[str, Any]:
Expand All @@ -428,6 +429,7 @@ def create_request(
session=session,
)
metadata["resources"] = resources
metadata["qos_tags"] = qos_tags
request = SystemRequest(
request_uid=request_uid or str(uuid.uuid4()),
process_id=process_id,
Expand Down
21 changes: 2 additions & 19 deletions cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
except ModuleNotFoundError:
pass

from cads_broker import Environment, config, expressions
from cads_broker import Environment, config, factory
from cads_broker import database as db
from cads_broker.qos import QoS

Expand Down Expand Up @@ -95,23 +95,6 @@ def __init__(self) -> None:
self.rules = QoS.RuleSet()
parser.parse_rules(self.rules, self.environment)

def register_functions(self):
expressions.FunctionFactory.FunctionFactory.register_function(
"dataset",
lambda context, *args: context.request.process_id,
)
expressions.FunctionFactory.FunctionFactory.register_function(
"adaptor",
lambda context, *args: context.request.entry_point,
)
expressions.FunctionFactory.FunctionFactory.register_function(
"userRequestCount",
lambda context, *args: db.count_finished_requests_per_user(
user_uid=context.request.user_uid,
seconds=args[0],
),
)


@attrs.define
class Broker:
Expand All @@ -136,7 +119,7 @@ def from_address(
):
client = distributed.Client(address)
qos_config = QoSRules()
qos_config.register_functions()
factory.register_functions()
session_maker = db.ensure_session_obj(session_maker)
rules_hash = get_rules_hash(qos_config.rules_path)
self = cls(
Expand Down
9 changes: 9 additions & 0 deletions cads_broker/expressions/ListExpression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class ListExpression:
def __init__(self, value):
self.value = value

def __repr__(self):
return repr(self.value)

def evaluate(self, context):
return [item.evaluate(context) for item in self.value]
22 changes: 20 additions & 2 deletions cads_broker/expressions/RulesParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#

from .FunctionFactory import FunctionFactory
from .ListExpression import ListExpression
from .NumberExpression import NumberExpression
from .Parser import Parser, ParserError
from .StringExpression import StringExpression
Expand Down Expand Up @@ -119,6 +120,11 @@ def parse_atom(self):
self.consume(")")
return e

if c == "[":
self.consume("[")
e = self.parse_list()
return e

if c == "-":
self.consume("-")
return FunctionFactory.create("neg", self.parse_atom())
Expand All @@ -136,7 +142,7 @@ def parse_atom(self):
while str.isalpha(c) or c == "_":
name = self.parse_ident()
if self.peek() == "(":
args = self.parse_list()
args = self.parse_args()
return FunctionFactory.create(name, *args)
else:
return FunctionFactory.create(name)
Expand All @@ -163,7 +169,7 @@ def parse_power(self):

return result

def parse_list(self):
def parse_args(self):
result = []
self.consume("(")
while self.peek() != ")":
Expand All @@ -176,6 +182,18 @@ def parse_list(self):
self.consume(")")
return result

def parse_list(self):
result = []
while self.peek() != "]":
result.append(self.parse_expression())
if self.peek() == "]":
break

self.consume(",")

self.consume("]")
return ListExpression(result)

def parse_factor(self):
result = self.parse_power()
c = self.peek()
Expand Down
5 changes: 0 additions & 5 deletions cads_broker/expressions/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,6 @@ def execute(self, context, resource):
return context.environment.resource_enabled(resource)


# class FunctionDataset(FunctionExpression):
# def execute(self, context):
# return request.dataset


class FunctionEstimatedSize(FunctionExpression):
def execute(self, context):
return context.request.cost[0]
Expand Down
49 changes: 49 additions & 0 deletions cads_broker/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from cads_broker import database, expressions


def tagged(context, value):
if value in context.request.request_metadata.get("qos_tags"):
return True


def request_contains_all(context, key, values):
request_values = context.request.request_body.get("request").get(key)
if not isinstance(request_values, (list, tuple)):
request_values = [request_values]
s1 = set(request_values)
s2 = set(values)
return len(s1 & s2) == len(s2)


def request_contains_any(context, key, values):
request_values = context.request.request_body.get("request").get(key)
if not isinstance(request_values, (list, tuple)):
request_values = [request_values]
s1 = set(request_values)
s2 = set(values)
return len(s1 & s2) > 0


def register_functions():
expressions.FunctionFactory.FunctionFactory.register_function(
"dataset",
lambda context, *args: context.request.process_id,
)
expressions.FunctionFactory.FunctionFactory.register_function(
"adaptor",
lambda context, *args: context.request.entry_point,
)
expressions.FunctionFactory.FunctionFactory.register_function(
"user_request_count",
lambda context, seconds: database.count_finished_requests_per_user(
user_uid=context.request.user_uid,
seconds=seconds,
),
)
expressions.FunctionFactory.FunctionFactory.register_function("tagged", tagged)
expressions.FunctionFactory.FunctionFactory.register_function(
"request_contains_all", request_contains_all
)
expressions.FunctionFactory.FunctionFactory.register_function(
"request_contains_any", request_contains_any
)
Loading