Module trase.tools.sei_pcs.supply_chain

The "central" interface which enables a user use to easily run an SEI-PCS model

Functions

def cwd(path)
def split_flows_using_proportions(supplychain: SupplyChain, df_proportions: pandas.core.frame.DataFrame, values: List[str], updating: List[str], on: List[str], by: str, where=None, skip_conservation_check_for=None, validate=None)

Args

values
a list of numerical columns whose values will be reduced: usually this is ["volume", "fob"]
updating
a list of columns which will be updated on the flows dataset: usually this is ["status", "branch"]
on
a list of columns on which the flows dataframe will be joined with the proportions dataframe
by
the name of the column which contains the proportions: values between 0.0 and 1.0, which always sum to one when aggregated over the columns specified in the on parameter
where
a dictionary of column name -> function which will be used to filter the flows dataframe before the proportions are applied: usually this is {"status": lambda status: status == "UNKNOWN"}
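
A minimal sketch of how this might be called, based only on the parameter descriptions above. The column names "exporter", "municipality.trase_id" and "proportion", and the contents of df_proportions, are illustrative assumptions rather than part of the API:

import pandas as pd

# Hypothetical proportions table: for each exporter, the share of each flow
# that should go to each municipality.  The "proportion" values sum to 1.0
# within each group defined by the "on" columns (here, "exporter").
df_proportions = pd.DataFrame({
    "exporter": ["ACME", "ACME"],
    "municipality.trase_id": ["BR-123", "BR-456"],
    "status": ["SOLVED", "SOLVED"],
    "proportion": [0.7, 0.3],
})

split_flows_using_proportions(
    supplychain,
    df_proportions,
    values=["volume", "fob"],                      # numeric columns to split
    updating=["status", "municipality.trase_id"],  # columns written onto the split rows
    on=["exporter"],                               # join keys between flows and proportions
    by="proportion",                               # column holding the 0.0-1.0 shares
    where={"status": lambda s: s == "UNKNOWN"},    # only split unresolved flows
)
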
def write_results_dataframe_to_disk(df, pathlike_or_buffer, mkdirs=False)
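
This helper has no docstring; the call below is a guess based only on the signature, assuming pathlike_or_buffer accepts a file path and mkdirs=True creates missing parent directories:

write_results_dataframe_to_disk(df, "2019/results/results.csv", mkdirs=True)
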

Classes

class SupplyChain (directory: str, year: int, extra: Optional[dict] = None, script_levels: int = 1, data_directory: Optional[str] = None, definition: Optional[Definition] = None, pre_extracted_data: Optional[dict] = None)

Core object from which supply chain models are run.

This object is responsible for the following:

  • Executing the data preparation script

  • Loading data into memory from CSV files that were produced by the data preparation script, according to the rules in the model definition

  • Providing an interface by which these data can be modified

  • Executing the model script, which uses the aforementioned interface

  • Generating the "results" file according to the rules in the model definition: a CSV file containing the data as it is after the model has modified it

Provides an easy interface to run supply chain models.

This class is intended to be used via trase.tools.sei_pcs.pandas_supply_chain.SupplyChain. The most basic invocation is:

SupplyChain("path/to/my/model", 2019)

When run like this, the class requires the following file structure:

trase/models
└── path/to/my/model
    └── definition.py

The definition file should contain the definition of the supply chain model (see trase.tools.sei_pcs.definition).

However, the following fuller file structure is also supported:

trase/models
└── path/to/my/
    ├── model
    │   ├── 2019
    │   │   ├── downloaded         # files downloaded via ETL framework
    │   │   ├── prepared           # files prepared by ETL framework
    │   │   └── results            # output of the model
    │   ├── definition.py
    │   ├── pandas.py              # model run code
    │   ├── quality_assurance.py   # post-run, quality assurance code
    │   └── preparation.py         # ETL framework preparation code
    ├── preparation.py             # parent-level preparation*
    └── quality_assurance.py       # parent-level quality assurance*

* Notice that the scripts preparation.py and quality_assurance.py may also be located in the parent directory. In that location, they will be run for all child models. This is intended to support "country-level" datasets when there are many models per country. By default only one parent directory is searched, but you can increase this using the script_levels parameter.
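
Putting this together, a typical run might look like the following. All of the methods exist on this class (see Methods below), but the exact ordering is an assumption based on the responsibilities listed above; consult an existing model for the canonical sequence:

sc = SupplyChain("path/to/my/model", 2019)
sc.preparation()        # execute the data preparation script(s)
sc.load()               # read the prepared CSVs into memory and validate them
sc.run()                # execute the model run script
sc.export_results()     # write <year>/results/results.csv
sc.quality_assurance()  # post-run quality assurance checks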

Args

directory : str
path to the directory containing the definition.py file relative to the "trase/models" directory.
year : int
year for which the supply chain model is to be run
extra : dict, optional
extra data for the model that will be passed to trase.tools.sei_pcs.context.Context. For example, if we provide extra=dict(factor=10), then we can access this later as supplychain.context.extra["factor"].
script_levels : int, optional
how many parent directories to search for preparation.py and quality_assurance.py scripts. The default is 1, meaning that the directory and its parent will be searched. Usually this is the commodity and the country.
data_directory : str, optional
a place to store the data for the ETL framework. By default this will be stored alongside the definition.py file, in a sub-directory named after the year argument
definition : Definition, optional
an in-memory definition for the supply chain. If this is not supplied then we will try to read a file "definition.py" in the directory argument of this constructor.
pre_extracted_data : dict, optional
if this is provided, then preparation classes which would otherwise have extracted data from e.g. S3 will instead use these values. This is a stop-gap hack to allow supply chain models to be run in DBT, where DBT, rather than the preparation classes, is responsible for reading from S3. This object should be a dictionary whose keys are the file stems of the "outname" properties of the preparation classes; for example "flows".

Subclasses

Methods

def add_suffix(self, path, suffix)
def dataset_path(self, name) ‑> str
def df_export(self) ‑> pandas.core.frame.DataFrame

Construct the results dataframe.

The columns that this dataframe includes are as specified in the "export" section of the definition. To see how this should be written, refer to existing models such as the demonstrative "Candyland" model.
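
A minimal usage sketch, for inspecting the exported columns without writing anything to disk:

df = supplychain.df_export()
print(df.columns)  # the columns listed in the "export" section of the definition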

def export_results(self, df=None, suffix='', export=True, flows=True)

Write the results dataframe to disk.

The location will be <year>/results/results.csv, or <year>/results/results_<suffix>.csv when a suffix is given.

Args

export
write the results/results.csv file, containing columns defined in the "export" section of the definition (default: True)
flows
write the results/flows.csv file, containing the flows dataset without pre-joined data (default: True)
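
For example, to write a second set of results alongside the default ones (the suffix "scenario_a" is illustrative), presumably producing <year>/results/results_scenario_a.csv:

supplychain.export_results()                     # <year>/results/results.csv
supplychain.export_results(suffix="scenario_a")  # <year>/results/results_scenario_a.csv
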
def flow_report_by_attribute(self, value_column: str, by: List[str], significant_digits=2, **kwargs)

Prints a summary of the flows dataframe, consolidated over one or more categorical columns. Absolute numbers are included as well as a relative percentage.

Example

>>> supplychain.flow_report_by_attribute("vol", ["exporter"])
                     sum percentage
exporter
BUNGE      4,300,000,000        56%
ADM        2,200,000,000        28%
CARGILL    1,300,000,000        16%

Args

value_column
the numerical column to add up. Only one column is supported
by
the categorical column(s) to group over.
significant_digits
number of significant digits to round the numbers to, or None if you do not want rounding to occur (default: 2).
def get(self, name, subset=None, prefix='', ids=True)

Access a dataset by name.

Example

df = supplychain.get("flows", subset=["branch"])

If possible, it is best to use the subset parameter to pre-filter the columns you want.

If the dataset has any columns which had a link parameter in the definition, they will be pre-joined for you using dot notation, e.g. "department.province.name".
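
For example, assuming the flows definition links a "municipality" column via "trase_id" and that the municipality dataset has a "name" column (as in the replace example below), the joined column can be requested directly:

df = supplychain.get("flows", subset=["vol", "municipality.name"])
df["municipality.name"]  # pre-joined from the "municipality" dataset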

Args

name
the name of the dataset, such as "province". The flows dataset is referred to as "flows"
subset : list
a list of columns to select from the dataset. This is almost equivalent to filtering the dataframe after you have accessed it, with two differences: firstly, the "_id" column which is required for the update method is preserved; secondly, future versions of the framework may apply efficiency gains to using subset for large datasets
prefix
an optional string to prefix every column with
def load(self)

Read the prepared CSV files on disk and load them into this object.

Which CSV files are read is determined by the datasets in the definition. If the definition contains this:

datasets = {
    "province": Dataset([
        c("name")
    ]),
    ...
}

Then the file <year>/prepared/province.csv will be read. In addition to the files listed under "datasets", the file <year>/prepared/flows.csv is always read.

After all of the data is read, it will be checked for consistency. In particular, this includes:

  • The columns of a dataset with key=True must contain unique values when considered together
  • If a column has a link parameter then the target must be a many-to-one left join with no missing values.
  • The columns of each CSV must be parsable to the target types, such as int or bool

Calling load() will also create columns which have the value parameter.
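
As a sketch of that last point: assuming the flows definition contains Column("status", value="UNPROCESSED") (as in the replace example below), then after loading, the column exists and is pre-filled with its default value:

supplychain.load()
df = supplychain.get("flows")
assert (df["status"] == "UNPROCESSED").all()  # created from the value parameter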

def preparation(self)
def quality_assurance(self)
def replace(self, df_flows, missing_value_columns='warn', extra_columns='raise', skip_conservation_check_for=None)

Replace the flows dataset with a new dataframe.

This function can only be used to modify the "flows" dataset: it is not possible to modify any other datasets.

Example

Suppose that we have the following flows definition:

datasets={
    "municipality": Dataset([
        Column("name"),
        Column("trase_id"),
    ]),
},
flows=[
    Column("status", value="UNPROCESSED"),
    Column("municipality", link="trase_id"),
    Column("vol", int),
],

Then we would replace the flows dataset with some dataframe df as follows:

# the following columns are required. "status" is optional; if not supplied the
# default value of "UNPROCESSED" will be used
assert "vol" in df.columns
assert "municipality.trase_id" in df.columns

supplychain.replace(df)

supplychain.get("flows")  # will now return the new dataframe

In the above example, only the columns "vol", "municipality.trase_id" and (optionally) "status" will be read. Other columns, like "municipality.name" for example, will be ignored. Such linked data will be joined in from the "municipality" dataset during the call to supplychain.get("flows").

Example: skipping a conservation check

For example, suppose we would like to allow the total sum of "vol" to change:

supplychain.replace(df, skip_conservation_check_for=["vol"])

Args

df_flows
a pandas dataframe that will replace the flows dataset.
missing_value_columns
behaviour when columns with a value have not been provided: one of "ignore" (do nothing), "warn" (print a warning) or "raise" (raise an error). By "column with a value" we mean one constructed like Column("my_column", value="default"): see Column
extra_columns
behaviour when the dataframe has unexpected columns: one of "ignore" (do nothing), "warn" (print a warning) or "raise" (raise an error).
skip_conservation_check_for : list
if you know that this update will cause a change in the total sum of a numeric column which has conserve=True, you can include it in this list to disable the check.

Raises

KeyError
if some non-value columns are missing, or some value columns are missing and missing_value_columns="raise"
LinkError
if any linked columns fail to fully join with their targets
ConservedValueError
if the update would cause any columns which have conserve=True in their definition and are not included in the skip_conservation_check_for parameter to change in their total sum
def results_flows_path_with_optional_suffix(self, suffix='')
def results_metadata_path_with_optional_suffix(self, suffix='')
def results_path_with_optional_suffix(self, suffix='')
def run(self)

Execute the model run script.

This function will open the script "model.py" in the model directory and execute the function run defined in it.

Returns: the return value of the run function in the model run script.

def sankey(self, value_column, categorical_columns, *args, **kwargs)
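
There is no docstring for sankey; the call below is a guess based only on the signature and the column names used elsewhere on this page, presumably rendering a Sankey diagram of the value column across the given categorical columns:

supplychain.sankey("vol", ["exporter", "municipality.name"])
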
def update(self, df_flows, columns, skip_conservation_check_for=None)

Update a subset of the flows dataset.

This function allows you to:

  • Update a subset of the columns, leaving the others unchanged
  • Update a subset of the rows, leaving the others unchanged
  • Split rows; that is, duplicate rows and then overwrite a subset of columns
  • Add new rows

It is not possible to delete rows, nor is it possible to merge them. If you wish to do either of these then use trase.tools.sei_pcs.pandas_supply_chain.SupplyChain.replace.

This function can only be used to modify the "flows" dataset: it is not possible to modify any other datasets.

Example: modifying a subset of columns

df = supplychain.get("flows")
df["status"] = "my new value"
supplychain.update(df, ["status"])

Example: modifying a subset of rows

df = supplychain.get("flows")
df = df[df["municipality"] != "UNKNOWN"].copy()  # select a subset of rows
df["status"] = "solved"
supplychain.update(df, ["status"])

Example: splitting rows

This is done by having rows which repeat the same "_id" value:

df = supplychain.get("flows")
df = pd.concat([
    df.assign(vol=0.5 * df["vol"], status="solved"),
    df.assign(vol=0.5 * df["vol"], status="unsolved"),
])
supplychain.update(df, ["status", "vol"])

Example: creating new rows

This is done by using the value -1 for the "_id":

df = supplychain.get("flows", subset=["status", "vol"])
df = df.append(
    {"_id": -1, "status": "DOMESTIC", "vol": 100},
    ignore_index=True,
)
supplychain.update(df, ["status", "vol"])

Example: skipping a conservation check

For example, suppose we would like to allow the total sum of "vol" to change:

supplychain.update(df, ["status", "vol"], skip_conservation_check_for=["vol"])

Args

df_flows
the flows dataset. This must have a column "_id" so that it is possible to relate your changed rows to the original ones.
columns : list
a list of columns which should be updated. If a column is linked, you must refer to it via its link: for example, if you have a column Column("province", link="province.name") then you must pass columns=["province.name"] to this function in order to update it.
skip_conservation_check_for : list
if you know that this update will cause a change in the total sum of a numeric column which has conserve=True, you can include it in this list to disable the check.

Raises

ValueError
if there are any type-casting issues or columns which are marked as "key" but contain duplicated values
LinkError
if any linked columns fail to fully join with their targets
ConservedValueError
if the update would cause any columns which have conserve=True in their definition and are not included in the skip_conservation_check_for parameter to change in their total sum