Skip to content

Commit 07c7f7e

Browse files
committed
Implemented fcfs with tests
1 parent c66eeb9 commit 07c7f7e

12 files changed

Lines changed: 767 additions & 73 deletions

File tree

asyncflow_queue_limit/asyncflow_mm1.ipynb

Lines changed: 27 additions & 31 deletions
Large diffs are not rendered by default.

asyncflow_queue_limit/asyncflow_mmc.ipynb

Lines changed: 253 additions & 0 deletions
Large diffs are not rendered by default.

asyncflow_queue_limit/asyncflow_mmc_split.ipynb

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
"id": "870337dc",
66
"metadata": {},
77
"source": [
8-
"# AsyncFlow — MMc Theory vs Simulation (Guided Notebook)\n",
8+
"# AsyncFlow — MMc split Theory vs Simulation (Guided Notebook)\n",
99
"\n",
1010
"This notebook shows how to:\n",
1111
"\n",
1212
"1. Make imports work inside a notebook (src-layout or package install)\n",
13-
"2. Build a **multi-server** scenario compatible with **M/M/c** assumptions\n",
13+
"2. Build a **multi-server** scenario compatible with **M/M/c** assumptions in the case of n parallel M/M/1\n",
1414
"3. Run the simulation and collect results\n",
1515
"4. Compare theory vs observed KPIs (pretty-printed table)\n",
1616
"5. Plot the standard dashboards (latency, throughput, server time series)\n",
@@ -84,15 +84,11 @@
8484
" Service times follow an **Exponential** distribution with mean \\$E\\[S]\\$ (service rate \\$\\mu = 1/E\\[S]\\$). No RAM/IO steps are included in the pipeline.\n",
8585
"\n",
8686
"* **Load balancer with round-robin dispatch**\n",
87-
" A **single load balancer** is required when \\$c > 1\\$. It splits arrivals **round-robin** across servers, so each server has its own local queue.\n",
87+
" A **single load balancer** is required when \\$c > 1\\$. It splits arrivals **randomly** across servers, so each server has its own local queue.\n",
8888
" This corresponds to a **split M/M/c** model, not the textbook pooled queue.\n",
8989
"\n",
90-
"* **Deterministic, very small network latency**\n",
91-
" All edges have **fixed latency** \\$\\ll 1,\\mathrm{ms}\\$. Queueing behavior is therefore dominated by CPU service, closely matching textbook assumptions.\n",
92-
"\n",
9390
"* **“Poisson arrivals” via the generator**\n",
94-
" Arrivals are produced by the same **two-stage, windowed Poisson sampler**: in each user-sampling window \\$\\Delta\\$, we draw the active users \\$U\\$ (Poisson or Normal, per config).\n",
95-
" Within the window, arrivals are a **homogeneous Poisson process** with rate \\$\\Lambda = U \\cdot \\lambda\\_r/60\\$.\n",
91+
" \n",
9692
"\n",
9793
" \n",
9894
"\n",
@@ -234,7 +230,7 @@
234230
"id": "da98b8b9",
235231
"metadata": {},
236232
"source": [
237-
"## 3) MMc (Round-Robin) — theory vs observed comparison\n",
233+
"## 3) MMc (Random) — theory vs observed comparison\n",
238234
"\n",
239235
"If the payload violates MMc assumptions, a readable error is shown instead.\n",
240236
"This section matches exactly what the analyzer computes **now**: a **split random model** (not the pooled FCFS/Erlang-C model). When we add **FCFS** in the LB, we’ll also expose the textbook Erlang-C formulas side-by-side.\n",
@@ -366,12 +362,7 @@
366362
"\n",
367363
"### Why small deltas appear\n",
368364
"\n",
369-
"* **Windowed arrivals:** The generator uses a **windowed** active-user sampler; across windows the rate is piecewise-constant (Cox process), not one global Poisson.\n",
370-
"* **Finite horizon / warm-up:** Short runs and lack of warm-up bias early windows.\n",
371-
"* **Network latency:** Small but non-zero link delays add a constant component to $W$.\n",
372-
"\n",
373-
"\n",
374-
"Increasing the run length, keeping edges tiny and deterministic, and ensuring identical servers will usually shrink Theory–Observed gaps. "
365+
"* **Finite horizon / warm-up:** Short runs and lack of warm-up bias early windows.\n"
375366
]
376367
},
377368
{

src/asyncflow/config/enums.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class LbAlgorithmsName(StrEnum):
134134
ROUND_ROBIN = "round_robin"
135135
LEAST_CONNECTIONS = "least_connection"
136136
RANDOM = "random"
137+
FCFS = "fcfs"
137138

138139

139140
# ======================================================================

src/asyncflow/queue_theory_analysis/mmc.py

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from __future__ import annotations
77

8+
import math
89
import sys
910
from typing import TYPE_CHECKING, Literal, TextIO, TypedDict, cast
1011
from weakref import WeakSet
@@ -127,8 +128,8 @@ def _check_topology(self, payload: SimulationPayload) -> list[str]:
127128
errs.append("for c=1 the load balancer must be absent.")
128129
elif lb is None:
129130
errs.append("for c>1 a load balancer is required.")
130-
elif lb.algorithms != LbAlgorithmsName.RANDOM:
131-
errs.append("only random is supported for the split M/M/c model.")
131+
elif lb.algorithms not in {LbAlgorithmsName.RANDOM, LbAlgorithmsName.FCFS}:
132+
errs.append("supported lb algorithms: RANDOM (split) or FCFS (pooled).")
132133

133134
return errs
134135

@@ -350,11 +351,14 @@ def _build_observed_params(
350351
)
351352

352353
# ────────────────────────────────────────────────────────────────────
353-
# Closed form (split RR): theory → MMcResults
354+
# Closed form (Random): theory → MMcResults split model ( n parallel mm1)
354355
# ────────────────────────────────────────────────────────────────────
355356

356357
def _theoretical_kpis_split(self, payload: SimulationPayload) -> MMcResults:
357-
"""Closed forms for RR split: λ_i=λ/c; Wq=rho/(μ-λ_i); W=1/μ+Wq; Lq=λWq; L=λW"""
358+
"""
359+
Closed forms for Random split: λ_i=λ/c;
360+
Wq=rho/(μ-λ_i); W=1/μ+Wq; Lq=λWq; L=λW
361+
"""
358362
self.validate_or_raise(payload)
359363
params = self._build_params(payload)
360364

@@ -394,10 +398,68 @@ def _theoretical_kpis_split(self, payload: SimulationPayload) -> MMcResults:
394398
Wq=wq,
395399
)
396400

