"""
Author: Philip Andreadis
e-mail: philip_andreadis@hotmail.com
Implementation of Decision Tree model from scratch.
Metric used to apply the split on the data is the Gini index which is calculated for each feature's single value
in order to find the best split on each step. This means there is room for improvement performance wise as this
process is O(n^2) and can be reduced to linear complexity.
Parameters of the model:
max_depth (int): Maximum depth of the decision tree
min_node_size (int): Minimum number of instances a node can have. If this threshold is exceeded the node is terminated
Both are up to the user to set.
Input dataset to train() function must be a numpy array containing both feature and label values.
"""
from collections import Counter

import numpy as np


class DecisionTree:
    def __init__(self, max_depth, min_node_size):
        self.max_depth = max_depth
        self.min_node_size = min_node_size
        self.final_tree = {}
"""
This function calculates the gini index of a split in the dataset
Firstly the gini score is calculated for each child note and the resulting Gini is the weighted sum of gini_left and gini_right
Parameters:
child_nodes (list of np arrays): The two groups of instances resulting from the split
Returns:
float:Gini index of the split
"""
def calculate_gini(self, child_nodes):
n = 0
# Calculate number of all instances of the parent node
for node in child_nodes:
n = n + len(node)
gini = 0
# Calculate gini index for each child node
for node in child_nodes:
m = len(node)
# Avoid division by zero if a child node is empty
if m == 0:
continue
# Create a list with each instance's class value
y = []
for row in node:
y.append(row[-1])
# Count the frequency for each class value
freq = Counter(y).values()
node_gini = 1
for i in freq:
node_gini = node_gini - (i / m) ** 2
gini = gini + (m / n) * node_gini
return gini
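
    # Illustrative note (not part of the original code): for a split whose left
    # child holds classes [0, 0, 1] and right child holds [1, 1], the left Gini is
    # 1 - (2/3)^2 - (1/3)^2 = 4/9, the right Gini is 1 - (2/2)^2 = 0, and the
    # weighted index returned above is (3/5) * 4/9 + (2/5) * 0 = 0.267 (approx.).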
"""
This function splits the dataset on certain value of a feature
Parameters:
feature_index (int): Index of selected feature
threshold : Value of the feature split point
Returns:
np.array: Two new groups of split instances
"""
def apply_split(self, feature_index, threshold, data):
instances = data.tolist()
left_child = []
right_child = []
for row in instances:
if row[feature_index] < threshold:
left_child.append(row)
else:
right_child.append(row)
left_child = np.array(left_child)
right_child = np.array(right_child)
return left_child, right_child
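
    # Illustrative note (not part of the original code): splitting the rows
    # [[1, 0], [3, 1]] on feature 0 with threshold 2 yields left = [[1, 0]] and
    # right = [[3, 1]], since only the value 1 satisfies 1 < 2.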
"""
This function finds the best split on the dataset on each iteration of the algorithm by evaluating
all possible splits and applying the one with the minimum Gini index.
Parameters:
data: Dataset
Returns node (dict): Dictionary with the index of the splitting feature and its value and the two child nodes
"""
def find_best_split(self, data):
num_of_features = len(data[0]) - 1
gini_score = 1000
f_index = 0
f_value = 0
# Iterate through each feature and find minimum gini score
for column in range(num_of_features):
for row in data:
value = row[column]
l, r = self.apply_split(column, value, data)
children = [l, r]
score = self.calculate_gini(children)
# print("Candidate split feature X{} < {} with Gini score {}".format(column,value,score))
if score < gini_score:
gini_score = score
f_index = column
f_value = value
child_nodes = children
# print("Chosen feature is {} and its value is {} with gini index {}".format(f_index,f_value,gini_score))
node = {"feature": f_index, "value": f_value, "children": child_nodes}
return node
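
    # Performance note (commentary, not part of the original implementation): as the
    # module docstring points out, re-partitioning the data for every row value of
    # every feature makes this step roughly O(n^2) per feature. A common alternative
    # is to sort each feature's values once and sweep running class counts across the
    # sorted order, so each candidate threshold is evaluated in constant time.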
"""
This function calculates the most frequent class value in a group of instances
Parameters:
node: Group of instances
Returns : Most common class value
"""
def calc_class(self, node):
# Create a list with each instance's class value
y = []
for row in node:
y.append(row[-1])
# Find most common class value
occurence_count = Counter(y)
return occurence_count.most_common(1)[0][0]
"""
Recursive function that builds the decision tree by applying split on every child node until they become terminal.
Cases to terminate a node is: i.max depth of tree is reached ii.minimum size of node is not met iii.child node is empty
Parameters:
node: Group of instances
depth (int): Current depth of the tree
"""
def recursive_split(self, node, depth):
l, r = node["children"]
del node["children"]
if l.size == 0:
c_value = self.calc_class(r)
node["left"] = node["right"] = {"class_value": c_value, "depth": depth}
return
elif r.size == 0:
c_value = self.calc_class(l)
node["left"] = node["right"] = {"class_value": c_value, "depth": depth}
return
# Check if tree has reached max depth
if depth >= self.max_depth:
# Terminate left child node
c_value = self.calc_class(l)
node["left"] = {"class_value": c_value, "depth": depth}
# Terminate right child node
c_value = self.calc_class(r)
node["right"] = {"class_value": c_value, "depth": depth}
return
# process left child
if len(l) <= self.min_node_size:
c_value = self.calc_class(l)
node["left"] = {"class_value": c_value, "depth": depth}
else:
node["left"] = self.find_best_split(l)
self.recursive_split(node["left"], depth + 1)
# process right child
if len(r) <= self.min_node_size:
c_value = self.calc_class(r)
node["right"] = {"class_value": c_value, "depth": depth}
else:
node["right"] = self.find_best_split(r)
self.recursive_split(node["right"], depth + 1)
"""
Apply the recursive split algorithm on the data in order to build the decision tree
Parameters:
X (np.array): Training data
Returns tree (dict): The decision tree in the form of a dictionary.
"""
def train(self, X):
# Create initial node
tree = self.find_best_split(X)
# Generate the rest of the tree via recursion
self.recursive_split(tree, 1)
self.final_tree = tree
return tree
"""
Prints out the decision tree.
Parameters:
tree (dict): Decision tree
"""
def print_dt(self, tree, depth=0):
if "feature" in tree:
print(
"\nSPLIT NODE: feature #{} < {} depth:{}\n".format(
tree["feature"], tree["value"], depth
)
)
self.print_dt(tree["left"], depth + 1)
self.print_dt(tree["right"], depth + 1)
else:
print(
"TERMINAL NODE: class value:{} depth:{}".format(
tree["class_value"], tree["depth"]
)
)
"""
This function outputs the class value of the instance given based on the decision tree created previously.
Parameters:
tree (dict): Decision tree
instance(id np.array): Single instance of data
Returns (float): predicted class value of the given instance
"""
def predict_single(self, tree, instance):
if not tree:
print("ERROR: Please train the decision tree first")
return -1
if "feature" in tree:
if instance[tree["feature"]] < tree["value"]:
return self.predict_single(tree["left"], instance)
else:
return self.predict_single(tree["right"], instance)
else:
return tree["class_value"]
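
    # Illustrative note (assumed example, matching the dictionaries built above):
    # given the tree
    #     {"feature": 0, "value": 2,
    #      "left": {"class_value": 0.0, "depth": 1},
    #      "right": {"class_value": 1.0, "depth": 1}}
    # an instance whose feature 0 equals 3 fails the test 3 < 2, is routed to the
    # right child, and predict_single returns 1.0.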
"""
This function outputs the class value for each instance of the given dataset.
Parameters:
X (np.array): Dataset with labels
Returns y (np.array): array with the predicted class values of the dataset
"""
def predict(self, X):
y_predict = []
for row in X:
y_predict.append(self.predict_single(self.final_tree, row))
return np.array(y_predict)


if __name__ == "__main__":
    # # test dataset
    # X = np.array([[1, 1, 0], [3, 1, 0], [1, 4, 0], [2, 4, 1], [3, 3, 1], [5, 1, 1]])
    # y = np.array([0, 0, 0, 1, 1, 1])
    train_data = np.loadtxt("example_data/data.txt", delimiter=",")
    train_y = np.loadtxt("example_data/targets.txt")
    # Build the tree
    dt = DecisionTree(5, 1)
    tree = dt.train(train_data)
    y_pred = dt.predict(train_data)
    print(f"Accuracy: {sum(y_pred == train_y) / train_y.shape[0]}")
    # Print out the decision tree
    # dt.print_dt(tree)
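
    # Illustrative follow-up (assumes the example_data files above exist): the
    # trained tree can also be queried for a single instance directly.
    print(f"Prediction for the first instance: {dt.predict_single(tree, train_data[0])}")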