Edit on GitHub

vi.simulation

Creating a new Simulation is as simple as adding two lines of code to a Python file:

from vi import Config, Simulation
Simulation(Config()).run()

To add some agents to your simulation, you have two tools available to you:

  1. HeadlessSimulation.batch_spawn_agents
  2. HeadlessSimulation.spawn_agent

As a general rule, you should avoid calling HeadlessSimulation.spawn_agent in a loop as it will load the images from disk multiple times. Instead, you should call HeadlessSimulation.batch_spawn_agents with your desired agent count. This will only load the images once and cheaply distribute them across the agents.

If you want to spice things up, you can also add obstacles and sites to your simulation:

  - HeadlessSimulation.spawn_obstacle
  - HeadlessSimulation.spawn_site

To customise your simulation, you can provide a vi.config.Config to the simulation's constructor.

from vi import Agent, Config, Simulation

(
    Simulation(Config(duration=60 * 10, image_rotation=True))
    .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
    .run()
)

Once you're finished setting up your experiment and want to start researching different parameters, then you probably don't want to open a window every time. Violet refers to this as Headless Mode.

Headless Mode allows you to run your simulation a bit faster by not calling any rendering-related code. To activate Headless Mode, simply swap Simulation for HeadlessSimulation and your GPU should now remain idle!

  1"""Creating a new `Simulation` is as simple as adding two lines of code to a Python file:
  2
  3```python
  4from vi import Config, Simulation
  5Simulation(Config()).run()
  6```
  7
  8To add some agents to your simulation, you have two tools available to you:
  91. `HeadlessSimulation.batch_spawn_agents`
 102. `HeadlessSimulation.spawn_agent`
 11
 12As a general rule, you should avoid calling `HeadlessSimulation.spawn_agent` in a loop
 13as it will load the images from disk multiple times.
 14Instead, you should call `HeadlessSimulation.batch_spawn_agents` with your desired agent count.
 15This will only load the images once and cheaply distribute them across the agents.
 16
 17If you want to spice things up, you can also add obstacles and sites to your simulation:
 18- `HeadlessSimulation.spawn_obstacle`
 19- `HeadlessSimulation.spawn_site`
 20
 21To customise your simulation, you can provide a `vi.config.Config` to the simulation's constructor.
 22
 23```python
 24from vi import Agent, Config, Simulation
 25
 26(
 27    Simulation(Config(duration=60 * 10, image_rotation=True))
 28    .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
 29    .run()
 30)
 31```
 32
 33Once you're finished setting up your experiment
 34and want to start researching different parameters,
 35then you probably don't want to open a window every time.
 36Violet refers to this as Headless Mode.
 37
 38Headless Mode allows you to run your simulation a bit faster by not calling any rendering-related code.
 39To activate Headless Mode, simply swap `Simulation` for `HeadlessSimulation` and your GPU should now remain idle!
 40"""  # noqa: D415
 41
 42from __future__ import annotations
 43
 44import random
 45from dataclasses import dataclass
 46from typing import TYPE_CHECKING
 47
 48import pygame as pg
 49from pygame.gfxdraw import hline, vline
 50from pygame.math import Vector2
 51
 52from ._static import _StaticSprite
 53from .metrics import Metrics
 54from .proximity import ProximityEngine
 55
 56
 57if TYPE_CHECKING:
 58    from typing import Any, Self
 59
 60    from pygame.event import Event
 61
 62    from .agent import Agent
 63    from .config import Config
 64
 65
 66__all__ = [
 67    "HeadlessSimulation",
 68    "Simulation",
 69]
 70
 71
 72@dataclass
 73class Shared:
 74    """A mutable container for data that needs to be shared between `vi.agent.Agent` and `Simulation`."""
 75
 76    prng_move: random.Random
 77    """A PRNG for agent movement exclusively.
 78
 79    To make sure that the agent's movement isn't influenced by other random function calls,
 80    all agents share a decoupled PRNG for movement exclusively.
 81    This ensures that the agents will always move the exact same way given a seed.
 82    """
 83
 84    counter: int = 0
 85    """A counter that increases each tick of the simulation."""
 86
 87
 88class HeadlessSimulation[ConfigClass: Config]:
 89    """The Headless Mode equivalent of `Simulation`.
 90
 91    Headless Mode removes all the rendering logic from the simulation
 92    to not only prevent the annoying simulation window from popping up every time,
 93    but to also speed up your simulation when it's GPU bound.
 94
 95    Combining Headless Mode with `vi.config.Matrix` and Python's [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) opens a realm of new possibilities.
 96    Vi's Matrix is `vi.config.Config` on steroids.
 97    It allows you to pass lists of values instead of single values on supported parameters,
 98    to then effortlessly combine each unique combination of values into its own `vi.config.Config`.
 99    When combined with [multiprocessing](https://docs.python.org/3/library/multiprocessing.html),
100    we can run multiple configs in parallel.
101
102    ```python
103    from multiprocessing import Pool
104
105    import polars as pl
106
107    from vi import Agent, Config, HeadlessSimulation, Matrix
108
109
110    def run_simulation(config: Config) -> pl.DataFrame:
111        return (
112            HeadlessSimulation(config)
113            .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
114            .run()
115            .snapshots
116        )
117
118
119    if __name__ == "__main__":
120        # We create a process pool to run our simulations in parallel
121        with Pool() as p:
122            # The matrix will create four unique configs
123            matrix = Matrix(radius=[25, 50], seed=[1, 2])
124
125            # Create unique combinations of matrix values
126            configs = matrix.to_configs(Config)
127
128            # Combine our individual DataFrames into one big DataFrame
129            df = pl.concat(p.map(run_simulation, configs))
130
131            print(df)
132    ```
133    """
134
135    shared: Shared
136    """Attributes that are shared between the simulation and all agents."""
137
138    _running: bool = False
139    """The simulation keeps running as long as running is True."""
140
141    _area: pg.rect.Rect
142
143    # Sprite Groups
144    _all: pg.sprite.Group[Any]
145    _agents: pg.sprite.Group[Any]
146    _obstacles: pg.sprite.Group[Any]
147    _sites: pg.sprite.Group[Any]
148
149    _next_agent_id: int = 0
150    """The agent identifier to be given next."""
151
152    _next_obstacle_id: int = 0
153    """The obstacle identifier to be given next."""
154
155    _next_site_id: int = 0
156    """The site identifier to be given next."""
157
158    # Proximity
159    _proximity: ProximityEngine[ConfigClass]
160
161    # Config that's passed on to agents as well
162    config: ConfigClass
163    """The config of the simulation that's shared with all agents.
164
165    The config can be overridden when inheriting the Simulation class.
166    However, the config must always:
167
168    1. Inherit `Config`
169    2. Be decorated by `@serde`
170    """
171
172    _metrics: Metrics
173    """A collection of all the Snapshots that have been created in the simulation.
174
175    Each agent produces a Snapshot at every frame in the simulation.
176    """
177
178    def __init__(self, config: ConfigClass) -> None:
179        self.config = config
180        self._metrics = Metrics()
181
182        # Initiate the seed as early as possible.
183        random.seed(self.config.seed)
184
185        # Using a custom generator for agent movement
186        prng_move = random.Random()  # noqa: S311
187        prng_move.seed(self.config.seed)
188
189        self.shared = Shared(prng_move=prng_move)
190
191        width, height = self.config.window.as_tuple()
192        self._area = pg.rect.Rect(0, 0, width, height)
193
194        # Create sprite groups
195        self._all = pg.sprite.Group()
196        self._agents = pg.sprite.Group()
197        self._obstacles = pg.sprite.Group()
198        self._sites = pg.sprite.Group()
199
200        # Proximity!
201        self._proximity = ProximityEngine[ConfigClass](self._agents, self.config.radius)
202
203    def batch_spawn_agents(
204        self,
205        count: int,
206        agent_class: type[Agent[ConfigClass]],
207        images: list[str],
208    ) -> Self:
209        """Spawn multiple agents into the simulation.
210
211        Examples
212        --------
213        Spawn 100 `vi.agent.Agent`'s into the simulation with `examples/images/white.png` as image.
214
215        ```python
216        (
217            Simulation(Config())
218            .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
219            .run()
220        )
221        ```
222
223        """
224        # Load images once so the files don't have to be read multiple times.
225        loaded_images = self._load_images(images)
226
227        for _ in range(count):
228            agent_class(images=loaded_images, simulation=self)
229
230        return self
231
232    def spawn_agent(
233        self,
234        agent_class: type[Agent[ConfigClass]],
235        images: list[str],
236    ) -> Self:
237        """Spawn one agent into the simulation.
238
239        While you can run `spawn_agent` in a for-loop,
240        you probably want to call `batch_spawn_agents` instead
241        as `batch_spawn_agents` optimises the image loading process.
242
243        Examples
244        --------
245        Spawn a single `vi.agent.Agent` into the simulation with `examples/images/white.png` as image:
246
247        ```python
248        (
249            Simulation(Config())
250            .spawn_agent(Agent, ["examples/images/white.png"])
251            .run()
252        )
253        ```
254
255        """
256        agent_class(images=self._load_images(images), simulation=self)
257
258        return self
259
260    def spawn_obstacle(self, image_path: str, x: int, y: int) -> Self:
261        """Spawn one obstacle into the simulation. The given coordinates will be the centre of the obstacle.
262
263        When agents collide with an obstacle, they will make a 180 degree turn.
264
265        Examples
266        --------
267        Spawn a single obstacle into the simulation with `examples/images/bubble-full.png` as image.
268        In addition, we place the obstacle in the centre of our window.
269
270        ```python
271        config = Config()
272        x, y = config.window.as_tuple()
273
274        (
275            Simulation(config)
276            .spawn_obstacle("examples/images/bubble-full.png", x // 2, y // 2)
277            .run()
278        )
279        ```
280
281        """
282        _StaticSprite(
283            containers=[self._all, self._obstacles],
284            id=self._obstacle_id(),
285            image=self._load_image(image_path),
286            pos=Vector2((x, y)),
287        )
288
289        return self
290
291    def spawn_site(self, image_path: str, x: int, y: int) -> Self:
292        """Spawn one site into the simulation. The given coordinates will be the centre of the site.
293
294        Examples
295        --------
296        Spawn a single site into the simulation with `examples/images/site.png` as image.
297        In addition, we give specific coordinates where the site should be placed.
298
299        ```python
300        (
301            Simulation(Config())
302            .spawn_site("examples/images/site.png", x=375, y=375)
303            .run()
304        )
305        ```
306
307        """
308        _StaticSprite(
309            containers=[self._all, self._sites],
310            id=self._site_id(),
311            image=self._load_image(image_path),
312            pos=Vector2((x, y)),
313        )
314
315        return self
316
317    def run(self) -> Metrics:
318        """Run the simulation until it's ended by closing the window or when the `vi.config.Schema.duration` has elapsed."""
319        self._running = True
320
321        while self._running:
322            self.tick()
323
324        return self._metrics
325
326    def before_update(self) -> None:
327        """Run any code before the agents are updated in every tick.
328
329        You should override this method when inheriting Simulation to add your own logic.
330
331        Some examples include:
332        - Processing events from PyGame's event queue.
333        """
334
335    def after_update(self) -> None: ...
336
337    def tick(self) -> None:
338        """Advance the simulation with one tick."""
339        self.before_update()
340
341        # Update the position of all agents
342        self.__update_positions()
343
344        # If the radius was changed by an event,
345        # also update the radius in the proximity engine
346        self._proximity._set_radius(self.config.radius)
347
348        # Calculate proximity chunks
349        self._proximity.update()
350
351        # Save the replay data of all agents
352        self.__collect_replay_data()
353
354        # Update all agents
355        self._all.update()
356
357        # Merge the collected snapshots into the dataframe.
358        self._metrics._merge()
359
360        self.after_update()
361
362        # If we've reached the duration of the simulation, then stop the simulation.
363        if self.config.duration > 0 and self.shared.counter == self.config.duration:
364            self.stop()
365            return
366
367        self.shared.counter += 1
368
369    def stop(self) -> None:
370        """Stop the simulation.
371
372        The simulation isn't stopped directly.
373        Instead, the current tick is completed, after which the simulation will end.
374        """
375        self._running = False
376
377    def __collect_replay_data(self) -> None:
378        """Collect the replay data for all agents."""
379        for sprite in self._agents:
380            agent: Agent = sprite
381            agent._collect_replay_data()
382
383    def __update_positions(self) -> None:
384        """Update the position of all agents."""
385        for sprite in self._agents.sprites():
386            agent: Agent = sprite
387            agent.change_position()
388
389    def _load_image(self, path: str) -> pg.surface.Surface:
390        return pg.image.load(path)
391
392    def _load_images(self, images: list[str]) -> list[pg.surface.Surface]:
393        return [self._load_image(path) for path in images]
394
395    def _agent_id(self) -> int:
396        agent_id = self._next_agent_id
397        self._next_agent_id += 1
398
399        return agent_id
400
401    def _obstacle_id(self) -> int:
402        obstacle_id = self._next_obstacle_id
403        self._next_obstacle_id += 1
404
405        return obstacle_id
406
407    def _site_id(self) -> int:
408        site_id = self._next_site_id
409        self._next_site_id += 1
410
411        return site_id
412
413
414class Simulation[ConfigClass: Config](HeadlessSimulation[ConfigClass]):
415    """Offers the same functionality as `HeadlessSimulation`, but adds logic to automatically draw all agents, obstacles and sites to your screen.
416
417    A config must always be passed when creating the simulation; pass `Config()` to use the default values.
418    """
419
420    _background: pg.surface.Surface
421    _clock: pg.time.Clock
422    _screen: pg.surface.Surface
423
424    def __init__(self, config: ConfigClass) -> None:
425        super().__init__(config)
426
427        pg.display.init()
428        pg.display.set_caption("Violet")
429
430        size = self.config.window.as_tuple()
431        self._screen = pg.display.set_mode(size)
432
433        # Initialise background
434        self._background = pg.surface.Surface(size).convert()
435        self._background.fill((0, 0, 0))
436
437        # Show background immediately (before spawning agents)
438        self._screen.blit(self._background, (0, 0))
439        pg.display.flip()
440
441        # Initialise the clock. Used to cap FPS.
442        self._clock = pg.time.Clock()
443
444    def before_update(self) -> None:
445        rebound: list[Event] = []
446        for event in pg.event.get(eventtype=[pg.QUIT, pg.KEYDOWN]):
447            if event.type == pg.QUIT:
448                self.stop()
449            elif event.type == pg.KEYDOWN:
450                if event.key == pg.K_HOME:
451                    self.config.radius += 1
452                elif event.key == pg.K_END:
453                    self.config.radius -= 1
454                else:
455                    # If a different key was pressed, then we want to re-emit the event
456                    # so other code can handle it.
457                    rebound.append(event)
458
459        for event in rebound:
460            pg.event.post(event)
461
462        # Clear the screen before the update so agents can draw stuff themselves too!
463        self._all.clear(self._screen, self._background)
464        self._screen.blit(self._background, (0, 0))
465
466    def after_update(self) -> None:
467        # Draw everything to the screen
468        self._all.draw(self._screen)
469
470        if self.config.visualise_chunks:
471            self.__visualise_chunks()
472
473        # Update the screen with the new image
474        pg.display.flip()
475
476        self._clock.tick(self.config.fps_limit)
477
478        current_fps = self._clock.get_fps()
479        if current_fps > 0:
480            self._metrics.fps._push(current_fps)
481
482            if self.config.print_fps:
483                print(f"FPS: {current_fps:.1f}")  # noqa: T201
484
485    def __visualise_chunks(self) -> None:
486        """Visualise the proximity chunks by drawing their borders."""
487        colour = pg.Color(255, 255, 255, 122)
488        chunk_size = self._proximity.chunk_size
489
490        width, height = self.config.window.as_tuple()
491
492        for x in range(chunk_size, width, chunk_size):
493            vline(self._screen, x, 0, height, colour)
494
495        for y in range(chunk_size, height, chunk_size):
496            hline(self._screen, 0, width, y, colour)
497
498    def _load_image(self, path: str) -> pg.surface.Surface:
499        return super()._load_image(path).convert_alpha()
class HeadlessSimulation(typing.Generic[ConfigClass]):
 89class HeadlessSimulation[ConfigClass: Config]:
 90    """The Headless Mode equivalent of `Simulation`.
 91
 92    Headless Mode removes all the rendering logic from the simulation
 93    to not only prevent the annoying simulation window from popping up every time,
 94    but to also speed up your simulation when it's GPU bound.
 95
 96    Combining Headless Mode with `vi.config.Matrix` and Python's [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) opens a realm of new possibilities.
 97    Vi's Matrix is `vi.config.Config` on steroids.
 98    It allows you to pass lists of values instead of single values on supported parameters,
 99    to then effortlessly combine each unique combination of values into its own `vi.config.Config`.
100    When combined with [multiprocessing](https://docs.python.org/3/library/multiprocessing.html),
101    we can run multiple configs in parallel.
102
103    ```python
104    from multiprocessing import Pool
105
106    import polars as pl
107
108    from vi import Agent, Config, HeadlessSimulation, Matrix
109
110
111    def run_simulation(config: Config) -> pl.DataFrame:
112        return (
113            HeadlessSimulation(config)
114            .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
115            .run()
116            .snapshots
117        )
118
119
120    if __name__ == "__main__":
121        # We create a process pool to run our simulations in parallel
122        with Pool() as p:
123            # The matrix will create four unique configs
124            matrix = Matrix(radius=[25, 50], seed=[1, 2])
125
126            # Create unique combinations of matrix values
127            configs = matrix.to_configs(Config)
128
129            # Combine our individual DataFrames into one big DataFrame
130            df = pl.concat(p.map(run_simulation, configs))
131
132            print(df)
133    ```
134    """
135
136    shared: Shared
137    """Attributes that are shared between the simulation and all agents."""
138
139    _running: bool = False
140    """The simulation keeps running as long as running is True."""
141
142    _area: pg.rect.Rect
143
144    # Sprite Groups
145    _all: pg.sprite.Group[Any]
146    _agents: pg.sprite.Group[Any]
147    _obstacles: pg.sprite.Group[Any]
148    _sites: pg.sprite.Group[Any]
149
150    _next_agent_id: int = 0
151    """The agent identifier to be given next."""
152
153    _next_obstacle_id: int = 0
154    """The obstacle identifier to be given next."""
155
156    _next_site_id: int = 0
157    """The site identifier to be given next."""
158
159    # Proximity
160    _proximity: ProximityEngine[ConfigClass]
161
162    # Config that's passed on to agents as well
163    config: ConfigClass
164    """The config of the simulation that's shared with all agents.
165
166    The config can be overridden when inheriting the Simulation class.
167    However, the config must always:
168
169    1. Inherit `Config`
170    2. Be decorated by `@serde`
171    """
172
173    _metrics: Metrics
174    """A collection of all the Snapshots that have been created in the simulation.
175
176    Each agent produces a Snapshot at every frame in the simulation.
177    """
178
179    def __init__(self, config: ConfigClass) -> None:
180        self.config = config
181        self._metrics = Metrics()
182
183        # Initiate the seed as early as possible.
184        random.seed(self.config.seed)
185
186        # Using a custom generator for agent movement
187        prng_move = random.Random()  # noqa: S311
188        prng_move.seed(self.config.seed)
189
190        self.shared = Shared(prng_move=prng_move)
191
192        width, height = self.config.window.as_tuple()
193        self._area = pg.rect.Rect(0, 0, width, height)
194
195        # Create sprite groups
196        self._all = pg.sprite.Group()
197        self._agents = pg.sprite.Group()
198        self._obstacles = pg.sprite.Group()
199        self._sites = pg.sprite.Group()
200
201        # Proximity!
202        self._proximity = ProximityEngine[ConfigClass](self._agents, self.config.radius)
203
204    def batch_spawn_agents(
205        self,
206        count: int,
207        agent_class: type[Agent[ConfigClass]],
208        images: list[str],
209    ) -> Self:
210        """Spawn multiple agents into the simulation.
211
212        Examples
213        --------
214        Spawn 100 `vi.agent.Agent`'s into the simulation with `examples/images/white.png` as image.
215
216        ```python
217        (
218            Simulation(Config())
219            .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
220            .run()
221        )
222        ```
223
224        """
225        # Load images once so the files don't have to be read multiple times.
226        loaded_images = self._load_images(images)
227
228        for _ in range(count):
229            agent_class(images=loaded_images, simulation=self)
230
231        return self
232
233    def spawn_agent(
234        self,
235        agent_class: type[Agent[ConfigClass]],
236        images: list[str],
237    ) -> Self:
238        """Spawn one agent into the simulation.
239
240        While you can run `spawn_agent` in a for-loop,
241        you probably want to call `batch_spawn_agents` instead
242        as `batch_spawn_agents` optimises the image loading process.
243
244        Examples
245        --------
246        Spawn a single `vi.agent.Agent` into the simulation with `examples/images/white.png` as image:
247
248        ```python
249        (
250            Simulation(Config())
251            .spawn_agent(Agent, ["examples/images/white.png"])
252            .run()
253        )
254        ```
255
256        """
257        agent_class(images=self._load_images(images), simulation=self)
258
259        return self
260
261    def spawn_obstacle(self, image_path: str, x: int, y: int) -> Self:
262        """Spawn one obstacle into the simulation. The given coordinates will be the centre of the obstacle.
263
264        When agents collide with an obstacle, they will make a 180 degree turn.
265
266        Examples
267        --------
268        Spawn a single obstacle into the simulation with `examples/images/bubble-full.png` as image.
269        In addition, we place the obstacle in the centre of our window.
270
271        ```python
272        config = Config()
273        x, y = config.window.as_tuple()
274
275        (
276            Simulation(config)
277            .spawn_obstacle("examples/images/bubble-full.png", x // 2, y // 2)
278            .run()
279        )
280        ```
281
282        """
283        _StaticSprite(
284            containers=[self._all, self._obstacles],
285            id=self._obstacle_id(),
286            image=self._load_image(image_path),
287            pos=Vector2((x, y)),
288        )
289
290        return self
291
292    def spawn_site(self, image_path: str, x: int, y: int) -> Self:
293        """Spawn one site into the simulation. The given coordinates will be the centre of the site.
294
295        Examples
296        --------
297        Spawn a single site into the simulation with `examples/images/site.png` as image.
298        In addition, we give specific coordinates where the site should be placed.
299
300        ```python
301        (
302            Simulation(Config())
303            .spawn_site("examples/images/site.png", x=375, y=375)
304            .run()
305        )
306        ```
307
308        """
309        _StaticSprite(
310            containers=[self._all, self._sites],
311            id=self._site_id(),
312            image=self._load_image(image_path),
313            pos=Vector2((x, y)),
314        )
315
316        return self
317
318    def run(self) -> Metrics:
319        """Run the simulation until it's ended by closing the window or when the `vi.config.Schema.duration` has elapsed."""
320        self._running = True
321
322        while self._running:
323            self.tick()
324
325        return self._metrics
326
327    def before_update(self) -> None:
328        """Run any code before the agents are updated in every tick.
329
330        You should override this method when inheriting Simulation to add your own logic.
331
332        Some examples include:
333        - Processing events from PyGame's event queue.
334        """
335
336    def after_update(self) -> None: ...
337
338    def tick(self) -> None:
339        """Advance the simulation with one tick."""
340        self.before_update()
341
342        # Update the position of all agents
343        self.__update_positions()
344
345        # If the radius was changed by an event,
346        # also update the radius in the proximity engine
347        self._proximity._set_radius(self.config.radius)
348
349        # Calculate proximity chunks
350        self._proximity.update()
351
352        # Save the replay data of all agents
353        self.__collect_replay_data()
354
355        # Update all agents
356        self._all.update()
357
358        # Merge the collected snapshots into the dataframe.
359        self._metrics._merge()
360
361        self.after_update()
362
363        # If we've reached the duration of the simulation, then stop the simulation.
364        if self.config.duration > 0 and self.shared.counter == self.config.duration:
365            self.stop()
366            return
367
368        self.shared.counter += 1
369
370    def stop(self) -> None:
371        """Stop the simulation.
372
373        The simulation isn't stopped directly.
374        Instead, the current tick is completed, after which the simulation will end.
375        """
376        self._running = False
377
378    def __collect_replay_data(self) -> None:
379        """Collect the replay data for all agents."""
380        for sprite in self._agents:
381            agent: Agent = sprite
382            agent._collect_replay_data()
383
384    def __update_positions(self) -> None:
385        """Update the position of all agents."""
386        for sprite in self._agents.sprites():
387            agent: Agent = sprite
388            agent.change_position()
389
390    def _load_image(self, path: str) -> pg.surface.Surface:
391        return pg.image.load(path)
392
393    def _load_images(self, images: list[str]) -> list[pg.surface.Surface]:
394        return [self._load_image(path) for path in images]
395
396    def _agent_id(self) -> int:
397        agent_id = self._next_agent_id
398        self._next_agent_id += 1
399
400        return agent_id
401
402    def _obstacle_id(self) -> int:
403        obstacle_id = self._next_obstacle_id
404        self._next_obstacle_id += 1
405
406        return obstacle_id
407
408    def _site_id(self) -> int:
409        site_id = self._next_site_id
410        self._next_site_id += 1
411
412        return site_id

