Parallelization¶
TokSearch supports multiple methods of processing data after a `Pipeline` has been defined. `compute_serial` processes each shot in the `Pipeline` sequentially, which is fine for small problems. If either the number of shots or the amount of data per shot grows, however, TokSearch supports multiple methods of parallelizing the computation. At present, three methods support parallel execution of `Pipeline` objects:
- `compute_multiprocessing`: Uses JobLib to execute the `Pipeline` using node-local multiprocessing.
- `compute_ray`: Uses the Ray framework to execute the `Pipeline` either with node-local parallelization or fully distributed across multiple worker nodes.
- `compute_spark`: Like `compute_ray`, uses Apache Spark to allow both node-local and fully distributed computation.
Spark and Ray offer similar capabilities and performance when used with TokSearch. In particular, both can run in fully distributed fashion, parallelizing the `Pipeline` processing across arbitrarily many compute nodes as long as a cluster is set up properly. Each can also parallelize within a single node, taking advantage of multi-core systems. For small jobs, though, the `compute_multiprocessing` method is often the fastest and lightest weight.
Partitioning¶
`Pipeline` parallelization, regardless of backend, works by partitioning the list of shots into roughly equal-length chunks and then assigning each chunk to a worker process. Once all chunks have been processed, the resulting records are returned to the calling program and reassembled into a flattened list of `Record` objects.
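This chunking behavior can be sketched with numpy. Note that `array_split` here is purely illustrative; TokSearch's internal partitioning code may differ.

```python
import numpy as np

shots = list(range(165920, 165930))
num_parts = 3

# Split the shot list into roughly equal-length chunks, one per worker
chunks = [list(map(int, chunk)) for chunk in np.array_split(shots, num_parts)]
# -> chunk sizes of 4, 3, and 3

# After the workers finish, the per-chunk results are reassembled
# into a single flattened list, preserving the original order
flattened = [shot for chunk in chunks for shot in chunk]
```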
Examples¶
The following examples show usage of the `compute_*` methods.

A few preliminaries first:
We'll import some things that we'll need. Note that we're specifying that the environment variables `MKL_NUM_THREADS`, `NUMEXPR_NUM_THREADS`, and `OMP_NUM_THREADS` should all equal "1". This prevents numpy from using multithreading, which tends to have a negative effect on execution time when running under Spark and Ray. Normally we'd want to set these using a modulefile or in a default environment, but we'll specify them here just to be sure. Note that these variables must be set before numpy is imported.
We'll also create a utility class for timing execution later on.
import time
import os

os.environ["MKL_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"

import numpy as np

# Context manager that we can use to time execution
class Timer(object):
    def __init__(self):
        self.start = None

    def __enter__(self):
        self.start = time.time()

    def __exit__(self, *args):
        elapsed = time.time() - self.start
        print('Ran in {0:.2f} s'.format(elapsed))
Now we can build our pipeline. The particulars of the pipeline aren't critical. It just retrieves a few signals and performs a simple calculation, then returns just the result of the calculation.
from toksearch import Pipeline, MdsSignal

def create_pipeline(shots):
    ipmhd_signal = MdsSignal(r'\ipmhd', 'efit01')
    ip_signal = MdsSignal(r'\ipmeas', 'efit01')

    pipeline = Pipeline(shots)
    pipeline.fetch('ipmhd', ipmhd_signal)
    pipeline.fetch('ip', ip_signal)

    @pipeline.map
    def calc_max_ipmhd(rec):
        rec['max_ipmhd'] = np.max(np.abs(rec['ipmhd']['data']))

    pipeline.keep(['max_ipmhd'])
    return pipeline
Now we specify a non-trivial number of shots and create a list of shot numbers.
num_shots = 2000
shots = list(range(165920, 165920+num_shots))
Finally, we run the different methods.
Serial¶
print('*'*80)
print('RUNNING WITH compute_serial')
pipeline = create_pipeline(shots)
with Timer():
    serial_result = pipeline.compute_serial()
******************************************************************************** RUNNING WITH compute_serial Ran in 38.58 s
Multiprocessing¶
print('*'*80)
print('RUNNING WITH compute_multiprocessing')
pipeline = create_pipeline(shots)
with Timer():
    multiproc_result = pipeline.compute_multiprocessing()
******************************************************************************** RUNNING WITH compute_multiprocessing Ran in 5.80 s
Ray¶
import ray
# Create a dummy pipeline to initialize ray so we can benchmark without the startup overhead
dummy_res = Pipeline([1,2]).compute_ray()
list(dummy_res)
print('*'*80)
print('RUNNING WITH compute_ray')
pipeline = create_pipeline(shots)
with Timer():
    ray_result = pipeline.compute_ray(numparts=8)
    list(ray_result)

ray_result.cleanup() # This shuts down the ray cluster
Initializing ray with _temp_dir = /mnt/beegfs/users/sammuli/tmp/tmpj4075hyz ******************************************************************************** RUNNING WITH compute_ray Ran in 9.89 s
Spark¶
print('*'*80)
print('RUNNING WITH compute_spark')
pipeline = create_pipeline(shots)
# Spark results are generated lazily, so calling compute_spark
# just initializes spark. We won't count that in our timing.
spark_result = pipeline.compute_spark(numparts=8)
with Timer():
    list(spark_result) # Converting to a list materializes the result
spark_result.cleanup()
******************************************************************************** RUNNING WITH compute_spark MASTER: None [('spark.master', 'None'), ('spark.app.submitTime', '1712249973365'), ('spark.submit.pyFiles', ''), ('spark.submit.deployMode', 'client'), ('spark.ui.showConsoleProgress', 'true'), ('spark.app.name', 'pyspark-shell')]
Ran in 9.04 s
Summary¶
Summarizing the results:
| Backend | Execution Time (s) | Speedup |
|---|---|---|
| Serial | 38.58 | - |
| Multiprocessing | 5.80 | 6.65x |
| Ray | 9.89 | 3.90x |
| Spark | 9.04 | 4.27x |
These results were obtained using 8 cores and 8 partitions (specified via the `numparts` keyword argument to the `compute_spark` and `compute_ray` methods) on a saga compute node.
This demonstrates that the multiprocessing backend will often beat Ray or Spark on smaller jobs because of their parallelization overhead. For much bigger jobs, though, both Spark and Ray are capable of fully distributed computing that utilizes multiple nodes in a cluster.
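As a rough rule of thumb, the backend choice can be expressed as a simple helper. This function is hypothetical, and the shot-count threshold below is illustrative only, not a tuned recommendation:

```python
def choose_backend(num_shots, distributed=False):
    """Illustrative heuristic for picking a compute_* method.

    The shot-count threshold is a made-up example, not a benchmark result.
    """
    if distributed:
        # Only Ray and Spark can span multiple worker nodes
        return "compute_ray"  # or "compute_spark"
    if num_shots < 100:
        # Tiny jobs: skip parallelization overhead entirely
        return "compute_serial"
    # Larger node-local jobs: multiprocessing is often lightest weight
    return "compute_multiprocessing"
```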
Running a TokSearch script that utilizes these capabilities is covered here.