Camera traps are a widely used tool in wildlife conservation. They are used to monitor animal
populations and gather data on animal behaviour. However, the large amount of data generated by
camera traps makes it difficult to process and analyse the data.
In this second part, we will develop a machine learning model that can automatically classify animal
species captured by camera traps in the Dedan Kimathi University Wildlife Conservancy.
By achieving this, we hope to gain valuable insights into animal behaviour and contribute to
wildlife conservation efforts.
# The Modules required for this tutorial
from modules_used import *
# Extract the dataset to the current working directory
with zipfile.ZipFile('./dataset.zip', 'r') as zip:
zip.extractall(f'{os.getcwd()}')
print('Dataset extracted successfully!')
# load the train and test metadata
data_dir = os.path.join(os.getcwd(), 'dataset')
train_metadata_path = os.path.join(data_dir, 'train.csv')
test_metadata_path = os.path.join(data_dir, 'test.csv')
# read the data
train_df = pd.read_csv(train_metadata_path)
test_df = pd.read_csv(test_metadata_path)
# classes to use for training
classes = ['IMPALA', 'WARTHOG', 'ZEBRA']
# filter the train and test sets to only include the desired classes
train_df = train_df[train_df['Species'].isin(classes)]
test_df = test_df[test_df['Species'].isin(classes)]
# split the train set to train and validation sets
train_df, val_df = train_test_split(train_df, test_size=0.12, random_state=42)
# the length of all the sets
sets_len = len(train_df) + len(val_df) + len(test_df)
# print the length of each set
print(f'Train set length: {len(train_df)}, {len(train_df)/sets_len*100:.2f}%')
print(f'Validation set length: {len(val_df)}, {len(val_df)/sets_len*100:.2f}%')
print(f'Test set length: {len(test_df)}, {len(test_df)/sets_len*100:.2f}%')
# build the image datasets
train_dir = os.path.join(data_dir, 'train') # location of the training data
val_dir = os.path.join(data_dir, 'train') # location of the validation data
test_dir = os.path.join(data_dir, 'test') # location of the test data
# create the training, validation and test directories
helper.split_data(train_dir, train_df, 'train')
helper.split_data(val_dir, val_df, 'val')
helper.split_data(test_dir, test_df, 'test')
Train set length: 1368, 79.08% Validation set length: 187, 10.81% Test set length: 175, 10.12%
train_images_path = 'train'
val_images_path = 'val'
test_images_path = 'test'
# Visualize the data distribution
helper.data_distribution(train_images_path, val_images_path, test_images_path, plot=True)
Deep Learning is based on artificial neural networks which have been around in some form since the late 1950s. The networks are built from individual parts approximating neurons, typically called units or simply "neurons." Each unit has some number of weighted inputs. These weighted inputs are summed together (a linear combination) then passed through an activation function to get the unit's output.
Mathematically this looks like:
$$ \begin{align} y &= f(w_1 x_1 + w_2 x_2 + b) \\ y &= f\left(\sum_i w_i x_i +b \right) \end{align} $$With vectors this is the dot/inner product of two vectors:
$$ h = \begin{bmatrix} x_1 \, x_2 \cdots x_n \end{bmatrix} \cdot \begin{bmatrix} w_1 \\ w_2 \\ \vdots \\ w_n \end{bmatrix} $$BATCH_SIZE = 3
IMG_SIZE = (96, 96)
# Load the images to tensors
train_tensors = helper.load_images_from_dir(train_images_path, BATCH_SIZE, IMG_SIZE, shuffle=True)
val_tensors = helper.load_images_from_dir(val_images_path, BATCH_SIZE, IMG_SIZE, shuffle=False)
test_tensors = helper.load_images_from_dir(test_images_path, BATCH_SIZE, IMG_SIZE, shuffle=False)
# extract the images and labels
train_images, train_labels = helper.extract_images_and_labels(train_tensors, normalize=True, categorical=True)
val_images, val_labels = helper.extract_images_and_labels(val_tensors, normalize=True, categorical=True)
test_images, test_labels = helper.extract_images_and_labels(test_tensors, normalize=True, categorical=True)
# Define the labels dictionary
labels = {
0: 'IMPALA',
1: 'WARTHOG',
2: 'ZEBRA'
}
# Visualize the images
helper.visualize_images(train_tensors, BATCH_SIZE, class_names=list(labels.values()))
Found 1368 files belonging to 3 classes. Found 187 files belonging to 3 classes. Found 175 files belonging to 3 classes.
data_augmentation = tf.keras.Sequential([
tf.keras.layers.experimental.preprocessing.RandomFlip('horizontal'),
tf.keras.layers.experimental.preprocessing.RandomRotation(0.2),
tf.keras.layers.experimental.preprocessing.RandomZoom(0.2),
tf.keras.layers.experimental.preprocessing.RandomContrast(0.2),
tf.keras.layers.experimental.preprocessing.RandomTranslation(0.2, 0.2)
])
# augment the train tensors from directory
# Apply data augmentation to train tensors
augmented_train_tensors = train_tensors.map(lambda x, y: (data_augmentation(x, training=True), y))
# extract the augmented train images and labels
augmented_train_images, augmented_train_labels = helper.extract_images_and_labels(augmented_train_tensors, normalize=True, categorical=True)
# visualize the augmented images
helper.visualize_images(augmented_train_tensors, BATCH_SIZE, class_names=list(labels.values()))
# Call the function with your train_images, train_labels, and labels
helper.visualize_dimensionality_reduction(train_images, train_labels, labels)
# Model Architecture leveraging pretrained weights
LEARNING_RATE = 0.005
NUM_CLASSES = 3
DROPOUT = 0.1
WEIGHT_DECAY = 0.00000
ACTIVATION = 'softmax'
OPTIMIZER = 'adam'
MOMENTUM = 0.9
model = helper.build_model(num_classes=NUM_CLASSES, dropout=DROPOUT,
weight_decay=WEIGHT_DECAY, activation=ACTIVATION,
optimizer=OPTIMIZER, momentum=MOMENTUM,
learning_rate=LEARNING_RATE)
EPOCHS = 100
CHECKPOINT_PATH = 'model.h5'
MONITOR = 'val_loss'
MODE = 'min'
PATIENCE = 5 # number of epochs with no improvement after which learning rate will be reduced.
FACTOR = 0.5 # factor by which the learning rate will be reduced. new_lr = lr * factor
# defining the callbacks
callbacks = helper.define_callbacks(checkpoint_path=CHECKPOINT_PATH,
monitor=MONITOR, mode=MODE,
patience=PATIENCE, factor=FACTOR)
# Train the model
history = model.fit(
augmented_train_images,
augmented_train_labels,
epochs=EPOCHS,
validation_data=(val_images, val_labels),
verbose=2,
callbacks=callbacks
)
15/15 - 1s - loss: 0.3095 - accuracy: 0.8772 - val_loss: 0.2974 - val_accuracy: 0.9206 - lr: 1.5625e-04 - 1s/epoch - 71ms/step
Accuracy is the most intuitive performance measure and it is simply a ratio of correctly predicted
observation to the total observations.
Precision is a measure of how many of the positive predictions made are correct (true
positives).
Recall is a measure of how many of the positive cases the classifier correctly predicted, over all
the positive cases in the data.
For more information on the classification report, check out this link.
# evaluate the best saved model
model = tf.keras.models.load_model('model.h5')
print(model.evaluate(test_images, test_labels))
helper.plot_classification_report(model, test_tensors, labels)
2/2 [==============================] - 1s 33ms/step - loss: 0.8017 - accuracy: 0.8644 [0.8016550540924072, 0.8644067645072937] 2/2 [==============================] - 1s 33ms/step
helper.inference_model(model, mapping=labels)
Class: ZEBRA 1/1 [==============================] - 0s 34ms/step
def representative_dataset_generator():
for data, _ in train_tensors:
yield [data] # shape: (batch_size, height, width, channels)
# load the best model
model = tf.keras.models.load_model('model.h5')
# quantizing the model to int8
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.representative_dataset = representative_dataset_generator
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.target_spec.supported_types = [tf.dtypes.int8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
hub_m = converter.convert()
# save the keras model
with open('model.tflite', 'wb') as f:
f.write(hub_m)
# evaluate the tflite model
print('Evaluating tflite model...')
helper.evaluate_tflite('model.tflite', test_tensors, dtype='int8')
INFO:tensorflow:Assets written to: C:\Users\Austin\AppData\Local\Temp\tmpg415r5r8\assets
INFO:tensorflow:Assets written to: C:\Users\Austin\AppData\Local\Temp\tmpg415r5r8\assets
import edgeimpulse as ei
import json
# add the api key to edge impulse
with open('config.json') as f:
config = json.load(f)
api_key = config['edge_impulse_api_key']
ei.API_KEY = api_key
# the supported devices
profile_devices = ei.model.list_profile_devices()
print(profile_devices)
# target device is open mv cam h7 plus
model_profile = ei.model.profile(model='model.tflite')
# Profiling the model to check the compatibility with the MCU
model_profile.summary()
# Write the class names to text file
with open('labels.txt', 'w') as f:
for item in list(train_tensors.class_names):
f.write("%s\n" % item)
# create a folder called openmv-model if it doesn't exist
if not os.path.exists('openmv-model'):
os.mkdir('openmv-model')
# move the tflite file and labels.txt file into the openmv-model folder
os.rename('mobileweights_6classes4.tflite', 'openmv-model/trained449.tflite')
'''Uncomment and run this once if the file does not exist'''
# os.rename('labels.txt', 'openmv-model/labels.txt')
# evaluate the performance of the tflite model
helper.evaluate_tflite('openmv-model/trained449.tflite', test_tensors, dtype='int8')
0.5423728813559322