Understanding subprocess.CompletedProcess for Concurrent Execution in Python


Concurrent Execution in Python

Concurrent execution, also known as concurrency, refers to the ability of a program to make progress on multiple tasks (processes or threads) seemingly at the same time. In CPython, the Global Interpreter Lock (GIL) prevents threads from executing Python bytecode in parallel on multiple CPU cores, but Python still handles concurrency well through threads and asynchronous I/O, particularly for I/O-bound work such as waiting on external programs.

The subprocess Module and subprocess.CompletedProcess

The subprocess module provides functions to execute external programs (subprocesses) from within your Python script. These subprocesses run as separate processes from your main Python program.

The subprocess.run() function is the recommended approach for most use cases. It executes a subprocess, waits for its completion, and returns a subprocess.CompletedProcess object. This object holds information about the completed subprocess, including:

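  • args
    The command that was run, as passed to subprocess.run().
  • stderr
    The standard error of the subprocess, as bytes by default or as str when text=True is passed. It is None unless stderr was captured (for example with capture_output=True or the stderr argument).
  • stdout
    The standard output of the subprocess, as bytes by default or as str when text=True is passed. It is None unless stdout was captured (for example with capture_output=True or the stdout argument).
  • returncode
    An integer representing the exit status of the subprocess (0 usually indicates success; a non-zero value usually indicates an error).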
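
As a minimal sketch of these attributes, the snippet below runs a child Python interpreter and inspects the returned object. Using sys.executable as the command is an assumption of this example, chosen only so that it does not depend on any particular external program; text=True is likewise an illustrative choice that makes the captured output a str rather than bytes.

```python
import subprocess
import sys

# Run a short child Python program; sys.executable is the path of the
# current interpreter, used here so the example is portable.
result = subprocess.run(
    [sys.executable, "-c", "print('hello')"],
    capture_output=True,  # capture both stdout and stderr
    text=True,            # decode captured output to str instead of bytes
)

print(result.args)        # the command that was run, as passed in
print(result.returncode)  # exit status of the child process
print(result.stdout)      # captured standard output
print(result.stderr)      # captured standard error
```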

Using subprocess.CompletedProcess for Concurrent Execution

While subprocess.run() itself isn't inherently for concurrent execution (as it waits for each subprocess to finish), you can leverage it within your concurrent programming approach. Here are two common strategies:

  1. threading Module

    • Create multiple threads, each using subprocess.run() to execute a different program concurrently.
    • Use thread synchronization mechanisms (e.g., locks, semaphores) to coordinate access to shared resources if necessary.
  2. concurrent.futures Module

    • The concurrent.futures module provides higher-level abstractions for concurrent execution.
    • You can submit a function that calls subprocess.run() to a ProcessPoolExecutor or a ThreadPoolExecutor to run several subprocesses concurrently. Because each worker spends most of its time waiting on a child process, a thread pool is often sufficient.
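
The concurrent.futures strategy can be sketched as follows. This sketch uses ThreadPoolExecutor rather than the ProcessPoolExecutor mentioned above, on the assumption that threads are enough when each worker only waits for a child process to finish. The commands are hypothetical and simply start child Python interpreters via sys.executable.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_subprocess(cmd):
    """Run one command and return its CompletedProcess."""
    return subprocess.run(cmd, capture_output=True, text=True)

# Hypothetical commands: each starts a child Python interpreter.
commands = [
    [sys.executable, "-c", "print('first')"],
    [sys.executable, "-c", "print('second')"],
]

outputs = {}
with ThreadPoolExecutor(max_workers=2) as executor:
    # Map each future back to the index of the command that produced it.
    futures = {executor.submit(run_subprocess, cmd): i
               for i, cmd in enumerate(commands)}
    for future in as_completed(futures):
        result = future.result()  # re-raises any exception from the worker
        outputs[futures[future]] = result.stdout.strip()

print(outputs)
```

Keying the results by command index keeps them identifiable even though as_completed() yields futures in completion order rather than submission order.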

Example: Thread-Based Concurrency with subprocess.CompletedProcess

import subprocess
import threading

results = {}  # Maps each command (as a tuple) to its CompletedProcess
results_lock = threading.Lock()  # Protects writes to the shared dictionary

def run_subprocess(cmd):
    process = subprocess.run(cmd, capture_output=True)  # Capture stdout and stderr
    with results_lock:
        results[tuple(cmd)] = process  # Keep returncode, stdout, stderr for later

threads = []
commands = [["program1", "arg1"], ["program2", "arg2"]]  # Placeholder commands

for cmd in commands:
    thread = threading.Thread(target=run_subprocess, args=(cmd,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()  # Wait for each thread to finish

# All subprocesses have completed; their results are available in the results dictionary

Key Points

  • subprocess.CompletedProcess provides information about a finished subprocess.
  • It is not itself a concurrency mechanism, but it carries each subprocess's results in thread-based or concurrent.futures approaches.
  • Consider thread safety and resource management when implementing concurrency.


Example: Running ls and ping Concurrently with Threads

import subprocess
import threading

def run_subprocess(cmd):
    process = subprocess.run(cmd, capture_output=True)
    print(f"Command: {cmd}")
    print(f"Exit code: {process.returncode}")
    if process.returncode == 0:
        output = process.stdout.decode()  # Decode bytes to string if needed
        print(f"Output: {output}")
    else:
        error = process.stderr.decode()  # Decode bytes to string if needed
        print(f"Error: {error}")
    print("-" * 20)  # Separator for readability

commands = [["ls", "-l"], ["ping", "8.8.8.8", "-c", "3"]]  # Example commands

threads = []
for cmd in commands:
    thread = threading.Thread(target=run_subprocess, args=(cmd,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()  # Wait for each thread to finish

print("All subprocesses completed!")

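This code runs ls -l and ping 8.8.8.8 -c 3 concurrently using separate threads (both commands assume a Unix-like system). The run_subprocess function executes the command, retrieves the subprocess.CompletedProcess object, and prints the exit code followed by the output (if successful) or the error message. Because each thread prints as soon as its command finishes, the two reports can appear in either order.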

This example demonstrates using the concurrent.futures module for concurrent execution with a pool of worker processes:

from concurrent.futures import ProcessPoolExecutor
import subprocess

def run_subprocess(cmd):
    # Runs in a worker process; the returned CompletedProcess is sent
    # back to the parent process.
    return subprocess.run(cmd, capture_output=True)

def main():
    commands = [["program1", "arg1"], ["program2", "arg2"]]  # Placeholder commands

    with ProcessPoolExecutor() as executor:
        # Results come back in the same order as the commands
        results = list(executor.map(run_subprocess, commands))

    for result in results:
        print(f"Command: {result.args[0]}")  # Access arguments (command name)
        print(f"Exit code: {result.returncode}")
        if result.returncode == 0:
            output = result.stdout.decode()  # Decode bytes to string
            print(f"Output: {output}")
        else:
            error = result.stderr.decode()  # Decode bytes to string
            print(f"Error: {error}")
        print("-" * 20)  # Separator for readability

    print("All subprocesses completed!")

# The guard keeps worker processes from re-running this code when they
# import the main module (needed with the "spawn" start method).
if __name__ == "__main__":
    main()

This code uses a ProcessPoolExecutor to manage a pool of worker processes. executor.map() calls run_subprocess once per command and returns the resulting subprocess.CompletedProcess objects in the same order as the commands. The code then iterates through the results, extracting information and handling output/error messages.



Custom Data Structures

  • If you only need a subset of the information captured by subprocess.CompletedProcess (e.g., just the return code and output), you can copy the relevant fields into your own data structure (like a dictionary). subprocess.run() still creates the CompletedProcess object, but your own structure gives you more control over the format and keeps only the data you need to pass around or store.
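
A minimal sketch of this idea, assuming only the return code and the decoded standard output are of interest. The summarize helper and the dictionary keys are hypothetical names introduced for illustration.

```python
import subprocess
import sys

def summarize(cmd):
    """Run cmd and keep only its exit status and decoded stdout."""
    completed = subprocess.run(cmd, capture_output=True, text=True)
    # Copy just the fields we need into a plain dictionary.
    return {"returncode": completed.returncode, "output": completed.stdout}

summary = summarize([sys.executable, "-c", "print('done')"])
print(summary)
```

A plain dictionary like this is also easy to serialize, for example with the json module, which may be convenient when results are logged or sent elsewhere.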

Error Handling in subprocess.run

  • The subprocess.run function itself offers several ways to handle the completion status of the subprocess:
    • check (default False)
      When set to True, raises a CalledProcessError exception if the subprocess exits with a non-zero return code.
    • timeout
      Sets a maximum execution time in seconds; if it is exceeded, the subprocess is killed and a TimeoutExpired exception is raised.
    • Custom Error Handling
      You can check the returncode attribute of the object returned by subprocess.run and handle errors or warnings accordingly in your code.
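
The options above can be combined as in the following sketch. The run_checked helper, its status strings, and the timeout values are assumptions made for illustration; the child programs are short Python snippets started via sys.executable.

```python
import subprocess
import sys

def run_checked(cmd, timeout_seconds):
    """Run cmd and return a short status string describing the outcome."""
    try:
        subprocess.run(cmd, capture_output=True, check=True,
                       timeout=timeout_seconds)
    except subprocess.CalledProcessError as exc:
        # Raised because check=True and the exit status was non-zero.
        return f"failed with exit code {exc.returncode}"
    except subprocess.TimeoutExpired:
        # Raised after the child exceeded the timeout and was killed.
        return "timed out"
    return "ok"

print(run_checked([sys.executable, "-c", "print('fine')"], 30))
print(run_checked([sys.executable, "-c", "import sys; sys.exit(3)"], 30))
print(run_checked([sys.executable, "-c", "import time; time.sleep(30)"], 0.5))
```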

shlex Module (for Shell-like Commands)

  • If you're working with shell-like command strings, the shlex module can help: shlex.split() turns a command string into an argument list suitable for subprocess.run(), and shlex.quote() escapes a value for safe use in a shell command. If you only need the return code, os.system or os.popen are simpler alternatives (although subprocess is generally preferred). Note that on Unix os.system returns the process's wait status rather than the plain exit code.
  • For more advanced control over interactive commands and output parsing, explore the standard-library pty module or the third-party pexpect library, which offer finer-grained interaction with subprocesses.
  • Be cautious with os.system and os.popen as they have security implications when dealing with untrusted user input.
  • When choosing an alternative, ensure it captures the essential information you need about the completed subprocess (e.g., return code, output, error message).
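
As a sketch of handling a shell-like command string with shlex, the snippet below splits the string into an argument list and passes it to subprocess.run() rather than to os.system, so no shell is involved. The command string itself is a hypothetical example built around sys.executable.

```python
import shlex
import subprocess
import sys

# shlex.quote() escapes the interpreter path in case it contains spaces.
command_line = f"{shlex.quote(sys.executable)} -c \"print('hello world')\""

# shlex.split() tokenizes the string using POSIX shell quoting rules.
args = shlex.split(command_line)

# Passing a list (without shell=True) avoids shell interpretation of the input.
result = subprocess.run(args, capture_output=True, text=True)

print(args[1:])
print(result.returncode)
print(result.stdout)
```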