API Reference¶
Core API¶
Submitor¶
Submitor(
cluster_name: str,
scheduler: str = "local",
*,
defaults: SubmitorDefaults | None = None,
scheduler_options: SchedulerOptions | None = None,
store: JobStore | None = None,
jobs_dir: str | Path | None = None,
default_retry_policy: RetryPolicy | None = None,
retention_policy: RetentionPolicy | None = None,
profile_name: str | None = None,
event_bus: EventBus | None = None,
)
Primary entry point for job submission and cluster-scoped job management.
Alternative constructor:
Submitor.from_profile(profile_name, config_path=None, ...) -> Submitor
Main methods¶
- `submit(...) -> JobHandle` — submit a job using exactly one of `argv`, `command`, or `script`
- `get(job_id) -> JobRecord` — load a persisted record by id
- `list(include_terminal=False) -> list[JobRecord]` — list jobs for the current cluster
- `get_transitions(job_id) -> list[StatusTransition]` — load lifecycle transitions
- `get_retry_family(job_id) -> list[JobRecord]` — load all attempts for the same root job
- `get_dependencies(job_id) -> list[JobDependency]` — load persisted Molq dependency edges
- `watch(job_ids=None, timeout=None) -> list[JobRecord]` — block until jobs reach terminal states
- `cancel(job_id) -> None` — cancel the latest active attempt for the job family
- `refresh() -> None` — reconcile active jobs against the scheduler
- `cleanup(dry_run=False, retention_policy=None) -> dict[str, list[str]]` — prune old artifacts and records
- `daemon(once=False, interval=5.0, run_cleanup=True) -> None` — run the lightweight reconciliation loop
- `on(event, handler) -> None` — subscribe to lifecycle events
- `off(event, handler) -> None` — unsubscribe a lifecycle handler
- `close() -> None` — close the underlying store connection
submit(...) signature¶
submitor.submit(
*,
argv: list[str] | None = None,
command: str | None = None,
script: Script | None = None,
resources: JobResources | None = None,
scheduling: JobScheduling | None = None,
execution: JobExecution | None = None,
metadata: dict[str, str] | None = None,
retry: RetryPolicy | None = None,
# logical dependency shortcuts (SLURM, PBS, LSF only)
after_started: list[str] | None = None,
after: list[str] | None = None,
after_failure: list[str] | None = None,
after_success: list[str] | None = None,
) -> JobHandle
Exactly one of argv, command, or script must be provided.
Behavior:
- retries create a new attempt row with a fresh `job_id`
- `JobHandle.job_id` remains the root Molq job id for the family
- `watch()` and `JobHandle.wait()` follow the latest attempt when retries are enabled
JobHandle¶
Lightweight handle returned by Submitor.submit().
Fields:
`job_id`, `cluster_name`, `scheduler`, `scheduler_job_id`
Methods:
- `status() -> JobState`
- `refresh() -> JobHandle`
- `wait(timeout=None) -> JobRecord`
- `cancel() -> None`
JobRecord¶
Immutable snapshot of persisted job state.
Key fields:
`job_id`, `root_job_id`, `attempt`, `previous_attempt_job_id`, `retry_group_id`, `profile_name`, `cluster_name`, `scheduler`, `state`, `scheduler_job_id`, `submitted_at`, `started_at`, `finished_at`, `exit_code`, `failure_reason`, `cwd`, `command_type`, `command_display`, `metadata`, `cleaned_at`
JobDependency¶
Persisted dependency edge between Molq jobs.
Fields:
- `job_id` — the job that has the dependency
- `dependency_job_id` — the upstream job being depended on
- `dependency_type` — one of the `DependencyCondition` values
- `scheduler_dependency` — the compiled scheduler-native string (e.g. `afterok:12345`)
StatusTransition¶
Immutable persisted lifecycle transition.
Fields:
`job_id`, `old_state`, `new_state`, `timestamp`, `reason`
SubmitorDefaults¶
SubmitorDefaults(
resources: JobResources | None = None,
scheduling: JobScheduling | None = None,
execution: JobExecution | None = None,
)
Reusable defaults merged into every submission for a given Submitor.
Retry, Retention, and Events¶
RetryBackoff¶
RetryBackoff(
mode: Literal["fixed", "exponential"] = "exponential",
initial_seconds: float = 5.0,
maximum_seconds: float = 300.0,
factor: float = 2.0,
)
RetryPolicy¶
RetryPolicy(
max_attempts: int = 1,
retry_on_states: tuple[JobState, ...] = (JobState.FAILED, JobState.TIMED_OUT),
retry_on_exit_codes: tuple[int, ...] | None = None,
backoff: RetryBackoff = RetryBackoff(),
)
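The delay computation these fields imply can be sketched as follows. `backoff_delay` is a hypothetical helper written only to illustrate the `RetryBackoff` semantics; it is not part of the molq API:

```python
def backoff_delay(attempt: int, *, mode: str = "exponential",
                  initial_seconds: float = 5.0,
                  maximum_seconds: float = 300.0,
                  factor: float = 2.0) -> float:
    """Delay in seconds before retry number `attempt` (1-based)."""
    if mode == "fixed":
        # fixed mode always waits initial_seconds (bounded by the cap)
        return min(initial_seconds, maximum_seconds)
    # exponential mode: initial * factor**(attempt - 1), capped at maximum
    return min(initial_seconds * factor ** (attempt - 1), maximum_seconds)
```

With the defaults above, attempts wait 5 s, 10 s, 20 s, … up to the 300 s cap.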
RetentionPolicy¶
RetentionPolicy(
keep_job_dirs_for_days: int = 30,
keep_terminal_records_for_days: int = 90,
keep_failed_job_dirs: bool = True,
)
EventType¶
Lifecycle events emitted through EventBus:
`STATUS_CHANGE`, `JOB_STARTED`, `JOB_COMPLETED`, `JOB_FAILED`, `JOB_CANCELLED`, `JOB_TIMEOUT`, `JOB_TIMED_OUT`, `JOB_LOST`, `ALL_COMPLETED`
EventPayload¶
Fields:
`event`, `job_id`, `transition`, `record`, `data`
EventBus¶
Methods:
- `on(event, handler) -> None`
- `off(event, handler) -> None`
- `emit(event, data=None) -> None`
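The subscribe/unsubscribe/emit contract can be sketched with a minimal stand-in class. `MiniEventBus` is illustrative only, not molq's `EventBus`:

```python
from collections import defaultdict

class MiniEventBus:
    """Minimal sketch of on/off/emit semantics."""
    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, handler):
        self._handlers[event].append(handler)

    def off(self, event, handler):
        if handler in self._handlers[event]:
            self._handlers[event].remove(handler)

    def emit(self, event, data=None):
        # iterate over a copy so handlers may unsubscribe during emit
        for handler in list(self._handlers[event]):
            handler(data)
```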
Config and Profiles¶
MolqProfile¶
Named profile loaded from ~/.molq/config.toml.
Key fields:
`name`, `scheduler`, `cluster_name`, `defaults`, `scheduler_options`, `retry`, `retention`, `jobs_dir`
MolqConfig¶
Holds profiles: dict[str, MolqProfile].
Config helpers¶
- `load_config(path=None) -> MolqConfig`
- `load_profile(name, path=None) -> MolqProfile`
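A profile file might look like the following. The field names mirror `MolqProfile` above, but this layout is a hypothetical example; check the exact schema against your molq version:

```toml
# ~/.molq/config.toml — hypothetical example profile
[profiles.hpc]
scheduler = "slurm"
cluster_name = "hpc"
jobs_dir = "/scratch/molq-jobs"

[profiles.hpc.retry]
max_attempts = 3
```

Such a profile would then be loadable via `Submitor.from_profile("hpc")` or `load_profile("hpc")`.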
Submission Types¶
Memory¶
Immutable memory quantity stored as bytes.
Constructors: `Memory.kb(n)`, `Memory.mb(n)`, `Memory.gb(n)`, `Memory.tb(n)`, `Memory.parse("32G")`.
Conversions: `to_slurm()`, `to_pbs()`, `to_lsf_kb()`.
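The `Memory.parse("32G")` form can be sketched as a unit-suffix parser into bytes. `parse_memory` is a hypothetical re-implementation for illustration; molq's own parser may accept more forms:

```python
import re

def parse_memory(text: str) -> int:
    """Parse strings like "32G" or "512MB" into a byte count."""
    units = {"K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}
    m = re.fullmatch(r"(\d+)\s*([KMGT])B?", text.strip(), re.IGNORECASE)
    if not m:
        raise ValueError(f"unrecognised memory string: {text!r}")
    return int(m.group(1)) * units[m.group(2).upper()]
```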
Duration¶
Immutable time quantity stored as seconds.
Constructors: `Duration.minutes(n)`, `Duration.hours(n)`, `Duration.parse("2h30m")`, `Duration.parse("04:00:00")`.
Conversions: `to_slurm()`, `to_pbs()`, `to_lsf_minutes()`.
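The two documented `Duration.parse` forms ("2h30m" and "HH:MM:SS") can be sketched as follows. `parse_duration` is a hypothetical re-implementation for illustration, not molq's parser:

```python
import re

def parse_duration(text: str) -> int:
    """Parse "2h30m"-style or "HH:MM:SS"-style strings into seconds."""
    text = text.strip()
    if ":" in text:
        h, m, s = (int(part) for part in text.split(":"))
        return h * 3600 + m * 60 + s
    seconds_per = {"d": 86400, "h": 3600, "m": 60, "s": 1}
    total = 0
    for value, unit in re.findall(r"(\d+)([dhms])", text):
        total += int(value) * seconds_per[unit]
    return total
```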
Script¶
Immutable script reference.
Constructors: `Script.inline(text)`, `Script.path(path)`.
Fields: `variant`, `text`, `file_path`.
JobResources¶
JobResources(
cpu_count: int | None = None,
memory: Memory | None = None,
gpu_count: int | None = None,
gpu_type: str | None = None,
time_limit: Duration | None = None,
)
JobScheduling¶
JobScheduling(
queue: str | None = None,
account: str | None = None,
priority: str | None = None,
dependency: str | None = None, # raw scheduler string — mutually exclusive with dependencies
dependencies: tuple[DependencyRef, ...] = (), # logical refs — mutually exclusive with dependency
node_count: int | None = None,
exclusive_node: bool = False,
array_spec: str | None = None,
email: str | None = None,
qos: str | None = None,
reservation: str | None = None,
)
`dependency` and `dependencies` are mutually exclusive. Constructing a `JobScheduling` with both set raises `ValueError` immediately.
JobExecution¶
JobExecution(
env: dict[str, str] | None = None,
cwd: str | Path | None = None,
job_name: str | None = None,
output_file: str | None = None,
error_file: str | None = None,
)
Job Dependencies¶
Molq supports logical job dependencies for SLURM, PBS, and LSF schedulers.
Dependencies are not supported on the local scheduler.
Conditions¶
DependencyCondition is a type alias for the four valid condition strings:
| Condition | Meaning | SLURM | PBS | LSF |
|---|---|---|---|---|
| `"after_success"` | Run after upstream succeeded | `afterok` | `afterok` | `done()` |
| `"after_failure"` | Run after upstream failed / cancelled / timed out / lost | `afternotok` | `afternotok` | `exit()` |
| `"after_started"` | Run after upstream began executing | `after` | `after` | `started()` |
| `"after"` | Run after upstream finished (any result) | `afterany` | `afterany` | `ended()` |
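The SLURM column of the table maps onto scheduler-native dependency tokens like `afterok:12345`. A hypothetical helper, written only to illustrate the mapping (not part of the molq API):

```python
# Logical condition -> SLURM dependency keyword, per the table above.
SLURM_MAP = {
    "after_success": "afterok",
    "after_failure": "afternotok",
    "after_started": "after",
    "after": "afterany",
}

def compile_slurm_dependency(condition: str, scheduler_job_id: str) -> str:
    """Compile one logical edge into a SLURM --dependency token."""
    return f"{SLURM_MAP[condition]}:{scheduler_job_id}"
```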
DependencyRef¶
Describes one upstream dependency using a Molq job ID and a condition.
Defining dependencies — two equivalent approaches¶
Approach A: submit() keyword arguments (recommended for simple cases)
parent = s.submit(argv=["python", "preprocess.py"])
child = s.submit(
argv=["python", "train.py"],
after_success=[parent.job_id],
)
# Multiple upstreams and mixed conditions in one call:
s.submit(
argv=["python", "cleanup.py"],
after_success=[job_a.job_id, job_b.job_id],
after_failure=[job_c.job_id],
)
Approach B: DependencyRef inside JobScheduling (useful when building scheduling objects separately)
from molq import DependencyRef, JobScheduling
s.submit(
argv=["python", "eval.py"],
scheduling=JobScheduling(
queue="gpu",
dependencies=(
DependencyRef(parent.job_id, "after_success"),
DependencyRef(monitor.job_id, "after_started"),
),
),
)
The two approaches are merged before submission. You cannot mix `dependency` (raw string) with `dependencies` (logical refs) in a single `JobScheduling`.
Querying dependencies¶
# Edges where job is the dependent (upstream jobs this job waits on)
deps = s.get_dependencies(job_id) # list[JobDependency]
# Edges where job is the upstream (downstream jobs waiting on this job)
dependents = s.get_dependents(job_id) # list[JobDependency]
# Depth-1 preview with satisfaction state
preview = s.get_dependency_preview(job_id) # DependencyPreview
DependencyPreview¶
DependencyPreview(
job_id: str,
upstream_total: int,
upstream_satisfied: int,
upstream: tuple[DependencyPreviewItem, ...],
downstream_total: int,
downstream: tuple[DependencyPreviewItem, ...],
)
DependencyPreviewItem¶
DependencyPreviewItem(
job_id: str,
dependency_type: DependencyCondition,
relation_state: str, # "satisfied" | "pending" | "impossible"
job_state: JobState,
command_display: str,
scheduler_dependency: str | None,
)
dependency_relation_state()¶
from molq import dependency_relation_state
relation = dependency_relation_state(
dependency_type="after_success",
related_state=JobState.SUCCEEDED,
related_started_at=1234567890.0,
)
# → "satisfied"
Evaluates a single dependency edge to "satisfied", "pending", or "impossible".
Raises ValueError for unrecognised condition names.
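The evaluation rules can be sketched as a plain function over state strings. `relation_state` is a hypothetical re-implementation for illustration; use the real `dependency_relation_state` in practice:

```python
# Terminal states that count as "failure" for after_failure edges.
TERMINAL_FAILURES = {"FAILED", "CANCELLED", "TIMED_OUT", "LOST"}

def relation_state(dependency_type, related_state, related_started_at=None):
    """Evaluate one edge to "satisfied", "pending", or "impossible"."""
    if dependency_type == "after_success":
        if related_state == "SUCCEEDED":
            return "satisfied"
        return "impossible" if related_state in TERMINAL_FAILURES else "pending"
    if dependency_type == "after_failure":
        if related_state in TERMINAL_FAILURES:
            return "satisfied"
        return "impossible" if related_state == "SUCCEEDED" else "pending"
    if dependency_type == "after_started":
        return "satisfied" if related_started_at is not None else "pending"
    if dependency_type == "after":
        return ("satisfied"
                if related_state in TERMINAL_FAILURES | {"SUCCEEDED"}
                else "pending")
    raise ValueError(f"unrecognised condition: {dependency_type}")
```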
Scheduler Options¶
molq exports one scheduler options dataclass per backend:
`LocalSchedulerOptions`, `SlurmSchedulerOptions`, `PBSSchedulerOptions`, `LSFSchedulerOptions`
These objects are validated against the selected scheduler= value in Submitor.
Monitoring and Dashboard¶
Public monitoring helpers exported at package level:
`MolqMonitor`, `RunDashboard`, `DashboardState`, `JobRow`
Job State¶
JobState is a string enum with terminal awareness:
`CREATED`, `SUBMITTED`, `QUEUED`, `RUNNING`, `SUCCEEDED`, `FAILED`, `CANCELLED`, `TIMED_OUT`, `LOST`
Use state.is_terminal to distinguish active from terminal states.
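A string enum with terminal awareness can be sketched as below. `StateSketch` mirrors the documented behavior but is illustrative, not molq's `JobState` class:

```python
from enum import Enum

class StateSketch(str, Enum):
    CREATED = "CREATED"
    SUBMITTED = "SUBMITTED"
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    TIMED_OUT = "TIMED_OUT"
    LOST = "LOST"

    @property
    def is_terminal(self) -> bool:
        # terminal states never transition again
        return self in {StateSketch.SUCCEEDED, StateSketch.FAILED,
                        StateSketch.CANCELLED, StateSketch.TIMED_OUT,
                        StateSketch.LOST}
```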
Errors¶
All public errors inherit from MolqError.
| Exception | Raised when |
|---|---|
| `ConfigError` | Invalid Submitor configuration |
| `SubmitError` | Submission fails |
| `CommandError` | Command specification is invalid |
| `ScriptError` | Script path is invalid or cannot be prepared |
| `SchedulerError` | Scheduler interaction fails |
| `JobNotFoundError` | The requested job id does not exist |
| `MolqTimeoutError` | A wait operation exceeds the timeout |
| `StoreError` | Persistence or schema operations fail |