Pipeline
toksearch.Pipeline
Pipeline class for processing data
The Pipeline class is used to process data in a series of steps. It is designed to be used in a functional style: operations are added to the pipeline in a linear fashion, and the pipeline is then applied to a set of Record objects. The pipeline can be run serially, with Ray, with Spark, with multiprocessing, or with a custom backend. The results of the pipeline are stored in an object derived from RecordSet, a list-like object that provides access to the results.
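As a minimal sketch of typical usage (shot numbers and field names are arbitrary, and dict-style access to record fields is assumed; see the method descriptions below for details):

```python
from toksearch import Pipeline

# Build a pipeline from an iterable of shot numbers (values are arbitrary)
pipe = Pipeline([165920, 165921, 165922])

def label_shot(record):
    # Records are dict-like; "shot" is assumed to be available as a field
    record["label"] = "shot_{}".format(record["shot"])
    return record

pipe.map(label_shot)

# Keep only the derived field
pipe.keep(["label"])

# Run serially on the local host; the result is a list-like RecordSet
records = pipe.compute_serial()
for record in records:
    print(record["label"])
```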
Methods:
Name | Description |
---|---|
from_sql | Initialize a Pipeline using the results of an SQL query |
__init__ | Initialize a Pipeline object |
fetch | Add a signal to be fetched by the pipeline |
fetch_dataset | Create an xarray dataset field in the record |
map | Apply a function to the records from the previous step in the pipeline, modifying each record in place |
keep | Keep only the fields specified in the list |
align | Align an xarray dataset with a specified set of coordinates (typically times) |
where | Apply a function to the records of the previous step in the pipeline, keeping each record if the result is truthy and removing it otherwise |
compute_shot | Run the pipeline for a single shot, returning a record object |
compute_record | Apply the pipeline to a record object |
compute | Apply the pipeline using a backend |
compute_serial | Apply the pipeline serially on the local host |
compute_ray | Apply the pipeline using Ray |
compute_spark | Apply the pipeline using Spark |
compute_multiprocessing | Apply the pipeline using multiprocessing |
__init__(parent)
Instantiate a Pipeline object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parent | Union[Iterable[Union[int, dict, Record]], Pipeline, RecordSet, PipelineSource] | The parent object. This can be an Iterable, a Pipeline, a RecordSet, or a PipelineSource. If parent is an Iterable, its elements must be one of three types: 1) an integer shot number, 2) a dictionary containing at least the field "shot" (and not the fields "key" or "errors"), or 3) a Record object. If the parent is another Pipeline, the newly constructed Pipeline acts as a continuation of the parent. The parent can also be a PipelineSource, although typically this is handled internally. | required |
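For illustration, a parent can be a list of shot numbers, a list of dicts that each contain a "shot" field, or an existing Pipeline to continue (the values below are arbitrary):

```python
from toksearch import Pipeline

# From a list of integer shot numbers
pipe_from_shots = Pipeline([165920, 165921])

# From a list of dicts; each must contain at least the "shot" field and
# must not use the reserved fields "key" or "errors"
pipe_from_dicts = Pipeline([
    {"shot": 165920, "t_start": 0.0},
    {"shot": 165921, "t_start": 0.5},
])

# A new Pipeline can also act as a continuation of an existing one
continuation = Pipeline(pipe_from_shots)
```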
align(ds_name, align_with, dim='times', method='pad', extrapolate=True, interp_kwargs=None)
Align an xarray dataset with a specified set of coordinates
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ds_name | str | Name of the dataset in the record | required |
align_with | Union[str, List, ndarray, Callable, float] | The coordinates to align with. This can be a string (interpreted as a field in the dataset), a list of values, a numpy array, a callable (called with the dataset and the dim as arguments), or a numeric value (interpreted as a sample period). | required |
dim | str | The dimension to align along. | 'times' |
method | str | The method to use for alignment. The default, 'pad', zero-order holds the data; other options include 'linear' and 'cubic'. | 'pad' |
extrapolate | bool | Whether to extrapolate data. | True |
interp_kwargs | Optional[dict] | Dict of keyword arguments to pass to the interpolation function provided by xarray. | None |
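A hedged sketch of a few align_with forms, assuming a dataset field named "ds" was created earlier (for example with fetch_dataset); the names and values are illustrative, and any one of the forms can be used:

```python
import numpy as np
from toksearch import Pipeline

pipe = Pipeline([165920])
# ... assume a dataset field "ds" has been created, e.g. via fetch_dataset ...

# Align onto an explicit set of times (numpy array or list)
pipe.align("ds", np.linspace(0.0, 5000.0, 501), dim="times", method="linear")

# Align onto the coordinates of another data variable in the dataset (string)
pipe.align("ds", "ip")

# A numeric align_with is interpreted as a sample period
pipe.align("ds", 0.1, method="pad", extrapolate=False)
```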
compute(recordset_cls, config=None)
Apply the pipeline using a backend defined by recordset_cls
Parameters:
Name | Type | Description | Default |
---|---|---|---|
recordset_cls | Type[RecordSet] | The class of the RecordSet to use. This should be a subclass of RecordSet. | required |
config | Optional[object] | Configuration object for the backend RecordSet (e.g. RayConfig if using Ray, MultiprocessingConfig if using multiprocessing, etc.) | None |
Returns:
Name | Type | Description |
---|---|---|
RecordSet | RecordSet | The record set |
compute_multiprocessing(num_workers=None, batch_size='auto')
Apply the pipeline using multiprocessing
Parameters:
Name | Type | Description | Default |
---|---|---|---|
num_workers | Optional[int] | The number of workers to use for parallel processing. If set to None (the default), half the number of CPUs on the machine will be used. | None |
batch_size | Union[str, int] | The batch size to use for parallel processing, passed to joblib.Parallel. | 'auto' |
Returns:
Name | Type | Description |
---|---|---|
MultiprocessingRecordSet | MultiprocessingRecordSet | The record set |
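A short sketch, continuing the pipe object from the earlier examples (the worker count is arbitrary):

```python
# Use 8 worker processes and let joblib choose the batch size
results = pipe.compute_multiprocessing(num_workers=8, batch_size="auto")
```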
compute_ray(numparts=None, batch_size=None, verbose=True, placement_group_func=None, memory_per_shot=None, **ray_init_kwargs)
Apply the pipeline using Ray
Parameters:
Name | Type | Description | Default |
---|---|---|---|
numparts | Optional[int] | The number of partitions to use when mapping. If not provided, the number of partitions will equal the number of records in the pipeline. | None |
batch_size | Optional[int] | The number of elements to process in each batch. Defaults to the number of records in the pipeline. | None |
verbose | bool | Whether to print verbose output. | True |
placement_group_func | Optional[Callable] | A function that returns a placement group. See the Ray docs for more information on placement groups. | None |
memory_per_shot | Optional[int] | Memory to allocate to each shot, in bytes. If not provided, there is no limit. | None |
Other Parameters:
Name | Type | Description |
---|---|---|
**ray_init_kwargs | | Keyword arguments to pass to ray.init |
Returns:
Name | Type | Description |
---|---|---|
RayRecordSet | RayRecordSet | The record set |
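A hedged sketch, continuing the pipe object from the earlier examples; the partition count and per-shot memory are arbitrary, and any extra keyword arguments are passed through to ray.init:

```python
results = pipe.compute_ray(
    numparts=100,              # number of partitions used when mapping
    memory_per_shot=int(2e9),  # roughly 2 GB per shot, in bytes
    verbose=False,
)
```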
compute_record(record)
Apply the pipeline to a record object
compute_serial()
Apply the pipeline serially on the local host
Returns a SerialRecordSet object
compute_shot(shot)
Run the pipeline for a single shot, returning a record object
Note that an empty record object is first created and then acted on by the pipeline. If there are prerequisite fields in the record, then the method compute_record should be used to pass a record object directly to the pipeline.
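For example, running the pipeline on a single (arbitrary) shot during development, continuing the pipe object from the earlier examples:

```python
record = pipe.compute_shot(165920)
```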
compute_spark(sc=None, numparts=None, cache=False)
Apply the pipeline using Spark
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sc | Optional[SparkContext] | SparkContext to use. If not provided, a default SparkContext will be created. | None |
numparts | Optional[int] | Number of partitions to use. If not provided, defaults to the number of records in the pipeline. | None |
cache | bool | Whether to cache the RDD. | False |
Returns:
Name | Type | Description |
---|---|---|
SparkRecordSet | SparkRecordSet | The record set |
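A short sketch, continuing the pipe object from the earlier examples (the partition count is arbitrary):

```python
# No SparkContext is passed, so a default one will be created
results = pipe.compute_spark(numparts=200, cache=True)
```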
fetch(name, signal)
Add a signal to be fetched by the pipeline
Appends a field called name to each record processed by the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the field to add to the record. It will contain the signal data, dimensions, and units (if applicable). | required |
signal | Signal | The signal to fetch. Must be an object that implements the Signal interface. | required |
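A hedged sketch: the MdsSignal class, its top-level import, and the (expression, treename) arguments are assumptions to be checked against the signal documentation; any object implementing the Signal interface can be passed here.

```python
from toksearch import Pipeline, MdsSignal  # MdsSignal import path is an assumption

pipe = Pipeline([165920, 165921])

# The resulting "ip" field will hold the signal data plus its dimensions
# and units (if applicable)
ip_signal = MdsSignal(r"\ipmeas", "efit01")  # expression and tree are illustrative
pipe.fetch("ip", ip_signal)
```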
fetch_dataset(name, signals, append=True)
Create an xarray dataset field called name in the record.
signals is a dict of the form {name: signal}. Each key in signals becomes the name of a data variable in the resulting dataset.
If the append keyword is True and a field called name already exists, the new data variables are appended to it. Otherwise, a new field is created and any existing data in that field is lost.
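A hedged sketch along the same lines, again assuming the MdsSignal class and illustrative expressions; each dict key becomes a data variable in the dataset stored under the given name:

```python
from toksearch import Pipeline, MdsSignal  # MdsSignal import path is an assumption

pipe = Pipeline([165920])

signals = {
    "ip": MdsSignal(r"\ipmeas", "efit01"),   # illustrative expression/tree
    "q95": MdsSignal(r"\q95", "efit01"),
}

# Creates (or appends to) an xarray dataset stored in the "ds" field
pipe.fetch_dataset("ds", signals, append=True)
```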
from_sql(conn, query, *query_params)
classmethod
Initialize a Pipeline using the results of an SQL query
Parameters:
Name | Type | Description | Default |
---|---|---|---|
conn | Connection | A Connection object compatible with the Python DB API. This can be, for example, from pyodbc, pymssql, sqlite, etc. | required |
query | str | A query string. At a minimum, the query must produce rows with the column "shot". The query cannot have columns "key" or "errors", as those are reserved words in a Pipeline. Additionally, if the query has any unnamed column, a MissingColumnName exception will be raised. | required |
query_params | arbitrary type | Optional. Used to pass parameters into a query. The exact placeholder syntax is db-dependent: for SQL Server (used for the d3drdb), use either %d or %s (it doesn't matter which); for sqlite, ? is used as the placeholder. | () |
Examples:
```python
from toksearch import Pipeline
from toksearch.sql.mssql import connect_d3drdb

# See documentation for connect_d3drdb for more details
conn = connect_d3drdb()

# Query without parameters
query = "select shot from shots_type where shot_type = 'plasma'"
pipe = Pipeline.from_sql(conn, query)

# Query with parameters, limiting to shot numbers greater than a threshold
threshold = 180000
query = """
    select shot
    from shots_type
    where shot_type = 'plasma' and shot > %d
"""
pipe = Pipeline.from_sql(conn, query, threshold)
```
keep(fields)
Keep only the specified fields in the record
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fields | List[str] | List of fields to keep | required |
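For example, continuing the pipe object from the earlier examples (the field names are arbitrary):

```python
# Retain only the listed fields in each record
pipe.keep(["ip", "max_ip"])
```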
map(func)
Apply func to the result of the previous step in the pipeline
Func is expected to be of the form func(record) -> record
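A small sketch, continuing the pipe object from the earlier examples; it assumes an "ip" field was fetched earlier and is dict-like with a "data" array:

```python
def add_ip_max(record):
    # Derive a scalar field from the fetched signal, modifying the record in place
    record["ip_max"] = abs(record["ip"]["data"]).max()
    return record

pipe.map(add_ip_max)
```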
where(func)
Apply func to the result of the previous step in the pipeline. If the result of func is truthy, the record is kept in the pipeline; otherwise, it is purged.
func must be of the form func(record) -> Truth-like value
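For example, continuing the pipe object from the earlier examples (the field name and threshold are arbitrary, and the dict-like structure of the fetched field is assumed):

```python
def has_large_ip(record):
    # A truthy result keeps the record; a falsy result purges it
    return abs(record["ip"]["data"]).max() > 1e5

pipe.where(has_large_ip)
```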