Skip to content

Commit 49aa42a

Browse files
committed
Example LB improvements
1 parent 0dc6c1f commit 49aa42a

11 files changed

Lines changed: 196 additions & 546 deletions
251 KB
Loading
127 KB
Loading
128 KB
Loading
-139 KB
Binary file not shown.

examples/builder_input/load_balancer/two_servers.py

Lines changed: 97 additions & 213 deletions
Original file line numberDiff line numberDiff line change
@@ -1,213 +1,104 @@
11
#!/usr/bin/env python3
22
"""
3-
Didactic example: build and run an AsyncFlow scenario **with a Load Balancer**
4-
and two backend servers, using the builder (AsyncFlow) — no YAML.
5-
6-
Topology:
7-
generator ──> client ──> LB ──> srv-1
8-
└─> srv-2
9-
srv-1 ──> client
10-
srv-2 ──> client
11-
12-
Load:
13-
~120 active users, 20 req/min each (Poisson by default).
14-
15-
Servers:
16-
srv-1: 1 CPU core, 1GB RAM, endpoint with CPU→RAM→IO
17-
srv-2: 2 CPU cores, 2GB RAM, endpoint with RAM→IO(DB-like)
18-
19-
Network:
20-
2–3ms mean (exponential) latency on each edge.
21-
22-
What this script does:
23-
1) Build Pydantic models (generator, client, LB, servers, edges, settings).
24-
2) Compose the SimulationPayload via AsyncFlow (builder pattern).
25-
3) Run the simulation with SimulationRunner.
26-
4) Print latency stats, throughput timeline, and a sampled-metrics preview.
27-
5) Save a 2×2 plot figure (latency, throughput, server queues, RAM).
3+
Didactic example: AsyncFlow with a Load Balancer and two **identical** servers.
4+
5+
Goal
6+
----
7+
Show a realistic, symmetric backend behind a load balancer, and export plots
8+
that match the public `ResultsAnalyzer` API (no YAML needed).
9+
10+
Topology
11+
--------
12+
generator ──edge──> client ──edge──> LB ──edge──> srv-1
13+
└──edge──> srv-2
14+
srv-1 ──edge──> client
15+
srv-2 ──edge──> client
16+
17+
Load model
18+
----------
19+
~120 active users, 20 requests/min each (Poisson-like aggregate by default).
20+
21+
Server model (both srv-1 and srv-2)
22+
-----------------------------------
23+
• 1 CPU core, 2 GB RAM
24+
• Endpoint pipeline: CPU(2 ms) → RAM(128 MB) → I/O wait (15 ms)
25+
- CPU step blocks the event loop
26+
- RAM step holds a working set until the request completes
27+
- I/O step is non-blocking (event-loop friendly)
28+
29+
Network model
30+
-------------
31+
Every edge uses an exponential latency with mean 3 ms.
32+
33+
Outputs
34+
-------
35+
• Prints latency statistics to stdout
36+
• Saves, in the same folder as this script:
37+
- `lb_dashboard.png` (Latency histogram + Throughput)
38+
- `lb_server_<id>_metrics.png` for each server (Ready / I/O / RAM)
2839
"""
2940

3041
from __future__ import annotations
3142

3243
from pathlib import Path
33-
from typing import Iterable, List, Mapping, TYPE_CHECKING
3444

35-
import numpy as np
3645
import simpy
46+
import matplotlib.pyplot as plt
47+
48+
# Public AsyncFlow API (builder-style)
49+
from asyncflow import AsyncFlow
50+
from asyncflow.components import Client, Server, Edge, Endpoint, LoadBalancer
51+
from asyncflow.settings import SimulationSettings
52+
from asyncflow.workload import RqsGenerator
3753

