Distributed computing with TokSearch
TokSearch computations are distributed by invoking a TokSearch script with the toksearch_submit utility. Behind the scenes, toksearch_submit uses SLURM and can be run in both interactive and batch modes, using salloc and sbatch respectively. The default mode is interactive, using salloc.
When using the Pipeline.compute_ray() or Pipeline.compute_spark() methods, the computation will be automatically distributed across the nodes allocated by toksearch_submit.
How TokSearch uses Ray and Spark under SLURM
When a TokSearch script is run with toksearch_submit on N nodes, the script itself is executed on one of the allocated nodes. The Ray or Spark cluster is then started across the allocation: the master process runs on the same node as the script, and worker processes run on the other N-1 nodes.
The base SLURM commands that toksearch_submit runs are of the following form:
For interactive jobs:
salloc -N NUM_NODES --exclusive ${interactive.salloc} \
srun -u --pty -N 1 --exclusive --propagate ${interactive.srun} \
COMMAND_TO_RUN ARGS
For batch jobs:
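The batch template is not reproduced in this excerpt. By analogy with the interactive form above, and using the ${batch.sbatch} options from the configuration file, it presumably looks roughly like the following sketch (the real template may instead submit a generated batch script rather than using --wrap):

# Sketch only, by analogy with the interactive form shown above
sbatch -N NUM_NODES --exclusive ${batch.sbatch} \
--wrap "COMMAND_TO_RUN ARGS"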
Options and arguments in CAPS are provided by the user when running toksearch_submit. The options and arguments in braces (e.g. ${batch.sbatch}) are injected from the configuration file (see below).
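As an illustration, with the default interactive options from the configuration file below (--time=4:00:00 and --x11=all for salloc, --overlap for srun), submitting a hypothetical script my_script.py interactively on two nodes would expand to something like the following. (The example output later in this page also includes additional site-specific options, such as --gres, that come from the site's configuration file.)

# Illustrative expansion only; my_script.py and the node count are hypothetical
salloc -N 2 --exclusive --time=4:00:00 --x11=all \
srun -u --pty -N 1 --exclusive --propagate --overlap \
python my_script.py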
Configuration file
toksearch_submit uses a YAML configuration file to specify memory and CPU requirements for the Ray and Spark backends, and to adapt to site-specific SLURM configuration.
Below, we walk through the different sections of the default config file. The actual file is commented with more detailed information, mirroring the documentation here.
# Job options passed through to salloc/sbatch

# Options for batch jobs (ie those using sbatch)
batch:
  sbatch:
    - --time=4:00:00

# Options for interactive jobs (ie those using salloc)
interactive:
  salloc:
    - --time=4:00:00
    - --x11=all
  srun:
    - --overlap
Spark processes are started using the srun command (when compute_spark is invoked on a Pipeline object inside a SLURM job). The master process is started with the following command, with sections in braces replaced by the configuration options, and options with underscore + CAPS (e.g. _PORT) filled in automatically by toksearch_submit:
srun --nodes=1 --ntasks=1 -w _HOST ${spark.master.srun} \
spark-class org.apache.spark.deploy.master.Master \
--host _HOST \
--port _PORT \
${spark.master.start}
Similarly, the worker process is started with the following command:
srun --nodes=1 --ntasks=1 -w _HOST ${spark.worker.srun} \
spark-class org.apache.spark.deploy.worker.Worker \
-i _HOST \
${spark.worker.start} \
_MASTER_URL
spark:
  # Options for the Spark master
  master:
    srun:
      - --overlap
  worker:
    srun:
      - --overlap
    start:
      - -c
      - 10
      - -m
      - 24G
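For example, with the worker options above (--overlap for srun, and -c 10 -m 24G for the start arguments), the expanded worker command on a hypothetical host node01, connecting to a master on node00, would look roughly like the following. The hostnames and master URL are placeholders filled in automatically by toksearch_submit; 7077 is the standard Spark master port and matches the example output later on this page.

# Illustrative expansion only; hostnames are hypothetical
srun --nodes=1 --ntasks=1 -w node01 --overlap \
spark-class org.apache.spark.deploy.worker.Worker \
-i node01 \
-c 10 \
-m 24G \
spark://node00:7077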
Ray processes are started using the srun command (when compute_ray is invoked on a Pipeline object inside a SLURM job).
The master process is started with the following command, with sections in braces replaced by the configuration options, and options with underscore + CAPS (e.g. _PORT) filled automatically by toksearch_submit:
srun --nodes=1 --ntasks=1 -w _HOST ${ray.master.srun} \
ray start \
--head \
--node-ip-address _IP \
--port _PORT \
--temp-dir _TEMP_DIR \
${ray.master.start}
Similarly, the worker process is started with the following command:
srun --nodes=1 --ntasks=1 -w _HOST ${ray.worker.srun} \
ray start \
--node-ip-address _IP \
--address _MASTER_ADDRESS:_MASTER_PORT \
--temp-dir _TEMP_DIR \
${ray.worker.start}
ray:
  master:
    srun:
      - --overlap
      - --propagate
    # Ray memory options can be adjusted here
    #start:
    #  - --object-store-memory=95000000000
    #  - --memory=80000000000
  worker:
    srun:
      - --overlap
      - --propagate
    # Ray memory options can be adjusted here
    #start:
    #  - --object-store-memory=95000000000
    #  - --memory=80000000000
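For example, with the worker srun options above and the commented-out memory options enabled, the expanded worker command on a hypothetical host would look roughly like the following. The host, IP addresses, port, and temp directory shown here are placeholders; toksearch_submit fills them in automatically (the port 6543 matches the example output later on this page).

# Illustrative expansion only; host, addresses, and temp dir are hypothetical
srun --nodes=1 --ntasks=1 -w node01 --overlap --propagate \
ray start \
--node-ip-address 10.0.0.2 \
--address 10.0.0.1:6543 \
--temp-dir /tmp/raytmp \
--object-store-memory=95000000000 \
--memory=80000000000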
Example script
The workflow is demonstrated with the following example script:
# toksearch_example.py
import argparse

import numpy as np

from toksearch import Pipeline
from toksearch import MdsSignal


def create_pipeline(shots):
    ipmhd_signal = MdsSignal(r"\ipmhd", "efit01")

    pipeline = Pipeline(shots)
    pipeline.fetch("ipmhd", ipmhd_signal)

    @pipeline.map
    def calc_max_ipmhd(rec):
        rec["max_ipmhd"] = np.max(np.abs(rec["ipmhd"]["data"]))

    pipeline.keep(["max_ipmhd"])
    return pipeline


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("backend", choices=["spark", "ray"])
    args = parser.parse_args()
    backend = args.backend

    num_shots = 10000
    shots = list(range(165920, 165920 + num_shots))

    pipeline = create_pipeline(shots)

    if backend == "ray":
        results = pipeline.compute_ray()
    else:  # spark
        results = pipeline.compute_spark()

    print(f"Got {len(results)} results using {backend}")
This script fetches the ipmhd signal from the MDSplus efit01 tree for a range of shots and calculates the maximum absolute value of the signal for each shot. The script then prints the number of results and the backend used.
Running the script with toksearch_submit
First, we'll run with the Ray backend:
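The toksearch_submit command line itself is not reproduced in this excerpt. As an illustration only, assuming the node count is passed with a hypothetical -N option followed by the command to run, the invocation behind the output below would look something like:

# Hypothetical invocation; the exact toksearch_submit options may differ.
# The salloc line in the output shows that three nodes were requested.
toksearch_submit -N 3 python toksearch_example.py ray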
Running the job produces the following output:
salloc -N 3 --exclusive --gres=gpu:volta:1 --x11=all srun -u --pty -N 1 --exclusive --propagate --gres=gpu:volta:0 --overlap python toksearch_example.py ray
salloc: Granted job allocation 16902
salloc: Waiting for resource configuration
salloc: Nodes saga[03-05] are ready for job
Initializing ray with _temp_dir = /tmp/tmp1ei083q3
['10.0.0.43', '10.0.0.44', '10.0.0.45']
STARTING CLUSTER
--nodes=1 --ntasks=1 -w saga03 --gres=gpu:volta:1 --overlap --propagate ray start --block --node-ip-address 10.0.0.43 --temp-dir /tmp/tmp1ei083q3 --port 6543 --head --object-store-memory=95000000000 --memory=80000000000
Ok, started head node
dict_keys(['saga04', 'saga05'])
Starting saga04...
--nodes=1 --ntasks=1 -w saga04 --gres=gpu:volta:1 --overlap --propagate ray start --block --node-ip-address 10.0.0.44 --temp-dir /tmp/tmp1ei083q3 --address=10.0.0.43:6543 --object-store-memory=95000000000 --memory=80000000000
Starting saga05...
--nodes=1 --ntasks=1 -w saga05 --gres=gpu:volta:1 --overlap --propagate ray start --block --node-ip-address 10.0.0.45 --temp-dir /tmp/tmp1ei083q3 --address=10.0.0.43:6543 --object-store-memory=95000000000 --memory=80000000000
********************************************************************************
BATCH 1/1
NUM CPUS: 144
NUM PARTITIONS: 10000
MEDIAN PARTITION SIZE: 1
Got 10000 results using ray
salloc: Relinquishing job allocation 16902
Next, we'll run with the Spark backend:
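Assuming the same hypothetical toksearch_submit options as above:

# Hypothetical invocation, as above
toksearch_submit -N 3 python toksearch_example.py spark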
Again, the run produces the following output:
salloc -N 3 --exclusive --gres=gpu:volta:1 --x11=all srun -u --pty -N 1 --exclusive --propagate --gres=gpu:volta:0 --overlap python toksearch_example.py spark
salloc: Granted job allocation 16908
salloc: Waiting for resource configuration
salloc: Nodes saga[04-06] are ready for job
['10.0.0.44', '10.0.0.45', '10.0.0.46']
STARTING CLUSTER
MASTER IP 10.0.0.44
['--nodes=1', '--ntasks=1', '-w', 'saga04', '--gres=gpu:volta:0', '--overlap', 'spark-class', 'org.apache.spark.deploy.master.Master', '--host', '10.0.0.44', '--port', 7077]
Ok, started head node
dict_keys(['saga04', 'saga05', 'saga06'])
Starting saga04...
['--nodes=1', '--ntasks=1', '-w', 'saga04', '--gres=gpu:volta:1', '--overlap', 'spark-class', 'org.apache.spark.deploy.worker.Worker', '-i', '10.0.0.44', '-c', 48, '-m', '149G', 'spark://10.0.0.44:7077']
Starting saga05...
['--nodes=1', '--ntasks=1', '-w', 'saga05', '--gres=gpu:volta:1', '--overlap', 'spark-class', 'org.apache.spark.deploy.worker.Worker', '-i', '10.0.0.45', '-c', 48, '-m', '149G', 'spark://10.0.0.44:7077']
Starting saga06...
['--nodes=1', '--ntasks=1', '-w', 'saga06', '--gres=gpu:volta:1', '--overlap', 'spark-class', 'org.apache.spark.deploy.worker.Worker', '-i', '10.0.0.46', '-c', 48, '-m', '149G', 'spark://10.0.0.44:7077']
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/15 09:51:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Got 10000 results using spark
salloc: Relinquishing job allocation 16908