refactoring and dynamic neighbourhood
This commit is contained in:
parent
4c44cc1002
commit
1b177ff686
@ -1,43 +1,43 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import random
|
||||
from multiprocessing import freeze_support
|
||||
from cellular_automaton import *
|
||||
from cellular_automaton.ca_rule import Rule
|
||||
from cellular_automaton.ca_cell_state import CellState, SynchronousCellState
|
||||
|
||||
|
||||
class TestRule(Rule):
|
||||
@staticmethod
|
||||
def evolve_cell(last_cell_state, neighbours_last_states):
|
||||
def evolve_cell(last_cell_state, neighbors_last_states):
|
||||
try:
|
||||
return neighbours_last_states[0]
|
||||
return neighbors_last_states[0]
|
||||
except IndexError:
|
||||
return last_cell_state
|
||||
|
||||
|
||||
class MyState(SynchronousCellState):
|
||||
# class MyState(SynchronousCellState):
|
||||
class MyState(CellState):
|
||||
random_seed = random.seed(1000)
|
||||
|
||||
def __init__(self):
|
||||
rand = random.randrange(0, 101, 1)
|
||||
init = max(.0, float(rand - 99))
|
||||
super().__init__((init,), draw_first_state=init > 0)
|
||||
|
||||
def get_state_draw_color(self, iteration):
|
||||
state1 = self.get_state_of_iteration(iteration)[0]
|
||||
return [255 if state1 else 0, 0, 0]
|
||||
|
||||
|
||||
def make_cellular_automaton(dimension, neighborhood, rule, state_class):
|
||||
cells = CAFactory.make_cellular_automaton(dimension=dimension, neighborhood=neighborhood, state_class=state_class)
|
||||
return CellularAutomaton(cells, dimension, rule)
|
||||
def get_state_draw_color(self, evolution_step):
|
||||
state = self.get_state_of_evolution_step(evolution_step)[0]
|
||||
return [255 if state else 0, 0, 0]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
freeze_support()
|
||||
|
||||
random.seed(1000)
|
||||
from cellular_automaton import *
|
||||
# best single is 400/400 with 0,2 ca speed and 0,09 redraw / multi is 300/300 with 0.083
|
||||
neighborhood = MooreNeighborhood(EdgeRule.FIRST_AND_LAST_CELL_OF_DIMENSION_ARE_NEIGHBORS)
|
||||
ca = make_cellular_automaton(dimension=[100, 100], neighborhood=neighborhood, rule=TestRule(), state_class=MyState)
|
||||
ca_processor = CellularAutomatonMultiProcessor(cellular_automaton=ca, process_count=4)
|
||||
ca = CAFactory.make_cellular_automaton(dimension=[100, 100],
|
||||
neighborhood=neighborhood,
|
||||
rule=TestRule(),
|
||||
state_class=MyState)
|
||||
# ca_processor = CellularAutomatonMultiProcessor(cellular_automaton=ca, process_count=4)
|
||||
ca_processor = CellularAutomatonProcessor(cellular_automaton=ca)
|
||||
|
||||
ca_window = PyGameFor2D(window_size=[1000, 800], cellular_automaton=ca)
|
||||
ca_window.main_loop(cellular_automaton_processor=ca_processor, ca_iterations_per_draw=1)
|
||||
ca_window.main_loop(cellular_automaton_processor=ca_processor, evolution_steps_per_draw=1)
|
||||
|
Binary file not shown.
@ -1,7 +1,8 @@
|
||||
from .ca_cell import *
|
||||
from .ca_cell_state import *
|
||||
from .ca_display import *
|
||||
from .ca_factory import *
|
||||
from .ca_neighborhood import *
|
||||
from .ca_rule import *
|
||||
from .ca_state import *
|
||||
from .cellular_automaton import *
|
||||
from .ca_factory import *
|
||||
|
@ -5,19 +5,17 @@ from typing import Type
|
||||
class Cell:
|
||||
def __init__(self, state_class: Type[CellState]):
|
||||
self.state = state_class()
|
||||
self.neighbours = []
|
||||
self.neighbor_states = []
|
||||
|
||||
@staticmethod
|
||||
def evolve_if_ready(cell, rule, iteration):
|
||||
if cell.state.is_active(iteration):
|
||||
new_state = rule(cell.state.get_state_of_last_iteration(iteration),
|
||||
[n.get_state_of_last_iteration(iteration) for n in cell.neighbours])
|
||||
Cell.set_new_state_and_activate(cell, new_state, iteration)
|
||||
def evolve_if_ready(self, rule, evolution_step):
|
||||
if self.state.is_active(evolution_step):
|
||||
new_state = rule(self.state.get_state_of_last_evolution_step(evolution_step),
|
||||
[n.get_state_of_last_evolution_step(evolution_step) for n in self.neighbor_states])
|
||||
self.set_new_state_and_activate(new_state, evolution_step)
|
||||
|
||||
@staticmethod
|
||||
def set_new_state_and_activate(cell, new_state: CellState, iteration):
|
||||
changed = cell.state.set_state_of_iteration(new_state, iteration)
|
||||
def set_new_state_and_activate(self, new_state: CellState, evolution_step):
|
||||
changed = self.state.set_state_of_evolution_step(new_state, evolution_step)
|
||||
if changed:
|
||||
cell.state.set_active_for_next_iteration(iteration)
|
||||
for n in cell.neighbours:
|
||||
n.set_active_for_next_iteration(iteration)
|
||||
self.state.set_active_for_next_evolution_step(evolution_step)
|
||||
for n in self.neighbor_states:
|
||||
n.set_active_for_next_evolution_step(evolution_step)
|
||||
|
@ -16,63 +16,82 @@ class CellState:
|
||||
self._active[0] = True
|
||||
self._dirty = draw_first_state
|
||||
|
||||
def is_active(self, iteration):
|
||||
return self._active[self._calculate_slot(iteration)]
|
||||
def is_active(self, current_evolution_step):
|
||||
""" Returns the active status for the requested evolution_step
|
||||
:param current_evolution_step: The evolution_step of interest.
|
||||
:return: True if the cell state is set active for this evolution_step.
|
||||
"""
|
||||
return self._active[self._calculate_slot(current_evolution_step)]
|
||||
|
||||
def set_active_for_next_iteration(self, iteration):
|
||||
self._active[self._calculate_slot(iteration + 1)] = True
|
||||
def set_active_for_next_evolution_step(self, current_evolution_step):
|
||||
""" Sets the cell active for the next evolution_step, so it will be evolved.
|
||||
:param current_evolution_step: The current evolution_step index.
|
||||
:return:
|
||||
"""
|
||||
self._active[self._calculate_slot(current_evolution_step + 1)] = True
|
||||
|
||||
def is_set_for_redraw(self):
|
||||
""" States if this state should be redrawn.
|
||||
:return: True if redraw is needed.
|
||||
"""
|
||||
return self._dirty
|
||||
|
||||
def was_redrawn(self):
|
||||
""" Remove the state from redraw cycle until next state change """
|
||||
self._dirty = False
|
||||
|
||||
def get_state_of_last_iteration(self, current_iteration_index):
|
||||
return self.get_state_of_iteration(current_iteration_index - 1)
|
||||
def get_state_of_last_evolution_step(self, current_evolution_step):
|
||||
return self.get_state_of_evolution_step(current_evolution_step - 1)
|
||||
|
||||
def get_state_of_iteration(self, iteration):
|
||||
""" Will return the state for the iteration modulo number of saved states.
|
||||
:param iteration: Uses the iteration index, to differ between concurrent states.
|
||||
:return The state for this iteration.
|
||||
def get_state_of_evolution_step(self, evolution_step):
|
||||
""" Returns the state of the evolution_step.
|
||||
:param evolution_step: Uses the evolution_step index, to differ between concurrent states.
|
||||
:return The state of this evolution_step.
|
||||
"""
|
||||
return self._state_slots[self._calculate_slot(iteration)]
|
||||
return self._state_slots[self._calculate_slot(evolution_step)]
|
||||
|
||||
def set_state_of_iteration(self, new_state, iteration):
|
||||
""" Will set the new state for the iteration modulo number of saved states.
|
||||
def set_state_of_evolution_step(self, new_state, evolution_step):
|
||||
""" Sets the new state for the evolution_step.
|
||||
:param new_state: The new state to set.
|
||||
:param iteration: Uses the iteration index, to differ between concurrent states.
|
||||
:return True if state has changed.
|
||||
:param evolution_step: The evolution_step index, to differ between concurrent states.
|
||||
:return True if the state really changed.
|
||||
"""
|
||||
self._change_state_values(new_state, iteration)
|
||||
changed = self._did_state_change(iteration)
|
||||
changed = self._set_new_state_if_valid(new_state, evolution_step)
|
||||
self._dirty |= changed
|
||||
self._active[self._calculate_slot(iteration)] = False
|
||||
|
||||
self._active[self._calculate_slot(evolution_step)] = False
|
||||
return changed
|
||||
|
||||
def _did_state_change(self, iteration):
|
||||
for a, b in zip(self._state_slots[self._calculate_slot(iteration)],
|
||||
self._state_slots[self._calculate_slot(iteration - 1)]):
|
||||
if a != b:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _change_state_values(self, new_state, iteration):
|
||||
current_state = self.get_state_of_iteration(iteration)
|
||||
def _set_new_state_if_valid(self, new_state, evolution_step):
|
||||
current_state = self.get_state_of_evolution_step(evolution_step)
|
||||
if len(new_state) != len(current_state):
|
||||
raise IndexError("State length may not change!")
|
||||
|
||||
self.__change_current_state_values(current_state, new_state)
|
||||
return self.__did_state_change(evolution_step)
|
||||
|
||||
@staticmethod
|
||||
def __change_current_state_values(current_state, new_state):
|
||||
for i, ns in enumerate(new_state):
|
||||
if current_state[i] != ns:
|
||||
current_state[i] = ns
|
||||
|
||||
def get_state_draw_color(self, iteration):
|
||||
def __did_state_change(self, evolution_step):
|
||||
for a, b in zip(self.get_state_of_evolution_step(evolution_step),
|
||||
self.get_state_of_last_evolution_step(evolution_step)):
|
||||
if a != b:
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_state_draw_color(self, evolution_step):
|
||||
""" When implemented should return the color representing the requested state.
|
||||
:param evolution_step: Requested evolution_step.
|
||||
:return: Color of the state as rgb tuple
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def _calculate_slot(cls, iteration):
|
||||
return iteration % cls._state_save_slot_count
|
||||
def _calculate_slot(cls, evolution_step):
|
||||
return evolution_step % cls._state_save_slot_count
|
||||
|
||||
|
||||
class SynchronousCellState(CellState):
|
||||
@ -86,8 +105,8 @@ class SynchronousCellState(CellState):
|
||||
self._active[0].value = True
|
||||
self._dirty = RawValue(c_bool, draw_first_state)
|
||||
|
||||
def set_active_for_next_iteration(self, iteration):
|
||||
self._active[self._calculate_slot(iteration + 1)].value = True
|
||||
def set_active_for_next_evolution_step(self, current_evolution_step):
|
||||
self._active[self._calculate_slot(current_evolution_step + 1)].value = True
|
||||
|
||||
def is_set_for_redraw(self):
|
||||
return self._dirty.value
|
||||
@ -95,13 +114,8 @@ class SynchronousCellState(CellState):
|
||||
def was_redrawn(self):
|
||||
self._dirty.value = False
|
||||
|
||||
def set_state_of_iteration(self, new_state, iteration):
|
||||
self._change_state_values(new_state, iteration)
|
||||
changed = self._did_state_change(iteration)
|
||||
def set_state_of_evolution_step(self, new_state, evolution_step):
|
||||
changed = self._set_new_state_if_valid(new_state, evolution_step)
|
||||
self._dirty.value |= changed
|
||||
self._active[self._calculate_slot(iteration)].value = False
|
||||
self._active[self._calculate_slot(evolution_step)].value = False
|
||||
return changed
|
||||
|
||||
@classmethod
|
||||
def _calculate_slot(cls, iteration):
|
||||
return iteration % cls._state_save_slot_count
|
||||
|
@ -7,7 +7,8 @@ import pstats
|
||||
from pympler import asizeof
|
||||
|
||||
|
||||
from cellular_automaton.cellular_automaton import CellularAutomaton, CellularAutomatonProcessor
|
||||
from cellular_automaton.ca_state import CellularAutomatonState
|
||||
from cellular_automaton.cellular_automaton import CellularAutomatonProcessor
|
||||
|
||||
|
||||
class _DisplayInfo:
|
||||
@ -19,7 +20,7 @@ class _DisplayInfo:
|
||||
|
||||
|
||||
class DisplayFor2D:
|
||||
def __init__(self, grid_rect: list, cellular_automaton: CellularAutomaton, screen):
|
||||
def __init__(self, grid_rect: list, cellular_automaton: CellularAutomatonState, screen):
|
||||
self._cellular_automaton = cellular_automaton
|
||||
cell_size = self._calculate_cell_display_size(grid_rect[-2:])
|
||||
self._display_info = _DisplayInfo(grid_rect[-2:], grid_rect[:2], cell_size, screen)
|
||||
@ -31,8 +32,8 @@ class DisplayFor2D:
|
||||
def _cell_redraw_rectangles(self):
|
||||
for coordinate, cell in self._cellular_automaton.cells.items():
|
||||
if cell.state.is_set_for_redraw():
|
||||
cell_color = cell.state.get_state_draw_color(self._cellular_automaton.evolution_iteration_index)
|
||||
cell_pos = _calculate_cell_position(self._display_info.cell_size, coordinate)
|
||||
cell_color = cell.state.get_state_draw_color(self._cellular_automaton.current_evolution_step)
|
||||
cell_pos = self._calculate_cell_position(self._display_info.cell_size, coordinate)
|
||||
surface_pos = list(map(operator.add, cell_pos, self._display_info.grid_pos))
|
||||
yield self._display_info.screen.fill(cell_color, (surface_pos, self._display_info.cell_size))
|
||||
cell.state.was_redrawn()
|
||||
@ -41,9 +42,13 @@ class DisplayFor2D:
|
||||
grid_dimension = self._cellular_automaton.dimension
|
||||
return list(map(operator.truediv, grid_size, grid_dimension))
|
||||
|
||||
@staticmethod
|
||||
def _calculate_cell_position(cell_size, coordinate):
|
||||
return list(map(operator.mul, cell_size, coordinate))
|
||||
|
||||
|
||||
class PyGameFor2D:
|
||||
def __init__(self, window_size: list, cellular_automaton: CellularAutomaton):
|
||||
def __init__(self, window_size: list, cellular_automaton: CellularAutomatonState):
|
||||
self._window_size = window_size
|
||||
self._cellular_automaton = cellular_automaton
|
||||
pygame.init()
|
||||
@ -63,7 +68,7 @@ class PyGameFor2D:
|
||||
update_rect = self._screen.blit(label, pos)
|
||||
pygame.display.update(update_rect)
|
||||
|
||||
def main_loop(self, cellular_automaton_processor: CellularAutomatonProcessor, ca_iterations_per_draw):
|
||||
def main_loop(self, cellular_automaton_processor: CellularAutomatonProcessor, evolution_steps_per_draw):
|
||||
running = True
|
||||
cellular_automaton_processor.evolve()
|
||||
first = True
|
||||
@ -75,7 +80,7 @@ class PyGameFor2D:
|
||||
self._evolve_with_performance(cellular_automaton_processor)
|
||||
first = False
|
||||
else:
|
||||
cellular_automaton_processor.evolve_x_times(ca_iterations_per_draw)
|
||||
cellular_automaton_processor.evolve_x_times(evolution_steps_per_draw)
|
||||
time_ca_end = time.time()
|
||||
self.ca_display.redraw_cellular_automaton()
|
||||
time_ds_end = time.time()
|
||||
@ -95,7 +100,3 @@ class PyGameFor2D:
|
||||
p.sort_stats('time').print_stats(10)
|
||||
print("TOTAL TIME: " + "{0:.4f}".format(time_ca_end - time_ca_start) + "s")
|
||||
print("SIZE: " + "{0:.4f}".format(size / (1024 * 1024)) + "MB")
|
||||
|
||||
|
||||
def _calculate_cell_position(cell_size, coordinate):
|
||||
return list(map(operator.mul, cell_size, coordinate))
|
||||
|
@ -1,5 +1,4 @@
|
||||
from cellular_automaton.ca_cell import Cell, CellState
|
||||
from cellular_automaton.ca_neighborhood import Neighborhood
|
||||
from cellular_automaton import *
|
||||
from typing import Type
|
||||
import itertools
|
||||
|
||||
@ -7,12 +6,12 @@ import itertools
|
||||
class CAFactory:
|
||||
@staticmethod
|
||||
def make_cellular_automaton(dimension,
|
||||
neighborhood: Type[Neighborhood],
|
||||
state_class: Type[CellState]):
|
||||
|
||||
neighborhood: Neighborhood,
|
||||
state_class: Type[CellState],
|
||||
rule: Type[Rule]):
|
||||
cells = CAFactory._make_cells(dimension, state_class)
|
||||
CAFactory._apply_neighbourhood_to_cells(cells, neighborhood, dimension)
|
||||
return cells
|
||||
CAFactory._apply_neighborhood_to_cells(cells, neighborhood, dimension)
|
||||
return CellularAutomatonState(cells, dimension, rule)
|
||||
|
||||
@staticmethod
|
||||
def _make_cells(dimension, state_class):
|
||||
@ -22,8 +21,8 @@ class CAFactory:
|
||||
return cells
|
||||
|
||||
@staticmethod
|
||||
def _apply_neighbourhood_to_cells(cells, neighborhood, dimension):
|
||||
def _apply_neighborhood_to_cells(cells, neighborhood, dimension):
|
||||
for coordinate, cell in cells.items():
|
||||
n_coordinates = neighborhood.calculate_cell_neighbor_coordinates(coordinate, dimension)
|
||||
cell.neighbours = [cells[tuple(nc)].state for nc in n_coordinates]
|
||||
cell.neighbor_states = [cells[tuple(nc)].state for nc in n_coordinates]
|
||||
|
||||
|
@ -1,56 +1,75 @@
|
||||
from enum import Enum
|
||||
from operator import add
|
||||
from itertools import product
|
||||
|
||||
|
||||
class EdgeRule(Enum):
|
||||
IGNORE_MISSING_NEIGHBORS_OF_EDGE_CELLS = 0
|
||||
IGNORE_EDGE_CELLS = 1
|
||||
IGNORE_EDGE_CELLS = 0
|
||||
IGNORE_MISSING_NEIGHBORS_OF_EDGE_CELLS = 1
|
||||
FIRST_AND_LAST_CELL_OF_DIMENSION_ARE_NEIGHBORS = 2
|
||||
|
||||
|
||||
class Neighborhood:
|
||||
def __init__(self, neighbours_relative: list, edge_rule: EdgeRule):
|
||||
""" Defines a neighborhood for cells.
|
||||
:param neighbours_relative: List of relative coordinates of cells neighbours.
|
||||
def __init__(self, neighbors_relative, edge_rule: EdgeRule):
|
||||
""" Defines a neighborhood of a cell.
|
||||
:param neighbors_relative: List of relative coordinates for cell neighbors.
|
||||
:param edge_rule: EdgeRule to define, how cells on the edge of the grid will be handled.
|
||||
"""
|
||||
self._rel_neighbors = neighbours_relative
|
||||
self.edge_rule = edge_rule
|
||||
self.grid_dimensions = []
|
||||
self._rel_neighbors = neighbors_relative
|
||||
self.__edge_rule = edge_rule
|
||||
self.__grid_dimensions = []
|
||||
|
||||
def calculate_cell_neighbor_coordinates(self, cell_coordinate, grid_dimensions):
|
||||
""" Get a list of coordinates for the cell neighbors. The EdgeRule can reduce the returned neighbor count.
|
||||
:param cell_coordinate: The coordinate of the cell to get the neighbors
|
||||
:param grid_dimensions: The dimensions of the grid, to apply edge the rule.
|
||||
:return:
|
||||
""" Get a list of absolute coordinates for the cell neighbors.
|
||||
The EdgeRule can reduce the returned neighbor count.
|
||||
:param cell_coordinate: The coordinate of the cell.
|
||||
:param grid_dimensions: The dimensions of the grid, to apply the edge the rule.
|
||||
:return: list of absolute coordinates for the cells neighbors.
|
||||
"""
|
||||
self.grid_dimensions = grid_dimensions
|
||||
return list(self._neighbours_generator(cell_coordinate))
|
||||
self.__grid_dimensions = grid_dimensions
|
||||
return list(self.__neighbors_generator(cell_coordinate))
|
||||
|
||||
def _neighbours_generator(self, cell_coordinate):
|
||||
if not self._does_ignore_edge_cell_rule_apply(cell_coordinate):
|
||||
def __neighbors_generator(self, cell_coordinate):
|
||||
if not self.__does_ignore_edge_cell_rule_apply(cell_coordinate):
|
||||
for rel_n in self._rel_neighbors:
|
||||
yield from self._calculate_abs_neighbour_and_decide_validity(cell_coordinate, rel_n)
|
||||
yield from self.__calculate_abs_neighbor_and_decide_validity(cell_coordinate, rel_n)
|
||||
|
||||
def _calculate_abs_neighbour_and_decide_validity(self, cell_coordinate, rel_n):
|
||||
def __calculate_abs_neighbor_and_decide_validity(self, cell_coordinate, rel_n):
|
||||
n = list(map(add, rel_n, cell_coordinate))
|
||||
n_folded = self._apply_edge_overflow(n)
|
||||
if n == n_folded or self.edge_rule == EdgeRule.FIRST_AND_LAST_CELL_OF_DIMENSION_ARE_NEIGHBORS:
|
||||
n_folded = self.__apply_edge_overflow(n)
|
||||
if n == n_folded or self.__edge_rule == EdgeRule.FIRST_AND_LAST_CELL_OF_DIMENSION_ARE_NEIGHBORS:
|
||||
yield n_folded
|
||||
|
||||
def _does_ignore_edge_cell_rule_apply(self, coordinate):
|
||||
return self.edge_rule == EdgeRule.IGNORE_EDGE_CELLS and self._is_coordinate_on_an_edge(coordinate)
|
||||
def __does_ignore_edge_cell_rule_apply(self, coordinate):
|
||||
return self.__edge_rule == EdgeRule.IGNORE_EDGE_CELLS and self.__is_coordinate_on_an_edge(coordinate)
|
||||
|
||||
def _is_coordinate_on_an_edge(self, coordinate):
|
||||
return all(0 == ci or ci == di-1 for ci, di in zip(coordinate, self.grid_dimensions))
|
||||
def __is_coordinate_on_an_edge(self, coordinate):
|
||||
return all(0 == ci or ci == di-1 for ci, di in zip(coordinate, self.__grid_dimensions))
|
||||
|
||||
def _apply_edge_overflow(self, n):
|
||||
return list(map(lambda ni, di: (ni + di) % di, n, self.grid_dimensions))
|
||||
def __apply_edge_overflow(self, n):
|
||||
return list(map(lambda ni, di: (ni + di) % di, n, self.__grid_dimensions))
|
||||
|
||||
|
||||
class MooreNeighborhood(Neighborhood):
|
||||
def __init__(self, edge_rule: EdgeRule = EdgeRule.IGNORE_EDGE_CELLS):
|
||||
super().__init__([[-1, -1], [0, -1], [1, -1],
|
||||
[-1, 0], [1, 0],
|
||||
[-1, 1], [0, 1], [1, 1]],
|
||||
def __init__(self, edge_rule: EdgeRule = EdgeRule.IGNORE_EDGE_CELLS, range_=1, dimension=2):
|
||||
super().__init__(tuple(_rel_neighbor_generator(dimension, range_, lambda rel_n: True)),
|
||||
edge_rule)
|
||||
|
||||
|
||||
class VonNeumannNeighborhood(Neighborhood):
|
||||
def __init__(self, edge_rule: EdgeRule = EdgeRule.IGNORE_EDGE_CELLS, range_=1, dimension=2):
|
||||
self.range_ = range_
|
||||
super().__init__(tuple(_rel_neighbor_generator(dimension, range_, self.neighbor_rule)),
|
||||
edge_rule)
|
||||
|
||||
def neighbor_rule(self, rel_n):
|
||||
cross_sum = 0
|
||||
for ci in rel_n:
|
||||
cross_sum += abs(ci)
|
||||
return cross_sum <= self.range_
|
||||
|
||||
|
||||
def _rel_neighbor_generator(dimension, range_, rule):
|
||||
for c in product(range(-range_, range_ + 1), repeat=dimension):
|
||||
if rule(c) and c != (0, ) * dimension:
|
||||
yield tuple(reversed(c))
|
||||
|
@ -7,11 +7,11 @@ class Rule:
|
||||
|
||||
@staticmethod
|
||||
@abstractmethod
|
||||
def evolve_cell(last_cell_state, neighbours_last_states):
|
||||
def evolve_cell(last_cell_state, neighbors_last_states):
|
||||
""" Calculates and sets new state of 'cell'.
|
||||
:param last_cell_state: The cells current state to calculate new state for.
|
||||
:param neighbours_last_states: The cells neighbours current states.
|
||||
:param neighbors_last_states: The cells neighbors current states.
|
||||
:return: True if state changed, False if not.
|
||||
A cells evolution will only be called if it or at least one of its neighbours has changed last iteration cycle.
|
||||
A cells evolution will only be called if it or at least one of its neighbors has changed last evolution_step cycle.
|
||||
"""
|
||||
return False
|
||||
|
10
src/cellular_automaton/ca_state.py
Normal file
10
src/cellular_automaton/ca_state.py
Normal file
@ -0,0 +1,10 @@
|
||||
from cellular_automaton.ca_rule import Rule
|
||||
from typing import Type
|
||||
|
||||
|
||||
class CellularAutomatonState:
|
||||
def __init__(self, cells, dimension, evolution_rule: Type[Rule]):
|
||||
self.cells = cells
|
||||
self.dimension = dimension
|
||||
self.evolution_rule = evolution_rule
|
||||
self.current_evolution_step = -1
|
@ -1,18 +1,8 @@
|
||||
import multiprocessing
|
||||
|
||||
from cellular_automaton.ca_rule import Rule
|
||||
from cellular_automaton.ca_cell import Cell
|
||||
from multiprocessing import freeze_support
|
||||
from ctypes import c_int
|
||||
|
||||
|
||||
class CellularAutomaton:
|
||||
def __init__(self, cells, dimension, evolution_rule: Rule):
|
||||
self.cells = cells
|
||||
self.dimension = dimension
|
||||
self.evolution_rule = evolution_rule
|
||||
self.evolution_iteration_index = -1
|
||||
|
||||
|
||||
class CellularAutomatonProcessor:
|
||||
def __init__(self, cellular_automaton):
|
||||
self._ca = cellular_automaton
|
||||
@ -22,51 +12,51 @@ class CellularAutomatonProcessor:
|
||||
self.evolve()
|
||||
|
||||
def evolve(self):
|
||||
self._ca.evolution_iteration_index += 1
|
||||
i = self._ca.evolution_iteration_index
|
||||
self._ca.current_evolution_step += 1
|
||||
i = self._ca.current_evolution_step
|
||||
r = self._ca.evolution_rule.evolve_cell
|
||||
list(map(lambda c: Cell.evolve_if_ready((c.state, c.neighbours), r, i), tuple(self._ca.cells.items())))
|
||||
# print(sum(1 for c in self._ca.cells if c.state.is_set_for_redraw()))
|
||||
list(map(lambda c: c.evolve_if_ready(r, i), tuple(self._ca.cells.values())))
|
||||
|
||||
|
||||
class CellularAutomatonMultiProcessor(CellularAutomatonProcessor):
|
||||
def __init__(self, cellular_automaton, process_count: int = 2):
|
||||
freeze_support()
|
||||
if process_count < 1:
|
||||
raise ValueError
|
||||
|
||||
super().__init__(cellular_automaton)
|
||||
|
||||
self.evolve_range = range(len(self._ca.cells))
|
||||
self.evolution_iteration_index = multiprocessing.RawValue(c_int, -1)
|
||||
self.shared_evolution_step = multiprocessing.RawValue(c_int, self._ca.current_evolution_step)
|
||||
|
||||
self.__init_processes_and_clean_cell_instances(process_count)
|
||||
|
||||
def __init_processes_and_clean_cell_instances(self, process_count):
|
||||
self.pool = multiprocessing.Pool(processes=process_count,
|
||||
initializer=_init_process,
|
||||
initargs=(tuple(self._ca.cells.values()),
|
||||
self._ca.evolution_rule,
|
||||
self.evolution_iteration_index))
|
||||
self._evolve_method = self.pool.map
|
||||
|
||||
self.shared_evolution_step))
|
||||
for cell in self._ca.cells.values():
|
||||
del cell.neighbours
|
||||
del cell.neighbor_states
|
||||
|
||||
def evolve(self):
|
||||
self._ca.evolution_iteration_index += 1
|
||||
self.evolution_iteration_index.value = self._ca.evolution_iteration_index
|
||||
self._ca.current_evolution_step += 1
|
||||
self.shared_evolution_step.value = self._ca.current_evolution_step
|
||||
self.pool.map(_process_routine, self.evolve_range)
|
||||
|
||||
|
||||
global_cells = None
|
||||
global_rule = None
|
||||
global_iteration = None
|
||||
global_evolution_step = None
|
||||
|
||||
|
||||
def _init_process(cells, rule, index):
|
||||
global global_rule, global_cells, global_iteration
|
||||
global global_rule, global_cells, global_evolution_step
|
||||
global_cells = cells
|
||||
global_rule = rule
|
||||
global_iteration = index
|
||||
global_evolution_step = index
|
||||
|
||||
|
||||
def _process_routine(i):
|
||||
Cell.evolve_if_ready(global_cells[i], global_rule.evolve_cell, global_iteration.value)
|
||||
|
||||
global_cells[i].evolve_if_ready(global_rule.evolve_cell, global_evolution_step.value)
|
||||
|
@ -13,27 +13,27 @@ class TestState(CellState):
|
||||
class TestCellState(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.cell = Cell(TestState)
|
||||
self.neighbours = [TestState() for x in range(5)]
|
||||
for neighbour in self.neighbours:
|
||||
neighbour.set_state_of_iteration((0, ), 0)
|
||||
self.cell.neighbours = self.neighbours
|
||||
self.neighbors = [TestState() for x in range(5)]
|
||||
for neighbor in self.neighbors:
|
||||
neighbor.set_state_of_evolution_step((0, ), 0)
|
||||
self.cell.neighbor_states = self.neighbors
|
||||
|
||||
def cell_and_neighbours_active(self, iteration):
|
||||
self.neighbours.append(self.cell.state)
|
||||
def cell_and_neighbors_active(self, evolution_step):
|
||||
self.neighbors.append(self.cell.state)
|
||||
all_active = True
|
||||
for state in self.neighbours:
|
||||
if not state.is_active(iteration):
|
||||
for state in self.neighbors:
|
||||
if not state.is_active(evolution_step):
|
||||
all_active = False
|
||||
return all_active
|
||||
|
||||
def test_evolve_activation(self):
|
||||
Cell.evolve_if_ready(self.cell, (lambda a, b: (1,)), 0)
|
||||
all_active = self.cell_and_neighbours_active(1)
|
||||
self.cell.evolve_if_ready((lambda a, b: (1,)), 0)
|
||||
all_active = self.cell_and_neighbors_active(1)
|
||||
self.assertTrue(all_active)
|
||||
|
||||
def test_evolve_activation_on_no_change(self):
|
||||
Cell.evolve_if_ready(self.cell, (lambda a, b: (0,)), 0)
|
||||
all_active = self.cell_and_neighbours_active(1)
|
||||
self.cell.evolve_if_ready((lambda a, b: (0,)), 0)
|
||||
all_active = self.cell_and_neighbors_active(1)
|
||||
self.assertFalse(all_active)
|
||||
|
||||
|
||||
|
@ -10,33 +10,33 @@ class TestCellState(unittest.TestCase):
|
||||
self.cell_state = cs.SynchronousCellState(initial_state=(0,), draw_first_state=False)
|
||||
|
||||
def test_get_state_with_overflow(self):
|
||||
self.cell_state.set_state_of_iteration(new_state=(1,), iteration=0)
|
||||
self.assertEqual(tuple(self.cell_state.get_state_of_iteration(2)), (1,))
|
||||
self.cell_state.set_state_of_evolution_step(new_state=(1,), evolution_step=0)
|
||||
self.assertEqual(tuple(self.cell_state.get_state_of_evolution_step(2)), (1,))
|
||||
|
||||
def test_set_state_with_overflow(self):
|
||||
self.cell_state.set_state_of_iteration(new_state=(1,), iteration=3)
|
||||
self.assertEqual(tuple(self.cell_state.get_state_of_iteration(1)), (1,))
|
||||
self.cell_state.set_state_of_evolution_step(new_state=(1,), evolution_step=3)
|
||||
self.assertEqual(tuple(self.cell_state.get_state_of_evolution_step(1)), (1,))
|
||||
|
||||
def test_set_state_does_not_effect_all_slots(self):
|
||||
self.cell_state.set_state_of_iteration(new_state=(1,), iteration=0)
|
||||
self.assertEqual(tuple(self.cell_state.get_state_of_iteration(1)), (0,))
|
||||
self.cell_state.set_state_of_evolution_step(new_state=(1,), evolution_step=0)
|
||||
self.assertEqual(tuple(self.cell_state.get_state_of_evolution_step(1)), (0,))
|
||||
|
||||
def test_redraw_state_on_change(self):
|
||||
self.cell_state.set_state_of_iteration(new_state=(1,), iteration=0)
|
||||
self.cell_state.set_state_of_evolution_step(new_state=(1,), evolution_step=0)
|
||||
self.assertTrue(self.cell_state.is_set_for_redraw())
|
||||
|
||||
def test_redraw_state_on_nochange(self):
|
||||
self.cell_state.set_state_of_iteration(new_state=(0,), iteration=0)
|
||||
self.cell_state.set_state_of_evolution_step(new_state=(0,), evolution_step=0)
|
||||
self.assertFalse(self.cell_state.is_set_for_redraw())
|
||||
|
||||
def test_active_state_after_set(self):
|
||||
self.cell_state.set_state_of_iteration(new_state=(1,), iteration=0)
|
||||
self.cell_state.set_state_of_evolution_step(new_state=(1,), evolution_step=0)
|
||||
self.assertFalse(self.cell_state.is_active(0))
|
||||
self.assertFalse(self.cell_state.is_active(1))
|
||||
|
||||
def test_set_active_for_next_iteration(self):
|
||||
self.cell_state.set_state_of_iteration(new_state=(1,), iteration=0)
|
||||
self.cell_state.set_active_for_next_iteration(0)
|
||||
def test_set_active_for_next_evolution_step(self):
|
||||
self.cell_state.set_state_of_evolution_step(new_state=(1,), evolution_step=0)
|
||||
self.cell_state.set_active_for_next_evolution_step(0)
|
||||
self.assertFalse(self.cell_state.is_active(0))
|
||||
self.assertTrue(self.cell_state.is_active(1))
|
||||
|
||||
@ -44,7 +44,7 @@ class TestCellState(unittest.TestCase):
|
||||
self.assertRaises(IndexError, self.__set_state_with_new_length)
|
||||
|
||||
def __set_state_with_new_length(self):
|
||||
return self.cell_state.set_state_of_iteration(new_state=(1, 1), iteration=0)
|
||||
return self.cell_state.set_state_of_evolution_step(new_state=(1, 1), evolution_step=0)
|
||||
|
||||
def test_redraw_flag(self):
|
||||
self.cell_state = cs.SynchronousCellState(initial_state=(0,), draw_first_state=True)
|
||||
|
@ -12,56 +12,56 @@ class TestFac(CAFactory):
|
||||
return CAFactory._make_cells(dimension, state_class)
|
||||
|
||||
@staticmethod
|
||||
def apply_neighbourhood(cells, neighborhood, dimension):
|
||||
return CAFactory._apply_neighbourhood_to_cells(cells, neighborhood, dimension)
|
||||
def apply_neighborhood(cells, neighborhood, dimension):
|
||||
return CAFactory._apply_neighborhood_to_cells(cells, neighborhood, dimension)
|
||||
|
||||
|
||||
class TestCAFactory(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._neighborhood = MooreNeighborhood(EdgeRule.IGNORE_EDGE_CELLS)
|
||||
|
||||
def test_make_ca_calls_correct_methods(self):
|
||||
with mock.patch.object(CAFactory, '_make_cells', return_value={1: True}) as m1:
|
||||
with mock.patch.object(CAFactory, '_apply_neighbourhood_to_cells') as m2:
|
||||
CAFactory.make_cellular_automaton([10], Neighborhood, CellState)
|
||||
with mock.patch.object(CAFactory, '_apply_neighborhood_to_cells') as m2:
|
||||
CAFactory.make_cellular_automaton([10], self._neighborhood, CellState, Rule())
|
||||
m1.assert_called_once_with([10], CellState)
|
||||
m2.assert_called_once_with({1: True}, Neighborhood, [10])
|
||||
m2.assert_called_once_with({1: True}, self._neighborhood, [10])
|
||||
|
||||
def test_make_ca_returns_correct_values(self):
|
||||
with mock.patch.object(CAFactory, '_make_cells', return_value={1: True}):
|
||||
with mock.patch.object(CAFactory, '_apply_neighbourhood_to_cells'):
|
||||
cells = CAFactory.make_cellular_automaton([10], Neighborhood, CellState)
|
||||
self.assertEqual(tuple(cells.values()), (True, ))
|
||||
with mock.patch.object(CAFactory, '_apply_neighborhood_to_cells'):
|
||||
ca = CAFactory.make_cellular_automaton([10], self._neighborhood, CellState, Rule())
|
||||
self.assertIsInstance(ca, CellularAutomatonState)
|
||||
self.assertEqual(tuple(ca.cells.values()), (True, ))
|
||||
|
||||
def test_1dimension_coordinates(self):
|
||||
fac = TestFac()
|
||||
c = fac.make_cells([3], CellState)
|
||||
c = TestFac.make_cells([3], CellState)
|
||||
self.assertEqual(list(c.keys()), [(0,), (1,), (2,)])
|
||||
|
||||
def test_2dimension_coordinates(self):
|
||||
fac = TestFac()
|
||||
c = fac.make_cells([2, 2], CellState)
|
||||
c = TestFac.make_cells([2, 2], CellState)
|
||||
self.assertEqual(list(c.keys()), [(0, 0), (0, 1), (1, 0), (1, 1)])
|
||||
|
||||
def test_3dimension_coordinates(self):
|
||||
fac = TestFac()
|
||||
c = fac.make_cells([2, 2, 2], CellState)
|
||||
c = TestFac.make_cells([2, 2, 2], CellState)
|
||||
self.assertEqual(list(c.keys()), [(0, 0, 0), (0, 0, 1), (0, 1, 0), (0, 1, 1),
|
||||
(1, 0, 0), (1, 0, 1), (1, 1, 0), (1, 1, 1)])
|
||||
|
||||
def test_apply_neighbourhood(self):
|
||||
fac = TestFac()
|
||||
cells = fac.make_cells([3, 3], CellState)
|
||||
fac.apply_neighbourhood(cells, MooreNeighborhood(EdgeRule.IGNORE_EDGE_CELLS), [3, 3])
|
||||
def test_apply_neighborhood(self):
|
||||
cells = TestFac.make_cells([3, 3], CellState)
|
||||
TestFac.apply_neighborhood(cells, self._neighborhood, [3, 3])
|
||||
|
||||
neighbours = self.__create_neighbour_list_of_cell((1, 1), cells)
|
||||
neighbors = self.__create_neighbor_list_of_cell((1, 1), cells)
|
||||
|
||||
self.assertEqual(set(neighbours), set(cells[(1, 1)].neighbours))
|
||||
self.assertEqual(set(neighbors), set(cells[(1, 1)].neighbor_states))
|
||||
|
||||
@staticmethod
|
||||
def __create_neighbour_list_of_cell(cell_id, cells):
|
||||
neighbours = []
|
||||
def __create_neighbor_list_of_cell(cell_id, cells):
|
||||
neighbors = []
|
||||
for c in cells.values():
|
||||
if c != cells[cell_id]:
|
||||
neighbours.append(c.state)
|
||||
return neighbours
|
||||
neighbors.append(c.state)
|
||||
return neighbors
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -7,11 +7,12 @@ import unittest
|
||||
|
||||
class TestNeighborhood(unittest.TestCase):
|
||||
@staticmethod
|
||||
def check_neighbors(neighborhood, neighborhood_sets):
|
||||
def check_neighbors(neighborhood, neighborhood_sets, dimension=(3, 3)):
|
||||
for neighborhood_set in neighborhood_sets:
|
||||
neighbors = neighborhood.calculate_cell_neighbor_coordinates(neighborhood_set[0], [3, 3])
|
||||
neighbors = neighborhood.calculate_cell_neighbor_coordinates(neighborhood_set[0], dimension)
|
||||
if neighborhood_set[1] != neighbors:
|
||||
print("Error neighbours do not fit (expected, real): ", (neighborhood_set[1]), neighbors)
|
||||
print("\nrel_n:", neighborhood._rel_neighbors)
|
||||
print("\nWrong neighbors (expected, real): ", (neighborhood_set[1]), neighbors)
|
||||
return False
|
||||
return True
|
||||
|
||||
@ -36,6 +37,22 @@ class TestNeighborhood(unittest.TestCase):
|
||||
n22 = [[2, 2], [[1, 1], [2, 1], [0, 1], [1, 2], [0, 2], [1, 0], [2, 0], [0, 0]]]
|
||||
self.assertTrue(self.check_neighbors(neighborhood, [n00, n11, n22]))
|
||||
|
||||
def test_von_neumann_r1(self):
|
||||
neighborhood = csn.VonNeumannNeighborhood(csn.EdgeRule.FIRST_AND_LAST_CELL_OF_DIMENSION_ARE_NEIGHBORS)
|
||||
n1 = [[1, 1], [[1, 0], [0, 1], [2, 1], [1, 2]]]
|
||||
self.assertTrue(self.check_neighbors(neighborhood, [n1]))
|
||||
|
||||
def test_von_neumann_r2(self):
|
||||
neighborhood = csn.VonNeumannNeighborhood(csn.EdgeRule.FIRST_AND_LAST_CELL_OF_DIMENSION_ARE_NEIGHBORS, range_=2)
|
||||
n1 = [[2, 2], [[2, 0], [1, 1], [2, 1], [3, 1], [0, 2], [1, 2], [3, 2], [4, 2], [1, 3], [2, 3], [3, 3], [2, 4]]]
|
||||
self.assertTrue(self.check_neighbors(neighborhood, [n1], dimension=[5, 5]))
|
||||
|
||||
def test_von_neumann_d3(self):
|
||||
neighborhood = csn.VonNeumannNeighborhood(csn.EdgeRule.FIRST_AND_LAST_CELL_OF_DIMENSION_ARE_NEIGHBORS,
|
||||
dimension=3)
|
||||
n1 = [[1, 1, 1], [[1, 1, 0], [1, 0, 1], [0, 1, 1], [2, 1, 1], [1, 2, 1], [1, 1, 2]]]
|
||||
self.assertTrue(self.check_neighbors(neighborhood, [n1], dimension=[3, 3, 3]))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user