38-
# ── AsyncFlow domain imports (match your working paths) ────────────────────────
39-
from asyncflow.builder.asyncflow_builder import AsyncFlow
54+
# Runner + Analyzer
4055
from asyncflow.runtime.simulation_runner import SimulationRunner
4156
from asyncflow.metrics.analyzer import ResultsAnalyzer
42-
from asyncflow.schemas.payload import SimulationPayload
43-
from asyncflow.schemas.workload.rqs_generator import RqsGenerator
44-
from asyncflow.schemas.settings.simulation import SimulationSettings
45-
from asyncflow.schemas.topology.endpoint import Endpoint
46-
from asyncflow.schemas.topology.nodes import Client, Server, LoadBalancer
47-
from asyncflow.schemas.topology.edges import Edge
48-
from asyncflow.config.constants import LatencyKey, SampledMetricName
49-
50-
51-
52-
53-
# ─────────────────────────────────────────────────────────────
54-
# Pretty printers (compact, readable output)
55-
# ─────────────────────────────────────────────────────────────
56-
def print_latency_stats(res: ResultsAnalyzer) -> None:
57-
stats: Mapping[LatencyKey, float] = res.get_latency_stats()
58-
print("\n════════ LATENCY STATS ════════")
59-
if not stats:
60-
print("(empty)")
61-
return
62-
63-
order: List[LatencyKey] = [
64-
LatencyKey.TOTAL_REQUESTS,
65-
LatencyKey.MEAN,
66-
LatencyKey.MEDIAN,
67-
LatencyKey.STD_DEV,
68-
LatencyKey.P95,
69-
LatencyKey.P99,
70-
LatencyKey.MIN,
71-
LatencyKey.MAX,
72-
]
73-
for key in order:
74-
if key in stats:
75-
print(f"{key.name:<20} = {stats[key]:.6f}")
76-
77-
78-
def print_throughput(res: ResultsAnalyzer) -> None:
79-
timestamps, rps = res.get_throughput_series()
80-
print("\n════════ THROUGHPUT (req/sec) ════════")
81-
if not timestamps:
82-
print("(empty)")
83-
return
84-
for t, rate in zip(timestamps, rps):
85-
print(f"t={t:4.1f}s → {rate:6.2f} rps")
86-
87-
88-
def print_sampled_preview(res: ResultsAnalyzer) -> None:
89-
sampled = res.get_sampled_metrics()
90-
print("\n════════ SAMPLED METRICS (preview) ════════")
91-
if not sampled:
92-
print("(empty)")
93-
return
94-
95-
# Keys may be enums or strings depending on your analyzer; handle both.
96-
def _name(m): # pragma: no cover
97-
return m.name if hasattr(m, "name") else str(m)
98-
99-
for metric, series in sampled.items():
100-
print(f"\n📈 {_name(metric)}:")
101-
for entity, vals in series.items():
102-
head = list(vals[:5]) if vals else []
103-
print(f" - {entity}: len={len(vals)}, first={head}")
104-
105-
106-
# ─────────────────────────────────────────────────────────────
107-
# Tiny helpers for sanity checks (optional)
108-
# ─────────────────────────────────────────────────────────────
109-
def _mean(series: Iterable[float]) -> float:
110-
arr = np.asarray(list(series), dtype=float)
111-
return float(np.mean(arr)) if arr.size else 0.0
112-
113-
114-
def run_sanity_checks(
115-
runner: SimulationRunner,
116-
res: ResultsAnalyzer,
117-
) -> None:
118-
print("\n════════ SANITY CHECKS (rough) ════════")
119-
w = runner.simulation_input.rqs_input
120-
lam_rps = (
121-
float(w.avg_active_users.mean)
122-
* float(w.avg_request_per_minute_per_user.mean)
123-
/ 60.0
124-
)
12557

126-
# Observed throughput
127-
_, rps_series = res.get_throughput_series()
128-
rps_observed = _mean(rps_series)
129-
print(
130-
f"• Mean throughput (rps) expected≈{lam_rps:.3f} "
131-
f"observed={rps_observed:.3f}"
132-
)
133-
134-
sampled = res.get_sampled_metrics()
135-
ram_series = sampled.get(SampledMetricName.RAM_IN_USE, {})
136-
ioq_series = sampled.get(SampledMetricName.EVENT_LOOP_IO_SLEEP, {})
137-
ready_series = sampled.get(SampledMetricName.READY_QUEUE_LEN, {})
138-
139-
ram_mean = _mean([_mean(v) for v in ram_series.values()]) if ram_series else 0.0
140-
ioq_mean = _mean([_mean(v) for v in ioq_series.values()]) if ioq_series else 0.0
141-
ready_mean = _mean([_mean(v) for v in ready_series.values()]) if ready_series else 0.0
14258

