Skip to content

Commit 667e435

Browse files
committed
Calculations theory vs observed indipendent
1 parent 2284363 commit 667e435

11 files changed

Lines changed: 146 additions & 153 deletions

File tree

asyncflow_queue_limit/asyncflow_mmc.ipynb

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
},
2121
{
2222
"cell_type": "code",
23-
"execution_count": 111,
23+
"execution_count": 9,
2424
"id": "3e168d4a",
2525
"metadata": {},
2626
"outputs": [],
@@ -45,7 +45,7 @@
4545
},
4646
{
4747
"cell_type": "code",
48-
"execution_count": 112,
48+
"execution_count": 10,
4949
"id": "dd39a8e3",
5050
"metadata": {},
5151
"outputs": [
@@ -112,7 +112,7 @@
112112
},
113113
{
114114
"cell_type": "code",
115-
"execution_count": 113,
115+
"execution_count": 11,
116116
"id": "d2937e5e",
117117
"metadata": {},
118118
"outputs": [],
@@ -168,9 +168,9 @@
168168
" LinkEdge(id=\"lb-srv1\", source=\"lb-1\", target=\"srv-1\", ),\n",
169169
" LinkEdge(id=\"lb-srv2\", source=\"lb-1\", target=\"srv-2\", ),\n",
170170
" LinkEdge(id=\"lb-srv3\", source=\"lb-1\", target=\"srv-3\", ),\n",
171-
" LinkEdge(id=\"srv1-client\", source=\"srv-1\", target=\"client-1\",),\n",
172-
" LinkEdge(id=\"srv2-client\", source=\"srv-2\", target=\"client-1\",),\n",
173-
" LinkEdge(id=\"srv3-client\", source=\"srv-3\", target=\"client-1\",),\n",
171+
" LinkEdge(id=\"srv1-client\", source=\"srv-1\", target=\"rqs-1\",),\n",
172+
" LinkEdge(id=\"srv2-client\", source=\"srv-2\", target=\"rqs-1\",),\n",
173+
" LinkEdge(id=\"srv3-client\", source=\"srv-3\", target=\"rqs-1\",),\n",
174174
" ]\n",
175175
"\n",
176176
" settings = SimulationSettings(\n",
@@ -201,7 +201,7 @@
201201
},
202202
{
203203
"cell_type": "code",
204-
"execution_count": 114,
204+
"execution_count": 12,
205205
"id": "d0634bc8",
206206
"metadata": {},
207207
"outputs": [
@@ -285,47 +285,73 @@
285285
"\n",
286286
"---\n",
287287
"\n",
288-
"## Observed (from simulation)\n",
288+
"## Observed (from simulation, FCFS/Erlang-C)\n",
289289
"\n",
290-
"After processing metrics:\n",
290+
"Under FCFS we derive **each KPI directly** from sampled time series or per-request buckets, **not** by deriving one observed KPI from another. This keeps Little’s law checks as **independent validations**, not definitions.\n",
291291
"\n",
292-
"1. **Arrival rate**:\n",
292+
"1. **Arrival rate**\n",
293293
"\n",
294294
"$$\n",
295-
"\\lambda_{\\text{Observed}} = \\text{mean throughput (client completions)}\n",
295+
"\\lambda_{\\text{obs}}=\\mathrm{mean}(\\text{Throughput RPS})\n",
296296
"$$\n",
297297
"\n",
298-
"2. **Service rate**:\n",
298+
"Mean of the throughput series (client completions per fixed window, typically 1 s).\n",
299+
"\n",
300+
"2. **Service rate**\n",
299301
"\n",
300302
"$$\n",
301-
"\\mu_{\\text{Observed}} = 1 / \\overline{S}, \\quad \\overline{S} = \\text{mean(service\\_time)}\n",
303+
"\\mu_{\\text{obs}}=\\frac{1}{\\overline{S}},\\quad \n",
304+
"\\overline{S}=\\mathrm{mean}(\\texttt{SERVICE\\_TIME})\n",
302305
"$$\n",
303306
"\n",
304-
"3. **End-to-end latency**:\n",
307+
"Computed from per-request server buckets, aggregated across all servers.\n",
308+
"\n",
309+
"3. **End-to-end time**\n",
305310
"\n",
306311
"$$\n",
307-
"W_{\\text{Observed}} = \\text{mean(client latencies)}\n",
312+
"W_{\\text{obs}}=\\mathrm{mean}(\\text{client latencies})\n",
308313
"$$\n",
309314
"\n",
310-
"4. **Waiting time**:\n",
315+
"From the generator’s request clocks (start/finish per request).\n",
316+
"\n",
317+
"4. **Queue waiting time (LB)**\n",
311318
"\n",
312319
"$$\n",
313-
"W_{q,\\text{Observed}} = \\text{mean(waiting\\_time)} \n",
320+
"W_{q,\\text{obs}}=\\mathrm{mean}(\\text{LB waiting times})\n",
314321
"$$\n",
315322
"\n",
316-
"5. **Little’s law check**:\n",
323+
"Mean of the **load balancer FCFS** waiting time recorded per request (not inferred from queue length).\n",
324+
"\n",
325+
"5. **Mean number in system**\n",
317326
"\n",
318327
"$$\n",
319-
"L_{\\text{Observed}} = \\lambda_{\\text{Observed}} W_{\\text{Observed}}, \\qquad\n",
320-
"L_{q,\\text{Observed}} = \\lambda_{\\text{Observed}} W_{q,\\text{Observed}}\n",
328+
"L_{\\text{obs}}=\\mathrm{mean}(\\texttt{L\\_SYSTEM})\n",
321329
"$$\n",
322330
"\n",
323-
"6. **Utilization**:\n",
331+
"Time average of the sampled series of **concurrent requests in the system** (sampled every $\\Delta t$).\n",
332+
"*Fallback if the series is unavailable in tests:* $L_{\\text{obs}}=\\lambda_{\\text{obs}} W_{\\text{obs}}$.\n",
333+
"\n",
334+
"6. **Mean number in queue (LB)**\n",
324335
"\n",
325336
"$$\n",
326-
"\\rho_{\\text{Observed}} = \\lambda_{\\text{Observed}}/(c\\,\\mu_{\\text{Observed}})\n",
337+
"L_{q,\\text{obs}}=\\mathrm{mean}(\\texttt{LQ\\_LB})\n",
327338
"$$\n",
328339
"\n",
340+
"Time average of the **LB queue length** series.\n",
341+
"*Fallback:* $L_{q,\\text{obs}}=\\lambda_{\\text{obs}} W_{q,\\text{obs}}$.\n",
342+
"\n",
343+
"7. **Global utilization**\n",
344+
"\n",
345+
"$$\n",
346+
"\\rho_{\\text{obs}}=\\mathrm{mean}(\\texttt{SERVER\\_UTILIZATION})\n",
347+
"$$\n",
348+
"\n",
349+
"Each server samples a 0/1 “busy” indicator; $\\rho_{\\text{obs}}$ is the time average (with 1 core/server it matches the busy fraction).\n",
350+
"*Fallback:* $\\rho_{\\text{obs}}=\\lambda_{\\text{obs}}/(c\\,\\mu_{\\text{obs}})$.\n",
351+
"\n",
352+
"> Because each quantity comes from its own primary measurement (series or buckets), cross-relations like $L=\\lambda W$ and $L_q=\\lambda W_q$ are genuine **consistency checks**, not tautologies.\n",
353+
"\n",
354+
"\n",
329355
"---\n",
330356
"\n",
331357
"## Comparison\n",
@@ -339,7 +365,7 @@
339365
},
340366
{
341367
"cell_type": "code",
342-
"execution_count": 115,
368+
"execution_count": 13,
343369
"id": "ccd7379b",
344370
"metadata": {},
345371
"outputs": [
@@ -352,13 +378,13 @@
352378
"-------------------------------------------------------------------\n",
353379
"sym metric theory observed abs rel%\n",
354380
"-------------------------------------------------------------------\n",
355-
"λ Arrival rate (1/s) 270.000000 269.997500 -0.002500 -0.00\n",
356-
"μ Service rate (1/s) 100.000000 99.925514 -0.074486 -0.07\n",
357-
"rho Utilization 0.900000 0.900663 0.000663 0.07\n",
358-
"L Mean items in sys 10.053549 9.991820 -0.061730 -0.61\n",
359-
"Lq Mean items in queue 7.353549 7.289848 -0.063701 -0.87\n",
360-
"W Mean time in sys (s) 0.037235 0.037007 -0.000228 -0.61\n",
361-
"Wq Mean waiting (s) 0.027235 0.027000 -0.000236 -0.87\n",
381+
"λ Arrival rate (1/s) 270.000000 269.696111 -0.303889 -0.11\n",
382+
"μ Service rate (1/s) 100.000000 100.016488 0.016488 0.02\n",
383+
"rho Utilization 0.900000 0.898839 -0.001161 -0.13\n",
384+
"L Mean items in sys 10.053549 9.849654 -0.203895 -2.03\n",
385+
"Lq Mean items in queue 7.353549 7.153121 -0.200428 -2.73\n",
386+
"W Mean time in sys (s) 0.037235 0.036521 -0.000714 -1.92\n",
387+
"Wq Mean waiting (s) 0.027235 0.026523 -0.000712 -2.62\n",
362388
"===================================================================\n"
363389
]
364390
}

src/asyncflow/config/enums.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ class SampledMetricName(StrEnum):
195195
EDGE_CONCURRENT_CONNECTION = "edge_concurrent_connection"
196196
L_SYSTEM = "l_system"
197197
LQ_LB = "lq_lb"
198+
SERVER_UTILIZATION = "server_utilization"
198199

199200

200201

src/asyncflow/metrics/collector.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ def __init__(# noqa: PLR0913
5353
self._ram_key = SampledMetricName.RAM_IN_USE
5454
self._io_key = SampledMetricName.LQ_IO
5555
self._ready_key = SampledMetricName.LQ_SERVER
56-
self._l_system = SampledMetricName.L_SYSTEM
57-
self._lq_lb = SampledMetricName.LQ_LB
56+
self._l_system_key = SampledMetricName.L_SYSTEM
57+
self._lq_lb_key = SampledMetricName.LQ_LB
58+
self._server_utilization_key = SampledMetricName.SERVER_UTILIZATION
5859

5960

6061
def _build_time_series(self) -> Generator[simpy.Event, None, None]:
@@ -74,16 +75,18 @@ def _build_time_series(self) -> Generator[simpy.Event, None, None]:
7475
server.enabled_metrics[self._ram_key].append(server.ram_in_use)
7576
server.enabled_metrics[self._io_key].append(server.io_queue_len)
7677
server.enabled_metrics[self._ready_key].append(server.ready_queue_len)
78+
server.enabled_metrics[
79+
self._server_utilization_key
80+
].append(server.server_utilization)
7781

78-
if self._l_system in self.arrivals.enabled_metrics:
79-
self.arrivals.enabled_metrics[self._l_system].append(
82+
if self._l_system_key in self.arrivals.enabled_metrics:
83+
self.arrivals.enabled_metrics[self._l_system_key].append(
8084
float(self.arrivals.l_system),
8185
)
8286

8387
if (self.lb is not None and
84-
self.lb is not None and
8588
self.lb.lb_config.algorithms == LbAlgorithmsName.FCFS):
86-
self.lb.enabled_metrics[self._lq_lb].append(self.lb.lq_lb)
89+
self.lb.enabled_metrics[self._lq_lb_key].append(self.lb.lq_lb)
8790

8891

8992

src/asyncflow/metrics/server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
SampledMetricName.LQ_SERVER,
1717
SampledMetricName.LQ_IO,
1818
SampledMetricName.RAM_IN_USE,
19+
SampledMetricName.SERVER_UTILIZATION,
1920
)
2021

2122
def build_server_metrics(

src/asyncflow/queue_theory_analysis/mmc.py

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
EndpointStepCPU,
1616
LatencyKey,
1717
LbAlgorithmsName,
18+
SampledMetricName,
1819
)
1920
from asyncflow.queue_theory_analysis.base import QueueTheoryBase
2021
from asyncflow.schemas.common.random_variables import RVConfig
@@ -474,18 +475,20 @@ def _observed_kpis(
474475
- λ̂: mean throughput
475476
- μ̂: 1 / mean(service_time)
476477
- Ŵ: mean client latency
477-
- Wq̂: mean waiting_time (server arrays)
478-
- L̂: λ̂ * Ŵ
479-
- Lq̂: λ̂ * Wq̂
480-
- rhô: λ̂ / (c μ̂)
478+
- Wq̂: FCFS -> mean LB waits; else -> mean server waiting_time
479+
- L̂: FCFS -> mean L_SYSTEM; else -> λ̂ * Ŵ
480+
- Lq̂: FCFS -> mean LQ_LB; else -> λ̂ * Wq̂
481+
- rho: FCFS -> mean SERVER_UTILIZATION; else -> λ̂ / (c μ̂)
482+
all computations are independent: we calculate each
483+
variable in a separate way
481484
"""
482485
self._ensure_metrics_processed(results_analyzer)
483486

484487
lambda_hat = self._observed_lambda_rate(results_analyzer)
485488
mu_hat = self._observed_mu_rate(results_analyzer)
486489
server_count = self._server_count(payload)
487490

488-
# Ŵ from latency stats (client-side);
491+
# Ŵ from latency stats (generator-side);
489492
lat_stats = results_analyzer.get_latency_stats()
490493
w_hat = float(lat_stats.get(LatencyKey.MEAN, 0.0))
491494

@@ -494,8 +497,42 @@ def _observed_kpis(
494497

495498
# Collect waiting time from LB if the algo is FCFS
496499
if is_fcfs:
500+
# --- L̂: mean L_SYSTEM timeseries (fallback λ̂·Ŵ) ---
501+
lsys_map = results_analyzer.get_metric_map(SampledMetricName.L_SYSTEM)
502+
aid = payload.arrivals.id
503+
lsys_series = lsys_map.get(aid, [])
504+
l_hat = (
505+
(sum(lsys_series) / len(lsys_series))
506+
if lsys_series else (lambda_hat * w_hat)
507+
)
508+
509+
# --- Wq̂: keep LB waiting times ---
497510
lb_waits = list(results_analyzer.get_lb_waiting_times())
498511
wq_hat = (sum(lb_waits) / len(lb_waits)) if lb_waits else 0.0
512+
513+
# --- Lq̂: mean LQ_LB timeseries (fallback λ̂·Wq̂) ---
514+
lq_map = results_analyzer.get_metric_map(SampledMetricName.LQ_LB)
515+
lb_id = lb.id if lb is not None else None
516+
lq_series = lq_map.get(lb_id, []) if lb_id is not None else []
517+
lq_hat = (
518+
(sum(lq_series) / len(lq_series))
519+
if lq_series else (lambda_hat * wq_hat)
520+
)
521+
522+
# ---- rho mean SERVER_UTILIZATION timeseries (fallback λ̂/(cμ̂)) ---
523+
util_map = (
524+
results_analyzer.get_metric_map(SampledMetricName.SERVER_UTILIZATION)
525+
)
526+
total_busy = 0.0
527+
total_samples = 0
528+
for series in util_map.values():
529+
total_busy += float(sum(series))
530+
total_samples += len(series)
531+
rho_hat = (total_busy / total_samples) if total_samples > 0 else (
532+
lambda_hat / (server_count * mu_hat)
533+
if mu_hat not in (0.0, float("inf")) else 0.0
534+
)
535+
499536
else:
500537
arrays_map = results_analyzer.get_server_event_arrays()
501538
wait_sum = 0.0
@@ -506,12 +543,12 @@ def _observed_kpis(
506543
wait_count += len(vals)
507544
wq_hat = (wait_sum / wait_count) if wait_count > 0 else 0.0
508545

509-
l_hat = lambda_hat * w_hat
510-
lq_hat = lambda_hat * wq_hat
511-
rho_hat = (
512-
lambda_hat / (server_count * mu_hat)
513-
if mu_hat not in (0.0, float("inf")) else 0.0
514-
)
546+
l_hat = lambda_hat * w_hat
547+
lq_hat = lambda_hat * wq_hat
548+
rho_hat = (
549+
lambda_hat / (server_count * mu_hat)
550+
if mu_hat not in (0.0, float("inf")) else 0.0
551+
)
515552

516553
return MMcResults(
517554
lambda_rate=lambda_hat,

src/asyncflow/runtime/actors/arrivals_generator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import numpy as np
1111

12-
from asyncflow.config.enums import SystemNodes
12+
from asyncflow.config.enums import SampledMetricName, SystemNodes
1313
from asyncflow.metrics.client import RqsClock
1414
from asyncflow.runtime.rqs_state import RequestState
1515
from asyncflow.samplers.arrivals import general_interarrivals
@@ -20,7 +20,6 @@
2020

2121
import simpy
2222

23-
from asyncflow.config.enums import SampledMetricName
2423
from asyncflow.runtime.actors.edge import EdgeRuntime
2524
from asyncflow.schemas.arrivals.generator import ArrivalsGenerator
2625
from asyncflow.schemas.settings.simulation import SimulationSettings
@@ -71,6 +70,7 @@ def __init__( # noqa: PLR0913
7170
self._l_system: int = 0
7271
# dict for the collector to have the time series
7372
self.enabled_metrics: dict[SampledMetricName, list[float]] = {}
73+
self.enabled_metrics[SampledMetricName.L_SYSTEM] = []
7474

7575

7676
def _next_id(self) -> int:

src/asyncflow/runtime/actors/load_balancer.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,14 @@ def __init__(
6565
# queue theory
6666
self._lb_waiting_time: list[float] = []
6767

68-
# counter to collect when algo is fcfs lq to compare with queue theory
69-
self._lq_lb: int = 0
7068
# dict to collect the time series
7169
self.enabled_metrics: dict[SampledMetricName, list[float]] = {}
70+
# initialize list
71+
if self.lb_config.algorithms == LbAlgorithmsName.FCFS:
72+
self.enabled_metrics[SampledMetricName.LQ_LB] = []
73+
74+
# track if LB is holding one request while waiting for a free edge
75+
self._holding_waiting: bool = False
7276

7377

7478
# Helpers FCFS
@@ -104,7 +108,6 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
104108
state: RequestState = yield self.lb_box.get() # type: ignore[assignment]
105109

106110
if self.lb_config.algorithms == LbAlgorithmsName.FCFS:
107-
self._lq_lb += 1
108111
hist = getattr(state, "history", None)
109112
if hist:
110113
last = hist[-1]
@@ -119,6 +122,9 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
119122
self.env.now,
120123
)
121124

125+
# We're now holding one request while waiting for a free edge
126+
self._holding_waiting = True
127+
122128
# The idea is the following: when a request arrives and the algorithm
123129
# is FCFS, we maintain a FIFO of available edges. If an edge connected
124130
# to a server is ready, the loop continues and (assuming no event injection
@@ -144,7 +150,8 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
144150
self._lb_waiting_time.append(waiting_time)
145151

146152
edge_rt = self.lb_out_edges[edge_id]
147-
self._lq_lb -= 1
153+
self._holding_waiting = False
154+
148155
edge_rt.transport(state)
149156
else:
150157
state.record_hop(
@@ -167,5 +174,8 @@ def lb_waiting_times(self) -> Sequence[float]:
167174

168175
@property
169176
def lq_lb(self) -> int:
170-
"""Readable version to sample the metric"""
171-
return self._lq_lb
177+
"""
178+
FCFS queue length at the LB: items still in lb_box plus the one
179+
currently held by the LB while waiting for a free edge.
180+
"""
181+
return len(self.lb_box.items) + (1 if self._holding_waiting else 0)

0 commit comments

Comments
 (0)