Design: Multiprocessing

This chapter is about how multiprocessing is implementated.

Design decision: multiprocessing.process

There is a playground: experiments/multiprocessing_process_queue_batteries.py

The implementation is here: src/testbed/multiprocessing/util_multiprocessing.py

There are many ways how multiprocessing may be done with python. I opted for ‘multiprocessing.process.Process()’ in a ‘spawn’ context. The processes communicate with ‘multiprocessing.Queue()’.

See

The reason why I did not use https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool was, that there is no timeout for a task in a pool. However, such a timeout is very important for a testenvironment which may be flaky.

Target

Note

I use the name ‘Target’ and ‘AsyncTarget’.

This originates from the python implmentation of https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process().

The second paramter is called ‘target’ which is the function which has to be called in the remote process.

Note

It is very difficult to debug a program which is spread out over different processes! To ease this, I implemented a small wrapper which allows to run the ‘target’ in a external process or just blocking in the very same process.

  • See: –no-multiprocessing

class TargetCtx:
    def start(self, async_target: AsyncTarget) -> None:
        process = self.ctx.Process(
              name=target_unique_name,
              target=async_target.target_func,
              args=target_args_complete,
        )
        async_target.target = Target(
              multiprocessing=self.multiprocessing,
              process=process,
              timeout_s=async_target.timeout_s,
        )

        if self.multiprocessing:
              #
              # Start the process which then calles 'target'
              #
              process.start()
        else:
              # Call the 'target' function directly
              async_target.target_func(*target_args_complete)

The important code is if self.multiprocessing.

Sequence diagram

sequenceDiagram participant tr as Testrunner participant b as TestBartender/BuildBartender participant tx as TargetCtx participant f as TestFunc/BuildFunc tr->>b: testrun = testrun_next() b->>tx: run(AsyncResult(testrun)) tx->>+f: f(unique_id) f->>-tx: Event(unique_id) tx->>b: done(unique_id)

communication between main and subprocesses

When a process is started, the target function will be called with paramters.

These parameters are pickled by ‘multiprocessing.process’ and call on of these functions:

# src/testbed/mptest/util_testrunner.py
def target_run_one_test_async(
   arg1: util_multiprocessing.TargetArg1,
   args: Args,
   ctxtestrun: CtxTestRun,
   testrun: TestRun,
   repo_micropython_tests: pathlib.Path,
) -> None:
   ...

# src/testbed/multiprocessing/firmware_bartender.py
def target_build_firmware_async(
   arg1: util_multiprocessing.TargetArg1,
   directory_mpbuild_artifacts: pathlib.Path,
   firmwares: FirmwaresTobeBuilt,
   repo_micropython_firmware: pathlib.Path,
) -> None:
   ...

Now a process may send events to the main process. All events derive from EventBase.

@dataclass(repr=True)
class EventBase:
   target_unique_name: str


@dataclass(repr=True)
class EventExit(EventBase):
   logfile: pathlib.Path
   success: bool

   @property
   def logfile_relative(self) -> pathlib.Path:
      return relative_cwd(self.logfile)


@dataclass(repr=True)
class EventLog(EventBase):
   msg: str

FirmwareBartender and TestBartender cooperation

The FirmwareBartender

  • Start a process which * Compiles one firmware after each other * Notifies the FirmwareBartender with EventFirmwareSpec

The TestBartender

  • Waits for

    • FirmwareSpecs to be ready

    • Tentacles to be available

    • If a ‘TestRun’ is available * This test will be started

    • If the TestBartender has no tests anymore to to, the mptest will exit.