Module trase.tools.aws.metadata
This module creates and manages metadata, primarily intended for objects in AWS S3, though not necessarily limited to them.
The initial aim of this metadata is to record the following information for our S3 objects:
- The script that produced the object (e.g. file path relative to Git and commit hash)
- The person that ran the script
- The upstream datasets that the object was produced from ("dataset provenance")
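A minimal sketch of generating such metadata with this module (the call site is hypothetical):

    from trase.tools.aws.metadata import generate_metadata

    # Hypothetical: called from the script that produces the dataset.
    metadata = generate_metadata(script_path=__file__)
    # metadata.script   - the producing script (path relative to the Git root, commit hash)
    # metadata.user     - the person who ran the script
    # metadata.upstream - the upstream S3 objects ("dataset provenance")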
Note on aligning with existing metadata standards
There have been many attempts over the years in the wider data community to standardise metadata files. Where possible, Trase should align itself with external standards rather than inventing its own. Two of the most promising and widely mentioned are:
- "Tabular Data Resource" from Frictionless Data: https://specs.frictionlessdata.io/tabular-data-resource/
- "Model for Tabular Data and Metadata on the Web" from W3C: https://www.w3.org/TR/tabular-data-model/
The former seems to have more recent activity than the latter. Therefore, this module is written with the goal of eventually aligning with Tabular Data Resource, in particular in how it documents the columns and types of the CSV file, the delimiter, which values are to be interpreted as "missing" and so on. However, since this is largely standardised across Trase, it is not so urgent to start including such information in our metadata, at least not for internal data. Therefore, for now this module is concerned only with data provenance and the source script.
Functions
def crawl_upstream(client, bucket, key, version_id=None, seen=None) ‑> Generator[trase.tools.utilities.helpers.Kinship, None, None]

def dataclass_to_dict(object) ‑> dict

def find_metadata_object(client, key, bucket, version_id=None, check_exists=True) ‑> Tuple[str, str, Optional[str]]
Tries to find the metadata object on S3 for a given S3 object.
Returns: a tuple of (bucket, key, version_id) for the metadata object, where version_id may be None.
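A minimal usage sketch, assuming a standard boto3 S3 client and a hypothetical data object:

    import boto3
    from trase.tools.aws.metadata import find_metadata_object

    client = boto3.client("s3")
    # Locate the companion metadata object for a (hypothetical) data object.
    meta_bucket, meta_key, meta_version_id = find_metadata_object(
        client, "brazil/soy/example.csv", "trase-storage"
    )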
def generate_metadata(script_path: str, upstream: List[S3Object] = None) ‑> Metadata

def generate_metadata_key(key)

def generate_metadata_path(path)

def get_commit_hash(script_path)

def get_metadata_from_environment() ‑> Script

def get_metadata_from_git(script_path) ‑> Script
Fetch git commit hash information: requires the user to have "git" installed and assumes that the file is in a git repository.

def get_script_path_relative_to_root(script_path)

def upload_file_and_metadata(client, bucket: str, key: str, path: str, metadata_path: str, metadata_key: str, printer=<bound method Logger.debug of <Logger trase.tools.aws.metadata (WARNING)>>)
Uploads a file and its metadata file to S3.
The important aspect of this function is that the S3 object will be uploaded with custom S3 metadata which allows you to locate the metadata object in S3.
Args
bucket - the s3 bucket that the file and metadata should be uploaded to
key - the s3 key that the file should be uploaded to
path - the location of the file on disk
metadata_path - the location of the metadata file on disk
metadata_key - the s3 key that the metadata file should be uploaded to
Raises
ValueError - if the key and the generated metadata key are the same. This can happen, for example, if the original object was itself a .yml file.
ValueError - if the files at path_to_object or path_to_metadata do not exist
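A hedged usage sketch (the keys and local paths are hypothetical; assumes a boto3 S3 client and that both files already exist on disk):

    import boto3
    from trase.tools.aws.metadata import upload_file_and_metadata

    client = boto3.client("s3")
    upload_file_and_metadata(
        client,
        bucket="trase-storage",
        key="brazil/soy/example.csv",           # hypothetical data key
        path="/tmp/example.csv",                # data file on disk
        metadata_path="/tmp/example.csv.yml",   # metadata file on disk
        metadata_key="brazil/soy/example.yml",  # must differ from the data key
    )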
def write_csv_for_upload(df: pd.DataFrame, key: str, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None, **pd_to_csv_kwargs)
Writes a CSV file and either indicates to the user how they should upload it to S3, or actually performs the upload itself.
It is a very intentional design choice that, by default, the upload to S3 does not occur. We want the user to explicitly choose to do the upload, to minimise the chance that they accidentally overwrite data on S3. By putting a "human in the loop" we also give the user a chance to inspect the data before upload. A usage sketch follows the argument list below.
Args
df - the Pandas dataframe that should be serialized
key - the s3 key that the dataframe should be uploaded to
script_path: optional - the path of the script that generated the file. Often this can be taken from __file__. If it is not supplied then we will inspect the call stack and take the filename of the file which called this function, which may or may not be what was intended.
path: optional - the (temporary) file location where the Pandas dataframe will be written to before uploading. The reason it is written to disk is to give the user the chance to inspect its contents before uploading. If not provided then a path will be generated in the operating system's temporary directory.
metadata_path: optional - the (temporary) file location where the metadata will be written to before uploading. If not provided then it will be written next to the data file with a ".yml" extension.
metadata_key: optional - the s3 key where the metadata will be uploaded to. If not provided then it will be the same as the s3 key for the data file, but with a ".yml" extension.
bucket: optional - the s3 bucket to upload the file and metadata file to. Defaults to the bucket defined in trase.config.settings.
upstream: optional - an iterable of trase.tools.aws.metadata.S3Object objects representing the upstream datasets that were used to produce the dataframe. If not provided these will be taken from trase.tools.aws.metadata.S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.
do_upload: optional - whether to actually do the upload or just print to the user that they should do it themselves. If not provided then we will look for the presence of "--upload" in sys.argv to determine its value.
pd_to_csv_kwargs - keyword arguments that will be passed to pd.to_csv
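A minimal sketch of the intended workflow (the dataframe and key are hypothetical; by default the function only tells the user how to upload, unless "--upload" is present in sys.argv or do_upload=True is passed):

    import pandas as pd
    from trase.tools.aws.metadata import write_csv_for_upload

    df = pd.DataFrame({"year": [2020, 2021], "exports_t": [100.0, 120.0]})

    # Writes the CSV and its .yml metadata file to a temporary location; the upload
    # only happens if the user explicitly opts in (e.g. runs the script with --upload).
    write_csv_for_upload(df, "brazil/soy/exports.csv", script_path=__file__, index=False)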
def write_file_and_metadata(path: str, script_path, metadata_path, do_write, upstream: List[S3Object] = None)
Write a file to disk along with a metadata file.
Args
script_path - the path to the script that made the dataframe (you can take this from __file__)
path - the destination file path to serialize the dataframe to
metadata_path - the destination file path of the metadata file on disk
do_write - a function that takes a file path and writes the file there
upstream: optional - an iterable of trase.tools.aws.metadata.S3Object objects representing the upstream datasets that were used to produce the dataframe.
Returns: the path of the metadata file that was written
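A short sketch of the do_write callback pattern (the paths are hypothetical):

    from trase.tools.aws.metadata import write_file_and_metadata

    def do_write(destination):
        # Any callable that takes a file path and writes the file there.
        with open(destination, "w") as f:
            f.write("hello\n")

    metadata_file = write_file_and_metadata(
        path="/tmp/example.txt",
        script_path=__file__,
        metadata_path="/tmp/example.txt.yml",
        do_write=do_write,
    )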
def write_file_for_upload(key: str, do_write, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None)

def write_geopandas_for_upload(gdf: gpd.GeoDataFrame, key: str, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None, driver: str = 'geojson', **to_file_kwargs)
Writes a GeoDataFrame to disk and optionally uploads to S3 with metadata.
Supported drivers:
- "parquet" -> GeoParquet
- "geojson" -> GeoJSON
- "gpkg" -> GeoPackage
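A hedged sketch (the GeoDataFrame and key are hypothetical; assumes geopandas and shapely are installed):

    import geopandas as gpd
    from shapely.geometry import Point
    from trase.tools.aws.metadata import write_geopandas_for_upload

    gdf = gpd.GeoDataFrame(
        {"name": ["a", "b"]}, geometry=[Point(0, 0), Point(1, 1)], crs="EPSG:4326"
    )
    # The default driver is "geojson"; "parquet" and "gpkg" are also supported.
    write_geopandas_for_upload(gdf, "brazil/soy/points.geojson", script_path=__file__)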
def write_json_for_upload(data, key: str, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None, **json_dump_kwargs)

def write_metadata(metadata: Metadata, path)

def write_parquet_for_upload(df: pd.DataFrame, key: str, is_polars=False, script_path: str = None, path: str = None, metadata_path: str = None, metadata_key: str = None, bucket: str = 'trase-storage', do_upload: bool = None, upstream: List[S3Object] = None, **to_parquet_kwargs)
Classes
class Metadata (script: Script, user: str, upstream: List[S3Object] = <factory>)
Metadata(script: trase.tools.aws.metadata.Script, user: str, upstream: List[trase.tools.aws.tracker.S3Object] = <factory>)
Class variables
var script : Script
var upstream : List[S3Object]
var user : str
class Script (path: str, commit_hash: Optional[str], github_username: Optional[str] = 'sei-international', github_repository: Optional[str] = 'TRASE', type: Optional[str] = 'github_script')
The location of a GitHub script
Class variables
var commit_hash : Optional[str]
var github_repository : Optional[str]
var github_username : Optional[str]
var path : str
var type : Optional[str]
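A sketch of constructing these dataclasses directly (the values are hypothetical; in practice generate_metadata and the write_*_for_upload helpers build them for you):

    from trase.tools.aws.metadata import Metadata, Script

    script = Script(
        path="data/brazil/soy/do_something.py",  # hypothetical path relative to the repo root
        commit_hash="0a1b2c3d",                  # hypothetical commit hash
    )
    metadata = Metadata(script=script, user="jane.doe", upstream=[])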