The Headless Mode equivalent of Simulation.

Headless Mode removes all the rendering logic from the simulation to not only prevent the annoying simulation window from popping up every time, but to also speed up your simulation when it's GPU bound.

Combining Headless Mode with vi.config.Matrix and Python's multiprocessing opens a realm of new possibilities. Vi's Matrix is vi.config.Config on steroids. It allows you to pass lists of values instead of single values on supported parameters, to then effortlessly combine each unique combination of values into its own vi.config.Config. When combined with multiprocessing, we can run multiple configs in parallel.

from multiprocessing import Pool

import polars as pl

from vi import Agent, Config, HeadlessSimulation, Matrix


def run_simulation(config: Config) -> pl.DataFrame:
    return (
        HeadlessSimulation(config)
        .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
        .run()
        .snapshots
    )


if __name__ == "__main__":
    # We create a process pool to run our simulations in parallel
    with Pool() as p:
        # The matrix will create four unique configs
        matrix = Matrix(radius=[25, 50], seed=[1, 2])

        # Create unique combinations of matrix values
        configs = matrix.to_configs(Config)

        # Combine our individual DataFrames into one big DataFrame
        df = pl.concat(p.map(run_simulation, configs))

        print(df)
HeadlessSimulation(config: 'ConfigClass')
179    def __init__(self, config: ConfigClass) -> None:
180        self.config = config
181        self._metrics = Metrics()
182
183        # Initiate the seed as early as possible.
184        random.seed(self.config.seed)
185
186        # Using a custom generator for agent movement
187        prng_move = random.Random()  # noqa: S311
188        prng_move.seed(self.config.seed)
189
190        self.shared = Shared(prng_move=prng_move)
191
192        width, height = self.config.window.as_tuple()
193        self._area = pg.rect.Rect(0, 0, width, height)
194
195        # Create sprite groups
196        self._all = pg.sprite.Group()
197        self._agents = pg.sprite.Group()
198        self._obstacles = pg.sprite.Group()
199        self._sites = pg.sprite.Group()
200
201        # Proximity!
202        self._proximity = ProximityEngine[ConfigClass](self._agents, self.config.radius)
shared: vi.simulation.Shared

