Module trase.tools.sps

The Trase SEI-PCS tool is a set of Python classes and functions designed to aid the development of Standardised Spatially Explicit Information on Production to Consumption (sSEI-PCS) models. From the perspective of the person developing the model, the tool:

  1. Helps you load data into the model, even if the data has recently changed at its source
  2. Runs quality assurance on the input data: in particular, ensuring that datasets can be fully joined on common columns
  3. Provides useful wrapping functions around Pandas that provide a higher-level abstraction for things we often need to do in SEI-PCS models, such as "consolidation" or "splitting"
  4. Writes out model results in a way that can be directly ingested into Trase's main database

For the whole Trase project, the tool:

  1. Sets some standards about how data is loaded and written so that we can all work more efficiently
  2. Ensures that similar models have very similar-looking code, to reduce the cognitive load on the team
  3. Provides a standard interface to run models so that anybody on the team can run them, even if they didn't originally work on the model

Guiding Principles

The tool is written with a few guiding principles in mind:

  1. Be lightweight (in the sense of a small amount of code) and modular
  2. Set standards by default but don't enforce them, in order to remain flexible to needs
  3. Leverage Pandas 🐼 where possible: do not reinvent existing standards in the data science community

Functions

def append(df: pandas.core.frame.DataFrame, items) ‑> pandas.core.frame.DataFrame
def assert_frame_like(a, b, **kwargs)

Assert two dataframes are equal, ignoring row and column order and index

def cartesian_product(df_a, df_b, suffixes=('_x', '_y')) ‑> pandas.core.frame.DataFrame

Returns the Cartesian product of two dataframes.

For example, given the following two dataframes

| foo |    | bar |
|-----|    |-----|
| a   |    | 1   |
| b   |    | 2   |

The Cartesian product would be:

| foo | bar |
|-----|-----|
| a   | 1   |
| a   | 2   |
| b   | 1   |
| b   | 2   |

The returned dataframe will always have len(df_a) * len(df_b) rows. In the example above both dataframes have only one column, but this is not a requirement.
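
Example

A minimal sketch of the call that produces the table above (the import path is an assumption):

import pandas as pd
from trase.tools.sps import cartesian_product  # assumed import path

df_a = pd.DataFrame({"foo": ["a", "b"]})
df_b = pd.DataFrame({"bar": [1, 2]})

df = cartesian_product(df_a, df_b)
len(df)  # 4 == len(df_a) * len(df_b)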

Args

suffixes
suffixes which will be applied to columns which appear in both dataframes

Note: this function is superseded in Pandas 1.2+ by pd.merge(..., how="cross")

def clean_string(text: Optional[str], upper=True)

Take a string and clean it!

  • Remove double-whitespace
  • Remove tab, newline, return, formfeed, etc.
  • Replace accented characters (e.g. ö becomes o)
  • Trim leading and trailing whitespace
  • Convert to upper-case
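
Example

For illustration, a call might look like the following (the expected outputs are inferred from the rules above):

clean_string("  são \t paulo ")               # "SAO PAULO"
clean_string("  são \t paulo ", upper=False)  # "sao paulo"
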
def compare_dataframes_single(df_a, df_b, value: str, group_by: Optional[Iterable[str]] = None, comparator=<function Compare.signed_symmetric_relative_error>, column='comparison', suffixes=('_a', '_b'), sort_absolute=True, sort='ascending', factor=1) ‑> pandas.core.frame.DataFrame

A wrapper around compare_dataframes() which:

  1. Takes just one value column
  2. Renames the comparison column
  3. Sorts dataframe for you

The intention is that the return value is slightly easier to use since columns only have one level, and it performs sorting for you (which is usually what you want).

Example

>>> compare_dataframes_single(df_a, df_b, "volume", ["exporter"])
          total_a  total_b  comparison
exporter
   BUNGE      100      200         1.0
 CARGILL      500        0        -inf

Args

value
the column which will be compared numerically
group_by
columns over which to group
comparator
a function which takes two series and returns their comparison. You can also pass in the string name of a method on Compare. See Compare for some examples. Defaults to Compare.signed_symmetric_relative_error()
column
the name of the comparison column. Defaults to "comparison"
sort_absolute
whether to use absolute values as the sorting key. Defaults to True
sort : optional
sorting order, either "ascending" (default) or "descending"
factor : optional
multiply the comparison by a factor. The multiplication is done before any sorting occurs.

Returns: a pd.DataFrame with the groups as the index, and three columns named (unless overridden) "total_a", "total_b", and "comparison"

def concat(dfs, *args, sort=False, ignore_index=True, **kwargs)

Some useful defaults for concatenating two dataframes together.

In particular:

  • Do not sort the dataframes.
  • Ignore indexes: we assume the indexes carry no useful information.
  • Validate that all dataframes have the same columns. Because NaNs are really annoying in Pandas.
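
Example

A minimal sketch (the import path is an assumption):

import pandas as pd
from trase.tools.sps import concat  # assumed import path

df_a = pd.DataFrame({"name": ["BALDIM"], "vol": [16]})
df_b = pd.DataFrame({"name": ["CAXIAS"], "vol": [73]})

df = concat([df_a, df_b])  # two rows, a fresh 0..1 index, and an error if the columns differed
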
def consolidate(df: pandas.core.frame.DataFrame, numerical_columns: Iterable[str], categorical_columns: Optional[Iterable[str]] = None, reset_index=True)

Group a dataframe by one or more categorical columns and sum one or more numerical columns.

Example

df = pd.DataFrame({
    "product": ["eggs", "eggs", "bacon"],
    "value":   [  1.99,  1.99,    5.99],
})
consolidate(df, ["value"], ["product"])
# product   value
#   bacon    5.99
#    eggs    4.98
def dataset_path(data_directory, name) ‑> str
def download_file(key, bucket='trase-storage', version_id=None, path=None, client=None, track=True, force=False)

Download an object stored in S3, but skip if the file already exists.

In particular, we skip the download if the local file exists and is newer than the remote or has a different bytesize to the remote.

Examples

>>> download_file("candyland/metadata/assets.csv")
'/var/folders/_m/s00h1rdj3m75ptngsv9tv52w0000gp/T/assets.csv'

Args

key
the key to the object on S3, e.g. "candyland/metadata/assets.csv"
bucket : optional
the S3 bucket, defaults to bucket defined in trase.config.settings
version_id : optional
the specific version ID of the file to download, e.g. "yiJXp1MC0N5U128RoBtlWOcAEjHFVowi"
path : optional
the file path on the local file system to download the file to. If not provided then a file will be made in the temporary directory of the operating system. (Such temporary directories are usually cleared on every restart of the operating system). This path will be checked to determine whether to skip downloading the file.
track : bool, optional
whether to add the object to the S3 tracker
force : bool, optional
whether to re-download the file even if it already exists on the file system.

Returns

The path to the downloaded file

def dumbbell_compare(df_a, df_b, value, group_by, comparator=<function Compare.absolute_error>, labels=('Base', 'Comparison'), colors=('#4e6356', '#ff6a5f'), title_text='', xaxis_title='', yaxis_title='', legend_title='', label_width=15, shorten_end=False, max_rows=15, other_label='OTHER', include_other=True, row_height_px=40) ‑> plotly.graph_objs.Figure

Compare two dataframes and create a "dumbbell"-style plot.

Here is an ASCII-art of what that looks like:

  BUNGE │   *─────*
CARGILL │          *───*
   ADM  │  *─*
  OTHER │  *
        └────────────────
         0   10   20   30
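
Example

An illustrative call producing a figure like the above (the column names and labels are hypothetical):

fig = dumbbell_compare(
    df_a,
    df_b,
    value="vol",
    group_by=["exporter"],
    labels=("2019", "2020"),
)
fig.show()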

Args

df_a
base DataFrame.
df_b
comparison DataFrame.
value
the name of the column containing numerical data that will be compared. Any values duplicated over group_by will be summed.
group_by
a list of categorical columns to group by.
comparator : optional
a function which takes two series and returns their comparison. You can also pass in the string name of a method on Compare. See Compare for some examples. Defaults to Compare.absolute_error().
labels : optional
name of the two series that will appear in the legend. Default: ("Base", "Comparison").
title_text : optional
title for the chart. Default ""
xaxis_title : optional
title for the X-axis. Default ""
yaxis_title : optional
title for the Y-axis. Default ""
legend_title : optional
title for the legend. Default ""
label_width : optional
the maximum length of a label before it is shortened with "…". Default: 15
shorten_end : optional
if True labels will be shortened from the end ("LONG LA…"); if False they will be shortened from the start (e.g. "…G LABEL"). Default: False
max_rows : optional
how many rows to show, or None if all rows should be shown. Default: 15
other_label : optional
the name of the "other" category; either a string or a tuple of strings of the same length as group_by. Default: "OTHER"
include_other : optional
whether to include the "OTHER" category. Default: True

Returns: the plotly Figure

def e(header: str, flow_attribute: Optional[str] = None)

Construct an export definition for a column.

This object contains information which is used by the functions which construct the results file and the ingest metadata.
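
Example

For illustration (the first example uses the header and flow attribute mentioned below; the second is hypothetical):

e("COUNTRY_OF_ORIGIN", "country.trase_id")
e("EXPORTER", "exporter.name")  # hypothetical flow attribute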

Args

header
the name of the resulting header in the CSV file. This should follow the Trase standard conventions, for example COUNTRY_OF_ORIGIN
flow_attribute
the name of the column in the flow. This must relate to a column in the flow definition. You can refer to linked columns using dot syntax, for example "country.trase_id".
def find_biomes_by_trase_id(df, returning: Iterable[str], node_attributes_ref_id: int, trase_id=Identifier('trase_id'), on_extra_columns='error', cur=None, cnx=None)
def find_default_name_by_node_id(df, returning: Iterable[str], node_id: psycopg2.sql.Composable = Identifier('node_id'), on_extra_columns='error', cur=None, cnx=None)
def find_economic_blocs_by_trase_id(df, returning: Iterable[str], trase_id=Identifier('trase_id'), on_extra_columns='error', cur=None, cnx=None)
def find_nodes_by_code(df, returning: Iterable[str], code_id=Identifier('code_id'), value=Identifier('value'), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]
def find_nodes_by_name(df, returning: Iterable[str], name=Identifier('name'), level: psycopg2.sql.Composable = Literal(None), parent_id: psycopg2.sql.Composable = Literal(None), sub_type_id: psycopg2.sql.Composable = Literal(None), type_id: psycopg2.sql.Composable = Literal(None), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]

Performs a lookup into the main.nodes and main.node_names tables given some search criteria.

The following search terms are required:

  • name (str)

The following search terms are optional:

  • parent_id (int), which should be an id of the main.nodes table
  • sub_type_id (int), which should be an id of the main.node_sub_types table
  • type_id (int)
  • level (int)
  • year (int)

The function requires a Pandas dataframe. Each row of the dataframe represents one "lookup" in the main schema table.

The search terms themselves can come from one of two sources:

  1. A column in the dataframe, indicated using sql.Identifier
  2. A value that is constant for all rows of the dataframe, indicated using sql.Literal

For example, the following call would get the name search terms using the "name" column in the dataframe, and would use level=4 for every search:

from psycopg2 import sql

find_nodes_by_name(
    df,
    name=sql.Identifier("name"),
    level=sql.Literal(4),
    ...

The function additionally requires you to specify which information should be returned from the search via the "returning" argument. Supported values are:

  • default_name
  • lookup_value
  • parent_id
  • type_id
  • sub_type_id
  • level
  • year
  • node_id
  • trase_id
  • count

Example usage:

import pandas as pd
from psycopg2 import sql
from trase.tools.pandasdb.find import find_nodes_by_name

df = pd.DataFrame({"country_label": ["COOK ISLAS", "ZIMBAWE", "SAMOA AMERICANA"]})
returning = ["default_name", "node_id"]

result = find_nodes_by_name(
    df,
    returning,
    name=sql.Identifier("country_label"),
    level=sql.Literal(1),
)

print(result)
# [('COOK ISLANDS', 55), ('ZIMBABWE', 211), ('AMERICAN SAMOA', 7)]

df_result = pd.DataFrame(result, columns=returning)

Let's break down the call to find_nodes_by_name():

  1. First we pass the data containing the search terms we want to look up, in the form of a dataframe. This function is designed to take large dataframes:

    find_nodes_by_name(
        df,
    
  2. We say that the function should return the (default) name of the node that it identifies, along with the Node ID:

    returning=["default_name", "node_id"]
    
  3. Next, we say that the data used to match the node name should come from the "country_label" column in the dataframe. Here sql.Identifier means a column in the dataframe:

    name=sql.Identifier("country_label"),
    
  4. Then we say that the level should be 1, which is the level for a country. Here sql.Literal means a value that should be the same for all rows:

    level=sql.Literal(1),
    
  5. The result of the function is a list of tuples, which we can convert to a dataframe

    df_result = pd.DataFrame(result, columns=returning)
    

TODO:

  • Test that columns adhere to postgres standards (or get around this with a hack!)

Args

df
a non-empty Pandas DataFrame
returning
a list of strings
name
a column in the dataframe or literal value (defaults to the column "name")
on_extra_columns
behaviour if the dataframe contains columns which are not used by this function. One of "error", "warn" or "ignore" (defaults to "error").

Returns

A list of tuples containing the search results. The list will be the same length as the input dataframe, and will be in the same order (i.e. the fifth entry in the return value is the result of the fifth row in the input dataframe). If a search returned no matches then the corresponding tuple will contain None values. The fields within each tuple will be in the same order as the "returning" argument.

def find_nodes_by_trase_id(df, returning: Iterable[str], trase_id=Identifier('trase_id'), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]
def find_parent_regions_by_trase_id(df, returning: Iterable[str], parent_region_level_name: psycopg2.sql.Composable, trase_id=Identifier('trase_id'), on_extra_columns='error', cur=None, cnx=None)
def find_trader_groups_by_trader_id(df, returning: Iterable[str], trader_id=Identifier('trader_id'), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]

Example

>>> df = pd.DataFrame({"group_name": ["H B EMPREENDIMENTOS"]})
>>> df_result = find_trader_groups_by_name(
...     df,
...     returning=["node_id"],
... )
>>> print(df_result)
[(1828696,)]
def find_traders_and_groups_by_label(df, returning: Iterable[str], trader_label=Identifier('trader_label'), country_id: psycopg2.sql.Composable = Literal(None), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None)

TODO: unclear if the "count" return type is from the trader lookup or the group lookup. Make this clearer.

def find_traders_and_groups_by_trase_id(df, returning: Iterable[str], trase_id=Identifier('trase_id'), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None)
def find_traders_by_label(df, returning: Iterable[str], trader_label=Identifier('trader_label'), country_id: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]

Query the traders hierarchy in the database to clean the trader name and identify the trader group.

The following search terms are required:

  • trader_label (str)

The following search terms are optional:

  • country_id (int), which should be an id of the main.nodes table for a country node (type 'REGION', level 1) when specified

The function requires a Pandas dataframe. Each row of the dataframe represents one "lookup" in the main schema table.

The search terms themselves can come from one of two sources:

  1. A column in the dataframe, indicated using sql.Identifier
  2. A value that is constant for all rows of the dataframe, indicated using sql.Literal

When applying the function to the exporter, the "country_id" can be provided as a sql.Literal argument as it is the same for all rows:

from psycopg2 import sql

find_traders_by_label(
    df,
    trader_label=sql.Identifier("exporter"),
    country_id=sql.Literal(27),
    ...

When applying the function to the importer, each row will have its own country of import, so the "country_id" should be supplied in the dataframe alongside "trader_label" as sql.Identifier:

from psycopg2 import sql

find_traders_by_label(
    df,
    trader_label=sql.Identifier("importer"),
    country_id=sql.Identifier("country_of_import_id"),
    ...

The function additionally requires you to specify which information should be returned from the search via the "returning" argument. Supported values are:

  • trader_id
  • trader_name
  • trase_id
  • lookup_value
  • count

Example usage:

import pandas as pd
from psycopg2 import sql
from trase.tools.pandasdb.find import find_traders_by_label

returning = ["trader_name", "trase_id", "count"]

df = pd.DataFrame({"trader_label": ["CARGILL"]})
result = find_traders_by_label(df, returning)
print(result)
# [
#     (
#         'CARGILL',
#         'BR-TRADER-53404893',
#         1,
#     ),
# ]

df = pd.DataFrame({"trader_label": ["BAYER SA"], "country_id": [12]})
result = find_traders_by_label(df, returning, country_id=sql.Identifier("country_id"))
print(result)
# [
#     (
#         'BAYER S.A.',
#         'AR-TRADER-3050381106',
#         1,
#     ),
# ]

df = pd.DataFrame({"trader_label": ["JOSE GARCIA"], "country_id": [27]})
result = find_traders_by_label(df, returning, country_id=sql.Identifier("country_id"))
print(result)
# [
#     (
#         None,
#         None,
#         8,
#     ),
# ]

Args

df
a non-empty Pandas DataFrame
returning
a list of strings
trader_label
a column in the dataframe (defaults to the column "trader_label")
country_id
a column in the dataframe or literal value (defaults to the column "country_id")
on_extra_columns
behaviour if the dataframe contains columns which are not used by this function. One of "error", "warn" or "ignore" (defaults to "error").

Returns

A list of tuples containing the search results. The list will be the same length as the input dataframe, and will be in the same order (i.e. the fifth entry in the return value is the result of the fifth row in the input dataframe). If a search returned no matches then the corresponding tuple will contain None values. The fields within each tuple will be in the same order as the "returning" argument.

def format_float(number, significant_digits=None)
def full_merge(*args, how='outer', indicator=False, **kwargs)

Performs a Pandas merge but adds an additional check that no keys are missing.

Suppose that we have two dataframes, df_production and df_demand

df_production:                    df_demand:

  name  | production               name  | demand
--------+-----------             --------+-------
 BALDIM |         16              BALDIM |     16
 CAXIAS |         73              CAXIAS |     34
 ...    |        ...              ...    |    ...
 PALMAS |        109              PALMAS |     46
 URUOCA |        121              URUOCA |     47

Suppose that we wish to merge production and demand together. Our model expects that merge keys present in the left are present in the right and vice-versa. However, how do we assert this in code? If we don't assert it, we might accidentally lose rows.

This function is a wrapper around pd.merge that will raise an error if any keys are missing on the left or the right.

The behaviour depends on the value of the how parameter:

  • how="outer": error if any keys in the left are not present in the right and vice-versa
  • how="left": error if any keys in the left are not present in the right
  • how="right": error if any keys in the right are not present in the left
  • how="inner": behaviour will match that of pd.merge.

All arguments are passed to pd.merge.

Example:

# let's make two dataframes with keys that don't match
df_a = pd.DataFrame({"name": ["POMBAL"]})
df_b = pd.DataFrame({"name": ["PALMAS"]})

# pandas merge (defaulting to how=inner) will return an empty dataframe
df = pd.merge(df_a, df_b)
df.empty  # True

# full_merge (defaulting to how=outer) will raise an error
full_merge(df_a, df_b)  # ValueError: Not all merge keys match:
                        #    name      _merge
                        #  POMBAL   left_only
                        #  PALMAS  right_only
def get_pandas_df(key, bucket='trase-storage', version_id=None, client=<botocore.client.S3 object>, track=True, sep=';', encoding='utf8', xlsx=False, print_version_id=False, **kwargs) ‑> pandas.core.frame.DataFrame
def get_pandas_df_once(key, bucket='trase-storage', version_id=None, client=None, track=True, print_version_id=False, **kwargs) ‑> pandas.core.frame.DataFrame

Load a CSV file on S3 into a Pandas dataframe.

The file will only be downloaded once: thereafter it is stored in the local cache using the joblib library. The cache key includes the ETag of the object, so it will be up-to-date even if the remote object changes content.

All other arguments are passed to get_pandas_df().
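
Example

A minimal sketch, reusing the key from the download_file() example above (the separator matches the get_pandas_df() default):

df = get_pandas_df_once("candyland/metadata/assets.csv", sep=";")
df = get_pandas_df_once("candyland/metadata/assets.csv", sep=";")  # second call is served from the local cache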

def grouped_cartesian_product(df_a, df_b, common: List[str], suffixes=('_x', '_y')) ‑> pandas.core.frame.DataFrame

Like cartesian_product(), but merging on some common columns
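
Example

An illustrative call (the column name is hypothetical): within each value of "municipality", every row of df_a is paired with every row of df_b.

df = grouped_cartesian_product(df_a, df_b, common=["municipality"])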

def grouped_proportion(df, numerical_column: str, categorical_columns: List[str]) ‑> pandas.core.series.Series

Calculates the proportion of some numerical value relative to a group.

For example, given the following dataframe df:

| company | province | vol |
|---------|----------|-----|
| BIFFI   | ASCOPE   | 100 |
| ARVUT   | ASCOPE   | 300 |
| ARVUT   | PUNO     | 500 |

Calling grouped_proportion(df, "vol", ["province"]) would return:

| vol  |
|------|
| 0.25 |
| 0.75 |
| 1.00 |

because 0.25 = 100 / (100 + 300), 0.75 = 300 / (100 + 300), and 1.00 = 500 / 500. The function returns a pd.Series with the same index as the dataframe so that it can be used to add a column:

df["proportion"] = grouped_proportion(df, "vol", ["province"])

Args

numerical_column
string name of the numerical column, for example "vol"
categorical_columns
list of one or more categorical columns that will be considered to define the "groups"
def histogram_compare(df_a, df_b, value, group_by, comparator=<function Compare.absolute_error>, title_text='', scale_by='none', bins=15, significant_digits=2, decimal_points=0, posinf_label='only in b', neginf_label='only in a')
def pandas_display_context(rows=None, columns=None, float_format=None, precision: int = None, significant_digits=None)

Temporarily alters the display settings that Pandas uses to pretty-print dataframes.

Example

with pandas_display_context(rows=100):
   print(df)  # up to 100 rows will be printed

print(df)  # setting has now been reset to the previous value

Args

rows : int, optional
the maximum number of rows to print before truncating them
columns : int, optional
the maximum number of columns to print before truncating them
float_format : callable, optional
a function that takes a floating-point number and returns a string representation
precision : int, optional
floating-point output precision in terms of number of places after the decimal
significant_digits : int, optional
number of significant figures to print a floating-point number to. This setting will be ignored if float_format is supplied
def pivot(df, index: List, column: str, value: str, agg='sum')

A version of pandas.DataFrame.pivot which allows a list of index names, a feature only added in Pandas > 1.1.0. Once we upgrade Pandas we can delete this function
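
Example

An illustrative call (the column names are hypothetical):

df_wide = pivot(df, index=["exporter", "year"], column="country", value="vol", agg="sum")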

def prefix_columns(df, prefix='', suffix='')
def pretty_summarise(obj: Union[pandas.core.frame.DataFrame, pandas.core.series.Series, Iterable], index=False, max_rows=5, length=True, prefix='') ‑> str

Returns the top five values of an iterable, along with the number of items. This is useful for printing a potentially long list of items.

Example

>>> text = pretty_summarise([
...     "Berlin", "Frankfurt", "Heidelberg", "Munich", "Hamburg", "Bremen"
... ])
>>> print(text)
    Berlin
 Frankfurt
   ...
   Hamburg
    Bremen
Length: 6

Args

obj
a Pandas dataframe, Pandas series, or iterable of any kind
index : bool, optional
whether to print the index of the Pandas object (default: False)
max_rows : int, optional
number of rows to print before collapsing with "…" (default: 5)
length : bool, optional
whether to print the total number of rows in the object (default: True)

Returns: a string representation of the pretty-printed object

def print_message(message, level='info', end='\n', indent=0, color=None, attrs=None)
def print_report_by_attribute(df, value_column, by, significant_digits=2, printer=<built-in function print>)

Prints a summary of a dataframe when consolidated over one or more categorical columns. The absolute numbers are included as well as a relative percentage.

Example

>>> print_report_by_attribute(df, "vol", ["exporter"])
                     sum percentage
exporter
BUNGE      4,300,000,000        56%
ADM        2,200,000,000        28%
CARGILL    1,300,000,000        16%

Args

value_column
the numerical column to add up. Only one column is supported
by
the categorical column(s) to group over.
significant_digits
number of significant digits to round the numbers to, or None if you do not want rounding to occur (default: 2).
printer
a function which prints to the screen (default: print).
def read_s3_parquet(key, bucket='trase-storage', version_id=None, client=<botocore.client.S3 object>, track=True, print_version_id=False, **kwargs) ‑> pandas.core.frame.DataFrame

Read a Parquet dataset from S3 to a Pandas DataFrame. See https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html

Arguments

kwargs: passed to pd.read_parquet
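
Example

A minimal sketch (the key is hypothetical; the columns keyword is forwarded to pd.read_parquet):

df = read_s3_parquet("candyland/results/flows.parquet", columns=["exporter", "vol"])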

def rename(df, columns)
def report_percentage_by_attribute(df, value_column, by) ‑> pandas.core.frame.DataFrame
def round_to_significant_digits(number: float, significant_digits: int) ‑> float
def run_validation(df: pandas.core.frame.DataFrame, validation: Dict[str, Iterable[Validation]])
def sankey(df, value_column, categorical_columns, title_text='', shorten_end=False, label_width=15) ‑> plotly.graph_objs.Figure

Args

df
the dataframe to be rendered. You do not need to consolidate the dataframe; this will be done for you inside the function.
value_column
the column which will be used for the widths of the flows
categorical_columns
the columns which will be used for the columns of the Sankey.
title_text : optional
title for the chart. Default ""
label_width : optional
the maximum length of a label before it is shortened with "…". Default: 15
shorten_end : optional
if True labels will be shortened from the end ("LONG LA…"); if False they will be shortened from the start (e.g. "…G LABEL"). Default: False

Returns: the plotly Figure
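
Example

An illustrative call (the column names are hypothetical):

fig = sankey(df, value_column="vol", categorical_columns=["exporter", "country_of_destination"])
fig.show()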

def select_minimum(df, by, on, include_id=True)
def select_subset(df, subset)
def set_pandas_display(rows=None, columns=None, float_format=None, precision: int = None, significant_digits=None)

Alters the display settings that Pandas uses to pretty-print dataframes.

When this function is called the setting will be altered globally for the current Python session. To alter a setting temporarily use pandas_display_context() instead.

Example

set_pandas_display(rows=100)
print(df)  # up to 100 rows will be printed

Args

rows : int, optional
the maximum number of rows to print before truncating them
columns : int, optional
the maximum number of columns to print before truncating them
float_format : callable, optional
a function that takes a floating-point number and returns a string representation
precision : int, optional
floating-point output precision in terms of number of places after the decimal
significant_digits : int, optional
number of significant figures to print a floating-point number to. This setting will be ignored if float_format is supplied
def solve_2_step_transportation_problem(supply: pandas.core.series.Series, output: pandas.core.series.Series, demand: pandas.core.series.Series, costs_1: pandas.core.series.Series, costs_2: pandas.core.series.Series, output_deviation_cost_factor=None, solver=None, on_missing: str = 'raise', eq_constraints_1=None, geq_constraints_1=None, leq_constraints_1=None, eq_constraints_2=None, geq_constraints_2=None, leq_constraints_2=None, commodity_ratios=None, solve_kwargs=None)

Solve 2-step transportation problem using linear programming. Suppose a country has M farms ("sources"), N processing facilities ("via") and P ports ("sinks"). Commodities are transported from the farms to the processing facilities, and to the ports. Each farm has a maximum quantity of commodity it can supply, each processing facility has a maximum quantity of commodity it can process, and each port has a demand quantity.

We are given the transportation costs between every pair of farms and processing facilities, and between each pair of processing facilities and ports, and these costs are assumed to scale linearly with the quantity of commodity.

The problem is to meet the demand at each sink at minimum cost without exhausting the supply at any source, and without exceeding the output at each via point.

If a cost is missing for any (source, via) or (via, sink) pair then it is assumed that delivery is not possible. If a supply quantity is missing for any source it is assumed to be zero. If a demand quantity is missing for any sink it is assumed to be zero.

It is possible to construct a problem where it is impossible to satisfy one or more sinks and/or via points. This happens when there are no reachable sources with a defined capacity. That is to say, there are either no cost entries for any via-sink pair, or no cost entries for any source-via pair connected to the sink, or there are cost entries but there are no supply constraints for any of the sources. In this scenario the function will, by default, raise an error. However, you can control the behaviour using the on_missing argument.

Any number of additional constraints (equality, greater or equal, or lower or equal) can be specified for either step of the path with the …constraints… arguments.

Args

supply
a pd.Series of supply capacities, one row per source
demand
a pd.Series of demand quantities, one row per sink
output
a pd.Series of capacities, one row per via point
costs_1
a pd.Series of costs with a multi-index of (source, via) pairs
costs_2
a pd.Series of costs with a multi-index of (via, sink) pairs
output_deviation_cost_factor
when value is None, the output for all "via" points needs to be met exactly. When value is non-null, any quantity in excess or deficit compared with the output adds a cost to the objective function by multiplying the amount by the cost factor. One way to understand the meaning of the factor is the following: when the factor has value X, sourcing through a via point V1 without available output is as costly as sourcing through another via point V2 with available output located "further" by X units of transportation cost. For instance, if transportation costs are in minutes, a cost factor of 120 means that it would be cheaper to source through any via point with available output located less than 2 hours further from the final destination than from a via point that has no available output.
eq_constraints_1
a pd.Series of expected allocation between sources and via points.
geq_constraints_1
a pd.Series of lower limit of allocation between sources and via points.
leq_constraints_1
a pd.Series of upper limit of allocation between sources and via points.
eq_constraints_2
a pd.Series of expected allocation between via points and sinks.
geq_constraints_2
a pd.Series of lower limit of allocation between via points and sinks.
leq_constraints_2
a pd.Series of upper limit of allocation between via points and sinks.

Returns

allocation
a pd.Series of quantities with a multi-index of (source, via, sink) pairs
leftover_supply
a pd.Series of unallocated supply, one row per source
leftover_output
a pd.Series of unallocated output, one row per via point
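
Example

A minimal sketch with illustrative numbers: two farms, one mill and two ports, where the mill output equals the total demand (the import path is an assumption):

import pandas as pd
from trase.tools.sps import solve_2_step_transportation_problem  # assumed import path

supply = pd.Series({"FARM_A": 60, "FARM_B": 40})
output = pd.Series({"MILL_1": 80})
demand = pd.Series({"PORT_X": 50, "PORT_Y": 30})

# costs are indexed by (source, via) and (via, sink) pairs
costs_1 = pd.Series(
    [1.0, 2.0],
    index=pd.MultiIndex.from_tuples([("FARM_A", "MILL_1"), ("FARM_B", "MILL_1")]),
)
costs_2 = pd.Series(
    [3.0, 4.0],
    index=pd.MultiIndex.from_tuples([("MILL_1", "PORT_X"), ("MILL_1", "PORT_Y")]),
)

allocation, leftover_supply, leftover_output = solve_2_step_transportation_problem(
    supply, output, demand, costs_1, costs_2
)
# allocation is indexed by (source, via, sink) triples
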
def solve_transportation_problem(supply: pandas.core.series.Series, demand: pandas.core.series.Series, costs: pandas.core.series.Series, solver=None, on_missing: str = 'raise', on_lp_error: str = 'raise', allow_deviation=False, eq_constraints=None, geq_constraints=None, leq_constraints=None, commodity_ratios=None, solve_kwargs=None)
def split_dataframe_using_proportions(df: pandas.core.frame.DataFrame, df_proportions: pandas.core.frame.DataFrame, values: List[str], on: List[str], by: str, where=None, validate=None) ‑> pandas.core.frame.DataFrame
def stitch_dataframes(df_left, df_right, volume_column: str, values_left: List = None, values_right: List = None, fillna='UNKNOWN', indicator=False)

Merge dataframes on common columns, splitting the values in values_left and value_right according to proportions in volume_column.

df_left = pd.DataFrame(
    [
        ("K1", "M1", 10, 55, "good"),
        ("K2", "M1", 5, 32, "great"),
        ("K1", "M2", 2.5, 11, "awesome"),
        ("K2", "M2", 3, 19.2, "awful"),
    ],
    columns=["kab", "mill", "vol", "deforestation", "quality"],
)

df_right = pd.DataFrame(
    [
        ("M1", "T1", "C1", 8, 853, 3),
        ("M1", "T2", "C2", 8, 732, 5),
        ("M2", "T1", "C1", 1, 97, 3),
        ("M2", "T2", "C3", 4, 333, 5),
    ],
    columns=["mill", "exporter", "country", "vol", "fob", "forest_500"],
)

df = stitch_dataframes(
    df_left,
    df_right,
    volume_column="vol",
    values_left=["deforestation"],
    values_right=["fob"],
)
def upload_pandas_df_to_s3(df, new_key, sep=';', encoding='utf8', float_format='%.2f', quotechar='"', bucket_name='trase-storage')

Upload a csv dataset to s3 from a pandas DataFrame

:param df: pandas DataFrame object
:param new_key: s3 path
:param sep: Separator
:param bucket_name: s3 bucket name
:param encoding: encoding str
:param float_format: format of float columns
:param quotechar: quoting character

:return: AWS ServiceResource object

def warn_if_supply_sheds_demand_exceeds_supply(supply: pandas.core.series.Series, demand: pandas.core.series.Series, costs: pandas.core.series.Series)

The total LP supply sometimes appears sufficient to meet the total demand, but the available paths in the cost matrix make it impossible for some of the supply to reach the demand. When this happens the LP simply fails to find a solution, without giving any hint about the cause. This function identifies supply sheds (groups of sources connected to the same group of sinks) and flags supply sheds for which there is excess demand.

Classes

class Any (first_child: Validation, *other_children: Validation)

Combine other validation functions

Example

Any(Code(6), Code(8))  # accept codes that are length six or eight

Ancestors

Inherited members

class Code (length: int, allow_x: bool = True)

Has to be a string of a predefined length containing only digits, like "030211".

Example

Code(6)

Args

allow_x
also permit "XXXXXX" (defaults to True).

Ancestors

Class variables

var allow_x : bool
var length : int

Inherited members

class Column (name, rename=None, type=None, keep=True, clean=None, to_upper=None, report=None)

Refer to a column in the input data that should be read and (possibly) also included in the output.
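
Example

For illustration (the column names are hypothetical):

Column("MUNICIPIO", rename="municipality")
Column("VOLUME", type=float)
Column("NCM", keep=False)  # read from the input but excluded from the output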

Args

name : str
the name of the column in the input data
rename : str
an optional renaming of the column before it gets included in the output data. If this is the same as the name in the input data, it is not necessary to provide this argument.
type : type
the expected type of the column in the input data; for example int or float. If this argument is not provided then it is assumed to be str. This argument is particularly important for untyped input data such as CSV, since it will trigger a type-cast. It is still important for typed input data like Excel, since the code should reflect the types that are expected to be encountered in the input data.
keep : bool
whether this column should be included in the output data. This option is for when you want to read the column from the input data so that you can use it in some way but exclude it from the output. (If you neither intend to use the column nor include it in the output, it's recommended to simply not read the column at all).
clean : bool
whether string data should be "cleaned" (double whitespace removed, accented characters removed, etc). Defaults to true if the column type is str.
to_upper : bool
whether string data should be upper-cased. It defaults to whatever value clean takes, and is intended to be provided as an "opt-out" for those scenarios where string cleaning is desired but the case/capitalisation in the input data should be preserved.
report : bool
print a report of the total values in each column during the processing run. Defaults to true for numeric types (int, float).
class c (name: str, type: Type = builtins.str, key: bool = False, link: str = None, value: Optional[Any] = None, conserve: bool = False, validate: Optional[Validation] = None, only_validate_link: bool = False, non_negative: bool = None)

Define a column of a dataset.

Args

name
the name of the column as it appears in the file.
type
one of int, float, str, bool, List[int], etc.
key
indicates that the column should be considered to be part of the "primary key" of the dataset; in particular, that the values (among all key columns) should be unique.
link
of the form "target_dataset.target_column", indicating that this column should be left-joined on to on the "target_column" column in "target_dataset".
value
a default value that the column should be populated with.
conserve
whether this column should conserve its total sum throughout the model.
validate
a class from the trase.tools.sei_pcs.validation module which performs column-level validation. For example validate=Code(6) will check that every value in the column is a six-digit code.
only_validate_link

by default, if you link a target dataset, all columns of that dataset are added as part of the merge. For large datasets this can significantly increase memory. By setting only_validate_link=True, only the target column will be added.

For example, suppose that we have this definition:

datasets = {
    "state": Dataset([
        Column("name"),
        Column("code"),
    ]),
    "asset": Dataset([
        Column("state", link="state.code"),
    ]),
}

Then, the "asset" dataset will have the following columns:

  • state.name
  • state.code

If, however, we pass only_validate_link=True to the link:

datasets = {
    # ...
    "asset": Dataset([
        Column("state", link="state.code", only_validate_link=True),
    ]),
}

then the "asset" dataset will have only one column: the target of the link:

  • state.code

However, the usual link validation will still occur.

non_negative
add a validation that values are not negative. This defaults to true for numeric types and false otherwise.

Class variables

var conserve : bool
var key : bool
var name : str
var non_negative : bool
var type : Type

var validate : Optional[Validation]
var value : Optional[Any]

Instance variables

var is_present_in_file

Methods

def all_validators(self) ‑> List[Validation]
class Compare

Static methods

def absolute_error(a, b)

The function |b| - |a|

def absolute_symmetric_relative_error(a, b)
def cosine_similarity(a, b)
def l1_distance(a, b)
def l2_distance(a, b)
def relative_error(a, b)

Implements the function:

b - a
-----
 |a|

Note that this function will be undefined for zero and near-zero values of a

def signed_symmetric_relative_error(a, b)

Implements the below function, where division by zero is +/- infinity:

  |b| - |a|
-------------
min(|a|, |b|)

The motivation for this comparator is to meaningfully handle zero values in either a or b: positive infinity means that a is zero but there is a value in b, and negative infinity means that a has a value but b is zero.

Example

signed_symmetric_relative_error( 10,  10)  # 0
signed_symmetric_relative_error(250, 100)  # -1.5
signed_symmetric_relative_error(100, 250)  # 1.5
signed_symmetric_relative_error(100,   0)  # -inf
signed_symmetric_relative_error(  0, 100)  # inf

def signed_total_relative_error(a, b)

Implements the below function:

|b| - |a|
---------
sum(|a|)

The original use case for this comparator was to get the difference in volumes exported to e.g. China relative to the entire export volume for the year.

class Context (year: int, data_directory: str, extra: Optional[dict] = <factory>, pre_extracted_data: Optional[dict] = None)

Simple container for useful information about a model.

This object can be passed around the preparation script, run script, or elsewhere, to provide global variables for a model.

If the variable is really a hard-coded constant, consider putting it in a file like constants.py instead.
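
Example

An illustrative construction (the values are hypothetical):

context = Context(year=2018, data_directory="/tmp/model-data", extra={"allocation": "lp"})
context.year                 # 2018
context.extra["allocation"]  # "lp"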

Args

year
the year for which the model is related to. This will be used, for example, in conjunction with the data_directory argument to determine where to store input and output data for the model.
data_directory
path to a directory on the local file system which should be used to store input and output data for the model.
extra : optional
a dictionary of extra metadata for the model, such as a model parameter.
pre_extracted_data : optional
dataframes to pass to the preparation files.

Class variables

var data_directory : str
var extra : Optional[dict]
var pre_extracted_data : Optional[dict]
var year : int

Methods

def replace(self, **changes)

Return a copy of the instance except with some fields overwritten.

This method is identical to dataclasses.replace except that it will handle overwriting only a subset of keys of extra:

# we have two pieces of extra information: "complex" and "percent"
old = Context(2013, "/tmp/", extra=dict(complex=True, percent=10))

# we alter percent only
new = old.replace(extra=dict(percent=50))

# percent has updated, but complex is unchanged
new.extra["complex"]  # True
new.extra["percent"]  # 50

If you do not wish to have this behaviour then simply use dataclasses.replace

class Dataset (columns: List[Column])

Dataset(columns: List[trase.tools.sei_pcs.definition.Column])

Class variables

var columns : List[Column]
class Enum (permitted: List)

Must take one of a set of pre-defined values.

Example

Enum(["SOLVED", "UNSOLVED"])

Ancestors

Class variables

var permitted : List

Inherited members

class NoInputDataframePreprocessor (*args, **kwargs)

Create a preprocessor.

Args

context
note that Context.data_directory will be used to store files.

We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.

Ancestors

Class variables

var columns
var delimiter
var original_extension

Methods

def process(self, df)

Inherited members

class Preprocessor (*args, client=None, fallback_bucket='trase-storage', **kwargs)

Ancestors

Methods

def should_rerun(self, args)

Inherited members

class SupplyChain (directory: str, bucket: str = 'trase-storage', *args, **kwargs)

Core object from which supply chain models are run.

This object is responsible for the following:

  • Executing the data preparation script

  • Loading data into memory from CSV files that were produced by the data preparation script, according to the rules in the model definition

  • Providing an interface by which these data can be modified

  • Executing the model script, which uses the aforementioned interface

  • Generating the "results" file according to the rules in the model definition: a CSV file containing the data as it is after the model has modified it
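
Example

An illustrative sketch ("brazil/soy" is a hypothetical model directory under "trase/models"):

supply_chain = SupplyChain("brazil/soy")
filename = supply_chain.create_file_name(year=2018)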

Args

directory : str
path to the directory containing the definition.py file relative to the "trase/models" directory.
bucket : str, optional
S3 bucket to connect to, defaults to bucket specified in defaults.toml.

Ancestors

Methods

def create_file_name(self, year, country=None, commodity=None)
def get_key_prefix(self, country=None, commodity=None)
def upload_results(self, bucket=None, key_prefix=None, filename=None, country=None, commodity=None, suffix='', dry_run=False)

Inherited members

class TextPreprocessor (*args, client=None, fallback_bucket='trase-storage', **kwargs)

Ancestors

Methods

def should_rerun(self, args)

Inherited members

class TraseGlpkCommand (logPath='glpk.log', msg=False, **kwargs)

This is Pulp's GLPK_CMD but with a trick to capture stdout

Usually GLPK_CMD has two modes:

  1. msg=True, in which case glpk is run as a fork of the current Python process
  2. msg=False, in which case the output of glpk is sent to /dev/null

The intention of 1. is that the stdout of glpk is sent to the user. However, we have an odd problem that is very specific to DeforestationFree where the output is simply lost.

You can reproduce this problem by running the following code:

import os
os.spawnvp(os.P_WAIT, "echo", ["echo", "hi"])

If you run this locally you will see the output "hi". However if you run it in a Jupyter notebook on DeforestationFree you see nothing. I suspect that the output is being sent to the terminal rather than the notebook, perhaps the same problem as https://github.com/jupyterlab/jupyterlab/issues/9668. I hope that an upgrade or re-deployment of JupyterHub will resolve this issue.

Until then I have resorted to this clever trick, which is to patch os.devnull with a filename and pass msg=False. The pulp code runs open(os.devnull, "w"), which actually results in it opening the text file!

To use this class do the following:

solver = TraseGlpkCommand()
solve_transportation_problem(supply, demand, costs, solver=solver)

:param bool mip: if False, assume LP even if integer variables
:param bool msg: if False, no log is shown
:param float timeLimit: maximum time for solver (in seconds)
:param list options: list of additional options to pass to solver
:param bool keepFiles: if True, files are saved in the current directory and not deleted after solving
:param str path: path to the solver binary

Ancestors

  • pulp.apis.glpk_api.GLPK_CMD
  • pulp.apis.core.LpSolver_CMD
  • pulp.apis.core.LpSolver

Methods

def actualSolve(self, lp)

Solve a well formulated lp problem

class ValidationError (*args, **kwargs)

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class Value (value: Any)

Must match a single value

Example

Value("UNKNONWN")

Ancestors

Class variables

var value : Any

Inherited members