# Source code for lore_sa.neighgen.genetic

import numbers
import pickle
import random

from deap.algorithms import varAnd, eaSimple
from scipy.spatial.distance import cdist, hamming
from .neighborhood_generator import NeighborhoodGenerator
from deap import base, creator, tools, algorithms

import numpy as np

# Names exported by a star-import of this module. LegacyGeneticGenerator is
# defined below but is not part of this list.
__all__ = ["NeighborhoodGenerator", "GeneticGenerator"]

from ..util import neuclidean, sigmoid


class LegacyGeneticGenerator(NeighborhoodGenerator):
    """
    Genetic neighborhood generator that scores individuals one at a time.

    Two genetic searches are started from copies of the instance to explain: one whose fitness rewards
    individuals predicted with the same class as the instance, and one whose fitness rewards individuals
    predicted with a different class. The two resulting populations are concatenated to form the neighborhood.
    """
    def __init__(self, bbox=None, dataset=None, encoder=None, ocr=0.1,
                 alpha1=0.5, alpha2=0.5, metric=neuclidean, ngen=100, mutpb=0.2, cxpb=0.5,
                 tournsize=3, halloffame_ratio=0.1, random_seed=None):
        """

        :param bbox: the Black Box model to explain
        :param dataset: the dataset with the descriptor of the original dataset
        :param encoder: an encoder to transform the data from/to the black box model
        :param ocr: acronym for One Class Ratio, it is the ratio of the number of instances of the minority class
        :param alpha1: the weight of the similarity of the features from the given instance. The sum of alpha1 and alpha2 must be 1
        :param alpha2: the weight of the similarity of the target class from the given instance. The sum of alpha1 and alpha2 must be 1
        :param metric: the distance metric to use to compute the distance between instances
        :param ngen: the number of generations to run
        :param mutpb: probability of mutation; it is passed both to the evolutionary loop and to the
            synthetic instance generator used by :meth:`mutate`
        :param cxpb: crossover probability passed to the evolutionary loop
        :param tournsize: number of individuals taking part in each selection tournament
        :param halloffame_ratio: fraction of the population size used as the hall of fame size
        :param random_seed: initial seed for the random number generator
        """
        self.bbox = bbox
        self.dataset = dataset
        self.encoder = encoder
        self.ocr = ocr
        self.alpha1 = alpha1
        self.alpha2 = alpha2
        self.metric = metric
        self.ngen = ngen
        self.mutpb = mutpb
        self.cxpb = cxpb
        self.tournsize = tournsize
        self.halloffame_ratio = halloffame_ratio
        self.random_seed = random_seed
        # NOTE: this seeds the process-wide `random` module, not a private generator.
        random.seed(random_seed)

    def generate(self, z, num_instances, descriptor, encoder):
        """
        The generation is based on the strategy of generating a number of instances for the same class as the input
        instance and a number of instances for a different class.
        The generation of the instances for each subgroup is done through a genetic algorithm based on two fitness
        functions: one for the same class and one for the different class.

        :param z: the input instance
        :param num_instances: how many elements to generate
        :param descriptor: the descriptor of the dataset (not used by this implementation)
        :param encoder: the encoder of the dataset (not used by this implementation, which reads ``self.encoder``)
        :return: the array returned by ``balance_neigh``, with its first row replaced by a copy of ``z``
        """
        new_x = z.copy()

        # split the requested size between the "same class" and the "different class" searches
        num_samples_eq = int(np.round(num_instances * 0.5))
        num_samples_neq = num_instances - num_samples_eq

        # generate the instances for the same class
        toolbox_eq = self.setup_toolbox(z, self.fitness_equal, num_samples_eq)
        population_eq, halloffame_eq, _ = self.fit(toolbox_eq, num_samples_eq)
        Z_eq = self.add_halloffame(population_eq, halloffame_eq)

        # generate the instances for a different class
        toolbox_noteq = self.setup_toolbox(z, self.fitness_notequal, num_samples_neq)
        population_noteq, halloffame_noteq, _ = self.fit(toolbox_noteq, num_samples_neq)
        Z_noteq = self.add_halloffame(population_noteq, halloffame_noteq)

        # concatenate the two sets of instances
        Z = np.concatenate((Z_eq, Z_noteq), axis=0)

        # balancing is delegated to the parent class (presumably according to the minority class ratio)
        Z = super().balance_neigh(z, Z, num_instances)

        # the first element is the input instance
        Z[0] = new_x
        return Z

    def add_halloffame(self, population, halloffame):
        """
        Merge the final population with the best hall of fame individuals.

        The fitness values of the population are sorted and the largest gap between two consecutive values is
        located (the last one in case of ties); the value just below that gap is the threshold. Every individual
        of the population is kept, while hall of fame individuals are added only when their fitness is strictly
        greater than the threshold.

        :param population: the individuals of the last generation
        :param halloffame: the hall of fame collected during the evolution
        :return: a numpy array stacking the selected individuals
        """
        fitness_values = sorted(p.fitness.wvalues[0] for p in population)
        fitness_diff = np.diff(fitness_values)

        # With fewer than two individuals there is no gap to look at: np.amax on an empty
        # sequence would raise, so fall back to accepting every hall of fame individual.
        fitness_value_thr = -np.inf
        if fitness_diff.size > 0:
            largest_gaps = np.flatnonzero(fitness_diff == np.amax(fitness_diff))
            # largest_gaps is empty when the maximum is NaN (NaN never compares equal)
            if largest_gaps.size > 0:
                fitness_value_thr = fitness_values[np.max(largest_gaps)]

        Z = list(population)
        Z.extend(h for h in halloffame if h.fitness.wvalues[0] > fitness_value_thr)

        return np.array(Z)

    def setup_toolbox(self, x, evaluate, population_size):
        """
        Build the DEAP toolbox used by :meth:`fit`.

        :param x: the instance to explain; every initial individual is a copy of it and it is passed as first
            argument to ``evaluate``
        :param evaluate: fitness function called as ``evaluate(x, individual)``
        :param population_size: default number of individuals of the population
        :return: the configured toolbox
        """
        # single-objective fitness to maximize; individuals are numpy arrays carrying that fitness
        creator.create("fitness", base.Fitness, weights=(1.0,))
        creator.create("individual", np.ndarray, fitness=creator.fitness)

        toolbox = base.Toolbox()
        toolbox.register("feature_values", self.record_init, x)
        toolbox.register("individual", tools.initIterate, creator.individual, toolbox.feature_values)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual, n=population_size)

        toolbox.register("clone", self.clone)
        toolbox.register("evaluate", evaluate, x)
        toolbox.register("mate", self.mate)
        toolbox.register("mutate", self.mutate, toolbox)
        toolbox.register("select", tools.selTournament, tournsize=self.tournsize)

        return toolbox

    def setup_toolbox_noteq(self, x, x1, evaluate, population_size):
        """
        Build a DEAP toolbox whose initial individuals are copies of ``x1`` while the fitness is computed
        against ``x``.

        :param x: the instance passed as first argument to ``evaluate``
        :param x1: the instance used to initialize every individual of the population
        :param evaluate: fitness function called as ``evaluate(x, individual)``
        :param population_size: default number of individuals of the population
        :return: the configured toolbox
        """
        # The creator names must match between creation and lookup ("fitness_noteq" was
        # previously created under a misspelled name, which made this method always fail).
        creator.create("fitness_noteq", base.Fitness, weights=(1.0,))
        creator.create("individual_noteq", np.ndarray, fitness=creator.fitness_noteq)

        toolbox = base.Toolbox()
        toolbox.register("feature_values", self.record_init, x1)
        toolbox.register("individual", tools.initIterate, creator.individual_noteq, toolbox.feature_values)
        # the individual factory is registered under the alias "individual"
        toolbox.register("population", tools.initRepeat, list, toolbox.individual, n=population_size)

        toolbox.register("clone", self.clone)
        toolbox.register("evaluate", evaluate, x)
        # self.mate copies the swapped slices, which plain tools.cxTwoPoint does not do;
        # without the copy a crossover between numpy individuals is silently wrong.
        toolbox.register("mate", self.mate)
        toolbox.register("mutate", self.mutate, toolbox)
        toolbox.register("select", tools.selTournament, tournsize=self.tournsize)

        return toolbox

    def fit(self, toolbox, population_size):
        """
        Run the evolutionary algorithm configured in ``toolbox``.

        :param toolbox: the toolbox returned by :meth:`setup_toolbox`
        :param population_size: the number of individuals of the population
        :return: a tuple ``(population, halloffame, logbook)``
        """
        halloffame_size = int(np.round(population_size * self.halloffame_ratio))

        population = toolbox.population(n=population_size)
        # numpy individuals cannot be compared with ==, hence the explicit similarity function
        halloffame = tools.HallOfFame(halloffame_size, similar=np.array_equal)

        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("min", np.min)
        stats.register("max", np.max)

        population, logbook = eaSimple(population, toolbox, cxpb=self.cxpb, mutpb=self.mutpb,
                                       ngen=self.ngen, stats=stats, halloffame=halloffame,
                                       verbose=True)

        return population, halloffame, logbook

    def record_init(self, x):
        '''
        This function is used to generate the instance that starts the evolutionary algorithm.
        In this case the input instance x is repeated for all the initial population.

        :param x: the instance to return
        :return: a (not so) random instance: ``x`` itself
        '''
        return x

    def random_init(self):
        """
        Create a starting instance through ``generate_synthetic_instance``, which is not defined in this
        class (presumably inherited from the parent generator).

        :return: the generated instance
        """
        z = self.generate_synthetic_instance()

        return z

    def clone(self, x):
        """
        Deep-copy an individual through a pickle round trip.

        :param x: the individual to copy
        :return: an independent copy of ``x``
        """
        return pickle.loads(pickle.dumps(x))

    def mutate(self, toolbox, x):
        """
        Mutate an individual by regenerating it with ``generate_synthetic_instance``.

        :param toolbox: the toolbox providing the ``clone`` operator
        :param x: the individual to mutate; it is cloned first and left untouched
        :return: a one-element tuple with the mutated individual, as expected by DEAP
        """
        z = toolbox.clone(x)
        z = self.generate_synthetic_instance(from_z=z, mutpb=self.mutpb)

        return z,

    def mate(self, ind1, ind2):
        """Executes a two-point crossover on the input :term:`sequence`
        individuals. The two individuals are modified in place and both keep
        their original length.
        This implementation is based on the one of the DEAP library. It adds a special case for the
        one-hot encoding, where the crossover points are taken from the intervals of values imposed by
        the one-hot encoding. Individuals with fewer than two genes are returned unchanged in the
        generic case, since no two-point crossover is possible.

        :param ind1: The first individual participating in the crossover.
        :param ind2: The second individual participating in the crossover.
        :returns: A tuple of two individuals.

        This function uses the :func:`~random.randint` function from the Python
        base :mod:`random` module.
        """
        if self.encoder.type == 'one-hot':
            intervals = self.encoder.get_encoded_intervals()
            cxInterval1 = random.randint(0, len(intervals) - 1)
            cxInterval2 = random.randint(0, len(intervals) - 1)
            if cxInterval1 > cxInterval2:
                # Swap the two cx intervals
                cxInterval1, cxInterval2 = cxInterval2, cxInterval1

            cxpoint1 = intervals[cxInterval1][0]
            cxpoint2 = intervals[cxInterval2][1]
        else:
            size = min(len(ind1), len(ind2))
            if size < 2:
                # randint(1, size - 1) would raise on an empty range
                return ind1, ind2
            cxpoint1 = random.randint(1, size)
            cxpoint2 = random.randint(1, size - 1)
            if cxpoint2 >= cxpoint1:
                cxpoint2 += 1
            else:  # Swap the two cx points
                cxpoint1, cxpoint2 = cxpoint2, cxpoint1

        # Slices of numpy arrays are views: without the copies the first assignment would
        # overwrite the data read by the second one and both individuals would end up with
        # the same segment.
        ind1[cxpoint1:cxpoint2], ind2[cxpoint1:cxpoint2] \
            = ind2[cxpoint1:cxpoint2].copy(), ind1[cxpoint1:cxpoint2].copy()

        return ind1, ind2

    def _similarity_scores(self, z, z1):
        """Return the raw feature and target similarity scores between ``z`` and ``z1``."""
        if isinstance(self.metric, numbers.Number):
            # a numeric metric is not usable by cdist: fall back to the default distance
            self.metric = neuclidean
        feature_similarity_score = 1.0 - cdist(z.reshape(1, -1), z1.reshape(1, -1), metric=self.metric).ravel()[0]

        x = self.encoder.decode(z.reshape(1, -1))
        y = self.bbox.predict(x)

        x1 = self.encoder.decode(z1.reshape(1, -1))
        y1 = self.bbox.predict(x1)

        target_similarity_score = 1.0 - hamming(y, y1)
        return feature_similarity_score, target_similarity_score

    def fitness_equal(self, z, z1):
        """
        Fitness rewarding individuals that are close to ``z`` and share its predicted class.

        :param z: the instance to explain
        :param z1: the individual to evaluate
        :return: a one-element tuple with ``alpha1 * feature_similarity + alpha2 * target_similarity``
        """
        feature_similarity_score, target_similarity_score = self._similarity_scores(z, z1)

        # a feature similarity score of 1.0 or more contributes nothing to the fitness
        feature_similarity = sigmoid(feature_similarity_score) if feature_similarity_score < 1.0 else 0.0
        target_similarity = sigmoid(target_similarity_score)

        evaluation = self.alpha1 * feature_similarity + self.alpha2 * target_similarity
        return evaluation,

    def fitness_notequal(self, z, z1):
        """
        Fitness rewarding individuals that are close to ``z`` but have a different predicted class.

        :param z: the instance to explain
        :param z1: the individual to evaluate
        :return: a one-element tuple with ``alpha1 * feature_similarity + alpha2 * target_similarity``
        """
        feature_similarity_score, target_similarity_score = self._similarity_scores(z, z1)

        feature_similarity = sigmoid(feature_similarity_score)
        # the target term is reversed: it is higher when the predictions differ
        target_similarity = 1.0 - sigmoid(target_similarity_score)

        evaluation = self.alpha1 * feature_similarity + self.alpha2 * target_similarity
        return evaluation,



