Python

Quickstart

Data passing

The SDK manages the target and source of the data, leaving you only with the decision of what data to pass. The target and source are inferred from the pipeline definition.

For this example, let the pipeline be defined as follows:

Pipeline defined as: step-1, step-2 --> step-3

Note

In this example we name the data we output in the steps. It is also possible to use name=None and obtain the data using the "unnamed" key, which allows you to treat the inputs as a collection (see the sketch after the example below). Additionally, there is an implied order of the data in "unnamed"; for more information, please read the dedicated connections section.

"""step-1"""
import orchest

data = "Hello, World!"

# Output the data so that step-3 can retrieve it.
orchest.output(data, name="my_string")

"""step-2"""
import orchest

data = [3, 1, 4]

# Output the data so that step-3 can retrieve it.
orchest.output(data, name="my_list")

"""step-3"""
import orchest

# Get the input for step-3, i.e. the output of step-1 and step-2.
input_data = orchest.get_inputs()

print(input_data)
# {
#  "my_list": [3, 1, 4],
#  "my_string": "Hello, World!"
# }
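
As noted above, the same data can also be passed unnamed. A minimal sketch, reusing step-1 and step-3 from this example:

"""step-1"""
import orchest

# Output the data without a name.
orchest.output("Hello, World!", name=None)

"""step-3"""
import orchest

# Unnamed data is retrieved through the reserved "unnamed" key.
input_data = orchest.get_inputs()
print(input_data["unnamed"])
# ["Hello, World!"]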

Note

Memory eviction of objects is disabled by default; refer to the configuration section to learn how to enable it.

Parameters

import orchest

# Get the parameters of the current step.
params = orchest.get_params()  # e.g. params = {"vegetable": "carrot"}

# Add a new parameter and update the step's parameters. The
# parameters now also become visible through the properties pane in
# the UI when clicking on a pipeline step.
params["fruit"] = "apple"
orchest.update_params(params)

Note

Parameters are at the core of jobs, giving you a handle to try out different modeling ideas based on a set of variable inputs.
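
For illustration, a minimal sketch of that idea: a hypothetical "alpha" parameter, varied across the pipeline runs of a job, configures what the step computes.

import orchest

# "alpha" is a hypothetical parameter, e.g. varied across the
# pipeline runs of a job.
params = orchest.get_params()
alpha = params.get("alpha", 0.5)

# Use the parameter to configure the step's logic, here a simple
# exponential moving average with smoothing factor alpha.
values = [3.0, 1.0, 4.0, 1.0, 5.0]
ema = values[0]
for value in values[1:]:
    ema = alpha * value + (1 - alpha) * ema

# Output the result so that runs with different "alpha" values can
# be compared downstream.
orchest.output(ema, name="ema")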

Data sources

Before you can interact with data sources from within your scripts, you have to configure one through the Data sources option in the left menu pane. For more information, see the data sources section.

import orchest
import pandas as pd

# Note that the "example-mysql-db" is created in the UI first under
# "Data sources" in the left hand panel.
mysql = orchest.get_datasource("example-mysql-db")

# Use a connection object to execute an SQL query.
with mysql.connect() as conn:
    df = pd.read_sql("SELECT * FROM users", conn)

API

orchest.transfer

Transfer mechanisms to output data and get data.

class orchest.transfer.Serialization

Possible types of serialization.

Types are:

  • ARROW_TABLE
  • ARROW_BATCH
  • PICKLE
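
A minimal sketch of how a Serialization value might be passed to output_to_disk() (documented below) when the data has already been serialized by the caller; the pickled payload here is illustrative:

>>> import pickle
>>> from orchest.transfer import Serialization, output_to_disk
>>> data = pickle.dumps({"a": 1})  # already serialized by the caller
>>> output_to_disk(data, name="my_data", serialization=Serialization.PICKLE)
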
orchest.transfer.get_inputs(ignore_failure: bool = False, verbose: bool = False) → Dict[str, Any]

Gets all data sent from incoming steps.

Parameters:
  • ignore_failure – If True, then the returned result can have None values if the data of a step could not be retrieved. If False, then this function will fail if the data of any of the incoming steps could not be retrieved. Example: [None, "Hello World!"] vs OutputNotFoundError
  • verbose – If True, print all the steps from which the current step has retrieved data.
Returns:

Dictionary with input data for this step. We differentiate between two cases:

  • Named data, which is data that was outputted with a name by any parent step. Named data can be retrieved through the dictionary by its name, e.g. data = get_inputs()["my_name"]. Name collisions will raise an InputNameCollisionError.
  • Unnamed data, which is an ordered list containing all the data that was outputted without a name by the parent steps. Unnamed data can be retrieved by accessing the reserved "unnamed" key. The order of this list depends on the order of the parent steps of the node, which is visible through the GUI.

Example:

# It does not matter how the data was output by parent
# steps. It is resolved automatically by the get_inputs
# method.
{
    "unnamed" : ["Hello World!", (3, 4)],
    "named_1" : "mystring",
    "named_2" : [1, 2, 3]
}

Raises:
  • InputNameCollisionError – Multiple steps have outputted data with the same name.
  • OutputNotFoundError – If no output can be found for the given step_uuid. Either no output was generated or the in-memory object store died (and therefore lost all its data).
  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus it cannot determine what inputs to get.

Warning

Only call get_inputs() once! When auto eviction is configured, the data might no longer be available on a second call. Either cache the data or maintain a copy yourself.
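
A minimal sketch of the recommended pattern, using the named keys from the example above:

>>> input_data = get_inputs()  # call once and keep a reference
>>> first = input_data["named_1"]
>>> second = input_data["named_2"]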

orchest.transfer.output(data: Any, name: Optional[str]) → None

Outputs data so that it can be retrieved by the next step.

It first tries to output to memory; if the data does not fit in memory, disk is used instead.

Parameters:
  • data – Data to output.
  • name – Name of the output data. As a string, it becomes the name of the data; when None, the data is considered nameless. This affects the way the data can later be retrieved using get_inputs().
Raises:
  • DataInvalidNameError – The name of the output data is invalid, e.g. because it is a reserved name ("unnamed") or because it contains a reserved substring.
  • OrchestNetworkError – Could not connect to the Config.STORE_SOCKET_NAME, because it does not exist, which might be because the specified value was wrong or the store died.
  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus data cannot be outputted.

Example

>>> data = "Data I would like to use in my next step"
>>> output(data, name="my_data")

Note

Calling output() multiple times within the same script will overwrite the output, even when using a different output name. You should therefore call the function only once.
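
A minimal sketch of the pitfall (values illustrative):

>>> output([1, 2, 3], name="numbers")
>>> output("abc", name="letters")  # overwrites the previous output
>>> # The next step can now only retrieve "letters".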

orchest.transfer.output_to_disk(data: Any, name: Optional[str], serialization: Optional[orchest.transfer.Serialization] = None) → None

Outputs data to disk.

To manage outputting the data to disk, this function has a side effect:

  • Writes a HEAD file alongside the actual data file. This file records the timestamp of the latest write to disk via this function, alongside the serialization that was used.
Parameters:
  • data – Data to output to disk.
  • name – Name of the output data. As a string, it becomes the name of the data; when None, the data is considered nameless. This affects the way the data can later be retrieved using get_inputs().
  • serialization – Serialization of the data in case it is already serialized. For possible values see Serialization.
Raises:
  • DataInvalidNameError – The name of the output data is invalid, e.g. because it is a reserved name ("unnamed") or because it contains a reserved substring.
  • PipelineDefinitionNotFoundError – If the pipeline definition file could not be found.
  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus it cannot determine where to output data to.

Example

>>> data = "Data I would like to use in my next step"
>>> output_to_disk(data, name="my_data")

Note

Calling output_to_disk() multiple times within the same script will overwrite the output, even when using a different output name. You should therefore call the function only once.

orchest.transfer.output_to_memory(data: Any, name: Optional[str], disk_fallback: bool = True) → None

Outputs data to memory.

To manage outputting the data to memory for the user, this function attaches metadata to the objects inside the plasma store.

Parameters:
  • data – Data to output.
  • name – Name of the output data. As a string, it becomes the name of the data; when None, the data is considered nameless. This affects the way the data can later be retrieved using get_inputs().
  • disk_fallback – If True, then outputting to disk is used when the data does not fit in memory. If False, then a MemoryError is raised.
Raises:
  • DataInvalidNameError – The name of the output data is invalid, e.g. because it is a reserved name ("unnamed") or because it contains a reserved substring.
  • MemoryError – If the data does not fit in memory and disk_fallback=False.
  • OrchestNetworkError – Could not connect to the Config.STORE_SOCKET_NAME, because it does not exist, which might be because the specified value was wrong or the store died.
  • PipelineDefinitionNotFoundError – If the pipeline definition file could not be found.
  • StepUUIDResolveError – The step’s UUID cannot be resolved and thus it cannot set the correct ID to identify the data in the memory store.

Example

>>> data = "Data I would like to use in my next step"
>>> output_to_memory(data, name="my_data")
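
And a sketch of the strict in-memory variant, where disk_fallback=False raises a MemoryError instead of falling back to disk (the data size is illustrative):

>>> data = list(range(10 ** 8))
>>> try:
...     output_to_memory(data, name="my_data", disk_fallback=False)
... except MemoryError:
...     print("Data did not fit in the in-memory store.")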

Note

Calling output_to_memory() multiple times within the same script will overwrite the output, even when using a different output name. You should therefore call the function only once.

orchest.parameters

Module to interact with the parameter values of pipeline steps.

Parameters are stored in the corresponding pipeline definition file, e.g. pipeline.orchest.

orchest.parameters.get_params() → Dict[str, Any]

Gets the parameters of the current step.

Returns: The parameters of the current step.

orchest.parameters.update_params(params: Dict[str, Any]) → None

Updates the parameters of the current step.

Additionally, you can set new parameters by passing parameters that do not yet exist in the current parameters of the pipeline step.

Internally the update is done by calling the dict.update method, which explains the exact behavior of this method.

Parameters: params – The parameters to update, either updating their values or adding new parameter keys.
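
A minimal sketch of those dict.update semantics, continuing the earlier "vegetable" example:

>>> get_params()
{"vegetable": "carrot"}
>>> update_params({"vegetable": "pepper", "fruit": "apple"})
>>> get_params()
{"vegetable": "pepper", "fruit": "apple"}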

orchest.datasources

class orchest.datasources.AWSObjectStorageS3(data)

Amazon S3 Storage Service datasource.

Parameters: data – Data containing connection_details to format the connection_string (see Attributes section).
s3

An “s3” resource service client. boto3.resource docs.

client

A low-level client representing Amazon Simple Storage Service (S3). boto3.client docs.

bucket

A resource representing an Amazon Simple Storage Service (S3) Bucket. s3.bucket docs.

Example

Print all the objects inside a bucket on S3.

>>> object_storage = AWSObjectStorageS3(data)
>>> for obj in object_storage.bucket.objects.all():
...     print(obj)
s3.ObjectSummary(bucket_name="orchest-s3", key="some-key")

class orchest.datasources.AWSRedshift(data, **kwargs)

AWSRedshift database datasource.

Parameters:
  • data – Data containing connection_details to format the connection_string (see Attributes section).
  • **kwargs – Passed to the sqlalchemy.create_engine method.
connection_string

Format for the connection string. SQLAlchemy calls this the URL (which indicates the database dialect). It is passed to sqlalchemy.create_engine as the first positional argument.

Type: str
engine

Underlying engine instance to connect to the database.

Type: sqlalchemy.engine.Engine
connect(**kwargs)

Returns a new sqlalchemy.engine.Connection object.

Directly wraps sqlalchemy.engine.Engine.connect.

For transactions, look into Connection.begin() and Engine.begin() in the SQLAlchemy docs.

Examples

In the example below DB should be substituted with the name of this class.

>>> db = DB(data)
>>> with db.connect() as conn:
...     result = conn.execute("SELECT * FROM users")

Alternatively:

>>> conn = db.connect()

class orchest.datasources.HostDirectory(data)

Host directory data source.

A path from the host that is mounted onto a specific path so it can be accessed from within the pipeline steps.

Parameters: data

Connection information to use the data source. Example:

{
    "name": "<datasource-name>",
    "connection_details": {
        "absolute_host_path": "<path>"
    },
    "source_type": "host-directory"
}
path

Path at which the host directory data source is mounted.

Type: str

Example

List files inside the mounted datasource.

>>> datasource = HostDirectory(data)
>>> os.listdir(datasource.path)
["image-1.png", "file-1.txt"]
class orchest.datasources.MySQL(data, **kwargs)

MySQL database datasource.

Parameters:
  • data – Data containing connection_details to format the connection_string (see Attributes section).
  • **kwargs – Passed to the sqlalchemy.create_engine method.
connection_string

Format for the connection string. SQLAlchemy calls this the URL (which indicates the database dialect). It is passed to sqlalchemy.create_engine as the first positional argument.

Type: str
engine

Underlying engine instance to connect to the database.

Type: sqlalchemy.engine.Engine
connect(**kwargs)

Returns a new sqlalchemy.engine.Connection object.

Directly wraps sqlalchemy.engine.Engine.connect.

For transactions, look into Connection.begin() and Engine.begin() in the SQLAlchemy docs.

Examples

In the example below DB should be substituted with the name of this class.

>>> db = DB(data)
>>> with db.connect() as conn:
...     result = conn.execute("SELECT * FROM users")

Alternatively:

>>> conn = db.connect()

class orchest.datasources.PostgreSQL(data, **kwargs)

PostgreSQL database datasource.

Parameters:
  • data – Data containing connection_details to format the connection_string (see Attributes section).
  • **kwargs – Passed to the sqlalchemy.create_engine method.
connection_string

Format for the connection string. SQLAlchemy calls this the URL (which indicates the database dialect). It is passed to sqlalchemy.create_engine as the first positional argument.

Type: str
engine

Underlying engine instance to connect to the database.

Type: sqlalchemy.engine.Engine
connect(**kwargs)

Returns a new sqlalchemy.engine.Connection object.

Directly wraps sqlalchemy.engine.Engine.connect.

For transactions, look into Connection.begin() and Engine.begin() in the SQLAlchemy docs.

Examples

In the example below DB should be substituted with the name of this class.

>>> db = DB(data)
>>> with db.connect() as conn:
...     result = conn.execute("SELECT * FROM users")

Alternatively:

>>> conn = db.connect()

orchest.datasources.get_datasource(name: str)

Gets a datasource by name.

The name coincides with the datasource name as defined in the UI on the Orchest platform.

Parameters: name – The name of the datasource.
Returns: A datasource object.

orchest.datasources.get_datasources() → List[str]

Gets a list of all defined and ready-to-use datasources.

This can be helpful to find the names of the defined datasources without having to go through the UI.
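
For example (datasource names hypothetical):

>>> get_datasources()
["example-mysql-db", "example-s3-bucket"]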