PBS Queue Management

Functions for submitting and monitoring jobs in a PBS queue.

  • submit_job() — submit a single job and optionally wait for it to finish.

  • submit_many_jobs() — submit a batch of jobs with varying parameters and concurrency control.

  • wait_for_job() — block until a single job (or a set of jobs) finishes.

  • print_jobs() — display currently queued/running jobs.

Configuration

Default values for several job submission parameters can be overridden by setting environment variables before running your script or the oi CLI. All variables use the OI_TOOLS_ prefix:

Environment variable              Default  Description
OI_TOOLS_PBS_POLL_DELAY           5.0      Seconds between status polls when waiting for a job to finish.
OI_TOOLS_PBS_DEFAULT_CPUS         4        Default number of CPU cores to request when submitting a job.
OI_TOOLS_PBS_DEFAULT_MEM          '8G'     Default memory to request when submitting a job.
OI_TOOLS_PBS_LOG_FOLDER           'logs'   Default directory for job log files.
OI_TOOLS_PBS_MAX_CONCURRENT_JOBS  7        Maximum number of jobs to run simultaneously via submit_many_jobs().

Using these, you can set project or user defaults in a .env file or shell profile:

export OI_TOOLS_PBS_DEFAULT_MEM=16G
export OI_TOOLS_PBS_DEFAULT_CPUS=8
export OI_TOOLS_PBS_LOG_FOLDER=/scratch/logs
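The override rule can be sketched in a few lines of Python (illustrative only; _env_default is a hypothetical helper, not part of oi_tools):

```python
import os

def _env_default(name: str, fallback: str) -> str:
    # Hypothetical helper showing the override rule: an OI_TOOLS_-prefixed
    # environment variable wins over the built-in default.
    return os.environ.get(f"OI_TOOLS_{name}", fallback)

os.environ["OI_TOOLS_PBS_DEFAULT_MEM"] = "16G"    # simulate a .env override
mem = _env_default("PBS_DEFAULT_MEM", "8G")       # overridden -> "16G"
cpus = int(_env_default("PBS_DEFAULT_CPUS", "4")) # no override -> default 4
```

Because the variables are read at submission time, setting them in a shell profile affects every script run from that shell.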

API Documentation

oi_tools.pbs.print_jobs(
*users: str,
all: bool = False,
limit: int = 20,
completed: bool = False,
sort: Literal['user', 'mem', 'cpus'] = 'user',
) → None

Print PBS jobs in a formatted table.

Parameters:
  • users (str) – Usernames indicating whose jobs to show. If omitted, shows only the current user’s jobs.

  • all (bool) – Show everyone’s jobs.

  • limit (int) – Maximum number of jobs to display.

  • completed (bool) – Include completed jobs.

  • sort (Literal['user', 'mem', 'cpus']) – What to sort by.

Return type:

None

Examples

>>> print_jobs()
>>> # print jobs for two users
>>> print_jobs("patti193", "chett184", sort="mem")
oi_tools.pbs.submit_job(
file: str | Path,
args: Sequence[str | Path | float | int] = [],
*,
mem: str | int = '8G',
cpus: int = 4,
wait: bool = False,
log_folder: Path | str | None = PosixPath('logs'),
verbose: bool = True,
filetype: Literal['python_script', 'python_module', 'stata_script', 'r_script', 'sas_script'] | None = None,
base_job_name: str | None = None,
python_executable: Path | None = None,
cwd: Path | str | None = None,
env_vars: Mapping[str, str | Path | float | int] | None = None,
) str

Submit a single script or module as a PBS job.

Logs are written to <log_folder>/<base_job_name>/<args>/, with one file per submission named by today’s date and an incrementing counter (e.g. logs/myscript/2001/2026-01-15-1.log).
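The naming scheme above can be reconstructed as follows (illustrative only; _log_path is a hypothetical helper, not the actual oi_tools implementation):

```python
from datetime import date
from pathlib import Path

def _log_path(log_folder, base_job_name, args, counter):
    # Join the job args into one subdirectory name, then name the file
    # by today's date plus an incrementing counter.
    sub = "-".join(str(a) for a in args)
    name = f"{date.today().isoformat()}-{counter}.log"
    return Path(log_folder) / base_job_name / sub / name

p = _log_path("logs", "myscript", [2001], 1)
# e.g. logs/myscript/2001/<today>-1.log
```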

If filetype is omitted, it is inferred from the file suffix (e.g. .py → python_script, .do → stata_script). If the suffix is unrecognized, this function falls back to "python_module".

Parameters:
  • file (str | Path) – Path to the script or python module.

  • args (Sequence[str | Path | float | int]) – Command-line arguments to pass to the script or module.

  • mem (str | int) – Memory to request. Can be an integer (treated as gigabytes) or a string such as "16G". The default value is configurable via the OI_TOOLS_PBS_DEFAULT_MEM environment variable.

  • cpus (int) – Number of CPU cores to request. The default value is configurable via the OI_TOOLS_PBS_DEFAULT_CPUS environment variable.

  • wait (bool) – Whether to wait for the job to finish before returning (True) or return immediately after submission (False).

  • log_folder (Path | str | None) – Directory in which to create log files. Set to None to discard output. The default value is configurable via the OI_TOOLS_PBS_LOG_FOLDER environment variable.

  • verbose (bool) – Print job details and the generated PBS script before submitting.

  • filetype (Literal['python_script', 'python_module', 'stata_script', 'r_script', 'sas_script'] | None) – Explicitly set the script type.

  • base_job_name (str | None) – Base name used for the PBS job and the log subdirectory. Defaults to the script path with the suffix removed and non-alphanumeric characters replaced by hyphens (e.g. "code/myscript.py" → "code-myscript").

  • python_executable (Path | None) – Path to the Python interpreter to use for Python jobs. Defaults to whatever version of Python is used to submit the job (sys.executable).

  • cwd (Path | str | None) – Working directory for the job. Defaults to the current directory at submission time.

  • env_vars (Mapping[str, str | Path | float | int] | None) – Optional dictionary of environment variables to export in the job script (e.g. {"MY_VAR": "value"}).

Returns:

The PBS job ID returned by qsub (e.g. "12345.cluster").

Return type:

str

Examples

Submit a Python script with a year argument:

>>> from pathlib import Path
>>> job_id = submit_job(
...     "code/myscript.py",
...     ["2001"],
...     mem="16G",
...     cpus=8,
...     log_folder="logs",
...     python_executable=Path(".venv/bin/python3"),
...     base_job_name="jobname",
... )

Submit a Stata do-file:

>>> submit_job("code/myscript.do", ["2001"])

Submit an R script:

>>> submit_job("code/myscript.r", ["2001"])

Submit a Python module:

>>> submit_job("myproject.submodule", filetype="python_module")

Pass environment variables to the job:

>>> submit_job(
...     "code/myscript.py",
...     ["2001"],
...     env_vars={"MY_TOKEN": "abc123", "DATA_DIR": "/scratch/myproject"},
... )
oi_tools.pbs.submit_many_jobs(
file: str | Path | Sequence[str | Path],
*,
args: Sequence[Sequence[str | Path | float | int]] | None = None,
env_vars: Sequence[Mapping[str, str | Path | float | int]] | None = None,
max_concurrent_jobs: int = 7,
stop_on_error: bool = True,
**kwargs: Any,
) → None

Submit multiple jobs to the PBS queue and block until all finish.

Jobs are submitted in batches to keep at most max_concurrent_jobs running or queued at any time. As each job completes, the next one is submitted automatically. If any job exits with a non-zero status, a BatchJobError is raised (unless stop_on_error=False).
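The batching behavior described above is a bounded-concurrency pattern. A minimal sketch, with submit and wait_for_one as hypothetical stand-ins for the submit_job()/wait_for_job() calls the real implementation makes:

```python
from collections import deque

def run_batched(jobs, submit, wait_for_one, max_concurrent=7):
    # Keep at most `max_concurrent` jobs active; as soon as one finishes,
    # submit the next pending job.
    pending = deque(jobs)
    active = set()
    while pending or active:
        while pending and len(active) < max_concurrent:
            active.add(submit(pending.popleft()))
        done = wait_for_one(active)  # blocks until some active job completes
        active.discard(done)
```

With max_concurrent_jobs=3, for example, the fourth job is only submitted once one of the first three finishes.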

Parameters:
  • file (str | Path | Sequence[str | Path]) – Either a single script/Python module or a sequence of scripts/Python modules. If given a single script, all jobs are submitted using that same script. If given a list of scripts, it must have the same length as args/env_vars.

  • args (Sequence[Sequence[str | Path | float | int]] | None) – An iterable of argument lists, one per job. Each element is passed as the args parameter of submit_job(). Mutually exclusive with env_vars.

  • env_vars (Sequence[Mapping[str, str | Path | float | int]] | None) – An iterable of environment variable dicts, one per job. Each element is passed as the env_vars parameter of submit_job(). Mutually exclusive with args.

  • max_concurrent_jobs (int) – Maximum number of jobs to have running or queued at any given time.

  • stop_on_error (bool) – If True (default), raise BatchJobError immediately as soon as any job fails. If False, continue submitting jobs and raise an exception once all jobs have either failed or successfully completed.

  • **kwargs (Any) – Additional keyword arguments forwarded to every submit_job() call (e.g. filetype, mem, cpus, log_folder, etc.).

Raises:

BatchJobError – If a job fails.

Return type:

None

Examples

Submit one job per year for 2000–2019, keeping at most 3 running at once, and block until all jobs finish:

>>> many_args = [[year] for year in range(2000, 2020)]
>>> submit_many_jobs(
...     "code/myscript.py",
...     args=many_args,
...     max_concurrent_jobs=3,
...     mem="16G",
... )

Submit one job per state, passing each state as an environment variable:

>>> many_env_vars = [{"STATE": s} for s in ["MN", "MA", "WY"]]
>>> submit_many_jobs(
...     "code/myscript.py",
...     env_vars=many_env_vars,
... )

Submit a different script per job:

>>> scripts = ["code/step1.py", "code/step2.py", "code/step3.py"]
>>> submit_many_jobs(
...     scripts,
...     env_vars=[{"INPUT": f} for f in scripts],
... )

Continue submitting after failures; once all jobs have finished, a BatchJobError reporting the failed jobs is raised:

>>> submit_many_jobs(
...     "code/myscript.py",
...     args=many_args,
...     stop_on_error=False,
... )
oi_tools.pbs.wait_for_job(
job_ids: str | Collection[str],
*,
stop_on_error: bool = True,
wait_for_all: bool = False,
polling_delay: int | float = 5.0,
) → tuple[str, int]

Watch the specified job(s) and return when one (or all) finish.

Parameters:
  • job_ids (str | Collection[str]) – PBS job ID(s) to monitor.

  • stop_on_error (bool) – If True, raise BatchJobError when a job exits with non-zero status.

  • wait_for_all (bool) – If True, block until every job finishes and return the last job to finish. If False (default), return as soon as the first job finishes.

  • polling_delay (int | float) – Seconds to wait between status checks.

Returns:

The job ID and exit status of a finished job. If wait_for_all=True, this is the last job to complete; otherwise it is the first detected finished job.

Return type:

tuple[str, int]

Raises:
  • ValueError – If job_ids is empty.

  • BatchJobError – If a job exits with non-zero status and stop_on_error is True.

Examples

Wait for a single job to finish:

>>> job_id, status = wait_for_job("12345.pbs")

Wait for all jobs in a list, ignoring errors:

>>> wait_for_job(
...     ["12345.pbs", "12346.pbs"],
...     wait_for_all=True,
...     stop_on_error=False,
... )
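The waiting behavior amounts to a polling loop of the following shape (illustrative only; is_finished is a hypothetical stand-in for the actual PBS status check, e.g. a qstat query):

```python
import time

def poll_until_done(job_ids, is_finished, delay=5.0):
    # Check each job every `delay` seconds and return the first one that
    # reports finished. Raises ValueError on an empty id list, matching
    # the documented behavior of wait_for_job.
    remaining = list(job_ids)
    if not remaining:
        raise ValueError("no job ids given")
    while True:
        for jid in remaining:
            if is_finished(jid):
                return jid
        time.sleep(delay)
```

The delay corresponds to the polling_delay parameter (and the OI_TOOLS_PBS_POLL_DELAY default); longer delays reduce scheduler load at the cost of slower completion detection.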