Attributes that are shared between the simulation and all agents.

config: 'ConfigClass'

The config of the simulation that's shared with all agents.

The config can be overridden when inheriting the Simulation class. However, the config must always:

  1. Inherit Config
  2. Be decorated by @serde
def batch_spawn_agents( self, count: int, agent_class: 'type[Agent[ConfigClass]]', images: list[str]) -> Self:
204    def batch_spawn_agents(
205        self,
206        count: int,
207        agent_class: type[Agent[ConfigClass]],
208        images: list[str],
209    ) -> Self:
210        """Spawn multiple agents into the simulation.
211
212        Examples
213        --------
214        Spawn 100 `vi.agent.Agent`'s into the simulation with `examples/images/white.png` as image.
215
216        ```python
217        (
218            Simulation(Config())
219            .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
220            .run()
221        )
222        ```
223
224        """
225        # Load images once so the files don't have to be read multiple times.
226        loaded_images = self._load_images(images)
227
228        for _ in range(count):
229            agent_class(images=loaded_images, simulation=self)
230
231        return self

Spawn multiple agents into the simulation.

Examples

Spawn 100 vi.agent.Agent's into the simulation with examples/images/white.png as image.

(
    Simulation(Config())
    .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
    .run()
)
def spawn_agent(self, agent_class: 'type[Agent[ConfigClass]]', images: list[str]) -> Self:
233    def spawn_agent(
234        self,
235        agent_class: type[Agent[ConfigClass]],
236        images: list[str],
237    ) -> Self:
238        """Spawn one agent into the simulation.
239
240        While you can run `spawn_agent` in a for-loop,
241        you probably want to call `batch_spawn_agents` instead
242        as `batch_spawn_agents` optimises the image loading process.
243
244        Examples
245        --------
246        Spawn a single `vi.agent.Agent` into the simulation with `examples/images/white.png` as image:
247
248        ```python
249        (
250            Simulation(Config())
251            .spawn_agent(Agent, ["examples/images/white.png"])
252            .run()
253        )
254        ```
255
256        """
257        agent_class(images=self._load_images(images), simulation=self)
258
259        return self

