Module trase.tools.etl.processors

Classes

class Column (name, rename=None, type=None, keep=True, clean=None, to_upper=None, report=None)

Refer to a column in the input data that should be read and (possibly) also included in the output.

Args

name : str
the name of the column in the input data
rename : str
an optional renaming of the column before it gets included in the output data. If this is the same as the name in the input data, it is not necessary to provide this argument.
type : type
the expected type of the column in the input data, for example int or float. If this argument is not provided, the type is assumed to be str. This argument is particularly important for untyped input data such as CSV, since it will trigger a type-cast. It is still important for typed input data like Excel, since the code should reflect the types that are expected to be encountered in the input data.
keep : bool
whether this column should be included in the output data. Use keep=False when you want to read the column from the input data so that you can use it during processing, but exclude it from the output. (If you intend neither to use the column nor to include it in the output, it is recommended to simply not read the column at all.)
clean : bool
whether string data should be "cleaned" (double whitespace removed, accented characters removed, etc). Defaults to true if the column type is str.
to_upper : bool
whether string data should be upper-cased. It defaults to whatever value clean takes, and is intended to be provided as an "opt-out" for those scenarios where string cleaning is desired but the case/capitalisation in the input data should be preserved.
report : bool
whether to print the sum of this column's values during the processing run. Defaults to true for numeric types (int, float).
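The way the optional arguments fall back on each other can be illustrated with a small stand-in. This is a sketch under assumptions, not the library's implementation: `ColumnSketch` is a hypothetical dataclass that mirrors only the documented signature, and it assumes that clean is false for non-str columns and that report is false for non-numeric ones, which the descriptions above imply but do not state.

```python
from dataclasses import dataclass
from typing import Optional, Type


@dataclass
class ColumnSketch:
    """Hypothetical stand-in for Column; models only the documented defaults."""

    name: str
    rename: Optional[str] = None
    type: Optional[Type] = None
    keep: bool = True
    clean: Optional[bool] = None
    to_upper: Optional[bool] = None
    report: Optional[bool] = None

    def __post_init__(self):
        # The type is assumed to be str when it is not provided.
        if self.type is None:
            self.type = str
        # Without a rename, the output keeps the input name.
        if self.rename is None:
            self.rename = self.name
        # String cleaning defaults to on for str columns (assumed off otherwise).
        if self.clean is None:
            self.clean = self.type is str
        # Upper-casing follows clean unless the caller opts out explicitly.
        if self.to_upper is None:
            self.to_upper = self.clean
        # Reporting defaults to on for numeric types (assumed off otherwise).
        if self.report is None:
            self.report = self.type in (int, float)


country = ColumnSketch("country")               # cleaned and upper-cased
vol = ColumnSketch("vol", type=float)           # numeric: reported, not cleaned
label = ColumnSketch("label", to_upper=False)   # cleaned, original case kept

for column in (country, vol, label):
    print(column.name, column.clean, column.to_upper, column.report)
```

The `label` column shows the opt-out described for to_upper: cleaning stays on while the capitalisation of the input is preserved.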
class DataframePreprocessor (*args, **kwargs)

Create a preprocessor.

Args

context
note that Context.data_directory will be used to store files.

We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.

Ancestors

Subclasses

Class variables

var columns
var delimiter
var header
var output_types
var quotechar
var skiprows

Instance variables

var output_columns : List[str]

Methods

def column_types(self)
def consolidate(self, df, numerical_columns)
def converters(self)
def drop_duplicates(self, df)
def get_columns(self) ‑> List[Column]
def possibly_clean_strings(self, df)
def postprocess(self, df)
def preprocess(self, df)
def rename(self, df)
def report(self, df, msg='', report=None)

Print a summary of the dataframe.

This function is intended to be used during the process function:

Example

class MyData(DataframePreprocessor):
    columns = [
        Column("year", type=int),
        Column("vol", type=float, report=True),  # report=True causes sum to be included in the report
    ]

    def process(self, df_2):
        df_2 = df_2[df_2["year"] == self.year]
        self.report(df_2, "Filtered to current year")

        df_2 = df_2[df_2["vol"] > 0]
        self.report(df_2, "Dropped zero volume")

        return df_2

When the preprocessor is run, it will output something like:

Report: Filtered to current year
    | Row Count: 200
    | Sum of vol: 670
Report: Dropped zero volume
    | Row Count: 150
    | Sum of vol: 670

The function always prints the row count. If, in addition, any numeric columns are marked with report=True (as with "vol" in the example above), the sum of each such column is also printed.

Args

df
the Pandas dataframe to report on
msg : optional
a message that will be printed to the screen
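The output format shown above can be reproduced with a dependency-free stand-in. This is only a sketch: `report_sketch` is a hypothetical function, it takes a list of dicts in place of a Pandas dataframe, and the `report_columns` parameter is an assumption standing in for the columns declared with report=True.

```python
def report_sketch(rows, msg="", report_columns=()):
    """Hypothetical stand-in for DataframePreprocessor.report.

    rows: list of dicts (stands in for the dataframe)
    report_columns: names of the columns marked report=True (assumed parameter)
    """
    # The row count is always printed.
    lines = [f"Report: {msg}", f"    | Row Count: {len(rows)}"]
    # Each reported column contributes one line with its sum.
    for column in report_columns:
        total = sum(row[column] for row in rows)
        lines.append(f"    | Sum of {column}: {total}")
    text = "\n".join(lines)
    print(text)
    return text


rows = [
    {"year": 2020, "vol": 5.0},
    {"year": 2020, "vol": 0.0},
    {"year": 2019, "vol": 2.5},
]

current_year = [row for row in rows if row["year"] == 2020]
first = report_sketch(current_year, "Filtered to current year", ["vol"])

non_zero = [row for row in current_year if row["vol"] > 0]
second = report_sketch(non_zero, "Dropped zero volume", ["vol"])
```

As in the documented example, dropping zero-volume rows lowers the row count but leaves the sum unchanged, which is the kind of sanity check these reports make easy to read off.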
def select_output_columns(self, df)
def usecols(self)
def validate(self, series, validation_function)
def validate_columns(self, df)

Inherited members

class NoInputDataframePreprocessor (*args, **kwargs)

Create a preprocessor.

Args

context
note that Context.data_directory will be used to store files.

We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.

Ancestors

Class variables

var columns
var delimiter
var original_extension

Methods

def process(self, df)

Inherited members

class OutputFileOverwriteTracker

A quick hack to warn if two processors write to the same file.

Methods

def add_if_enabled(self, path)
def disable(self)
def enable(self)
def enabled_and_seen(self, path) ‑> bool
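The intent of these four methods can be sketched as follows. The behaviour here is inferred from the method names alone and is an assumption throughout: `OverwriteTrackerSketch` is a hypothetical class, and whether the real tracker starts enabled or disabled is not documented (this sketch starts disabled).

```python
class OverwriteTrackerSketch:
    """Hypothetical re-implementation; semantics inferred from method names only."""

    def __init__(self):
        self._enabled = False  # assumption: tracking is off until enabled
        self._seen = set()

    def enable(self):
        self._enabled = True

    def disable(self):
        self._enabled = False

    def add_if_enabled(self, path):
        # Remember the path only while tracking is switched on.
        if self._enabled:
            self._seen.add(path)

    def enabled_and_seen(self, path) -> bool:
        # True only if tracking is on and the path was recorded earlier.
        return self._enabled and path in self._seen


tracker = OverwriteTrackerSketch()
tracker.enable()

messages = []
for path in ["prepared/a.csv", "prepared/b.csv", "prepared/a.csv"]:
    if tracker.enabled_and_seen(path):
        messages.append(f"{path} is written by more than one processor")
    tracker.add_if_enabled(path)

print(messages)
```

Checking before adding is what lets the second write to the same path be detected; with the tracker disabled, both calls are no-ops and nothing is reported.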
class Preprocessor (*args, **kwargs)

Create a preprocessor.

Args

context
note that Context.data_directory will be used to store files.

We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.

Ancestors

Class variables

var source_path

Instance variables

var original_extension

Methods

def construct_metadata(self, args)
def extract(self, path)

Copy source data into the 'downloaded' folder. Note: this runs every time, since a local copy is less costly than an S3 download.

def should_rerun(self, args)
def write_metadata(self, args)
def yearly_source_path(self)

Inherited members

class PreprocessorBase (context: Context)

Create a preprocessor.

Args

context
note that Context.data_directory will be used to store files.

We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.
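The lazy directory creation described here might look roughly like the sketch below. `stage_directory` is a hypothetical helper, not part of the module, and the exact nesting is an assumption: the text does not say whether the YEAR level sits above or below the downloaded and prepared folders, and this sketch places it directly under the root.

```python
import tempfile
from pathlib import Path


def stage_directory(data_directory, stage, year=None):
    """Return the directory for one stage, creating it only when asked for.

    Assumed layout: <root>/[YEAR/]downloaded and <root>/[YEAR/]prepared.
    """
    if stage not in ("downloaded", "prepared"):
        raise ValueError(f"unknown stage: {stage!r}")
    base = Path(data_directory)
    if year is not None:
        base = base / str(year)
    directory = base / stage
    # parents=True also creates a missing root; exist_ok=True makes repeats safe.
    directory.mkdir(parents=True, exist_ok=True)
    return directory


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "not_created_yet"  # the root does not exist beforehand
    downloaded = stage_directory(root, "downloaded", year=2020)
    prepared = stage_directory(root, "prepared", year=2020)
    relative_paths = [p.relative_to(root).as_posix() for p in (downloaded, prepared)]
    both_exist = downloaded.is_dir() and prepared.is_dir()

print(relative_paths)
```

Creating directories at the point of use, rather than in the constructor, means that merely instantiating a preprocessor leaves the file system untouched.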

Ancestors

Subclasses

Class variables

var encoding
var errors

Instance variables

var data_key : str
var inname : str
var original_extension : str
var outname : str
var year

Methods

def construct_metadata(self, args)
def extract(self, path: str) ‑> None

This method should extract data from the source and write it to PATH

def from_disk(self, path)

Deserialize the processed object from disk into memory

def metadata_path(self) ‑> str
def ori_path(self) ‑> str
def out_path(self) ‑> str
def postprocess(self, df)
def preprocess(self, df)
def process(self, deserialized_object, *args)
def read_file(self, path)

Read the file of extracted data into memory and return it

def run(self, *args, returns=True, force=False, prevent=False) ‑> Optional

Args

returns
if True, will always return an object; if False, it will not return anything. This is an optimisation to avoid loading data if the return value is never used anyway.
force
if True, will re-run regardless of the value of self.should_rerun
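How the returns and force flags interact can be sketched with a toy class. Everything in this sketch is an assumption made for illustration: `RunSketch` is hypothetical, its `should_rerun` simply checks whether any output exists yet, an in-memory attribute stands in for the prepared file, and the prevent flag is left out because its behaviour is not described here.

```python
class RunSketch:
    """Hypothetical toy; models only the documented `returns` and `force` flags."""

    def __init__(self):
        self._disk = None       # stands in for the prepared file on disk
        self.process_calls = 0  # counts how often the processing step ran

    def should_rerun(self):
        # Assumption: a re-run is needed only when no output exists yet.
        return self._disk is None

    def run(self, returns=True, force=False):
        # force=True bypasses the should_rerun check entirely.
        if force or self.should_rerun():
            self.process_calls += 1
            self._disk = f"result #{self.process_calls}"
        # With returns=False the stored result is never loaded or handed back.
        return self._disk if returns else None


pre = RunSketch()
first = pre.run()                # nothing stored yet, so processing runs
second = pre.run()               # output exists, so it is reused
silent = pre.run(returns=False)  # caller does not need the data
forced = pre.run(force=True)     # processing runs again regardless

print(first, second, silent, forced, pre.process_calls)
```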

def should_rerun(self, args)
def to_disk(self, process_object, path)

Serialize the processed object to disk, in a way that it can later be deserialized to recover exactly the original file

def write_metadata(self, args)
class TextPreprocessor (context: Context)

Create a preprocessor.

Args

context
note that Context.data_directory will be used to store files.

We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.

Ancestors

Subclasses

Methods

def from_disk(self, path: str)

Read the file of extracted data into memory and return it

Inherited members