"""A class that manages evolving a genome based on certain specifications."""
from multiprocessing import Pool # To distribute processes
import cv2 # opencv2 for image management
import numpy as np # for linear algebra help
from PIL import Image # for initial image palette and bgcolor generation with help of colorthief
from shapevolve.callbacks import default_callback
from shapevolve.drawers import add_circle
# noinspection PyUnresolvedReferences
from shapevolve.error_metrics import mean_squared_error
from shapevolve.gene import Gene
from shapevolve.genome import Genome, is_compatible
from shapevolve.mutations import simple_mutation, complex_mutation
# noinspection PyUnresolvedReferences
from shapevolve.preprocessors import smooth_preprocess
from shapevolve.utils import get_rescale_ratio, convert_RGB_to_BGR, adjust, ColorThiefFromImage
[docs]class Evolver:
"""A class that manages evolving a genome based on certain specifications."""
_EARLY_STOPPING_LIMIT = 25
_EARLY_STOPPING_LIMIT_EXTREME = 200
_SIMPLE_POOL_SIZE = 32
_COMPLEX_POOL_SIZE = 4
def __init__(self, base_image, saved_genome=None, num_shapes=1000, num_colors=256, target_resolution=250,
adjusters=None, preprocesses=None, draw=add_circle, calculate_error=mean_squared_error):
"""A constructor that specifies base images, a saved genome, and settings for the evolution.
:param base_image: The source PIL image object that the evolution will be performed against.
:param saved_genome: An optional saved genome. If unspecified, a genome will be randomly generated.
:param num_shapes: The number of shapes to be used in the genome construction.
:param num_colors: The number of colors that should be used by the shapes.
:param target_resolution: The resolution that images will be resized to during the evolution.
:param adjusters: A list of reversible adjusters that can be applied during the evolution. See adjusters module.
:param preprocesses: A list of preprocessors that will be applied to the image before the evolution. See preprocessors module.
:param draw: The function used to draw shapes on the image. See drawers module.
:param calculate_error: The function used to calculate errors between images. See error_metrics module.
:type base_image: Image
:type saved_genome: Genome
:type num_shapes: int
:type num_colors: int
:type target_resolution: int
:type adjusters: List[Dict[str, Callable]]
:type preprocesses: List[Callable]
:type draw: Callable
:type calculate_error: Callable
"""
if adjusters is None:
self.adjusters = []
else:
self.adjusters = adjusters
if preprocesses is None:
preprocesses = [smooth_preprocess]
if num_shapes > 10000:
raise ValueError("Too many inputted shapes: num_shapes must be in range[10, 10000].")
if num_shapes < 10:
raise ValueError("Too few inputted shapes: num_shapes must be in range[10, 10000].")
if num_colors > 256:
raise ValueError("Too many inputted colors: num_colors must be in range [8, 256].")
if num_colors < 8:
raise ValueError("Too few inputted colors: num_colors must be in range [8, 256].")
if target_resolution < 100:
raise ValueError("Resolution too small: target_resolution must be >= 100.")
self.draw = draw
self.num_shapes = num_shapes
self.num_colors = num_colors
self.calculate_error = calculate_error
loaded_image = base_image.copy()
for preprocess in preprocesses:
loaded_image = preprocess(loaded_image)
self.ratio = get_rescale_ratio(loaded_image, target_resolution)
self.width = round(loaded_image.width * self.ratio)
self.height = round(loaded_image.height * self.ratio)
loaded_image = loaded_image.resize((self.width, self.height), Image.LANCZOS)
image_array = adjust(np.asarray(loaded_image), adjusters)
loaded_image = Image.fromarray(image_array)
self.base_image = cv2.cvtColor(image_array, cv2.COLOR_RGB2BGR)
color_thief = ColorThiefFromImage(loaded_image)
background_color = convert_RGB_to_BGR(color_thief.get_color(quality=1)) # gets the background color
# Gets the top colours used by the image
self.palette = color_thief.get_palette(color_count=num_colors, quality=1)
for index, color in enumerate(self.palette):
self.palette[index] = convert_RGB_to_BGR(color)
resolution = (self.width, self.height)
self.min_radius = round(0.02 * min(resolution))
self.max_radius = round(0.08 * min(resolution))
self.ancestor_image = np.zeros((self.height, self.width, 3), np.uint8)
self.ancestor_image[:] = background_color
self.sequence = [Gene(self.max_radius, self.min_radius, self.height, self.width, self.num_colors)
for _ in range(self.num_shapes)] # Generate initial gene sequence.
self.genome = Genome(self.sequence, self.ratio, self.height, self.width,
background_color, self.adjusters, self.palette, self.draw) # Build a genome
if saved_genome is not None:
if is_compatible(self.genome, saved_genome):
self.genome = saved_genome
self.sequence = saved_genome.sequence
[docs] def evolve(self, num_generations=5000, callbacks=None, silent=False):
"""Evolves a genome, and returns it after evolution.
:param num_generations: The number of generations to evolve to. Note: early stoppages are possible.
:param callbacks: Callbacks that can be run when a new generation is evolved.
:param silent: Whether callbacks will be run at all.
:type num_generations: int
:type callbacks: List[Callable]
:type silent: bool
:return: Genome
"""
if num_generations < 1:
raise ValueError("Too few generations, num_generations must be in range [1, 100000].")
elif num_generations > 100000:
raise ValueError("Too many generations, num_generations must be in range [1, 100000].")
if silent:
callbacks = []
elif callbacks is None:
callbacks = [default_callback]
cached_image = None # Location to store a cached image for later use.
recall_from_cache = False # Whether to load an image from the cache or build it from scratch.
num_consecutive_failed_loops = 0 # counter for above.
num_mutation_type_switches = 0 # number of total generation changes,
# used to determine how long an algorithm should stay complex.
num_complex_mutation_successes_since_switch = 0 # counter for early stopping.
use_complex_mutation = False # default value of whether to use extreme mutations.
final_run = False
simple_pool = Pool(self._SIMPLE_POOL_SIZE)
complex_pool = Pool(self._COMPLEX_POOL_SIZE)
previous_changes = -1
loop_index = 0
offspring = 1 # Record number of generations.
changes = 0 # Record number of total changes made.
# (some generations may have failed mutations that do not affect the sequence.)
best_image = self.genome.render_raw_image()
best_error = self.calculate_error(best_image, self.base_image)
for callback in callbacks:
callback(offspring, changes, loop_index, num_mutation_type_switches, best_error,
use_complex_mutation, best_image, self.genome)
while changes < num_generations:
if use_complex_mutation:
final_run = False
offspring += self._COMPLEX_POOL_SIZE
results = complex_pool.starmap(complex_mutation, [
(self.ancestor_image, self.base_image, self.sequence, self.num_shapes, self.calculate_error,
self.palette, self.draw) for _ in range(self._COMPLEX_POOL_SIZE)])
error_list = [results[index][0] for index in range(self._COMPLEX_POOL_SIZE)]
best_index = error_list.index(min(error_list))
error = results[best_index][0]
mutated_gene = results[best_index][1]
image = results[best_index][2]
mutated_index = results[best_index][3]
is_top_mutation = results[best_index][4]
# If successful...
if error < best_error:
best_error = error
best_image = image
del self.sequence[mutated_index]
if is_top_mutation:
self.sequence.append(mutated_gene)
else:
self.sequence.insert(mutated_index, mutated_gene)
changes += 1
# Reset all values and go back to regular mutations.
num_consecutive_failed_loops = 0
num_complex_mutation_successes_since_switch += 1
if num_complex_mutation_successes_since_switch >= num_mutation_type_switches ** 2:
recall_from_cache = False
use_complex_mutation = False
else:
num_consecutive_failed_loops += 1
# If we really can't get anywhere, then quit.
if num_consecutive_failed_loops >= self._EARLY_STOPPING_LIMIT_EXTREME:
final_run = True
num_consecutive_failed_loops = 0
recall_from_cache = False
use_complex_mutation = False
else:
offspring += self._SIMPLE_POOL_SIZE
# add the rest of the circles normally
if recall_from_cache:
image = cached_image.copy() # Take built image
else:
image = self.ancestor_image.copy()
for index, gene in enumerate(self.sequence):
if index != 0:
self.draw(image, gene, self.palette)
cached_image = image.copy()
recall_from_cache = True
results = simple_pool.starmap(simple_mutation,
[(image, self.base_image, self.sequence, self.calculate_error,
self.palette, self.draw) for _ in range(self._SIMPLE_POOL_SIZE)])
error_list = [results[index][0] for index in range(self._SIMPLE_POOL_SIZE)]
best_index = error_list.index(min(error_list))
error = results[best_index][0]
mutated_gene = results[best_index][1]
image = results[best_index][2]
# if it was beneficial...
if error < best_error:
best_error = error
best_image = image
del self.sequence[0]
self.sequence.append(mutated_gene) # Place the gene on top of the sequence again.
changes += 1 # record a change
num_consecutive_failed_loops = 0
recall_from_cache = False # Build next image from scratch.
final_run = False
else:
num_consecutive_failed_loops += 1
if num_consecutive_failed_loops >= self._EARLY_STOPPING_LIMIT:
num_mutation_type_switches += 1
use_complex_mutation = True
num_consecutive_failed_loops = 0
num_complex_mutation_successes_since_switch = 0
if final_run:
break
loop_index += 1
if previous_changes != changes:
previous_changes = changes
for callback in callbacks:
callback(offspring, changes, loop_index, num_mutation_type_switches, best_error,
use_complex_mutation, best_image, self.genome)
simple_pool.close()
simple_pool.join()
complex_pool.close()
complex_pool.join()
return self.genome