PBS Queue Management¶
Functions for submitting and monitoring jobs in a PBS queue.
- submit_job() — submit a single job and optionally wait for it to finish.
- submit_many_jobs() — submit a batch of jobs with varying parameters.
- wait_for_job() — block until a single job (or a set of jobs) finishes.
- print_jobs() — display currently queued/running jobs.
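A minimal end-to-end sketch combining these functions (the script path and argument below are placeholders, not part of the API):
>>> from oi_tools.pbs import print_jobs, submit_job, wait_for_job
>>> job_id = submit_job("code/myscript.py", ["2001"])  # returns immediately (wait=False)
>>> print_jobs()                                       # inspect the queue
>>> job_id, status = wait_for_job(job_id)              # block until the job finishes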
Configuration¶
Default values for several job submission parameters can be overridden by
setting environment variables before running your script or the oi CLI.
All variables use the OI_TOOLS_ prefix:
| Environment variable | Default | Description |
|---|---|---|
| | 5.0 | Seconds between status polls when waiting for a job to finish. |
| OI_TOOLS_PBS_DEFAULT_CPUS | 4 | Default number of CPU cores to request when submitting a job. |
| OI_TOOLS_PBS_DEFAULT_MEM | 8G | Default memory to request when submitting a job. |
| OI_TOOLS_PBS_LOG_FOLDER | logs | Default directory for job log files. |
| | 7 | Maximum number of jobs to run simultaneously via submit_many_jobs(). |
Using these variables, you can set project- or user-level defaults in a .env file or shell profile:
export OI_TOOLS_PBS_DEFAULT_MEM=16G
export OI_TOOLS_PBS_DEFAULT_CPUS=8
export OI_TOOLS_PBS_LOG_FOLDER=/scratch/logs
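With those exports in place, a call that omits mem, cpus, and log_folder should pick up the overridden defaults, for example (script path illustrative):
>>> submit_job("code/myscript.py", ["2001"])  # requests 16G and 8 CPUs, logs under /scratch/logs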
API Documentation¶
- oi_tools.pbs.print_jobs(
- *users: str,
- all: bool = False,
- limit: int = 20,
- completed: bool = False,
- sort: Literal['user', 'mem', 'cpus'] = 'user',
Print PBS jobs in a formatted table.
- Parameters:
- Return type:
None
Examples
>>> print_jobs()
>>> # print jobs for two users
>>> print_jobs("patti193", "chett184", sort="mem")
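The remaining keyword arguments can be combined as well; a sketch assuming the flags behave as their names in the signature suggest:
>>> # list up to 50 jobs for all users, including completed ones, sorted by CPU request
>>> print_jobs(all=True, limit=50, completed=True, sort="cpus")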
- oi_tools.pbs.submit_job(
- file: str | Path,
- args: Sequence[str | Path | float | int] = [],
- *,
- mem: str | int = '8G',
- cpus: int = 4,
- wait: bool = False,
- log_folder: Path | str | None = PosixPath('logs'),
- verbose: bool = True,
- filetype: Literal['python_script', 'python_module', 'stata_script', 'r_script', 'sas_script'] | None = None,
- base_job_name: str | None = None,
- python_executable: Path | None = None,
- cwd: Path | str | None = None,
- env_vars: Mapping[str, str | Path | float | int] | None = None,
Submit a single script or module as a PBS job.
Logs are written to <log_folder>/<base_job_name>/<args>/, with one file per submission named by today's date and an incrementing counter (e.g. logs/myscript/2001/2026-01-15-1.log).
If filetype is omitted, the filetype is inferred from the file suffix (e.g. .py → python_script, .do → stata_script). If the suffix is unrecognized, this function falls back to "python_module".
- Parameters:
args (Sequence[str | Path | float | int]) – Command-line arguments to pass to the script or module.
mem (str | int) – Memory to request. Can be an integer (treated as gigabytes) or a string such as "16G". The default value is configurable via the OI_TOOLS_PBS_DEFAULT_MEM environment variable.
cpus (int) – Number of CPU cores to request. The default value is configurable via the OI_TOOLS_PBS_DEFAULT_CPUS environment variable.
wait (bool) – Whether to wait for the job to finish before returning (True) or return immediately (False).
log_folder (Path | str | None) – Directory in which to create log files. Set to None to discard output. The default value is configurable via the OI_TOOLS_PBS_LOG_FOLDER environment variable.
verbose (bool) – Print job details and the generated PBS script before submitting.
filetype (Literal['python_script', 'python_module', 'stata_script', 'r_script', 'sas_script'] | None) – Explicitly set the script type.
base_job_name (str | None) – Base name used for the PBS job and the log subdirectory. Defaults to the script path with the suffix removed and non-alphanumeric characters replaced by hyphens (e.g. "code/myscript.py" → "code-myscript").
python_executable (Path | None) – Path to the Python interpreter to use for Python jobs. Defaults to the interpreter used to submit the job (sys.executable).
cwd (Path | str | None) – Working directory for the job. Defaults to the current directory at submission time.
env_vars (Mapping[str, str | Path | float | int] | None) – Optional dictionary of environment variables to export in the job script (e.g. {"MY_VAR": "value"}).
- Returns:
The PBS job ID returned by qsub (e.g. "12345.cluster").
- Return type:
str
Examples
Submit a Python script with a year argument:
>>> from pathlib import Path
>>> job_id = submit_job(
...     "code/myscript.py",
...     ["2001"],
...     mem="16G",
...     cpus=8,
...     log_folder="logs",
...     python_executable=Path(".venv/bin/python3"),
...     base_job_name="jobname",
... )
Submit a Stata do-file:
>>> submit_job("code/myscript.do", ["2001"])
Submit an R script:
>>> submit_job("code/myscript.r", ["2001"])
Submit a Python module:
>>> submit_job("myproject.submodule", filetype="python_module")
Pass environment variables to the job:
>>> submit_job(
...     "code/myscript.py",
...     ["2001"],
...     env_vars={"MY_TOKEN": "abc123", "DATA_DIR": "/scratch/myproject"},
... )
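Block until the job finishes and run it from a specific working directory (both parameters are documented above; the paths are illustrative):
>>> submit_job(
...     "code/myscript.py",
...     ["2001"],
...     wait=True,
...     cwd="/scratch/myproject",
... )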
- oi_tools.pbs.submit_many_jobs(
- file: str | Path | Sequence[str | Path],
- *,
- args: Sequence[Sequence[str | Path | float | int]] | None = None,
- env_vars: Sequence[Mapping[str, str | Path | float | int]] | None = None,
- max_concurrent_jobs: int = 7,
- stop_on_error: bool = True,
- **kwargs: Any,
Submit multiple jobs to the PBS queue and block until all finish.
Jobs are submitted in batches to keep at most max_concurrent_jobs running or queued at any time. As each job completes, the next one is submitted automatically. If any job exits with a non-zero status, a BatchJobError is raised (unless stop_on_error=False).
- Parameters:
file (str | Path | Sequence[str | Path]) – Either a single script/Python module or a sequence of scripts/Python modules. If given a single script, all jobs are submitted using that same script. If given a list of scripts, it must be the same length as args / env_vars.
args (Sequence[Sequence[str | Path | float | int]] | None) – An iterable of argument lists, one per job. Each element is passed as the args parameter of submit_job(). Mutually exclusive with env_vars.
env_vars (Sequence[Mapping[str, str | Path | float | int]] | None) – An iterable of environment variable dicts, one per job. Each element is passed as the env_vars parameter of submit_job(). Mutually exclusive with args.
max_concurrent_jobs (int) – Maximum number of jobs to have running or queued at any given time.
stop_on_error (bool) – If True (default), raise BatchJobError as soon as any job fails. If False, continue submitting jobs and raise an exception once all jobs have either failed or completed successfully.
**kwargs (Any) – Additional keyword arguments forwarded to every submit_job() call (e.g. filetype, mem, cpus, log_folder, etc.).
- Raises:
BatchJobError – If a job fails.
- Return type:
None
Examples
Submit one job per year for 2000–2019, keeping at most 3 running at once, and block until all jobs finish:
>>> many_args = [[year] for year in range(2000, 2020)]
>>> submit_many_jobs(
...     "code/myscript.py",
...     args=many_args,
...     max_concurrent_jobs=3,
...     mem="16G",
... )
Submit one job per state, passing each state as an environment variable:
>>> many_env_vars = [{"STATE": s} for s in ["MN", "MA", "WY"]]
>>> submit_many_jobs(
...     "code/myscript.py",
...     env_vars=many_env_vars,
... )
Submit a different script per job:
>>> scripts = ["code/step1.py", "code/step2.py", "code/step3.py"]
>>> submit_many_jobs(
...     scripts,
...     env_vars=[{"INPUT": f} for f in scripts],
... )
Collect failing jobs instead of stopping on first failure:
>>> failed = submit_many_jobs(
...     "code/myscript.py",
...     args=many_args,
...     stop_on_error=False,
... )
>>> failed
[[2004], [2012]]  # two jobs failed
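The output above suggests that, with stop_on_error=False, the argument lists of failed jobs are returned; assuming that behavior, a simple retry pass might look like:
>>> if failed:
...     submit_many_jobs("code/myscript.py", args=failed)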
- oi_tools.pbs.wait_for_job(
- job_ids: str | Collection[str],
- *,
- stop_on_error: bool = True,
- wait_for_all: bool = False,
- polling_delay: int | float = 5.0,
Watch the specified job(s) and return when one (or all) finish.
- Parameters:
job_ids (str | Collection[str]) – PBS job ID(s) to monitor.
stop_on_error (bool) – If True, raise BatchJobError when a job exits with non-zero status.
wait_for_all (bool) – If True, block until every job finishes and return the last job ID. If False (default), return as soon as the first job finishes.
polling_delay (int | float) – Seconds to wait between status checks.
- Returns:
The job ID and exit status of a finished job. If wait_for_all=True, this is the last job to complete; otherwise it is the first detected finished job.
- Return type:
- Raises:
ValueError – If
job_idsis empty.BatchJobError – If a job exits with non-zero status and
stop_on_errorisTrue.
Examples
Wait for a single job to finish:
>>> job_id, status = wait_for_job("12345.pbs")
Wait for all jobs in a list, ignoring errors:
>>> wait_for_job(
...     ["12345.pbs", "12346.pbs"],
...     wait_for_all=True,
...     stop_on_error=False,
... )
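wait_for_job() pairs naturally with non-blocking submissions; a sketch that collects job IDs from submit_job() and then waits for all of them (script path and years illustrative):
>>> job_ids = [submit_job("code/myscript.py", [year]) for year in (2001, 2002)]
>>> wait_for_job(job_ids, wait_for_all=True)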