Spawn one agent into the simulation.

While you can run spawn_agent in a for-loop, you probably want to call batch_spawn_agents instead as batch_spawn_agents optimises the image loading process.

Examples

Spawn a single vi.agent.Agent into the simulation with examples/images/white.png as image:

(
    Simulation(Config())
    .spawn_agent(Agent, ["examples/images/white.png"])
    .run()
)
def spawn_obstacle(self, image_path: str, x: int, y: int) -> Self:
261    def spawn_obstacle(self, image_path: str, x: int, y: int) -> Self:
262        """Spawn one obstacle into the simulation. The given coordinates will be the centre of the obstacle.
263
264        When agents collide with an obstacle, they will make a 180 degree turn.
265
266        Examples
267        --------
268        Spawn a single obstacle into the simulation with `examples/images/bubble-full.png` as image.
269        In addition, we place the obstacle in the centre of our window.
270
271        ```python
272        config = Config()
273        x, y = config.window.as_tuple()
274
275        (
276            Simulation(config)
277            .spawn_obstacle("examples/images/bubble-full.png", x // 2, y // 2)
278            .run()
279        )
280        ```
281
282        """
283        _StaticSprite(
284            containers=[self._all, self._obstacles],
285            id=self._obstacle_id(),
286            image=self._load_image(image_path),
287            pos=Vector2((x, y)),
288        )
289
290        return self

