Design: Multiprocessing
This chapter is about how multiprocessing is implementated.
Design decision: multiprocessing.process
There is a playground: experiments/multiprocessing_process_queue_batteries.py
The implementation is here: src/testbed/multiprocessing/util_multiprocessing.py
There are many ways how multiprocessing may be done with python. I opted for ‘multiprocessing.process.Process()’ in a ‘spawn’ context. The processes communicate with ‘multiprocessing.Queue()’.
See
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue
The reason why I did not use https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool was, that there is no timeout for a task in a pool. However, such a timeout is very important for a testenvironment which may be flaky.
Target
Note
I use the name ‘Target’ and ‘AsyncTarget’.
This originates from the python implmentation of https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process().
The second paramter is called ‘target’ which is the function which has to be called in the remote process.
Note
It is very difficult to debug a program which is spread out over different processes! To ease this, I implemented a small wrapper which allows to run the ‘target’ in a external process or just blocking in the very same process.
See: –no-multiprocessing
class TargetCtx:
def start(self, async_target: AsyncTarget) -> None:
process = self.ctx.Process(
name=target_unique_name,
target=async_target.target_func,
args=target_args_complete,
)
async_target.target = Target(
multiprocessing=self.multiprocessing,
process=process,
timeout_s=async_target.timeout_s,
)
if self.multiprocessing:
#
# Start the process which then calles 'target'
#
process.start()
else:
# Call the 'target' function directly
async_target.target_func(*target_args_complete)
The important code is if self.multiprocessing.
Sequence diagram
communication between main and subprocesses
When a process is started, the target function will be called with paramters.
These parameters are pickled by ‘multiprocessing.process’ and call on of these functions:
# src/testbed/mptest/util_testrunner.py
def target_run_one_test_async(
arg1: util_multiprocessing.TargetArg1,
args: Args,
ctxtestrun: CtxTestRun,
testrun: TestRun,
repo_micropython_tests: pathlib.Path,
) -> None:
...
# src/testbed/multiprocessing/firmware_bartender.py
def target_build_firmware_async(
arg1: util_multiprocessing.TargetArg1,
directory_mpbuild_artifacts: pathlib.Path,
firmwares: FirmwaresTobeBuilt,
repo_micropython_firmware: pathlib.Path,
) -> None:
...
Now a process may send events to the main process. All events derive from EventBase.
@dataclass(repr=True)
class EventBase:
target_unique_name: str
@dataclass(repr=True)
class EventExit(EventBase):
logfile: pathlib.Path
success: bool
@property
def logfile_relative(self) -> pathlib.Path:
return relative_cwd(self.logfile)
@dataclass(repr=True)
class EventLog(EventBase):
msg: str
FirmwareBartender and TestBartender cooperation
The FirmwareBartender
Start a process which * Compiles one firmware after each other * Notifies the FirmwareBartender with EventFirmwareSpec
The TestBartender
Waits for
FirmwareSpecs to be ready
Tentacles to be available
If a ‘TestRun’ is available * This test will be started
If the TestBartender has no tests anymore to to, the mptest will exit.