Python

Quickstart

Data passing

The SDK manages the source and target of the data, so the only decision left to you is what data to pass. The source and target are inferred from the pipeline definition.

For this example we let the pipeline be defined as follows:

Pipeline defined as: step-1, step-2 --> step-3

Note

In this example we name the data we output in the steps. It is also possible to use name=None and obtain the data using the "unnamed" key, which allows you to treat the inputs as a collection. The data in "unnamed" has an implied order; for more information, please read the dedicated connections section.

"""step-1"""
import orchest

data = "Hello, World!"

# Output the data so that step-3 can retrieve it.
orchest.output(data, name="my_string")

"""step-2"""
import orchest

data = [3, 1, 4]

# Output the data so that step-3 can retrieve it.
orchest.output(data, name="my_list")

"""step-3"""
import orchest

# Get the input for step-3, i.e. the output of step-1 and step-2.
input_data = orchest.get_inputs()

print(input_data)
# {
#  "my_list": [3, 1, 4],
#  "my_string": "Hello, World!"
# }

Note

Memory eviction of objects is disabled by default. Refer to configuration to learn how to enable it.

Parameters

import orchest

# Get the parameters of the current step and the pipeline.
fruit = orchest.get_step_param("fruit") # e.g. "apple"
vegetable = orchest.get_pipeline_param("vegetable") # e.g. "carrot"

# Update the step parameter. The updated parameter will be
# visible in the GUI, in the properties pane of the step.
fruit = "kiwi"
orchest.update_step_param("fruit", fruit)

Note

Parameters are at the core of jobs, giving a handle to try out different modeling ideas based on a set of variable inputs.

API

orchest.transfer

Transfer mechanisms to output data and get data.

class orchest.transfer.Serialization(value)

Possible types of serialization.

Types are:

  • ARROW_TABLE

  • ARROW_BATCH

  • PICKLE

orchest.transfer.get_inputs(ignore_failure: bool = False, verbose: bool = False) -> Dict[str, Any]

Gets all data sent from incoming steps.

Parameters
  • ignore_failure – If True, the returned result can contain None values where the data of a step could not be retrieved. If False, this function fails if the data of any incoming step could not be retrieved. Example: [None, "Hello World!"] vs OutputNotFoundError

  • verbose – If True print all the steps from which the current step has retrieved data.
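The effect of ignore_failure can be modelled in plain Python. The sketch below is not the SDK's implementation: `gather_inputs`, `missing_output` and the locally defined `OutputNotFoundError` are hypothetical stand-ins that only mirror the behavior described above.

```python
from typing import Any, Callable, Dict


class OutputNotFoundError(Exception):
    """Local stand-in for the SDK error of the same name."""


def missing_output() -> Any:
    """Hypothetical parent step whose output cannot be retrieved."""
    raise OutputNotFoundError("no output found")


def gather_inputs(
    fetchers: Dict[str, Callable[[], Any]], ignore_failure: bool = False
) -> Dict[str, Any]:
    """Collect the data of every parent step (hypothetical helper)."""
    results: Dict[str, Any] = {}
    for step, fetch in fetchers.items():
        try:
            results[step] = fetch()
        except OutputNotFoundError:
            if not ignore_failure:
                raise  # Fail as soon as one parent has no data.
            results[step] = None  # Keep going and record the gap.
    return results


fetchers = {"step-1": missing_output, "step-2": lambda: "Hello World!"}
print(gather_inputs(fetchers, ignore_failure=True))
# {'step-1': None, 'step-2': 'Hello World!'}
```

With ignore_failure=False the same call raises the error instead of returning a partial result.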

Returns

Dictionary with input data for this step. We differentiate between two cases:

  • Named data, which is data that was outputted with a name by any parent step. Named data can be retrieved through the dictionary by its name, e.g. data = get_inputs()["my_name"]. Name collisions will raise an InputNameCollisionError.

  • Unnamed data, which is an ordered list containing all the data that was outputted without a name by the parent steps. Unnamed data can be retrieved by accessing the reserved "unnamed" key. The order of this list depends on the order of the parent steps of the node, which is visible through the GUI; refer to the connections section for details.

Example:

# It does not matter how the data was outputted by parent
# steps. It is resolved automatically by the `get_inputs`
# method.
{
    "unnamed" : ["Hello World!", (3, 4)],
    "named_1" : "mystring",
    "named_2" : [1, 2, 3]
}
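How named and unnamed data end up in one dictionary can be modelled in plain Python. This is a minimal sketch and not the SDK's implementation: `collect_inputs` and the locally defined `InputNameCollisionError` are hypothetical, and the sketch assumes parent outputs arrive as (name, data) pairs in connection order.

```python
from typing import Any, Dict, List, Optional, Tuple


class InputNameCollisionError(Exception):
    """Local stand-in for the SDK error of the same name."""


def collect_inputs(
    parent_outputs: List[Tuple[Optional[str], Any]]
) -> Dict[str, Any]:
    """Combine parent outputs into one dictionary (hypothetical helper).

    Named outputs become keys; unnamed outputs are appended, in order,
    to the list stored under the reserved "unnamed" key.
    """
    inputs: Dict[str, Any] = {"unnamed": []}
    for name, data in parent_outputs:
        if name is None:
            inputs["unnamed"].append(data)
        elif name in inputs:
            raise InputNameCollisionError(f"Duplicate output name: {name!r}")
        else:
            inputs[name] = data
    return inputs


example = collect_inputs(
    [
        (None, "Hello World!"),
        ("named_1", "mystring"),
        (None, (3, 4)),
        ("named_2", [1, 2, 3]),
    ]
)
print(example["unnamed"])  # ['Hello World!', (3, 4)]
print(example["named_1"])  # mystring
```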

Raises
  • InputNameCollisionError – Multiple steps have outputted data with the same name.

  • OutputNotFoundError – No output can be found for the given step_uuid. Either no output was generated or the in-memory object store died (and therefore lost all its data).

  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus it cannot determine what inputs to get.

Warning

Only call get_inputs() once! When auto eviction is configured, the data might no longer be available on a later call. Either cache the data or maintain a copy yourself.
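One way to follow this advice is to wrap the retrieval in a cached function, so that later lookups reuse the first result. The sketch below uses a hypothetical `fetch_inputs` stand-in instead of the real get_inputs(), so that it runs outside a pipeline; inside a step you would call the SDK function there instead.

```python
from functools import lru_cache
from typing import Any, Dict

call_count = 0


def fetch_inputs() -> Dict[str, Any]:
    """Stand-in for get_inputs(); counts how often it is called."""
    global call_count
    call_count += 1
    return {"my_string": "Hello, World!", "unnamed": []}


@lru_cache(maxsize=None)
def cached_inputs() -> Dict[str, Any]:
    """Retrieve the inputs once and reuse the same object afterwards."""
    return fetch_inputs()


first = cached_inputs()
second = cached_inputs()
print(call_count)  # 1
print(first is second)  # True
```

Because the cached dictionary is shared, any in-place modification is visible to every later caller.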

orchest.transfer.output(data: Any, name: Optional[str]) -> None

Outputs data so that it can be retrieved by the next step.

It first tries to output to memory and if it does not fit in memory, then disk will be used.

Parameters
  • data – Data to output.

  • name – Name of the output data. If a string, it becomes the name of the data; if None, the data is considered nameless. This affects how the data can later be retrieved using get_inputs().

Raises
  • DataInvalidNameError – The name of the output data is invalid, e.g. because it is a reserved name ("unnamed") or because it contains a reserved substring.

  • OrchestNetworkError – Could not connect to the Config.STORE_SOCKET_NAME because it does not exist, which might be because the specified value was wrong or the store died.

  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus data cannot be outputted.

Example

>>> data = "Data I would like to use in my next step"
>>> output(data, name="my_data")

Note

Calling output() multiple times within the same script will overwrite the output, even when using a different output name. You should therefore call the function only once.
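The overwrite behavior can be pictured as each step owning a single output slot. The sketch below is a model under that assumption, not the SDK's implementation; `output_sketch` and `step_output` are hypothetical names. It also shows a common way around the limitation: bundle several values into one object and output that once.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical model: every step owns exactly one output slot.
step_output: Dict[str, Tuple[Optional[str], Any]] = {}


def output_sketch(step: str, data: Any, name: Optional[str]) -> None:
    """Store the output of a step, replacing any earlier output."""
    step_output[step] = (name, data)


output_sketch("step-1", [1, 2, 3], name="numbers")
output_sketch("step-1", "text", name="message")  # Replaces "numbers".
print(step_output["step-1"])  # ('message', 'text')

# Bundle the values and output them in a single call instead.
bundle = {"numbers": [1, 2, 3], "message": "text"}
output_sketch("step-1", bundle, name="results")
print(step_output["step-1"][1]["numbers"])  # [1, 2, 3]
```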

orchest.transfer.output_to_disk(data: Any, name: Optional[str], serialization: Optional[orchest.transfer.Serialization] = None) -> None

Outputs data to disk.

To manage outputting the data to disk, this function has a side effect:

  • Writes to a HEAD file alongside the actual data file. This file records the timestamp of the latest write to disk via this function, together with the serialization used.
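The idea of a HEAD file can be sketched as follows. The actual file layout used by the SDK is not specified here, so everything in this example is an assumption: the JSON format, the `data.pkl` file name, the helper names `write_with_head` and `read_with_head`, and the member values of the local `Serialization` stand-in.

```python
import json
import pickle
import tempfile
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict


class Serialization(Enum):
    """Local stand-in; the member values here are arbitrary."""

    ARROW_TABLE = "arrow_table"
    ARROW_BATCH = "arrow_batch"
    PICKLE = "pickle"


def write_with_head(data: Any, directory: Path) -> None:
    """Write a pickled data file plus a HEAD file describing the write."""
    (directory / "data.pkl").write_bytes(pickle.dumps(data))
    head = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "serialization": Serialization.PICKLE.name,
    }
    (directory / "HEAD").write_text(json.dumps(head))


def read_with_head(directory: Path) -> Any:
    """Use the HEAD file to decide how to deserialize the data file."""
    head: Dict[str, str] = json.loads((directory / "HEAD").read_text())
    if Serialization[head["serialization"]] is not Serialization.PICKLE:
        raise ValueError("This sketch only handles pickled data")
    return pickle.loads((directory / "data.pkl").read_bytes())


with tempfile.TemporaryDirectory() as tmp:
    write_with_head("Data I would like to use in my next step", Path(tmp))
    head_contents = json.loads((Path(tmp) / "HEAD").read_text())
    restored = read_with_head(Path(tmp))

print(head_contents["serialization"])  # PICKLE
print(restored)  # Data I would like to use in my next step
```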

Parameters
  • data – Data to output to disk.

  • name – Name of the output data. If a string, it becomes the name of the data; if None, the data is considered nameless. This affects how the data can later be retrieved using get_inputs().

  • serialization – Serialization of the data in case it is already serialized. For possible values see Serialization.

Raises
  • DataInvalidNameError – The name of the output data is invalid, e.g. because it is a reserved name ("unnamed") or because it contains a reserved substring.

  • PipelineDefinitionNotFoundError – If the pipeline definition file could not be found.

  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus it cannot determine where to output data to.

Example

>>> data = "Data I would like to use in my next step"
>>> output_to_disk(data, name="my_data")

Note

Calling output_to_disk() multiple times within the same script will overwrite the output, even when using a different output name. You should therefore call the function only once.

orchest.transfer.output_to_memory(data: Any, name: Optional[str], disk_fallback: bool = True) -> None

Outputs data to memory.

To manage outputting the data to memory for the user, this function uses metadata to add info to objects inside the plasma store.

Parameters
  • data – Data to output.

  • name – Name of the output data. If a string, it becomes the name of the data; if None, the data is considered nameless. This affects how the data can later be retrieved using get_inputs().

  • disk_fallback – If True, the data is output to disk when it does not fit in memory. If False, a MemoryError is raised.
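The memory-first strategy with an optional disk fallback can be modelled without the plasma store. In the sketch below, `output_to_memory_sketch`, `memory_store` and the tiny `MEMORY_LIMIT_BYTES` capacity are hypothetical; a plain dictionary stands in for the in-memory store and pickle files stand in for the disk output.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Dict

# Hypothetical capacity of the in-memory store, kept tiny for the demo.
MEMORY_LIMIT_BYTES = 64
memory_store: Dict[str, bytes] = {}


def output_to_memory_sketch(
    data: Any, name: str, disk_dir: Path, disk_fallback: bool = True
) -> str:
    """Store `data` in memory if it fits, otherwise fall back to disk.

    Returns "memory" or "disk" to show where the data ended up.
    """
    payload = pickle.dumps(data)
    if len(payload) <= MEMORY_LIMIT_BYTES:
        memory_store[name] = payload
        return "memory"
    if not disk_fallback:
        raise MemoryError(f"{name!r} does not fit in memory")
    (disk_dir / f"{name}.pkl").write_bytes(payload)
    return "disk"


with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    small_target = output_to_memory_sketch("hi", "small", tmp_dir)
    large_target = output_to_memory_sketch(list(range(1000)), "large", tmp_dir)
    try:
        output_to_memory_sketch(
            list(range(1000)), "strict", tmp_dir, disk_fallback=False
        )
        strict_result = "stored"
    except MemoryError:
        strict_result = "MemoryError"

print(small_target, large_target, strict_result)  # memory disk MemoryError
```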

Raises
  • DataInvalidNameError – The name of the output data is invalid, e.g. because it is a reserved name ("unnamed") or because it contains a reserved substring.

  • MemoryError – If the data does not fit in memory and disk_fallback=False.

  • OrchestNetworkError – Could not connect to the Config.STORE_SOCKET_NAME because it does not exist, which might be because the specified value was wrong or the store died.

  • PipelineDefinitionNotFoundError – If the pipeline definition file could not be found.

  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus it cannot set the correct ID to identify the data in the memory store.

Example

>>> data = "Data I would like to use in my next step"
>>> output_to_memory(data, name="my_data")

Note

Calling output_to_memory() multiple times within the same script will overwrite the output, even when using a different output name. You should therefore call the function only once.

orchest.parameters

Module to interact with the parameter values of pipeline steps.

Parameters are stored in the corresponding pipeline definition file, e.g. pipeline.orchest.

orchest.parameters.get_params() -> Tuple[dict, dict]

Gets the parameters of the current step and the pipeline.

Returns

A tuple of two elements, where the first is the parameters of the current step, the second is the parameters of the pipeline.

orchest.parameters.get_pipeline_param(name: str) -> Any

Gets a pipeline parameter by name.

Parameters

name – The pipeline parameter to get.

Returns

The value that was mapped to the pipeline parameter name.

orchest.parameters.get_step_param(name: str) -> Any

Gets a parameter of the current step by name.

Parameters

name – The step parameter to get.

Returns

The value that was mapped to the step parameter name.

orchest.parameters.update_params(step_params: Optional[dict] = None, pipeline_params: Optional[dict] = None) -> None

Updates the parameters of the current step and of the pipeline.

Additionally, you can set new parameters by giving parameters that do not yet exist in the current parameters, either of the step or of the pipeline.

Internally the updating is done by calling the dict.update method. This further explains the behavior of this method.
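The consequences of relying on dict.update can be seen with plain dictionaries. The parameter names and values below are made up for illustration; only the dict.update semantics are the point.

```python
# Existing keys are overwritten and unknown keys are added.
step_params = {"fruit": "apple", "count": 3}
step_params.update({"fruit": "kiwi", "color": "green"})
print(step_params)  # {'fruit': 'kiwi', 'count': 3, 'color': 'green'}

# The update is shallow: a nested dictionary is replaced as a whole,
# not merged key by key.
pipeline_params = {"model": {"depth": 3, "lr": 0.1}}
pipeline_params.update({"model": {"depth": 5}})
print(pipeline_params)  # {'model': {'depth': 5}}
```

To change a single nested value, read the nested dictionary first, modify it, and pass the complete nested dictionary back.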

Parameters
  • step_params – The step parameters to update. Either updating their values or adding new parameter keys.

  • pipeline_params – The pipeline parameters to update. Either updating their values or adding new parameter keys.

Warning

Updating the pipeline_params can lead to race conditions, since different steps could be updating them at the same time.
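The kind of race meant here is a lost update. The sketch below replays one deterministically with plain dictionaries: two hypothetical steps read the same stored value, each increments its own copy, and both write back. The `runs` counter and the snapshot variables are illustrative only.

```python
import copy

# Shared pipeline parameters as stored in the pipeline definition.
stored = {"runs": 0}

# Both steps read the parameters before either of them writes.
snapshot_a = copy.deepcopy(stored)
snapshot_b = copy.deepcopy(stored)

# Each step increments the counter based on what it read.
snapshot_a["runs"] += 1
snapshot_b["runs"] += 1

# Both steps write back; the second write hides the first.
stored.update(snapshot_a)
stored.update(snapshot_b)

print(stored["runs"])  # 1, although two steps incremented the counter
```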

orchest.parameters.update_pipeline_param(name: str, value: Any) -> None

Updates or sets a pipeline parameter.

Internally the updating is done by calling the dict.update method. This further explains the behavior of this method.

Parameters
  • name – The pipeline parameter to update/set.

  • value – The value that will be set.

Warning

Updating a pipeline parameter can lead to race conditions, since different steps could be updating pipeline parameters at the same time.

orchest.parameters.update_step_param(name: str, value: Any) -> None

Updates or sets a step parameter.

Internally the updating is done by calling the dict.update method. This further explains the behavior of this method.

Parameters
  • name – The step parameter to update/set.

  • value – The value that will be set.

orchest.services

Module to retrieve information about services.

Service specifications are stored in the corresponding pipeline definition file, e.g. pipeline.orchest.

orchest.services.get_service(name) -> Dict[str, Any]

Gets the service of the pipeline by name.

Returns

A dictionary describing a service.

Example:

{
    "internal_url": "service-<service-name>-<identifier>",
    "external_urls": {
        80: "http://{host_name}:{port}/service"
        "-<service-name>-<identifier>_80"
    },
    "base_paths": {
        80: "/service-<service-name>-<identifier>_80"
    },
    ... # user specified service fields
}

where each port specified in the service specification contributes one element to the external_urls and base_paths mappings, which map ports to external URLs and ports to base paths respectively.
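A service dictionary of this shape could be used as sketched below. The service name `myservice`, the identifier `abc123`, the host and the port are made-up values, and filling the {host_name} and {port} placeholders with str.format is an assumption about how the template is meant to be used.

```python
# Hypothetical service description, shaped like the example above.
service = {
    "internal_url": "service-myservice-abc123",
    "external_urls": {
        80: "http://{host_name}:{port}/service-myservice-abc123_80"
    },
    "base_paths": {80: "/service-myservice-abc123_80"},
}

# Fill in the placeholders of the external URL for port 80.
url = service["external_urls"][80].format(host_name="localhost", port=8000)
print(url)  # http://localhost:8000/service-myservice-abc123_80

# The base path for a port is the path component of its external URL.
print(url.endswith(service["base_paths"][80]))  # True
```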

Raises

ServiceNotFoundError – The service with the given name could not be found.

orchest.services.get_services() -> Dict[str, Dict[str, Any]]

Gets the services of the pipeline.

Returns

A dictionary of services, mapping service name to service description. For an example of a service dictionary, see get_service().