401+
402+
def _theoretical_mmc_erlang_c_kpis(
403+
self,
404+
lambda_rate: float,
405+
mu_rate: float,
406+
c: int,
407+
) -> MMcResults:
408+
"""Closed forms for pooled M/M/c (FCFS, Erlang-C)."""
409+
rho = self._rho_from(lambda_rate, c, mu_rate)
410+
if rho >= 1.0:
411+
inf = float("inf")
412+
return MMcResults(
413+
lambda_rate=lambda_rate,
414+
mu_rate=mu_rate,
415+
c=c,
416+
rho=rho,
417+
L=inf, Lq=inf, W=inf, Wq=inf,
418+
)
419+
420+
a = lambda_rate / mu_rate # offered traffic
421+
# P0
422+
s = sum((a**n) / math.factorial(n) for n in range(c))
423+
tail = (a**c) / math.factorial(c) * (1.0 / (1.0 - rho))
424+
p0 = 1.0 / (s + tail)
425+
pw = ((a**c) / math.factorial(c)) * (1.0 / (1.0 - rho)) * p0
426+
lq = pw * (rho / (1.0 - rho))
427+
wq = lq / lambda_rate
428+
w = wq + 1.0 / mu_rate
429+
lam = lambda_rate * w
430+
431+
return MMcResults(
432+
lambda_rate=lambda_rate,
433+
mu_rate=mu_rate,
434+
c=c,
435+
rho=rho,
436+
L=lam,
437+
Lq=lq,
438+
W=w,
439+
Wq=wq,
440+
)
441+
442+
def _theoretical_kpis_pooled(self, payload: SimulationPayload) -> MMcResults:
443+
"""Closed forms for pooled M/M/c (LB=FCFS, central queue)."""
444+
# reuse the existing parameter builders
445+
params = self._build_params(payload)
446+
return self._theoretical_mmc_erlang_c_kpis(
447+
lambda_rate=params["lambda_rate"],
448+
mu_rate=params["mu_rate"],
449+
c=params["c"],
450+
)
451+
452+
397453
def evaluate(self, payload: SimulationPayload) -> MMcResults:
398-
"""Public entry-point: return closed-form KPIs for split RR."""
454+
"""Return closed-form KPIs: split (RR/RANDOM) or pooled (FCFS)."""
455+
self.validate_or_raise(payload)
456+
lb = payload.topology_graph.nodes.load_balancer
457+
if (lb is not None) and (lb.algorithms == LbAlgorithmsName.FCFS):
458+
return self._theoretical_kpis_pooled(payload)
459+
# default: split (random/RR)
399460
return self._theoretical_kpis_split(payload)
400461

