Solgate home repo. Event-driven kubernetes native S3 sync toolkit

License: GNU General Public License v3.0

Solgate

Yet another data sync pipelines job runner.

A CLI utility that is expected to be automated via container native workflow engines like Argo or Tekton.

Installation

pip install solgate

Configuration

Solgate relies on a configuration file that holds all the information required to perform the synchronization. This config file is expected to be in YAML format and should contain the following keys:

  • source key. Its value specifies where the data is sourced from.
  • destinations key. Its value is expected to be an array of locations that define the sync destinations.
  • other top-level keys for general configuration that is not specific to a single location.

General config section

All configuration in this section is optional. Use this section if you'd like to modify the default behavior. Default values are denoted below:

alerts_smtp_server: smtp.corp.redhat.com
alerts_from: [email protected]
alerts_to: [email protected]
timedelta: 1d

Description:

  • alerts_smtp_server, alerts_from, alerts_to are used for email alerting only
  • timedelta defines a time window in which the objects in the source bucket must have been modified to be eligible for the bucket listing. Only files modified within the last timedelta (counted back from now) are included.
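
As an illustration of the timedelta window, here is a minimal sketch in Python. It assumes the value is a whole number followed by a single unit letter, as in the 1d and 5h examples in this README. The helper names parse_timedelta and is_eligible and the extra m and s units are hypothetical and are not taken from the solgate code base.

```python
import re
from datetime import datetime, timedelta, timezone

# Hypothetical unit map; only "d" and "h" appear in this README's examples.
_UNITS = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds"}


def parse_timedelta(value: str) -> timedelta:
    """Turn a string such as "1d" or "5h" into a datetime.timedelta."""
    match = re.fullmatch(r"(\d+)([dhms])", value.strip())
    if match is None:
        raise ValueError(f"Unsupported timedelta value: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def is_eligible(last_modified: datetime, window: timedelta, now: datetime) -> bool:
    """Return True when the object was modified within the window ending at now."""
    return now - last_modified <= window


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print(is_eligible(now - timedelta(hours=3), parse_timedelta("1d"), now))
```

With a window of 1d, an object modified three hours ago is listed, while one modified two days ago is skipped.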

Source key

source:
  aws_access_key_id: KEY_ID
  aws_secret_access_key: SECRET
  base_path: DH-PLAYPEN/storage/input # at least the bucket name is required, sub path within this bucket is optional
  endpoint_url: https://s3.amazonaws.com # optional, defaults to s3.amazonaws.com
  formatter: "{date}/{collection}.{ext}" # optional, defaults to None

If the formatter is not set, no repartitioning happens and the S3 object key is left intact, the same as it is in the source bucket (within the base_path context). Specifying the formatter in the source section alone doesn't repartition any objects by itself; only destinations that also have this option specified are eligible for object key modifications.

Destinations key

destinations:
  - aws_access_key_id: KEY_ID
    aws_secret_access_key: SECRET
    base_path: DH-PLAYPEN/storage/output # at least the bucket name is required, sub path within this bucket is optional
    endpoint_url: https://s3.upshift.redhat.com # optional, defaults to s3.upshift.redhat.com
    formatter: "{date}/{collection}.{ext}" # optional, defaults to None
    unpack: yes # optional, defaults to False/no

The endpoint_url defaults to a different value for destinations than for the source. The defaults reflect the usual data origin and the usual safe destination host.

If the formatter is not set, no repartitioning happens and the S3 object key is left intact, the same as it is in the source bucket (within the base_path context). If repartitioning is desired, the formatter string must be defined in the source section as well - otherwise the object name can't be parsed properly from the source S3 object key.
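
To make the relationship between the source and destination formatters concrete, the following Python sketch parses a key with the source formatter and renders it with a destination formatter. It is an assumption about how such repartitioning could work, not solgate's actual implementation; the helpers formatter_to_regex and repartition and the sample key are hypothetical.

```python
import re
from string import Formatter


def formatter_to_regex(formatter: str) -> re.Pattern:
    """Compile a formatter such as "{date}/{collection}.{ext}" into a regex.

    Every placeholder becomes a named group matching one path segment part.
    Assumption: each placeholder name occurs only once in the source formatter.
    """
    pattern = ""
    for literal, field, _spec, _conversion in Formatter().parse(formatter):
        pattern += re.escape(literal)
        if field:
            pattern += f"(?P<{field}>[^/]+?)"
    return re.compile(pattern)


def repartition(key: str, source_formatter: str, destination_formatter: str) -> str:
    """Parse the key with the source formatter and render it for the destination."""
    match = formatter_to_regex(source_formatter).fullmatch(key)
    if match is None:
        raise ValueError(f"Key {key!r} does not match {source_formatter!r}")
    return destination_formatter.format(**match.groupdict())


if __name__ == "__main__":
    print(
        repartition(
            "2020-01-01/users.csv",
            "{date}/{collection}.{ext}",
            "{collection}/historic/{date}-{collection}.{ext}",
        )
    )
```

The sketch also shows why the source formatter is required: without it there is nothing to extract the date, collection and ext values from.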

The unpack option specifies whether gzipped archives should be unpacked during the transfer. The .gz suffix is automatically dropped from the resulting object key, no matter if the repartitioning is on or off. Switching this option on results in weaker object validation, since the implicit metadata checksum and size checks can't be used to verify the file integrity.
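
The effect of unpack on a single object can be sketched in Python as follows. This is an illustration only: the function unpack_object is hypothetical, it works on in-memory bytes rather than on S3 streams, and leaving keys without a .gz suffix untouched is an assumption.

```python
import gzip
from typing import Tuple


def unpack_object(key: str, payload: bytes) -> Tuple[str, bytes]:
    """Decompress a gzipped payload and drop the .gz suffix from its key.

    Assumption: objects whose key does not end with .gz pass through unchanged.
    """
    if not key.endswith(".gz"):
        return key, payload
    return key[: -len(".gz")], gzip.decompress(payload)


if __name__ == "__main__":
    packed = gzip.compress(b"id,name\n1,foo\n")
    new_key, data = unpack_object("2020-01-01/users.csv.gz", packed)
    # The size and checksum of data differ from those of packed, which is
    # why the source object's metadata can no longer validate the transfer.
    print(new_key, len(packed), len(data))
```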

Separate credentials into different files

If you don't think inlining aws_access_key_id and aws_secret_access_key in plaintext into the config file is a good idea, you can separate these credentials into their own distinct files. If the credentials keys are not found (inlined) in the config, solgate tries to locate them in the config folder (the same folder where the main config file is located).

The credentials file is expected to contain the following:

aws_access_key_id: KEY_ID
aws_secret_access_key: SECRET

For source the expected filename is source.creds.yaml, for destinations destination.X.creds.yaml where X is the index in the destinations list in the main config file. For destinations we allow credentials sharing, therefore if destination.X.creds.yaml is not located, solgate tries to load destination.creds.yaml (not indexed).

Full example

Let's have this file structure in our /etc/solgate:

$ tree /etc/solgate
/etc/solgate
├── config.yaml
├── destination.0.creds.yaml
├── destination.creds.yaml
└── source.creds.yaml

And a main config file /etc/solgate/config.yaml looking like this:

source:
  base_path: DH-PLAYPEN/storage/input

destinations:
  - base_path: DH-PLAYPEN/storage/output0 # idx=0

  - base_path: DH-PLAYPEN/storage/output1 # idx=1

  - base_path: DH-PLAYPEN/storage/output2 # idx=2
    aws_access_key_id: KEY_ID
    aws_secret_access_key: SECRET

Solgate will use these credentials:

  • For source the source.creds.yaml is read, because no credentials are inlined
  • For destination idx=0 the destination.0.creds.yaml is used, because no credentials are inlined
  • For destination idx=1 the destination.creds.yaml is used, because no credentials are inlined and there's no destination.1.creds.yaml file
  • For destination idx=2 the inlined credentials are used

The resolution priority:

| type | priority |
| --- | --- |
| source | inlined > source.creds.yaml |
| destination | inlined > destination.INDEX.creds.yaml > destination.creds.yaml |
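
The resolution order above can be expressed as a small Python sketch. It only decides which origin wins and does not read any files; the function credentials_origin and its arguments are hypothetical and not part of the solgate API.

```python
from typing import Iterable, Mapping, Optional

CREDENTIAL_KEYS = ("aws_access_key_id", "aws_secret_access_key")


def credentials_origin(
    section: Mapping[str, str],
    available_files: Iterable[str],
    index: Optional[int] = None,
) -> str:
    """Return "inlined" or the name of the credentials file that would be used.

    section is the source mapping or one destinations item from the config,
    available_files are the file names present in the config folder and
    index is the position in the destinations list (None for the source).
    """
    if all(key in section for key in CREDENTIAL_KEYS):
        return "inlined"
    if index is None:
        candidates = ["source.creds.yaml"]
    else:
        candidates = [f"destination.{index}.creds.yaml", "destination.creds.yaml"]
    present = set(available_files)
    for name in candidates:
        if name in present:
            return name
    raise FileNotFoundError(f"No credentials found, tried: {candidates}")


if __name__ == "__main__":
    files = ["config.yaml", "destination.0.creds.yaml", "destination.creds.yaml", "source.creds.yaml"]
    print(credentials_origin({"base_path": "DH-PLAYPEN/storage/output1"}, files, index=1))
```

Run against the file structure from the full example, destination idx=1 falls back to the shared destination.creds.yaml.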

Example config file

Here's a full configuration file example, all together.

alerts_smtp_server: smtp.corp.redhat.com
alerts_from: [email protected]
alerts_to: [email protected]
timedelta: 1d

source:
  aws_access_key_id: KEY_ID
  aws_secret_access_key: SECRET
  endpoint_url: https://s3.upshift.redhat.com
  formatter: "{date}/{collection}.{ext}"
  base_path: DH-PLAYPEN/storage/input

destinations:
  - aws_access_key_id: KEY_ID
    aws_secret_access_key: SECRET
    endpoint_url: https://s3.upshift.redhat.com
    formatter: "{collection}/historic/{date}-{collection}.{ext}"
    base_path: DH-PLAYPEN/storage/output

  - aws_access_key_id: KEY_ID
    aws_secret_access_key: SECRET
    endpoint_url: https://s3.upshift.redhat.com
    formatter: "{collection}/latest/full_data.csv"
    base_path: DH-PLAYPEN/storage/output
    unpack: yes

Usage

Solgate is mainly intended for use in automation within Argo Workflows. However, it can also be used as a standalone CLI tool for manual transfers and (via extensions) for (TBD) manifest scaffold generation and (TBD) deployed instance monitoring.

List bucket for files ready to be transferred

Before the actual sync can be run, it is required to list the source bucket for files ready to be transferred.

solgate list

| CLI option | Config file entry | Description |
| --- | --- | --- |
| -o | | Output to a file instead of stdout. Creates a listing file. |
| | timedelta | Define a lookup restriction. Only files newer than this value are reported. Defaults to 1 day. |

Sync objects

solgate send KEY

| CLI option | Description |
| --- | --- |
| -l, --listing-file | A listing file ingested by this command. Format is expected to be the same as solgate list output. If set, the KEY argument is ignored. |

Notification service

Send a workflow status alert via email from the Argo environment.

The command expects to be passed values matching the available Argo variable format as described here.

solgate report

Options can be set either via CLI argument or via environment variable:

  • Options which map to Argo Workflow variables:

    | CLI option | Environment variable name | Value should map to Argo workflow variable | Description |
    | --- | --- | --- | --- |
    | --failures | WORKFLOW_FAILURES | {{workflow.failures}} | JSON serialized into a string listing all the failed workflow nodes. |
    | -n, --name | WORKFLOW_NAME | {{workflow.name}} | Workflow instance name. |
    | --namespace | WORKFLOW_NAMESPACE | {{workflow.namespace}} | Project namespace where the workflow was executed. |
    | -s, --status | WORKFLOW_STATUS | {{workflow.status}} | Current status of the workflow execution. |
    | -t, --timestamp | WORKFLOW_TIMESTAMP | {{workflow.creationTimestamp}} | Workflow execution timestamp. |

  • Options which map to config file entries. Priority order:

    CLI option > Environment variable > Config file entry > Default value

    | CLI option | Environment variable name | Config file entry | Description |
    | --- | --- | --- | --- |
    | --from | ALERT_SENDER | alerts_from | Email alert sender address. Defaults to [email protected]. |
    | --to | ALERT_RECIPIENT | alerts_to | Email alert recipient address. Defaults to [email protected]. |
    | --smtp | SMTP_SERVER | alerts_smtp_server | SMTP server URL. Defaults to smtp.corp.redhat.com. |

  • Other:

    | CLI option | Environment variable name | Description |
    | --- | --- | --- |
    | --host | ARGO_UI_HOST | Argo UI external facing hostname. |
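
The priority order for options that map to config file entries (CLI option > Environment variable > Config file entry > Default value) can be sketched in Python like this. The function resolve_option is hypothetical and only illustrates the lookup order; how solgate itself wires CLI arguments and environment variables together is not shown here.

```python
import os
from typing import Mapping, Optional


def resolve_option(
    cli_value: Optional[str],
    env_name: str,
    config: Mapping[str, str],
    config_key: str,
    default: str,
    environ: Optional[Mapping[str, str]] = None,
) -> str:
    """Pick the first value available: CLI, environment, config file, default."""
    environ = os.environ if environ is None else environ
    if cli_value is not None:
        return cli_value
    if env_name in environ:
        return environ[env_name]
    if config_key in config:
        return config[config_key]
    return default


if __name__ == "__main__":
    # No CLI value and (most likely) no SMTP_SERVER variable set, so the
    # config file entry wins over the default.
    config = {"alerts_smtp_server": "smtp.example.com"}
    print(resolve_option(None, "SMTP_SERVER", config, "alerts_smtp_server", "smtp.corp.redhat.com"))
```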

Workflow manifests

In addition to the solgate package, this repository also features deployment manifests in the manifests folder. The current Kubernetes manifests rely on Argo and Argo Events and are structured in a Kustomize format. Environments for deployment are specified in the manifests/overlays/ENV_NAME folder.

Each environment features multiple solgate workflow instances. The configuration config.ini file and selected triggers are defined in an instance subfolder within the particular environment folder.

Deploy

Environment deployments are expected to be handled via Argo CD in AI-CoE SRE; however, they can be done manually as well.

Local prerequisites:

Already deployed platform and running services:

Build and deploy manifests

kustomize build --enable_alpha_plugins manifests/overlays/ENV_NAME | oc apply -f -

Create a new instance

Will be handled via scaffold in next version!

Prerequisites:

Import GPG keys EFDB9AFBD18936D9AB6B2EECBD2C73FF891FBC7E, A76372D361282028A99F9A47590B857E0288997C, 04DAFCD9470A962A2F272984E5EB0DA32F3372AC

gpg --keyserver keyserver.ubuntu.com --recv EFDB9AFBD18936D9AB6B2EECBD2C73FF891FBC7E A76372D361282028A99F9A47590B857E0288997C 04DAFCD9470A962A2F272984E5EB0DA32F3372AC
  1. Create a new folder named after the instance in the selected environment overlay (make a copy of prod/TEMPLATE).

  2. Create a kustomization.yaml file in this new folder with the following content, and change NAME to your instance name:

    apiVersion: kustomize.config.k8s.io/v1beta1
    kind: Kustomization
    
    generators:
      - ./secret-generator.yaml
    
    commonLabels:
      app.kubernetes.io/name: NAME
    
    resources:
      - ./cronwf.yaml
  3. Create a secret-generator.yaml file in this new folder with following content:

    apiVersion: viaduct.ai/v1
    kind: ksops
    metadata:
      name: secret-generator
    files:
      - secret.enc.yaml
  4. Create a secret.enc.yaml file in this folder and encrypt it via sops:

    apiVersion: v1
    kind: Secret
    metadata:
      name: solgate-NAME
    stringData:
      source.creds.yaml: |
        aws_access_key_id: KEY_ID_FOR_SOURCE
        aws_secret_access_key: SECRET_FOR_SOURCE
    
      destination.creds.yaml: |
        aws_access_key_id: DEFAULT_KEY_ID_FOR_DESTINATIONS
        aws_secret_access_key: DEFAULT_SECRET_FOR_DESTINATIONS
    
      destination.2.creds.yaml: |
        aws_access_key_id: KEY_ID_FOR_DESTINATION_ON_INDEX_2
        aws_secret_access_key: SECRET_FOR_DESTINATION_ON_INDEX_2
    
      config.yaml: |
        alerts_smtp_server: smtp.corp.redhat.com
        alerts_from: [email protected]
        alerts_to: [email protected]
        timedelta: 5h
    
        source:
          endpoint_url: https://s3.upshift.redhat.com
          formatter: "{date}/{collection}.{ext}"
          base_path: DH-PLAYPEN/storage/input
    
        destinations:
          - endpoint_url: https://s3.upshift.redhat.com
            formatter: "{collection}/historic/{date}-{collection}.{ext}"
            base_path: DH-PLAYPEN/storage/output
            unpack: yes
    
          - endpoint_url: https://s3.upshift.redhat.com
            formatter: "{collection}/latest/full_data.csv"
            base_path: DH-PLAYPEN/storage/output
            unpack: yes
    
          - endpoint_url: https://s3.upshift.redhat.com
            base_path: DH-PLAYPEN/storage/output

    sops -e -i overlays/ENV_NAME/NEW_INSTANCE_NAME/secret.enc.yaml

    Please make sure the *.creds.yaml entries in the secret are encrypted.

  5. Create cronwf.yaml with following content, please change the name and config variable value to match the secret above:

    apiVersion: argoproj.io/v1alpha1
    kind: CronWorkflow
    metadata:
      generateName: solgate-NAME
      name: solgate-NAME
    spec:
      schedule:
      concurrencyPolicy: "Replace"
      workflowSpec:
        arguments:
          parameters:
            - name: config
              value: solgate-NAME
        workflowTemplateRef:
          name: solgate
  6. Update the resource and patch listing in the overlays/ENV_NAME/kustomization.yaml:

    resources:
      - ...
      - ./NEW_INSTANCE_NAME

Backfill

A backfill job ensures processing of all objects in the source bucket. This job assumes none of the objects were processed before and syncs them all, potentially overwriting any changes in the destination bucket.

There's a backfill.yaml available to be submitted directly. Please specify the config parameter before submitting. The value must match the name of the Secret config resource for the targeted pipeline.

argo submit -p config=solgate-NAME manifests/backfill.yaml

Workflow parameters

CronWorkflow resource defined for each pipeline instance allows you to define 3 parameters:

| Parameter | Value | Required | Description |
| --- | --- | --- | --- |
| config | string | yes | Define which config secret to mount to pods and pass to the solgate runtime. |
| is-backfil | string (boolean in quotes) | no | If set to true, sync all data in the source bucket. Defaults to false. |
| split | string (int in quotes) | no | Define the number of files handled by a single sync pod. If there are more files to sync, the pipeline will spin up additional pods. Defaults to 5000. |
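
To illustrate what the split parameter means for a large listing, here is a short Python sketch that cuts a sequence of object keys into batches, one batch per sync pod. It assumes batches are filled sequentially; how the pipeline actually distributes files among pods is not described here, and split_listing is a hypothetical helper.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def split_listing(keys: Iterable[str], split: int = 5000) -> Iterator[List[str]]:
    """Yield consecutive batches holding at most split keys each."""
    iterator = iter(keys)
    while True:
        batch = list(islice(iterator, split))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    # The workflow parameter is passed as a string (int in quotes).
    split = int("5000")
    keys = (f"2020-01-01/file-{i}.csv" for i in range(12001))
    print([len(batch) for batch in split_listing(keys, split)])
```

Under these assumptions, 12001 files with the default split would be handled by three pods.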

Developer setup

Local setup

Install pipenv and set up the environment:

pipenv sync -d

Install/enable pre-commit for this project:

pip install pre-commit
pre-commit install

Running tests

With local environment set up, you can run tests locally like this:

pipenv run pytest . --cov solgate

Building manifests

Install local prerequisites for kustomize manifests:

Use kustomize build --enable_alpha_plugins ... to build manifests.

CI/CD

We rely on AICoE-CI GitHub application and bots to provide CI for us. All is configured via .aicoe-ci.yaml.

Releasing

If you're a maintainer, please release via GitHub issues. New release creates:

sync-pipelines's Issues

Solgate as an operator

Overview

Is your feature request related to a problem? Please describe.
Solgate should be accompanied with operator capabilities: only a single CRD should be required to define a dataset and set up a new sync pipeline to keep said dataset in sync.

Describe the solution you'd like
Once a CRD instance is created, the operator would facilitate the initial sync. It will keep the data set up to date. It may be requested to delete the dataset on CRD delete.

Describe alternatives you've considered
n/a

Additional context
Would streamline deployment of solgate. Should be easy to implement via https://github.com/nolar/kopf

Proposal

Define a CustomResourceDefinition that would hold information about the said dataset like name, origin (source), desired local on-cluster location, sync triggers etc.

It can look something like this:

apiVersion: solgate.io/v1alpha1
kind: DataSet
metadata:
  name: my-dataset
  annotations:  # Populated by the operator
    solgate.io/dataset-origin: kaggle
    solgate.io/dataset-name: Original name of the dataset
    solgate.io/dataset-description: Description pulled from origin if available
spec:
  initialSync: true  # Schedule an initial sync of the dataset to destinations
  cleanupOnDelete: false # Delete data from destinations on DataSet object delete

  triggers: # Forwarded to KNative or Argo Events
    - calendar:
        interval: x
        schedule: cron
    - webhook: "..."

  source: # Source with type like S3/Mailing list/Kaggle, etc.
    s3:
      endpoint:
      bucket:
      key:
      accessKeySecret:
        name:
        key:
      secretKeySecret:
        name:
        key:

  destinations: # S3 or PersistentVolume
    - name:
      s3:
        endpoint:
        bucket:
        key:
        accessKeySecret:
          name:
          key:
        secretKeySecret:
          name:
          key:
      ...
status: # Filled in by the operator
  triggers:
    - calendar:
      lastActivated: # timestamp
  destinations:
    - name:
      lastSync: # timestamp
  ...

The operator may (aka a far-fetched road map):

  • schedule the initial sync using minio client mirror job
  • schedule sync pipeline to keep the datasets up to date using different backends (Argo Workflow etc...)
  • trigger individual preprocess pipelines for the destinations via different backends (Argo Workflows, AirFlow...)
  • delete data on DataSet resource delete
  • fire events when dataset was updated
  • if deployed cluster-wide, monitor different DataSet instances in distinct namespaces and coordinate the sync jobs (so we're not pulling the same source for many data scientists at similar times, but rather aggregate the destinations and sync them all together; this opens possibilities for optimizations, for example syncing within the same S3 cluster can be done via the object .copy method, which is much faster than copying byte by byte)

As a result, adding a new dataset to the cluster + keeping it up to date is just matter of deploying a single DataSet resource (compared to deploying many manifests for current solgate to spin up a new sync pipeline instance). It can also serve as a base for a "local" dataset catalogue (can be aggregated from annotations).

OOMKilled on large buckets

If the source bucket contains too many files, the lookup pod can be OOM killed. Refactor the s3fs lookup to enforce the use of generators in all possible places.
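
The idea behind the generator refactoring can be sketched in plain Python. The example does not use s3fs; it takes any iterable of (key, modification time) pairs, which is an assumption about the shape of the listing, and recent_keys is a hypothetical name. The point is that entries are filtered one by one, so the whole listing never has to sit in memory as a list.

```python
from datetime import datetime, timedelta
from typing import Iterable, Iterator, Tuple


def recent_keys(objects: Iterable[Tuple[str, datetime]], oldest: datetime) -> Iterator[str]:
    """Lazily yield keys of objects modified at or after oldest."""
    for key, modified in objects:
        if modified >= oldest:
            yield key


if __name__ == "__main__":
    start = datetime(2024, 1, 1)
    # A generator expression stands in for a paginated bucket listing.
    listing = ((f"key-{i}", start + timedelta(days=i)) for i in range(1_000_000))
    first = next(recent_keys(listing, start + timedelta(days=2)))
    print(first)  # only the first three entries were produced so far
```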

Use a per customer configmap instead of a per customer workflow

Currently we store the customer specific workflow in a configmap with prepopulated per-customer default parameters. This workflow is rendered from a template via Ansible.

To make aicoe-aiops/sync-pipelines-private#7 possible, the workflow needs to load those config values dynamically from a configmap. Kustomize can't template, so the current approach would require extensive copy-pasting when setting up a new pipeline, which leads to maintenance hell.

Error codes and messages

Describe the bug
When an automated workflow step fails, solgate doesn't allow the user to determine the failure reason from metrics or report emails. Please support more granular error codes and messages.

Test coverage

Obligatory test coverage request. Include codecov reporting or something.
