|
3 | 3 | during the simulation |
4 | 4 | """ |
5 | 5 |
|
| 6 | + |
6 | 7 | from collections.abc import Generator |
7 | 8 | from typing import cast |
8 | 9 |
|
9 | 10 | import numpy as np |
10 | 11 | import simpy |
| 12 | +from pydantic import PositiveFloat, PositiveInt |
11 | 13 |
|
12 | 14 | from asyncflow.config.constants import ( |
13 | 15 | EndpointStepCPU, |
|
22 | 24 | from asyncflow.resources.server_containers import ServerContainers |
23 | 25 | from asyncflow.runtime.actors.edge import EdgeRuntime |
24 | 26 | from asyncflow.runtime.rqs_state import RequestState |
| 27 | +from asyncflow.samplers.common_helpers import general_sampler |
| 28 | +from asyncflow.schemas.common.random_variables import RVConfig |
25 | 29 | from asyncflow.schemas.settings.simulation import SimulationSettings |
26 | 30 | from asyncflow.schemas.topology.nodes import Server |
27 | 31 |
|
@@ -75,6 +79,45 @@ def __init__( # noqa: PLR0913 |
75 | 79 | settings.enabled_sample_metrics, |
76 | 80 | ) |
77 | 81 |
|
| 82 | + # ------------------------------------------------------------------ |
| 83 | + # HELPERS |
| 84 | + # ------------------------------------------------------------------ |
| 85 | + |
| 86 | + def _sample_duration( |
| 87 | + self, time: RVConfig | PositiveFloat | PositiveInt, |
| 88 | + ) -> float: |
| 89 | + """ |
| 90 | + Return a non-negative duration in seconds. |
| 91 | +
|
| 92 | + - RVConfig -> sample via general_sampler(self.rng) |
| 93 | + - float/int -> cast to float |
| 94 | + - Negative draws are clamped to 0.0 (e.g., Normal tails). |
| 95 | + """ |
| 96 | + if isinstance(time, RVConfig): |
| 97 | + time = float(general_sampler(time, self.rng)) |
| 98 | + else: |
| 99 | + time = float(time) |
| 100 | + |
| 101 | + return time |
| 102 | + |
| 103 | + def _compute_latency_cpu( |
| 104 | + self, |
| 105 | + cpu_time:PositiveFloat | PositiveInt | RVConfig, |
| 106 | + ) -> float: |
| 107 | + """Helper to compute the latency of a cpu bound given step""" |
| 108 | + return self._sample_duration(cpu_time) |
| 109 | + |
| 110 | + def _compute_latency_io( |
| 111 | + self, |
| 112 | + io_time:PositiveFloat | PositiveInt | RVConfig, |
| 113 | + ) -> float: |
| 114 | + """Helper to compute the latency of a IO bound given step""" |
| 115 | + return self._sample_duration(io_time) |
| 116 | + |
| 117 | + # ------------------------------------------------------------------- |
| 118 | + # Main function to elaborate a request |
| 119 | + # ------------------------------------------------------------------- |
| 120 | + |
78 | 121 | # right now we disable the warnings but a refactor will be done soon |
79 | 122 | def _handle_request( # noqa: PLR0915, PLR0912, C901 |
80 | 123 | self, |
@@ -103,11 +146,12 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 |
103 | 146 |
|
104 | 147 |
|
105 | 148 | # Extract the total ram to execute the endpoint |
106 | | - total_ram = sum( |
107 | | - step.step_operation[StepOperation.NECESSARY_RAM] |
108 | | - for step in selected_endpoint.steps |
109 | | - if isinstance(step.kind, EndpointStepRAM) |
110 | | - ) |
| 149 | + total_ram = 0 |
| 150 | + for step in selected_endpoint.steps: |
| 151 | + if isinstance(step.kind, EndpointStepRAM): |
| 152 | + ram = step.step_operation[StepOperation.NECESSARY_RAM] |
| 153 | + assert isinstance(ram, int) |
| 154 | + total_ram += ram |
111 | 155 |
|
112 | 156 | # ------------------------------------------------------------------ |
113 | 157 | # CPU & RAM SCHEDULING |
@@ -226,15 +270,19 @@ def _handle_request( # noqa: PLR0915, PLR0912, C901 |
226 | 270 |
|
227 | 271 | core_locked = True |
228 | 272 |
|
229 | | - cpu_time = step.step_operation[StepOperation.CPU_TIME] |
| 273 | + cpu_time = self._compute_latency_cpu( |
| 274 | + step.step_operation[StepOperation.CPU_TIME], |
| 275 | + ) |
230 | 276 | # Execute the step giving back the control to the simpy env |
231 | 277 | yield self.env.timeout(cpu_time) |
232 | 278 |
|
233 | 279 | # since the object is of an Enum class we check if the step.kind |
234 | 280 | # is one member of enum |
235 | 281 | elif isinstance(step.kind, EndpointStepIO): |
236 | 282 | # define the io time |
237 | | - io_time = step.step_operation[StepOperation.IO_WAITING_TIME] |
| 283 | + io_time = self._compute_latency_io( |
| 284 | + step.step_operation[StepOperation.IO_WAITING_TIME], |
| 285 | + ) |
238 | 286 |
|
239 | 287 | if core_locked: |
240 | 288 | # release the core coming from a cpu step |
|
0 commit comments