How to build your own model
Note: This requires version >= 4.1.X
Overview
You might want to write your own model:
- If you find the models that ship with donkey not sufficient, and you want to experiment with your own model infrastructure
- If you want to add more input data to the model because your car has more sensors
Constructor
Models are located in donkeycar/parts/keras.py. Your own model needs to inherit from KerasPilot and initialize your model:

class KerasSensors(KerasPilot):
    def __init__(self, input_shape=(120, 160, 3), num_sensors=2):
        super().__init__()
        self.num_sensors = num_sensors
        self.model = self.create_model(input_shape)

Here, you implement the keras model in the member function create_model(). The model needs to have labelled input and output tensors. These are required for the training to work.
Training interface
For your model to work, it needs to implement the following functions:

def compile(self):
    self.model.compile(optimizer=self.optimizer, metrics=['accuracy'],
                       loss={'angle_out': 'categorical_crossentropy',
                             'throttle_out': 'categorical_crossentropy'},
                       loss_weights={'angle_out': 0.5, 'throttle_out': 0.5})

The compile function tells keras how to define the loss function for training. We are using the KerasCategorical model as an example. The loss function here makes explicit use of the output tensors of the model (angle_out, throttle_out).
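To make the role of loss_weights concrete, here is a minimal pure-Python sketch of how a weighted total loss is formed from the per-output losses. The helper names categorical_crossentropy and total_loss are our own for illustration, the targets use only 3 bins to keep it short, and the epsilon clipping that keras applies is left out.

```python
import math


def categorical_crossentropy(y_true, y_pred):
    """Cross-entropy between a one-hot target and predicted probabilities."""
    return -sum(t * math.log(p) for t, p in zip(y_true, y_pred) if t > 0)


def total_loss(losses, loss_weights):
    """Weighted sum of the per-output losses, keyed by output layer name."""
    return sum(loss_weights[name] * value for name, value in losses.items())


# Made-up targets and predictions with 3 bins per output
angle_true, angle_pred = [0.0, 1.0, 0.0], [0.2, 0.7, 0.1]
throttle_true, throttle_pred = [1.0, 0.0, 0.0], [0.5, 0.3, 0.2]

losses = {
    'angle_out': categorical_crossentropy(angle_true, angle_pred),
    'throttle_out': categorical_crossentropy(throttle_true, throttle_pred),
}
loss = total_loss(losses, {'angle_out': 0.5, 'throttle_out': 0.5})
```

With equal weights of 0.5 both outputs contribute equally; raising one weight makes the training favour that output.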
def x_transform(self, record: TubRecord):
    img_arr = record.image(cached=True)
    return img_arr

In this function you define how to extract the input data from your recorded data. This data is usually called X in machine learning frameworks. We have shown the implementation in the base class, which works for all models that have only the image as input.
The function returns a single data item if the model has only one input. You need to return a tuple if your model uses more input data.
Note: If your model has more inputs, the image must be the first element of the tuple.
def y_transform(self, record: TubRecord):
    angle: float = record.underlying['user/angle']
    throttle: float = record.underlying['user/throttle']
    return angle, throttle

In this function you specify how to extract the y values (i.e. target values) from your recorded data.
def x_translate(self, x: XY) -> Dict[str, Union[float, np.ndarray]]:
    return {'img_in': x}

Here we require a translation of how the X value that you extracted above will be fed into tf.data. Note that tf.data expects a dictionary if the model has more than one input variable, so we have chosen to use dictionaries in the one-argument case as well, for consistency. Above we have shown the implementation in the base class, which works for all models that have only the image as input. You don't have to override either x_transform or x_translate if your model only uses the image as input data.
Note: the keys of the dictionary must match the names of the input layers in the model.
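A mismatch between dictionary keys and layer names is easy to introduce, so a quick check can help while developing. The following is only a sketch: check_keys is a hypothetical helper that is not part of donkeycar, and the layer names and data are stand-in values for a model with a second input.

```python
def check_keys(translated, layer_names):
    """Raise ValueError if the dict keys differ from the layer names."""
    missing = set(layer_names) - set(translated)
    extra = set(translated) - set(layer_names)
    if missing or extra:
        raise ValueError(f'missing keys: {sorted(missing)}, '
                         f'unexpected keys: {sorted(extra)}')


# Stand-in values; in a real model the names come from the Input layers
input_layer_names = ['img_in', 'sensor_in']
x = ('image placeholder', [0.3, 0.8])

# Keys match the layer names, so this passes silently
check_keys({'img_in': x[0], 'sensor_in': x[1]}, input_layer_names)

# A misspelled key is reported
try:
    check_keys({'img_in': x[0], 'sensors': x[1]}, input_layer_names)
    mismatch = None
except ValueError as err:
    mismatch = str(err)
```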
def y_translate(self, y: XY) -> Dict[str, Union[float, np.ndarray]]:
    if isinstance(y, tuple):
        angle, throttle = y
        return {'angle_out': angle, 'throttle_out': throttle}
    else:
        raise TypeError('Expected tuple')

Similar to the above, this provides the translation of the y data into the dictionary required for tf.data. This example shows the implementation of KerasLinear.
Note: the keys of the dictionary must match the names of the output layers in the model. The keys angle_out and throttle_out used here are the output names from the compile() example above; a model with differently named outputs, such as the example model below with n_outputs0 and n_outputs1, must use its own names.
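To see how the four data functions fit together, here is a small stand-alone sketch. FakeRecord, FakePilot and to_training_pair are hypothetical stand-ins and not donkeycar classes, and the order of calls (transform first, then translate) is our assumption about how the training pipeline uses them.

```python
class FakeRecord:
    """Hypothetical stand-in for TubRecord: a dict plus a fake image."""
    def __init__(self, underlying, image):
        self.underlying = underlying
        self._image = image

    def image(self, cached=True):
        return self._image


class FakePilot:
    """Only the four data functions, with the bodies shown above."""
    def x_transform(self, record):
        return record.image(cached=True)

    def y_transform(self, record):
        return (record.underlying['user/angle'],
                record.underlying['user/throttle'])

    def x_translate(self, x):
        return {'img_in': x}

    def y_translate(self, y):
        if isinstance(y, tuple):
            angle, throttle = y
            return {'angle_out': angle, 'throttle_out': throttle}
        raise TypeError('Expected tuple')


def to_training_pair(pilot, record):
    """Assumed order of calls: transform the record, then translate."""
    x = pilot.x_translate(pilot.x_transform(record))
    y = pilot.y_translate(pilot.y_transform(record))
    return x, y


record = FakeRecord({'user/angle': 0.25, 'user/throttle': 0.5},
                    image=[[0, 0, 0]])
x_dict, y_dict = to_training_pair(FakePilot(), record)
```

Each record ends up as a pair of dictionaries, one keyed by input layer names and one keyed by output layer names.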
def output_shapes(self):
    # need to cut off None from [None, 120, 160, 3] tensor shape
    img_shape = self.get_input_shape()[1:]
    shapes = ({'img_in': tf.TensorShape(img_shape)},
              {'angle_out': tf.TensorShape([15]),
               'throttle_out': tf.TensorShape([20])})
    return shapes

This function returns a tuple of two dictionaries that tells tensorflow which shapes are used in the model. We have shown the example of the KerasCategorical model here.
Note 1: As above, the keys of the two dictionaries must match the names of the input and output layers in the model.
Note 2: Where the model returns scalar numbers, the corresponding shape has to be tf.TensorShape([]).
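The difference between a scalar shape and a vector shape can be illustrated without tensorflow. In this sketch, shape_of is a hypothetical helper for numbers and regular nested lists: a plain number has the empty shape (), which is what tf.TensorShape([]) describes, while a one-hot vector with 15 bins has shape (15,), corresponding to tf.TensorShape([15]).

```python
def shape_of(value):
    """Return the shape of a number or a regular nested list as a tuple."""
    if isinstance(value, (int, float)):
        return ()
    return (len(value),) + shape_of(value[0])


angle = 0.25                        # scalar target, as in a linear model
angle_one_hot = [0.0] * 15          # categorical target with 15 bins
angle_one_hot[9] = 1.0
image = [[[0, 0, 0]] * 160] * 120   # 120 x 160 pixels with 3 channels

shapes = {
    'scalar': shape_of(angle),
    'one_hot': shape_of(angle_one_hot),
    'image': shape_of(image),
}
```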
Parts interface
In the car application the model is called through the run() function. That function is already provided in the base class, where the normalisation of the input image happens centrally. The derived classes instead have to implement inference(), which works on the normalised data. If you have additional data that needs to be normalised too, you might want to override run() as well.
def inference(self, img_arr, other_arr):
    img_arr = img_arr.reshape((1,) + img_arr.shape)
    outputs = self.model.predict(img_arr)
    steering = outputs[0]
    throttle = outputs[1]
    return steering[0][0], throttle[0][0]

Here we are showing the implementation of the linear model. Please note that the input tensor shape always contains the batch dimension in the first place, hence the shape of the input image is adjusted from (120, 160, 3) -> (1, 120, 160, 3).
Note: If you are passing another array in the other_arr variable, you will have to do a similar re-shaping.
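The re-shaping can be tried out with numpy alone. This sketch uses a zero-filled image and two made-up sensor values as placeholders; it also shows that np.expand_dims gives the same result as the reshape call used above.

```python
import numpy as np

# Placeholder data: one normalised image and two made-up sensor values
img_arr = np.zeros((120, 160, 3), dtype=np.float32)
other_arr = np.array([0.3, 0.8], dtype=np.float32)

# Prepend the batch dimension, as done in inference()
img_batch = img_arr.reshape((1,) + img_arr.shape)
other_batch = other_arr.reshape((1,) + other_arr.shape)

# np.expand_dims with axis=0 is an equivalent way to add the batch dimension
same = np.array_equal(img_batch, np.expand_dims(img_arr, axis=0))
```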
Example
Let's build a new donkey model which is based on the standard linear model but has the following changes w.r.t. input data and network design:
- The model takes an additional vector of input data that represents a set of values from distance sensors which are attached to the front of the car.
- The model adds a couple more feed-forward layers to combine the CNN layers of the vision system with the distance sensor data.
Building the model using keras
So here is the example model:
class KerasSensors(KerasPilot):
    def __init__(self, input_shape=(120, 160, 3), num_sensors=2):
        super().__init__()
        self.num_sensors = num_sensors
        self.model = self.create_model(input_shape)

    def create_model(self, input_shape):
        drop = 0.2
        img_in = Input(shape=input_shape, name='img_in')
        x = core_cnn_layers(img_in, drop)
        x = Dense(100, activation='relu', name='dense_1')(x)
        x = Dropout(drop)(x)
        x = Dense(50, activation='relu', name='dense_2')(x)
        x = Dropout(drop)(x)
        # up to here, this is the standard linear model, now we add the
        # sensor data to it
        sensor_in = Input(shape=(self.num_sensors, ), name='sensor_in')
        y = sensor_in
        z = concatenate([x, y])
        # here we add two more dense layers
        z = Dense(50, activation='relu', name='dense_3')(z)
        z = Dropout(drop)(z)
        z = Dense(50, activation='relu', name='dense_4')(z)
        z = Dropout(drop)(z)
        # two outputs for angle and throttle
        outputs = [
            Dense(1, activation='linear', name='n_outputs' + str(i))(z)
            for i in range(2)]
        # the model needs to specify the additional input here
        model = Model(inputs=[img_in, sensor_in], outputs=outputs)
        return model

    def compile(self):
        self.model.compile(optimizer=self.optimizer, loss='mse')

    def inference(self, img_arr, other_arr):
        img_arr = img_arr.reshape((1,) + img_arr.shape)
        sens_arr = other_arr.reshape((1,) + other_arr.shape)
        outputs = self.model.predict([img_arr, sens_arr])
        steering = outputs[0]
        throttle = outputs[1]
        return steering[0][0], throttle[0][0]

    def x_transform(self, record: TubRecord) -> XY:
        img_arr = super().x_transform(record)
        # for simplicity we assume the sensor data here is normalised
        sensor_arr = np.array(record.underlying['sensor'])
        # we need to return the image data first
        return img_arr, sensor_arr

    def x_translate(self, x: XY) -> Dict[str, Union[float, np.ndarray]]:
        assert isinstance(x, tuple), 'Requires tuple as input'
        # the keys are the names of the input layers of the model
        return {'img_in': x[0], 'sensor_in': x[1]}

    def y_transform(self, record: TubRecord):
        angle: float = record.underlying['user/angle']
        throttle: float = record.underlying['user/throttle']
        return angle, throttle

    def y_translate(self, y: XY) -> Dict[str, Union[float, np.ndarray]]:
        if isinstance(y, tuple):
            angle, throttle = y
            # the keys are the names of the output layers of the model
            return {'n_outputs0': angle, 'n_outputs1': throttle}
        else:
            raise TypeError('Expected tuple')

    def output_shapes(self):
        # need to cut off None from [None, 120, 160, 3] tensor shape
        img_shape = self.get_input_shape()[1:]
        # the keys need to match the models input/output layers
        shapes = ({'img_in': tf.TensorShape(img_shape),
                   'sensor_in': tf.TensorShape([self.num_sensors])},
                  {'n_outputs0': tf.TensorShape([]),
                   'n_outputs1': tf.TensorShape([])})
        return shapes
We could have inherited from KerasLinear, which already provides the implementation of y_transform(), y_translate(), compile(). However, to make it explicit for the general case we have implemented all functions here. The model requires the sensor data to be an array in the TubRecord with key "sensor".
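Before training it can be useful to verify that records really carry the sensor entry in the expected form. The following is a minimal sketch on plain dictionaries; check_sensor_entry is a hypothetical helper and not part of donkeycar, and the record contents are made up.

```python
def check_sensor_entry(record, num_sensors=2):
    """Raise if the record lacks a 'sensor' list of the expected length."""
    if 'sensor' not in record:
        raise KeyError("record has no 'sensor' entry")
    sensor = record['sensor']
    if len(sensor) != num_sensors:
        raise ValueError(f'expected {num_sensors} sensor values, '
                         f'got {len(sensor)}')
    return [float(v) for v in sensor]


# A made-up record with two sensor values passes the check
good = {'user/angle': 0.1, 'user/throttle': 0.4, 'sensor': [0.3, 0.8]}
values = check_sensor_entry(good)

# A record without the 'sensor' key is rejected
try:
    check_sensor_entry({'user/angle': 0.1, 'user/throttle': 0.4})
    error = None
except KeyError as err:
    error = err.args[0]
```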
Creating a tub
Because we don't have a tub with sensor data, let's create one with fake sensor entries:
import os
import tarfile
import numpy as np
from donkeycar.parts.tub_v2 import Tub
from donkeycar.pipeline.types import TubRecord
from donkeycar.config import load_config

if __name__ == '__main__':
    # put your path to your car app
    my_car = os.path.expanduser('~/mycar')
    cfg = load_config(os.path.join(my_car, 'config.py'))
    # put your path to donkey project
    tar = tarfile.open(os.path.expanduser(
        '~/Python/donkeycar/donkeycar/tests/tub/tub.tar.gz'))
    tub_parent = os.path.join(my_car, 'data2/')
    tar.extractall(tub_parent)
    tub_path = os.path.join(tub_parent, 'tub')
    tub1 = Tub(tub_path)
    tub2 = Tub(os.path.join(my_car, 'data2/tub_sensor'),
               inputs=['cam/image_array', 'user/angle', 'user/throttle',
                       'sensor'],
               types=['image_array', 'float', 'float', 'list'])

    for record in tub1:
        t_record = TubRecord(config=cfg,
                             base_path=tub1.base_path,
                             underlying=record)
        img_arr = t_record.image(cached=False)
        record['sensor'] = list(np.random.uniform(size=2))
        record['cam/image_array'] = img_arr
        tub2.write_record(record)
Making the model available
We don't have a dynamic factory yet, so we need to add the new model into the function get_model_by_type() in the module donkeycar/utils.py:

...
elif model_type == 'sensor':
    kl = KerasSensors(input_shape=input_shape)
...
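To show the added branch in context, here is a runnable stand-in. This is not the real get_model_by_type() from donkeycar/utils.py: the simplified signature, the stub classes and the 'linear' branch are assumptions made only to illustrate the dispatch on the type string.

```python
class KerasLinear:
    """Stand-in stub, not the real donkeycar class."""
    def __init__(self, input_shape=(120, 160, 3)):
        self.input_shape = input_shape


class KerasSensors(KerasLinear):
    """Stand-in stub for the new model."""


def get_model_by_type(model_type, input_shape=(120, 160, 3)):
    """Simplified stand-in: map a type string to a model instance."""
    if model_type == 'linear':
        kl = KerasLinear(input_shape=input_shape)
    elif model_type == 'sensor':
        # the new branch for our model
        kl = KerasSensors(input_shape=input_shape)
    else:
        raise ValueError(f'Unknown model type: {model_type}')
    return kl


pilot = get_model_by_type('sensor')
```

The string used in the new branch is the one you later pass to the --type argument when training.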
Go train
In your car app folder, the following should now work:
donkey train --tub data2/tub_sensor --model models/pilot.h5 --type sensor
Because of the random values in the data, the model will not converge quickly; the goal here is to get it working in the framework.
Support and discussions
Please join the Discord Donkey Car group for support and discussions.