What is a Part
A part is a Python class that wraps a functional component of a vehicle.
- Sensors - Cameras, Lidar, Odometers, GPS ...
- Actuators - Motor Controllers
- Pilots - Lane Detectors, Behavioral Cloning models, ...
- Controllers - Web based or Bluetooth.
- Stores - Tub, or a way to save data.
This is an excellent video by Tawn Kramer that walks through how to make a part: https://www.youtube.com/watch?v=YZ4ESrtfShs
Here is an example how to use the PiCamera part to publish an image in the 'cam/img' channel on every drive loop.
V = dk.Vehicle() #initialize the camera part cam = PiCamera() #add the part to the vehicle. V.add(cam, outputs=['cam/img']) V.start()
Anatomy of a Part
All parts share a common structure so that they can all be run by the vehicle's drive loop.
A part must have either an "run" or a "run_threaded" function that does the work (see below). It may also require
inputs=['in single quotes', 'seperated by commas'] and will always generate at least one
outputs=['in single quotes', 'seperated by commas']
If the part grabs some hardware resource, such as a camera or a serial port, it should also have a
shutdown function that releases those resources properly when donkey is stopped.
Here is an example of a part that will accept a number, multiply it by a random number and return the result.
import random class RandPercent: def run(self, x): return x * random.random()
Now to add this to a vehicle:
V = dk.Vehicle() # initialize the channel value V.mem['const'] = 4 # add the part to read and write to the same channel. V.add(RandPercent, inputs=['const'], outputs=['const']) V.start(max_loops=5)
For a vehicle to perform well the drive loop must execute 10-30 times per
second (determined by the
DRIVE_LOOP_HZ setting in your config file; the default is 20hz)
overall, so slow parts should be threaded to avoid holding up the drive loop.
A threaded part needs to define the function that runs in the separate thread and the function to call that will return the most recent values quickly.
When you add
threaded = True when adding a part, the main Donkey program
will call the part's "run_threaded" function instead of the "run" function. So for
the below example you would add the part with this:
V.add(RandPercent, inputs=['const'], outputs=['const'], threaded=True)
Once you have a "run_threaded" function, Donkey will automatically look for an "update" function and run that in its own thread, as you can see below.
Here's an example how to make the
RandPercent part threaded if the run
function takes a second to complete.
import random import time class RandPercent: self.in = 0.0 self.out = 0.0 def run(self, x): return x * random.random() time.sleep(1) def update(self): # the function run in its own thread while True: self.out = self.run(self.in) def run_threaded(self, x): self.in = x return self.out
part.run: function used to run the part
part.run_threaded: drive loop function run if part is threaded.
part.update: threaded function