Module trase.tools.aws.metadata

This module creates and manages metadata. It is primarily intended for objects in AWS S3, but is not limited to them.

The initial aim of this metadata is to record the following information for our S3 objects:

  • The script that produced the object (e.g. file path relative to Git and commit hash)
  • The person that ran the script
  • The upstream datasets that the object was produced from ("dataset provenance")

Note on aligning with existing metadata standards

There have been many attempts over the years in the wider data community to standardise metadata files. Where possible, Trase should align itself with external standards rather than inventing its own. Two of the most promising and widely mentioned are:

Of the two, Tabular Data Resource appears to have the more recent activity. This module is therefore written with the goal of eventually aligning with Tabular Data Resource, in particular in how it documents the columns and types of a CSV file, the delimiter, which values are to be interpreted as "missing", and so on. However, since these conventions are largely standardised across Trase, it is not urgent to start including such information in our metadata, at least not for internal data. For now, this module is concerned only with data provenance and the source script.

Functions

def crawl_upstream(client, bucket, key, version_id=None, seen=None) ‑> Generator[trase.tools.utilities.helpers.Kinship, None, None]
def dataclass_to_dict(object) ‑> dict
def find_metadata_object(client, key, bucket, version_id=None, check_exists=True) ‑> Tuple[str, str, Optional[str]]

Tries to find the metadata object on S3 for a given S3 object

Returns: a tuple of (bucket, key, version_id) for the metadata object, where version_id may be None.
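
For example, a minimal usage sketch (assuming a standard boto3 S3 client; the bucket and key names are illustrative):

    import boto3

    client = boto3.client("s3")

    # Locate the metadata object for a given data object. The returned
    # version_id may be None, e.g. if the bucket is unversioned.
    meta_bucket, meta_key, meta_version_id = find_metadata_object(
        client, key="example/data.csv", bucket="trase-storage"
    )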

def generate_metadata(script_path: str, upstream: List[S3Object] = None) ‑> Metadata
def generate_metadata_key(key)
def generate_metadata_path(path)
def get_commit_hash(script_path)
def get_metadata_from_environment() ‑> Script
def get_metadata_from_git(script_path) ‑> Script

Fetches Git commit information. Requires the user to have "git" installed and assumes that the file is inside a Git repository.

def get_script_path_relative_to_root(script_path)
def upload_file_and_metadata(client, bucket: str, key: str, path: str, metadata_path: str, metadata_key: str, printer=logger.debug)

Uploads a file and its metadata file to S3.

The important aspect of this function is that the S3 object will be uploaded with custom S3 metadata which allows you to locate the metadata object in S3.

Args

bucket
the s3 bucket that the file and metadata should be uploaded to
key
the s3 key that the file should be uploaded to
path
the location of the file on disk
metadata_path
the location of the metadata file on disk
metadata_key
the s3 key that the metadata file should be uploaded to

Raises

ValueError
if the key and the generated metadata key are the same. This can happen, for example, if the original object is itself a .yml file.
ValueError
if the files at path or metadata_path do not exist
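
For example, a minimal sketch (assuming a boto3 S3 client; the keys and local paths are illustrative):

    import boto3

    client = boto3.client("s3")

    # The data object is uploaded with custom S3 metadata pointing at
    # the metadata object, so the two can be re-associated later.
    upload_file_and_metadata(
        client,
        bucket="trase-storage",
        key="example/data.csv",
        path="/tmp/data.csv",
        metadata_path="/tmp/data.yml",
        metadata_key="example/data.yml",
    )
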
def write_csv_for_upload(df: pd.DataFrame, key: str, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None, **pd_to_csv_kwargs)

Writes a CSV file and either indicates to the user how they should upload it to S3, or actually performs the upload itself.

It is a deliberate design decision that, by default, the upload to S3 does not occur. We want the user to explicitly choose to upload, to minimise the chance that they accidentally overwrite data on S3. Keeping a "human in the loop" also gives the user a chance to inspect the data before upload.

Args

df
the Pandas dataframe that should be serialized
key
the s3 key that the dataframe should be uploaded to
script_path : optional
the path of the script that generated the file. Often this can be taken from __file__. If it is not supplied then we will inspect the call stack and take the filename of the file which called this function, which may or may not be what was intended.
path : optional
the (temporary) file location where the Pandas dataframe will be written to before uploading. The reason it is written to disk is to give the user the chance to inspect its contents before uploading. If not provided then a path will be generated in the operating system's temporary directory.
metadata_path : optional
the (temporary) file location where the metadata will be written to before uploading. If not provided then it will be written next to the data file with a ".yml" extension.
metadata_key : optional
the s3 key where the metadata will be uploaded to. If not provided then it will be the same as the s3 key for the data file, but with a ".yml" extension.
bucket : optional
the s3 bucket to upload the file and metadata file to. Defaults to the bucket defined in trase.config.settings.
upstream : optional
an iterable of trase.tools.aws.metadata.S3Object objects representing the upstream datasets that were used to produce the dataframe. If not provided these will be taken from trase.tools.aws.metadata.S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.
do_upload : optional
whether to actually perform the upload, or just print a message telling the user to do it themselves. If not provided then we will look for the presence of "--upload" in sys.argv to determine its value.
pd_to_csv_kwargs
keyword arguments that will be passed to pd.to_csv
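
For example, a minimal sketch (the dataframe and key are illustrative; index=False is simply forwarded to pd.to_csv):

    import pandas as pd

    df = pd.DataFrame({"country": ["BR"], "volume": [1.0]})

    # By default this writes the CSV and its metadata to disk and
    # prints instructions for uploading; run the script with "--upload"
    # (or pass do_upload=True) to perform the upload itself.
    write_csv_for_upload(
        df,
        key="example/data.csv",
        script_path=__file__,
        index=False,
    )
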
def write_file_and_metadata(path: str, script_path, metadata_path, do_write, upstream: List[S3Object] = None)

Write a file to disk along with a metadata file.

Args

script_path
the path to the script that produced the file (you can take this from __file__)
path
the destination file path that do_write will write the file to
metadata_path
the destination file path of the metadata file on disk
do_write
a function that takes a file path and writes the file there
upstream : optional
an iterable of trase.tools.aws.metadata.S3Object objects representing the upstream datasets that were used to produce the dataframe.

Returns: the path of the metadata file that was written
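
For example, a minimal sketch (the paths and writer are illustrative; do_write can be any callable that accepts a destination path):

    # A writer callback: it receives the destination path and is
    # responsible for writing the file there.
    def write_text(path):
        with open(path, "w", encoding="utf-8") as f:
            f.write("hello\n")

    metadata_path = write_file_and_metadata(
        path="/tmp/out.txt",
        script_path=__file__,
        metadata_path="/tmp/out.yml",
        do_write=write_text,
    )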

def write_file_for_upload(key: str, do_write, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None)
def write_geopandas_for_upload(gdf: gpd.GeoDataFrame, key: str, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None, driver: str = 'geojson', **to_file_kwargs)

Writes a GeoDataFrame to disk and optionally uploads to S3 with metadata.

Supported drivers:

  • "parquet" -> GeoParquet
  • "geojson" -> GeoJSON
  • "gpkg" -> GeoPackage
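
For example, a minimal sketch (the GeoDataFrame and key are illustrative):

    import geopandas as gpd
    from shapely.geometry import Point

    gdf = gpd.GeoDataFrame({"name": ["a"]}, geometry=[Point(0.0, 0.0)])

    # Write as a GeoPackage rather than the default GeoJSON.
    write_geopandas_for_upload(
        gdf,
        key="example/data.gpkg",
        script_path=__file__,
        driver="gpkg",
    )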

def write_json_for_upload(data, key: str, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None, **json_dump_kwargs)
def write_metadata(metadata: Metadata, path)
def write_parquet_for_upload(df: pd.DataFrame, key: str, is_polars=False, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None, **to_parquet_kwargs)

Classes

class Metadata (script: Script, user: str, upstream: List[S3Object] = <factory>)

Class variables

var script : Script
var upstream : List[S3Object]
var user : str
class Script (path: str, commit_hash: Optional[str], github_username: Optional[str] = 'sei-international', github_repository: Optional[str] = 'TRASE', type: Optional[str] = 'github_script')

The location of a GitHub script

Class variables

var commit_hash : Optional[str]
var github_repository : Optional[str]
var github_username : Optional[str]
var path : str
var type : Optional[str]
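
For example, a minimal sketch of how these classes fit together (the path, commit hash, and user are placeholders):

    script = Script(
        path="scripts/example/make_data.py",  # placeholder path, relative to the repository root
        commit_hash="0123abc",                # placeholder commit hash
    )
    metadata = Metadata(script=script, user="jane.doe", upstream=[])

    # Serialize the metadata to disk; the module uses ".yml" files.
    write_metadata(metadata, "/tmp/data.yml")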