Pipeline

toksearch.Pipeline

Pipeline class for processing data

The Pipeline class is used to process data in a series of steps. It is designed to be used in a functional style, where operations are added to the pipeline one after another. The pipeline can then be applied to a set of Record objects, either serially or in parallel using Ray, Spark, multiprocessing, or a custom backend. The results of the pipeline are stored in an object derived from RecordSet, a list-like object that provides access to the results.
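
For orientation, the sketch below shows the typical flow: construct a Pipeline from shot numbers, add processing steps, and then compute. The signal object is left as a placeholder since its construction depends on the data source (see fetch below), and dict-style access to record fields is an assumption about the Record interface.

from toksearch import Pipeline

signal = ...  # placeholder: a Signal object appropriate for your data source

shots = [180000, 180001, 180002]
pipe = Pipeline(shots)

# Fetch the signal into the "ip" field of each record
pipe.fetch("ip", signal)

# Keep only records for shots above some threshold
# (assumes dict-style access to the "shot" field)
pipe.where(lambda rec: rec["shot"] > 180000)

# Run serially on the local host; returns a list-like RecordSet
records = pipe.compute_serial()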

Methods:

from_sql: Initialize a Pipeline using the results of an SQL query
__init__: Initialize a Pipeline object
fetch: Add a signal to be fetched by the pipeline
fetch_dataset: Create an xarray dataset field in the record
map: Apply a function to the records from the previous step in the pipeline, modifying each record in place
keep: Keep only the fields specified in a list
align: Align an xarray dataset with a specified set of coordinates (typically times)
where: Apply a function to the records of the previous step in the pipeline, keeping a record if the result is truthy and removing it otherwise
compute_shot: Run the pipeline for a single shot, returning a record object
compute_record: Apply the pipeline to a record object
compute: Apply the pipeline using a backend
compute_serial: Apply the pipeline serially on the local host
compute_ray: Apply the pipeline using Ray
compute_spark: Apply the pipeline using Spark
compute_multiprocessing: Apply the pipeline using multiprocessing

__init__(parent)

Instantiate a Pipeline object

Parameters:

parent (Union[Iterable[Union[int, dict, Record]], Pipeline, RecordSet, PipelineSource]), required
    The parent object. This can be an Iterable, a Pipeline, a RecordSet, or a PipelineSource.

    If parent is an Iterable, then each element must be one of three types:
    1) An integer shot number
    2) A dictionary containing at least the field "shot" (and not the fields "key" or "errors")
    3) A Record object

    If the parent is another Pipeline, then the newly constructed Pipeline will act as a continuation of the parent.

    The parent can also be a PipelineSource, although typically this is handled internally.
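
A small sketch of the Iterable forms described above; the extra dictionary field shown ("run_type") is an arbitrary illustration, not a required column:

from toksearch import Pipeline

# From a list of integer shot numbers
pipe_from_shots = Pipeline([180000, 180001])

# From dictionaries that contain at least the "shot" field
# (and neither "key" nor "errors")
rows = [
    {"shot": 180000, "run_type": "plasma"},
    {"shot": 180001, "run_type": "plasma"},
]
pipe_from_dicts = Pipeline(rows)

# As a continuation of another Pipeline
pipe_continued = Pipeline(pipe_from_shots)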

align(ds_name, align_with, dim='times', method='pad', extrapolate=True, interp_kwargs=None)

Align an xarray dataset with a specified set of coordinates

Parameters:

ds_name (str), required
    Name of the dataset in the record.

align_with (Union[str, List, ndarray, Callable, float]), required
    The coordinates to align with. This can be a string (interpreted as a field in the dataset), a list (interpreted as a list of values), a numpy array, a callable (called with the dataset and the dim as arguments), or a numeric value (interpreted as a sample period).

dim (str), default 'times'
    The dimension to align along.

method (str), default 'pad'
    The method to use for alignment. 'pad' zero-order holds the data; other options include 'linear' and 'cubic'.

extrapolate (bool), default True
    Whether to extrapolate data.

interp_kwargs (Optional[dict]), default None
    Dict of keyword arguments to pass to the interpolation function provided by xarray.
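
A sketch of aligning a fetched dataset onto a common set of coordinates. The dataset name "ds" and the signal placeholders are illustrative assumptions; passing a numeric value for align_with is interpreted as a sample period, as described above.

from toksearch import Pipeline

sig_a = ...  # placeholder Signal objects
sig_b = ...

pipe = Pipeline([180000])
pipe.fetch_dataset("ds", {"a": sig_a, "b": sig_b})

# Align the dataset onto the coordinates of the "a" field (zero-order hold)
pipe.align("ds", "a", dim="times", method="pad")

# Or pass a numeric value, interpreted as a sample period, with linear interpolation
# pipe.align("ds", 1.0, dim="times", method="linear")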

compute(recordset_cls, config=None)

Apply the pipeline using a backend defined by recordset_cls

Parameters:

recordset_cls (Type[RecordSet]), required
    The class of the RecordSet to use. This should be a subclass of RecordSet.

config (Optional[object]), default None
    Configuration object for the backend RecordSet (e.g. RayConfig if using Ray, MultiprocessingConfig if using multiprocessing, etc.)

Returns:

RecordSet: The record set
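
The compute_* convenience methods below are the usual entry points, but compute can be called directly with a RecordSet subclass and its configuration object. The class and config names in this sketch are hypothetical stand-ins; substitute the concrete backend classes provided by your toksearch installation.

from toksearch import Pipeline

pipe = Pipeline([180000, 180001])

# SomeRecordSetSubclass and some_config are hypothetical stand-ins for a
# concrete RecordSet subclass and its matching config object (e.g. a
# Ray- or multiprocessing-backed RecordSet).
# recordset = pipe.compute(SomeRecordSetSubclass, config=some_config)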

compute_multiprocessing(num_workers=None, batch_size='auto')

Apply the pipeline using multiprocessing

Parameters:

num_workers (Optional[int]), default None
    The number of workers to use for parallel processing. If set to None (the default), half the number of CPUs on the machine will be used.

batch_size (Union[str, int]), default 'auto'
    The batch size to use for parallel processing, passed to joblib.Parallel.

Returns:

MultiprocessingRecordSet: The record set
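
A minimal sketch, using a placeholder signal object:

from toksearch import Pipeline

signal = ...  # placeholder Signal object

pipe = Pipeline(range(180000, 180010))
pipe.fetch("ip", signal)

# Use 4 worker processes; the batch size is chosen automatically by default
records = pipe.compute_multiprocessing(num_workers=4)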

compute_ray(numparts=None, batch_size=None, verbose=True, placement_group_func=None, memory_per_shot=None, **ray_init_kwargs)

Apply the pipeline using Ray

Parameters:

numparts (Optional[int]), default None
    The number of partitions to use when mapping. If not provided, the number of partitions will equal the number of records in the pipeline.

batch_size (Optional[int]), default None
    The number of elements to process in each batch. Defaults to the number of records in the pipeline.

verbose (bool), default True
    Whether to print verbose output.

placement_group_func (Optional[Callable]), default None
    A function that returns a placement group. See the Ray docs for more information on placement groups.

memory_per_shot (Optional[int]), default None
    Memory to allocate to each shot in bytes. If not provided, there is no limit.

Other Parameters:

**ray_init_kwargs
    Keyword arguments to pass to ray.init

Returns:

RayRecordSet: The record set
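
A minimal sketch, using a placeholder signal object; the memory_per_shot value is purely illustrative:

from toksearch import Pipeline

signal = ...  # placeholder Signal object

pipe = Pipeline([180000, 180001, 180002])
pipe.fetch("ip", signal)

# Additional keyword arguments would be forwarded to ray.init
records = pipe.compute_ray(
    numparts=2,
    memory_per_shot=int(2e9),  # ~2 GB per shot (illustrative value)
)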

compute_record(record)

Apply the pipeline to a record object

compute_serial()

Apply the pipeline serially on the local host

Returns a SerialRecordSet object

compute_shot(shot)

Run the pipeline for a single shot, returning a record object

Note that an empty record object is first created and then acted on by the pipeline. If there are prerequisite fields in the record, then the method compute_record should be used to pass a record object directly to the pipeline.
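
A sketch contrasting the two entry points. compute_shot starts from an empty record for the given shot, while compute_record accepts a record that may already carry prerequisite fields; the Record import and constructor shown in the comments are assumptions about the surrounding package.

from toksearch import Pipeline

signal = ...  # placeholder Signal object

pipe = Pipeline([180000])
pipe.fetch("ip", signal)

# Run the pipeline for a single shot, starting from an empty record
rec = pipe.compute_shot(180000)

# Or apply the pipeline to a pre-populated record (hypothetical import)
# from toksearch import Record
# prepared = Record(180000)
# rec = pipe.compute_record(prepared)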

compute_spark(sc=None, numparts=None, cache=False)

Apply the pipeline using Spark

Parameters:

sc (Optional[SparkContext]), default None
    SparkContext to use. If not provided, a default SparkContext will be created.

numparts (Optional[int]), default None
    Number of partitions to use. If not provided, defaults to the number of records.

cache (bool), default False
    Whether to cache the RDD.

Returns:

SparkRecordSet: The record set
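
A minimal sketch, using a placeholder signal object and the default SparkContext:

from toksearch import Pipeline

signal = ...  # placeholder Signal object

pipe = Pipeline([180000, 180001])
pipe.fetch("ip", signal)

# Let toksearch create a default SparkContext; cache the underlying RDD
records = pipe.compute_spark(numparts=2, cache=True)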

fetch(name, signal)

Add a signal to be fetched by the pipeline

Appends a field (name) to the record being processed by the pipeline

Parameters:

name (str), required
    The name of the field to add to the record. The field will contain the signal data, dimensions, and units (if applicable).

signal (Signal), required
    The signal to fetch. Must be an object that implements the Signal interface.
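
As an illustration, the sketch below uses an MdsSignal; the class name, import, and constructor arguments are assumptions about the surrounding toksearch signal classes and your MDSplus tree layout, so substitute whatever Signal implementation you actually use.

from toksearch import Pipeline
from toksearch import MdsSignal  # assumed import; any Signal implementation works

# Illustrative MDSplus expression and tree name
ip_signal = MdsSignal(r"\ipmhd", "efit01")

pipe = Pipeline([180000])
pipe.fetch("ip", ip_signal)

records = pipe.compute_serial()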

fetch_dataset(name, signals, append=True)

Create an xarray dataset field called name in the record.

signals is a dict of the form {name: signal}. Each key in signals will become the name of a data variable in the resulting dataset.

If the append keyword is set to True and the field name already exists, the new data variables will be appended to it. Otherwise, a new field is created (and any existing data in that field will be lost).
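
A sketch of building a dataset from placeholder signals and then appending another data variable to the same dataset field:

from toksearch import Pipeline

sig_a = ...  # placeholder Signal objects
sig_b = ...
sig_c = ...

pipe = Pipeline([180000])

# Create an xarray dataset field "ds" with data vars "a" and "b"
pipe.fetch_dataset("ds", {"a": sig_a, "b": sig_b})

# With append=True (the default), "c" is added to the existing "ds" field
pipe.fetch_dataset("ds", {"c": sig_c}, append=True)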

from_sql(conn, query, *query_params) classmethod

Initialize a Pipeline using the results of an sql query

Parameters:

conn (Connection), required
    A Connection object, compatible with the Python DB API. This can be, for example, from pyodbc, pymssql, sqlite, etc.

query (str), required
    A query string. At a minimum, the query must produce rows with the column "shot". The query cannot have columns "key" or "errors", as those are reserved words in a Pipeline. Additionally, if the query has any unnamed column, a MissingColumnName exception will be raised.

query_params (arbitrary type), optional
    Used to pass parameters into a query. The exact query syntax is db-dependent. For SQL Server (used for the d3drdb), use either %d or %s as placeholders (it doesn't matter which). For sqlite, ? is used as the placeholder.

Examples:

from toksearch import Pipeline
from toksearch.sql.mssql import connect_d3drdb

# See documentation for connect_d3drdb for more details
conn = connect_d3drdb()

# Query without parameters
query = "select shot from shots_type where shot_type = 'plasma'"
pipe = Pipeline.from_sql(conn, query)

# Query with parameters, limiting to shot numbers greater than a
# threshold
threshold = 180000
query = """
    select shot
    from shots_type
    where shot_type = 'plasma' and shot > %d
    """
pipe = Pipeline.from_sql(conn, query, threshold)

keep(fields)

Keep only the specified fields in the record

Parameters:

fields (List[str]), required
    List of fields to keep.
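
For example, to discard everything except a couple of fields before computing (the field names are illustrative, and "shot" is listed explicitly so it is retained):

from toksearch import Pipeline

signal = ...  # placeholder Signal object

pipe = Pipeline([180000])
pipe.fetch("ip", signal)

# After this step each record retains only the listed fields
pipe.keep(["shot", "ip"])

records = pipe.compute_serial()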

map(func)

Apply func to each record produced by the previous step in the pipeline, modifying the record in place.

func is expected to be of the form func(record) -> record
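
A sketch of a mapped function; it assumes dict-style access to record fields and that the fetched "ip" field exposes a "data" array, which depends on the Signal implementation used:

import numpy as np

from toksearch import Pipeline

signal = ...  # placeholder Signal object

def add_max_ip(record):
    # Modify the record in place and return it
    record["max_ip"] = np.max(record["ip"]["data"])
    return record

pipe = Pipeline([180000])
pipe.fetch("ip", signal)
pipe.map(add_max_ip)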

where(func)

Apply func to each record produced by the previous step in the pipeline. If the result of func is truthy, the record is kept in the pipeline; otherwise, the record is purged from the pipeline.

func must be of the form func(record) -> truth-like value
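
For example, to keep only records whose shot number exceeds a threshold (dict-style access to the "shot" field is assumed):

from toksearch import Pipeline

pipe = Pipeline([170000, 180000, 190000])

# Records with shot <= 180000 are purged from the pipeline
pipe.where(lambda record: record["shot"] > 180000)

records = pipe.compute_serial()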