Skip to content

Commit 8f30a98

Browse files
committed
Implemented fcfs algo and validation of mmc models Erlang C
1 parent 07c7f7e commit 8f30a98

11 files changed

Lines changed: 445 additions & 174 deletions

File tree

asyncflow_queue_limit/asyncflow_mmc.ipynb

Lines changed: 164 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
},
2121
{
2222
"cell_type": "code",
23-
"execution_count": 9,
23+
"execution_count": 96,
2424
"id": "3e168d4a",
2525
"metadata": {},
2626
"outputs": [],
@@ -45,7 +45,7 @@
4545
},
4646
{
4747
"cell_type": "code",
48-
"execution_count": 10,
48+
"execution_count": 97,
4949
"id": "dd39a8e3",
5050
"metadata": {},
5151
"outputs": [
@@ -112,15 +112,15 @@
112112
},
113113
{
114114
"cell_type": "code",
115-
"execution_count": null,
115+
"execution_count": 98,
116116
"id": "d2937e5e",
117117
"metadata": {},
118118
"outputs": [],
119119
"source": [
120120
"def build_payload():\n",
121121
" generator = ArrivalsGenerator(\n",
122122
" id=\"rqs-1\",\n",
123-
" lambda_rps=30,\n",
123+
" lambda_rps=270,\n",
124124
" model=Distribution.POISSON\n",
125125
" )\n",
126126
"\n",
@@ -174,7 +174,7 @@
174174
" ]\n",
175175
"\n",
176176
" settings = SimulationSettings(\n",
177-
" total_simulation_time=2400,\n",
177+
" total_simulation_time=3600,\n",
178178
" sample_period_s=0.05,\n",
179179
" )\n",
180180
"\n",
@@ -201,22 +201,15 @@
201201
},
202202
{
203203
"cell_type": "code",
204-
"execution_count": 12,
204+
"execution_count": 99,
205205
"id": "d0634bc8",
206206
"metadata": {},
207207
"outputs": [
208208
{
209-
"ename": "ValidationError",
210-
"evalue": "1 validation error for TopologyNodes\n Value error, Load balancer 'lb-1' references unknown server 'srv-3'. Define it under 'servers' or remove it from 'server_covered'. [type=value_error, input_value={'servers': [Server(id='s..., ram_per_process=None)}, input_type=dict]\n For further information visit https://errors.pydantic.dev/2.11/v/value_error",
211-
"output_type": "error",
212-
"traceback": [
213-
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
214-
"\u001b[31mValidationError\u001b[39m Traceback (most recent call last)",
215-
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[12]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[32m----> \u001b[39m\u001b[32m1\u001b[39m payload = \u001b[43mbuild_payload\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 2\u001b[39m env = simpy.Environment()\n\u001b[32m 3\u001b[39m runner = SimulationRunner(env=env, simulation_input=payload)\n",
216-
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[11]\u001b[39m\u001b[32m, line 70\u001b[39m, in \u001b[36mbuild_payload\u001b[39m\u001b[34m()\u001b[39m\n\u001b[32m 46\u001b[39m edges = [\n\u001b[32m 47\u001b[39m LinkEdge(\u001b[38;5;28mid\u001b[39m=\u001b[33m\"\u001b[39m\u001b[33mgen-client\u001b[39m\u001b[33m\"\u001b[39m, source=\u001b[33m\"\u001b[39m\u001b[33mrqs-1\u001b[39m\u001b[33m\"\u001b[39m, target=\u001b[33m\"\u001b[39m\u001b[33mclient-1\u001b[39m\u001b[33m\"\u001b[39m,),\n\u001b[32m 48\u001b[39m LinkEdge(\u001b[38;5;28mid\u001b[39m=\u001b[33m\"\u001b[39m\u001b[33mclient-lb\u001b[39m\u001b[33m\"\u001b[39m, source=\u001b[33m\"\u001b[39m\u001b[33mclient-1\u001b[39m\u001b[33m\"\u001b[39m, target=\u001b[33m\"\u001b[39m\u001b[33mlb-1\u001b[39m\u001b[33m\"\u001b[39m, ),\n\u001b[32m (...)\u001b[39m\u001b[32m 54\u001b[39m LinkEdge(\u001b[38;5;28mid\u001b[39m=\u001b[33m\"\u001b[39m\u001b[33msrv3-client\u001b[39m\u001b[33m\"\u001b[39m, source=\u001b[33m\"\u001b[39m\u001b[33msrv-3\u001b[39m\u001b[33m\"\u001b[39m, target=\u001b[33m\"\u001b[39m\u001b[33mclient-1\u001b[39m\u001b[33m\"\u001b[39m,),\n\u001b[32m 55\u001b[39m ]\n\u001b[32m 57\u001b[39m settings = SimulationSettings(\n\u001b[32m 58\u001b[39m total_simulation_time=\u001b[32m2400\u001b[39m,\n\u001b[32m 59\u001b[39m sample_period_s=\u001b[32m0.05\u001b[39m,\n\u001b[32m 60\u001b[39m )\n\u001b[32m 62\u001b[39m payload = \u001b[43m(\u001b[49m\n\u001b[32m 63\u001b[39m \u001b[43m \u001b[49m\u001b[43mAsyncFlow\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 64\u001b[39m \u001b[43m \u001b[49m\u001b[43m.\u001b[49m\u001b[43madd_arrivals_generator\u001b[49m\u001b[43m(\u001b[49m\u001b[43mgenerator\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 65\u001b[39m \u001b[43m \u001b[49m\u001b[43m.\u001b[49m\u001b[43madd_client\u001b[49m\u001b[43m(\u001b[49m\u001b[43mclient\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 66\u001b[39m \u001b[43m \u001b[49m\u001b[43m.\u001b[49m\u001b[43madd_servers\u001b[49m\u001b[43m(\u001b[49m\u001b[43msrv1\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msrv2\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 67\u001b[39m \u001b[43m \u001b[49m\u001b[43m.\u001b[49m\u001b[43madd_load_balancer\u001b[49m\u001b[43m(\u001b[49m\u001b[43mlb\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 68\u001b[39m \u001b[43m \u001b[49m\u001b[43m.\u001b[49m\u001b[43madd_edges\u001b[49m\u001b[43m(\u001b[49m\u001b[43m*\u001b[49m\u001b[43medges\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 69\u001b[39m \u001b[43m \u001b[49m\u001b[43m.\u001b[49m\u001b[43madd_simulation_settings\u001b[49m\u001b[43m(\u001b[49m\u001b[43msettings\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m---> \u001b[39m\u001b[32m70\u001b[39m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\u001b[43m.\u001b[49m\u001b[43mbuild_payload\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 72\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m payload\n",
217-
"\u001b[36mFile \u001b[39m\u001b[32m~/projects/AsyncFlow/src/asyncflow/builder/asyncflow_builder.py:210\u001b[39m, in \u001b[36mAsyncFlow.build_payload\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 207\u001b[39m msg = \u001b[33m\"\u001b[39m\u001b[33mThe simulation settings must be instantiated before the simulation\u001b[39m\u001b[33m\"\u001b[39m\n\u001b[32m 208\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(msg)\n\u001b[32m--> \u001b[39m\u001b[32m210\u001b[39m nodes = \u001b[43mTopologyNodes\u001b[49m\u001b[43m(\u001b[49m\n\u001b[32m 211\u001b[39m \u001b[43m \u001b[49m\u001b[43mservers\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_servers\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 212\u001b[39m \u001b[43m \u001b[49m\u001b[43mclient\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_client\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 213\u001b[39m \u001b[43m \u001b[49m\u001b[43mload_balancer\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_load_balancer\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 214\u001b[39m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 216\u001b[39m graph = TopologyGraph(\n\u001b[32m 217\u001b[39m nodes = nodes,\n\u001b[32m 218\u001b[39m edges=edges_u,\n\u001b[32m 219\u001b[39m )\n\u001b[32m 221\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m SimulationPayload.model_validate({\n\u001b[32m 222\u001b[39m \u001b[33m\"\u001b[39m\u001b[33marrivals\u001b[39m\u001b[33m\"\u001b[39m: \u001b[38;5;28mself\u001b[39m._arrivals,\n\u001b[32m 223\u001b[39m \u001b[33m\"\u001b[39m\u001b[33mtopology_graph\u001b[39m\u001b[33m\"\u001b[39m: graph,\n\u001b[32m 224\u001b[39m \u001b[33m\"\u001b[39m\u001b[33msim_settings\u001b[39m\u001b[33m\"\u001b[39m: \u001b[38;5;28mself\u001b[39m._sim_settings,\n\u001b[32m 225\u001b[39m \u001b[33m\"\u001b[39m\u001b[33mevents\u001b[39m\u001b[33m\"\u001b[39m: \u001b[38;5;28mself\u001b[39m._events \u001b[38;5;129;01mor\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m,\n\u001b[32m 226\u001b[39m })\n",
218-
"\u001b[36mFile \u001b[39m\u001b[32m~/projects/AsyncFlow/.venv/lib/python3.12/site-packages/pydantic/main.py:253\u001b[39m, in \u001b[36mBaseModel.__init__\u001b[39m\u001b[34m(self, **data)\u001b[39m\n\u001b[32m 251\u001b[39m \u001b[38;5;66;03m# `__tracebackhide__` tells pytest and some other tools to omit this function from tracebacks\u001b[39;00m\n\u001b[32m 252\u001b[39m __tracebackhide__ = \u001b[38;5;28;01mTrue\u001b[39;00m\n\u001b[32m--> \u001b[39m\u001b[32m253\u001b[39m validated_self = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m__pydantic_validator__\u001b[49m\u001b[43m.\u001b[49m\u001b[43mvalidate_python\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdata\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mself_instance\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[32m 254\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m validated_self:\n\u001b[32m 255\u001b[39m warnings.warn(\n\u001b[32m 256\u001b[39m \u001b[33m'\u001b[39m\u001b[33mA custom validator is returning a value other than `self`.\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[33m'\u001b[39m\n\u001b[32m 257\u001b[39m \u001b[33m\"\u001b[39m\u001b[33mReturning anything other than `self` from a top level model validator isn\u001b[39m\u001b[33m'\u001b[39m\u001b[33mt supported when validating via `__init__`.\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[33m\"\u001b[39m\n\u001b[32m 258\u001b[39m \u001b[33m'\u001b[39m\u001b[33mSee the `model_validator` docs (https://docs.pydantic.dev/latest/concepts/validators/#model-validators) for more details.\u001b[39m\u001b[33m'\u001b[39m,\n\u001b[32m 259\u001b[39m stacklevel=\u001b[32m2\u001b[39m,\n\u001b[32m 260\u001b[39m )\n",
219-
"\u001b[31mValidationError\u001b[39m: 1 validation error for TopologyNodes\n Value error, Load balancer 'lb-1' references unknown server 'srv-3'. Define it under 'servers' or remove it from 'server_covered'. [type=value_error, input_value={'servers': [Server(id='s..., ram_per_process=None)}, input_type=dict]\n For further information visit https://errors.pydantic.dev/2.11/v/value_error"
209+
"name": "stdout",
210+
"output_type": "stream",
211+
"text": [
212+
"Done.\n"
220213
]
221214
}
222215
],
@@ -225,7 +218,160 @@
225218
"env = simpy.Environment()\n",
226219
"runner = SimulationRunner(env=env, simulation_input=payload)\n",
227220
"results: ResultsAnalyzer = runner.run()\n",
228-
"print(\"Done.\")"
221+
"print(\"Done.\")\n"
222+
]
223+
},
224+
{
225+
"cell_type": "markdown",
226+
"id": "e5fe2a4a",
227+
"metadata": {},
228+
"source": [
229+
"# 3) M/M/c (FCFS) — theory vs observed comparison\n",
230+
"\n",
231+
"This section shows how we compute the **theoretical Erlang-C KPIs** (pooled queue, FCFS) and compare them against **simulation estimates**.\n",
232+
"\n",
233+
"---\n",
234+
"\n",
235+
"## Variables\n",
236+
"\n",
237+
"* **$c$**: number of identical servers.\n",
238+
"* **$\\lambda$**: global arrival rate (req/s).\n",
239+
"* **$\\mu$**: per-server service rate (req/s), $\\mu = 1/\\mathbb{E}[S]$.\n",
240+
"* **$\\rho$**: global utilization, $\\rho = \\lambda/(c\\mu)$.\n",
241+
"* **$W$**: mean time in system (queue + service).\n",
242+
"* **$W_q$**: mean waiting time in queue.\n",
243+
"* **$L$**: mean number in system.\n",
244+
"* **$L_q$**: mean number in queue.\n",
245+
"\n",
246+
"---\n",
247+
"\n",
248+
"## Theory (Erlang-C formulas)\n",
249+
"\n",
250+
"We assume **Poisson arrivals** for $\\lambda$ (taken directly from the payload).\n",
251+
"\n",
252+
"1. Offered load:\n",
253+
"\n",
254+
"$$\n",
255+
"a = \\frac{\\lambda}{\\mu}\n",
256+
"$$\n",
257+
"\n",
258+
"2. Probability system is empty:\n",
259+
"\n",
260+
"$$\n",
261+
"P_0 = \\left[\\sum_{n=0}^{c-1}\\frac{a^n}{n!} + \\frac{a^c}{c!\\,(1-\\rho)}\\right]^{-1}\n",
262+
"$$\n",
263+
"\n",
264+
"3. Probability of waiting (Erlang-C):\n",
265+
"\n",
266+
"$$\n",
267+
"P_w = \\frac{a^c}{c!\\,(1-\\rho)} \\, P_0\n",
268+
"$$\n",
269+
"\n",
270+
"4. Queue length and waiting:\n",
271+
"\n",
272+
"$$\n",
273+
"L_q = P_w \\cdot \\frac{\\rho}{1-\\rho}, \\qquad\n",
274+
"W_q = \\frac{L_q}{\\lambda}\n",
275+
"$$\n",
276+
"\n",
277+
"5. Total response time and system size:\n",
278+
"\n",
279+
"$$\n",
280+
"W = W_q + \\frac{1}{\\mu}, \\qquad\n",
281+
"L = \\lambda W\n",
282+
"$$\n",
283+
"\n",
284+
"If $\\rho \\ge 1$, the system is unstable and all metrics diverge to $+\\infty$.\n",
285+
"\n",
286+
"---\n",
287+
"\n",
288+
"## Observed (from simulation)\n",
289+
"\n",
290+
"After processing metrics:\n",
291+
"\n",
292+
"1. **Arrival rate**:\n",
293+
"\n",
294+
"$$\n",
295+
"\\lambda_{\\text{Observed}} = \\text{mean throughput (client completions)}\n",
296+
"$$\n",
297+
"\n",
298+
"2. **Service rate**:\n",
299+
"\n",
300+
"$$\n",
301+
"\\mu_{\\text{Observed}} = 1 / \\overline{S}, \\quad \\overline{S} = \\text{mean(service\\_time)}\n",
302+
"$$\n",
303+
"\n",
304+
"3. **End-to-end latency**:\n",
305+
"\n",
306+
"$$\n",
307+
"W_{\\text{Observed}} = \\text{mean(client latencies)}\n",
308+
"$$\n",
309+
"\n",
310+
"4. **Waiting time**:\n",
311+
"\n",
312+
"$$\n",
313+
"W_{q,\\text{Observed}} = \\text{mean(waiting\\_time)} \n",
314+
"$$\n",
315+
"\n",
316+
"5. **Little’s law check**:\n",
317+
"\n",
318+
"$$\n",
319+
"L_{\\text{Observed}} = \\lambda_{\\text{Observed}} W_{\\text{Observed}}, \\qquad\n",
320+
"L_{q,\\text{Observed}} = \\lambda_{\\text{Observed}} W_{q,\\text{Observed}}\n",
321+
"$$\n",
322+
"\n",
323+
"6. **Utilization**:\n",
324+
"\n",
325+
"$$\n",
326+
"\\rho_{\\text{Observed}} = \\lambda_{\\text{Observed}}/(c\\,\\mu_{\\text{Observed}})\n",
327+
"$$\n",
328+
"\n",
329+
"---\n",
330+
"\n",
331+
"## Comparison\n",
332+
"\n",
333+
"The analyzer builds a table with two columns — **Theory** (Erlang-C closed forms) and **Observed** (empirical estimates) — and reports absolute and relative deltas.\n",
334+
"\n",
335+
"This allows us to verify whether AsyncFlow reproduces the textbook M/M/c (FCFS) predictions under Poisson arrivals and exponential service.\n",
336+
"\n",
337+
"\n"
338+
]
339+
},
340+
{
341+
"cell_type": "code",
342+
"execution_count": 100,
343+
"id": "ccd7379b",
344+
"metadata": {},
345+
"outputs": [
346+
{
347+
"name": "stdout",
348+
"output_type": "stream",
349+
"text": [
350+
"=================================================================\n",
351+
"MMc (FCFS/Erlang-C) — Theory vs Observed\n",
352+
"-----------------------------------------------------------------\n",
353+
"sym metric theory observed abs rel%\n",
354+
"-----------------------------------------------------------------\n",
355+
"λ Arrival rate (1/s) 270.000000 270.258333 0.258333 0.10\n",
356+
"μ Service rate (1/s) 100.000000 100.036707 0.036707 0.04\n",
357+
"rho Utilization 0.900000 0.900531 0.000531 0.06\n",
358+
"L Mean items in sys 10.053549 10.073544 0.019994 0.20\n",
359+
"Lq Mean items in queue 7.353549 7.371934 0.018385 0.25\n",
360+
"W Mean time in sys (s) 0.037235 0.037274 0.000038 0.10\n",
361+
"Wq Mean waiting (s) 0.027235 0.027277 0.000042 0.15\n",
362+
"=================================================================\n"
363+
]
364+
}
365+
],
366+
"source": [
367+
"mmc = MMc()\n",
368+
"if mmc.is_compatible(payload):\n",
369+
" mmc.print_comparison(payload, results) \n",
370+
"else:\n",
371+
" print(\"Payload is not compatible with M/M/c:\")\n",
372+
" for reason in mmc.explain_incompatibilities(payload):\n",
373+
" print(\" -\", reason)\n",
374+
" \n"
229375
]
230376
}
231377
],

asyncflow_queue_limit/asyncflow_mmc_split.ipynb

Lines changed: 25 additions & 27 deletions
Large diffs are not rendered by default.

src/asyncflow/metrics/simulation_analyzer.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323

2424
if TYPE_CHECKING:
2525
# Standard library typing imports in type-checking block (TC003).
26-
from collections.abc import Iterable
26+
from collections.abc import Iterable, Sequence
2727

2828
from matplotlib.axes import Axes
2929
from matplotlib.lines import Line2D
3030

3131
from asyncflow.runtime.actors.client import ClientRuntime
3232
from asyncflow.runtime.actors.edge import EdgeRuntime
33+
from asyncflow.runtime.actors.load_balancer import LoadBalancerRuntime
3334
from asyncflow.runtime.actors.server import ServerRuntime
3435
from asyncflow.schemas.settings.simulation import SimulationSettings
3536

@@ -72,12 +73,14 @@ def __init__(
7273
servers: list[ServerRuntime],
7374
edges: list[EdgeRuntime],
7475
settings: SimulationSettings,
76+
lb: LoadBalancerRuntime | None = None,
7577
) -> None:
7678
"""Initialize with the runtime objects and original settings."""
7779
self._client = client
7880
self._servers = servers
7981
self._edges = edges
8082
self._settings = settings
83+
self.lb = lb
8184

8285
# Lazily computed caches
8386
self.latencies: list[float] | None = None
@@ -364,6 +367,17 @@ def get_series(self, key: SampledMetricName | str, entity_id: str) -> Series:
364367
times = (np.arange(len(vals)) * self._settings.sample_period_s).tolist()
365368
return times, vals
366369

370+
def get_lb_waiting_times(self) -> Sequence[float]:
371+
"""
372+
Return LB waiting times (FCFS). If LB missing or property absent, return empty
373+
useful when the routing algo is fcfs
374+
"""
375+
try:
376+
return () if self.lb is None else self.lb.lb_waiting_times
377+
except AttributeError:
378+
return ()
379+
380+
367381
# ─────────────────────────────────────────────
368382
# Plotting helpers
369383
# ─────────────────────────────────────────────

0 commit comments

Comments
 (0)