143-
print(f"• Mean RAM in use (MB) observed={ram_mean:.3f}")
144-
print(f"• Mean I/O queue length observed={ioq_mean:.3f}")
145-
print(f"• Mean ready queue length observed={ready_mean:.3f}")
146-
147-
148-
# ─────────────────────────────────────────────────────────────
149-
# Build the LB + 2 servers scenario via AsyncFlow (builder)
150-
# ─────────────────────────────────────────────────────────────
151-
def build_payload_with_lb() -> SimulationPayload:
152-
"""
153-
Construct the SimulationPayload programmatically using the builder:
154-
- Generator (120 users, 20 rpm each)
155-
- Client
156-
- Load balancer (round_robin) covering two servers
157-
- Two servers with distinct endpoints
158-
- Edges for all hops (gen→client, client→lb, lb→srv1/2, srv1/2→client)
159-
- Simulation settings: 600s total, sample period 20ms
160-
"""
161-
# 1) Request generator
59+
def main() -> None:
60+
# ── 1) Build the scenario programmatically (no YAML) ────────────────────
61+
# Workload (traffic generator)
16262
generator = RqsGenerator(
16363
id="rqs-1",
164-
avg_active_users={"mean": 120}, # Poisson default
165-
avg_request_per_minute_per_user={"mean": 20}, # MUST be Poisson
64+
avg_active_users={"mean": 120},
65+
avg_request_per_minute_per_user={"mean": 20},
16666
user_sampling_window=60,
16767
)
16868

169-
# 2) Client
69+
# Client
17070
client = Client(id="client-1")
17171

