Skip to content

Commit 2284363

Browse files
committed
Added metrics L system and Lq LB
1 parent 98fc0fd commit 2284363

16 files changed

Lines changed: 108 additions & 43 deletions

File tree

src/asyncflow/config/enums.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,13 @@ class SampledMetricName(StrEnum):
189189
"""
190190

191191
# Mandatory metrics to collect
192-
READY_QUEUE_LEN = "ready_queue_len" #length of the event loop ready q
193-
EVENT_LOOP_IO_SLEEP = "event_loop_io_sleep"
192+
LQ_SERVER = "ready_queue_len" #length of the event loop ready q
193+
LQ_IO = "event_loop_io_sleep"
194194
RAM_IN_USE = "ram_in_use"
195195
EDGE_CONCURRENT_CONNECTION = "edge_concurrent_connection"
196+
L_SYSTEM = "l_system"
197+
LQ_LB = "lq_lb"
198+
196199

197200

198201
class SamplePeriods(float, Enum):

src/asyncflow/metrics/collector.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
import simpy
66

7-
from asyncflow.config.enums import SampledMetricName
7+
from asyncflow.config.enums import LbAlgorithmsName, SampledMetricName
8+
from asyncflow.runtime.actors.arrivals_generator import ArrivalsGeneratorRuntime
89
from asyncflow.runtime.actors.edge import EdgeRuntime
10+
from asyncflow.runtime.actors.load_balancer import LoadBalancerRuntime
911
from asyncflow.runtime.actors.server import ServerRuntime
1012
from asyncflow.schemas.settings.simulation import SimulationSettings
1113

@@ -17,34 +19,42 @@
1719
class SampledMetricCollector:
1820
"""class to define a centralized object to collect sampled metrics"""
1921

20-
def __init__(
22+
def __init__(# noqa: PLR0913
2123
self,
2224
*,
25+
arrivals: ArrivalsGeneratorRuntime,
2326
edges: list[EdgeRuntime],
2427
servers: list[ServerRuntime],
28+
lb: LoadBalancerRuntime | None,
2529
env: simpy.Environment,
2630
sim_settings: SimulationSettings,
2731
) -> None:
2832
"""
2933
Args:
34+
arrivals: (ArrivalsGeneratorRuntime): usefull to compute l_system
3035
edges (list[EdgeRuntime]): list of the class EdgeRuntime
3136
servers (list[ServerRuntime]): list of server of the class ServerRuntime
37+
lb (LoadBalancerRuntime): useful to compute Lq
3238
env (simpy.Environment): environment for the simulation
3339
sim_settings (SimulationSettings): general settings for the simulation
3440
3541
"""
42+
self.arrivals = arrivals
3643
self.edges = edges
3744
self.servers = servers
45+
self.lb = lb
3846
self.sim_settings = sim_settings
3947
self.env = env
4048
self._sample_period = sim_settings.sample_period_s
4149

4250

4351
# enum keys instance-level for mandatory sampled metrics to collect
44-
self._conn_key = SampledMetricName.EDGE_CONCURRENT_CONNECTION
45-
self._ram_key = SampledMetricName.RAM_IN_USE
46-
self._io_key = SampledMetricName.EVENT_LOOP_IO_SLEEP
47-
self._ready_key = SampledMetricName.READY_QUEUE_LEN
52+
self._conn_key = SampledMetricName.EDGE_CONCURRENT_CONNECTION
53+
self._ram_key = SampledMetricName.RAM_IN_USE
54+
self._io_key = SampledMetricName.LQ_IO
55+
self._ready_key = SampledMetricName.LQ_SERVER
56+
self._l_system = SampledMetricName.L_SYSTEM
57+
self._lq_lb = SampledMetricName.LQ_LB
4858

4959

5060
def _build_time_series(self) -> Generator[simpy.Event, None, None]:
@@ -65,6 +75,16 @@ def _build_time_series(self) -> Generator[simpy.Event, None, None]:
6575
server.enabled_metrics[self._io_key].append(server.io_queue_len)
6676
server.enabled_metrics[self._ready_key].append(server.ready_queue_len)
6777

78+
if self._l_system in self.arrivals.enabled_metrics:
79+
self.arrivals.enabled_metrics[self._l_system].append(
80+
float(self.arrivals.l_system),
81+
)
82+
83+
if (self.lb is not None and
84+
self.lb is not None and
85+
self.lb.lb_config.algorithms == LbAlgorithmsName.FCFS):
86+
self.lb.enabled_metrics[self._lq_lb].append(self.lb.lq_lb)
87+
6888

6989

7090
def start(self) -> None:

src/asyncflow/metrics/server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
# will be considered
1414

1515
SERVER_METRICS = (
16-
SampledMetricName.READY_QUEUE_LEN,
17-
SampledMetricName.EVENT_LOOP_IO_SLEEP,
16+
SampledMetricName.LQ_SERVER,
17+
SampledMetricName.LQ_IO,
1818
SampledMetricName.RAM_IN_USE,
1919
)
2020

src/asyncflow/metrics/simulation_analyzer.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from asyncflow.config.enums import (
1111
EventMetricName,
1212
LatencyKey,
13+
LbAlgorithmsName,
1314
SampledMetricName,
1415
)
1516
from asyncflow.config.plot_constants import (
@@ -263,6 +264,17 @@ def _extract_sampled_metrics(self) -> None:
263264
for name, values in edge.enabled_metrics.items():
264265
metrics[name.value][eid] = values
265266

267+
for name, values in self._generator.enabled_metrics.items():
268+
aid = self._generator.arrivals.id
269+
metrics[name.value][aid] = values
270+
271+
if (self.lb is not None and
272+
self.lb.lb_config.algorithms == LbAlgorithmsName.FCFS):
273+
lb_id = self.lb.lb_config.id
274+
for name, values in self.lb.enabled_metrics.items():
275+
# es. SampledMetricName.LQ_LB → “lq_lb”
276+
metrics[name.value][lb_id] = values
277+
266278
self.sampled_metrics = metrics
267279

268280
# ─────────────────────────────────────────────
@@ -559,7 +571,7 @@ def plot_single_server_ready_queue(self, ax: Axes, server_id: str) -> None:
559571
"""Plot Ready queue with mean/min/max lines and a single legend box with
560572
values. No trend/ewma, no legend entry for the main series.
561573
"""
562-
times, vals = self.get_series(SampledMetricName.READY_QUEUE_LEN, server_id)
574+
times, vals = self.get_series(SampledMetricName.LQ_SERVER, server_id)
563575
if not vals:
564576
ax.text(0.5, 0.5, SERVER_QUEUES_PLOT.no_data, ha="center", va="center")
565577
return
@@ -617,7 +629,7 @@ def plot_single_server_io_queue(self, ax: Axes, server_id: str) -> None:
617629
"""Plot I/O queue with mean/min/max lines and a single legend box with
618630
values. No trend/ewma, no legend entry for the main series.
619631
"""
620-
times, vals = self.get_series(SampledMetricName.EVENT_LOOP_IO_SLEEP, server_id)
632+
times, vals = self.get_series(SampledMetricName.LQ_IO, server_id)
621633
if not vals:
622634
ax.text(0.5, 0.5, SERVER_QUEUES_PLOT.no_data, ha="center", va="center")
623635
return

src/asyncflow/runner/simulation.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
class Startable(Protocol):
4545
"""A protocol for runtime actors that can be started."""
4646

47-
def start(self) -> None:
47+
def start(self) -> simpy.Process:
4848
"""Starts the main process loop for the actor."""
4949
...
5050

@@ -368,8 +368,10 @@ def _start_all_processes(self) -> None:
368368
def _start_metric_collector(self) -> None:
369369
"""One coroutine that snapshots RAM / queues / connections."""
370370
SampledMetricCollector(
371+
arrivals=next(iter(self._arrivals_runtime.values())),
371372
edges=list(self._edges_runtime.values()),
372373
servers=list(self._servers_runtime.values()),
374+
lb=self._lb_runtime,
373375
env=self.env,
374376
sim_settings=self.simulation_settings,
375377
).start()

src/asyncflow/runtime/actors/arrivals_generator.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import simpy
2222

23+
from asyncflow.config.enums import SampledMetricName
2324
from asyncflow.runtime.actors.edge import EdgeRuntime
2425
from asyncflow.schemas.arrivals.generator import ArrivalsGenerator
2526
from asyncflow.schemas.settings.simulation import SimulationSettings
@@ -62,7 +63,14 @@ def __init__( # noqa: PLR0913
6263
self.arrivals_generator_box = arrivals_generator_box
6364
self.completed_box = completed_box
6465
self.id_counter = 0
66+
67+
68+
# necessary to collect metrics (global throughput and latency)
6569
self._rqs_clock: list[RqsClock] = []
70+
# necessary to collect simultaneous rqs in the system
71+
self._l_system: int = 0
72+
# dict for the collector to have the time series
73+
self.enabled_metrics: dict[SampledMetricName, list[float]] = {}
6674

6775

6876
def _next_id(self) -> int:
@@ -93,6 +101,7 @@ def _event_arrival(self) -> Generator[simpy.Event, None, None]:
93101
self.arrivals.id,
94102
self.env.now,
95103
)
104+
self._l_system += 1
96105
# transport is a method of the edge runtime
97106
# which define the step of how the state is moving
98107
# from one node to another
@@ -109,17 +118,22 @@ def _collector(self) -> Generator[simpy.Event, None, None]:
109118
finish=state.finish_time,
110119
)
111120
self._rqs_clock.append(clock_data)
121+
self._l_system -= 1
112122
yield self.completed_box.put(state)
113123

114124

115125
def start(self) -> simpy.Process:
116-
"""Passing the structure as a simpy process"""
117-
self.env.process(self._event_arrival())
126+
"""Start the simpy processes"""
127+
p_arr = self.env.process(self._event_arrival())
118128
self.env.process(self._collector())
119-
120-
129+
return p_arr
121130

122131
@property
123132
def rqs_clock(self) -> list[RqsClock]:
124133
"""Readable version to compute aggregate metrics"""
125134
return self._rqs_clock
135+
136+
@property
137+
def l_system(self) -> int:
138+
"""Readable version to sample the metric"""
139+
return self._l_system

src/asyncflow/runtime/actors/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
4747

4848
self.out_edge.transport(state)
4949

50-
def start(self) -> None:
50+
def start(self) -> simpy.Process:
5151
"""Initialization of the process"""
52-
self.env.process(self._forwarder())
52+
return self.env.process(self._forwarder())
5353

src/asyncflow/runtime/actors/load_balancer.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import simpy
99

10-
from asyncflow.config.enums import LbAlgorithmsName, SystemNodes
10+
from asyncflow.config.enums import LbAlgorithmsName, SampledMetricName, SystemNodes
1111
from asyncflow.runtime.actors.edge import EdgeRuntime
1212
from asyncflow.runtime.actors.routing.lb_algorithms import (
1313
LB_TABLE,
@@ -65,6 +65,11 @@ def __init__(
6565
# queue theory
6666
self._lb_waiting_time: list[float] = []
6767

68+
# counter to collect when algo is fcfs lq to compare with queue theory
69+
self._lq_lb: int = 0
70+
# dict to collect the time series
71+
self.enabled_metrics: dict[SampledMetricName, list[float]] = {}
72+
6873

6974
# Helpers FCFS
7075

@@ -99,7 +104,7 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
99104
state: RequestState = yield self.lb_box.get() # type: ignore[assignment]
100105

101106
if self.lb_config.algorithms == LbAlgorithmsName.FCFS:
102-
107+
self._lq_lb += 1
103108
hist = getattr(state, "history", None)
104109
if hist:
105110
last = hist[-1]
@@ -139,6 +144,7 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
139144
self._lb_waiting_time.append(waiting_time)
140145

141146
edge_rt = self.lb_out_edges[edge_id]
147+
self._lq_lb -= 1
142148
edge_rt.transport(state)
143149
else:
144150
state.record_hop(
@@ -149,12 +155,17 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
149155
edge_rt = LB_TABLE[self.lb_config.algorithms](self.lb_out_edges)
150156
edge_rt.transport(state)
151157

152-
def start(self) -> None:
158+
def start(self) -> simpy.Process:
153159
"""Start the process and populate FIFO"""
154160
self._prime_free_edges()
155-
self.env.process(self._forwarder())
161+
return self.env.process(self._forwarder())
156162

157163
@property
158164
def lb_waiting_times(self) -> Sequence[float]:
159165
"""Read-only view of LB FCFS waiting times (one per waited request)."""
160166
return tuple(self._lb_waiting_time)
167+
168+
@property
169+
def lq_lb(self) -> int:
170+
"""Readable version to sample the metric"""
171+
return self._lq_lb

src/asyncflow/runtime/actors/server.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,6 @@ def _dispatcher(self) -> Generator[simpy.Event, None, None]:
189189
# Spawn a new, independent process to handle this request
190190
self.env.process(self._handle_request(request_state))
191191

192-
def start(self) -> None:
193-
"""Generate the process to simulate the server inside simpy env"""
194-
self.env.process(self._dispatcher())
195-
196192
# right now we disable the warnings but a refactor will be done soon
197193
def _handle_request( # noqa: PLR0915, PLR0912, C901
198194
self,
@@ -444,6 +440,9 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901
444440
assert self.out_edge is not None
445441
self.out_edge.transport(state)
446442

443+
def start(self) -> simpy.Process:
444+
"""Generate the process to simulate the server inside simpy env"""
445+
return self.env.process(self._dispatcher())
447446

448447
# we need these accessor because we need to read these private attribute
449448
# in the sampled metric collector

src/asyncflow/runtime/events/injection.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,11 @@ def _assign_server_state(self) -> Generator[simpy.Event, None, None]:
254254

255255

256256

257-
def start(self) -> None:
257+
def start(self) -> tuple[simpy.Process, simpy.Process]:
258258
"""Start both edge-spike and server-outage timelines."""
259-
self.env.process(self._assign_edges_spike())
260-
self.env.process(self._assign_server_state())
259+
p1 = self.env.process(self._assign_edges_spike())
260+
p2 = self.env.process(self._assign_server_state())
261+
return p1, p2
261262

262263

263264
@property

0 commit comments

Comments
 (0)