Module trase.tools.sps
The Trase SEI-PCS tool is a set of Python classes and functions designed to aid the development of Standardised Spatially Explicit Information on Production to Consumption (sSEI-PCS) models. From the perspective of the person developing the model, the tool:
- Helps you load data into the model, even if the data has recently changed at its source
- Runs quality assurance on the input data: in particular, ensuring that datasets can be fully joined on common columns
- Provides useful wrapper functions around Pandas that provide a higher-level abstraction for operations we often need in SEI-PCS models, such as "consolidation" or "splitting"
- Writes out model results in a way that can be directly ingested into Trase's main database
For the whole Trase project, the tool:
- Sets some standards about how data is loaded and written so that we can all work more efficiently
- Ensures that similar models have very similar-looking code, to reduce the cognitive load on the team
- Provides a standard interface to run models so that anybody on the team can run them, even if they didn't originally work on the model
Guiding Principles
The tool is written with a few guiding principles in mind:
- Be lightweight (in the sense of a small amount of code) and modular
- Set standards by default but don't enforce them, in order to remain flexible to needs
- Leverage Pandas 🐼 where possible: do not reinvent existing standards in the data science community
Functions
def append(df: pandas.core.frame.DataFrame, items) ‑> pandas.core.frame.DataFrame
def assert_frame_like(a, b, **kwargs)-
Assert two dataframes are equal, ignoring row and column order and index
def cartesian_product(df_a, df_b, suffixes=('_x', '_y')) ‑> pandas.core.frame.DataFrame-
Returns the Cartesian product of two dataframes.
For example, given the following two dataframes
df_a:
| foo |
|-----|
| a   |
| b   |

df_b:
| bar |
|-----|
| 1   |
| 2   |

The Cartesian product would be:
| foo | bar |
|-----|-----|
| a   | 1   |
| a   | 2   |
| b   | 1   |
| b   | 2   |

The returned dataframe will always have len(df_a) * len(df_b) rows. In the example above both dataframes have only one column, but this is not a requirement.
Args
suffixes- suffixes which will be applied to columns which appear in both dataframes
Note: this function is superseded in Pandas 1.2+ by pd.merge(how="cross")
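For reference, here is a minimal sketch of the two calls side by side (assuming Pandas 1.2+ for the cross merge):
```python
import pandas as pd

df_a = pd.DataFrame({"foo": ["a", "b"]})
df_b = pd.DataFrame({"bar": [1, 2]})

# cartesian_product(df_a, df_b) produces the same 4-row result as:
df = pd.merge(df_a, df_b, how="cross")
assert len(df) == len(df_a) * len(df_b)
```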
def clean_string(text: Optional[str], upper=True)-
Take a string and clean it!
- Remove double-whitespace
- Remove tab, newline, return, formfeed, etc.
- Replace accented characters (e.g. ö becomes o)
- Trim leading and trailing whitespace
- Convert to upper-case
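A small illustrative sketch of the steps above (the exact return values are indicative, not taken from the implementation):
```python
from trase.tools.sps import clean_string  # module path as documented on this page

clean_string("  São   Paulo\t ")               # expected: "SAO PAULO"
clean_string("  São   Paulo\t ", upper=False)  # expected: "Sao Paulo"
```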
def compare_dataframes_single(df_a, df_b, value: str, group_by: Optional[Iterable[str]] = None, comparator=<function Compare.signed_symmetric_relative_error>, column='comparison', suffixes=('_a', '_b'), sort_absolute=True, sort='ascending', factor=1) ‑> pandas.core.frame.DataFrame-
A wrapper around compare_dataframes() which:
- Takes just one value column
- Renames the comparison column
- Sorts dataframe for you
The intention is that the return value is slightly easier to use since columns only have one level, and it performs sorting for you (which is usually what you want).
Example
>>> compare_dataframes_single(df_a, df_b, "volume", ["exporter"])
          total_a  total_b  comparison
exporter
BUNGE         100      200         1.0
CARGILL       500        0        -inf
Args
value- the column which will be compared numerically
group_by- columns over which to group
comparator- a function which takes two series and returns their comparison. You can also pass in the string name of a method on Compare. See Compare for some examples. Defaults to Compare.signed_symmetric_relative_error()
column- the name of the comparison column. Defaults to "comparison"
sort_absolute- whether to use absolute values as the sorting key. Defaults to True
sort:optional- sorting order, either "ascending" (default) or "descending"
factor:optional- multiply the comparison by a factor. The multiplication is done before any sorting occurs.
Returns: a pd.DataFrame with the groups as the index and three columns named (unless overridden) "total_a", "total_b", and "comparison"
def concat(dfs, *args, sort=False, ignore_index=True, **kwargs)-
Some useful defaults for concatenating two dataframes together.
In particular:
- Do not sort the dataframes.
- Ignore indexes: we assume the indexes carry no useful information.
- Validate that all dataframes have the same columns, because NaNs are really annoying in Pandas.
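A brief sketch of the intended behaviour (the error on mismatched columns is assumed from the description above):
```python
import pandas as pd
from trase.tools.sps import concat  # module path as documented on this page

df_2019 = pd.DataFrame({"exporter": ["BUNGE"], "vol": [100]})
df_2020 = pd.DataFrame({"exporter": ["CARGILL"], "vol": [250]})

df = concat([df_2019, df_2020])  # single dataframe with a fresh 0..n-1 index

# A plain pd.concat would silently introduce NaNs here; per the description
# above, concat() is expected to complain because the columns differ.
df_bad = pd.DataFrame({"exporter": ["ADM"], "volume": [50]})  # note: "volume"
# concat([df_2019, df_bad])  # expected to fail the column validation
```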
def consolidate(df: pandas.core.frame.DataFrame, numerical_columns: Iterable[str], categorical_columns: Optional[Iterable[str]] = None, reset_index=True)-
Group a dataframe by one or more categorical columns and sum one or more numerical columns.
Example
df = pd.DataFrame({
    "product": ["eggs", "eggs", "bacon"],
    "value": [1.99, 1.99, 5.99],
})
consolidate(df, ["value"], ["product"])
# product  value
#   bacon   5.99
#    eggs   3.98
def dataset_path(data_directory, name) ‑> str
def download_file(key, bucket='trase-storage', version_id=None, path=None, client=None, track=True, force=False)-
Download an object stored in S3, but skip if the file already exists.
In particular, we skip the download if the local file exists and is newer than the remote or has a different bytesize to the remote.
Examples
>>> download_file("candyland/metadata/assets.csv")
'/var/folders/_m/s00h1rdj3m75ptngsv9tv52w0000gp/T/assets.csv'
Args
key- the key to the object on S3, e.g. "candyland/metadata/assets.csv"
bucket:optional- the S3 bucket, defaults to bucket defined in trase.config.settings
version_id:optional- the specific version ID of the file to download, e.g. "yiJXp1MC0N5U128RoBtlWOcAEjHFVowi"
path:optional- the file path on the local file system to download the file to. If not provided then a file will be made in the temporary directory of the operating system. (Such temporary directories are usually cleared on every restart of the operating system). This path will be checked to determine whether to skip downloading the file.
track:bool, optional- whether to add the object to the S3 tracker
force:bool, optional- whether to re-download the file even if it already exists on the file system.
Returns
The path to the downloaded file
def dumbbell_compare(df_a, df_b, value, group_by, comparator=<function Compare.absolute_error>, labels=('Base', 'Comparison'), colors=('#4e6356', '#ff6a5f'), title_text='', xaxis_title='', yaxis_title='', legend_title='', label_width=15, shorten_end=False, max_rows=15, other_label='OTHER', include_other=True, row_height_px=40) ‑> plotly.graph_objs.Figure-
Compare two dataframes and create a "dumbbell"-style plot.
Here is an ASCII-art of what that looks like:
BUNGE   │    *─────*
CARGILL │  *───*
ADM     │ *─*
OTHER   │ *
        └────────────────
          0    10    20    30
Args
df_a- base DataFrame.
df_b- comparison DataFrame.
value- the name of the column containing numerical data that will be compared. Any values duplicated over group_by will be summed.
group_by- a list of categorical columns to group by.
comparator:optional- a function which takes two series and returns their comparison. You can also pass in the string name of a method on Compare. See Compare for some examples. Defaults to Compare.absolute_error().
labels:optional- name of the two series that will appear in the legend. Default: ("Base", "Comparison").
title_text:optional- title for the chart. Default ""
xaxis_title:optional- title for the X-axis. Default ""
yaxis_title:optional- title for the Y-axis. Default ""
legend_title:optional- title for the legend. Default ""
label_width:optional- the maximum length of a label before it is shortened with "…". Default: 15
shorten_end:optional- if True, labels will be shortened from the end ("LONG LA…"); if False they will be shortened from the start (e.g. "…G LABEL"). Default: False
max_rows:optional- how many rows to show, or None if all rows should be shown. Default: 15
other_label:optional- the name of the "other" category; either a string or a tuple of strings of the same length as group_by. Default: "OTHER"
include_other:optional- whether to include the "OTHER" category. Default: True
Returns: the plotly Figure
def e(header: str, flow_attribute: Optional[str] = None)-
Construct an export definition for a column.
This object contains information which is used by the functions which construct the results file and the ingest metadata.
Args
header- the name of the resulting header in the CSV file. This should follow the Trase standard conventions, for example COUNTRY_OF_ORIGIN
flow_attribute- the name of the column in the flow. This must relate to a column in the flow definition. You can refer to linked columns using dot syntax, for example "country.trase_id".
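A hedged sketch of what a list of export definitions might look like (the surrounding structure and column names are hypothetical; only the two documented arguments are used):
```python
from trase.tools.sps import e  # module path as documented on this page

exports = [
    e("YEAR"),
    e("COUNTRY_OF_ORIGIN", flow_attribute="country.trase_id"),  # linked column via dot syntax
    e("EXPORTER", flow_attribute="exporter"),
]
```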
def find_biomes_by_trase_id(df, returning: Iterable[str], node_attributes_ref_id: int, trase_id=Identifier('trase_id'), on_extra_columns='error', cur=None, cnx=None)
def find_default_name_by_node_id(df, returning: Iterable[str], node_id: psycopg2.sql.Composable = Identifier('node_id'), on_extra_columns='error', cur=None, cnx=None)
def find_economic_blocs_by_trase_id(df, returning: Iterable[str], trase_id=Identifier('trase_id'), on_extra_columns='error', cur=None, cnx=None)
def find_nodes_by_code(df, returning: Iterable[str], code_id=Identifier('code_id'), value=Identifier('value'), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]
def find_nodes_by_name(df, returning: Iterable[str], name=Identifier('name'), level: psycopg2.sql.Composable = Literal(None), parent_id: psycopg2.sql.Composable = Literal(None), sub_type_id: psycopg2.sql.Composable = Literal(None), type_id: psycopg2.sql.Composable = Literal(None), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]-
Performs a lookup into the main.nodes and main.node_names tables given some search criteria.
The following search terms are required:
- name (str)
The following search terms are optional:
- parent_id (int), which should be an id of the main.nodes table
- sub_type_id (int), which should be an id of the main.node_sub_types table
- type_id (int)
- level (int)
- year (int)
The function requires a Pandas dataframe. Each row of the dataframe represents one "lookup" in the main schema table.
The search terms themselves can come from one of two sources:
- A column in the dataframe, indicated using sql.Identifier
- A value that is constant for all rows of the dataframe, indicated using sql.Literal
For example, the following call would get the name search terms from the "name" column in the dataframe, and would use level=4 for every search:
from psycopg2 import sql

find_nodes_by_name(
    df,
    name=sql.Identifier("name"),
    level=sql.Literal(4),
    ...

The function additionally requires you to specify which information should be returned from the search via the "returning" argument. Supported values are:
- default_name
- lookup_value
- parent_id
- type_id
- sub_type_id
- level
- year
- node_id
- trase_id
- count
Example usage:
import pandas as pd
from psycopg2 import sql
from trase.tools.pandasdb.find import find_nodes_by_name

df = pd.DataFrame({"country_label": ["COOK ISLAS", "ZIMBAWE", "SAMOA AMERICANA"]})
returning = ["default_name", "node_id"]
result = find_nodes_by_name(
    df,
    returning,
    name=sql.Identifier("country_label"),
    level=sql.Literal(1),
)
print(result)
# [('COOK ISLANDS', 55), ('ZIMBABWE', 211), ('AMERICAN SAMOA', 7)]
df_result = pd.DataFrame(result, columns=returning)

Let's break down the call to find_nodes_by_name():
- First we pass the data containing the search terms we want to look up, in the form of a dataframe. This function is designed to take large dataframes:
  find_nodes_by_name(
      df,
- We say that the function should return the (default) name of the node that it identifies, along with the Node ID:
      returning=["default_name", "node_id"],
- Next, we say that the data used to match the node name should come from the "country_label" column in the dataframe. Here sql.Identifier means a column in the dataframe:
      name=sql.Identifier("country_label"),
- Then we say that the level should be 1, which is the level for a country. Here sql.Literal means a value that should be the same for all rows:
      level=sql.Literal(1),
- The result of the function is a list of tuples, which we can convert to a dataframe:
  df_result = pd.DataFrame(result, columns=returning)
TODO:
- Test that columns adhere to postgres standards (or get around this with a hack!)
Args
df- a non-empty Pandas DataFrame
returning- a list of strings
name- a column in the dataframe or literal value (defaults to the column "name")
on_extra_columns- behaviour if the dataframe contains columns which are not used by this function. One of "error", "warn" or "ignore" (defaults to "error").
Returns
A list of tuples containing the search results. The list will be the same length as the input dataframe, and will be in the same order (i.e. the fifth entry in the return value is the result of the fifth row in the input dataframe). If a search did not resolve to a match then the tuple will contain None values. The values within each tuple will be in the same order as the "returning" argument.
def find_nodes_by_trase_id(df, returning: Iterable[str], trase_id=Identifier('trase_id'), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]
def find_parent_regions_by_trase_id(df, returning: Iterable[str], parent_region_level_name: psycopg2.sql.Composable, trase_id=Identifier('trase_id'), on_extra_columns='error', cur=None, cnx=None)
def find_trader_groups_by_trader_id(df, returning: Iterable[str], trader_id=Identifier('trader_id'), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]-
Example
>>> df = pd.DataFrame({"group_name": ["H B EMPREENDIMENTOS"]})
>>> df_result = find_trader_groups_by_name(
...     df,
...     returning=["node_id"],
... )
>>> print(df_result)
[(1828696,)]
def find_traders_and_groups_by_label(df, returning: Iterable[str], trader_label=Identifier('trader_label'), country_id: psycopg2.sql.Composable = Literal(None), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None)-
TODO: unclear if the "count" return type is from the trader lookup or the group lookup. Make this clearer.
def find_traders_and_groups_by_trase_id(df, returning: Iterable[str], trase_id=Identifier('trase_id'), year: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None)
def find_traders_by_label(df, returning: Iterable[str], trader_label=Identifier('trader_label'), country_id: psycopg2.sql.Composable = Literal(None), on_extra_columns='error', cur=None, cnx=None) ‑> List[Tuple]-
Query the traders hierarchy in the database to clean the trader name and identify the trader group.
The following search terms are required:
- trader_label (str)
The following search terms are optional:
- country_id (int), which should be an id of the main.nodes table for a country node (type 'REGION', level 1) when specified
The function requires a Pandas dataframe. Each row of the dataframe represents one "lookup" in the main schema table.
The search terms themselves can come from one of two sources:
- A column in the dataframe, indicated using sql.Identifier
- A value that is constant for all rows of the dataframe, indicated using sql.Literal
When applying the function to the exporter, the "country_id" can be provided as a sql.Literal argument as it is the same for all rows:

from psycopg2 import sql

find_traders_by_label(
    df,
    trader_label=sql.Identifier("exporter"),
    country_id=sql.Literal(27),
    ...

When applying the function to the importer, each row will have its own country of import, so the "country_id" should be supplied in the dataframe alongside "trader_label" as sql.Identifier:

from psycopg2 import sql

find_traders_by_label(
    df,
    trader_label=sql.Identifier("importer"),
    country_id=sql.Identifier("country_of_import_id"),
    ...

The function additionally requires you to specify which information should be returned from the search via the "returning" argument. Supported values are:
- trader_id
- trader_name
- trase_id
- lookup_value
- count
Example usage:
import pandas as pd
from psycopg2 import sql
from trase.tools.pandasdb.find import find_traders_by_label

returning = ["trader_name", "trase_id", "count"]

df = pd.DataFrame({"trader_label": ["CARGILL"]})
result = find_traders_by_label(df, returning)
print(result)
# [
#     (
#         'CARGILL',
#         'BR-TRADER-53404893',
#         1,
#     ),
# ]

df = pd.DataFrame({"trader_label": ["BAYER SA"], "country_id": [12]})
result = find_traders_by_label(df, returning, country_id=sql.Identifier("country_id"))
print(result)
# [
#     (
#         'BAYER S.A.',
#         'AR-TRADER-3050381106',
#         1,
#     ),
# ]

df = pd.DataFrame({"trader_label": ["JOSE GARCIA"], "country_id": [27]})
result = find_traders_by_label(df, returning, country_id=sql.Identifier("country_id"))
print(result)
# [
#     (
#         None,
#         None,
#         8,
#     ),
# ]
Args
df- a non-empty Pandas DataFrame
returning- a list of strings
trader_label- a column in the dataframe (defaults to the column "trader_label")
country_id- a column in the dataframe or literal value (defaults to the column "country_id")
on_extra_columns- behaviour if the dataframe contains columns which are not used by this function. One of "error", "warn" or "ignore" (defaults to "error").
Returns
A list of tuples containing the search results. The list will be the same length as the input dataframe, and will be in the same order (i.e. the fifth entry in the return value is the result of the fifth row in the input dataframe). If a search did not resolve to a match then the tuple will contain None values. The values within each tuple will be in the same order as the "returning" argument.
def format_float(number, significant_digits=None)
def full_merge(*args, how='outer', indicator=False, **kwargs)-
Performs a Pandas merge but adds an additional check that no keys are missing.
Suppose that we have two dataframes, df_production and df_demand
df_production:              df_demand:
name   | production         name   | demand
-------+-----------         -------+-------
BALDIM | 16                 BALDIM | 16
CAXIAS | 73                 CAXIAS | 34
...    | ...                ...    | ...
PALMAS | 109                PALMAS | 46
URUOCA | 121                URUOCA | 47

Suppose that we wish to merge production and demand together. Our model expects that merge keys present in the left are present in the right and vice-versa. However, how do we assert this in code? If we don't assert it, we might accidentally lose rows.
This function is a wrapper around pd.merge that will raise an error if any keys are missing on the left or the right.
The behaviour depends on the value of the how parameter:
- how="outer": error if any keys in the left are not present in the right and vice-versa
- how="left": error if any keys in the left are not present in the right
- how="right": error if any keys in the right are not present in the left
- how="inner": behaviour will match that of pd.merge.
All arguments are passed to pd.merge.
Example:
# let's make two dataframes with keys that don't match
df_a = pd.DataFrame({"name": ["POMBAL"]})
df_b = pd.DataFrame({"name": ["PALMAS"]})

# pandas merge (defaulting to how=inner) will return an empty dataframe
df = pd.merge(df_a, df_b)
df.empty  # True

# full_merge (defaulting to how=outer) will raise an error
full_merge(df_a, df_b)
# ValueError: Not all merge keys match:
#     name     _merge
#   POMBAL  left_only
#   PALMAS  right_only
def get_pandas_df(key, bucket='trase-storage', version_id=None, client=<botocore.client.S3 object>, track=True, sep=';', encoding='utf8', xlsx=False, print_version_id=False, **kwargs) ‑> pandas.core.frame.DataFrame-
Read a CSV or XLSX dataset from S3 to a Pandas DataFrame. See https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_excel.html
def get_pandas_df_once(key, bucket='trase-storage', version_id=None, client=None, track=True, print_version_id=False, **kwargs) ‑> pandas.core.frame.DataFrame-
Load a CSV file on S3 into a Pandas dataframe.
The file will only be downloaded once: thereafter it is stored in the local cache using the joblib library. The cache key includes the ETag of the object, so it will be up-to-date even if the remote object changes content.
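For instance (a sketch; the key is the same example key used elsewhere on this page):
```python
from trase.tools.sps import get_pandas_df_once  # module path as documented on this page

# The first call downloads and caches the file; subsequent calls with an
# unchanged remote ETag are served from the local joblib cache.
df = get_pandas_df_once("candyland/metadata/assets.csv", sep=";")
```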
All other arguments are passed to get_pandas_df().
def grouped_cartesian_product(df_a, df_b, common: List[str], suffixes=('_x', '_y')) ‑> pandas.core.frame.DataFrame-
Like cartesian_product(), but merging on some common columns
def grouped_proportion(df, numerical_column: str, categorical_columns: List[str]) ‑> pandas.core.series.Series-
Calculates the proportion of some numerical value relative to a group.
For example, given the following dataframe
df:
| company | province | vol |
|---------|----------|-----|
| BIFFI   | ASCOPE   | 100 |
| ARVUT   | ASCOPE   | 300 |
| ARVUT   | PUNO     | 500 |

Calling grouped_proportion(df, "vol", ["province"]) would return:
| vol  |
|------|
| 0.25 |
| 0.75 |
| 1.00 |

because 0.25 = 100 / (100 + 300), 0.75 = 300 / (100 + 300), and 1.00 = 500 / 500. The function returns a pd.Series with the same index as the dataframe so that it can be used to add a column:
df["proportion"] = grouped_proportion(df, "vol", ["province"])
Args
numerical_column- string name of the numerical column, for example "vol"
categorical_columns- list of one or more categorical columns that will be considered to define the "groups"
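The example above, written out as a runnable sketch:
```python
import pandas as pd
from trase.tools.sps import grouped_proportion  # module path as documented on this page

df = pd.DataFrame({
    "company":  ["BIFFI", "ARVUT", "ARVUT"],
    "province": ["ASCOPE", "ASCOPE", "PUNO"],
    "vol":      [100, 300, 500],
})

# Each row's volume as a share of its province total: 0.25, 0.75, 1.00
df["proportion"] = grouped_proportion(df, "vol", ["province"])
```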
def histogram_compare(df_a, df_b, value, group_by, comparator=<function Compare.absolute_error>, title_text='', scale_by='none', bins=15, significant_digits=2, decimal_points=0, posinf_label='only in b', neginf_label='only in a')
def pandas_display_context(rows=None, columns=None, float_format=None, precision: int = None, significant_digits=None)-
Temporarily alters the display settings that Pandas uses to pretty-print dataframes.
Example
with pd.pandas_display_context(rows=100):
    print(df)  # up to 100 rows will be printed
print(df)  # setting has now been reset to the previous value
Args
rows:int, optional- the maximum number of rows to print before truncating them
columns:int, optional- the maximum number of columns to print before truncating them
float_format:callable, optional- a function that takes a floating-point number and returns a string representation
precision:int, optional- floating-point output precision in terms of number of places after the decimal
significant_digits:int, optional- number of significant figures to print a floating-point number to. This setting will be ignored if float_format is supplied
def pivot(df, index: List, column: str, value: str, agg='sum')-
A version of pandas.DataFrame.pivot which allows a list of index names, a feature only added in Pandas > 1.1.0. Once we upgrade Pandas we can delete this function.
def prefix_columns(df, prefix='', suffix='')
def pretty_summarise(obj: Union[pandas.core.frame.DataFrame, pandas.core.series.Series, Iterable], index=False, max_rows=5, length=True, prefix='') ‑> str-
Returns the top five values of an iterable, along with the number of items. This is useful for printing a potentially long list of items.
Example
>>> text = pretty_summarise([
...     "Berlin", "Frankfurt", "Heidelberg", "Munich", "Hamburg", "Bremen"
... ])
>>> print(text)
Berlin
Frankfurt
...
Hamburg
Bremen
Length: 6
Args
obj- a Pandas dataframe, Pandas series, or iterable of any kind
index:bool, optional- whether to print the index of the Pandas object (default: False)
max_rows:int, optional- number of rows to print before collapsing with "…" (default: 5)
length:bool, optional- whether to print the total number of rows in the object (default: True)
Returns: a string representation of the pretty-printed object
def print_message(message, level='info', end='\n', indent=0, color=None, attrs=None)
def print_report_by_attribute(df, value_column, by, significant_digits=2, printer=<built-in function print>)-
Prints a summary of a dataframe when consolidated over one or more categorical columns. The absolute numbers are included as well as a relative percentage.
Example
>>> print_report_by_attribute(df, "vol", ["exporter"])
                   sum  percentage
exporter
BUNGE    4,300,000,000         56%
ADM      2,200,000,000         28%
CARGILL  1,300,000,000         16%
Args
value_column- the numerical column to add up. Only one column is supported
by- the categorical column(s) to group over.
significant_digits- number of significant digits to round the numbers to, or None if you do not want rounding to occur (default: 2).
printer- a function which prints to the screen (default: print).
def read_s3_parquet(key, bucket='trase-storage', version_id=None, client=<botocore.client.S3 object>, track=True, print_version_id=False, **kwargs) ‑> pandas.core.frame.DataFrame-
Read a Parquet dataset from S3 to a Pandas DataFrame. See https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html
Arguments
kwargs: passed to pd.read_parquet
def rename(df, columns)
def report_percentage_by_attribute(df, value_column, by) ‑> pandas.core.frame.DataFrame
def round_to_significant_digits(number: float, significant_digits: int) ‑> float
def run_validation(df: pandas.core.frame.DataFrame, validation: Dict[str, Iterable[Validation]])
def sankey(df, value_column, categorical_columns, title_text='', shorten_end=False, label_width=15) ‑> plotly.graph_objs.Figure-
Args
df- the dataframe to be rendered. You do not need to consolidate the dataframe; this will be done for you inside the function.
value_column- the column which will be used for the widths of the flows
categorical_columns- the columns which will be used for the columns of the Sankey.
title_text:optional- title for the chart. Default ""
label_width:optional- the maximum length of a label before it is shortened with "…". Default: 15
shorten_end:optional- if True, labels will be shortened from the end ("LONG LA…"); if False they will be shortened from the start (e.g. "…G LABEL"). Default: False
Returns: the plotly Figure
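A hedged usage sketch based on the arguments above (the column names are illustrative):
```python
import pandas as pd
from trase.tools.sps import sankey  # module path as documented on this page

df = pd.DataFrame({
    "exporter":    ["BUNGE", "BUNGE", "CARGILL"],
    "destination": ["CHINA", "SPAIN", "CHINA"],
    "vol":         [100, 40, 250],
})

# One Sankey column per categorical column; flow widths come from "vol".
fig = sankey(df, "vol", ["exporter", "destination"], title_text="Exports")
fig.show()
```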
def select_minimum(df, by, on, include_id=True)
def select_subset(df, subset)
def set_pandas_display(rows=None, columns=None, float_format=None, precision: int = None, significant_digits=None)-
Alters the display settings that Pandas uses to pretty-print dataframes.
When this function is called the setting will be altered globally for the current Python session. To alter a setting temporarily use pandas_display_context() instead.
Example
pd.set_pandas_display(rows=100)
print(df)  # up to 100 rows will be printed
Args
rows:int, optional- the maximum number of rows to print before truncating them
columns:int, optional- the maximum number of columns to print before truncating them
float_format:callable, optional- a function that takes a floating-point number and returns a string representation
precision:int, optional- floating-point output precision in terms of number of places after the decimal
significant_digits:int, optional- number of significant figures to print a floating-point number to. This setting will be ignored if float_format is supplied
def solve_2_step_transportation_problem(supply: pandas.core.series.Series, output: pandas.core.series.Series, demand: pandas.core.series.Series, costs_1: pandas.core.series.Series, costs_2: pandas.core.series.Series, output_deviation_cost_factor=None, solver=None, on_missing: str = 'raise', eq_constraints_1=None, geq_constraints_1=None, leq_constraints_1=None, eq_constraints_2=None, geq_constraints_2=None, leq_constraints_2=None, commodity_ratios=None, solve_kwargs=None)-
Solve 2-step transportation problem using linear programming. Suppose a country has M farms ("sources"), N processing facilities ("via") and P ports ("sinks"). Commodities are transported from the farms to the processing facilities, and to the ports. Each farm has a maximum quantity of commodity it can supply, each processing facility has a maximum quantity of commodity it can process, and each port has a demand quantity.
We are given the transportation costs between every pair of farms and processing facilities, and between each pair of processing facilities and ports, and these costs are assumed to be linear in the quantity of commodity.
The problem is to meet the demand at each sink at minimum cost without exhausting the supply at any source, and without exceeding the output at each via point.
If a cost is missing for any (source, via) or (via, sink) pair then it is assumed that delivery is not possible. If a supply output is missing for any source it is assumed to be zero. If demand quantity is missing for any sink it is assumed to be zero.
It is possible to construct a problem where it is impossible to satisfy one or more sinks and/or via points. This happens when there are no reachable sources with a defined capacity. That is to say, there are either no cost entries for any via-sink pair, or no cost entries for any source-via pair connected to the sink, or there are cost entries but there are no supply constraints for any of the sources. In this scenario the function will, by default, raise an error. However you can control the behaviour using the on_missing argument.
Any number of additional constraints (equality, greater-or-equal, or less-or-equal) can be specified for either step of the path with the …constraints… arguments.
Args
supply- a pd.Series of supply capacities, one row per source
demand- a pd.Series of demand quantities, one row per sink
output- a pd.Series of capacities, one row per via point
costs_1- a pd.Series of costs with a multi-index of (source, via) pairs
costs_2- a pd.Series of costs with a multi-index of (via, sink) pairs
output_deviation_cost_factor- when the value is None, the output for all "via" points needs to be met exactly. When the value is non-null, any quantity in excess or deficit compared with the output adds a cost to the objective function by multiplying the amount with the cost factor. One way to understand the meaning of the factor is the following: when the factor has value X, sourcing through a via point V1 without available output is as costly as sourcing through another via point V2 with available output located "further" by X units of transportation cost. For instance, if transportation costs are in minutes, a cost factor of 120 means that it would be cheaper to source through any via point with available output located less than 2 hours further from the final destination than from a via point that has no available output.
eq_constraints_1- a pd.Series of expected allocation between sources and via points.
geq_constraints_1- a pd.Series of lower limits of allocation between sources and via points.
leq_constraints_1- a pd.Series of upper limits of allocation between sources and via points.
eq_constraints_2- a pd.Series of expected allocation between via points and sinks.
geq_constraints_2- a pd.Series of lower limits of allocation between via points and sinks.
leq_constraints_2- a pd.Series of upper limits of allocation between via points and sinks.
Returns
allocation- a pd.Series of quantities with a multi-index of (source, via, sink) triples
leftover_supply- a pd.Series of unallocated supply, one row per source
leftover_output- a pd.Series of unallocated output, one row per via point
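A minimal sketch of how the inputs might be assembled, assuming the Series shapes documented above (two sources, one via point, one sink; all values are illustrative):
```python
import pandas as pd
from trase.tools.sps import solve_2_step_transportation_problem  # module path as documented on this page

supply = pd.Series({"FARM_A": 60, "FARM_B": 40})   # one row per source
output = pd.Series({"MILL_1": 80})                 # one row per via point
demand = pd.Series({"PORT_X": 80})                 # one row per sink

# Costs are indexed by (source, via) and (via, sink) pairs respectively.
costs_1 = pd.Series({("FARM_A", "MILL_1"): 1.0, ("FARM_B", "MILL_1"): 2.0})
costs_2 = pd.Series({("MILL_1", "PORT_X"): 3.0})

allocation, leftover_supply, leftover_output = solve_2_step_transportation_problem(
    supply, output, demand, costs_1, costs_2
)
```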
def solve_transportation_problem(supply: pandas.core.series.Series, demand: pandas.core.series.Series, costs: pandas.core.series.Series, solver=None, on_missing: str = 'raise', on_lp_error: str = 'raise', allow_deviation=False, eq_constraints=None, geq_constraints=None, leq_constraints=None, commodity_ratios=None, solve_kwargs=None)
def split_dataframe_using_proportions(df: pandas.core.frame.DataFrame, df_proportions: pandas.core.frame.DataFrame, values: List[str], on: List[str], by: str, where=None, validate=None) ‑> pandas.core.frame.DataFrame
def stitch_dataframes(df_left, df_right, volume_column: str, values_left: List = None, values_right: List = None, fillna='UNKNOWN', indicator=False)-
Merge dataframes on common columns, splitting the values in values_left and values_right according to proportions in volume_column.

df_left = pd.DataFrame(
    [
        ("K1", "M1", 10, 55, "good"),
        ("K2", "M1", 5, 32, "great"),
        ("K1", "M2", 2.5, 11, "awesome"),
        ("K2", "M2", 3, 19.2, "awful"),
    ],
    columns=["kab", "mill", "vol", "deforestation", "quality"],
)
df_right = pd.DataFrame(
    [
        ("M1", "T1", "C1", 8, 853, 3),
        ("M1", "T2", "C2", 8, 732, 5),
        ("M2", "T1", "C1", 1, 97, 3),
        ("M2", "T2", "C3", 4, 333, 5),
    ],
    columns=["mill", "exporter", "country", "vol", "fob", "forest_500"],
)
df = stitch_dataframes(
    df_left,
    df_right,
    volume_column="vol",
    values_left=["deforestation"],
    values_right=["fob"],
)
def upload_pandas_df_to_s3(df, new_key, sep=';', encoding='utf8', float_format='%.2f', quotechar='"', bucket_name='trase-storage')-
Upload a csv dataset to s3 from a pandas DataFrame
:param df: pandas DataFrame object
:param new_key: s3 path
:param sep: Separator
:param bucket_name: s3 bucket name
:param encoding: encoding str
:param float_format: format of float columns
:param quotechar: quoting character
:return: AWS ServiceResource object
def warn_if_supply_sheds_demand_exceeds_supply(supply: pandas.core.series.Series, demand: pandas.core.series.Series, costs: pandas.core.series.Series)-
The total LP supply sometimes appears sufficient to meet the total demand, but the details of the available paths in the cost matrix make it impossible for some of the supply to reach the demand. When such issues occur, the LP just doesn't find a solution, but does not give any hint about what may be the issue. This function identifies supply sheds: groups of sources connected to the same group of sinks, and flags supply sheds for which there is excess demand.
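A sketch of the intended diagnostic use, assuming the same Series conventions as the solver functions above:
```python
import pandas as pd
from trase.tools.sps import warn_if_supply_sheds_demand_exceeds_supply  # module path as documented on this page

supply = pd.Series({"FARM_A": 10, "FARM_B": 50})
demand = pd.Series({"PORT_X": 40, "PORT_Y": 20})

# FARM_A can only reach PORT_X, so the (FARM_A, PORT_X) supply shed has 10 of
# supply against 40 of demand, even though total supply equals total demand.
costs = pd.Series({("FARM_A", "PORT_X"): 1.0, ("FARM_B", "PORT_Y"): 1.0})

warn_if_supply_sheds_demand_exceeds_supply(supply, demand, costs)  # expected to flag the first shed
```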
Classes
class Any (first_child: Validation, *other_children: Validation)-
Combine other validation functions
Example
```
Any(Code(6), Code(8))  # accept codes that are length six or eight
```
Ancestors
- Validation
- abc.ABC
Inherited members
class Code (length: int, allow_x: bool = True)-
Has to be a string of a predefined length containing only digits, like "030211".
Example
Code(6)
Args
allow_x- also permit "XXXXXX" (defaults to True).
Ancestors
- Validation
- abc.ABC
Class variables
var allow_x : bool
var length : int
Inherited members
class Column (name, rename=None, type=None, keep=True, clean=None, to_upper=None, report=None)-
Refer to a column in the input data that should be read and (possibly) also included in the output.
Args
name:str- the name of the column in the input data
rename:str- an optional renaming of the column before it gets included in the output data. If this is the same as the name in the input data, it is not necessary to provide this argument.
type:type- the expected type of the column in the input data; for example int or float. If this argument is not provided then it is assumed to be str. This argument is particularly important for untyped input data such as CSV, since it will trigger a type-cast. It is still important for typed input data like Excel, since the code should reflect the types that are expected to be encountered in the input data.
keep:bool- whether this column should be included in the output data. This option is for when you want to read the column from the input data so that you can use it in some way but exclude it from the output. (If you neither intend to use the column nor include it in the output, it's recommended to simply not read the column at all).
clean:bool- whether string data should be "cleaned" (double whitespace removed, accented characters removed, etc). Defaults to true if the column type is str.
to_upper:bool- whether string data should be upper-cased. It defaults to whatever value clean takes, and is intended as an "opt-out" for those scenarios where string cleaning is desired but the case/capitalisation in the input data should be preserved.
report:bool- print a report of the total values in each column during the processing run. Defaults to true for numeric types (int, float).
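A hedged sketch of some column declarations exercising the options above (the column names are hypothetical):
```python
from trase.tools.sps import Column  # module path as documented on this page

columns = [
    Column("MUNICIPALITY"),                         # str, cleaned by default
    Column("VOLUME_KG", rename="vol", type=float),  # triggers a type-cast and a totals report
    Column("EXPORTER_RAW", keep=False),             # read for intermediate use, excluded from output
    Column("TRADER_NAME", to_upper=False),          # cleaned, but original capitalisation preserved
]
```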
class c (name: str, type: Type = builtins.str, key: bool = False, link: str = None, value: Optional[Any] = None, conserve: bool = False, validate: Optional[Validation] = None, only_validate_link: bool = False, non_negative: bool = None)-
Define a column of a dataset.
Args
name- the name of the column as it appears in the file.
type- one of int, float, str, bool, List[int], etc.
key- indicates that the column should be considered to be part of the "primary key" of the dataset; in particular, that the values (among all key columns) should be unique.
link- of the form "target_dataset.target_column", indicating that this column should be left-joined onto the "target_column" column in "target_dataset".
value- a default value that the column should be populated with.
conserve- whether this column should conserve its total sum throughout the model.
validate- a class from the trase.tools.sei_pcs.validation module which performs column-level validation. For example validate=Code(6) will check that every value in the column is a six-digit code.
only_validate_link- by default, if you link a target dataset, all columns of that dataset are added as part of the merge. For large datasets this can significantly increase memory. By setting only_validate_link=True, only the target column will be added.
For example, suppose that we have this definition:
datasets = {
    "state": Dataset([
        Column("name"),
        Column("code"),
    ]),
    "asset": Dataset([
        Column("state", link="state.code"),
    ]),
}
Then, the "asset" dataset will have the following columns:
- state.name
- state.code
If, however, we pass only_validate_link=True to the link:
datasets = {
    # ...
    "asset": Dataset([
        Column("state", link="state.code", only_validate_link=True),
    ]),
}
then the "asset" dataset will have only one column: the target of the link:
- state.code
However, the usual link validation will still occur.
non_negative- add a validation that values are not negative. This defaults to true for numeric types and false otherwise.
Class variables
var LINK_DELIMITER
var conserve : bool
var key : bool
var link : str
var name : str
var non_negative : bool
var only_validate_link : bool
var type : Type
var validate : Optional[Validation]
var value : Optional[Any]
Instance variables
var is_present_in_file
Methods
def all_validators(self) ‑> List[Validation]
class Compare-
Static methods
def absolute_error(a, b)-
The function |b| - |a|
def absolute_symmetric_relative_error(a, b)
def cosine_similarity(a, b)
def l1_distance(a, b)
def l2_distance(a, b)
def relative_error(a, b)-
Implements the function:
b - a
-----
 |a|
Note that this function will be undefined for zero and near-zero values of a
def signed_symmetric_relative_error(a, b)-
Implements the below function, where division by zero is +/- infinity:
  |b| - |a|
-------------
min(|a|, |b|)
The motivation for this comparator is to meaningfully handle zero values in either a or b: positive infinity means that a is zero but there is a value in b, and negative infinity means that a has a value but b is zero.
Example
signed_symmetric_relative_error( 10,  10)  # 0
signed_symmetric_relative_error(250, 100)  # -1.5
signed_symmetric_relative_error(100, 250)  # 1.5
signed_symmetric_relative_error(100,   0)  # -inf
signed_symmetric_relative_error(  0, 100)  # inf
def signed_total_relative_error(a, b)-
Implements the below function:
|b| - |a|
---------
 sum(|a|)
The original use case for this comparator was to get the difference in volumes exported to e.g. China relative to the entire export volume for the year.
class Context (year: int, data_directory: str, extra: Optional[dict] = <factory>, pre_extracted_data: Optional[dict] = None)-
Simple container for useful information about a model.
This object can be passed around the preparation script, run script, or elsewhere, to provide global variables for a model.
If the variable is really a hard-coded constant, consider putting it in a file like constants.py instead.
Args
year- the year to which the model relates. This will be used, for example, in conjunction with the data_directory argument to determine where to store input and output data for the model.
data_directory- path to a directory on the local file system which should be used to store input and output data for the model.
extra:optional- a dictionary of extra metadata for the model, such as a model parameter.
pre_extracted_data:optional- dataframes to pass to the preparation files.
Class variables
var data_directory : str
var extra : Optional[dict]
var pre_extracted_data : Optional[dict]
var year : int
Methods
def replace(self, **changes)-
Return a copy of the instance except with some fields overwritten.
This method is identical to dataclasses.replace except that it will handle overwriting only a subset of keys of extra:

# we have two pieces of extra information: "complex" and "percent"
old = Context(2013, "/tmp/", extra=dict(complex=True, percent=10))

# we alter percent only
new = old.replace(extra=dict(percent=50))

# percent has updated, but complex is unchanged
new.extra["complex"]  # True
new.extra["percent"]  # 50

If you do not wish to have this behaviour then simply use dataclasses.replace
class Dataset (columns: List[Column])-
Dataset(columns: List[trase.tools.sei_pcs.definition.Column])
Class variables
var columns : List[Column]
class Enum (permitted: List)-
Must take one of a set of pre-defined values.
Example
Enum(["SOLVED", "UNSOLVED"])
Ancestors
- Validation
- abc.ABC
Class variables
var permitted : List
Inherited members
class NoInputDataframePreprocessor (*args, **kwargs)-
Create a preprocessor.
Args
context- note that Context.data_directory will be used to store files.
We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.
Ancestors
Class variables
var columns
var delimiter
var original_extension
Methods
def process(self, df)
Inherited members
class Preprocessor (*args, client=None, fallback_bucket='trase-storage', **kwargs)-
Ancestors
Methods
def should_rerun(self, args)
Inherited members
class SupplyChain (directory: str, bucket: str = 'trase-storage', *args, **kwargs)-
Core object from which supply chain models are run.
This object is responsible for the following:
- Executing the data preparation script
- Loading data into memory from CSV files that were produced by the data preparation script, according to the rules in the model definition
- Providing an interface by which these data can be modified
- Executing the model script, which uses the aforementioned interface
- Generating the "results" file according to the rules in the model definition: a CSV file containing the data as it is after the model has modified it
Args
directory:str- path to the directory containing the definition.py file relative to the "trase/models" directory.
bucket:str, optional- S3 bucket to connect to, defaults to bucket specified in defaults.toml.
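A hedged sketch using only the methods documented below (the directory and arguments are illustrative; how a full model run is orchestrated is not shown on this page):
```python
from trase.tools.sps import SupplyChain  # module path as documented on this page

# "brazil/soy" is a hypothetical directory, relative to "trase/models",
# containing a definition.py file.
sc = SupplyChain("brazil/soy")

filename = sc.create_file_name(2020, country="BRAZIL", commodity="SOY")
prefix = sc.get_key_prefix(country="BRAZIL", commodity="SOY")
sc.upload_results(filename=filename, key_prefix=prefix, dry_run=True)
```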
Ancestors
Methods
def create_file_name(self, year, country=None, commodity=None)
def get_key_prefix(self, country=None, commodity=None)
def upload_results(self, bucket=None, key_prefix=None, filename=None, country=None, commodity=None, suffix='', dry_run=False)
Inherited members
class TextPreprocessor (*args, client=None, fallback_bucket='trase-storage', **kwargs)-
Ancestors
Methods
def should_rerun(self, args)
Inherited members
class TraseGlpkCommand (logPath='glpk.log', msg=False, **kwargs)-
This is Pulp's GLPK_CMD but with a trick to capture stdout
Usually GLPK_CMD has two modes:
1. msg=True, in which case glpk is run as a fork of the current Python process
2. msg=False, in which case the output of glpk is sent to /dev/null
The intention of 1. is that the stdout of glpk is sent to the user. However, we have an odd problem that is very specific to DeforestationFree where the output is simply lost.
You can reproduce this problem by running the following code:
import os
os.spawnvp(os.P_WAIT, "echo", ["echo", "hi"])

If you run this locally you will see the output "hi". However if you run it in a Jupyter notebook on DeforestationFree you see nothing. I suspect that the output is being sent to the terminal rather than the notebook, perhaps the same problem as https://github.com/jupyterlab/jupyterlab/issues/9668. I hope that an upgrade or re-deployment of JupyterHub will resolve this issue.
Until then I have resorted to this clever trick, which is to patch
os.devnull with a filename and pass msg=False. The pulp code runs open(os.devnull, "w"), which actually results in it opening the text file!
To use this class do the following:

solver = TraseGlpkCommand()
solve_transportation_problem(supply, demand, costs, solver=solver)

:param bool mip: if False, assume LP even if integer variables
:param bool msg: if False, no log is shown
:param float timeLimit: maximum time for solver (in seconds)
:param list options: list of additional options to pass to solver
:param bool keepFiles: if True, files are saved in the current directory and not deleted after solving
:param str path: path to the solver binary
Ancestors
- pulp.apis.glpk_api.GLPK_CMD
- pulp.apis.core.LpSolver_CMD
- pulp.apis.core.LpSolver
Methods
def actualSolve(self, lp)-
Solve a well formulated lp problem
class ValidationError (*args, **kwargs)-
Common base class for all non-exit exceptions.
Ancestors
- builtins.Exception
- builtins.BaseException
class Value (value: Any)-
Must match a single value
Example
Value("UNKNOWN")
Ancestors
- Validation
- abc.ABC
Class variables
var value : Any
Inherited members