172-
# 3) Servers with distinct endpoints
173-
ep_srv1 = Endpoint(
72+
# Two identical servers: CPU(2ms) → RAM(128MB) → IO(15ms)
73+
endpoint = Endpoint(
17474
endpoint_name="/api",
175-
# include 'probability' if your Endpoint schema supports it
176-
probability=1.0, # remove if your Endpoint doesn't have this field
75+
probability=1.0,
17776
steps=[
17877
{"kind": "initial_parsing", "step_operation": {"cpu_time": 0.002}},
179-
{"kind": "ram", "step_operation": {"necessary_ram": 64}},
180-
{"kind": "io_wait", "step_operation": {"io_waiting_time": 0.012}},
78+
{"kind": "ram", "step_operation": {"necessary_ram": 128}},
79+
{"kind": "io_wait", "step_operation": {"io_waiting_time": 0.015}},
18180
],
18281
)
82+
18383
srv1 = Server(
18484
id="srv-1",
185-
server_resources={"cpu_cores": 1, "ram_mb": 1024},
186-
endpoints=[ep_srv1],
187-
)
188-
189-
ep_srv2 = Endpoint(
190-
endpoint_name="/api",
191-
probability=1.0, # remove if not supported in your schema
192-
steps=[
193-
{"kind": "ram", "step_operation": {"necessary_ram": 96}},
194-
{"kind": "io_db", "step_operation": {"io_waiting_time": 0.020}},
195-
],
85+
server_resources={"cpu_cores": 1, "ram_mb": 2048},
86+
endpoints=[endpoint],
19687
)
19788
srv2 = Server(
19889
id="srv-2",
199-
server_resources={"cpu_cores": 2, "ram_mb": 2048},
200-
endpoints=[ep_srv2],
90+
server_resources={"cpu_cores": 1, "ram_mb": 2048},
91+
endpoints=[endpoint],
20192
)
20293

203-
# 4) Load balancer (round_robin)
94+
# Load balancer (round-robin)
20495
lb = LoadBalancer(
20596
id="lb-1",
20697
algorithms="round_robin",
20798
server_covered={"srv-1", "srv-2"},
20899
)
209100

210-
# 5) Edges with exponential latency (2–3 ms)
101+
# Network edges (3 ms mean, exponential)
211102
edges = [
212103
Edge(
213104
id="gen-client",
@@ -219,19 +110,19 @@ def build_payload_with_lb() -> SimulationPayload:
219110
id="client-lb",
220111
source="client-1",
221112
target="lb-1",
222-
latency={"mean": 0.002, "distribution": "exponential"},
113+
latency={"mean": 0.003, "distribution": "exponential"},
223114
),
224115
Edge(
225116
id="lb-srv1",
226117
source="lb-1",
227118
target="srv-1",
228-
latency={"mean": 0.002, "distribution": "exponential"},
119+
latency={"mean": 0.003, "distribution": "exponential"},
229120
),
230121
Edge(
231122
id="lb-srv2",
232123
source="lb-1",
233124
target="srv-2",
234-
latency={"mean": 0.002, "distribution": "exponential"},
125+
latency={"mean": 0.003, "distribution": "exponential"},
235126
),
236127
Edge(
237128
id="srv1-client",
@@ -247,10 +138,10 @@ def build_payload_with_lb() -> SimulationPayload:
247138
),
248139
]
249140

250-
# 6) Simulation settings
141+
# Simulation settings
251142
settings = SimulationSettings(
252143
total_simulation_time=600,
253-
sample_period_s=0.02,
144+
sample_period_s=0.05,
254145
enabled_sample_metrics=[
255146
"ready_queue_len",
256147
"event_loop_io_sleep",
@@ -260,57 +151,50 @@ def build_payload_with_lb() -> SimulationPayload:
260151
enabled_event_metrics=["rqs_clock"],
261152
)
262153

263-
# 7) Assemble the payload via the builder
264-
flow = (
154+
# Assemble the payload with the builder
155+
payload = (
265156
AsyncFlow()
266157
.add_generator(generator)
267158
.add_client(client)
268159
.add_servers(srv1, srv2)
269160
.add_load_balancer(lb)
270161
.add_edges(*edges)
271162
.add_simulation_settings(settings)
272-
)
273-
274-
return flow.build_payload()
275-
163+
).build_payload()
276164

277-
# ─────────────────────────────────────────────────────────────
278-
# Main entry-point
279-
# ─────────────────────────────────────────────────────────────
280-
def main() -> None:
281-
"""
282-
Build → wire → run the simulation, then print diagnostics and save plots.
283-
"""
165+
# ── 2) Run the simulation ───────────────────────────────────────────────
284166
env = simpy.Environment()
285-
payload = build_payload_with_lb()
286-
287167
runner = SimulationRunner(env=env, simulation_input=payload)
288168
results: ResultsAnalyzer = runner.run()
289169

290-
# Human-friendly diagnostics
291-
print_latency_stats(results)
292-
print_throughput(results)
293-
print_sampled_preview(results)
294-
295-
# Optional sanity checks (very rough)
296-
run_sanity_checks(runner, results)
170+
# ── 3) Print a concise latency summary ──────────────────────────────────
171+
print(results.format_latency_stats())
297172

298-
# Save plots (2×2 figure)
299-
try:
300-
from matplotlib import pyplot as plt # noqa: PLC0415
173+
# ── 4) Save plots (same directory as this script) ───────────────────────
174+
out_dir = Path(__file__).parent
301175

302-
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
303-
results.plot_latency_distribution(axes[0, 0])
304-
results.plot_throughput(axes[0, 1])
305-
results.plot_server_queues(axes[1, 0])
306-
results.plot_ram_usage(axes[1, 1])
307-
fig.tight_layout()
308-
309-
out_path = Path(__file__).parent / "two_servers.png"
310-
fig.savefig(out_path)
311-
print(f"\n🖼️ Plots saved to: {out_path}")
312-
except Exception as exc: # Matplotlib not installed or plotting failed
313-
print(f"\n[plotting skipped] {exc!r}")
176+
# 4a) Dashboard: latency + throughput (single figure)
177+
fig_dash, axes = plt.subplots(
178+
1, 2, figsize=(14, 5), dpi=160, constrained_layout=True
179+
)
180+
results.plot_latency_distribution(axes[0])
181+
results.plot_throughput(axes[1])
182+
dash_path = out_dir / "lb_dashboard.png"
183+
fig_dash.savefig(dash_path, bbox_inches="tight")
184+
print(f"🖼️ Dashboard saved to: {dash_path}")
185+
186+
# 4b) Per-server figures: Ready | I/O | RAM (one figure per server, three panels side by side)
187+
for sid in results.list_server_ids():
188+
fig_srv, axs = plt.subplots(
189+
1, 3, figsize=(18, 4.2), dpi=160, constrained_layout=True
190+
)
191+
results.plot_single_server_ready_queue(axs[0], sid)
192+
results.plot_single_server_io_queue(axs[1], sid)
193+
results.plot_single_server_ram(axs[2], sid)
194+
fig_srv.suptitle(f"Server metrics — {sid}", fontsize=16)
195+
srv_path = out_dir / f"lb_server_{sid}_metrics.png"
196+
fig_srv.savefig(srv_path, bbox_inches="tight")
197+
print(f"🖼️ Per-server plots saved to: {srv_path}")
314198

315199

316200
if __name__ == "__main__":

0 commit comments

Comments
 (0)