class GeneticGenerator(LegacyGeneticGenerator):
    """
    Genetic neighborhood generator that scores a whole population at once.

    It follows the same strategy as :class:`LegacyGeneticGenerator` (one genetic search for the same class and
    one for a different class, both started from copies of the instance to explain), but the fitness
    functions receive the list of individuals to evaluate, so that distances, decoding and black box
    predictions are computed in batch. ``add_halloffame`` and the genetic operators are inherited.
    """

    def __init__(self, bbox=None, dataset=None, encoder=None, ocr=0.1,
                 alpha1=0.5, alpha2=0.5, metric=neuclidean, ngen=30, mutpb=0.2, cxpb=0.5,
                 tournsize=3, halloffame_ratio=0.1, random_seed=None):
        """

        :param bbox: the Black Box model to explain
        :param dataset: the dataset with the descriptor of the original dataset
        :param encoder: an encoder to transform the data from/to the black box model
        :param ocr: acronym for One Class Ratio, it is the ratio of the number of instances of the minority class
        :param alpha1: the weight of the similarity of the features from the given instance. The sum of alpha1 and alpha2 must be 1
        :param alpha2: the weight of the similarity of the target class from the given instance. The sum of alpha1 and alpha2 must be 1
        :param metric: the distance metric to use to compute the distance between instances
        :param ngen: the number of generations to run
        :param mutpb: probability of mutation passed to the evolutionary loop
        :param cxpb: crossover probability passed to the evolutionary loop
        :param tournsize: number of individuals taking part in each selection tournament
        :param halloffame_ratio: fraction of the population size used as the hall of fame size
        :param random_seed: initial seed for the random number generator
        """
        # The parent constructor stores exactly these attributes and seeds `random`;
        # only the default of `ngen` differs between the two classes.
        super().__init__(bbox=bbox, dataset=dataset, encoder=encoder, ocr=ocr,
                         alpha1=alpha1, alpha2=alpha2, metric=metric, ngen=ngen, mutpb=mutpb,
                         cxpb=cxpb, tournsize=tournsize, halloffame_ratio=halloffame_ratio,
                         random_seed=random_seed)

    def generate(self, z, num_instances, descriptor, encoder):
        """
        The generation is based on the strategy of generating a number of instances for the same class as the input
        instance and a number of instances for a different class.
        The generation of the instances for each subgroup is done through a genetic algorithm based on two fitness
        functions: one for the same class and one for the different class.

        :param z: the input instance, from which the generation starts
        :param num_instances: how many elements to generate
        :param descriptor: the descriptor of the dataset (not used by this implementation)
        :param encoder: the encoder to transform the data from/to the black box model (not used by this
            implementation, which reads ``self.encoder``)
        :return: a new set of instances generated from the input instance. The first element is the input instance
        """
        new_x = z.copy()

        # split the requested size between the "same class" and the "different class" searches
        num_samples_eq = int(np.round(num_instances * 0.5))
        num_samples_neq = num_instances - num_samples_eq

        # generate the instances for the same class
        toolbox_eq = self.setup_toolbox(z, self.population_fitness_equal(z), num_samples_eq)
        population_eq, halloffame_eq, _ = self.fit(toolbox_eq, num_samples_eq)
        Z_eq = self.add_halloffame(population_eq, halloffame_eq)

        # generate the instances for a different class
        toolbox_noteq = self.setup_toolbox(z, self.population_fitness_notequal(z), num_samples_neq)
        population_noteq, halloffame_noteq, _ = self.fit(toolbox_noteq, num_samples_neq)
        Z_noteq = self.add_halloffame(population_noteq, halloffame_noteq)

        # concatenate the two sets of instances
        Z = np.concatenate((Z_eq, Z_noteq), axis=0)

        # balancing is delegated to the parent classes (presumably according to the minority class ratio)
        Z = super().balance_neigh(z, Z, num_instances)

        # the first element is the input instance
        Z[0] = new_x
        return Z

    def setup_toolbox(self, x, evaluate, population_size):
        """
        Build the DEAP toolbox used by :meth:`fit`.

        :param x: the instance to explain; every initial individual is a copy of it
        :param evaluate: fitness function called with the list of individuals to evaluate; unlike in the
            parent class, ``x`` is not passed to it
        :param population_size: default number of individuals of the population
        :return: the configured toolbox
        """
        # single-objective fitness to maximize; individuals are numpy arrays carrying that fitness
        creator.create("fitness", base.Fitness, weights=(1.0,))
        creator.create("individual", np.ndarray, fitness=creator.fitness)

        toolbox = base.Toolbox()
        toolbox.register("feature_values", self.record_init, x)
        toolbox.register("individual", tools.initIterate, creator.individual, toolbox.feature_values)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual, n=population_size)

        toolbox.register("clone", self.clone)
        toolbox.register("evaluate", evaluate)
        toolbox.register("mate", self.mate)
        toolbox.register("mutate", self.mutate, toolbox)
        toolbox.register("select", tools.selTournament, tournsize=self.tournsize)

        return toolbox

    def fit(self, toolbox, population_size):
        """
        Run the population-based evolutionary algorithm configured in ``toolbox``.

        :param toolbox: the toolbox returned by :meth:`setup_toolbox`
        :param population_size: the number of individuals of the population
        :return: a tuple ``(population, halloffame, logbook)``
        """
        halloffame_size = int(np.round(population_size * self.halloffame_ratio))

        population = toolbox.population(n=population_size)
        # numpy individuals cannot be compared with ==, hence the explicit similarity function
        halloffame = tools.HallOfFame(halloffame_size, similar=np.array_equal)

        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("min", np.min)
        stats.register("max", np.max)
        stats.register("std", np.std)

        population, logbook = self.eaSimple(population, toolbox, cxpb=self.cxpb, mutpb=self.mutpb,
                                            ngen=self.ngen, stats=stats, halloffame=halloffame,
                                            verbose=False)

        return population, halloffame, logbook

    @staticmethod
    def eaSimple(population, toolbox, cxpb, mutpb, ngen, stats=None,
                 halloffame=None, verbose=__debug__):
        """This algorithm reproduce the simplest evolutionary algorithm as
        presented in chapter 7 of [Back2000]_.

        :param population: A list of individuals.
        :param toolbox: A :class:`~deap.base.Toolbox` that contains the evolution
                        operators.
        :param cxpb: The probability of mating two individuals.
        :param mutpb: The probability of mutating an individual.
        :param ngen: The number of generation.
        :param stats: A :class:`~deap.tools.Statistics` object that is updated
                      inplace, optional.
        :param halloffame: A :class:`~deap.tools.HallOfFame` object that will
                           contain the best individuals, optional.
        :param verbose: Whether or not to log the statistics.
        :returns: The final population
        :returns: A class:`~deap.tools.Logbook` with the statistics of the
                  evolution

        This implementation is an adaptation of the original algorithm implemented in the DEAP library.
        The difference is that ``toolbox.evaluate`` is called once per generation with the list of all
        the individuals to evaluate and must return one fitness value for each of them.

        .. [Back2000] Back, Fogel and Michalewicz, "Evolutionary Computation 1 :
           Basic Algorithms and Operators", 2000.
        """
        logbook = tools.Logbook()
        logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

        def evaluate_invalid(individuals):
            """Assign a fitness to the individuals lacking one and return how many they were."""
            invalid_ind = [ind for ind in individuals if not ind.fitness.valid]
            # Skip the batch evaluation when nothing changed in this generation: the
            # population fitness functions cannot handle an empty list of individuals.
            if invalid_ind:
                fitnesses = toolbox.evaluate(invalid_ind)
                for ind, fit in zip(invalid_ind, fitnesses):
                    ind.fitness.values = (fit,)
            return len(invalid_ind)

        # Evaluate the individuals with an invalid fitness
        nevals = evaluate_invalid(population)

        if halloffame is not None:
            halloffame.update(population)

        record = stats.compile(population) if stats else {}
        logbook.record(gen=0, nevals=nevals, **record)
        if verbose:
            print(logbook.stream)

        # Begin the generational process
        for gen in range(1, ngen + 1):
            # Select the next generation individuals
            offspring = toolbox.select(population, len(population))

            # Vary the pool of individuals
            offspring = varAnd(offspring, toolbox, cxpb, mutpb)

            # Evaluate the individuals with an invalid fitness
            nevals = evaluate_invalid(offspring)

            # Update the hall of fame with the generated individuals
            if halloffame is not None:
                halloffame.update(offspring)

            # Replace the current population by the offspring
            population[:] = offspring

            # Append the current generation statistics to the logbook
            record = stats.compile(population) if stats else {}
            logbook.record(gen=gen, nevals=nevals, **record)
            if verbose:
                print(logbook.stream)

        return population, logbook

    def _population_fitness(self, z, same_class):
        """Build the batch fitness function shared by the two public fitness factories."""
        def wrapper(population):
            if isinstance(self.metric, numbers.Number):
                # a numeric metric is not usable by cdist: fall back to the default distance
                self.metric = neuclidean
            feature_similarity_score = 1.0 - cdist(z.reshape(1, -1), population, metric=self.metric).ravel()
            # NOTE(review): unlike LegacyGeneticGenerator.fitness_equal, exact copies of z
            # (score 1.0) are not penalized here - confirm this is intended.
            feature_similarity = sigmoid(feature_similarity_score)

            x = self.encoder.decode(z.reshape(1, -1))
            pop = self.encoder.decode(np.array(population))
            pop_y = self.bbox.predict(pop)
            y = self.bbox.predict(x)

            if same_class:
                # higher when the prediction matches the one of z
                target_similarity = np.array([sigmoid(1.0 - hamming(y, [y1])) for y1 in pop_y])
            else:
                # higher when the prediction differs from the one of z
                target_similarity = np.array([sigmoid(hamming(y, [y1])) for y1 in pop_y])

            evaluation = self.alpha1 * feature_similarity + self.alpha2 * target_similarity
            return evaluation

        return wrapper

    def population_fitness_equal(self, z):
        """
        This fitness function evaluates the feature_similarity and the target_similarity of a population against a
        given instance z. The two similarities are computed using optimized functions of `numpy` and `scipy`
        libraries. This improves the performance of the algorithm.

        :param z: the instance to explain
        :return: a function that takes a list of individuals and returns an array with one fitness value per
            individual; the target term rewards individuals predicted with the same class as ``z``
        """
        return self._population_fitness(z, same_class=True)

    def population_fitness_notequal(self, z):
        """
        Counterpart of :meth:`population_fitness_equal` for the search of instances of a different class.

        :param z: the instance to explain
        :return: a function that takes a list of individuals and returns an array with one fitness value per
            individual; the target term rewards individuals predicted with a class different from ``z``
        """
        return self._population_fitness(z, same_class=False)