Module trase.tools.etl.processors
Classes
class Column (name, rename=None, type=None, keep=True, clean=None, to_upper=None, report=None)
Refer to a column in the input data that should be read and (possibly) also included in the output.
Args
name: str - the name of the column in the input data
rename: str - an optional renaming of the column before it is included in the output data. If this is the same as the name in the input data, it is not necessary to provide this argument.
type: type - the expected type of the column in the input data, for example int or float. If this argument is not provided then str is assumed. This argument is particularly important for untyped input data such as CSV, since it will trigger a type-cast. It is still important for typed input data like Excel, since the code should reflect the types that are expected to be encountered in the input data.
keep: bool - whether this column should be included in the output data. This option is for when you want to read the column from the input data so that you can use it in some way but exclude it from the output. (If you neither intend to use the column nor include it in the output, it is recommended to simply not read the column at all.)
clean: bool - whether string data should be "cleaned" (double whitespace removed, accented characters removed, etc). Defaults to True if the column type is str.
to_upper: bool - whether string data should be upper-cased. It defaults to whatever value clean takes, and is intended as an "opt-out" for those scenarios where string cleaning is desired but the case/capitalisation in the input data should be preserved.
report: bool - print a report of the total values in each column during the processing run. Defaults to True for numeric types (int, float).
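The way these defaults interact can be sketched as follows. ColumnSketch is a hypothetical stand-in written only to illustrate the defaulting rules documented above; it is not the real Column class:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ColumnSketch:
    """Illustrative stand-in for Column; not the actual implementation."""
    name: str
    rename: Optional[str] = None
    type: type = str
    keep: bool = True
    clean: Optional[bool] = None
    to_upper: Optional[bool] = None
    report: Optional[bool] = None

    def __post_init__(self):
        # clean defaults to True only for str columns
        if self.clean is None:
            self.clean = self.type is str
        # to_upper follows clean unless explicitly overridden
        if self.to_upper is None:
            self.to_upper = self.clean
        # report defaults to True for numeric columns
        if self.report is None:
            self.report = self.type in (int, float)
```

For example, a plain string column gets clean=True and to_upper=True automatically, while a float column gets report=True; passing to_upper=False alongside clean=True is the documented "opt-out" for preserving capitalisation.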
class DataframePreprocessor (*args, **kwargs)
Create a preprocessor.
Args
context - note that Context.data_directory will be used to store files.
We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.
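The directory layout described above could be sketched like this; output_dir is a hypothetical helper, not part of the module:

```python
from pathlib import Path

def output_dir(data_directory: str, stage: str, year=None) -> Path:
    # Hypothetical helper mirroring the documented layout:
    # <data_directory>/downloaded and <data_directory>/prepared, with an
    # extra YEAR level when the preprocessor is year-dependent. The root
    # need not exist up front; it is created lazily here.
    assert stage in ("downloaded", "prepared")
    path = Path(data_directory) / stage
    if year is not None:
        path = path / str(year)
    path.mkdir(parents=True, exist_ok=True)
    return path
```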
Ancestors
Subclasses
Class variables
var columns
var delimiter
var header
var output_types
var quotechar
var skiprows
Instance variables
var output_columns : List[str]
Methods
def column_types(self)
def consolidate(self, df, numerical_columns)
def converters(self)
def drop_duplicates(self, df)
def get_columns(self) ‑> List[Column]
def possibly_clean_strings(self, df)
def postprocess(self, df)
def preprocess(self, df)
def rename(self, df)
def report(self, df, msg='', report=None)
Print a summary of the dataframe.
This function is intended to be used during the process function.

Example

class MyData(DataframePreprocessor):
    columns = [
        Column("year", type=int),
        Column("vol", type=float, report=True),  # report=True causes sum to be included in the report
    ]

    def process(self, df_2):
        df_2 = df_2[df_2["year"] == self.year]
        self.report(df_2, "Filtered to current year")
        df_2 = df_2[df_2["vol"] > 0]
        self.report(df_2, "Dropped zero volume")
        return df_2

When the preprocessor is run, it will output something like:

Report: Filtered to current year | Row Count: 200 | Sum of vol: 670
Report: Dropped zero volume | Row Count: 150 | Sum of vol: 670

The function will always print out row counts. If furthermore any numeric columns are marked with report=True (as in the example of "vol" above), then their total sum will also be printed.

Args
df - the Pandas dataframe to report on
msg: optional - a message that will be printed to the screen
def select_output_columns(self, df)
def usecols(self)
def validate(self, series, validation_function)
def validate_columns(self, df)
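The one-line report format shown in the example above can be sketched without pandas as follows; report_line is a hypothetical helper (rows stands in for a dataframe as a list of dicts), not the module's actual report method:

```python
def report_line(rows, msg="", report_columns=()):
    # Build a line in the "Report: msg | Row Count: N | Sum of col: S"
    # format illustrated above. report_columns plays the role of the
    # numeric columns marked report=True.
    parts = [f"Report: {msg}", f"Row Count: {len(rows)}"]
    for col in report_columns:
        parts.append(f"Sum of {col}: {sum(r[col] for r in rows)}")
    return " | ".join(parts)
```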
Inherited members
class NoInputDataframePreprocessor (*args, **kwargs)
Create a preprocessor.
Args
context - note that Context.data_directory will be used to store files.
We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.
Ancestors
Class variables
var columns
var delimiter
var original_extension
Methods
def process(self, df)
Inherited members
class OutputFileOverwriteTracker
A quick hack to warn if two processors are writing to the same file.
Methods
def add_if_enabled(self, path)
def disable(self)
def enable(self)
def enabled_and_seen(self, path) ‑> bool
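A minimal sketch of what this interface could look like, assuming the tracker simply remembers paths while enabled; this is an illustrative guess at the behaviour implied by the method names, not the real class:

```python
class OverwriteTrackerSketch:
    """Hypothetical sketch of OutputFileOverwriteTracker's interface."""

    def __init__(self):
        self._enabled = False
        self._seen = set()

    def enable(self):
        self._enabled = True

    def disable(self):
        self._enabled = False
        self._seen.clear()

    def add_if_enabled(self, path):
        # Record the path only while tracking is switched on
        if self._enabled:
            self._seen.add(path)

    def enabled_and_seen(self, path) -> bool:
        # True suggests a second processor is writing to the same file
        return self._enabled and path in self._seen
```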
class Preprocessor (*args, **kwargs)
Create a preprocessor.
Args
context - note that Context.data_directory will be used to store files.
We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.
Ancestors
Class variables
var source_path
Instance variables
var original_extension
Methods
def construct_metadata(self, args)
def extract(self, path)
Copy source data into the 'downloaded' folder. NOTE: runs every time, as a local copy is less costly than an S3 download.
def should_rerun(self, args)
def write_metadata(self, args)
def yearly_source_path(self)
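The extract step described above amounts to an unconditional local copy; a hedged sketch, where extract_to_downloaded is a hypothetical stand-alone helper rather than the actual method:

```python
import shutil
from pathlib import Path

def extract_to_downloaded(source_path, downloaded_dir):
    # Sketch of extract(): always copy the source file into the
    # 'downloaded' folder, on the reasoning that a local copy is
    # cheaper than a repeated S3 download.
    dest = Path(downloaded_dir) / Path(source_path).name
    dest.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy(source_path, dest)
    return dest
```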
Inherited members
class PreprocessorBase (context: Context)
Create a preprocessor.
Args
context - note that Context.data_directory will be used to store files.
We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.
Ancestors
Subclasses
Class variables
var encoding
var errors
Instance variables
var data_key : str
var inname : str
var original_extension : str
var outname : str
var year
Methods
def construct_metadata(self, args)
def extract(self, path: str) ‑> None
This method should extract data from the source and write it to PATH.
def from_disk(self, path)
Deserialize the processed object from disk into memory.
def metadata_path(self) ‑> str
def ori_path(self) ‑> str
def out_path(self) ‑> str
def postprocess(self, df)
def preprocess(self, df)
def process(self, deserialized_object, *args)
def read_file(self, path)
Read the file of extracted data into memory and return it.
def run(self, *args, returns=True, force=False, prevent=False) ‑> Optional
:param returns: if True, will always return an object; if False, it will not return anything. This is an optimisation to avoid loading data when the return value is never used anyway.
:param force: if True, will re-run regardless of the value of self.should_rerun
def should_rerun(self, args)
def to_disk(self, process_object, path)
Serialize the processed object to disk, in a way that it can later be deserialized to recover exactly the original file.
def write_metadata(self, args)
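The to_disk/from_disk round-trip contract above can be sketched with pickle; this is one way to satisfy "recover exactly the original object", and the real implementation may use a different format. Both helper names here are hypothetical:

```python
import pickle
from pathlib import Path

def to_disk_sketch(processed_object, path):
    # Serialize so that from_disk_sketch recovers exactly the
    # original object, mirroring the documented contract.
    Path(path).write_bytes(pickle.dumps(processed_object))

def from_disk_sketch(path):
    # Inverse of to_disk_sketch: load the object back into memory.
    return pickle.loads(Path(path).read_bytes())
```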
class TextPreprocessor (context: Context)
Create a preprocessor.
Args
context - note that Context.data_directory will be used to store files.
We will create two subdirectories: downloaded (file before processing) and prepared (file after processing). If the preprocessor is year-dependent, these will furthermore be in a YEAR subdirectory. The root directory need not exist: it will be created lazily when needed.
Ancestors
Subclasses
Methods
def from_disk(self, path: str)
Read the file of extracted data into memory and return it.
Inherited members