"""
|
||
|
|
Author: Philip Andreadis
|
||
|
|
e-mail: philip_andreadis@hotmail.com
|
||
|
|
|
||
|
|
|
||
|
|
Implementation of Decision Tree model from scratch.
|
||
|
|
Metric used to apply the split on the data is the Gini index which is calculated for each feature's single value
|
||
|
|
in order to find the best split on each step. This means there is room for improvement performance wise as this
|
||
|
|
process is O(n^2) and can be reduced to linear complexity.
|
||
|
|
|
||
|
|
Parameters of the model:
|
||
|
|
max_depth (int): Maximum depth of the decision tree
|
||
|
|
min_node_size (int): Minimum number of instances a node can have. If this threshold is exceeded the node is terminated
|
||
|
|
|
||
|
|
Both are up to the user to set.
|
||
|
|
|
||
|
|
Input dataset to train() function must be a numpy array containing both feature and label values.
|
||
|
|
|
||
|
|
"""
|
||
|
|
|
||
|
|
|
||
|
|
from collections import Counter

import numpy as np


class DecisionTree:
    def __init__(self, max_depth, min_node_size):
        self.max_depth = max_depth
        self.min_node_size = min_node_size
        self.final_tree = {}

"""
|
||
|
|
This function calculates the gini index of a split in the dataset
|
||
|
|
Firstly the gini score is calculated for each child note and the resulting Gini is the weighted sum of gini_left and gini_right
|
||
|
|
|
||
|
|
Parameters:
|
||
|
|
child_nodes (list of np arrays): The two groups of instances resulting from the split
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
float:Gini index of the split
|
||
|
|
|
||
|
|
"""
|
||
|
|
|
||
|
|
    def calculate_gini(self, child_nodes):
        n = 0
        # Calculate number of all instances of the parent node
        for node in child_nodes:
            n = n + len(node)
        gini = 0
        # Calculate gini index for each child node
        for node in child_nodes:
            m = len(node)

            # Avoid division by zero if a child node is empty
            if m == 0:
                continue

            # Create a list with each instance's class value
            y = []
            for row in node:
                y.append(row[-1])

            # Count the frequency for each class value
            freq = Counter(y).values()
            node_gini = 1
            for i in freq:
                node_gini = node_gini - (i / m) ** 2
            gini = gini + (m / n) * node_gini
        return gini

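    # Illustrative worked example (values assumed, not taken from the code above): for a split
    # into a left child with classes [0, 0] and a right child with classes [1, 1, 1, 0],
    #   gini_left  = 1 - (2/2)**2                 = 0.0
    #   gini_right = 1 - (3/4)**2 - (1/4)**2      = 0.375
    #   gini       = (2/6) * 0.0 + (4/6) * 0.375  = 0.25
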
"""
|
||
|
|
This function splits the dataset on certain value of a feature
|
||
|
|
Parameters:
|
||
|
|
feature_index (int): Index of selected feature
|
||
|
|
|
||
|
|
threshold : Value of the feature split point
|
||
|
|
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
np.array: Two new groups of split instances
|
||
|
|
|
||
|
|
"""
|
||
|
|
|
||
|
|
    def apply_split(self, feature_index, threshold, data):
        instances = data.tolist()
        left_child = []
        right_child = []
        for row in instances:
            if row[feature_index] < threshold:
                left_child.append(row)
            else:
                right_child.append(row)
        left_child = np.array(left_child)
        right_child = np.array(right_child)
        return left_child, right_child

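    # Illustrative example (assumed data): with data = np.array([[1, 0], [3, 0], [5, 1]]) and
    # apply_split(0, 3, data), rows whose feature 0 is < 3 go left ([[1, 0]]) and the rest go
    # right ([[3, 0], [5, 1]]).
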
"""
|
||
|
|
This function finds the best split on the dataset on each iteration of the algorithm by evaluating
|
||
|
|
all possible splits and applying the one with the minimum Gini index.
|
||
|
|
Parameters:
|
||
|
|
data: Dataset
|
||
|
|
|
||
|
|
Returns node (dict): Dictionary with the index of the splitting feature and its value and the two child nodes
|
||
|
|
|
||
|
|
"""
|
||
|
|
|
||
|
|
    def find_best_split(self, data):
        num_of_features = len(data[0]) - 1
        gini_score = 1000
        f_index = 0
        f_value = 0
        # Iterate through each feature and find minimum gini score
        for column in range(num_of_features):
            for row in data:
                value = row[column]
                l, r = self.apply_split(column, value, data)
                children = [l, r]
                score = self.calculate_gini(children)
                # print("Candidate split feature X{} < {} with Gini score {}".format(column, value, score))
                if score < gini_score:
                    gini_score = score
                    f_index = column
                    f_value = value
                    child_nodes = children
        # print("Chosen feature is {} and its value is {} with gini index {}".format(f_index, f_value, gini_score))
        node = {"feature": f_index, "value": f_value, "children": child_nodes}
        return node

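    # Clarifying note with illustrative numbers: the returned dict looks like
    #   {"feature": 2, "value": 4.5, "children": [left_rows, right_rows]}
    # Every row value of every feature is tried as a candidate threshold, and each candidate
    # requires a full pass over the data, which is the O(n^2) cost mentioned in the module
    # docstring.
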
"""
|
||
|
|
This function calculates the most frequent class value in a group of instances
|
||
|
|
Parameters:
|
||
|
|
node: Group of instances
|
||
|
|
|
||
|
|
Returns : Most common class value
|
||
|
|
|
||
|
|
"""
|
||
|
|
|
||
|
|
    def calc_class(self, node):
        # Create a list with each instance's class value
        y = []
        for row in node:
            y.append(row[-1])
        # Find most common class value
        occurrence_count = Counter(y)
        return occurrence_count.most_common(1)[0][0]

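    # Illustrative example (assumed values): for rows whose last column holds the classes
    # [1.0, 0.0, 1.0], Counter yields {1.0: 2, 0.0: 1} and calc_class returns 1.0.
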
"""
|
||
|
|
Recursive function that builds the decision tree by applying split on every child node until they become terminal.
|
||
|
|
Cases to terminate a node is: i.max depth of tree is reached ii.minimum size of node is not met iii.child node is empty
|
||
|
|
Parameters:
|
||
|
|
node: Group of instances
|
||
|
|
depth (int): Current depth of the tree
|
||
|
|
|
||
|
|
|
||
|
|
"""
|
||
|
|
|
||
|
|
    def recursive_split(self, node, depth):
        l, r = node["children"]
        del node["children"]
        if l.size == 0:
            c_value = self.calc_class(r)
            node["left"] = node["right"] = {"class_value": c_value, "depth": depth}
            return
        elif r.size == 0:
            c_value = self.calc_class(l)
            node["left"] = node["right"] = {"class_value": c_value, "depth": depth}
            return
        # Check if tree has reached max depth
        if depth >= self.max_depth:
            # Terminate left child node
            c_value = self.calc_class(l)
            node["left"] = {"class_value": c_value, "depth": depth}
            # Terminate right child node
            c_value = self.calc_class(r)
            node["right"] = {"class_value": c_value, "depth": depth}
            return
        # process left child
        if len(l) <= self.min_node_size:
            c_value = self.calc_class(l)
            node["left"] = {"class_value": c_value, "depth": depth}
        else:
            node["left"] = self.find_best_split(l)
            self.recursive_split(node["left"], depth + 1)
        # process right child
        if len(r) <= self.min_node_size:
            c_value = self.calc_class(r)
            node["right"] = {"class_value": c_value, "depth": depth}
        else:
            node["right"] = self.find_best_split(r)
            self.recursive_split(node["right"], depth + 1)

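    # Clarifying note with illustrative numbers: after training, each split node holds
    # "feature", "value", "left" and "right" keys, while each terminal node holds
    # "class_value" and "depth", e.g.
    #   {"feature": 0, "value": 2.0,
    #    "left":  {"class_value": 0.0, "depth": 1},
    #    "right": {"feature": 1, "value": 4.0,
    #              "left":  {"class_value": 1.0, "depth": 2},
    #              "right": {"class_value": 0.0, "depth": 2}}}
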
"""
|
||
|
|
Apply the recursive split algorithm on the data in order to build the decision tree
|
||
|
|
Parameters:
|
||
|
|
X (np.array): Training data
|
||
|
|
|
||
|
|
Returns tree (dict): The decision tree in the form of a dictionary.
|
||
|
|
"""
|
||
|
|
|
||
|
|
    def train(self, X):
        # Create initial node
        tree = self.find_best_split(X)
        # Generate the rest of the tree via recursion
        self.recursive_split(tree, 1)
        self.final_tree = tree
        return tree

"""
|
||
|
|
Prints out the decision tree.
|
||
|
|
Parameters:
|
||
|
|
tree (dict): Decision tree
|
||
|
|
|
||
|
|
"""
|
||
|
|
|
||
|
|
    def print_dt(self, tree, depth=0):
        if "feature" in tree:
            print(
                "\nSPLIT NODE: feature #{} < {} depth:{}\n".format(
                    tree["feature"], tree["value"], depth
                )
            )
            self.print_dt(tree["left"], depth + 1)
            self.print_dt(tree["right"], depth + 1)
        else:
            print(
                "TERMINAL NODE: class value:{} depth:{}".format(
                    tree["class_value"], tree["depth"]
                )
            )

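    # Illustrative output for an assumed two-leaf tree (not real program output):
    #
    #   SPLIT NODE: feature #0 < 2.0 depth:0
    #   TERMINAL NODE: class value:0.0 depth:1
    #   TERMINAL NODE: class value:1.0 depth:1
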
"""
|
||
|
|
This function outputs the class value of the instance given based on the decision tree created previously.
|
||
|
|
Parameters:
|
||
|
|
tree (dict): Decision tree
|
||
|
|
instance(id np.array): Single instance of data
|
||
|
|
|
||
|
|
Returns (float): predicted class value of the given instance
|
||
|
|
"""
|
||
|
|
|
||
|
|
    def predict_single(self, tree, instance):
        if not tree:
            print("ERROR: Please train the decision tree first")
            return -1
        if "feature" in tree:
            if instance[tree["feature"]] < tree["value"]:
                return self.predict_single(tree["left"], instance)
            else:
                return self.predict_single(tree["right"], instance)
        else:
            return tree["class_value"]

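    # Illustrative walkthrough (assumed tree and instance): with a root split on feature 0 at
    # value 2.0, predict_single sends the instance [1.5, 3.0] down the left branch (1.5 < 2.0)
    # and keeps recursing until it reaches a terminal node, whose "class_value" is returned.
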
"""
|
||
|
|
This function outputs the class value for each instance of the given dataset.
|
||
|
|
Parameters:
|
||
|
|
X (np.array): Dataset with labels
|
||
|
|
|
||
|
|
Returns y (np.array): array with the predicted class values of the dataset
|
||
|
|
"""
|
||
|
|
|
||
|
|
    def predict(self, X):
        y_predict = []
        for row in X:
            y_predict.append(self.predict_single(self.final_tree, row))
        return np.array(y_predict)


if __name__ == "__main__":
    # # test dataset
    # X = np.array([[1, 1, 0], [3, 1, 0], [1, 4, 0], [2, 4, 1], [3, 3, 1], [5, 1, 1]])
    # y = np.array([0, 0, 0, 1, 1, 1])

    train_data = np.loadtxt("example_data/data.txt", delimiter=",")
    train_y = np.loadtxt("example_data/targets.txt")

    # Build tree
    dt = DecisionTree(5, 1)
    tree = dt.train(train_data)
    y_pred = dt.predict(train_data)
    print(f"Accuracy: {sum(y_pred == train_y) / train_y.shape[0]}")
    # Print out the decision tree
    # dt.print_dt(tree)
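    # A minimal sketch (commented out; assumes sklearn is available and that train_data keeps
    # its labels in the last column) of evaluating on a held-out split instead of the training set:
    # from sklearn.model_selection import train_test_split
    # train_part, test_part = train_test_split(train_data, test_size=0.2, random_state=0)
    # dt_holdout = DecisionTree(5, 1)
    # dt_holdout.train(train_part)
    # test_pred = dt_holdout.predict(test_part)
    # test_acc = sum(test_pred == test_part[:, -1]) / test_part.shape[0]
    # print(f"Held-out accuracy: {test_acc}")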