462+
401463
# ────────────────────────────────────────────────────────────────────
402464
# Observed KPIs → MMcResults (coerenti con definizioni sopra)
403465
# ────────────────────────────────────────────────────────────────────
@@ -483,7 +545,12 @@ def compare_against_run(
483545
"""Build a table with theory vs observed and deltas."""
484546
self.validate_or_raise(payload)
485547

486-
theory = self._theoretical_kpis_split(payload)
548+
lb = payload.topology_graph.nodes.load_balancer
549+
if (lb is not None) and (lb.algorithms == LbAlgorithmsName.FCFS):
550+
theory = self._theoretical_kpis_pooled(payload)
551+
else:
552+
theory = self._theoretical_kpis_split(payload)
553+
487554
observed = self._observed_kpis(payload, results_analyzer)
488555

489556
rows: list[MMcKPIRow] = []
@@ -514,16 +581,21 @@ def add(symbol: str, name: str, key: MMcResultKey) -> None:
514581

515582
return rows
516583

517-
518-
# --------------- PRETTY PRINT -------------------
519-
520584
# ────────────────────────────────────────────────────────────────────
521585
# Pretty table (KPI):
522586
# ────────────────────────────────────────────────────────────────────
587+
588+
def _title_for(self, payload: "SimulationPayload") -> str:
589+
lb = payload.topology_graph.nodes.load_balancer
590+
if lb is not None and lb.algorithms == LbAlgorithmsName.FCFS:
591+
return "MMc (FCFS/Erlang-C) — Theory vs Observed"
592+
# default to random split when no LB or non-FCFS
593+
return "MMc (Random split) — Theory vs Observed"
594+
523595
@staticmethod
524596
def _format_kpi_table(
525-
rows: list[MMcKPIRow],
526-
title: str = "MMc (RR) — Theory vs Observed",
597+
rows: list["MMcKPIRow"],
598+
title: str = "MMc — Theory vs Observed",
527599
) -> str:
528600
data = [
529601
(
@@ -536,12 +608,11 @@ def _format_kpi_table(
536608
)
537609
for r in rows
538610
]
539-
540611
headers = ("sym", "metric", "theory", "observed", "abs", "rel%")
541612
w_sym = max(len(headers[0]), *(len(d[0]) for d in data))
542613
w_met = max(len(headers[1]), *(len(d[1]) for d in data))
543-
w_th = max(len(headers[2]), *(len(d[2]) for d in data))
544-
w_ob = max(len(headers[3]), *(len(d[3]) for d in data))
614+
w_th = max(len(headers[2]), *(len(d[2]) for d in data))
615+
w_ob = max(len(headers[3]), *(len(d[3]) for d in data))
545616
w_abs = max(len(headers[4]), *(len(d[4]) for d in data))
546617
w_rel = max(len(headers[5]), *(len(d[5]) for d in data))
547618

@@ -564,12 +635,12 @@ def _format_kpi_table(
564635

565636
def compare_and_format(
566637
self,
567-
payload: SimulationPayload,
568-
results_analyzer: ResultsAnalyzer,
638+
payload: "SimulationPayload",
639+
results_analyzer: "ResultsAnalyzer",
569640
) -> str:
570-
"""Return a formatted KPI table for theory vs observed."""
571641
rows = self.compare_against_run(payload, results_analyzer)
572-
return self._format_kpi_table(rows)
642+
title = self._title_for(payload)
643+
return self._format_kpi_table(rows, title=title)
573644

574645
def print_comparison(
575646
self,

src/asyncflow/runner/simulation.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
from collections import OrderedDict
6+
from functools import partial
67
from itertools import chain
78
from pathlib import Path
89
from types import MappingProxyType
@@ -12,6 +13,7 @@
1213
import simpy
1314
import yaml
1415

16+
from asyncflow.config.enums import LbAlgorithmsName
1517
from asyncflow.metrics.collector import SampledMetricCollector
1618
from asyncflow.metrics.simulation_analyzer import ResultsAnalyzer
1719
from asyncflow.resources.registry import ResourcesRuntime
@@ -229,7 +231,9 @@ def _build_edges(self) -> None:
229231
msg = f"Unknown runtime for {edge.target!r}"
230232
raise TypeError(msg)
231233

232-
234+
# prepare a dict of edges runtime with unique key as a tuple
235+
# once all are ready we have to assign each one to the source node
236+
# to allow the transport of the state through the edge
233237
self._edges_runtime[(edge.source, edge.target)] = (
234238
EdgeRuntime(
235239
env=self.env,
@@ -251,10 +255,31 @@ def _build_edges(self) -> None:
251255
edge.source,
252256
edge.target)
253257
]
258+
259+
# since multiple edges fan out from the LB we use a dict
260+
# to have access in o(1) and assign the correct Edge runtime
254261
elif isinstance(source_object, LoadBalancerRuntime):
255262
self._lb_out_edges[edge.id] = (
256263
self._edges_runtime[(edge.source, edge.target)]
257264
)
265+
if isinstance(target_object, ServerRuntime) and (
266+
source_object.lb_config.algorithms == LbAlgorithmsName.FCFS
267+
):
268+
# if the target is a server we pass the callback to communicate
269+
# to the Lb that the server is free
270+
271+
assert self._lb_runtime is not None
272+
lb_rt = self._lb_runtime
273+
edge_id = edge.id
274+
275+
# We use functools.partial here to "pre-bind" the edge_id argument
276+
# of LoadBalancerRuntime.mark_free.
277+
# This turns it into a zero-argument
278+
# callable, so the ServerRuntime can simply
279+
# call notify_server_free()
280+
# when done, without needing to know its own edge_id.
281+
target_object.notify_server_free = partial(lb_rt.mark_free, edge_id)
282+
258283
else:
259284
msg = f"Unknown runtime for {edge.source!r}"
260285
raise TypeError(msg)

src/asyncflow/runtime/actors/load_balancer.py

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
"""Definition of the node represented by the LB in the simulation"""
22

33

4-
from collections import OrderedDict
4+
from collections import OrderedDict, defaultdict
55
from collections.abc import Generator
66
from typing import (
77
TYPE_CHECKING,
88
)
99

1010
import simpy
1111

12-
from asyncflow.config.enums import SystemNodes
12+
from asyncflow.config.enums import LbAlgorithmsName, SystemNodes
1313
from asyncflow.runtime.actors.edge import EdgeRuntime
14-
from asyncflow.runtime.actors.routing.lb_algorithms import LB_TABLE
14+
from asyncflow.runtime.actors.routing.lb_algorithms import (
15+
LB_TABLE,
16+
fcfs_picker,
17+
)
1518
from asyncflow.schemas.topology.nodes import LoadBalancer
1619

1720
if TYPE_CHECKING:
@@ -55,7 +58,33 @@ def __init__(
5558
self.lb_out_edges = lb_out_edges
5659
self.lb_box = lb_box
5760

58-
61+
# we need to keep track of the server that are busy
62+
# right now we don't have multiprocess so each server has
63+
# one core, to handle multiprocessing in the future we would
64+
# have to add a new structure called capacity
65+
self._busy: dict[str, int] = defaultdict(int)
66+
67+
# In the case of an FCFS algorithm (or similar), if no servers
68+
# are available we need to wait before starting the algo
69+
self._wait_ev: simpy.Event | None = None
70+
71+
# Helpers FCFS
72+
# We manage the logic for this algorithm here because we need to be careful
73+
# not to have collisions with EventInjectionRuntime; that's why we are
74+
# not popping and rotating from the ordered dict with key edge_id and
75+
# value edge runtime, but we manage adding an extra structure to know
76+
# the state of the servers through the edges connecting them with the LB
77+
def mark_busy(self, edge_id: str) -> None:
78+
"""Helper to manage the state of the available server"""
79+
self._busy[edge_id] += 1
80+
81+
def mark_free(self, edge_id: str) -> None:
82+
"""Helper to manage the state of the available server"""
83+
b = self._busy.get(edge_id, 0)
84+
if b > 0:
85+
self._busy[edge_id] = b - 1
86+
if self._wait_ev is not None and not self._wait_ev.triggered:
87+
self._wait_ev.succeed()
5988

6089
def _forwarder(self) -> Generator[simpy.Event, None, None]:
6190
"""Update the state before passing it to another node"""
@@ -68,8 +97,24 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
6897
self.env.now,
6998
)
7099

71-
out_edge = LB_TABLE[self.lb_config.algorithms](self.lb_out_edges)
72-
out_edge.transport(state)
100+
if self.lb_config.algorithms == LbAlgorithmsName.FCFS:
101+
# FCFS: wait for an edge to be free
102+
pick = fcfs_picker(self.lb_out_edges, self._busy)
103+
while pick is None:
104+
if self._wait_ev is None or self._wait_ev.triggered:
105+
self._wait_ev = self.env.event()
106+
yield self._wait_ev
107+
pick = fcfs_picker(self.lb_out_edges, self._busy)
108+
109+
edge_id, edge_rt = pick
110+
# mark the server as occupied
111+
self.mark_busy(edge_id)
112+
# transport the request
113+
edge_rt.transport(state)
114+
else:
115+
# Algorithms other than FCFS
116+
edge_rt = LB_TABLE[self.lb_config.algorithms](self.lb_out_edges)
117+
edge_rt.transport(state)
73118

74119
def start(self) -> simpy.Process:
75120
"""Initialization of the simpy process for the LB"""

0 commit comments

Comments
 (0)