vi
A smol simulator framework built on top of PyGame.
- Automatic agent wandering behaviour
- Fully deterministic simulations with PRNG seeds
- Install Violet with a simple
pip install
😎 - Matrix-powered multi-threaded configuration testing
- Polars-powered simulation analytics
- Replay-able simulations with a ✨ time machine ✨
- Type-safe configuration system (with TOML support)
Want to get started right away? Check out the Violet Starter Kit!
A Tour of Violet
Violet is all about creating and researching collective intelligence. And what's better than programming your own little video game to do so?
Under the hood, Violet is using PyGame to render all those fancy pixels to your very screen. However, you don't need to have any knowledge of PyGame whatsoever to get started!
Instead, you only need to familiarise yourself with Violet's vi.agent
and vi.simulation
modules.
You see, all that's needed to create a video game is a new instance of the vi.Simulation
class.
>>> from vi import Simulation
>>> Simulation()
Yep, that's all it takes to start your simulation. However, it closes right away...
To actually run
your simulation, you need to add one more thing.
>>> Simulation().run()
There, now you have a nice black window appear in the middle of your screen!
Obviously, creating a black window doesn't really count as having created a simulation just yet. So let's add some agents!
>>> from vi import Agent, Simulation
>>> (
... Simulation()
... .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
... .run()
... )
We now have 100 agents wiggling around our screen. They just don't particularly do anything just yet. Sure, they're moving, but we want them to interact!
That's where you come in! Customising the behaviour of the agents is central to creating simulations. And it couldn't be any easier to customise what these agents do!
Violet is built around the concept of inheritance.
In a nutshell, inheritance allows you to well, inherit,
all the functions and properties that the Agent
class exposes.
But that's not all!
Inheritance also allows you to build on top of the Agent
class,
allowing you to implement your agent's custom behaviour simply by implementing an update
method.
Let's see some inheritance in action.
>>> class MyAgent(Agent): ...
Here we create a new class called MyAgent
, which inherits the Agent
class.
Now, the three dots simply tells Python that we still have to add things to it.
But before we do that, we can already start a simulation with our new MyAgent
.
>>> (
... Simulation()
... .batch_spawn_agents(100, MyAgent, ["examples/images/white.png"])
... .run()
... )
If you look very closely, you can see that there's absolutely no difference! The agent is still aimlessly wandering about. And that's the key takeaway of inheritance. Without adding or changing things, our agent's behaviour will be exactly the same!
To actually customise our agent's behaviour,
we have to implement the vi.Agent.update
method.
The update
method will run on every tick of the simulation,
after the positions of all the agents have been updated.
Now, you might wonder why the agent's position isn't updated in the update
method.
Long story short, in most collective intelligence simulations,
your agent needs to interact with other agents.
Therefore, it needs to have a sense of which other agents are in its proximity.
To make sure that whenever agent A sees agent B, agent B also sees agent A,
we need to separate the position changing code from the behavioural code.
This way, either both agents see each other, or they don't see each other at all.
On the topic of agents seeing each other,
let's implement our own update
method in which we want our agent
to change its colour whenever it sees another agent.
To be able to change colours, we need to supply two different images to our Agent. I'll go with a white and a red circle.
>>> (
... Simulation()
... .batch_spawn_agents(100, MyAgent, [
... "examples/images/white.png",
... "examples/images/red.png",
... ])
... .run()
... )
If we run the simulation again, we see that the white image is picked automatically, as it is the first image in the list of images.
Now, to change the currently selected image,
we can call the vi.Agent.change_image
method.
>>> class MyAgent(Agent):
... def update(self):
... self.change_image(1)
Remember that the first index of a Python list is 0, so changing the image to index 1 actually selects our second image. Cool! Our agents are now red instead! 😎
However, we don't want them to always be red. Instead, we only want our agents to turn red when they see at least one other agent.
Fortunately, Violet already keeps track of who sees who for us, so we don't have to implement any code ourselves!
Instead, we can utilise the vi.Agent.in_proximity_accuracy
method.
This will return a vi.ProximityIter
which we can use to count the number of agents in proximity.
Let's say that if we count at least one other agent, we turn red. Otherwise, we change the image back to white!
>>> class MyAgent(Agent):
... def update(self):
... if self.in_proximity_accuracy().count() >= 1:
... self.change_image(1)
... else:
... self.change_image(0)
And there we have it! A disco of a simulation with agents swapping colours whenever they get close to someone.
Now, there's way more to explore. But this should give you an impression on how to change the behaviour of your agents with Violet. Explore some of the modules on the left and experiment away!
1""" 2A smol simulator framework built on top of [PyGame](https://www.pygame.org/docs/). 3 4- Automatic agent wandering behaviour 5- Fully deterministic simulations with PRNG seeds 6- Install Violet with a simple `pip install` 😎 7- Matrix-powered multi-threaded configuration testing 8- [Polars](https://github.com/pola-rs/polars/)-powered simulation analytics 9- Replay-able simulations with a ✨ time machine ✨ 10- Type-safe configuration system (with TOML support) 11 12Want to get started right away? 13Check out the [Violet Starter Kit](https://github.com/m-rots/violet-starter-kit)! 14 15# A Tour of Violet 16 17Violet is all about creating and researching collective intelligence. 18And what's better than programming your own little video game to do so? 19 20Under the hood, Violet is using [PyGame](https://www.pygame.org/docs/) to render all those fancy pixels to your very screen. 21However, you don't need to have any knowledge of PyGame whatsoever to get started! 22 23Instead, you only need to familiarise yourself with Violet's `vi.agent` and `vi.simulation` modules. 24You see, all that's needed to create a *video game* is a new instance of the `vi.simulation.Simulation` class. 25 26>>> from vi import Simulation 27>>> Simulation() 28 29Yep, that's all it takes to start your simulation. 30However, it closes right away... 31 32To actually `run` your simulation, you need to add one more thing. 33 34>>> Simulation().run() 35 36There, now you have a nice black window appear in the middle of your screen! 37 38Obviously, creating a black window doesn't really count as having created a simulation just yet. 39So let's add some agents! 40 41>>> from vi import Agent, Simulation 42>>> ( 43... Simulation() 44... .batch_spawn_agents(100, Agent, ["examples/images/white.png"]) 45... .run() 46... ) 47 48We now have 100 agents wiggling around our screen. 49They just don't particularly do anything just yet. 50Sure, they're *moving*, but we want them to interact! 51 52That's where you come in! 53Customising the behaviour of the agents is central to creating simulations. 54And it couldn't be any easier to customise what these agents do! 55 56Violet is built around the concept of *inheritance*. 57In a nutshell, inheritance allows you to well, *inherit*, 58all the functions and properties that the `Agent` class exposes. 59But that's not all! 60Inheritance also allows you to build on top of the `Agent` class, 61allowing you to implement your agent's custom behaviour simply by implementing an `update` method. 62 63Let's see some inheritance in action. 64 65>>> class MyAgent(Agent): ... 66 67Here we create a new class called `MyAgent`, which inherits the `Agent` class. 68Now, the three dots simply tells Python that we still have to add things to it. 69But before we do that, we can already start a simulation with our new `MyAgent`. 70 71>>> ( 72... Simulation() 73... .batch_spawn_agents(100, MyAgent, ["examples/images/white.png"]) 74... .run() 75... ) 76 77If you look very closely, you can see that there's absolutely no difference! 78The agent is still aimlessly wandering about. 79And that's the key takeaway of inheritance. 80Without adding or changing things, 81our agent's behaviour will be exactly the same! 82 83To actually customise our agent's behaviour, 84we have to implement the `vi.agent.Agent.update` method. 85 86The `update` method will run on every tick of the simulation, 87after the positions of all the agents have been updated. 88Now, you might wonder why the agent's position isn't updated in the `update` method. 89Long story short, in most collective intelligence simulations, 90your agent needs to interact with other agents. 91Therefore, it needs to have a sense of which other agents are in its proximity. 92To make sure that whenever agent A sees agent B, agent B also sees agent A, 93we need to separate the position changing code from the behavioural code. 94This way, either both agents see each other, or they don't see each other at all. 95 96On the topic of agents seeing each other, 97let's implement our own `update` method in which we want our agent 98to change its colour whenever it sees another agent. 99 100To be able to change colours, 101we need to supply two different images to our Agent. 102I'll go with a white and a red circle. 103 104>>> ( 105... Simulation() 106... .batch_spawn_agents(100, MyAgent, [ 107... "examples/images/white.png", 108... "examples/images/red.png", 109... ]) 110... .run() 111... ) 112 113If we run the simulation again, 114we see that the white image is picked automatically, 115as it is the first image in the list of images. 116 117Now, to change the currently selected image, 118we can call the `vi.agent.Agent.change_image` method. 119 120>>> class MyAgent(Agent): 121... def update(self): 122... self.change_image(1) 123 124Remember that the first index of a Python list is 0, 125so changing the image to index 1 actually selects our second image. 126Cool! Our agents are now red instead! 😎 127 128However, we don't want them to always be red. 129Instead, we only want our agents to turn red when they see at least one other agent. 130 131Fortunately, Violet already keeps track of who sees who for us, so we don't have to implement any code ourselves! 132Instead, we can utilise the `vi.agent.Agent.in_proximity_accuracy` method. 133This will return a `vi.proximity.ProximityIter` which we can use to count the number of agents in proximity. 134 135Let's say that if we count at least one other agent, we turn red. Otherwise, we change the image back to white! 136 137>>> class MyAgent(Agent): 138... def update(self): 139... if self.in_proximity_accuracy().count() >= 1: 140... self.change_image(1) 141... else: 142... self.change_image(0) 143 144And there we have it! 145A disco of a simulation with agents swapping colours whenever they get close to someone. 146 147Now, there's way more to explore. 148But this should give you an impression on how to change the behaviour of your agents with Violet. 149Explore some of the modules on the left and experiment away! 150""" 151from dataclasses import dataclass 152 153from pygame.math import Vector2 154from serde.de import deserialize 155from serde.se import serialize 156 157from .agent import Agent 158from .config import Config, Matrix, Window 159from .metrics import Fps, Metrics 160from .proximity import ProximityIter 161from .replay import TimeMachine 162from .simulation import HeadlessSimulation, Simulation 163from .util import probability 164 165 166__all__ = [ 167 # agent 168 "Agent", 169 170 # config 171 "Config", 172 "Matrix", 173 "Window", 174 175 # metrics 176 "Fps", 177 "Metrics", 178 179 # proximity 180 "ProximityIter", 181 182 # replay 183 "TimeMachine", 184 185 # simulation 186 "HeadlessSimulation", 187 "Simulation", 188 189 # util 190 "probability", 191 192 # re-exports 193 "dataclass", 194 "deserialize", 195 "serialize", 196 "Vector2" 197]
37class Agent(Sprite): 38 """ 39 The `Agent` class is home to Violet's various additions and is 40 built on top of [PyGame's Sprite](https://www.pygame.org/docs/ref/sprite.html) class. 41 42 While you can simply add this `Agent` class to your simulations by calling `batch_spawn_agents`, 43 the agents won't actually do anything interesting. 44 Sure, they'll move around the screen very energetically, but they don't *interact* with each other. 45 46 That's where you come in! 47 By inheriting this `Agent` class you can make use of all its utilities, 48 while also programming the behaviour of your custom agent. 49 """ 50 51 id: int 52 """The unique identifier of the agent.""" 53 54 config: Config 55 """The config of the simulation that's shared with all agents. 56 57 The config can be overriden when inheriting the Agent class. 58 However, the config must always: 59 60 1. Inherit `Config` 61 2. Be decorated by `@deserialize` and `@dataclass` 62 """ 63 64 shared: Shared 65 """Attributes that are shared between the simulation and all agents.""" 66 67 _images: list[Surface] 68 """A list of images which you can use to change the current image of the agent.""" 69 70 _image_index: int 71 """The currently selected image.""" 72 73 _image_cache: Optional[tuple[int, Surface]] = None 74 75 _area: Rect 76 """The area in which the agent is free to move.""" 77 78 move: Vector2 79 """A two-dimensional vector representing the delta between the agent's current and next position. 80 In collective intelligence scenarios, it represents the agent's velocity. 81 82 Note that `move` isn't added to the agent's `pos` automatically. 83 Instead, you should manually add the move delta to `pos`, like so: 84 85 >>> self.pos += self.move * delta_time 86 87 Where `delta_time` is the time elapsed during the movement (usually user defined). 88 Read https://gafferongames.com/post/integration_basics/ to learn more about it. 89 90 Declaring move as `Vector2(2, 1)` indicates that the agent will be moving 2 pixels along the x axis and 91 1 pixel along the y axis. You can use the `Vector2` class to calculate its magnitude by calling `length` 92 which returns the speed or rate of change of the agent's position for collective intelligence scenarios. 93 94 This property is also used to automatically rotate the agent's image 95 when `vi.config.Schema.image_rotation` is enabled. 96 """ 97 98 pos: Vector2 99 """The current (centre) position of the agent.""" 100 101 _obstacles: Group 102 """The group of obstacles the agent can collide with.""" 103 104 _sites: Group 105 """The group of sites on which the agent can appear.""" 106 107 __simulation: HeadlessSimulation 108 109 _moving: bool = True 110 """The agent's movement will freeze when moving is set to False.""" 111 112 def __init__( 113 self, 114 images: list[Surface], 115 simulation: HeadlessSimulation, 116 pos: Optional[Vector2] = None, 117 move: Optional[Vector2] = None, 118 ): 119 Sprite.__init__(self, simulation._all, simulation._agents) 120 121 self.__simulation = simulation 122 123 self.id = simulation._agent_id() 124 self.config = simulation.config 125 self.shared = simulation.shared 126 127 # Default to first image in case no image is given 128 self._image_index = 0 129 self._images = images 130 131 self._obstacles = simulation._obstacles 132 self._sites = simulation._sites 133 134 self._area = simulation._area 135 self.move = ( 136 move 137 if move is not None 138 else random_angle(self.config.movement_speed, prng=self.shared.prng_move) 139 ) 140 141 # On spawn acts like the __init__ for non-pygame facing state. 142 # It could be used to override the initial image if necessary. 143 self.on_spawn() 144 145 if pos is not None: 146 self.pos = pos 147 148 if not hasattr(self, "pos"): 149 # Keep changing the position until the position no longer collides with any obstacle. 150 while True: 151 self.pos = random_pos(self._area, prng=self.shared.prng_move) 152 153 obstacle_hit = pg.sprite.spritecollideany(self, self._obstacles, pg.sprite.collide_mask) # type: ignore 154 if not bool(obstacle_hit) and self._area.contains(self.rect): 155 break 156 157 def _get_image(self) -> Surface: 158 image = self._images[self._image_index] 159 160 if self.config.image_rotation: 161 angle = self.move.angle_to(Vector2((0, -1))) 162 163 return pg.transform.rotate(image, angle) 164 else: 165 return image 166 167 @property 168 def image(self) -> Surface: 169 """The read-only image that's used for PyGame's rendering.""" 170 171 if self._image_cache is not None: 172 frame, image = self._image_cache 173 if frame == self.shared.counter: 174 return image 175 176 new_image = self._get_image() 177 self._image_cache = (self.shared.counter, new_image) 178 179 return new_image 180 181 @property 182 def center(self) -> tuple[int, int]: 183 """The read-only centre position of the agent.""" 184 185 return round_pos(self.pos) 186 187 @property 188 def rect(self) -> Rect: 189 """The read-only bounding-box that's used for PyGame's rendering.""" 190 191 rect = self.image.get_rect() 192 rect.center = self.center 193 194 return rect 195 196 @property 197 def mask(self) -> Mask: 198 """A read-only bit-mask of the image used for collision detection with obstacles and sites.""" 199 200 return pg.mask.from_surface(self.image) 201 202 def update(self): 203 """Run your own agent logic at every tick of the simulation. 204 Every frame of the simulation, this method is called automatically for every agent of the simulation. 205 206 To add your own logic, inherit the `Agent` class and override this method with your own. 207 """ 208 209 ... 210 211 def obstacle_intersections( 212 self, scale: float = 1 213 ) -> Generator[Vector2, None, None]: 214 """Retrieve the centre coordinates of all obstacle intersections. 215 216 If you not only want to check for obstacle collision, 217 but also want to retrieve the coordinates of pixel groups 218 that are colliding with your agent, then `obstacle_intersections` is for you! 219 220 If you only have one obstacle in your environment, 221 but it doesn't have a fill, only a stroke, 222 then your agent could possibly be colliding with different areas of the stroke. 223 Therefore, this method checks the bitmasks of both the agent and the obstacle 224 to calculate the overlapping bitmask. 225 From this overlapping bitmask, 226 the centre coordinates of all groups of connected pixels are returned. 227 228 To emulate a bigger (or smaller) radius, 229 you can pass along the `scale` option. 230 A scale of 2 makes your agent twice as big, 231 but only for calculating the intersecting bitmasks. 232 """ 233 234 mask = pg.mask.from_surface(self.image) 235 236 # Scale the mask to the desired size 237 width, height = mask.get_size() 238 mask = mask.scale((width * scale, height * scale)) 239 240 # Align the mask to the centre position of the agent 241 rect = mask.get_rect() 242 rect.center = self.center 243 244 for sprite in self._obstacles.sprites(): 245 obstacle: _StaticSprite = sprite # type: ignore 246 247 # Calculate the mask offset 248 x = obstacle.rect.x - rect.x 249 y = obstacle.rect.y - rect.y 250 251 overlap = mask.overlap_mask(obstacle.mask, offset=(x, y)) 252 253 # For some reason PyGame has the wrong type hint here (single instead of list) 254 overlap_rects: list[pg.rect.Rect] = overlap.get_bounding_rects() # type: ignore 255 256 for overlap_rect in overlap_rects: 257 # Undo the offset 258 overlap_rect.x += rect.x 259 overlap_rect.y += rect.y 260 261 # Return the centre coordinates of the connected pixels 262 yield Vector2(overlap_rect.center) 263 264 def on_spawn(self): 265 """Run any code when the agent is spawned into the simulation. 266 267 This method is a replacement for `__init__`, which you should not overwrite directly. 268 Instead, you can make alterations to your Agent within this function instead. 269 270 You should override this method when inheriting Agent to add your own logic. 271 272 Some examples include: 273 - Changing the image or state of your Agent depending on its assigned identifier. 274 """ 275 276 ... 277 278 def there_is_no_escape(self) -> bool: 279 """Pac-Man-style teleport the agent to the other side of the screen when it is outside of the playable area. 280 281 Examples 282 -------- 283 284 An agent that will always move to the right, until snapped back to reality. 285 286 >>> class MyAgent(Agent): 287 ... def on_spawn(self): 288 ... self.move = Vector2((5, 0)) 289 ... 290 ... def change_position(self): 291 ... self.there_is_no_escape() 292 ... self.pos += self.move 293 """ 294 295 changed = False 296 297 if self.pos.x < self._area.left: 298 changed = True 299 self.pos.x = self._area.right 300 301 if self.pos.x > self._area.right: 302 changed = True 303 self.pos.x = self._area.left 304 305 if self.pos.y < self._area.top: 306 changed = True 307 self.pos.y = self._area.bottom 308 309 if self.pos.y > self._area.bottom: 310 changed = True 311 self.pos.y = self._area.top 312 313 return changed 314 315 def change_position(self): 316 """Change the position of the agent. 317 318 The agent's new position is calculated as follows: 319 1. The agent checks whether it's outside of the visible screen area. 320 If this is the case, then the agent will be teleported to the other edge of the screen. 321 2. If the agent collides with any obstacles, then the agent will turn around 180 degrees. 322 3. If the agent has not collided with any obstacles, it will have the opportunity to slightly change its angle. 323 """ 324 if not self._moving: 325 return 326 327 changed = self.there_is_no_escape() 328 329 prng = self.shared.prng_move 330 331 # Always calculate the random angle so a seed could be used. 332 deg = prng.uniform(-30, 30) 333 334 # Only update angle if the agent was teleported to a different area of the simulation. 335 if changed: 336 self.move.rotate_ip(deg) 337 338 # Obstacle Avoidance 339 obstacle_hit = pg.sprite.spritecollideany(self, self._obstacles, pg.sprite.collide_mask) # type: ignore 340 collision = bool(obstacle_hit) 341 342 # Reverse direction when colliding with an obstacle. 343 if collision and not self._still_stuck: 344 self.move.rotate_ip(180) 345 self._still_stuck = True 346 347 if not collision: 348 self._still_stuck = False 349 350 # Random opportunity to slightly change angle. 351 # Probabilities are pre-computed so a seed could be used. 352 should_change_angle = prng.random() 353 deg = prng.uniform(-10, 10) 354 355 # Only allow the angle opportunity to take place when no collisions have occured. 356 # This is done so an agent always turns 180 degrees. Any small change in the number of degrees 357 # allows the agent to possibly escape the obstacle. 358 if not collision and not self._still_stuck and 0.25 > should_change_angle: 359 self.move.rotate_ip(deg) 360 361 # Actually update the position at last. 362 self.pos += self.move 363 364 def in_proximity_accuracy(self) -> ProximityIter[tuple[Self, float]]: 365 """Retrieve other agents that are in the `vi.config.Schema.radius` of the current agent. 366 367 This proximity method calculates the distances between agents to determine whether 368 an agent is in the radius of the current agent. 369 370 To calculate the distances between agents, up to four chunks have to be retrieved from the Proximity Engine. 371 These extra lookups, in combination with the vector distance calculation, have a noticable impact on performance. 372 Note however that this performance impact is only noticable with >1000 agents. 373 374 If you want to speed up your simulation at the cost of some accuracy, 375 consider using the `in_proximity_performance` method instead. 376 377 This function doesn't return the agents as a `list` or as a `set`. 378 Instead, you are given a `vi.proximity.ProximityIter`, a small wrapper around a Python generator. 379 380 Examples 381 -------- 382 383 Count the number of agents that are in proximity 384 and change to image `1` if there is at least one agent nearby. 385 386 >>> class MyAgent(Agent): 387 ... def update(self): 388 ... in_proximity = self.in_proximity_accuracy().count() 389 ... 390 ... if in_proximity >= 1: 391 ... self.change_image(1) 392 ... else: 393 ... self.change_image(0) 394 395 Kill the first `Human` agent that's in proximity. 396 397 >>> class Zombie(Agent): 398 ... def update(self): 399 ... human = ( 400 ... self.in_proximity_accuracy() 401 ... .without_distance() 402 ... .filter_kind(Human) # 👈 don't want to kill other zombies 403 ... .first() # 👈 can return None if no humans are around 404 ... ) 405 ... 406 ... if human is not None: 407 ... human.kill() 408 409 Calculate the average distance of agents that are in proximity. 410 411 >>> class Heimerdinger(Agent): 412 ... def update(self): 413 ... in_proximity = list(self.in_proximity_accuracy()) 414 ... 415 ... dist_sum = sum(dist for agent, dist in in_proximity) 416 ... 417 ... dist_avg = ( 418 ... dist_sum / len(in_proximity) 419 ... if len(in_proximity) > 0 420 ... else 0 421 ... ) 422 """ 423 424 return self.__simulation._proximity.in_proximity_accuracy(self) 425 426 def in_proximity_performance(self) -> ProximityIter[Self]: 427 """Retrieve other agents that are in the `vi.config.Schema.radius` of the current agent. 428 429 Unlike `in_proximity_accuracy`, this proximity method does not calculate the distances between agents. 430 Instead, it retrieves agents that are in the same chunk as the current agent, 431 irrespective of their position within the chunk. 432 433 If you find yourself limited by the performance of `in_proximity_accuracy`, 434 you can swap the function call for this one instead. 435 This performance method roughly doubles the frame rates of the simulation. 436 """ 437 438 return self.__simulation._proximity.in_proximity_performance(self) 439 440 def on_site(self) -> bool: 441 """Check whether the agent is currently on a site. 442 443 Examples 444 -------- 445 446 Stop the agent's movement when it reaches a site (think of a nice beach). 447 448 >>> class TravellingAgent(Agent): 449 ... def update(self): 450 ... if self.on_site(): 451 ... # crave that star damage 452 ... self.freeze_movement() 453 """ 454 455 return self.on_site_id() is not None 456 457 def on_site_id(self) -> Optional[int]: 458 """Get the identifier of the site the agent is currently on. 459 460 Examples 461 -------- 462 463 Stop the agent's movement when it reaches a site to inspect. 464 In addition, the current site identifier is saved to the DataFrame. 465 466 ```python 467 class SiteInspector(Agent): 468 def update(self): 469 site_id = self.on_site_id() 470 471 # Save the site id to the DataFrame 472 self.save_data("site", site_id) 473 474 # bool(site_id) would be inaccurate 475 # as a site_id of 0 will return False. 476 # Therefore, we check whether it is not None instead. 477 if site_id is not None: 478 # Inspect the site 479 self.freeze_movement() 480 ``` 481 """ 482 483 site: Optional[_StaticSprite] = pg.sprite.spritecollideany(self, self._sites, pg.sprite.collide_mask) # type: ignore 484 485 if site is not None: 486 return site.id 487 else: 488 return None 489 490 def freeze_movement(self): 491 """Freeze the movement of the agent. The movement can be continued by calling `continue_movement`.""" 492 493 self._moving = False 494 495 def continue_movement(self): 496 """Continue the movement of the agent from before its movement was frozen.""" 497 498 self._moving = True 499 500 def change_image(self, index: int): 501 """Change the image of the agent. 502 503 If you want to change the agent's image to the second image in the images list, 504 then you can change the image to index 1: 505 >>> self.change_image(1) 506 """ 507 508 self._image_index = index 509 510 def save_data(self, column: str, value: Any): 511 """Add extra data to the simulation's metrics. 512 513 The following data is collected automatically: 514 - agent identifier 515 - current frame 516 - x and y coordinates 517 518 Examples 519 -------- 520 521 Saving the number of agents that are currently in proximity: 522 523 >>> class MyAgent(Agent): 524 ... def update(self): 525 ... in_proximity = self.in_proximity_accuracy().count() 526 ... self.save_data("in_proximity", in_proximity) 527 """ 528 529 self.__simulation._metrics._temporary_snapshots[column].append(value) 530 531 def _collect_replay_data(self): 532 """Add the minimum data needed for the replay simulation to the dataframe.""" 533 534 x, y = self.center 535 snapshots = self.__simulation._metrics._temporary_snapshots 536 537 snapshots["frame"].append(self.shared.counter) 538 snapshots["id"].append(self.id) 539 540 snapshots["x"].append(x) 541 snapshots["y"].append(y) 542 543 snapshots["image_index"].append(self._image_index) 544 545 if self.config.image_rotation: 546 angle = self.move.angle_to(Vector2((0, -1))) 547 snapshots["angle"].append(round(angle)) 548 549 def __copy__(self) -> Self: 550 """Create a copy of this agent and spawn it into the simulation. 551 552 Note that this only copies the `pos` and `move` vectors. 553 Any other attributes will be set to their defaults. 554 """ 555 556 cls = self.__class__ 557 agent = cls(self._images, self.__simulation) 558 559 # We want to make sure to copy the position and movement vectors. 560 # Otherwise, the original as well as the clone will continue sharing these vectors. 561 agent.pos = self.pos.copy() 562 agent.move = self.move.copy() 563 564 return agent 565 566 def reproduce(self) -> Self: 567 """Create a new agent and spawn it into the simulation. 568 569 All values will be reset to their defaults, 570 except for the agent's position and movement vector. 571 These will be cloned from the original agent. 572 """ 573 574 return copy(self) 575 576 def kill(self): 577 """Kill the agent. 578 579 While violence usually isn't the right option, 580 sometimes you just want to murder some innocent agents inside your little computer. 581 582 But fear not! 583 By *killing* the agent, all you're really doing is removing it from the simulation. 584 """ 585 586 super().kill() 587 588 def is_dead(self) -> bool: 589 """Is the agent dead? 590 591 Death occurs when `kill` is called. 592 """ 593 return not self.is_alive() 594 595 def is_alive(self) -> bool: 596 """Is the agent still alive? 597 598 Death occurs when `kill` is called. 599 """ 600 601 return super().alive()
The Agent
class is home to Violet's various additions and is
built on top of PyGame's Sprite class.
While you can simply add this Agent
class to your simulations by calling batch_spawn_agents
,
the agents won't actually do anything interesting.
Sure, they'll move around the screen very energetically, but they don't interact with each other.
That's where you come in!
By inheriting this Agent
class you can make use of all its utilities,
while also programming the behaviour of your custom agent.
112 def __init__( 113 self, 114 images: list[Surface], 115 simulation: HeadlessSimulation, 116 pos: Optional[Vector2] = None, 117 move: Optional[Vector2] = None, 118 ): 119 Sprite.__init__(self, simulation._all, simulation._agents) 120 121 self.__simulation = simulation 122 123 self.id = simulation._agent_id() 124 self.config = simulation.config 125 self.shared = simulation.shared 126 127 # Default to first image in case no image is given 128 self._image_index = 0 129 self._images = images 130 131 self._obstacles = simulation._obstacles 132 self._sites = simulation._sites 133 134 self._area = simulation._area 135 self.move = ( 136 move 137 if move is not None 138 else random_angle(self.config.movement_speed, prng=self.shared.prng_move) 139 ) 140 141 # On spawn acts like the __init__ for non-pygame facing state. 142 # It could be used to override the initial image if necessary. 143 self.on_spawn() 144 145 if pos is not None: 146 self.pos = pos 147 148 if not hasattr(self, "pos"): 149 # Keep changing the position until the position no longer collides with any obstacle. 150 while True: 151 self.pos = random_pos(self._area, prng=self.shared.prng_move) 152 153 obstacle_hit = pg.sprite.spritecollideany(self, self._obstacles, pg.sprite.collide_mask) # type: ignore 154 if not bool(obstacle_hit) and self._area.contains(self.rect): 155 break
The config of the simulation that's shared with all agents.
The config can be overriden when inheriting the Agent class. However, the config must always:
- Inherit
Config
- Be decorated by
@deserialize
and@dataclass
A two-dimensional vector representing the delta between the agent's current and next position. In collective intelligence scenarios, it represents the agent's velocity.
Note that move
isn't added to the agent's pos
automatically.
Instead, you should manually add the move delta to pos
, like so:
>>> self.pos += self.move * delta_time
Where delta_time
is the time elapsed during the movement (usually user defined).
Read https://gafferongames.com/post/integration_basics/ to learn more about it.
Declaring move as Vector2(2, 1)
indicates that the agent will be moving 2 pixels along the x axis and
1 pixel along the y axis. You can use the Vector2
class to calculate its magnitude by calling length
which returns the speed or rate of change of the agent's position for collective intelligence scenarios.
This property is also used to automatically rotate the agent's image
when vi.config.Schema.image_rotation
is enabled.
A read-only bit-mask of the image used for collision detection with obstacles and sites.
202 def update(self): 203 """Run your own agent logic at every tick of the simulation. 204 Every frame of the simulation, this method is called automatically for every agent of the simulation. 205 206 To add your own logic, inherit the `Agent` class and override this method with your own. 207 """ 208 209 ...
Run your own agent logic at every tick of the simulation. Every frame of the simulation, this method is called automatically for every agent of the simulation.
To add your own logic, inherit the Agent
class and override this method with your own.
211 def obstacle_intersections( 212 self, scale: float = 1 213 ) -> Generator[Vector2, None, None]: 214 """Retrieve the centre coordinates of all obstacle intersections. 215 216 If you not only want to check for obstacle collision, 217 but also want to retrieve the coordinates of pixel groups 218 that are colliding with your agent, then `obstacle_intersections` is for you! 219 220 If you only have one obstacle in your environment, 221 but it doesn't have a fill, only a stroke, 222 then your agent could possibly be colliding with different areas of the stroke. 223 Therefore, this method checks the bitmasks of both the agent and the obstacle 224 to calculate the overlapping bitmask. 225 From this overlapping bitmask, 226 the centre coordinates of all groups of connected pixels are returned. 227 228 To emulate a bigger (or smaller) radius, 229 you can pass along the `scale` option. 230 A scale of 2 makes your agent twice as big, 231 but only for calculating the intersecting bitmasks. 232 """ 233 234 mask = pg.mask.from_surface(self.image) 235 236 # Scale the mask to the desired size 237 width, height = mask.get_size() 238 mask = mask.scale((width * scale, height * scale)) 239 240 # Align the mask to the centre position of the agent 241 rect = mask.get_rect() 242 rect.center = self.center 243 244 for sprite in self._obstacles.sprites(): 245 obstacle: _StaticSprite = sprite # type: ignore 246 247 # Calculate the mask offset 248 x = obstacle.rect.x - rect.x 249 y = obstacle.rect.y - rect.y 250 251 overlap = mask.overlap_mask(obstacle.mask, offset=(x, y)) 252 253 # For some reason PyGame has the wrong type hint here (single instead of list) 254 overlap_rects: list[pg.rect.Rect] = overlap.get_bounding_rects() # type: ignore 255 256 for overlap_rect in overlap_rects: 257 # Undo the offset 258 overlap_rect.x += rect.x 259 overlap_rect.y += rect.y 260 261 # Return the centre coordinates of the connected pixels 262 yield Vector2(overlap_rect.center)
Retrieve the centre coordinates of all obstacle intersections.
If you not only want to check for obstacle collision,
but also want to retrieve the coordinates of pixel groups
that are colliding with your agent, then obstacle_intersections
is for you!
If you only have one obstacle in your environment, but it doesn't have a fill, only a stroke, then your agent could possibly be colliding with different areas of the stroke. Therefore, this method checks the bitmasks of both the agent and the obstacle to calculate the overlapping bitmask. From this overlapping bitmask, the centre coordinates of all groups of connected pixels are returned.
To emulate a bigger (or smaller) radius,
you can pass along the scale
option.
A scale of 2 makes your agent twice as big,
but only for calculating the intersecting bitmasks.
264 def on_spawn(self): 265 """Run any code when the agent is spawned into the simulation. 266 267 This method is a replacement for `__init__`, which you should not overwrite directly. 268 Instead, you can make alterations to your Agent within this function instead. 269 270 You should override this method when inheriting Agent to add your own logic. 271 272 Some examples include: 273 - Changing the image or state of your Agent depending on its assigned identifier. 274 """ 275 276 ...
Run any code when the agent is spawned into the simulation.
This method is a replacement for __init__
, which you should not overwrite directly.
Instead, you can make alterations to your Agent within this function instead.
You should override this method when inheriting Agent to add your own logic.
Some examples include:
- Changing the image or state of your Agent depending on its assigned identifier.
278 def there_is_no_escape(self) -> bool: 279 """Pac-Man-style teleport the agent to the other side of the screen when it is outside of the playable area. 280 281 Examples 282 -------- 283 284 An agent that will always move to the right, until snapped back to reality. 285 286 >>> class MyAgent(Agent): 287 ... def on_spawn(self): 288 ... self.move = Vector2((5, 0)) 289 ... 290 ... def change_position(self): 291 ... self.there_is_no_escape() 292 ... self.pos += self.move 293 """ 294 295 changed = False 296 297 if self.pos.x < self._area.left: 298 changed = True 299 self.pos.x = self._area.right 300 301 if self.pos.x > self._area.right: 302 changed = True 303 self.pos.x = self._area.left 304 305 if self.pos.y < self._area.top: 306 changed = True 307 self.pos.y = self._area.bottom 308 309 if self.pos.y > self._area.bottom: 310 changed = True 311 self.pos.y = self._area.top 312 313 return changed
Pac-Man-style teleport the agent to the other side of the screen when it is outside of the playable area.
Examples
An agent that will always move to the right, until snapped back to reality.
>>> class MyAgent(Agent):
... def on_spawn(self):
... self.move = Vector2((5, 0))
...
... def change_position(self):
... self.there_is_no_escape()
... self.pos += self.move
315 def change_position(self): 316 """Change the position of the agent. 317 318 The agent's new position is calculated as follows: 319 1. The agent checks whether it's outside of the visible screen area. 320 If this is the case, then the agent will be teleported to the other edge of the screen. 321 2. If the agent collides with any obstacles, then the agent will turn around 180 degrees. 322 3. If the agent has not collided with any obstacles, it will have the opportunity to slightly change its angle. 323 """ 324 if not self._moving: 325 return 326 327 changed = self.there_is_no_escape() 328 329 prng = self.shared.prng_move 330 331 # Always calculate the random angle so a seed could be used. 332 deg = prng.uniform(-30, 30) 333 334 # Only update angle if the agent was teleported to a different area of the simulation. 335 if changed: 336 self.move.rotate_ip(deg) 337 338 # Obstacle Avoidance 339 obstacle_hit = pg.sprite.spritecollideany(self, self._obstacles, pg.sprite.collide_mask) # type: ignore 340 collision = bool(obstacle_hit) 341 342 # Reverse direction when colliding with an obstacle. 343 if collision and not self._still_stuck: 344 self.move.rotate_ip(180) 345 self._still_stuck = True 346 347 if not collision: 348 self._still_stuck = False 349 350 # Random opportunity to slightly change angle. 351 # Probabilities are pre-computed so a seed could be used. 352 should_change_angle = prng.random() 353 deg = prng.uniform(-10, 10) 354 355 # Only allow the angle opportunity to take place when no collisions have occured. 356 # This is done so an agent always turns 180 degrees. Any small change in the number of degrees 357 # allows the agent to possibly escape the obstacle. 358 if not collision and not self._still_stuck and 0.25 > should_change_angle: 359 self.move.rotate_ip(deg) 360 361 # Actually update the position at last. 362 self.pos += self.move
Change the position of the agent.
The agent's new position is calculated as follows:
- The agent checks whether it's outside of the visible screen area. If this is the case, then the agent will be teleported to the other edge of the screen.
- If the agent collides with any obstacles, then the agent will turn around 180 degrees.
- If the agent has not collided with any obstacles, it will have the opportunity to slightly change its angle.
364 def in_proximity_accuracy(self) -> ProximityIter[tuple[Self, float]]: 365 """Retrieve other agents that are in the `vi.config.Schema.radius` of the current agent. 366 367 This proximity method calculates the distances between agents to determine whether 368 an agent is in the radius of the current agent. 369 370 To calculate the distances between agents, up to four chunks have to be retrieved from the Proximity Engine. 371 These extra lookups, in combination with the vector distance calculation, have a noticable impact on performance. 372 Note however that this performance impact is only noticable with >1000 agents. 373 374 If you want to speed up your simulation at the cost of some accuracy, 375 consider using the `in_proximity_performance` method instead. 376 377 This function doesn't return the agents as a `list` or as a `set`. 378 Instead, you are given a `vi.proximity.ProximityIter`, a small wrapper around a Python generator. 379 380 Examples 381 -------- 382 383 Count the number of agents that are in proximity 384 and change to image `1` if there is at least one agent nearby. 385 386 >>> class MyAgent(Agent): 387 ... def update(self): 388 ... in_proximity = self.in_proximity_accuracy().count() 389 ... 390 ... if in_proximity >= 1: 391 ... self.change_image(1) 392 ... else: 393 ... self.change_image(0) 394 395 Kill the first `Human` agent that's in proximity. 396 397 >>> class Zombie(Agent): 398 ... def update(self): 399 ... human = ( 400 ... self.in_proximity_accuracy() 401 ... .without_distance() 402 ... .filter_kind(Human) # 👈 don't want to kill other zombies 403 ... .first() # 👈 can return None if no humans are around 404 ... ) 405 ... 406 ... if human is not None: 407 ... human.kill() 408 409 Calculate the average distance of agents that are in proximity. 410 411 >>> class Heimerdinger(Agent): 412 ... def update(self): 413 ... in_proximity = list(self.in_proximity_accuracy()) 414 ... 415 ... dist_sum = sum(dist for agent, dist in in_proximity) 416 ... 417 ... dist_avg = ( 418 ... dist_sum / len(in_proximity) 419 ... if len(in_proximity) > 0 420 ... else 0 421 ... ) 422 """ 423 424 return self.__simulation._proximity.in_proximity_accuracy(self)
Retrieve other agents that are in the vi.config.Schema.radius
of the current agent.
This proximity method calculates the distances between agents to determine whether an agent is in the radius of the current agent.
To calculate the distances between agents, up to four chunks have to be retrieved from the Proximity Engine. These extra lookups, in combination with the vector distance calculation, have a noticable impact on performance. Note however that this performance impact is only noticable with >1000 agents.
If you want to speed up your simulation at the cost of some accuracy,
consider using the in_proximity_performance
method instead.
This function doesn't return the agents as a list
or as a set
.
Instead, you are given a vi.ProximityIter
, a small wrapper around a Python generator.
Examples
Count the number of agents that are in proximity
and change to image 1
if there is at least one agent nearby.
>>> class MyAgent(Agent):
... def update(self):
... in_proximity = self.in_proximity_accuracy().count()
...
... if in_proximity >= 1:
... self.change_image(1)
... else:
... self.change_image(0)
Kill the first Human
agent that's in proximity.
>>> class Zombie(Agent):
... def update(self):
... human = (
... self.in_proximity_accuracy()
... .without_distance()
... .filter_kind(Human) # 👈 don't want to kill other zombies
... .first() # 👈 can return None if no humans are around
... )
...
... if human is not None:
... human.kill()
Calculate the average distance of agents that are in proximity.
>>> class Heimerdinger(Agent):
... def update(self):
... in_proximity = list(self.in_proximity_accuracy())
...
... dist_sum = sum(dist for agent, dist in in_proximity)
...
... dist_avg = (
... dist_sum / len(in_proximity)
... if len(in_proximity) > 0
... else 0
... )
426 def in_proximity_performance(self) -> ProximityIter[Self]: 427 """Retrieve other agents that are in the `vi.config.Schema.radius` of the current agent. 428 429 Unlike `in_proximity_accuracy`, this proximity method does not calculate the distances between agents. 430 Instead, it retrieves agents that are in the same chunk as the current agent, 431 irrespective of their position within the chunk. 432 433 If you find yourself limited by the performance of `in_proximity_accuracy`, 434 you can swap the function call for this one instead. 435 This performance method roughly doubles the frame rates of the simulation. 436 """ 437 438 return self.__simulation._proximity.in_proximity_performance(self)
Retrieve other agents that are in the vi.config.Schema.radius
of the current agent.
Unlike in_proximity_accuracy
, this proximity method does not calculate the distances between agents.
Instead, it retrieves agents that are in the same chunk as the current agent,
irrespective of their position within the chunk.
If you find yourself limited by the performance of in_proximity_accuracy
,
you can swap the function call for this one instead.
This performance method roughly doubles the frame rates of the simulation.
440 def on_site(self) -> bool: 441 """Check whether the agent is currently on a site. 442 443 Examples 444 -------- 445 446 Stop the agent's movement when it reaches a site (think of a nice beach). 447 448 >>> class TravellingAgent(Agent): 449 ... def update(self): 450 ... if self.on_site(): 451 ... # crave that star damage 452 ... self.freeze_movement() 453 """ 454 455 return self.on_site_id() is not None
Check whether the agent is currently on a site.
Examples
Stop the agent's movement when it reaches a site (think of a nice beach).
>>> class TravellingAgent(Agent):
... def update(self):
... if self.on_site():
... # crave that star damage
... self.freeze_movement()
457 def on_site_id(self) -> Optional[int]: 458 """Get the identifier of the site the agent is currently on. 459 460 Examples 461 -------- 462 463 Stop the agent's movement when it reaches a site to inspect. 464 In addition, the current site identifier is saved to the DataFrame. 465 466 ```python 467 class SiteInspector(Agent): 468 def update(self): 469 site_id = self.on_site_id() 470 471 # Save the site id to the DataFrame 472 self.save_data("site", site_id) 473 474 # bool(site_id) would be inaccurate 475 # as a site_id of 0 will return False. 476 # Therefore, we check whether it is not None instead. 477 if site_id is not None: 478 # Inspect the site 479 self.freeze_movement() 480 ``` 481 """ 482 483 site: Optional[_StaticSprite] = pg.sprite.spritecollideany(self, self._sites, pg.sprite.collide_mask) # type: ignore 484 485 if site is not None: 486 return site.id 487 else: 488 return None
Get the identifier of the site the agent is currently on.
Examples
Stop the agent's movement when it reaches a site to inspect. In addition, the current site identifier is saved to the DataFrame.
class SiteInspector(Agent):
def update(self):
site_id = self.on_site_id()
# Save the site id to the DataFrame
self.save_data("site", site_id)
# bool(site_id) would be inaccurate
# as a site_id of 0 will return False.
# Therefore, we check whether it is not None instead.
if site_id is not None:
# Inspect the site
self.freeze_movement()
490 def freeze_movement(self): 491 """Freeze the movement of the agent. The movement can be continued by calling `continue_movement`.""" 492 493 self._moving = False
Freeze the movement of the agent. The movement can be continued by calling continue_movement
.
495 def continue_movement(self): 496 """Continue the movement of the agent from before its movement was frozen.""" 497 498 self._moving = True
Continue the movement of the agent from before its movement was frozen.
500 def change_image(self, index: int): 501 """Change the image of the agent. 502 503 If you want to change the agent's image to the second image in the images list, 504 then you can change the image to index 1: 505 >>> self.change_image(1) 506 """ 507 508 self._image_index = index
Change the image of the agent.
If you want to change the agent's image to the second image in the images list, then you can change the image to index 1:
>>> self.change_image(1)
510 def save_data(self, column: str, value: Any): 511 """Add extra data to the simulation's metrics. 512 513 The following data is collected automatically: 514 - agent identifier 515 - current frame 516 - x and y coordinates 517 518 Examples 519 -------- 520 521 Saving the number of agents that are currently in proximity: 522 523 >>> class MyAgent(Agent): 524 ... def update(self): 525 ... in_proximity = self.in_proximity_accuracy().count() 526 ... self.save_data("in_proximity", in_proximity) 527 """ 528 529 self.__simulation._metrics._temporary_snapshots[column].append(value)
Add extra data to the simulation's metrics.
The following data is collected automatically:
- agent identifier
- current frame
- x and y coordinates
Examples
Saving the number of agents that are currently in proximity:
>>> class MyAgent(Agent):
... def update(self):
... in_proximity = self.in_proximity_accuracy().count()
... self.save_data("in_proximity", in_proximity)
566 def reproduce(self) -> Self: 567 """Create a new agent and spawn it into the simulation. 568 569 All values will be reset to their defaults, 570 except for the agent's position and movement vector. 571 These will be cloned from the original agent. 572 """ 573 574 return copy(self)
Create a new agent and spawn it into the simulation.
All values will be reset to their defaults, except for the agent's position and movement vector. These will be cloned from the original agent.
576 def kill(self): 577 """Kill the agent. 578 579 While violence usually isn't the right option, 580 sometimes you just want to murder some innocent agents inside your little computer. 581 582 But fear not! 583 By *killing* the agent, all you're really doing is removing it from the simulation. 584 """ 585 586 super().kill()
Kill the agent.
While violence usually isn't the right option, sometimes you just want to murder some innocent agents inside your little computer.
But fear not! By killing the agent, all you're really doing is removing it from the simulation.
588 def is_dead(self) -> bool: 589 """Is the agent dead? 590 591 Death occurs when `kill` is called. 592 """ 593 return not self.is_alive()
Is the agent dead?
Death occurs when kill
is called.
595 def is_alive(self) -> bool: 596 """Is the agent still alive? 597 598 Death occurs when `kill` is called. 599 """ 600 601 return super().alive()
Is the agent still alive?
Death occurs when kill
is called.
Inherited Members
- pygame.sprite.Sprite
- add
- remove
- add_internal
- remove_internal
- groups
- alive
- layer
379@deserialize 380@serialize 381@dataclass 382class Config(Schema[int, float]): 383 """The `Config` class allows you to tweak the settings of your experiment. 384 385 Examples 386 -------- 387 388 If you want to change the proximity `radius` of your agents, 389 you can create a new `Config` instance and pass a custom value for `radius`. 390 391 >>> from vi import Agent, Config, Simulation 392 >>> 393 >>> ( 394 ... # 👇 we override the default radius value 395 ... Simulation(Config(radius=50)) 396 ... .batch_spawn_agents(100, Agent, ["examples/images/white.png"]) 397 ... .run() 398 ... ) 399 400 To add your own values to `Config`, 401 you can simply inherit `Config`, decorate it with [`@dataclass`](https://docs.python.org/3/library/dataclasses.html) and add your own options. 402 However, make sure to declare the [type](https://mypy.readthedocs.io/en/stable/cheat_sheet_py3.html) of the configuration option 403 along with its default value. 404 405 >>> @dataclass 406 >>> class MyConfig(Config): 407 ... # 👇 type 408 ... excitement: int = 100 409 ... # 👆 default value 410 411 Last but not least, declare that your agent is using the `MyConfig` class 412 and pass it along to the constructor of `vi.simulation.Simulation`. 413 414 >>> class MyAgent(Agent): 415 ... config: MyConfig 416 >>> 417 >>> ( 418 ... # 👇 use our custom config 419 ... Simulation(MyConfig()) 420 ... .batch_spawn_agents(100, MyAgent, ["examples/images/white.png"]) 421 ... .run() 422 ... ) 423 """ 424 425 ...
The Config
class allows you to tweak the settings of your experiment.
Examples
If you want to change the proximity radius
of your agents,
you can create a new Config
instance and pass a custom value for radius
.
>>> from vi import Agent, Config, Simulation
>>>
>>> (
... # 👇 we override the default radius value
... Simulation(Config(radius=50))
... .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
... .run()
... )
To add your own values to Config
,
you can simply inherit Config
, decorate it with @dataclass
and add your own options.
However, make sure to declare the type of the configuration option
along with its default value.
>>> @dataclass
>>> class MyConfig(Config):
... # 👇 type
... excitement: int = 100
... # 👆 default value
Last but not least, declare that your agent is using the MyConfig
class
and pass it along to the constructor of vi.Simulation
.
>>> class MyAgent(Agent):
... config: MyConfig
>>>
>>> (
... # 👇 use our custom config
... Simulation(MyConfig())
... .batch_spawn_agents(100, MyAgent, ["examples/images/white.png"])
... .run()
... )
Inherited Members
- vi.config.Schema
- id
- duration
- fps_limit
- image_rotation
- movement_speed
- print_fps
- radius
- seed
- visualise_chunks
- window
- from_file
251@deserialize 252@serialize 253@dataclass 254class Matrix(Schema[list[int], list[float]]): 255 """`Matrix` is the `Config` class on steroids. 256 It allows you to supply a list of values on certain configuration options, 257 to automatically generate multiple unique `Config` instances. 258 259 Examples 260 -------- 261 262 Imagine that you want to research the effect of the `radius` parameter. 263 Instead of only testing the default value of 25 pixels, 264 you also want to test a radius of 10 and 50 pixels. 265 266 A brute-force approach would be to create three unique `Config` instances manually. 267 268 >>> config1 = Config(radius=10) 269 >>> config2 = Config(radius=25) 270 >>> config3 = Config(radius=50) 271 272 However, perhaps we also want to override some other default values, 273 such as adding a `duration` to the simulation. 274 If we follow the same approach, then our code becomes messy rather quickly. 275 276 >>> config1 = Config(radius=10, duration=60 * 10) 277 >>> config2 = Config(radius=25, duration=60 * 10) 278 >>> config3 = Config(radius=50, duration=60 * 10) 279 280 So what do we do? 281 282 We use the `Matrix` class! 😎 283 284 The `Matrix` class allows us to write multiple configurations as if we are writing one configuration. 285 If we want to test multiple values of `radius`, then we can simply supply a list of values. 286 287 >>> matrix = Matrix(duration=60 * 10, radius=[10, 25, 50]) 288 289 It's that easy! 290 Now, if we want to generate a `Config` for each of the values in the radius list, 291 we can call the `to_configs` method. 292 293 >>> configs = matrix.to_configs(Config) 294 295 The list of configs returned by the `to_configs` method is equivalent to the brute-force approach we took earlier. 296 However, by utilising `Matrix`, our code is way more compact and easier to read. 297 298 And the fun doesn't stop there, as we can supply lists to multiple config options as well! 299 Let's say that we not only want to test the effect of `radius`, but also the effect of `movement_speed`. 300 We can simply pass a list of values to `movement_speed` and `Matrix` will automatically compute 301 the unique `Config` combinations that it can make between the values of `radius` and `movement_speed`. 302 303 >>> matrix = Matrix( 304 ... duration=60 * 10, 305 ... radius=[10, 25, 50], 306 ... movement_speed=[0.5, 1.0], 307 ... ) 308 309 If we now check the number of configs generated, 310 we will see that the above matrix produces 6 unique combinations (3 x 2). 311 312 >>> len(matrix.to_configs(Config)) 313 6 314 315 `Matrix` is an essential tool for analysing the effect of your simulation's parameters. 316 It allows you to effortlessly create multiple configurations, while keeping your code tidy. 317 318 Now, before you create a for-loop and iterate over the list of configs, 319 allow me to introduce you to [multiprocessing](https://docs.python.org/3/library/multiprocessing.html). 320 This built-in Python library allows us to run multiple simulations in parallel. 321 322 As you might already know, your processor (or CPU) consists of multiple cores. 323 Parallelisation allows us to run one simulation on every core of your CPU. 324 So if you have a beefy 10-core CPU, you can run 10 simulations in the same time as running one simulation individually. 325 326 However, your GPU might not be able to keep up with rendering 10 simulations at once. 327 Therefore, it's best to switch to `vi.simulation.HeadlessSimulation` when running multiple simulations in parallel, 328 as this simulation class disables all the rendering-related logic. 329 Thus, removing the GPU from the equation. 330 331 To learn more about parallelisation, please check out the [multiprocessing documentation](https://docs.python.org/3/library/multiprocessing.html). 332 For Violet, the following code is all you need to get started with parallelisation. 333 334 >>> from multiprocessing import Pool 335 >>> from vi import Agent, Config, HeadlessSimulation, Matrix 336 >>> import polars as pl 337 >>> 338 >>> 339 >>> class ParallelAgent(Agent): 340 ... config: Config 341 ... 342 ... def update(self): 343 ... # We save the radius and seed config values to our DataFrame, 344 ... # so we can make comparisons between these config values later. 345 ... self.save_data("radius", self.config.radius) 346 ... self.save_data("seed", self.config.seed) 347 >>> 348 >>> 349 >>> def run_simulation(config: Config) -> pl.DataFrame: 350 ... return ( 351 ... HeadlessSimulation(config) 352 ... .batch_spawn_agents(100, ParallelAgent, ["examples/images/white.png"]) 353 ... .run() 354 ... .snapshots 355 ... ) 356 >>> 357 >>> 358 >>> if __name__ == "__main__": 359 ... # We create a threadpool to run our simulations in parallel 360 ... with Pool() as p: 361 ... # The matrix will create four unique configs 362 ... matrix = Matrix(radius=[25, 50], seed=[1, 2]) 363 ... 364 ... # Create unique combinations of matrix values 365 ... configs = matrix.to_configs(Config) 366 ... 367 ... # Combine our individual DataFrames into one big DataFrame 368 ... df = pl.concat(p.map(run_simulation, configs)) 369 ... 370 ... print(df) 371 """ 372 373 def to_configs(self, config: Type[T]) -> list[T]: 374 """Generate a config for every unique combination of values in the matrix.""" 375 376 return [config(**values) for values in _matrixify(vars(self))]
Matrix
is the Config
class on steroids.
It allows you to supply a list of values on certain configuration options,
to automatically generate multiple unique Config
instances.
Examples
Imagine that you want to research the effect of the radius
parameter.
Instead of only testing the default value of 25 pixels,
you also want to test a radius of 10 and 50 pixels.
A brute-force approach would be to create three unique Config
instances manually.
>>> config1 = Config(radius=10)
>>> config2 = Config(radius=25)
>>> config3 = Config(radius=50)
However, perhaps we also want to override some other default values,
such as adding a duration
to the simulation.
If we follow the same approach, then our code becomes messy rather quickly.
>>> config1 = Config(radius=10, duration=60 * 10)
>>> config2 = Config(radius=25, duration=60 * 10)
>>> config3 = Config(radius=50, duration=60 * 10)
So what do we do?
We use the Matrix
class! 😎
The Matrix
class allows us to write multiple configurations as if we are writing one configuration.
If we want to test multiple values of radius
, then we can simply supply a list of values.
>>> matrix = Matrix(duration=60 * 10, radius=[10, 25, 50])
It's that easy!
Now, if we want to generate a Config
for each of the values in the radius list,
we can call the to_configs
method.
>>> configs = matrix.to_configs(Config)
The list of configs returned by the to_configs
method is equivalent to the brute-force approach we took earlier.
However, by utilising Matrix
, our code is way more compact and easier to read.
And the fun doesn't stop there, as we can supply lists to multiple config options as well!
Let's say that we not only want to test the effect of radius
, but also the effect of movement_speed
.
We can simply pass a list of values to movement_speed
and Matrix
will automatically compute
the unique Config
combinations that it can make between the values of radius
and movement_speed
.
>>> matrix = Matrix(
... duration=60 * 10,
... radius=[10, 25, 50],
... movement_speed=[0.5, 1.0],
... )
If we now check the number of configs generated, we will see that the above matrix produces 6 unique combinations (3 x 2).
>>> len(matrix.to_configs(Config))
6
Matrix
is an essential tool for analysing the effect of your simulation's parameters.
It allows you to effortlessly create multiple configurations, while keeping your code tidy.
Now, before you create a for-loop and iterate over the list of configs, allow me to introduce you to multiprocessing. This built-in Python library allows us to run multiple simulations in parallel.
As you might already know, your processor (or CPU) consists of multiple cores. Parallelisation allows us to run one simulation on every core of your CPU. So if you have a beefy 10-core CPU, you can run 10 simulations in the same time as running one simulation individually.
However, your GPU might not be able to keep up with rendering 10 simulations at once.
Therefore, it's best to switch to vi.HeadlessSimulation
when running multiple simulations in parallel,
as this simulation class disables all the rendering-related logic.
Thus, removing the GPU from the equation.
To learn more about parallelisation, please check out the multiprocessing documentation. For Violet, the following code is all you need to get started with parallelisation.
>>> from multiprocessing import Pool
>>> from vi import Agent, Config, HeadlessSimulation, Matrix
>>> import polars as pl
>>>
>>>
>>> class ParallelAgent(Agent):
... config: Config
...
... def update(self):
... # We save the radius and seed config values to our DataFrame,
... # so we can make comparisons between these config values later.
... self.save_data("radius", self.config.radius)
... self.save_data("seed", self.config.seed)
>>>
>>>
>>> def run_simulation(config: Config) -> pl.DataFrame:
... return (
... HeadlessSimulation(config)
... .batch_spawn_agents(100, ParallelAgent, ["examples/images/white.png"])
... .run()
... .snapshots
... )
>>>
>>>
>>> if __name__ == "__main__":
... # We create a threadpool to run our simulations in parallel
... with Pool() as p:
... # The matrix will create four unique configs
... matrix = Matrix(radius=[25, 50], seed=[1, 2])
...
... # Create unique combinations of matrix values
... configs = matrix.to_configs(Config)
...
... # Combine our individual DataFrames into one big DataFrame
... df = pl.concat(p.map(run_simulation, configs))
...
... print(df)
373 def to_configs(self, config: Type[T]) -> list[T]: 374 """Generate a config for every unique combination of values in the matrix.""" 375 376 return [config(**values) for values in _matrixify(vars(self))]
Generate a config for every unique combination of values in the matrix.
Inherited Members
- vi.config.Schema
- id
- duration
- fps_limit
- image_rotation
- movement_speed
- print_fps
- radius
- seed
- visualise_chunks
- window
- from_file
70@deserialize 71@serialize 72@dataclass 73class Window: 74 """Settings related to the simulation window.""" 75 76 width: int = 750 77 """The width of the simulation window in pixels.""" 78 79 height: int = 750 80 """The height of the simulation window in pixels.""" 81 82 @classmethod 83 def square(cls, size: int): 84 return cls(width=size, height=size) 85 86 def as_tuple(self) -> tuple[int, int]: 87 return (self.width, self.height)
Settings related to the simulation window.
211class Metrics: 212 """A container hosting all the accumulated simulation data over time.""" 213 214 fps: Fps 215 """The frames-per-second history to analyse performance.""" 216 217 _temporary_snapshots: defaultdict[str, list[Any]] 218 219 snapshots: pl.DataFrame 220 """The [Polars DataFrame](https://pola-rs.github.io/polars-book/user-guide/quickstart/intro.html) containing the snapshot data of all agents over time.""" 221 222 def __init__(self): 223 self.fps = Fps() 224 self._temporary_snapshots = defaultdict(list) 225 self.snapshots = pl.DataFrame() 226 227 def _merge(self): 228 df = pl.from_dict(self._temporary_snapshots) 229 230 self.snapshots.vstack(df, in_place=True) 231 232 self._temporary_snapshots = defaultdict(list)
A container hosting all the accumulated simulation data over time.
The Polars DataFrame containing the snapshot data of all agents over time.
34class ProximityIter(Generic[T]): 35 """The `ProximityIter` is a small wrapper around a *generator* of agents that are in proximity. 36 37 Now, you've probably never heard of a generator before, so let me give you the TLDR. 38 A Python generator is basically a stream of values. In our case, agents! 39 40 By not adding the agents in a list or in a set, but keeping them in a stream, 41 we can add multiple filters while keeping amazing performance. 42 43 When we're done with the filtering, we can either `count` the agents in our stream 44 (thereby consuming the stream, it's single-use only) or we can collect them in a list or a set. 45 46 Examples 47 -------- 48 49 Imagine that our agent is Mudusa. We want to freeze the movement of all agents that we see. 50 51 We can implement this by simply looping over all the agents that are returned in the `ProximityIter` stream. 52 53 >>> class Medusa(Agent): 54 ... def update(self): 55 ... for agent, distance in self.in_proximity_accuracy(): 56 ... agent.freeze_movement() 57 58 Or perhaps we simply want to change colour if there are at least two other agents nearby. 59 60 >>> class Chameleon(Agent): 61 ... def update(self): 62 ... if self.in_proximity_accuracy().count() >= 2: 63 ... self.change_image(1) 64 ... else: 65 ... self.change_image(0) 66 67 In some cases, we want to loop over our stream of agents multiple times. 68 To make our stream reusable, we can add its agents to a list. 69 70 >>> class TheCollector(Agent): 71 ... def update(self): 72 ... collectables = list(self.self.in_proximity_accuracy()) 73 ... # do something with our collectables multiple times! 74 """ 75 76 _gen: Generator[T, None, None] 77 78 def __init__(self, gen: Generator[T, None, None]): 79 self._gen = gen 80 81 def __iter__(self): 82 return self._gen 83 84 def filter(self, predicate: Callable[[T], bool]) -> ProximityIter[T]: 85 """Filter the agents that are in proximity. 86 87 Example 88 ------- 89 90 Count the number of dead agents in proximity. 91 92 >>> zombies = ( 93 ... self.in_proximity_accuracy() 94 ... .without_distance() 95 ... .filter(lambda agent: agent.is_dead()) 96 ... .count() 97 ... ) 98 99 If you don't want to remove the distance, 100 you can also refer to agent as the first element of the tuple: 101 102 >>> zombies = ( 103 ... self.in_proximity_accuracy() 104 ... .filter(lambda x: x[0].is_dead()) 105 ... .count() 106 ... ) 107 """ 108 109 self._gen = (agent for agent in self if predicate(agent)) 110 return self 111 112 @overload 113 def filter_kind( 114 self: ProximityIter[tuple[AgentClass, float]], kind: Type[U] 115 ) -> ProximityIter[tuple[U, float]]: 116 ... 117 118 @overload 119 def filter_kind(self: ProximityIter[AgentClass], kind: Type[U]) -> ProximityIter[U]: 120 ... 121 122 def filter_kind( 123 self: Union[ProximityIter[tuple[AgentClass, float]], ProximityIter[AgentClass]], 124 kind: Type[U], 125 ) -> Union[ProximityIter[tuple[U, float]], ProximityIter[U]]: 126 """Filter the agents that are in proximity based on their class. 127 128 Examples 129 -------- 130 131 We don't want our Zombie to kill other zombies. 132 Just humans! 133 134 >>> class Human(Agent): ... 135 136 >>> class Zombie(Agent): 137 ... def update(self): 138 ... human = ( 139 ... self.in_proximity_accuracy() 140 ... .without_distance() 141 ... .filter_kind(Human) 142 ... .first() 143 ... ) 144 ... 145 ... if human is not None: 146 ... human.kill() 147 """ 148 149 def internal_generator() -> Generator[Union[tuple[U, float], U], None, None]: 150 for maybe_tuple in self: 151 if isinstance(maybe_tuple, tuple): 152 agent, dist = maybe_tuple 153 if isinstance(agent, kind): 154 yield (agent, dist) 155 elif isinstance(maybe_tuple, kind): 156 yield maybe_tuple 157 158 return ProximityIter(internal_generator()) # type: ignore 159 160 def without_distance(self: ProximityIter[tuple[U, float]]) -> ProximityIter[U]: 161 """Remove the distance from the results. 162 163 If you call `vi.agent.Agent.in_proximity_accuracy`, 164 agents are returned along with their measured distance. 165 However, perhaps you're not interested in the distance. 166 167 Note that `vi.agent.Agent.in_proximity_performance` does not return the distance. 168 So you cannot call this function on the performance method. 169 170 Example 171 ------- 172 173 By default, the `vi.agent.Agent.in_proximity_accuracy` method returns a stream 174 of agent-distance pairs. 175 176 >>> for agent, distance in self.in_proximity_accuracy(): 177 ... # Do things with both agent and distance 178 179 When you use `without_distance`, the distance can no longer be accessed. 180 181 >>> for agent in self.in_proximity_accuracy().without_distance(): 182 ... # Do things with agent directly 183 """ 184 185 return ProximityIter(agent for agent, _ in self) 186 187 def first(self) -> Optional[T]: 188 """Retrieve the first agent that's in proximity. 189 190 If there are no agents in proximity, `None` is returned instead. 191 192 Examples 193 -------- 194 195 Want to kill the first agent you see every frame? 196 197 >>> other_agent = self.in_proximity_accuracy().without_distance().first() 198 >>> if other_agent is not None: 199 ... other_agent.kill() 200 201 If you don't call `without_distance`, then you cannot unpack the tuple directly, 202 as it could potentially be None. 203 E.g. the following code would result in a crash: 204 205 >>> agent, distance = self.in_proximity_accuracy().first() 206 207 Therefore, you should unpack the tuple after checking whether it is not None: 208 209 >>> maybe_agent = self.in_proximity_accuracy().first() 210 >>> if maybe_agent is not None: 211 ... agent, distance = maybe_agent 212 ... agent.kill() 213 """ 214 215 return next(self._gen, None) 216 217 def collect_set(self) -> set[T]: 218 """Transform the generator into a set of agents that are in proximity. 219 220 This is the same as wrapping the stream in a `set`. 221 222 >>> nearby_agents = set(self.in_proximity_accuracy()) 223 224 >>> nearby_agents = self.in_proximity_accuracy().collect_set() 225 """ 226 227 return set(self._gen) 228 229 def count(self) -> int: 230 """Count the number of agents that are in proximity. 231 232 Example 233 ------- 234 235 >>> in_proximity = self.in_proximity_accuracy().count() 236 """ 237 238 count = 0 239 for _ in self._gen: 240 count += 1 241 242 return count
The ProximityIter
is a small wrapper around a generator of agents that are in proximity.
Now, you've probably never heard of a generator before, so let me give you the TLDR. A Python generator is basically a stream of values. In our case, agents!
By not adding the agents in a list or in a set, but keeping them in a stream, we can add multiple filters while keeping amazing performance.
When we're done with the filtering, we can either count
the agents in our stream
(thereby consuming the stream, it's single-use only) or we can collect them in a list or a set.
Examples
Imagine that our agent is Mudusa. We want to freeze the movement of all agents that we see.
We can implement this by simply looping over all the agents that are returned in the ProximityIter
stream.
>>> class Medusa(Agent):
... def update(self):
... for agent, distance in self.in_proximity_accuracy():
... agent.freeze_movement()
Or perhaps we simply want to change colour if there are at least two other agents nearby.
>>> class Chameleon(Agent):
... def update(self):
... if self.in_proximity_accuracy().count() >= 2:
... self.change_image(1)
... else:
... self.change_image(0)
In some cases, we want to loop over our stream of agents multiple times. To make our stream reusable, we can add its agents to a list.
>>> class TheCollector(Agent):
... def update(self):
... collectables = list(self.self.in_proximity_accuracy())
... # do something with our collectables multiple times!
84 def filter(self, predicate: Callable[[T], bool]) -> ProximityIter[T]: 85 """Filter the agents that are in proximity. 86 87 Example 88 ------- 89 90 Count the number of dead agents in proximity. 91 92 >>> zombies = ( 93 ... self.in_proximity_accuracy() 94 ... .without_distance() 95 ... .filter(lambda agent: agent.is_dead()) 96 ... .count() 97 ... ) 98 99 If you don't want to remove the distance, 100 you can also refer to agent as the first element of the tuple: 101 102 >>> zombies = ( 103 ... self.in_proximity_accuracy() 104 ... .filter(lambda x: x[0].is_dead()) 105 ... .count() 106 ... ) 107 """ 108 109 self._gen = (agent for agent in self if predicate(agent)) 110 return self
Filter the agents that are in proximity.
Example
Count the number of dead agents in proximity.
>>> zombies = (
... self.in_proximity_accuracy()
... .without_distance()
... .filter(lambda agent: agent.is_dead())
... .count()
... )
If you don't want to remove the distance, you can also refer to agent as the first element of the tuple:
>>> zombies = (
... self.in_proximity_accuracy()
... .filter(lambda x: x[0].is_dead())
... .count()
... )
122 def filter_kind( 123 self: Union[ProximityIter[tuple[AgentClass, float]], ProximityIter[AgentClass]], 124 kind: Type[U], 125 ) -> Union[ProximityIter[tuple[U, float]], ProximityIter[U]]: 126 """Filter the agents that are in proximity based on their class. 127 128 Examples 129 -------- 130 131 We don't want our Zombie to kill other zombies. 132 Just humans! 133 134 >>> class Human(Agent): ... 135 136 >>> class Zombie(Agent): 137 ... def update(self): 138 ... human = ( 139 ... self.in_proximity_accuracy() 140 ... .without_distance() 141 ... .filter_kind(Human) 142 ... .first() 143 ... ) 144 ... 145 ... if human is not None: 146 ... human.kill() 147 """ 148 149 def internal_generator() -> Generator[Union[tuple[U, float], U], None, None]: 150 for maybe_tuple in self: 151 if isinstance(maybe_tuple, tuple): 152 agent, dist = maybe_tuple 153 if isinstance(agent, kind): 154 yield (agent, dist) 155 elif isinstance(maybe_tuple, kind): 156 yield maybe_tuple 157 158 return ProximityIter(internal_generator()) # type: ignore
Filter the agents that are in proximity based on their class.
Examples
We don't want our Zombie to kill other zombies. Just humans!
>>> class Human(Agent): ...
>>> class Zombie(Agent):
... def update(self):
... human = (
... self.in_proximity_accuracy()
... .without_distance()
... .filter_kind(Human)
... .first()
... )
...
... if human is not None:
... human.kill()
160 def without_distance(self: ProximityIter[tuple[U, float]]) -> ProximityIter[U]: 161 """Remove the distance from the results. 162 163 If you call `vi.agent.Agent.in_proximity_accuracy`, 164 agents are returned along with their measured distance. 165 However, perhaps you're not interested in the distance. 166 167 Note that `vi.agent.Agent.in_proximity_performance` does not return the distance. 168 So you cannot call this function on the performance method. 169 170 Example 171 ------- 172 173 By default, the `vi.agent.Agent.in_proximity_accuracy` method returns a stream 174 of agent-distance pairs. 175 176 >>> for agent, distance in self.in_proximity_accuracy(): 177 ... # Do things with both agent and distance 178 179 When you use `without_distance`, the distance can no longer be accessed. 180 181 >>> for agent in self.in_proximity_accuracy().without_distance(): 182 ... # Do things with agent directly 183 """ 184 185 return ProximityIter(agent for agent, _ in self)
Remove the distance from the results.
If you call vi.Agent.in_proximity_accuracy
,
agents are returned along with their measured distance.
However, perhaps you're not interested in the distance.
Note that vi.Agent.in_proximity_performance
does not return the distance.
So you cannot call this function on the performance method.
Example
By default, the vi.Agent.in_proximity_accuracy
method returns a stream
of agent-distance pairs.
>>> for agent, distance in self.in_proximity_accuracy():
... # Do things with both agent and distance
When you use without_distance
, the distance can no longer be accessed.
>>> for agent in self.in_proximity_accuracy().without_distance():
... # Do things with agent directly
187 def first(self) -> Optional[T]: 188 """Retrieve the first agent that's in proximity. 189 190 If there are no agents in proximity, `None` is returned instead. 191 192 Examples 193 -------- 194 195 Want to kill the first agent you see every frame? 196 197 >>> other_agent = self.in_proximity_accuracy().without_distance().first() 198 >>> if other_agent is not None: 199 ... other_agent.kill() 200 201 If you don't call `without_distance`, then you cannot unpack the tuple directly, 202 as it could potentially be None. 203 E.g. the following code would result in a crash: 204 205 >>> agent, distance = self.in_proximity_accuracy().first() 206 207 Therefore, you should unpack the tuple after checking whether it is not None: 208 209 >>> maybe_agent = self.in_proximity_accuracy().first() 210 >>> if maybe_agent is not None: 211 ... agent, distance = maybe_agent 212 ... agent.kill() 213 """ 214 215 return next(self._gen, None)
Retrieve the first agent that's in proximity.
If there are no agents in proximity, None
is returned instead.
Examples
Want to kill the first agent you see every frame?
>>> other_agent = self.in_proximity_accuracy().without_distance().first()
>>> if other_agent is not None:
... other_agent.kill()
If you don't call without_distance
, then you cannot unpack the tuple directly,
as it could potentially be None.
E.g. the following code would result in a crash:
>>> agent, distance = self.in_proximity_accuracy().first()
Therefore, you should unpack the tuple after checking whether it is not None:
>>> maybe_agent = self.in_proximity_accuracy().first()
>>> if maybe_agent is not None:
... agent, distance = maybe_agent
... agent.kill()
217 def collect_set(self) -> set[T]: 218 """Transform the generator into a set of agents that are in proximity. 219 220 This is the same as wrapping the stream in a `set`. 221 222 >>> nearby_agents = set(self.in_proximity_accuracy()) 223 224 >>> nearby_agents = self.in_proximity_accuracy().collect_set() 225 """ 226 227 return set(self._gen)
Transform the generator into a set of agents that are in proximity.
This is the same as wrapping the stream in a set
.
>>> nearby_agents = set(self.in_proximity_accuracy())
>>> nearby_agents = self.in_proximity_accuracy().collect_set()
229 def count(self) -> int: 230 """Count the number of agents that are in proximity. 231 232 Example 233 ------- 234 235 >>> in_proximity = self.in_proximity_accuracy().count() 236 """ 237 238 count = 0 239 for _ in self._gen: 240 count += 1 241 242 return count
Count the number of agents that are in proximity.
Example
>>> in_proximity = self.in_proximity_accuracy().count()
24class TimeMachine: 25 images: list[pg.surface.Surface] 26 window: Window 27 28 history: Series 29 index: int = 0 30 31 background: pg.surface.Surface 32 clock: pg.time.Clock 33 screen: pg.surface.Surface 34 35 running: bool = False 36 37 def __init__( 38 self, 39 history: DataFrame, 40 image_paths: list[str], 41 window: Optional[Window] = None, 42 ): 43 pg.display.init() 44 45 # Convert multiple series (one per column) into one series of structs 46 self.history = history.to_struct("agent") 47 48 self.window = window if window is not None else Window() 49 self.screen = pg.display.set_mode(self.window.as_tuple()) 50 pg.display.set_caption("Violet") 51 52 # Load the images 53 self.images = load_images(image_paths) 54 55 # Initialise background 56 self.background = pg.surface.Surface(self.screen.get_size()).convert() 57 self.background.fill((0, 0, 0)) 58 59 # Initialise the clock. Used to cap FPS. 60 self.clock = pg.time.Clock() 61 62 def tick(self): 63 for event in pg.event.get(): 64 if event.type == pg.QUIT: 65 self.running = False 66 return 67 68 self.screen.blit(self.background, (0, 0)) 69 70 if self.index == len(self.history): 71 self.running = False 72 return 73 74 current_frame: int = self.history[self.index]["frame"] 75 76 while True: 77 if self.index == len(self.history): 78 self.running = False 79 break 80 81 data: dict[str, Any] = self.history[self.index] 82 if data["frame"] != current_frame: 83 break 84 85 image_index = data["image_index"] 86 image = self.images[image_index] 87 88 angle = data.get("angle") 89 if angle is not None: 90 image = pg.transform.rotate(image, angle) 91 92 rect = image.get_rect() 93 rect.center = (data["x"], data["y"]) 94 95 self.screen.blit(image, rect) 96 self.index += 1 97 98 pg.display.flip() 99 self.clock.tick(60) 100 101 def run(self): 102 self.running = True 103 while self.running: 104 self.tick() 105 106 pg.quit()
37 def __init__( 38 self, 39 history: DataFrame, 40 image_paths: list[str], 41 window: Optional[Window] = None, 42 ): 43 pg.display.init() 44 45 # Convert multiple series (one per column) into one series of structs 46 self.history = history.to_struct("agent") 47 48 self.window = window if window is not None else Window() 49 self.screen = pg.display.set_mode(self.window.as_tuple()) 50 pg.display.set_caption("Violet") 51 52 # Load the images 53 self.images = load_images(image_paths) 54 55 # Initialise background 56 self.background = pg.surface.Surface(self.screen.get_size()).convert() 57 self.background.fill((0, 0, 0)) 58 59 # Initialise the clock. Used to cap FPS. 60 self.clock = pg.time.Clock()
62 def tick(self): 63 for event in pg.event.get(): 64 if event.type == pg.QUIT: 65 self.running = False 66 return 67 68 self.screen.blit(self.background, (0, 0)) 69 70 if self.index == len(self.history): 71 self.running = False 72 return 73 74 current_frame: int = self.history[self.index]["frame"] 75 76 while True: 77 if self.index == len(self.history): 78 self.running = False 79 break 80 81 data: dict[str, Any] = self.history[self.index] 82 if data["frame"] != current_frame: 83 break 84 85 image_index = data["image_index"] 86 image = self.images[image_index] 87 88 angle = data.get("angle") 89 if angle is not None: 90 image = pg.transform.rotate(image, angle) 91 92 rect = image.get_rect() 93 rect.center = (data["x"], data["y"]) 94 95 self.screen.blit(image, rect) 96 self.index += 1 97 98 pg.display.flip() 99 self.clock.tick(60)
86class HeadlessSimulation: 87 """The Headless Mode equivalent of `Simulation`. 88 89 Headless Mode removes all the rendering logic from the simulation 90 to not only remove the annoying simulation window from popping up every time, 91 but to also speed up your simulation when it's GPU bound. 92 93 Combining Headless Mode with `vi.config.Matrix` and Python's [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) opens a realm of new possibilities. 94 Vi's Matrix is `vi.config.Config` on steroids. 95 It allows you to pass lists of values instead of single values on supported parameters, 96 to then effortlessly combine each unique combination of values into its own `vi.config.Config`. 97 When combined with [multiprocessing](https://docs.python.org/3/library/multiprocessing.html), 98 we can run multiple configs in parallel. 99 100 >>> from multiprocessing import Pool 101 >>> from vi import Agent, Config, HeadlessSimulation, Matrix 102 >>> import polars as pl 103 >>> 104 >>> 105 >>> def run_simulation(config: Config) -> pl.DataFrame: 106 ... return ( 107 ... HeadlessSimulation(config) 108 ... .batch_spawn_agents(100, Agent, ["examples/images/white.png"]) 109 ... .run() 110 ... .snapshots 111 ... ) 112 >>> 113 >>> 114 >>> if __name__ == "__main__": 115 ... # We create a threadpool to run our simulations in parallel 116 ... with Pool() as p: 117 ... # The matrix will create four unique configs 118 ... matrix = Matrix(radius=[25, 50], seed=[1, 2]) 119 ... 120 ... # Create unique combinations of matrix values 121 ... configs = matrix.to_configs(Config) 122 ... 123 ... # Combine our individual DataFrames into one big DataFrame 124 ... df = pl.concat(p.map(run_simulation, configs)) 125 ... 126 ... print(df) 127 """ 128 129 shared: Shared 130 """Attributes that are shared between the simulation and all agents.""" 131 132 _running: bool = False 133 """The simulation keeps running as long as running is True.""" 134 135 _area: pg.rect.Rect 136 137 # Sprite Groups 138 _all: pg.sprite.Group 139 _agents: pg.sprite.Group 140 _obstacles: pg.sprite.Group 141 _sites: pg.sprite.Group 142 143 _next_agent_id: int = 0 144 """The agent identifier to be given next.""" 145 146 _next_obstacle_id: int = 0 147 """The obstacle identifier to be given next.""" 148 149 _next_site_id: int = 0 150 """The site identifier to be given next.""" 151 152 # Proximity 153 _proximity: ProximityEngine 154 155 # Config that's passed on to agents as well 156 config: Config 157 """The config of the simulation that's shared with all agents. 158 159 The config can be overriden when inheriting the Simulation class. 160 However, the config must always: 161 162 1. Inherit `Config` 163 2. Be decorated by `@serde` 164 """ 165 166 _metrics: Metrics 167 """A collection of all the Snapshots that have been created in the simulation. 168 169 Each agent produces a Snapshot at every frame in the simulation. 170 """ 171 172 def __init__(self, config: Optional[Config] = None): 173 self.config = config if config else Config() 174 self._metrics = Metrics() 175 176 # Initiate the seed as early as possible. 177 random.seed(self.config.seed) 178 179 # Using a custom generator for agent movement 180 prng_move = random.Random() 181 prng_move.seed(self.config.seed) 182 183 self.shared = Shared(prng_move=prng_move) 184 185 width, height = self.config.window.as_tuple() 186 self._area = pg.rect.Rect(0, 0, width, height) 187 188 # Create sprite groups 189 self._all = pg.sprite.Group() 190 self._agents = pg.sprite.Group() 191 self._obstacles = pg.sprite.Group() 192 self._sites = pg.sprite.Group() 193 194 # Proximity! 195 self._proximity = ProximityEngine(self._agents, self.config.radius) 196 197 def batch_spawn_agents( 198 self, 199 count: int, 200 agent_class: Type[AgentClass], 201 images: list[str], 202 ) -> Self: 203 """Spawn multiple agents into the simulation. 204 205 Examples 206 -------- 207 208 Spawn 100 `vi.agent.Agent`'s into the simulation with `examples/images/white.png` as image. 209 210 >>> ( 211 ... Simulation() 212 ... .batch_spawn_agents(100, Agent, ["examples/images/white.png"]) 213 ... .run() 214 ... ) 215 """ 216 217 # Load images once so the files don't have to be read multiple times. 218 loaded_images = self._load_images(images) 219 220 for _ in range(count): 221 agent_class(images=loaded_images, simulation=self) 222 223 return self 224 225 def spawn_agent( 226 self, 227 agent_class: Type[AgentClass], 228 images: list[str], 229 ) -> Self: 230 """Spawn one agent into the simulation. 231 232 While you can run `spawn_agent` in a for-loop, 233 you probably want to call `batch_spawn_agents` instead 234 as `batch_spawn_agents` optimises the image loading process. 235 236 Examples 237 -------- 238 239 Spawn a single `vi.agent.Agent` into the simulation with `examples/images/white.png` as image: 240 241 >>> ( 242 ... Simulation() 243 ... .spawn_agent(Agent, ["examples/images/white.png"]) 244 ... .run() 245 ... ) 246 """ 247 248 agent_class(images=self._load_images(images), simulation=self) 249 250 return self 251 252 def spawn_obstacle(self, image_path: str, x: int, y: int) -> Self: 253 """Spawn one obstacle into the simulation. The given coordinates will be the centre of the obstacle. 254 255 When agents collide with an obstacle, they will make a 180 degree turn. 256 257 Examples 258 -------- 259 260 Spawn a single obstacle into the simulation with `examples/images/bubble-full.png` as image. 261 In addition, we place the obstacle in the centre of our window. 262 263 >>> config = Config() 264 >>> x, y = config.window.as_tuple() 265 >>> ( 266 ... Simulation(config) 267 ... .spawn_obstacle("examples/images/bubble-full.png", x // 2, y // 2) 268 ... .run() 269 ... ) 270 """ 271 272 _StaticSprite( 273 containers=[self._all, self._obstacles], 274 id=self._obstacle_id(), 275 image=self._load_image(image_path), 276 pos=Vector2((x, y)), 277 ) 278 279 return self 280 281 def spawn_site(self, image_path: str, x: int, y: int) -> Self: 282 """Spawn one site into the simulation. The given coordinates will be the centre of the site. 283 284 Examples 285 -------- 286 287 Spawn a single site into the simulation with `examples/images/site.png` as image. 288 In addition, we give specific coordinates where the site should be placed. 289 290 >>> ( 291 ... Simulation(config) 292 ... .spawn_site("examples/images/site.png", x=375, y=375) 293 ... .run() 294 ... ) 295 """ 296 297 _StaticSprite( 298 containers=[self._all, self._sites], 299 id=self._site_id(), 300 image=self._load_image(image_path), 301 pos=Vector2((x, y)), 302 ) 303 304 return self 305 306 def run(self) -> Metrics: 307 """Run the simulation until it's ended by closing the window or when the `vi.config.Schema.duration` has elapsed.""" 308 309 self._running = True 310 311 while self._running: 312 self.tick() 313 314 return self._metrics 315 316 def before_update(self): 317 """Run any code before the agents are updated in every tick. 318 319 You should override this method when inheriting Simulation to add your own logic. 320 321 Some examples include: 322 - Processing events from PyGame's event queue. 323 """ 324 325 ... 326 327 def after_update(self): 328 ... 329 330 def tick(self): 331 """Advance the simulation with one tick.""" 332 333 self.before_update() 334 335 # Update the position of all agents 336 self.__update_positions() 337 338 # If the radius was changed by an event, 339 # also update the radius in the proximity engine 340 self._proximity._set_radius(self.config.radius) 341 342 # Calculate proximity chunks 343 self._proximity.update() 344 345 # Save the replay data of all agents 346 self.__collect_replay_data() 347 348 # Update all agents 349 self._all.update() 350 351 # Merge the collected snapshots into the dataframe. 352 self._metrics._merge() 353 354 self.after_update() 355 356 # If we've reached the duration of the simulation, then stop the simulation. 357 if self.config.duration > 0 and self.shared.counter == self.config.duration: 358 self.stop() 359 return 360 361 self.shared.counter += 1 362 363 def stop(self): 364 """Stop the simulation. 365 366 The simulation isn't stopped directly. 367 Instead, the current tick is completed, after which the simulation will end. 368 """ 369 370 self._running = False 371 372 def __collect_replay_data(self): 373 """Collect the replay data for all agents.""" 374 375 for sprite in self._agents: 376 agent: Agent = sprite # type: ignore 377 agent._collect_replay_data() 378 379 def __update_positions(self): 380 """Update the position of all agents.""" 381 382 for sprite in self._agents.sprites(): 383 agent: Agent = sprite # type: ignore 384 agent.change_position() 385 386 def _load_image(self, path: str) -> pg.surface.Surface: 387 return pg.image.load(path) 388 389 def _load_images(self, images: list[str]) -> list[pg.surface.Surface]: 390 return [self._load_image(path) for path in images] 391 392 def _agent_id(self) -> int: 393 agent_id = self._next_agent_id 394 self._next_agent_id += 1 395 396 return agent_id 397 398 def _obstacle_id(self) -> int: 399 obstacle_id = self._next_obstacle_id 400 self._next_obstacle_id += 1 401 402 return obstacle_id 403 404 def _site_id(self) -> int: 405 site_id = self._next_site_id 406 self._next_site_id += 1 407 408 return site_id
The Headless Mode equivalent of Simulation
.
Headless Mode removes all the rendering logic from the simulation to not only remove the annoying simulation window from popping up every time, but to also speed up your simulation when it's GPU bound.
Combining Headless Mode with vi.Matrix
and Python's multiprocessing opens a realm of new possibilities.
Vi's Matrix is vi.Config
on steroids.
It allows you to pass lists of values instead of single values on supported parameters,
to then effortlessly combine each unique combination of values into its own vi.Config
.
When combined with multiprocessing,
we can run multiple configs in parallel.
>>> from multiprocessing import Pool
>>> from vi import Agent, Config, HeadlessSimulation, Matrix
>>> import polars as pl
>>>
>>>
>>> def run_simulation(config: Config) -> pl.DataFrame:
... return (
... HeadlessSimulation(config)
... .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
... .run()
... .snapshots
... )
>>>
>>>
>>> if __name__ == "__main__":
... # We create a threadpool to run our simulations in parallel
... with Pool() as p:
... # The matrix will create four unique configs
... matrix = Matrix(radius=[25, 50], seed=[1, 2])
...
... # Create unique combinations of matrix values
... configs = matrix.to_configs(Config)
...
... # Combine our individual DataFrames into one big DataFrame
... df = pl.concat(p.map(run_simulation, configs))
...
... print(df)
172 def __init__(self, config: Optional[Config] = None): 173 self.config = config if config else Config() 174 self._metrics = Metrics() 175 176 # Initiate the seed as early as possible. 177 random.seed(self.config.seed) 178 179 # Using a custom generator for agent movement 180 prng_move = random.Random() 181 prng_move.seed(self.config.seed) 182 183 self.shared = Shared(prng_move=prng_move) 184 185 width, height = self.config.window.as_tuple() 186 self._area = pg.rect.Rect(0, 0, width, height) 187 188 # Create sprite groups 189 self._all = pg.sprite.Group() 190 self._agents = pg.sprite.Group() 191 self._obstacles = pg.sprite.Group() 192 self._sites = pg.sprite.Group() 193 194 # Proximity! 195 self._proximity = ProximityEngine(self._agents, self.config.radius)
The config of the simulation that's shared with all agents.
The config can be overriden when inheriting the Simulation class. However, the config must always:
- Inherit
Config
- Be decorated by
@serde
197 def batch_spawn_agents( 198 self, 199 count: int, 200 agent_class: Type[AgentClass], 201 images: list[str], 202 ) -> Self: 203 """Spawn multiple agents into the simulation. 204 205 Examples 206 -------- 207 208 Spawn 100 `vi.agent.Agent`'s into the simulation with `examples/images/white.png` as image. 209 210 >>> ( 211 ... Simulation() 212 ... .batch_spawn_agents(100, Agent, ["examples/images/white.png"]) 213 ... .run() 214 ... ) 215 """ 216 217 # Load images once so the files don't have to be read multiple times. 218 loaded_images = self._load_images(images) 219 220 for _ in range(count): 221 agent_class(images=loaded_images, simulation=self) 222 223 return self
Spawn multiple agents into the simulation.
Examples
Spawn 100 vi.Agent
's into the simulation with examples/images/white.png
as image.
>>> (
... Simulation()
... .batch_spawn_agents(100, Agent, ["examples/images/white.png"])
... .run()
... )
225 def spawn_agent( 226 self, 227 agent_class: Type[AgentClass], 228 images: list[str], 229 ) -> Self: 230 """Spawn one agent into the simulation. 231 232 While you can run `spawn_agent` in a for-loop, 233 you probably want to call `batch_spawn_agents` instead 234 as `batch_spawn_agents` optimises the image loading process. 235 236 Examples 237 -------- 238 239 Spawn a single `vi.agent.Agent` into the simulation with `examples/images/white.png` as image: 240 241 >>> ( 242 ... Simulation() 243 ... .spawn_agent(Agent, ["examples/images/white.png"]) 244 ... .run() 245 ... ) 246 """ 247 248 agent_class(images=self._load_images(images), simulation=self) 249 250 return self
Spawn one agent into the simulation.
While you can run spawn_agent
in a for-loop,
you probably want to call batch_spawn_agents
instead
as batch_spawn_agents
optimises the image loading process.
Examples
Spawn a single vi.Agent
into the simulation with examples/images/white.png
as image:
>>> (
... Simulation()
... .spawn_agent(Agent, ["examples/images/white.png"])
... .run()
... )
252 def spawn_obstacle(self, image_path: str, x: int, y: int) -> Self: 253 """Spawn one obstacle into the simulation. The given coordinates will be the centre of the obstacle. 254 255 When agents collide with an obstacle, they will make a 180 degree turn. 256 257 Examples 258 -------- 259 260 Spawn a single obstacle into the simulation with `examples/images/bubble-full.png` as image. 261 In addition, we place the obstacle in the centre of our window. 262 263 >>> config = Config() 264 >>> x, y = config.window.as_tuple() 265 >>> ( 266 ... Simulation(config) 267 ... .spawn_obstacle("examples/images/bubble-full.png", x // 2, y // 2) 268 ... .run() 269 ... ) 270 """ 271 272 _StaticSprite( 273 containers=[self._all, self._obstacles], 274 id=self._obstacle_id(), 275 image=self._load_image(image_path), 276 pos=Vector2((x, y)), 277 ) 278 279 return self
Spawn one obstacle into the simulation. The given coordinates will be the centre of the obstacle.
When agents collide with an obstacle, they will make a 180 degree turn.
Examples
Spawn a single obstacle into the simulation with examples/images/bubble-full.png
as image.
In addition, we place the obstacle in the centre of our window.
>>> config = Config()
>>> x, y = config.window.as_tuple()
>>> (
... Simulation(config)
... .spawn_obstacle("examples/images/bubble-full.png", x // 2, y // 2)
... .run()
... )
281 def spawn_site(self, image_path: str, x: int, y: int) -> Self: 282 """Spawn one site into the simulation. The given coordinates will be the centre of the site. 283 284 Examples 285 -------- 286 287 Spawn a single site into the simulation with `examples/images/site.png` as image. 288 In addition, we give specific coordinates where the site should be placed. 289 290 >>> ( 291 ... Simulation(config) 292 ... .spawn_site("examples/images/site.png", x=375, y=375) 293 ... .run() 294 ... ) 295 """ 296 297 _StaticSprite( 298 containers=[self._all, self._sites], 299 id=self._site_id(), 300 image=self._load_image(image_path), 301 pos=Vector2((x, y)), 302 ) 303 304 return self
Spawn one site into the simulation. The given coordinates will be the centre of the site.
Examples
Spawn a single site into the simulation with examples/images/site.png
as image.
In addition, we give specific coordinates where the site should be placed.
>>> (
... Simulation(config)
... .spawn_site("examples/images/site.png", x=375, y=375)
... .run()
... )
306 def run(self) -> Metrics: 307 """Run the simulation until it's ended by closing the window or when the `vi.config.Schema.duration` has elapsed.""" 308 309 self._running = True 310 311 while self._running: 312 self.tick() 313 314 return self._metrics
Run the simulation until it's ended by closing the window or when the vi.config.Schema.duration
has elapsed.
316 def before_update(self): 317 """Run any code before the agents are updated in every tick. 318 319 You should override this method when inheriting Simulation to add your own logic. 320 321 Some examples include: 322 - Processing events from PyGame's event queue. 323 """ 324 325 ...
Run any code before the agents are updated in every tick.
You should override this method when inheriting Simulation to add your own logic.
Some examples include:
- Processing events from PyGame's event queue.
330 def tick(self): 331 """Advance the simulation with one tick.""" 332 333 self.before_update() 334 335 # Update the position of all agents 336 self.__update_positions() 337 338 # If the radius was changed by an event, 339 # also update the radius in the proximity engine 340 self._proximity._set_radius(self.config.radius) 341 342 # Calculate proximity chunks 343 self._proximity.update() 344 345 # Save the replay data of all agents 346 self.__collect_replay_data() 347 348 # Update all agents 349 self._all.update() 350 351 # Merge the collected snapshots into the dataframe. 352 self._metrics._merge() 353 354 self.after_update() 355 356 # If we've reached the duration of the simulation, then stop the simulation. 357 if self.config.duration > 0 and self.shared.counter == self.config.duration: 358 self.stop() 359 return 360 361 self.shared.counter += 1
Advance the simulation with one tick.
363 def stop(self): 364 """Stop the simulation. 365 366 The simulation isn't stopped directly. 367 Instead, the current tick is completed, after which the simulation will end. 368 """ 369 370 self._running = False
Stop the simulation.
The simulation isn't stopped directly. Instead, the current tick is completed, after which the simulation will end.
411class Simulation(HeadlessSimulation): 412 """ 413 This class offers the same functionality as `HeadlessSimulation`, 414 but adds logic to automatically draw all agents, obstacles and sites to your screen. 415 416 If a custom config isn't provided when creating the simulation, the default values of `Config` will be used instead. 417 """ 418 419 _background: pg.surface.Surface 420 _clock: pg.time.Clock 421 _screen: pg.surface.Surface 422 423 def __init__(self, config: Optional[Config] = None): 424 super().__init__(config) 425 426 pg.display.init() 427 pg.display.set_caption("Violet") 428 429 size = self.config.window.as_tuple() 430 self._screen = pg.display.set_mode(size) 431 432 # Initialise background 433 self._background = pg.surface.Surface(size).convert() 434 self._background.fill((0, 0, 0)) 435 436 # Show background immediately (before spawning agents) 437 self._screen.blit(self._background, (0, 0)) 438 pg.display.flip() 439 440 # Initialise the clock. Used to cap FPS. 441 self._clock = pg.time.Clock() 442 443 def before_update(self): 444 rebound = [] 445 for event in pg.event.get(eventtype=[pg.QUIT, pg.KEYDOWN]): 446 if event.type == pg.QUIT: 447 self.stop() 448 elif event.type == pg.KEYDOWN: 449 if event.key == pg.K_HOME: 450 self.config.radius += 1 451 elif event.key == pg.K_END: 452 self.config.radius -= 1 453 else: 454 # If a different key was pressed, then we want to re-emit the vent 455 # so other code can handle it. 456 rebound.append(event) 457 458 for event in rebound: 459 pg.event.post(event) 460 461 # Clear the screen before the update so agents can draw stuff themselves too! 462 self._all.clear(self._screen, self._background) 463 self._screen.blit(self._background, (0, 0)) 464 465 def after_update(self): 466 # Draw everything to the screen 467 self._all.draw(self._screen) 468 469 if self.config.visualise_chunks: 470 self.__visualise_chunks() 471 472 # Update the screen with the new image 473 pg.display.flip() 474 475 self._clock.tick(self.config.fps_limit) 476 477 current_fps = self._clock.get_fps() 478 if current_fps > 0: 479 self._metrics.fps._push(current_fps) 480 481 if self.config.print_fps: 482 print(f"FPS: {current_fps:.1f}") 483 484 def __visualise_chunks(self): 485 """Visualise the proximity chunks by drawing their borders.""" 486 487 colour = pg.Color(255, 255, 255, 122) 488 chunk_size = self._proximity.chunk_size 489 490 width, height = self.config.window.as_tuple() 491 492 for x in range(chunk_size, width, chunk_size): 493 vline(self._screen, x, 0, height, colour) 494 495 for y in range(chunk_size, height, chunk_size): 496 hline(self._screen, 0, width, y, colour) 497 498 def _load_image(self, path: str) -> pg.surface.Surface: 499 return super()._load_image(path).convert_alpha()
This class offers the same functionality as HeadlessSimulation
,
but adds logic to automatically draw all agents, obstacles and sites to your screen.
If a custom config isn't provided when creating the simulation, the default values of Config
will be used instead.
423 def __init__(self, config: Optional[Config] = None): 424 super().__init__(config) 425 426 pg.display.init() 427 pg.display.set_caption("Violet") 428 429 size = self.config.window.as_tuple() 430 self._screen = pg.display.set_mode(size) 431 432 # Initialise background 433 self._background = pg.surface.Surface(size).convert() 434 self._background.fill((0, 0, 0)) 435 436 # Show background immediately (before spawning agents) 437 self._screen.blit(self._background, (0, 0)) 438 pg.display.flip() 439 440 # Initialise the clock. Used to cap FPS. 441 self._clock = pg.time.Clock()
443 def before_update(self): 444 rebound = [] 445 for event in pg.event.get(eventtype=[pg.QUIT, pg.KEYDOWN]): 446 if event.type == pg.QUIT: 447 self.stop() 448 elif event.type == pg.KEYDOWN: 449 if event.key == pg.K_HOME: 450 self.config.radius += 1 451 elif event.key == pg.K_END: 452 self.config.radius -= 1 453 else: 454 # If a different key was pressed, then we want to re-emit the vent 455 # so other code can handle it. 456 rebound.append(event) 457 458 for event in rebound: 459 pg.event.post(event) 460 461 # Clear the screen before the update so agents can draw stuff themselves too! 462 self._all.clear(self._screen, self._background) 463 self._screen.blit(self._background, (0, 0))
Run any code before the agents are updated in every tick.
You should override this method when inheriting Simulation to add your own logic.
Some examples include:
- Processing events from PyGame's event queue.
465 def after_update(self): 466 # Draw everything to the screen 467 self._all.draw(self._screen) 468 469 if self.config.visualise_chunks: 470 self.__visualise_chunks() 471 472 # Update the screen with the new image 473 pg.display.flip() 474 475 self._clock.tick(self.config.fps_limit) 476 477 current_fps = self._clock.get_fps() 478 if current_fps > 0: 479 self._metrics.fps._push(current_fps) 480 481 if self.config.print_fps: 482 print(f"FPS: {current_fps:.1f}")
Inherited Members
22def probability(threshold: float, prng: Optional[random.Random] = None) -> bool: 23 """Randomly retrieve True or False depending on the given probability. 24 25 The probability should be between 0 and 1. 26 If you give a probability equal or higher than 1, this function will always return True. 27 Likewise, if you give a probability equal or lower than 0, this function will always return False. 28 """ 29 30 get_random = prng.random if prng else random.random 31 return threshold > get_random()
Randomly retrieve True or False depending on the given probability.
The probability should be between 0 and 1. If you give a probability equal or higher than 1, this function will always return True. Likewise, if you give a probability equal or lower than 0, this function will always return False.
999def dataclass(cls=None, /, *, init=True, repr=True, eq=True, order=False, 1000 unsafe_hash=False, frozen=False): 1001 """Returns the same class as was passed in, with dunder methods 1002 added based on the fields defined in the class. 1003 1004 Examines PEP 526 __annotations__ to determine fields. 1005 1006 If init is true, an __init__() method is added to the class. If 1007 repr is true, a __repr__() method is added. If order is true, rich 1008 comparison dunder methods are added. If unsafe_hash is true, a 1009 __hash__() method function is added. If frozen is true, fields may 1010 not be assigned to after instance creation. 1011 """ 1012 1013 def wrap(cls): 1014 return _process_class(cls, init, repr, eq, order, unsafe_hash, frozen) 1015 1016 # See if we're being called as @dataclass or @dataclass(). 1017 if cls is None: 1018 # We're called with parens. 1019 return wrap 1020 1021 # We're called as @dataclass without parens. 1022 return wrap(cls)
Returns the same class as was passed in, with dunder methods added based on the fields defined in the class.
Examines PEP 526 __annotations__ to determine fields.
If init is true, an __init__() method is added to the class. If repr is true, a __repr__() method is added. If order is true, rich comparison dunder methods are added. If unsafe_hash is true, a __hash__() method function is added. If frozen is true, fields may not be assigned to after instance creation.
157@dataclass_transform() 158def deserialize( 159 _cls=None, 160 rename_all: Optional[str] = None, 161 reuse_instances_default: bool = True, 162 convert_sets_default: bool = False, 163 deserializer: Optional[DeserializeFunc] = None, 164 tagging: Tagging = DefaultTagging, 165 type_check: TypeCheck = NoCheck, 166 **kwargs, 167): 168 """ 169 A dataclass with this decorator is deserializable from any of the data formats supported by pyserde. 170 171 >>> from serde import deserialize 172 >>> from serde.json import from_json 173 >>> 174 >>> @deserialize 175 ... class Foo: 176 ... i: int 177 ... s: str 178 ... f: float 179 ... b: bool 180 >>> 181 >>> from_json(Foo, '{"i": 10, "s": "foo", "f": 100.0, "b": true}') 182 Foo(i=10, s='foo', f=100.0, b=True) 183 184 #### Class Attributes 185 186 Class attributes can be specified as arguments in the `deserialize` decorator in order to customize the 187 deserialization behaviour of the class entirely. 188 189 * `rename_all` attribute converts field names into the specified string case. 190 The following example converts camel-case field names into snake-case names. 191 192 >>> @deserialize(rename_all = 'camelcase') 193 ... class Foo: 194 ... int_field: int 195 ... str_field: str 196 >>> 197 >>> from_json(Foo, '{"intField": 10, "strField": "foo"}') 198 Foo(int_field=10, str_field='foo') 199 200 * `deserializer` takes a custom class-level deserialize function. The function applies to the all the fields 201 in the class. 202 203 >>> from datetime import datetime 204 >>> def deserializer(cls, o): 205 ... if cls is datetime: 206 ... return datetime.strptime(o, '%d/%m/%y') 207 ... else: 208 ... raise SerdeSkip() 209 210 The first argument `cls` is a class of the field and the second argument `o` is value to deserialize from. 211 `deserializer` function will be called for every field. If you don't want to use the custom deserializer 212 for a certain field, raise `serde.SerdeSkip` exception, pyserde will use the default deserializer for that field. 213 214 >>> @deserialize(deserializer=deserializer) 215 ... class Foo: 216 ... i: int 217 ... dt: datetime 218 219 This custom deserializer deserializes `datetime` the string in `MM/DD/YY` format into datetime object. 220 221 >>> from_json(Foo, '{"i": 10, "dt": "01/01/21"}') 222 Foo(i=10, dt=datetime.datetime(2021, 1, 1, 0, 0)) 223 """ 224 225 stack = [] 226 227 def wrap(cls: Type): 228 if cls in stack: 229 return 230 stack.append(cls) 231 232 tagging.check() 233 234 # If no `dataclass` found in the class, dataclassify it automatically. 235 if not is_dataclass(cls): 236 dataclass(cls) 237 238 g: Dict[str, Any] = {} 239 240 # Create a scope storage used by serde. 241 # Each class should get own scope. Child classes can not share scope with parent class. 242 # That's why we need the "scope.cls is not cls" check. 243 scope: Optional[SerdeScope] = getattr(cls, SERDE_SCOPE, None) 244 if scope is None or scope.cls is not cls: 245 scope = SerdeScope(cls, reuse_instances_default=reuse_instances_default) 246 setattr(cls, SERDE_SCOPE, scope) 247 248 # Set some globals for all generated functions 249 g['cls'] = cls 250 g['serde_scope'] = scope 251 g['SerdeError'] = SerdeError 252 g['UserError'] = UserError 253 g['raise_unsupported_type'] = raise_unsupported_type 254 g['typename'] = typename # used in union functions 255 g['ensure'] = ensure 256 g['typing'] = typing 257 g['collections'] = collections 258 g['Literal'] = Literal 259 g['from_obj'] = from_obj 260 g['get_generic_arg'] = get_generic_arg 261 g['is_instance'] = is_instance 262 g['TypeCheck'] = TypeCheck 263 g['NoCheck'] = NoCheck 264 g['coerce'] = coerce 265 g['_get_by_aliases'] = _get_by_aliases 266 if deserialize: 267 g['serde_custom_class_deserializer'] = functools.partial( 268 serde_custom_class_deserializer, custom=deserializer 269 ) 270 271 # Collect types used in the generated code. 272 for typ in iter_types(cls): 273 # When we encounter a dataclass not marked with deserialize, then also generate 274 # deserialize functions for it. 275 if is_dataclass_without_de(typ): 276 # We call deserialize and not wrap to make sure that we will use the default serde 277 # configuration for generating the deserialization function. 278 deserialize(typ) 279 if typ is cls or (is_primitive(typ) and not is_enum(typ)): 280 continue 281 if is_generic(typ): 282 g[typename(typ)] = get_origin(typ) 283 else: 284 g[typename(typ)] = typ 285 286 # render all union functions 287 for union in iter_unions(cls): 288 union_args = type_args(union) 289 add_func( 290 scope, union_func_name(UNION_DE_PREFIX, union_args), render_union_func(cls, union_args, tagging), g 291 ) 292 293 # render literal functions 294 for literal in iter_literals(cls): 295 literal_args = type_args(literal) 296 add_func(scope, literal_func_name(literal_args), render_literal_func(cls, literal_args), g) 297 298 # Collect default values and default factories used in the generated code. 299 for f in defields(cls): 300 assert f.name 301 if has_default(f): 302 scope.defaults[f.name] = f.default 303 elif has_default_factory(f): 304 scope.defaults[f.name] = f.default_factory 305 if f.deserializer: 306 g[f.deserializer.name] = f.deserializer 307 308 add_func(scope, FROM_ITER, render_from_iter(cls, deserializer, type_check), g) 309 add_func(scope, FROM_DICT, render_from_dict(cls, rename_all, deserializer, type_check), g) 310 add_func(scope, TYPE_CHECK, render_type_check(cls), g) 311 312 logger.debug(f'{typename(cls)}: {SERDE_SCOPE} {scope}') 313 314 stack.pop() 315 return cls 316 317 if _cls is None: 318 return wrap # type: ignore 319 320 if _cls in GENERATION_STACK: 321 return _cls 322 323 GENERATION_STACK.append(_cls) 324 try: 325 return wrap(_cls) 326 finally: 327 GENERATION_STACK.pop()
A dataclass with this decorator is deserializable from any of the data formats supported by pyserde.
>>> from serde import deserialize
>>> from serde.json import from_json
>>>
>>> @deserialize
... class Foo:
... i: int
... s: str
... f: float
... b: bool
>>>
>>> from_json(Foo, '{"i": 10, "s": "foo", "f": 100.0, "b": true}')
Foo(i=10, s='foo', f=100.0, b=True)
Class Attributes
Class attributes can be specified as arguments in the deserialize
decorator in order to customize the
deserialization behaviour of the class entirely.
rename_all
attribute converts field names into the specified string case. The following example converts camel-case field names into snake-case names.
>>> @deserialize(rename_all = 'camelcase')
... class Foo:
... int_field: int
... str_field: str
>>>
>>> from_json(Foo, '{"intField": 10, "strField": "foo"}')
Foo(int_field=10, str_field='foo')
deserializer
takes a custom class-level deserialize function. The function applies to the all the fields in the class.
>>> from datetime import datetime
>>> def deserializer(cls, o):
... if cls is datetime:
... return datetime.strptime(o, '%d/%m/%y')
... else:
... raise SerdeSkip()
The first argument cls
is a class of the field and the second argument o
is value to deserialize from.
deserializer
function will be called for every field. If you don't want to use the custom deserializer
for a certain field, raise serde.SerdeSkip
exception, pyserde will use the default deserializer for that field.
>>> @deserialize(deserializer=deserializer)
... class Foo:
... i: int
... dt: datetime
This custom deserializer deserializes datetime
the string in MM/DD/YY
format into datetime object.
>>> from_json(Foo, '{"i": 10, "dt": "01/01/21"}')
Foo(i=10, dt=datetime.datetime(2021, 1, 1, 0, 0))
151@dataclass_transform() 152def serialize( 153 _cls=None, 154 rename_all: Optional[str] = None, 155 reuse_instances_default: bool = True, 156 convert_sets_default: bool = False, 157 serializer: Optional[SerializeFunc] = None, 158 tagging: Tagging = DefaultTagging, 159 type_check: TypeCheck = NoCheck, 160 serialize_class_var: bool = False, 161 **kwargs, 162): 163 """ 164 A dataclass with this decorator is serializable into any of the data formats supported by pyserde. 165 166 >>> from datetime import datetime 167 >>> from serde import serialize 168 >>> from serde.json import to_json 169 >>> 170 >>> @serialize 171 ... class Foo: 172 ... i: int 173 ... s: str 174 ... f: float 175 ... b: bool 176 >>> 177 >>> to_json(Foo(i=10, s='foo', f=100.0, b=True)) 178 '{"i":10,"s":"foo","f":100.0,"b":true}' 179 180 #### Class Attributes 181 182 Class attributes can be specified as arguments in the `serialize` decorator in order to customize the serialization 183 behaviour of the class entirely. 184 185 * `rename_all` attribute converts field names into the specified string case. 186 The following example converts snake-case field names into camel-case names. 187 188 >>> @serialize(rename_all = 'camelcase') 189 ... class Foo: 190 ... int_field: int 191 ... str_field: str 192 >>> 193 >>> to_json(Foo(int_field=10, str_field='foo')) 194 '{"intField":10,"strField":"foo"}' 195 196 * `serializer` takes a custom class-level serialize function. The function applies to the all the fields 197 in the class. 198 199 >>> def serializer(cls, o): 200 ... if cls is datetime: 201 ... return o.strftime('%d/%m/%y') 202 ... else: 203 ... raise SerdeSkip() 204 205 The first argument `cls` is a class of the field and the second argument `o` is a value of the field. 206 `serializer` function will be called for every field. If you don't want to use the custom serializer 207 for a certain field, raise `serde.SerdeSkip` exception, pyserde will use the default serializer for that field. 208 209 >>> @serialize(serializer=serializer) 210 ... class Foo: 211 ... i: int 212 ... dt: datetime 213 214 This custom serializer serializes `datetime` object into the string in `MM/DD/YY` format. 215 216 >>> to_json(Foo(10, datetime(2021, 1, 1, 0, 0, 0))) 217 '{"i":10,"dt":"01/01/21"}' 218 219 * `serialize_class_var` enables `typing.ClassVar` serialization. 220 221 >>> @serialize(serialize_class_var=True) 222 ... class Foo: 223 ... v: typing.ClassVar[int] = 10 224 >>> 225 >>> to_json(Foo()) 226 '{"v":10}' 227 228 """ 229 230 def wrap(cls: Type[Any]): 231 tagging.check() 232 233 # If no `dataclass` found in the class, dataclassify it automatically. 234 if not is_dataclass(cls): 235 dataclass(cls) 236 237 g: Dict[str, Any] = {} 238 239 # Create a scope storage used by serde. 240 # Each class should get own scope. Child classes can not share scope with parent class. 241 # That's why we need the "scope.cls is not cls" check. 242 scope: Optional[SerdeScope] = getattr(cls, SERDE_SCOPE, None) 243 if scope is None or scope.cls is not cls: 244 scope = SerdeScope( 245 cls, 246 reuse_instances_default=reuse_instances_default, 247 convert_sets_default=convert_sets_default, 248 ) 249 setattr(cls, SERDE_SCOPE, scope) 250 251 # Set some globals for all generated functions 252 g["cls"] = cls 253 g["copy"] = copy 254 g["serde_scope"] = scope 255 g["SerdeError"] = SerdeError 256 g["raise_unsupported_type"] = raise_unsupported_type 257 g["enum_value"] = enum_value 258 g["is_dataclass"] = is_dataclass 259 g["typename"] = typename # used in union functions 260 g["is_instance"] = is_instance # used in union functions 261 g["to_obj"] = to_obj 262 g["typing"] = typing 263 g["Literal"] = Literal 264 g["TypeCheck"] = TypeCheck 265 g["NoCheck"] = NoCheck 266 g["coerce"] = coerce 267 if serialize: 268 g["serde_custom_class_serializer"] = functools.partial(serde_custom_class_serializer, custom=serializer) 269 270 # Collect types used in the generated code. 271 for typ in iter_types(cls): 272 # When we encounter a dataclass not marked with serialize, then also generate serialize 273 # functions for it. 274 if is_dataclass_without_se(typ): 275 # We call serialize and not wrap to make sure that we will use the default serde 276 # configuration for generating the serialization function. 277 serialize(typ) 278 279 if typ is cls or (is_primitive(typ) and not is_enum(typ)): 280 continue 281 g[typename(typ)] = typ 282 283 # render all union functions 284 for union in iter_unions(cls): 285 union_args = type_args(union) 286 union_key = union_func_name(UNION_SE_PREFIX, union_args) 287 add_func(scope, union_key, render_union_func(cls, union_args, tagging), g) 288 scope.union_se_args[union_key] = union_args 289 290 for f in sefields(cls, serialize_class_var): 291 if f.skip_if: 292 g[f.skip_if.name] = f.skip_if 293 if f.serializer: 294 g[f.serializer.name] = f.serializer 295 296 add_func(scope, TO_ITER, render_to_tuple(cls, serializer, type_check, serialize_class_var), g) 297 add_func(scope, TO_DICT, render_to_dict(cls, rename_all, serializer, type_check, serialize_class_var), g) 298 add_func(scope, TYPE_CHECK, render_type_check(cls), g) 299 300 logger.debug(f"{typename(cls)}: {SERDE_SCOPE} {scope}") 301 302 return cls 303 304 if _cls is None: 305 return wrap # type: ignore 306 307 if _cls in GENERATION_STACK: 308 return _cls 309 310 GENERATION_STACK.append(_cls) 311 try: 312 return wrap(_cls) 313 finally: 314 GENERATION_STACK.pop()
A dataclass with this decorator is serializable into any of the data formats supported by pyserde.
>>> from datetime import datetime
>>> from serde import serialize
>>> from serde.json import to_json
>>>
>>> @serialize
... class Foo:
... i: int
... s: str
... f: float
... b: bool
>>>
>>> to_json(Foo(i=10, s='foo', f=100.0, b=True))
'{"i":10,"s":"foo","f":100.0,"b":true}'
Class Attributes
Class attributes can be specified as arguments in the serialize
decorator in order to customize the serialization
behaviour of the class entirely.
rename_all
attribute converts field names into the specified string case. The following example converts snake-case field names into camel-case names.
>>> @serialize(rename_all = 'camelcase')
... class Foo:
... int_field: int
... str_field: str
>>>
>>> to_json(Foo(int_field=10, str_field='foo'))
'{"intField":10,"strField":"foo"}'
serializer
takes a custom class-level serialize function. The function applies to the all the fields in the class.
>>> def serializer(cls, o):
... if cls is datetime:
... return o.strftime('%d/%m/%y')
... else:
... raise SerdeSkip()
The first argument cls
is a class of the field and the second argument o
is a value of the field.
serializer
function will be called for every field. If you don't want to use the custom serializer
for a certain field, raise serde.SerdeSkip
exception, pyserde will use the default serializer for that field.
>>> @serialize(serializer=serializer)
... class Foo:
... i: int
... dt: datetime
This custom serializer serializes datetime
object into the string in MM/DD/YY
format.
>>> to_json(Foo(10, datetime(2021, 1, 1, 0, 0, 0)))
'{"i":10,"dt":"01/01/21"}'
serialize_class_var
enablestyping.ClassVar
serialization.
>>> @serialize(serialize_class_var=True)
... class Foo:
... v: typing.ClassVar[int] = 10
>>>
>>> to_json(Foo())
'{"v":10}'
Vector2() -> Vector2(0, 0) Vector2(int) -> Vector2 Vector2(float) -> Vector2 Vector2(Vector2) -> Vector2 Vector2(x, y) -> Vector2 Vector2((x, y)) -> Vector2 a 2-Dimensional Vector
length_squared() -> float returns the squared Euclidean length of the vector.
magnitude_squared() -> float returns the squared magnitude of the vector.
rotate_ip(angle) -> None rotates the vector by a given angle in degrees in place.
rotate_rad_ip(angle) -> None rotates the vector by a given angle in radians in place.
rotate_ip_rad(angle) -> None rotates the vector by a given angle in radians in place.
move_towards(Vector2, float) -> Vector2 returns a vector moved toward the target by a given distance.
move_towards_ip(Vector2, float) -> None moves the vector toward its target at a given distance.
slerp(Vector2, float) -> Vector2 returns a spherical interpolation to the given vector.
lerp(Vector2, float) -> Vector2 returns a linear interpolation to the given vector.
normalize() -> Vector2 returns a vector with the same direction but length 1.
normalize_ip() -> None normalizes the vector in place so that its length is 1.
is_normalized() -> Bool tests if the vector is normalized i.e. has length == 1.
angle_to(Vector2) -> float calculates the angle to a given vector in degrees.
update() -> None update(int) -> None update(float) -> None update(Vector2) -> None update(x, y) -> None update((x, y)) -> None Sets the coordinates of the vector.
distance_to(Vector2) -> float calculates the Euclidean distance to a given vector.
distance_squared_to(Vector2) -> float calculates the squared Euclidean distance to a given vector.
elementwise() -> VectorElementwiseProxy The next operation will be performed elementwise.
as_polar() -> (r, phi) returns a tuple with radial distance and azimuthal angle.
clamp_magnitude(max_length) -> Vector2 clamp_magnitude(min_length, max_length) -> Vector2 Returns a copy of a vector with the magnitude clamped between max_length and min_length.