Spawn one obstacle into the simulation. The given coordinates will be the centre of the obstacle.

When agents collide with an obstacle, they will make a 180 degree turn.

Examples

Spawn a single obstacle into the simulation with examples/images/bubble-full.png as image. In addition, we place the obstacle in the centre of our window.

config = Config()
x, y = config.window.as_tuple()

(
    Simulation(config)
    .spawn_obstacle("examples/images/bubble-full.png", x // 2, y // 2)
    .run()
)
def spawn_site(self, image_path: str, x: int, y: int) -> Self:
292    def spawn_site(self, image_path: str, x: int, y: int) -> Self:
293        """Spawn one site into the simulation. The given coordinates will be the centre of the site.
294
295        Examples
296        --------
297        Spawn a single site into the simulation with `examples/images/site.png` as image.
298        In addition, we give specific coordinates where the site should be placed.
299
300        ```python
301        (
302            Simulation(Config())
303            .spawn_site("examples/images/site.png", x=375, y=375)
304            .run()
305        )
306        ```
307
308        """
309        _StaticSprite(
310            containers=[self._all, self._sites],
311            id=self._site_id(),
312            image=self._load_image(image_path),
313            pos=Vector2((x, y)),
314        )
315
316        return self

Spawn one site into the simulation. The given coordinates will be the centre of the site.

Examples

Spawn a single site into the simulation with examples/images/site.png as image. In addition, we give specific coordinates where the site should be placed.

(
    Simulation(Config())
    .spawn_site("examples/images/site.png", x=375, y=375)
    .run()
)
def run(self) -> vi.metrics.Metrics:
318    def run(self) -> Metrics:
319        """Run the simulation until it's ended by closing the window or when the `vi.config.Schema.duration` has elapsed."""
320        self._running = True
321
322        while self._running:
323            self.tick()
324
325        return self._metrics

Run the simulation until it's ended by closing the window or when the vi.config.Schema.duration has elapsed.

def before_update(self) -> None:
327    def before_update(self) -> None:
328        """Run any code before the agents are updated in every tick.
329
330        You should override this method when inheriting Simulation to add your own logic.
331
332        Some examples include:
333        - Processing events from PyGame's event queue.
334        """

Run any code before the agents are updated in every tick.

You should override this method when inheriting Simulation to add your own logic.

Some examples include:

  • Processing events from PyGame's event queue.
def after_update(self) -> None:
336    def after_update(self) -> None: ...
def tick(self) -> None:
338    def tick(self) -> None:
339        """Advance the simulation with one tick."""
340        self.before_update()
341
342        # Update the position of all agents
343        self.__update_positions()
344
345        # If the radius was changed by an event,
346        # also update the radius in the proximity engine
347        self._proximity._set_radius(self.config.radius)
348
349        # Calculate proximity chunks
350        self._proximity.update()
351
352        # Save the replay data of all agents
353        self.__collect_replay_data()
354
355        # Update all agents
356        self._all.update()
357
358        # Merge the collected snapshots into the dataframe.
359        self._metrics._merge()
360
361        self.after_update()
362
363        # If we've reached the duration of the simulation, then stop the simulation.
364        if self.config.duration > 0 and self.shared.counter == self.config.duration:
365            self.stop()
366            return
367
368        self.shared.counter += 1

Advance the simulation with one tick.

def stop(self) -> None:
370    def stop(self) -> None:
371        """Stop the simulation.
372
373        The simulation isn't stopped directly.
374        Instead, the current tick is completed, after which the simulation will end.
375        """
376        self._running = False

Stop the simulation.

The simulation isn't stopped directly. Instead, the current tick is completed, after which the simulation will end.

class Simulation(vi.simulation.HeadlessSimulation[ConfigClass], typing.Generic[ConfigClass]):
415class Simulation[ConfigClass: Config](HeadlessSimulation[ConfigClass]):
416    """Offers the same functionality as `HeadlessSimulation`, but adds logic to automatically draw all agents, obstacles and sites to your screen.
417
418    A config must always be provided when creating the simulation. Pass `Config()` to use the default values.
419    """
420
421    _background: pg.surface.Surface
422    _clock: pg.time.Clock
423    _screen: pg.surface.Surface
424
425    def __init__(self, config: ConfigClass) -> None:
426        super().__init__(config)
427
428        pg.display.init()
429        pg.display.set_caption("Violet")
430
431        size = self.config.window.as_tuple()
432        self._screen = pg.display.set_mode(size)
433
434        # Initialise background
435        self._background = pg.surface.Surface(size).convert()
436        self._background.fill((0, 0, 0))
437
438        # Show background immediately (before spawning agents)
439        self._screen.blit(self._background, (0, 0))
440        pg.display.flip()
441
442        # Initialise the clock. Used to cap FPS.
443        self._clock = pg.time.Clock()
444
445    def before_update(self) -> None:
446        rebound: list[Event] = []
447        for event in pg.event.get(eventtype=[pg.QUIT, pg.KEYDOWN]):
448            if event.type == pg.QUIT:
449                self.stop()
450            elif event.type == pg.KEYDOWN:
451                if event.key == pg.K_HOME:
452                    self.config.radius += 1
453                elif event.key == pg.K_END:
454                    self.config.radius -= 1
455                else:
456                    # If a different key was pressed, then we want to re-emit the event
457                    # so other code can handle it.
458                    rebound.append(event)
459
460        for event in rebound:
461            pg.event.post(event)
462
463        # Clear the screen before the update so agents can draw stuff themselves too!
464        self._all.clear(self._screen, self._background)
465        self._screen.blit(self._background, (0, 0))
466
467    def after_update(self) -> None:
468        # Draw everything to the screen
469        self._all.draw(self._screen)
470
471        if self.config.visualise_chunks:
472            self.__visualise_chunks()
473
474        # Update the screen with the new image
475        pg.display.flip()
476
477        self._clock.tick(self.config.fps_limit)
478
479        current_fps = self._clock.get_fps()
480        if current_fps > 0:
481            self._metrics.fps._push(current_fps)
482
483            if self.config.print_fps:
484                print(f"FPS: {current_fps:.1f}")  # noqa: T201
485
486    def __visualise_chunks(self) -> None:
487        """Visualise the proximity chunks by drawing their borders."""
488        colour = pg.Color(255, 255, 255, 122)
489        chunk_size = self._proximity.chunk_size
490
491        width, height = self.config.window.as_tuple()
492
493        for x in range(chunk_size, width, chunk_size):
494            vline(self._screen, x, 0, height, colour)
495
496        for y in range(chunk_size, height, chunk_size):
497            hline(self._screen, 0, width, y, colour)
498
499    def _load_image(self, path: str) -> pg.surface.Surface:
500        return super()._load_image(path).convert_alpha()

Offers the same functionality as HeadlessSimulation, but adds logic to automatically draw all agents, obstacles and sites to your screen.

A config must always be provided when creating the simulation. Pass Config() to use the default values.

Simulation(config: 'ConfigClass')
425    def __init__(self, config: ConfigClass) -> None:
426        super().__init__(config)
427
428        pg.display.init()
429        pg.display.set_caption("Violet")
430
431        size = self.config.window.as_tuple()
432        self._screen = pg.display.set_mode(size)
433
434        # Initialise background
435        self._background = pg.surface.Surface(size).convert()
436        self._background.fill((0, 0, 0))
437
438        # Show background immediately (before spawning agents)
439        self._screen.blit(self._background, (0, 0))
440        pg.display.flip()
441
442        # Initialise the clock. Used to cap FPS.
443        self._clock = pg.time.Clock()
def before_update(self) -> None:
445    def before_update(self) -> None:
446        rebound: list[Event] = []
447        for event in pg.event.get(eventtype=[pg.QUIT, pg.KEYDOWN]):
448            if event.type == pg.QUIT:
449                self.stop()
450            elif event.type == pg.KEYDOWN:
451                if event.key == pg.K_HOME:
452                    self.config.radius += 1
453                elif event.key == pg.K_END:
454                    self.config.radius -= 1
455                else:
456                    # If a different key was pressed, then we want to re-emit the event
457                    # so other code can handle it.
458                    rebound.append(event)
459
460        for event in rebound:
461            pg.event.post(event)
462
463        # Clear the screen before the update so agents can draw stuff themselves too!
464        self._all.clear(self._screen, self._background)
465        self._screen.blit(self._background, (0, 0))

Run any code before the agents are updated in every tick.

You should override this method when inheriting Simulation to add your own logic.

Some examples include:

  • Processing events from PyGame's event queue.
def after_update(self) -> None:
467    def after_update(self) -> None:
468        # Draw everything to the screen
469        self._all.draw(self._screen)
470
471        if self.config.visualise_chunks:
472            self.__visualise_chunks()
473
474        # Update the screen with the new image
475        pg.display.flip()
476
477        self._clock.tick(self.config.fps_limit)
478
479        current_fps = self._clock.get_fps()
480        if current_fps > 0:
481            self._metrics.fps._push(current_fps)
482
483            if self.config.print_fps:
484                print(f"FPS: {current_fps:.1f}")  # noqa: T201