Module trase.tools.sei_pcs.supply_chain
The "central" interface which enables a user to easily run an SEI-PCS model.
Functions
def cwd(path)

def split_flows_using_proportions(supplychain: SupplyChain, df_proportions: pandas.core.frame.DataFrame, values: List[str], updating: List[str], on: List[str], by: str, where=None, skip_conservation_check_for=None, validate=None)

Split rows of the flows dataset according to the proportions given in `df_proportions`.

Args

- `values` - a list of numerical columns whose values will be reduced: usually this is `["volume", "fob"]`
- `updating` - a list of columns which will be updated on the flows dataset: usually this is `["status", "branch"]`
- `on` - a list of columns on which the flows dataframe will be joined with the proportions dataframe
- `by` - the name of the column which contains the proportions: values between 0.0 and 1.0, which always sum to one when aggregated over the columns specified in the `on` parameter
- `where` - a dictionary of column name -> function which will be used to filter the flows dataframe before the proportions are applied: usually this is `{"status": lambda status: status == "UNKNOWN"}`
def write_results_dataframe_to_disk(df, pathlike_or_buffer, mkdirs=False)
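A minimal usage sketch; the path is illustrative, and it is assumed here that `mkdirs=True` creates any missing parent directories:

```python
import pandas as pd

df = pd.DataFrame({"exporter": ["ACME"], "vol": [100]})

# write the dataframe, creating the parent directories if necessary
write_results_dataframe_to_disk(df, "2019/results/results.csv", mkdirs=True)
```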
Classes
class SupplyChain (directory: str, year: int, extra: Optional[dict] = None, script_levels: int = 1, data_directory: Optional[str] = None, definition: Optional[Definition] = None, pre_extracted_data: Optional[dict] = None)
Core object from which supply chain models are run.

This object is responsible for the following:

- Executing the data preparation script
- Loading data into memory from CSV files that were produced by the data preparation script, according to the rules in the model definition
- Providing an interface by which these data can be modified
- Executing the model script, which uses the aforementioned interface
- Generating the "results" file according to the rules in the model definition: a CSV file containing the data as it is after the model has modified it
Provides an easy interface to run supply chain models.

This class is intended to be used via `trase.tools.sei_pcs.pandas_supply_chain.SupplyChain`. The most basic invocation is:

```python
SupplyChain("path/to/my/model", 2019)
```

When run like this, the class requires the following file structure:

```
trase/models
└── path/to/my/model
    └── definition.py
```

The definition file should contain the definition of the supply chain model (see `trase.tools.sei_pcs.definition`).

However, it also supports the following file structure:

```
trase/models
└── path/to/my/
    ├── model
    │   ├── 2019
    │   │   ├── downloaded            # files downloaded via the ETL framework
    │   │   ├── prepared              # files prepared by the ETL framework
    │   │   └── results               # output of the model
    │   ├── definition.py
    │   ├── pandas.py                 # model run code
    │   ├── quality_assurance.py      # post-run, quality-assurance code
    │   └── preparation.py            # ETL framework preparation code
    ├── preparation.py                # parent-level preparation*
    └── quality_assurance.py          # parent-level quality assurance*
```

\* Notice that the scripts `preparation.py` and `quality_assurance.py` may also be located in the parent directory. In this location, they will be run for all child models. This is intended to support "country-level" datasets when there are many models for a country. By default only one parent directory is searched, but you can increase this using the `script_levels` parameter.

Args
- `directory: str` - path to the directory containing the definition.py file, relative to the "trase/models" directory
- `year: int` - year for which the supply chain model is to be run
- `extra: dict, optional` - extra data for the model that will be passed to `trase.tools.sei_pcs.context.Context`. For example, if we provide `extra=dict(factor=10)`, then we can access this later as `supplychain.context.extra["factor"]`
- `script_levels: int, optional` - how many parent directories to search for preparation.py and quality_assurance.py scripts. The default is 1, meaning that the directory and its parent will be searched. Usually these correspond to the commodity and the country.
- `data_directory: str, optional` - a place to store the data for the ETL framework. By default this is stored alongside the definition.py file, in a sub-directory named after the current year
- `definition: Definition, optional` - an in-memory definition for the supply chain. If this is not supplied, we will try to read a file "definition.py" in the `directory` argument of this constructor
- `pre_extracted_data: dict, optional` - if this is provided, then preparation classes which would otherwise have extracted data from e.g. S3 will use these values instead. This is a stop-gap hack to allow supply chain models to be run in DBT, where DBT, rather than the preparation classes, has the responsibility of reading from S3. This object should be a dictionary whose keys are the file stems of the "outname" properties of the preparation classes; for example "flows".
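A sketch of a fuller invocation; the model path and the `extra` key are illustrative:

```python
supplychain = SupplyChain(
    "path/to/my/model",
    2019,
    extra=dict(factor=10),  # later available as supplychain.context.extra["factor"]
    script_levels=2,        # search two parent levels for preparation/QA scripts
)
```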
Methods
def add_suffix(self, path, suffix)

def dataset_path(self, name) -> str

def df_export(self) -> pandas.core.frame.DataFrame
Construct the results dataframe.
The columns that this dataframe includes are as specified in the "export" section of the definition. To see how this should be written, refer to existing models such as the demonstrative "Candyland" model.
def export_results(self, df=None, suffix='', export=True, flows=True)

Write the results dataframe to disk.

The location will be `<year>/results/results.csv`, or `<year>/results/results_<suffix>.csv` when a suffix is given.

Args

- `export` - write the results/results.csv file, containing the columns defined in the "export" section of the definition (default: True)
- `flows` - write the results/flows.csv file, containing the flows dataset without pre-joined data (default: True)
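A usage sketch, assuming the suffix is appended to the file name as `results_<suffix>.csv`:

```python
# write <year>/results/results.csv and <year>/results/flows.csv
supplychain.export_results()

# write <year>/results/results_v2.csv only, skipping the flows file
supplychain.export_results(suffix="v2", flows=False)
```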
def flow_report_by_attribute(self, value_column: str, by: List[str], significant_digits=2, **kwargs)

Prints a summary of the flows dataframe when consolidated over one or more categorical columns. The absolute numbers are included as well as a relative percentage.

Example

```python
>>> supplychain.flow_report_by_attribute("vol", ["exporter"])
                    sum  percentage
exporter
BUNGE     4,300,000,000         56%
ADM       2,200,000,000         28%
CARGILL   1,300,000,000         16%
```

Args

- `value_column` - the numerical column to add up. Only one column is supported
- `by` - the categorical column(s) to group over
- `significant_digits` - number of significant digits to round the numbers to, or `None` if you do not want rounding to occur (default: 2)
def get(self, name, subset=None, prefix='', ids=True)

Access a dataset by name.

Example

```python
df = supplychain.get("flows", subset=["branch"])
```

If possible, it is best to use the `subset` parameter to pre-filter the columns you want.

If the dataset has any columns which had a `link` parameter in the definition, they will be pre-joined for you using a dot notation, e.g. "department.province.name".

Args

- `name` - the name of the dataset, such as "province". The flows dataset is referred to as "flows"
- `subset: list` - a list of columns to select from the dataset. This is equivalent to filtering the dataset after you have accessed it, except with two differences. Firstly, the "_id" column, which is required for the `update` method, is preserved. Secondly, future versions of the framework may apply efficiency gains when `subset` is used with large datasets
- `prefix` - an optional string to prefix every column with
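A sketch combining these options; the "exporter" dataset and the column names are illustrative:

```python
# select only the columns we need; the "_id" column is preserved automatically
df = supplychain.get("flows", subset=["vol", "municipality.name"])

# linked columns are pre-joined and addressed with dot notation
print(df["municipality.name"].head())

# prefix every column, e.g. to distinguish datasets before a manual merge
df_exporters = supplychain.get("exporter", prefix="exporter.")
```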
def load(self)

Read the prepared CSV files on disk and load them into this object.

Which CSV files are read is determined by the datasets in the definition. If the definition contains this:

```python
datasets = {
    "province": Dataset([c("name")]),
    ...
}
```

then the file `<year>/prepared/province.csv` will be read. As well as the "datasets", the file `<year>/prepared/flows.csv` is always read.

After all of the data is read, it will be checked for consistency. In particular, this includes:

- The columns of a dataset with `key=True` must contain unique values when considered together
- If a column has a `link` parameter, then the target must be a many-to-one left join with no missing values
- The columns of each CSV must be parsable to the target types, like int or bool

Calling `load()` will also create columns which have the `value` parameter.
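As a sketch, here is a hypothetical definition fragment that exercises all three consistency checks; the column names are illustrative, and `c` is assumed to be a shorthand for `Column` as in the snippet above:

```python
datasets = {
    "province": Dataset([
        c("trase_id", key=True),  # values in province.csv must be unique
        c("name"),
    ]),
}
flows = [
    c("province", link="trase_id"),    # every flow must left-join to one province
    c("vol", int),                     # CSV values must parse as integers
    c("status", value="UNPROCESSED"),  # created by load() with this default value
]
```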
def preparation(self)

def quality_assurance(self)

def replace(self, df_flows, missing_value_columns='warn', extra_columns='raise', skip_conservation_check_for=None)
Replace the flows dataset with a new dataframe.
This function can only be used to modify the "flows" dataset: it is not possible to modify any other datasets.
Example

Suppose that we have the following flows definition:

```python
datasets={
    "municipality": Dataset([
        Column("name"),
        Column("trase_id"),
    ]),
},
flows=[
    Column("status", value="UNPROCESSED"),
    Column("municipality", link="trase_id"),
    Column("vol", int),
],
```

Then we would replace the flows dataset with some dataframe `df_2` as follows:

```python
# the following columns are required. "status" is optional; if not supplied, the
# default value of "UNPROCESSED" will be used
assert "vol" in df_2.columns
assert "municipality.trase_id" in df_2.columns

supplychain.replace(df_2)
supplychain.get("flows")  # will now return the new dataframe
```

In the above example, only the columns "vol", "municipality.trase_id" and (optionally) "status" will be read. Other columns, like "municipality.name" for example, will be ignored. Such linked data will be joined in from the "municipality" dataset during the call to `supplychain.get("flows")`.

Example: skipping a conservation check

For example, suppose we would like to allow the total sum of "vol" to change:

```python
supplychain.replace(df_2, skip_conservation_check_for=["vol"])
```

Args
- `df_flows` - a pandas dataframe that will replace the flows dataset
- `missing_value_columns` - behaviour when columns with a value have not been provided: one of "ignore" (do nothing), "warn" (print a warning) or "raise" (raise an error). By "column with a value" we mean one constructed like `Column("my_column", value="default")`: see `Column`
- `extra_columns` - behaviour when the dataframe has unexpected columns: one of "ignore" (do nothing), "warn" (print a warning) or "raise" (raise an error)
- `skip_conservation_check_for: list` - if you know that this update will cause a change in the total sum of a numeric column which has `conserve=True`, you can include it in this list to disable the check
Raises

- `KeyError` - if some non-value columns are missing, or some value columns are missing and `missing_value_columns="raise"`
- `LinkError` - if any linked columns fail to fully join with their targets
- `ConservedValueError` - if the update would cause any columns which have `conserve=True` in their definition, and which are not included in the `skip_conservation_check_for` parameter, to change in their total sum
def results_flows_path_with_optional_suffix(self, suffix='')

def results_metadata_path_with_optional_suffix(self, suffix='')

def results_path_with_optional_suffix(self, suffix='')

def run(self)

Execute the model run script.

This function will open the script "model.py" in the model directory and execute the function `run` defined in it.

Returns: the return value of the `run` function in the model run script.
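As an end-to-end sketch, a typical model run might look like the following; the ordering is an assumption based on the method descriptions above, not a documented contract:

```python
supplychain = SupplyChain("path/to/my/model", 2019)

supplychain.preparation()        # run the ETL preparation script(s)
supplychain.load()               # read and validate the prepared CSV files
result = supplychain.run()       # execute run() from the model run script
supplychain.quality_assurance()  # run the post-run quality-assurance script
supplychain.export_results()     # write <year>/results/results.csv
```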
def sankey(self, value_column, categorical_columns, *args, **kwargs)

See `sankey()`.

def update(self, df_flows, columns, skip_conservation_check_for=None)
Update a subset of the flows dataset.
This function allows you to:
- Update a _subset_ of the columns, leaving the others unchanged; and/or
- Update a _subset_ of rows, leaving the others unchanged; and/or
- _Split_ rows; that is, duplicate rows and then overwrite a subset of columns; and/or
- Add new rows

It is not possible to delete rows, nor is it possible to merge them. If you wish to do either of these, then use `trase.tools.sei_pcs.pandas_supply_chain.SupplyChain.replace`.

This function can only be used to modify the "flows" dataset: it is not possible to modify any other datasets.

Example: modifying a subset of columns

```python
df = supplychain.get("flows")
df["status"] = "my new value"
supplychain.update(df, ["status"])
```

Example: modifying a subset of rows

```python
df = supplychain.get("flows")
df = df[df["municipality"] != "UNKNOWN"].copy()  # select a subset of rows
df["status"] = "solved"
supplychain.update(df, ["status"])
```

Example: splitting rows

This is done by having rows which repeat the "_id" column:

```python
df = supplychain.get("flows")
df = pd.concat([
    df.assign(vol=0.5 * df["vol"], status="solved"),
    df.assign(vol=0.5 * df["vol"], status="unsolved"),
])
supplychain.update(df, ["status", "vol"])
```

Example: creating new rows

This is done by using the value -1 for the "_id":

```python
df = supplychain.get("flows", subset=["status", "vol"])
df = pd.concat(
    [df, pd.DataFrame([{"_id": -1, "status": "DOMESTIC", "vol": 100}])],
    ignore_index=True,
)
supplychain.update(df, ["status", "vol"])
```

Example: skipping a conservation check

For example, suppose we would like to allow the total sum of "vol" to change:

```python
supplychain.update(df, ["status", "vol"], skip_conservation_check_for=["vol"])
```

Args
- `df_flows` - the flows dataset. This must have a column "_id" so that it is possible to relate your changed rows to the original ones
- `columns: list` - a list of columns which should be updated. If a column was linked, you must update the link parameter. For example, if you have a column `Column("province", link="province.name")`, then you must pass `columns=["province.name"]` to this function in order to update it
- `skip_conservation_check_for: list` - if you know that this update will cause a change in the total sum of a numeric column which has `conserve=True`, you can include it in this list to disable the check
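A sketch of updating a linked column; this assumes a flows column defined as `Column("province", link="province.name")` as in the example above, and a hypothetical "UNKNOWN PROVINCE" entry in the province dataset:

```python
df = supplychain.get("flows")

# hypothetical reassignment: point unknown flows at a placeholder province
mask = df["status"] == "UNKNOWN"
df.loc[mask, "province.name"] = "UNKNOWN PROVINCE"

# pass the linked column name, not the bare column name "province"
supplychain.update(df, ["province.name"])
```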
Raises

- `ValueError` - if there are any type-casting issues, or if columns which are marked as "key" contain duplicated values
- `LinkError` - if any linked columns fail to fully join with their targets
- `ConservedValueError` - if the update would cause any columns which have `conserve=True` in their definition, and which are not included in the `skip_conservation_check_for` parameter, to change in their total sum