What is a Part

A part is a Python class that wraps a functional component of a vehicle.

These include:

  • Sensors - Cameras, Lidar, Odometers, GPS ...
  • Actuators - Motor Controllers
  • Pilots - Lane Detectors, Behavioral Cloning models, ...
  • Controllers - Web-based or Bluetooth.
  • Stores - Tub, or a way to save data.

This is an excellent video by Tawn Kramer that walks through how to make a part: https://www.youtube.com/watch?v=YZ4ESrtfShs

Here is an example of how to use the PiCamera part to publish an image to the 'cam/img' channel on every drive loop.

import donkeycar as dk
from donkeycar.parts.camera import PiCamera

V = dk.Vehicle()

#initialize the camera part
cam = PiCamera()

#add the part to the vehicle.
V.add(cam, outputs=['cam/img'])

V.start()

Anatomy of a Part

All parts share a common structure so that they can all be run by the vehicle's drive loop.

A part must have either a "run" or a "run_threaded" function that does the work (see below). It may also take inputs=['in single quotes', 'separated by commas'] and usually produces one or more outputs=['in single quotes', 'separated by commas'].

If the part grabs some hardware resource, such as a camera or a serial port, it should also have a shutdown function that releases those resources properly when donkey is stopped.
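As a rough sketch of that pattern, the part below acquires a resource in its constructor and releases it in shutdown. The FileLogger name and the log path are hypothetical, and an ordinary file stands in for a camera or serial port.

```python
import os
import tempfile


class FileLogger:
    """Hypothetical part: appends each value it receives to a text file."""

    def __init__(self, path):
        # Acquire the resource once, when the part is created.
        self.file = open(path, "a")
        self.count = 0

    def run(self, value):
        # Called on every pass of the drive loop.
        self.file.write(f"{value}\n")
        self.count += 1
        return self.count  # number of values written so far

    def shutdown(self):
        # Release the resource when the vehicle stops.
        if not self.file.closed:
            self.file.close()


# Stand-alone usage, without a vehicle:
log_path = os.path.join(tempfile.mkdtemp(), "values.log")
logger = FileLogger(log_path)
written = logger.run(0.5)
logger.shutdown()
```

Guarding the close with a check on self.file.closed keeps shutdown safe to call more than once.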

Here is an example of a part that accepts a number, multiplies it by a random number between 0 and 1, and returns the result.

import random

class RandPercent:
    def run(self, x):
        return x * random.random()

Now to add this to a vehicle:

import donkeycar as dk

V = dk.Vehicle()

# initialize the channel value
V.mem['const'] = 4

# add the part to read and write to the same channel.
V.add(RandPercent(), inputs=['const'], outputs=['const'])

V.start(max_loops=5)
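To make the channel mechanics concrete, here is a simplified, stand-alone imitation of what happens on each loop: the values named in inputs are read from memory, passed to run, and the return value is stored under the name in outputs. The run_loops helper and the plain dict used as memory are assumptions for illustration, not Donkey's actual implementation.

```python
import random


class RandPercent:
    def run(self, x):
        return x * random.random()


def run_loops(part, memory, inputs, outputs, max_loops):
    """Hypothetical stand-in for the drive loop, for a single-output part."""
    history = []
    for _ in range(max_loops):
        args = [memory[name] for name in inputs]  # read the input channels
        result = part.run(*args)                  # let the part do its work
        memory[outputs[0]] = result               # write the output channel
        history.append(result)
    return history


memory = {"const": 4}
history = run_loops(RandPercent(), memory, ["const"], ["const"], max_loops=5)
print(history)
```

Because the part reads and writes the same channel, and random.random() returns a value in the range [0, 1), the stored value can only stay the same or shrink from one loop to the next.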

Threaded Parts

For a vehicle to perform well, the drive loop must execute 10-30 times per second. The rate is set by DRIVE_LOOP_HZ in your config file and defaults to 20 Hz. Slow parts should therefore be threaded so that they do not hold up the drive loop.
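As a rough illustration of that time budget, at 20 Hz each pass of the loop has 1/20 s = 50 ms to run every non-threaded part. The loop_budget and time_to_sleep helpers below are hypothetical and only show the arithmetic; they are not taken from Donkey's code.

```python
import time

DRIVE_LOOP_HZ = 20  # the default rate mentioned above


def loop_budget(rate_hz):
    """Seconds available for one pass of the loop at the given rate."""
    return 1.0 / rate_hz


def time_to_sleep(rate_hz, elapsed):
    """Idle time left in this pass; zero if the parts overran the budget."""
    return max(0.0, loop_budget(rate_hz) - elapsed)


start = time.time()
# ... all non-threaded parts would run here ...
elapsed = time.time() - start
time.sleep(time_to_sleep(DRIVE_LOOP_HZ, elapsed))
```

With these numbers, a single part whose run function takes one second leaves no idle time at all and caps the whole loop at about one pass per second.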

A threaded part needs to define two functions: one that runs in the separate thread, and one that the drive loop calls to quickly get the most recent values.

When you pass threaded=True while adding a part, the main Donkey program calls the part's "run_threaded" function instead of its "run" function. For the example below, you would add the part like this:

V.add(RandPercent(), inputs=['const'], outputs=['const'], threaded=True)

When a part is added with threaded=True, Donkey also looks for an "update" function and runs it in its own thread, as you can see below.

Here's an example of how to make the RandPercent part threaded if the run function takes a second to complete.

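import random
import time

class RandPercent:
    def __init__(self):
        # 'in' is a reserved word in Python, so use a different name
        self.in_value = 0.0
        self.out = 0.0

    def run(self, x):
        # simulate a slow computation that takes a second
        time.sleep(1)
        return x * random.random()

    def update(self):
        # the function run in its own thread
        while True:
            self.out = self.run(self.in_value)

    def run_threaded(self, x):
        # called by the drive loop; returns the latest result immediately
        self.in_value = x
        return self.out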

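  • part.run : called by the drive loop on every pass when the part is not threaded.
  • part.run_threaded : called by the drive loop instead of run when the part is threaded.
  • part.update : runs in its own thread when the part is threaded.
  • part.shutdown : releases hardware resources when Donkey is stopped.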
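
Putting these methods together, the following stand-alone sketch imitates what happens to a threaded part: update is started in a background thread, run_threaded is called on each loop, and shutdown is called at the end. The SlowRandPercent class, its running flag, and the manual thread handling are assumptions for illustration; on a real vehicle, adding the part with threaded=True and calling V.start() takes care of the thread.

```python
import random
import threading
import time


class SlowRandPercent:
    """Hypothetical threaded part with a stoppable update loop."""

    def __init__(self, delay=0.01):
        self.delay = delay   # simulated cost of one computation
        self.in_value = 0.0
        self.out = 0.0
        self.running = True

    def update(self):
        # Runs in its own thread until shutdown() clears the flag.
        while self.running:
            time.sleep(self.delay)
            self.out = self.in_value * random.random()

    def run_threaded(self, x):
        # Called from the drive loop: store the input, return at once.
        self.in_value = x
        return self.out

    def shutdown(self):
        # Ask the update loop to exit.
        self.running = False


part = SlowRandPercent()
worker = threading.Thread(target=part.update, daemon=True)
worker.start()

results = []
for _ in range(5):
    results.append(part.run_threaded(4))
    time.sleep(0.02)  # stand-in for the rest of the drive loop

part.shutdown()
worker.join(timeout=1.0)
```

Using a running flag instead of "while True" lets the background thread finish cleanly once shutdown is called.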