Python

Python package to interact with Orchest.

Quickstart

Data passing

For this example, we let the pipeline (defined inside the pipeline.json) be as follows:

Pipeline defined as: step-1, step-2 --> step-3

where step-3 retrieves the data in the order [step-2, step-1].

Note

The order in which the data is retrieved in step-3 is determined via the Connections section in the pipeline step properties pane in the UI. The order runs from top to bottom: the first element in the list returned by orchest.transfer.get_inputs() is the output of the topmost step in Connections.

"""step-1"""
import orchest

data = 'Hello, World!'

# Output the data so that step-3 can retrieve it.
orchest.output(data)
"""step-2"""
import orchest

data = [3, 1, 4]

# Output the data so that step-3 can retrieve it.
orchest.output(data)
"""step-3"""
import orchest

# Get the input for step-3, i.e. the output of step-1 and step-2.
data = orchest.get_inputs()  # data = [[3, 1, 4], 'Hello, World!']

Note

Since memory resources are scarce, we have implemented a custom eviction manager for data passed through memory (between pipeline steps). Without it, objects would never be evicted from memory (even when an object has no references), eventually filling memory to capacity with no room for new data. Eviction is handled by the memory-server.

Parameters

import orchest

# Get the parameters of the current step.
params = orchest.get_params()  # params = {'vegetable': 'carrot'}

# Add a new parameter and update the step's parameters. The
# parameters now also become visible through the properties pane in
# the UI when clicking on a pipeline step.
params['fruit'] = 'apple'
orchest.update_params(params)

Data sources

Before you can interact with data sources from within your scripts, you have to configure one through the Data sources option in the left menu pane. Please refer to Data sources in the features section.

import orchest
import pandas as pd

# Note that the "example-mysql-db" is created in the UI first under
# "Data sources" in the left hand panel.
mysql = orchest.get_datasource('example-mysql-db')

# Use a connection object to execute an SQL query.
with mysql.connect() as conn:
    df = pd.read_sql('SELECT * FROM users', conn)

API

orchest.transfer

Transfer mechanisms to output data and get data.

orchest.transfer.get_inputs(ignore_failure: bool = False, verbose: bool = False) → List[Any]

Gets all data sent from incoming steps.

Parameters:
  • ignore_failure – If True, the returned result can contain None values if the data of a step could not be retrieved. If False, this function fails if any of the incoming steps’ data could not be retrieved. Example: [None, 'Hello World!'] vs OutputNotFoundError
  • verbose – If True, print all the steps from which the current step has retrieved data.
Returns:

List of all the data in the specified order from the front-end.

Raises:

StepUUIDResolveError – The step’s UUID cannot be resolved and thus it cannot determine what inputs to get.

Example

>>> # It does not matter how the data was output in steps 1 and 2.
>>> # It is resolved automatically by the get_inputs method.
>>> data_step_1, data_step_2 = get_inputs()

Warning

Only call get_inputs() once! When auto eviction is configured, the data might no longer be available on a second call. Either cache the data or maintain a copy yourself.
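
For example, a minimal sketch of keeping your own copy (assuming a single incoming step):

>>> import copy
>>> inputs = get_inputs()  # Call exactly once.
>>> data = copy.deepcopy(inputs[0])  # Maintain a copy you control.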

orchest.transfer.get_output_disk(step_uuid: str, serialization: str = 'arrow') → Any

Gets data from disk.

Parameters:
  • step_uuid – The UUID of the step to get output data from.
  • serialization – The serialization of the output. Has to be specified in order to deserialize correctly.
Returns:

Data from the step identified by step_uuid.

Raises:

DiskOutputNotFoundError – If output from step_uuid cannot be found.
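
Example

A minimal sketch, assuming step_uuid holds the UUID of an upstream step that wrote its output to disk:

>>> data = get_output_disk(step_uuid)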

orchest.transfer.get_output_memory(step_uuid: str, consumer: Optional[str] = None) → Any

Gets data from memory.

Parameters:
  • step_uuid – The UUID of the step to get output data from.
  • consumer – The consumer of the output data. This is put inside the metadata of an empty object to trigger a notification in the plasma store, which is then used to manage eviction of objects.
Returns:

Data from step identified by step_uuid.

Raises:
  • MemoryOutputNotFoundError – If output from step_uuid cannot be found.
  • OrchestNetworkError – Could not connect to Config.STORE_SOCKET_NAME because it does not exist, which might be because the specified value was wrong or the store died.
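
Example

A minimal sketch, assuming step_uuid holds the UUID of an upstream step whose output still resides in the in-memory store:

>>> data = get_output_memory(step_uuid)
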
orchest.transfer.output(data: Any, pickle_fallback: bool = True) → None

Outputs data so that it can be retrieved by the next step.

It first tries to output to memory; if the data does not fit in memory, disk is used instead.

Parameters:
  • data – Data to output.
  • pickle_fallback – This option is passed to serialize(). If pyarrow cannot serialize the data, then it will fall back to using pickle. This is helpful for custom data types.
Raises:
  • OrchestNetworkError – Could not connect to Config.STORE_SOCKET_NAME because it does not exist, which might be because the specified value was wrong or the store died.
  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus data cannot be output.

Example

>>> data = 'Data I would like to use in my next step'
>>> output(data)

Note

Calling output() multiple times within the same script will generally overwrite the output. You therefore want to call the function only once.

orchest.transfer.output_to_disk(data: Any, pickle_fallback: bool = True, serialization: Optional[str] = None) → None

Outputs data to disk.

To manage outputting the data to disk, this function has a side effect:

  • Writes to a HEAD file alongside the actual data file. This file serves as a protocol that records the timestamp of the latest write to disk via this function, alongside the serialization that was used.
Parameters:
  • data – Data to output to disk.
  • pickle_fallback – This option is passed to serialize(). If pyarrow cannot serialize the data, then it will fall back to using pickle. This is helpful for custom data types.
  • serialization – Serialization of the data in case it is already serialized. Currently supported values are: ['arrow', 'arrowpickle'].
Raises:

StepUUIDResolveError – The step’s UUID cannot be resolved and thus it cannot determine where to output data to.

Example

>>> data = 'Data I would like to use in my next step'
>>> output_to_disk(data)

Note

Calling output_to_disk() multiple times within the same script will overwrite the output. Generally speaking, you therefore want to call the function only once.

orchest.transfer.output_to_memory(data: Any, pickle_fallback: bool = True, disk_fallback: bool = True) → None

Outputs data to memory.

To manage outputting the data to memory for the user, this function uses metadata to add info to objects inside the plasma store.

Parameters:
  • data – Data to output.
  • pickle_fallback – This option is passed to serialize(). If pyarrow cannot serialize the data, then it will fall back to using pickle. This is helpful for custom data types.
  • disk_fallback – If True, outputting to disk is used when the data does not fit in memory. If False, a MemoryError is raised instead.
Raises:
  • MemoryError – If the data does not fit in memory and disk_fallback=False.
  • OrchestNetworkError – Could not connect to Config.STORE_SOCKET_NAME because it does not exist, which might be because the specified value was wrong or the store died.
  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus it cannot set the correct ID to identify the data in the memory store.

Example

>>> data = 'Data I would like to use in my next step'
>>> output_to_memory(data)

Note

Calling output_to_memory() multiple times within the same script will overwrite the output. Generally speaking, you therefore want to call the function only once.

orchest.transfer.resolve(step_uuid: str, consumer: str = None) → Tuple[Any]

Resolves the most recently used transfer method of the given step.

Additionally, resolves all the *args and **kwargs the receiving transfer method has to be called with.

Parameters:
  • step_uuid – UUID of the step to resolve its most recent write.
  • consumer – The consumer of the output data. This is put inside the metadata of an empty object to trigger a notification in the plasma store, which is then used to manage eviction of objects.
Returns:

Tuple containing the information of the function to be called to get the most recent data from the step. Additionally, returns fill-in arguments for the function.

Raises:

OutputNotFoundError – If no output can be found of the given step_uuid. Either no output was generated or the in-memory object store died (and therefore lost all its data).
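
Example

A hedged sketch; the exact shape of the returned tuple is an internal detail, so the unpacking below is illustrative only:

>>> transfer_method, args, kwargs = resolve(step_uuid)
>>> data = transfer_method(*args, **kwargs)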

orchest.transfer.resolve_disk(step_uuid: str) → Dict[str, Any]

Returns information of the most recent write to disk.

Resolves, via the HEAD file, the timestamp (used to determine the most recent write) and the arguments to call the get_output_disk() method with.

Parameters: step_uuid – The UUID of the step to resolve its most recent write to disk.
Returns: Dictionary containing the information of the function to be called to get the most recent data from the step. Additionally, returns fill-in arguments for the function.
Raises: DiskOutputNotFoundError – If output from step_uuid cannot be found.

orchest.transfer.resolve_memory(step_uuid: str, consumer: str = None) → Dict[str, Any]

Returns information of the most recent write to memory.

Resolves the timestamp via the create_time attribute from the info of the plasma store. It also sets the arguments to call the get_output_memory() method with.

Parameters:
  • step_uuid – The UUID of the step to resolve its most recent write to memory.
  • consumer – The consumer of the output data. This is put inside the metadata of an empty object to trigger a notification in the plasma store, which is then used to manage eviction of objects.
Returns:

Dictionary containing the information of the function to be called to get the most recent data from the step. Additionally, returns fill-in arguments for the function.

Raises:
  • MemoryOutputNotFoundError – If output from step_uuid cannot be found.
  • OrchestNetworkError – Could not connect to Config.STORE_SOCKET_NAME because it does not exist, which might be because the specified value was wrong or the store died.

orchest.transfer.serialize(data: Any, pickle_fallback: bool = True) → Tuple[pa.SerializedPyObject, str]

Serializes an object using pyarrow.serialize.

Parameters:
  • data – The object/data to be serialized.
  • pickle_fallback – If True, fall back on pickle for serialization if pyarrow cannot serialize the object. If False, do not fall back on pickle.
Returns:

Tuple of the serialized data and the serialization that was used, where 'arrowpickle' indicates that the data was first pickled and then serialized using pyarrow.

Raises:

pa.SerializationCallbackError – If pa.serialize cannot serialize the given data.
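
Example

A minimal sketch, assuming the object is natively serializable by pyarrow:

>>> serialized, serialization = serialize({'a': [1, 2, 3]})
>>> serialization
'arrow'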

Note

pickle does not include the code of custom functions or classes; it only pickles their names. According to the official Python docs: “Thus the defining module must be importable in the unpickling environment, and the module must contain the named object, otherwise an exception will be raised.”

orchest.parameters

Module to interact with the parameters inside the pipeline.json.

orchest.parameters.get_params() → Dict[str, Any]

Gets the parameters of the current step.

Returns: The parameters of the current step.

orchest.parameters.update_params(params: Dict[str, Any]) → Dict[str, Any]

Updates the parameters of the current step.

Additionally, you can set new parameters by giving parameters that do not yet exist in the current parameters of the pipeline step.

Internally, the updating is done by calling the dict.update method, which further explains the behavior of this method.

Parameters: params – The parameters to update. Either updating their values or adding new parameter keys.
Returns: The updated parameters mapping.
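
Example

A minimal sketch, reusing the parameter values from the quickstart:

>>> get_params()
{'vegetable': 'carrot'}
>>> update_params({'fruit': 'apple'})
{'vegetable': 'carrot', 'fruit': 'apple'}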

orchest.datasources

class orchest.datasources.AWSObjectStorageS3(data)

Amazon S3 Storage Service datasource.

Parameters: data – Data containing connection_details to format the connection_string (see Attributes section).

s3

An “s3” resource service client. boto3.resource docs.

client

A low-level client representing Amazon Simple Storage Service (S3). boto3.client docs.

bucket

A resource representing an Amazon Simple Storage Service (S3) Bucket. s3.bucket docs.

Example

Print all the objects inside a bucket on S3.

>>> object_storage = AWSObjectStorageS3(data)
>>> for obj in object_storage.bucket.objects.all():
...     print(obj)
s3.ObjectSummary(bucket_name='orchest-s3', key='some-key')

class orchest.datasources.AWSRedshift(data, **kwargs)

AWSRedshift database datasource.

Parameters:
  • data – Data containing connection_details to format the connection_string (see Attributes section).
  • **kwargs – Passed to the sqlalchemy.create_engine method.

connection_string

Format for the connection string. SQLAlchemy calls this the URL (which indicates the database dialect). It is passed to sqlalchemy.create_engine as the first positional argument.

Type: str

engine

Underlying engine instance to connect to the database.

Type: sqlalchemy.engine.Engine

connect(**kwargs)

Returns a new sqlalchemy.engine.Connection object.

Directly wraps sqlalchemy.engine.Engine.connect.

For transactions, look into Connection.begin() and Engine.begin() in the SQLAlchemy docs.

Examples

In the example below, DB should be substituted with the name of this class.

>>> db = DB(data)
>>> with db.connect() as conn:
...     result = conn.execute('SELECT * FROM users')

Alternatively:

>>> conn = db.connect()

class orchest.datasources.HostDirectory(data)

Host directory data source.

A path from the host that is mounted onto a specific path so it can be accessed from within the pipeline steps.

Parameters: data – Connection information to use the data source. Example:

{
    'name': '<datasource-name>',
    'connection_details': {
        'absolute_host_path': '<path>'
    },
    'source_type': 'host-directory'
}

path

Path at which the host directory data source is mounted.

Type: str

Example

List files inside the mounted datasource.

>>> datasource = HostDirectory(data)
>>> os.listdir(datasource.path)
['image-1.png', 'file-1.txt']

class orchest.datasources.MySQL(data, **kwargs)

MySQL database datasource.

Parameters:
  • data – Data containing connection_details to format the connection_string (see Attributes section).
  • **kwargs – Passed to the sqlalchemy.create_engine method.

connection_string

Format for the connection string. SQLAlchemy calls this the URL (which indicates the database dialect). It is passed to sqlalchemy.create_engine as the first positional argument.

Type: str

engine

Underlying engine instance to connect to the database.

Type: sqlalchemy.engine.Engine

connect(**kwargs)

Returns a new sqlalchemy.engine.Connection object.

Directly wraps sqlalchemy.engine.Engine.connect.

For transactions, look into Connection.begin() and Engine.begin() in the SQLAlchemy docs.

Examples

In the example below, DB should be substituted with the name of this class.

>>> db = DB(data)
>>> with db.connect() as conn:
...     result = conn.execute('SELECT * FROM users')

Alternatively:

>>> conn = db.connect()

class orchest.datasources.PostgreSQL(data, **kwargs)

PostgreSQL database datasource.

Parameters:
  • data – Data containing connection_details to format the connection_string (see Attributes section).
  • **kwargs – Passed to the sqlalchemy.create_engine method.

connection_string

Format for the connection string. SQLAlchemy calls this the URL (which indicates the database dialect). It is passed to sqlalchemy.create_engine as the first positional argument.

Type: str

engine

Underlying engine instance to connect to the database.

Type: sqlalchemy.engine.Engine

connect(**kwargs)

Returns a new sqlalchemy.engine.Connection object.

Directly wraps sqlalchemy.engine.Engine.connect.

For transactions, look into Connection.begin() and Engine.begin() in the SQLAlchemy docs.

Examples

In the example below, DB should be substituted with the name of this class.

>>> db = DB(data)
>>> with db.connect() as conn:
...     result = conn.execute('SELECT * FROM users')

Alternatively:

>>> conn = db.connect()

orchest.datasources.get_datasource(name: str)

Gets a datasource by name.

The name coincides with the datasource name as defined in the UI on the Orchest platform.

Parameters: name – The name of the datasource.
Returns: A datasource object.
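
Example

A minimal sketch, reusing the datasource name from the quickstart (and assuming pandas is imported as pd):

>>> mysql = get_datasource('example-mysql-db')
>>> with mysql.connect() as conn:
...     df = pd.read_sql('SELECT * FROM users', conn)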