-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathserver.py
More file actions
45 lines (38 loc) · 1.36 KB
/
server.py
File metadata and controls
45 lines (38 loc) · 1.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
"""
initialization of the structure to gather the sampled metrics
and event metrics for the server of the system
"""
from collections.abc import Iterable
from dataclasses import dataclass
from asyncflow.config.constants import SampledMetricName
# Initialize one time outside the function all possible metrics
# related to the servers, the idea of this structure is to
# guarantee scalability in the long term if multiple metrics
# will be considered
SERVER_METRICS = (
SampledMetricName.READY_QUEUE_LEN,
SampledMetricName.EVENT_LOOP_IO_SLEEP,
SampledMetricName.RAM_IN_USE,
)
def build_server_metrics(
enabled_sample_metrics: Iterable[SampledMetricName],
) -> dict[SampledMetricName, list[float | int]]:
"""
Function to populate a dictionary to collect values for
time series of sampled metrics related to the server of
the system.
"""
# The edge case of the empty dict is avoided since at least
# one metric is always measured by default.
return {
metric: [] for metric in SERVER_METRICS
if metric in enabled_sample_metrics
}
# For the client we choosed a named tuple, here we prefer
# a dataclass because we need mutability since start and
# are updated in two different steps
@dataclass
class ServerClock:
"""Server-side request timing: start + finish."""
start: float
finish: float | None = None