etl_manager's Introduction

etl_manager

A Python package that manages our data engineering framework and implements it on AWS Glue.

The main functionality of this package is to interact with AWS Glue to create meta data catalogues and run Glue jobs.

To install:

pip install etl_manager

Meta Data

Let's say I have a single table (a csv file) and I want to query it using Amazon Athena. My csv file is in the following S3 path: s3://my-bucket/my-table-folder/file.csv.

file.csv is a table that looks like this:

col1 col2
a 1
b 12
c 42

As you can see, col1 is a string and col2 is an integer.

Notes:

  • for Athena to work, your table should not contain a header. So before file.csv is uploaded to S3 you should make sure it has no header (see the sketch after this list).
  • Tables must be in a folder, i.e. the location of your table (table.location) should be the parent folder of where your data exists. See example below.
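
For example, here is a minimal sketch of preparing and uploading such a file, assuming pandas and boto3 are available and reusing the illustrative bucket and folder names above:

import boto3
import pandas as pd

# Drop the header row locally before uploading (bucket and key names are the
# illustrative ones used above, not real resources).
df = pd.read_csv('file.csv')
df.to_csv('file_noheader.csv', header=False, index=False)

s3 = boto3.client('s3')
s3.upload_file('file_noheader.csv', 'my-bucket', 'my-table-folder/file.csv')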

To create a schema for your data to be queried by Athena you can use the following code:

from etl_manager.meta import DatabaseMeta, TableMeta

# Create database meta object
db = DatabaseMeta(name = 'my_database', bucket='my-bucket')

# Create table meta object
tab = TableMeta(name = 'my_table', location = 'my-table-folder')

# Add column definitions to the table
tab.add_column('col1', 'character', description = 'column contains a letter')
tab.add_column('col2', 'int', description = 'column contains a number')

# Add table to the database
db.add_table(tab)

# Create the table on AWS glue
db.create_glue_database()

Now the table can be queried via SQL e.g. SELECT * FROM my_database.my_table
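
Once the Glue table exists you can also run that query from Python. Here is a minimal sketch using boto3's Athena client (this is not part of etl_manager, and the results output location is a placeholder you would need to change):

import boto3

athena = boto3.client('athena')
response = athena.start_query_execution(
    QueryString='SELECT * FROM my_database.my_table',
    # Placeholder results location - Athena needs somewhere to write output
    ResultConfiguration={'OutputLocation': 's3://my-bucket/athena-results/'},
)
print(response['QueryExecutionId'])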

Meta data structure

The metadata structure is currently very simple. Assume you have the following folder structure for the example code:

meta_data/
--- database.json
--- teams.json
--- employees.json

database.json is a special json file that holds the meta data for the database. In our example it looks like this:

{
    "description": "Example database",
    "name": "workforce",
    "bucket": "my-bucket",
    "base_folder": "database/database1"
}

When you create your database it will have this name and description. The bucket key specifies where the database exists (and therefore where the tables exist) in S3. The base_folder is the initial path to where the tables exist. If your tables are in folders directly in the bucket (e.g. s3://my-bucket/table1/) then you can leave base_folder as an empty string ("").

The employees table has an ID, name and date of birth for each employee. The table metadata looks like this:

{
    "$schema" : "https://moj-analytical-services.github.io/metadata_schema/table/v1.0.0.json",
    "name": "employees",
    "description": "table containing employee information",
    "data_format": "parquet",
    "location": "employees/",
    "columns": [
        {
            "name": "employee_id",
            "type": "int",
            "description": "an ID for each employee"
        },
        {
            "name": "employee_name",
            "type": "character",
            "description": "name of the employee"
        },
        {
            "name": "employee_dob",
            "type": "date",
            "description": "date of birth for the employee"
        }
    ]
}

The currently supported data types for your columns are:

character | int | long | float | double | decimal | date | datetime | boolean

This is a standard layout for a table metadata json. $schema points to another json that validates the structure of our table metadata files. Your table will have this name and description when the database is created (note: it is strongly suggested that the name of your table matches the name of the metadata json file). The location is the relative folder path to where your table exists; this path is relative to your database base_folder. This means that the full path to your table is s3://<database.bucket>/<database.base_folder>/<table.location>/, so in this example the employees table should be in the S3 path s3://my-bucket/database/database1/employees. The data_format specifies what type of data the table is. Finally, columns is an array of objects, where each object is a column definition specifying the name, description and type (data type) of the column. Each column can also have the optional properties pattern, enum and nullable (see table_schema.json for their definitions).
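
For illustration, here is how those metadata fields combine into the full S3 path for the employees table in this example (plain Python, just piecing together the values shown on this page):

# Illustrative only: how bucket, base_folder and location combine into the path.
bucket = 'my-bucket'
base_folder = 'database/database1'
location = 'employees/'

full_path = 's3://' + '/'.join(p.strip('/') for p in [bucket, base_folder, location] if p)
print(full_path)  # s3://my-bucket/database/database1/employees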

Note: the order of the columns listed here should match the order of the columns in the data (remember that the data should not have a header, so the table will be queried incorrectly if the column order does not match the actual data).

Here is another table in the database called teams. The teams table lists the employee IDs in each team, showing which employees are in which team. A snapshot of this table is taken each month (so you can see which employee was in which team in each month), so the table is partitioned by monthly snapshot.

{
    "$schema" : "https://moj-analytical-services.github.io/metadata_schema/table/v1.0.0.json",
    "name": "teams",
    "description": "month snapshot of which employee with working in what team",
    "data_format": "parquet",
    "location": "teams/",
    "columns": [
        {
            "name": "team_id",
            "type": "int",
            "description": "ID given to each team",
            "nullable" : false
        },
        {
            "name": "team_name",
            "type": "character",
            "description": "name of the team"
        },
        {
            "name": "employee_id",
            "type": "int",
            "description": "primary key for each employee in the employees table",
            "pattern" : "\\d+"
        },
        {
            "name": "snapshot_year",
            "type": "int",
            "description": "year at which snapshot of workforce was taken"
        },
        {
            "name": "snapshot_month",
            "type": "int",
            "description": "month at which snapshot of workforce was taken",
            "enum" : [1,2,3,4,5,6,7,8,9,10,11,12]
        }
    ],
    "partitions" : ["snapshot_year", "snapshot_month"]
}

From the above you can see this table has the additional column properties enum, pattern and nullable, as well as a partitions property:

  • enum: the values the column can take (this does not have to include nulls - use the nullable property for that)
  • pattern: a regex string that values in this column should match
  • nullable: specifies whether this column should accept NULL values
  • partitions: specifies which columns in the table are file partitions rather than columns in the data. etl_manager forces partition columns to appear at the end of your metadata's list of columns.

Note: etl_manager does not enforce the information provided by enum, pattern and nullable. It is just there to provide information to other tools or functions that could use it to validate your data (see the sketch below). Also, the values of pattern, enum and nullable can conflict; etl_manager does not check for conflicts. For example, a column with an enum of [0,1] and a pattern of [A-Za-z] is allowed.
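
For illustration only, here is a minimal sketch (not etl_manager functionality) of how a downstream check might use the enum, pattern and nullable fields from a column definition:

import re

# Illustrative helper, not part of etl_manager: check one value against a
# column definition's nullable, enum and pattern fields.
def value_ok(value, col):
    if value is None:
        return col.get('nullable', True)
    if 'enum' in col and value not in col['enum']:
        return False
    if 'pattern' in col and not re.fullmatch(col['pattern'], str(value)):
        return False
    return True

snapshot_month = {'name': 'snapshot_month', 'type': 'int', 'enum': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]}
print(value_ok(5, snapshot_month))   # True
print(value_ok(13, snapshot_month))  # False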

Examples using the DatabaseMeta Class

The easiest way to create a database is to run the code below. It reads a database schema from the json files in a folder and creates the database in the Glue catalogue, allowing you to query the data with SQL via Athena.

from etl_manager.meta import read_database_folder
db = read_database_folder('example_meta_data/')
db.create_glue_database()

The code snippet below creates a database meta object that allows you to manipulate the database and the tables that exist in it:

from etl_manager.meta import read_database_folder

db = read_database_folder('example_meta_data/')

# Database has callable objects

db.name # workforce

db.table_names # [employees, teams]

# Each table in the database is a TableMeta object that can be accessed from the database meta object

db.table('employees').columns # returns all columns in employees table

# The db and table object properties can also be altered and updated

db.name = 'new_db_name'
db.name # 'new_db_name'

db.table('employees').name = 'new_name'

db.table_names # [new_name, teams]

db.remove_table('new_name')

db.name # workforce_dev (note: by default the package adds a _dev suffix if db_suffix is not provided to DatabaseMeta)

# Set all table types to parquet and create database schema in glue
for t in db.table_names:
    db.table(t).data_format = 'parquet'
db.create_glue_database()

Using the GlueJob Class

The GlueJob class can be used to run PySpark jobs on AWS Glue. It is worth keeping up to date with AWS release notes and general guidance on running Glue jobs. This class is a wrapper that simplifies running Glue jobs by using a structured folder format.

from etl_manager.etl import GlueJob

my_role = 'aws_role'
bucket = 'bucket-to-store-temp-glue-job-in'

job = GlueJob('glue_jobs/simple_etl_job/', bucket=bucket, job_role=my_role, job_arguments={"--test_arg" : 'some_string'})
job.run_job()

print(job.job_status)
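
If you need to block until the job finishes, GlueJob also exposes wait_for_completion and a job_status property (both appear in the etl.py traceback in the issues further down this page). A minimal sketch, assuming that behaviour:

job.run_job()
job.wait_for_completion()   # polls Glue until the run finishes
print(job.job_status)       # per the traceback below, this wraps boto3's get_job_run response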

Glue Job Folder Structure

Glue jobs use the following prescribed folder structure:

├── glue_jobs/
|   |
│   ├── job1/
|   |   ├── job.py
|   |   ├── glue_resources/
|   |   |   └── my_lookup_table.csv
|   |   └── glue_py_resources/
|   |       ├── my_python_functions.zip
|   |       └── github_zip_urls.txt
|   |   └── glue_jars/
|   |       └── my_jar.jar
|   |
│   ├── job2/
│   |   ├── job.py
│   |   ├── glue_resources/
│   |   └── glue_py_resources/
|   |
|   ├── shared_job_resources/
│   |   ├── glue_resources/
|   |   |   └── meta_data_dictionary.json
│   |   └── glue_py_resources/
|   |   └── glue_jars/
|   |       └── my_other_jar.jar

Every glue job folder must have a job.py script in that folder; that is the only required file, everything else is optional. When you want to create a glue job object you point the GlueJob class at the parent folder of the job.py script you want to run. There are additional folders you can add to this parent folder:

glue_resources folder

Any files in this folder are uploaded to the working directory of the glue job. This means in your job.py script you can get the path to these files by:

import os

path_to_file = os.path.join(os.getcwd(), 'file_in_folder.txt')

The GlueJob class will only upload files with extensions (.csv, .sql, .json, .txt) to S3 for the glue job to access.

glue_py_resources

These are Python scripts that you can import in your job.py script. For example, if I had a utils.py script in my glue_py_resources folder, I could import that script normally, e.g.

from utils import *

You can also supply a zip file containing Python code in the standard package structure. You can then reference this package as you normally would in Python. For example, if I had a package zipped as my_package.zip in the glue_py_resources folder, I could access that package normally in my job script like:

from my_package.utils import *

You can also supply a text file with the special name github_zip_urls.txt, where each line is a URL to a GitHub zipball. The GlueJob class will download each GitHub package, rezip it and send it to S3. These packages can then be accessed in the same way as the local zip packages. For example, if the github_zip_urls.txt file had the single line https://github.com/moj-analytical-services/gluejobutils/archive/master.zip, the gluejobutils package would be accessible in the job.py script:

from gluejobutils.s3 import read_json_from_s3

shared_job_resources folder

This is a specific folder (it must be named shared_job_resources). It has the same structure and restrictions as a normal glue job folder but does not have a job.py file. Instead, anything in its glue_resources or glue_py_resources folders will also be used (and therefore uploaded to S3) by every other glue job. Take the example below:

├── glue_jobs/
│   ├── job1/
│   |   ├── job.py
│   |   ├── glue_resources/
|   |   |   └── lookup_table.csv
│   |   └── glue_py_resources/
|   |       └── job1_specific_functions.py
|   |
|   ├── shared_job_resources/
│   |   ├── glue_resources/
|   |   |   └── some_global_config.json
│   |   └── glue_py_resources/
|   |       └── utils.py

Running the glue job job1 i.e.

job = GlueJob('glue_jobs/job1/', bucket, job_role)
job.run_job()

This glue job would not only have access to the Python script job1_specific_functions.py and the file lookup_table.csv, but also to the Python script utils.py and the file some_global_config.json. This is because the latter two files are in the shared_job_resources folder and are accessible to all job folders in the same glue_jobs parent folder.

Note: users should make sure there are no naming conflicts between files that are uploaded to S3, as they are all sent to the same working folder.

Using the Glue Job class

Returning to the initial example:

from etl_manager.etl import GlueJob

my_role = 'aws_role'
bucket = 'bucket-to-store-temp-glue-job-in'

job = GlueJob('glue_jobs/simple_etl_job/', bucket=bucket, job_role=my_role)

This creates a job object. The GlueJob class has a job_name, which defaults to the name of the folder you pointed it at, i.e. in this instance the job is called simple_etl_job. To change the job name:

job.job_name = 'new_job_name'

Job names must be unique within AWS Glue.

Other useful functions and properties:

# Increase the number of workers on a glue job (default is 2)
job.allocated_capacity = 5

# Set job arguments; these are input params that can be accessed by the job.py script
job.job_arguments = {"--test_arg" : 'some_string', "--enable-metrics" : ""}

job_arguments

These are strings that can be passed to the glue job script. Below is an example of how these are accessed in the job.py script. This code snippet is taken from the simple_etl_job found in the example folder of this repo.

# Example job tests access to all files passed to the job runner class
import sys
import os

from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from gluejobutils.s3 import read_json_from_s3

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'metadata_base_path', 'test_arg'])

print "JOB SPECS..."
print "JOB_NAME: ", args["JOB_NAME"]
print "test argument: ", args["test_arg"]

# Read in meta data json
meta_employees = read_json_from_s3(os.path.join(args['metadata_base_path'], "employees.json"))

### etc

Notes:

  • The test_arg does not have two dashes in front of it. When specifying job_arguments with the GlueJob class each argument name must be prefixed with --, but you should drop the dashes when accessing the args in the job.py script.
  • metadata_base_path is a special parameter that is set by the GlueJob class. It is the S3 path to where the meta_data folder is uploaded in S3, so that you can read in your agnostic metadata files if you want to use them in your glue job. Note that the gluejobutils package has a lot of functionality for integrating our metadata jsons with Spark.
  • The GlueJob argument --enable-metrics is also a special parameter that enables you to see metrics for your glue job. See the AWS documentation for more details on enabling metrics.
  • Note that JOB_NAME is a special parameter that is not set in GlueJob but is automatically passed to AWS Glue when running job.py. See the AWS documentation for more on special parameters.

A full example of the glue_jobs and meta_data structures and code can be found in the example folder of this repo.

Unit Tests

This package has unit tests, which can also be used to see examples of its functionality.

Unit tests can be run with:

python -m unittest tests.test_tests -v


etl_manager's Issues

Need to catch boto throttling

See traceback

[2018-11-23 16:55:18,326] {models.py:1428} INFO - Executing <Task(PythonOperator): count> on 2018-11-23 16:47:34.247276
[2018-11-23 16:55:18,326] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run crest_preprocess_tables_etl count 2018-11-23T16:47:34.247276 --job_id 3263 --raw -sd DAGS_FOLDER/crest_preprocess_tables_etl.py']
[2018-11-23 16:55:20,522] {base_task_runner.py:98} INFO - Subtask: [2018-11-23 16:55:20,521] {__init__.py:45} INFO - Using executor LocalExecutor
[2018-11-23 16:55:20,825] {base_task_runner.py:98} INFO - Subtask: [2018-11-23 16:55:20,824] {models.py:189} INFO - Filling up the DagBag from /Users/karik/airflow/dags/crest_preprocess_tables_etl.py
[2018-11-23 16:55:21,097] {base_task_runner.py:98} INFO - Subtask: [2018-11-23 16:55:21,097] {credentials.py:1032} INFO - Found credentials in shared credentials file: ~/.aws/credentials
[2018-11-23 16:55:23,282] {cli.py:374} INFO - Running on host Karik.local
[2018-11-23 16:55:23,338] {logging_mixin.py:84} INFO - Starting job "airflow_crest_count"...

[2018-11-23 16:57:25,607] {models.py:1595} ERROR - An error occurred (ThrottlingException) when calling the GetJobRun operation (reached max retries: 4): Rate exceeded
Traceback (most recent call last):
  File "/anaconda/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/anaconda/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 89, in execute
    return_value = self.execute_callable()
  File "/anaconda/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/karik/airflow/dags/crest_preprocess_tables_etl.py", line 90, in run_glue_job_as_airflow_task
    job.wait_for_completion()
  File "/anaconda/lib/python3.6/site-packages/etl_manager/etl.py", line 404, in wait_for_completion
    status = self.job_status
  File "/anaconda/lib/python3.6/site-packages/etl_manager/etl.py", line 379, in job_status
    return _glue_client.get_job_run(JobName=self.job_name, RunId=self.job_run_id)
  File "/anaconda/lib/python3.6/site-packages/botocore/client.py", line 320, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/anaconda/lib/python3.6/site-packages/botocore/client.py", line 623, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the GetJobRun operation (reached max retries: 4): Rate exceeded
[2018-11-23 16:57:25,609] {models.py:1624} INFO - Marking task as FAILED.
[2018-11-23 16:57:25,631] {models.py:1644} ERROR - An error occurred (ThrottlingException) when calling the GetJobRun operation (reached max retries: 4): Rate exceeded
[2018-11-23 16:57:25,633] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-11-23 16:57:25,633] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/bin/airflow", line 27, in <module>
[2018-11-23 16:57:25,633] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:     return_value = self.execute_callable()
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-11-23 16:57:25,634] {base_task_runner.py:98} INFO - Subtask:   File "/Users/karik/airflow/dags/crest_preprocess_tables_etl.py", line 90, in run_glue_job_as_airflow_task
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask:     job.wait_for_completion()
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/lib/python3.6/site-packages/etl_manager/etl.py", line 404, in wait_for_completion
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask:     status = self.job_status
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/lib/python3.6/site-packages/etl_manager/etl.py", line 379, in job_status
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask:     return _glue_client.get_job_run(JobName=self.job_name, RunId=self.job_run_id)
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/lib/python3.6/site-packages/botocore/client.py", line 320, in _api_call
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask:     return self._make_api_call(operation_name, kwargs)
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask:   File "/anaconda/lib/python3.6/site-packages/botocore/client.py", line 623, in _make_api_call
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask:     raise error_class(parsed_response, operation_name)
[2018-11-23 16:57:25,635] {base_task_runner.py:98} INFO - Subtask: botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the GetJobRun operation (reached max retries: 4): Rate exceeded

I'm running 8 Python operators in parallel - wondering if, because they're running from the same Python kernel, boto3 is throwing the error due to the parallel calls. I don't seem to get this issue with dockerised versions on the Kubernetes pod operator, but those aren't running as much in parallel, so it needs proper testing on the dockerised version. One possible mitigation is sketched below.
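
One possible mitigation, sketched under the assumption that job_status raises botocore's ClientError when throttled (as in the traceback above): retry the status check with exponential backoff. The helper name and backoff policy are illustrative, not etl_manager code.

import time
import botocore.exceptions

def job_status_with_backoff(job, max_tries=5):
    # Retry GetJobRun (via job.job_status) with exponential backoff when AWS
    # throttles the call. The attribute name comes from the traceback above;
    # the retry policy itself is an assumption.
    for attempt in range(max_tries):
        try:
            return job.job_status
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] != 'ThrottlingException':
                raise
            time.sleep(2 ** attempt)
    raise RuntimeError('GetJobRun still throttled after retries')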

base spec is getting overwritten

FIRST RUN =>

from etl_manager.meta import _get_spec, read_database_folder
print(_get_spec('base'))

OUTPUT =>

{'Name': '',
 'Description': '',
 'Owner': 'owner',
 'Retention': 0,
 'StorageDescriptor': {'Columns': [],
  'Location': '',
  'InputFormat': '',
  'OutputFormat': '',
  'Compressed': False,
  'NumberOfBuckets': -1,
  'SerdeInfo': {'SerializationLibrary': '', 'Parameters': {}},
  'BucketColumns': [],
  'SortColumns': [],
  'Parameters': {},
  'StoredAsSubDirectories': False},
 'PartitionKeys': [],
 'TableType': 'EXTERNAL_TABLE',
 'Parameters': {}}

THEN RUN =>

db = read_database_folder('example/meta_data/db1/')
glue_def_dump = db.table('pay').glue_table_definition()
print(_get_spec('base'))

OUTPUT =>

{'Name': '',
 'Description': '',
 'Owner': 'owner',
 'Retention': 0,
 'StorageDescriptor': {'Columns': [{'Name': 'employee_id',
    'Comment': 'an ID for each employee',
    'Type': 'int'},
   {'Name': 'annual_salary', 'Comment': 'Annual salary', 'Type': 'float'}],
  'Location': 's3://my-bucket/database/database1/pay/',
  'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
  'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
  'Compressed': False,
  'NumberOfBuckets': -1,
  'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
   'Parameters': {'field.delim': ','}},
  'BucketColumns': [],
  'SortColumns': [],
  'Parameters': {'classification': 'csv',
   'delimiter': ',',
   'skip.header.line.count': '1'},
  'StoredAsSubDirectories': False},
 'PartitionKeys': [],
 'TableType': 'EXTERNAL_TABLE',
 'Parameters': {'classification': 'csv',
  'delimiter': ',',
  'skip.header.line.count': '1'}}

base_spec gets overwritten after applying the dict merge. The error is caused by not properly copying the dictionary from _template (see the sketch below).
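
The gist of the fix is to deep-copy the template before merging table details into it. The function and argument names below are illustrative, not the exact etl_manager internals:

import copy

# Illustrative names only: the point is that the spec getter should hand back
# a deep copy, so later dict merges never mutate the module-level template.
def get_spec(spec_name, templates):
    return copy.deepcopy(templates[spec_name])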

Create db does not overwrite existing

db.create_glue_database()

AlreadyExistsException Traceback (most recent call last)
in ()
----> 1 db.create_glue_database()

/opt/conda/lib/python3.6/site-packages/etl_manager/meta.py in create_glue_database(self)
460 }
461
--> 462 _glue_client.create_database(**db)
463
464 for tab in self._tables :

/opt/conda/lib/python3.6/site-
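
Until this is resolved, one caller-side workaround is to drop the existing Glue database with boto3 and then recreate it. This is only a sketch and assumes you are happy to lose the existing catalogue entries:

import boto3

glue = boto3.client('glue')
try:
    # db is the DatabaseMeta object whose create_glue_database() call failed
    glue.delete_database(Name=db.name)
except glue.exceptions.EntityNotFoundException:
    pass  # nothing to delete the first time round
db.create_glue_database()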

add latest partition for calculated tables

SCD2 tables have dea_latest_record

For denormalised tables - could have the same variable by file partition.

DAG will be slightly more involved but not that more complicated.

Step 1:
Copy .../dernormalised_table/dea_latest_record=true/dea_snapshot_date=x/* to .../dernormalised_table/dea_latest_record=false/dea_snapshot_date=x/*

Step 2:
Then run glue job that writes to .../dernormalised_table/dea_latest_record=true/dea_snapshot_date=y/

Should only run Step 1 if dea_database_run_date > max(dea_snapshot_date)

new line json types

Test json with different datatypes to make sure etl_manager can read more than strings

json.load error

db.create_glue_database() will cause the package to fail, specifically at the line json.load(_conversion[spec_name]) in the _get_specs function. The function seems to work when I call it by itself, e.g. _get_spec('base').

Might be something to do with how the name is specified when an object class is calling it, but I haven't had time to figure it out.

Should have single init for db and table class

This is a big rewrite across our dependencies but probably worth it.


db = meta.read_database_json('database.json')
tab = meta.read_table_json('table1.json')

Then the only way to create a database will be those functions or initialising the db or table class like:

from etl_manager.meta import DatabaseMeta

db = DatabaseMeta(**kwargs)
tab = TableMeta(**kwargs)

add proper delete for GlueJob class

Something like this:

if pre_jobs[t].job_run_state in ['succeeded', 'stopped', 'failed']:
    pre_jobs[t].delete_s3_job_temp_folder()
    pre_jobs[t].delete_job()

Add releases and CHANGELOG.md

I think as a library there would be some value in using Semantic Versioning/tags.

Also, it's very useful to have a changelog which documents these changes and helps client code writers to investigate potential problems. We use this format in several of our Helm charts, etc., so it would probably be good to add one here with the same format.

Partitions as last columns is not persistent

Partition columns in the TableMeta object should always be at the end of the columns list. When the partitions property of the object is set, the columns are correctly re-ordered, but this does not happen when a partition is set and further columns are added later.

Example:

from etl_manager.meta import TableMeta
tb = TableMeta(name='test', location='test')

tb.add_column('p', 'int', '')
tb.add_column('a', 'int', '')

tb.partitions = ['p']

tb.add_column('b', 'int', '')

print(tb.column_names) # ['a', 'p', 'b']
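
A possible workaround, assuming (as the issue text says) that setting the partitions property triggers the re-ordering, is to re-assign it after adding the later columns:

# Workaround sketch: re-assigning partitions after adding columns should
# trigger the re-ordering again (an assumption based on the issue text,
# not a documented guarantee).
tb.partitions = ['p']
print(tb.column_names)  # expected: ['a', 'b', 'p']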

Add pattern search to meta data

Give each table column a pattern property to validate values in table columns.

Specifically etl_manager should have a to_json_schema function that converts the columns in the meta data to a JSON schema which can be used to validate each table row.
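
A rough sketch of what such a conversion might produce; to_json_schema does not exist in etl_manager at the time of this issue, and the type mapping below is an assumption:

# Sketch of the proposed conversion from column definitions to JSON Schema.
# The mapping from etl_manager types to JSON Schema types is assumed/partial.
def columns_to_json_schema(columns):
    type_map = {"character": "string", "int": "integer", "long": "integer",
                "float": "number", "double": "number", "boolean": "boolean"}
    properties = {}
    for col in columns:
        prop = {"type": type_map.get(col["type"], "string")}
        if "pattern" in col:
            prop["pattern"] = col["pattern"]
        if "enum" in col:
            prop["enum"] = col["enum"]
        properties[col["name"]] = prop
    return {"type": "object", "properties": properties}

print(columns_to_json_schema([
    {"name": "employee_id", "type": "int", "pattern": "\\d+"},
    {"name": "snapshot_month", "type": "int", "enum": [1, 2, 3]},
]))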

job runner

When running a job boto3 should check to see if job of same name is running first before glue job tries to rerun that job. Atm boto3 will throw an error but I think the already running job gets messed up so we need to catch at the beginning of the function call.

json schema for meta data table not explicit enough

Boto throwing error:

ValidationException: An error occurred (ValidationException) when calling the CreateTable operation: 2 validation errors detected: 
...
at 'table.storageDescriptor.columns.5.member.comment' failed to satisfy constraint: Member must satisfy regular expression pattern: [\u0020-\uD7FF\uE000-\uFFFD\uD800\uDC00-\uDBFF\uDFFF\t]*;

Should update schemas to catch this error further upstream.

Floating point numbers should be represented using doubles in Athena

Parquet's 'float' type is actually a 32 bit float and its 'double' type is 64 bit.

If you write a parquet file out from Pandas, Athena will refuse to read the file if you set the data type to float, because it's a double within the parquet file.

Your query has the following error(s):

HIVE_BAD_DATA: Field X's type DOUBLE in parquet is incompatible with type float defined in table schema

Parquet file formats are enumerated here.

https://drill.apache.org/docs/parquet-format/

and Spark's are here:

http://spark.apache.org/docs/2.2.1/api/python/_modules/pyspark/sql/types.html#FloatType
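
In practice this means that columns written as float64 from pandas should be declared as double in the table metadata. A sketch reusing the TableMeta API shown earlier (the table and column names here are illustrative):

from etl_manager.meta import TableMeta

# pandas float64 columns are stored as parquet DOUBLE, so declare them as
# 'double' rather than 'float' in the metadata.
pay = TableMeta(name='pay', location='pay/')
pay.data_format = 'parquet'
pay.add_column('annual_salary', 'double', description='annual salary, written as float64 from pandas')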

Need way of updating db rather than creating from scratch

If we have two repos that contribute to the same database, then it's hard to include tables from both repos.

For instance, if we have two repositories that both create tables in the open_data database (e.g. one repo that ETLs ONS data to the platform, and another that ETLs travel time data), then you can add either the ONS data or the travel time data to the glue catalogue, but you can't easily have both.

create_glue_database doc string out of date

Just upgraded to v2.0.0 from a very early version.
Running (SOP Engineering Draft) main.py throws the following error when trying to call create_glue_database in meta.py:

Traceback (most recent call last):
File "main.py", line 76, in
db_st.create_glue_database()
File "/Users/anthonycody/.pyenv/versions/3.6.2/lib/python3.6/site-packages/etl_manager/meta.py", line 499, in create_glue_database
_glue_client.create_database(**db)
File "/Users/anthonycody/.pyenv/versions/3.6.2/lib/python3.6/site-packages/botocore/client.py", line 320, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/Users/anthonycody/.pyenv/versions/3.6.2/lib/python3.6/site-packages/botocore/client.py", line 623, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.errorfactory.AlreadyExistsException: An error occurred (AlreadyExistsException) when calling the CreateDatabase operation: Database already exists.

It appears there's a code diff in create_glue_database between the old and new versions: the following line has been removed:
del_resp = self.delete_glue_database()

This appears (?) to be the cause of the error.

The docstring for the function (unchanged across versions) reads:
Creates a database in Glue based on the database object calling the method function. If a database with the same name (db.name) already exists it overwrites it.

The error indicates this is no longer the case.

Stop dashes in glue job params

AWS converts dashes to underscores, e.g. '--my-param' will become my_param in the glue job arguments.

It would be better for the GlueJob class to throw an error rather than expect users to know that this name conversion happens. A guard is sketched below.
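
A sketch of the proposed guard (not currently in GlueJob): reject argument names that still contain dashes after the leading --. The function name is illustrative.

# Sketch of the proposed check, not current GlueJob behaviour.
def check_job_argument_names(job_arguments):
    for name in job_arguments:
        if '-' in name.lstrip('-'):
            raise ValueError(
                f"Argument {name} contains a dash after the leading '--'; "
                "AWS Glue would expose it with underscores instead"
            )

check_job_argument_names({'--test_arg': 'some_string'})   # fine
# check_job_argument_names({'--my-param': 'x'})           # would raise ValueError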
