osoceanacoustics / echodataflow

Orchestrated sonar data processing workflow

Home Page: https://echodataflow.readthedocs.io/en/latest/

License: MIT License

Python 28.66% Jupyter Notebook 71.07% Shell 0.20% Dockerfile 0.07%

echodataflow's Introduction

Echodataflow: Streamlined Data Pipeline Orchestration

Welcome to Echodataflow! Echodataflow is a powerful data pipeline orchestration tool designed to simplify and enhance the execution of data processing tasks. Leveraging the capabilities of Prefect 2.0 and YAML configuration files, Echodataflow caters to the needs of scientific research and data analysis. It provides an efficient way to define, configure, and execute complex data processing workflows.

Echodataflow integrates with Echopype, a renowned package for sonar data analysis, to provide a versatile solution for researchers, analysts, and engineers. With Echodataflow, users can seamlessly process and analyze sonar data using a modular and user-friendly approach.

Getting Started with Echodataflow

This guide will walk you through the initial steps to set up and run your Echodataflow pipelines.

1. Create a Virtual Environment

To keep your Echodataflow environment isolated, it's recommended to create a virtual environment using Conda or Python's built-in venv module. Here's an example using Conda:

conda create --name echodataflow-env
conda activate echodataflow-env

Or, using Python's venv:

python -m venv echodataflow-env
source echodataflow-env/bin/activate  # On Windows, use `echodataflow-env\Scripts\activate`

2. Clone the Project

Now that you have a virtual environment set up, you can clone the Echodataflow project repository to your local machine using the following command:

git clone <repository_url>

3. Install the Package

Navigate to the project directory you've just cloned and install the Echodataflow package. The -e flag installs the package in editable mode, so changes to the source take effect without reinstalling, which is especially helpful during development and testing. Installation may take a few minutes.

cd <project_directory>
pip install -e .

4. Echodataflow and Prefect Initialization

To kickstart your journey with Echodataflow and Prefect, follow these simple initialization steps:

4.1 Initializing Echodataflow

Begin by initializing Echodataflow with the following command:

echodataflow init

This command sets up the groundwork for your Echodataflow environment, preparing it for seamless usage.

4.2 Initializing Prefect

For Prefect, initialization involves a few extra steps, including secure authentication. Choose one of the following options:

  • If you have a Prefect Cloud account, run the command below and provide your Prefect API key to securely link your account. Type your API key when prompted and press Enter.
prefect cloud login
  • If you don't have a Prefect Cloud account yet, you can create a local Prefect profile instead. This is especially useful for those who are just starting out and want to explore Prefect without an account.
prefect profiles create echodataflow-local

The initialization process will ensure that both Echodataflow and Prefect are properly set up and ready for you to dive into your cloud-based workflows.

5. Configure Blocks

Echodataflow uses blocks, which are secure containers for storing credentials and sensitive data. If you're running the entire flow locally, you can skip this step. To set up your cloud credentials, configure blocks according to your cloud provider. For detailed instructions, refer to the Blocks Configuration Guide.

6. Edit the Pipeline Configuration

Open the pipeline.yaml file. This YAML configuration file defines the processes you want to execute as part of your pipeline. Customize it by adding the necessary stages and functions from echopype that you wish to run.

7. Define Data Sources and Destinations

Customize the datastore.yaml file to define the source and destination for your pipeline's data. This is where Echodataflow will fetch and store data as it executes the pipeline.

8. Execute the Pipeline

You're now ready to execute your Echodataflow pipeline! Use the echodataflow_start function, which is a central piece of Echodataflow, to kick off your pipeline. Import this function from Echodataflow and provide the paths or URLs of the configuration files. You can also pass additional options or storage options as needed. Customize the paths, block name, storage type, and options based on your requirements. Here's an example:

from echodataflow import echodataflow_start, StorageType, load_block

dataset_config = "<url_or_path_of_datastore.yaml>"
pipeline_config = "<url_or_path_of_pipeline.yaml>"
logfile_config = "<url_or_path_of_logging.yaml>"  # Optional

# Load the credential block. StorageType.AWS assumes an AWS block; use the type that matches yours.
aws = load_block(name="<block_name>", type=StorageType.AWS)

# Enabling storage_options_override assigns the block for universal use, avoiding
# repetitive configuration when a single credential block is used throughout the application.
options = {"storage_options_override": False}
data = echodataflow_start(
    dataset_config=dataset_config,
    pipeline_config=pipeline_config,
    logging_config=logfile_config,
    storage_options=aws,
    options=options,
)

License

Licensed under the MIT License; you may not use this file except in compliance with the License. You may obtain a copy of the License here.

echodataflow's People

Contributors

sohambutala, lsetiawan, valentina-s, leewujung, pre-commit-ci[bot]

echodataflow's Issues

Stage Dependency Functionality

  1. Add a function to extract dependency data from JSON (currently only done in the apply_mask function)
  2. Add validation that checks the flow and its dependencies and takes input based on the YAML definition

Echoflow package dependencies

While installing the required dependencies, the 'echoflow' package is also automatically installed. As a result, when attempting to edit or create new flows, the system fails to recognize the changes made to the 'echoflow' package.

Cloud Credentials and Prefect Profiles

Echoflow now employs Prefect Blocks to create AWS, AZ Cosmos and Prefect credentials. Storage options can now be configured by specifying the type and block_name instead of passing a dictionary (see the sample below). For the accepted types, refer to echoflow.config.models.datastore.StorageType.

Configure in datastore.yaml or pipeline.yaml :

storage_options:
  block_name: echoflow-aws-credentials
  type: AWS

To access or view all the configured blocks, log in to your Prefect Cloud dashboard or your local dashboard and refer to the echoflow-config block.

Logging Testing

Write test suite for various testing options available including some erroneous files.

Prefect Rate Limit for Cloud Account (Free Tier)

While processing the extensive array of raw files tied to the EK60 system for survey SH1707, a challenge emerged. Among the 4343 files, certain ones triggered errors due to rate limitations within the Prefect cloud API's free tier. To counteract this, I instituted a solution involving task retries. This mechanism, applied individually to each task, enables the system to automatically retry failed tasks in the presence of rate limitations.
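
As a rough illustration of the per-task retry idea described above, here is a minimal standard-library sketch. It is not Prefect's implementation (Prefect exposes retries through the `retries` and `retry_delay_seconds` arguments of its task decorator); `RateLimitError`, `with_retries`, `process_file` and the file name are hypothetical names used only for this example.

```python
import time
from functools import wraps


class RateLimitError(Exception):
    """Hypothetical stand-in for a rate-limit failure returned by an API."""


def with_retries(retries=3, delay_seconds=0.0):
    """Re-run the wrapped function up to `retries` extra times on RateLimitError."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except RateLimitError:
                    if attempt == retries:
                        raise  # retries exhausted: surface the error
                    time.sleep(delay_seconds)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(retries=3)
def process_file(name):
    """Fails twice with a simulated rate limit, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RateLimitError("simulated rate limit")
    return f"processed {name}"


result = process_file("example.raw")
```

Because the retry wraps each task individually, one rate-limited file does not force the other files in the batch to be reprocessed.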

`echoflow rules --add` command incorrectly appends rules without new lines

Description

The echoflow rules --add command does not insert new rules on new lines. Instead, it appends new rules directly to the end of the last rule. This formatting issue prevents the system from parsing and applying new rules correctly.

Expected Behavior

When using the echoflow rules --add command, each new rule should be added on a separate line to ensure correct parsing and application.

Actual Behavior

New rules are appended to the end of the existing rule line, leading to a continuous string of rules that the system cannot interpret or apply.

Steps to Reproduce

  1. Execute the echoflow rules --add command to add a new rule.
  2. Add another rule using the same command.
  3. Check the rules configuration file or output, noting that the second rule is appended directly to the first without a newline in between.

Possible Solution

Modify the rule addition logic in the echoflow rules --add command to ensure that each new rule is added on a new line.
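
One way the fix could look is sketched below, assuming the rules live in a plain text file with one rule per line. `append_rule` and the file name `rules.txt` are hypothetical and not taken from the echoflow code base.

```python
import tempfile
from pathlib import Path


def append_rule(rules_file: Path, rule: str) -> None:
    """Append `rule` on its own line, repairing a missing trailing newline."""
    existing = rules_file.read_text() if rules_file.exists() else ""
    # Only add a separator when the file has content that does not end in a newline.
    separator = "" if existing == "" or existing.endswith("\n") else "\n"
    with rules_file.open("a") as handle:
        handle.write(f"{separator}{rule.strip()}\n")


with tempfile.TemporaryDirectory() as tmp:
    rules_path = Path(tmp) / "rules.txt"
    rules_path.write_text("flow_a:flow_b")  # no trailing newline, as in the bug report
    append_rule(rules_path, "flow_b:flow_c")
    lines = rules_path.read_text().splitlines()
```

Checking the existing file's last character, rather than always prefixing a newline, avoids introducing blank lines when the file is already well formed.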

Logging Documentation

Need documentation to cover logging options available and how to troubleshoot (what keywords to look for).

Retry Mechanism for Echoflow (between two successive runs)

Prefect offers a convenient way to enable retries for flows or tasks in case of failure, which can be configured in pipeline.yaml using the "retries" option. Several options are being considered for storing the retry metadata:

  1. Local SQLite3 tables stored in a semi-structured format.
  2. Parquet tables stored on cloud vendors.
  3. Utilizing a REST API to retrieve metadata from the Prefect server.

Release on PyPI

Prepare and release the current project version on the Python Package Index (PyPI).

Update credential block configuration to read from a file

Overview

After some discussion over Zoom, there is now agreement that users should set up their credentials in a credentials file using the INI format, and that the program should read from that file and create blocks when they don't exist. This functionality can leverage Python's built-in configparser library to read these files.

INI Specification

[DEFAULT] # default section, shouldn't go into a block
provider = aws # default provider

[my-aws-block1] # section header, which is the unique block name as well
# This block will assume the default provider of aws, so should require 'aws_key' and 'aws_secret'
# else put default of None!
aws_key = SOMEAWSKEY
aws_secret = MY$up3rSecretK33Y

[my-azure-block1]
# This block will need to check for 'connection_string' if azure
provider = "azure" # will trump the default provider
connection_string = "az://blah@blah:blah"

Behavior

  1. The initial config should look and read from a file called credentials.ini in ~/.echoflow/ directory
  2. It will go through the sections in ini file and create the appropriate blocks for it
  3. If a user rotates a key, they should create a new block by adding a section in the ini and run the configuration step again, otherwise a user can simply go to the Prefect UI and update the block values there
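
The reading step could be sketched with configparser roughly as follows. This only parses sections into plain dictionaries and does not create Prefect blocks; `read_block_specs`, `REQUIRED_KEYS` and the sample values are assumptions made for illustration. The sample omits inline comments and quotes because configparser, with its default settings, keeps both as part of the value.

```python
import configparser

SAMPLE = """
[DEFAULT]
provider = aws

[my-aws-block1]
aws_key = SOMEAWSKEY
aws_secret = EXAMPLESECRET

[my-azure-block1]
provider = azure
connection_string = az://example
"""

# Hypothetical mapping of provider name to the keys a block of that provider needs.
REQUIRED_KEYS = {
    "aws": ("aws_key", "aws_secret"),
    "azure": ("connection_string",),
}


def read_block_specs(text):
    """Return {block_name: {"provider": ..., <required keys>...}} for each section."""
    # interpolation=None keeps '%' characters in secrets from being interpreted.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    specs = {}
    for name in parser.sections():  # the DEFAULT section is not listed here
        section = parser[name]
        provider = section["provider"]  # falls back to the DEFAULT provider
        values = {key: section.get(key) for key in REQUIRED_KEYS[provider]}  # None if missing
        specs[name] = {"provider": provider, **values}
    return specs


specs = read_block_specs(SAMPLE)
```

In this sketch a missing required key becomes None, matching the "else put default of None" note in the specification.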

Expected function

  • echoflow_config: A function to configure the credentials by reading from a file as described above
    • sync: A bool argument to echoflow_config. This will read the latest values from the existing blocks within prefect and update the local credential files for the respective sections... additionally, for any new sections, new blocks will be created.

Echoflow Design

I have started a Figma design to get more understanding of the flow of the functions and make validations more robust.

Figma Prototype

Desired output directories from `echoflow_open_raw`

The flow echoflow_open_raw will generate the following directories:

  • in datastore.yml we add an argument to specify where the .raw files are saved to
  • the raw-converted zarr files will be saved to {working_directory}/{flow_name}/{subgroup_txt_filename} (i.e. 1 folder per subgroup txt file)
    • flow_name default is echoflow_open_raw

When multiple subgroup txt files are in the input, they will be passed as a zip file.

#30 is a temporary patch.

Clean up outdated folders/notebooks

There are now some outdated folders and notebooks in the repo, which may be confusing to a new user jumping in and trying to run some notebooks. A few of them are (I believe):

  • jupyterbook (now things are moved to docs)
  • notebooks (some scratch and testing notebooks: maybe indicate that in the name)
  • config/logging_config.yaml (I think out of date)

and possibly some other ones, so it would be good to check.

Package Reorganization

Structuring packages in individual stages and not combining them into levels to keep things more modular.

Prefect Serialization limitation

Prefect serializes parameters before passing them on to the next flow or task, and there is a 512 KB limit on the size of these serialized parameters. Because of this limit, the data being passed (JSON with out_path and other metadata) is now stored locally under the .echoflow directory and deleted once the function has executed.
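
The workaround amounts to passing a short file path instead of the payload itself. A minimal sketch under that assumption is shown below; `stash_payload`, `load_and_delete` and the example payload are hypothetical and do not reflect the actual echoflow helpers.

```python
import json
import tempfile
from pathlib import Path


def stash_payload(payload: dict, directory: Path) -> str:
    """Write `payload` to a JSON file and return its path as a small string."""
    directory.mkdir(parents=True, exist_ok=True)
    with tempfile.NamedTemporaryFile(
        "w", suffix=".json", dir=directory, delete=False
    ) as handle:
        json.dump(payload, handle)
        return handle.name


def load_and_delete(path: str) -> dict:
    """Read the stashed payload back, then remove the file."""
    file_path = Path(path)
    payload = json.loads(file_path.read_text())
    file_path.unlink()
    return payload


with tempfile.TemporaryDirectory() as tmp:
    big_payload = {"out_path": "/tmp/example.zarr", "metadata": ["x"] * 100_000}
    reference = stash_payload(big_payload, Path(tmp) / ".echoflow")
    restored = load_and_delete(reference)
    file_still_exists = Path(reference).exists()
```

Only `reference`, a short string, would cross the flow or task boundary, so its serialized size stays far below the limit regardless of how large the payload grows.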

Add deployment script for local service

Right now we have a Bash script to deploy Prefect to an instance: https://github.com/OSOceanAcoustics/echoflow/blob/dev/deployment/deploy_echoflow_worker.sh
The script deploys all the code on the EC2 instance and starts a Prefect service (without triggering anything).
After that, we can trigger the run from the dashboard (otherwise one needs to log in to the instance and run it from a notebook).

The current script is for running service through the Prefect cloud dashboard. We may want this for our own local service.

Dynamic Module Import and Function execution

Module name and function which needs to be called need to be mentioned in the yaml file. For now, I am matching the names and executing the functions.
Below is the structure revised for the workaround.

active_recipe: standard
pipeline:
- recipe_name: standard
  generateGrpah: false
  stages:
  - name: open_raw # Name of Function
    module: echoflow.stages_v2.Open_Raw # Module
    saveOutput: true
    parallelize: true
    externalParams:
      sonar_model: EK60
      xml_path: s3//
  - name: compute_Sv
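
The name-matching approach described above can be sketched with importlib. The example resolves a function from the `module` and `name` fields of a stage entry; it uses `math.sqrt` as a stand-in so it runs without echoflow installed, and `resolve_stage` is a hypothetical helper.

```python
import importlib


def resolve_stage(module_name: str, function_name: str):
    """Import `module_name` and return the callable named `function_name`."""
    module = importlib.import_module(module_name)
    func = getattr(module, function_name, None)
    if not callable(func):
        raise AttributeError(
            f"{module_name} has no callable named {function_name!r}"
        )
    return func


# Stand-in for one parsed stage entry; a real entry would name an echoflow module.
stage = {"name": "sqrt", "module": "math", "externalParams": {}}

func = resolve_stage(stage["module"], stage["name"])
value = func(16, **stage["externalParams"])
```

Failing early with a clear message when the name does not resolve makes typos in the YAML easier to spot than a late error inside the flow.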

Dask vs Prefect-Dask

Which option might be better to use here? I saw that Don implemented Dask separately but since Prefect already has dask integration should we consider dask separately?

Rename compute_SV -> compute_Sv?

@Sohambutala in acoustics there is a difference in meaning between S_v and s_v. To avoid confusion, it is best to rename SV to Sv (it is commonly written that way, with Target Strength capitalized as TS). This will also align with echopype's compute_Sv.

Dask Code Debug

Unable to debug code related to dask and running into the below issue.
Couldn't find a debug adapter descriptor for the debug type 'python' (the extension might have failed to activate)

Distributed Logging

Logging in distributed Dask environments presents unique challenges due to each worker spinning up a new Python environment, leading to the loss of any logging mechanisms established in the master node.

To address this, we're considering several approaches:

Centralized Logging with AWS CloudWatch: This is the simplest solution that would allow us to centralize all logs for easy access and analysis.

Utilizing Dask Worker Streams for Echoflow Logs: If maintaining the exact order of logs from workers is not crucial, configuring dask worker streams to handle Echoflow logs can be a straightforward fix.

Advanced Logging with Kafka and Elastic Stack: For those with the infrastructure to support it, leveraging Kafka for log aggregation and Elastic Stack for log analysis and visualization offers a robust solution.

Each of these solutions has its own set of trade-offs in terms of complexity, infrastructure requirements, and the precision of log ordering. We're exploring these options to improve our logging framework in distributed Dask setups.

Transect file processing fails without specific file extensions

Description

The current implementation expects input filenames in transect groups to have specific extensions like .raw. If the transect files lack these expected extensions, none of the files get processed, leading to a complete halt in file handling.

Expected Behavior

The application should handle transect files regardless of their file extensions, or provide clear error messages if specific extensions are required.

Actual Behavior

Transect files without .raw or similar extensions are completely ignored by the process, resulting in no files being processed.

Steps to Reproduce

  1. Prepare a set of transect files without any file extensions.
  2. Run the file processing function.
  3. Observe that none of the files are processed.

Possible Solution

Modify the file validation logic to either accept files without extensions or enhance the documentation and error messages to clearly specify the required file extensions.
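
A possible shape for the relaxed validation is sketched below. It accepts names with an allowed extension or with no extension at all, and raises a descriptive error when nothing qualifies. `select_transect_files` and its parameters are hypothetical, not the existing echoflow logic.

```python
from pathlib import PurePosixPath


def select_transect_files(names, allowed_suffixes=(".raw",), allow_missing_suffix=True):
    """Split `names` into accepted and rejected lists based on their extensions."""
    accepted, rejected = [], []
    for name in names:
        suffix = PurePosixPath(name).suffix.lower()
        if suffix in allowed_suffixes or (suffix == "" and allow_missing_suffix):
            accepted.append(name)
        else:
            rejected.append(name)
    if not accepted:
        # Fail loudly instead of silently processing nothing.
        raise ValueError(
            f"No usable transect files; expected extensions {allowed_suffixes}, "
            f"rejected: {rejected}"
        )
    return accepted, rejected


accepted, rejected = select_transect_files(["file_a.raw", "file_b", "notes.txt"])
```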

Additional Context

Splitting existing flows

Splitting the existing flows to make a separate flow for each individual function in order to reduce the dependency.

Invalid Argument - Using Dask

Ran into an "Invalid Argument" issue while using the Dask client's gather, although the same code worked perfectly fine when executed sequentially. Initially I thought it might be related to I/O access since we are dealing with local files, but that doesn't appear to be the case, since switching to prefect-dask worked fine.

Windows Setup

Ensure compatibility and setup procedures for Windows.

Module import issue

Unable to import modules while executing the HakeFlowDemo.ipynb.

Note: this was performed after uninstalling the echoflow dependency.

No log prints after Open_Raw

I noticed that logs for "open_raw" and subsequent functions are not being recorded after the initial flows. Despite using explicit loggers, there seems to be no logging activity. This issue occurs within the flow that triggers Dask and executes tasks in parallel.

Issue with newline character handling in _extract_from_zip and _extract_from_text functions

Description

The _extract_from_zip and _extract_from_text functions currently expect filenames in a transect group to be formatted without newline characters. This expectation is problematic because typical zip operations and file generation processes often include newline characters in filenames.

Expected Behavior

The functions should be able to correctly handle filenames that include newline characters.

Actual Behavior

When newline characters are included in filenames, the functions do not process the files correctly.

Steps to Reproduce

  1. Create a zip file with filenames that include newline characters.
  2. Attempt to process the zip file using the _extract_from_zip function.
  3. Observe that the function fails to handle the files correctly.

Possible Solution

Adjust the filename parsing logic in both functions to strip newline characters or appropriately handle them during processing.
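
The stripping step could look like the sketch below, which assumes each transect listing is a text file inside the zip with one file name per line. `parse_file_listing` and the archive contents are made up for the example and are not the actual `_extract_from_zip` implementation.

```python
import io
import zipfile


def parse_file_listing(text: str) -> list[str]:
    """Split a transect listing into clean file names, ignoring blank lines."""
    return [line.strip() for line in text.splitlines() if line.strip()]


# Build a small in-memory zip holding one transect listing with mixed line endings.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("x0001.txt", "file_a.raw\r\nfile_b.raw\n\nfile_c.raw\n")

# Read every listing back and normalise its file names.
with zipfile.ZipFile(buffer) as archive:
    listings = {
        name: parse_file_listing(archive.read(name).decode("utf-8"))
        for name in archive.namelist()
    }
```

Using `splitlines()` followed by `strip()` handles both Unix and Windows line endings as well as trailing blank lines.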

Additional Context

Writing to specified path is not permitted

The attempt to store the output of the open_raw function as a Zarr file using the to_zarr function is failing because there is no permission to write to the specified path.

Error while executing HakeFlowDemo.ipynb

'TypeError("open_raw() got an unexpected keyword argument 'offload_to_zarr'")'

The Echopype dependency installed with echoflow is the latest one which does not have a keyword argument 'offload_to_zarr'.

Docker Image with pip

Implemented a Docker container with Echoflow installed via pip install echoflow, using Test PyPI. This setup requires fetching setuptools from Test PyPI.

Failed Resolutions:

  1. Adding PyPI as an extra index:
    • Placed before Test PyPI, it installs an incorrect Echoflow version.
    • Placed after Test PyPI, it fails to find setuptools.
  2. Using --no-deps doesn't prevent the installation of setuptools as required by Echoflow's pyproject.toml.

@leewujung @valentina-s

Generate Stage boilerplate code

Add CLI capabilities to generate boilerplate code required to create a stage

  • gs: Helps create boilerplate code for a specific stage.
    Arguments:
    - stage_name: Name of the stage for which to generate boilerplate code.

  • To create boilerplate code for a specific stage:

echoflow gs <stage_name>

Jupyter Book

  1. Detailed config sections after getting started
  2. Reference pointers for complete documentation on both config files
  3. Copyright year change on build release
  4. Add links to example configurations in complete documentation
  5. Add presentation content wherever applicable

Standardized Output for Internal Stage Processing

I am trying to come up with a standard output for each stage such that it will just be an additional wrapper to deal with the issue of passing parameters missing between two functions. For now, I am assuming that passing parameters can be managed in a dictionary which can then be coupled with the externalParams of pipeline configuration.

from typing import Any, Dict

from pydantic import BaseModel

class Output(BaseModel):
    data: Any
    passing_params: Dict[str, Any] = {}

Echopype Function Input and Output

@leewujung , Since the functions are sequentially executed will it be safe to assume that some function's output will always be served as an input to the next function?

For example, we call open_raw and use its output in the combine_echodata function and then use it to compute_Sv. If function arguments will change then it might need some extra handling in the master flow.

Config specifications

Overview

In order to be able to pass in specifications for the underlying processing after stage 0, there needs to be a mechanism to do so. OSOceanAcoustics/echopype#817 has the proposed subpackages, so below are the proposed specifications that can happen:

Spec

name: (str) Name of the full pipeline must be unique
sonar_model: (str) The sonar model, can only be the available ones in echopype
raw_regex: (str) The regular expression of the pattern for raw files
args: # input arguments for raw files
  urlpath: (str) urlpath to the input raw file... can be a jinja template where the values are retrieved from parameters below
  parameters: {} # (dict) Set default parameter values as found in urlpath
  storage_options: {} # (dict) fsspec filesystem storage options for the source
  transect: # **Optional** field, if exists it indicates that converted files should be organized by transect (yyyy/transect)
    file: (str) urlpath to transect files
    storage_options: {} # (dict) fsspec filesystem storage options for the transect files
output:
  urlpath: (str) The urlpath to the output converted raw file
  storage_options: {} # (dict) fsspec filesystem storage options for the output where files will be stored
  overwrite: (bool) Flag to allow for overwriting or not

# Below are echopype specifications, if not provided will be using defaults
echopype:
  consolidate:
    add_location: {}
    add_splitbeam_angle: {}
  calibrate:
    compute_Sv:
      env_params: {}
      cal_params: {}
      waveform_mode: {}
    compute_TS: {}
  filter:
    median: {}
    conv: {}
    remove_noise: {}
    noise:
      estimate_noise: {}
      mean_bkg: {}
      spike: {}
  unify:
    compute_MVBS: {}
    compute_MVBS_index_binning: {}
    regrid_Sv: {}
  mask:
    from_Sv:
      freq_diff: {}
    from_labels:
      boundary: {}
      region: {}
  metrics:
    summary statistics: {}
    compute_NASC: {}

Rule Modification For Better Flow Control

Add CLI capabilities to add and cleanup defined rules.

  • rules: View, add, or import flow rules from a file.
    Options:
    --add: Add a new rule interactively. Requires input in parent_flow:child_flow format.
    --add-from-file: Path to a file containing rules to be added. Each rule should be on a new line in parent_flow:child_flow format.

  • To add a new rule interactively:
echoflow rules --add
  • To import rules from a file:
echoflow rules --add-from-file

Without any options, configured rules will be displayed and duplicate rules will be removed.
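
Parsing rules in the parent_flow:child_flow format and dropping duplicates could be sketched as follows. `parse_rules` is a hypothetical helper written for this example, not the CLI's actual implementation.

```python
def parse_rules(text):
    """Return unique (parent_flow, child_flow) pairs in first-seen order."""
    rules = []
    seen = set()
    for line_number, line in enumerate(text.splitlines(), start=1):
        line = line.strip()
        if not line:
            continue  # skip blank lines
        parent, separator, child = line.partition(":")
        parent, child = parent.strip(), child.strip()
        if not separator or not parent or not child:
            raise ValueError(
                f"line {line_number}: expected parent_flow:child_flow, got {line!r}"
            )
        rule = (parent, child)
        if rule not in seen:
            seen.add(rule)
            rules.append(rule)
    return rules


rules = parse_rules("flow_a:flow_b\nflow_b:flow_c\nflow_a:flow_b\n")
```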

Change `.raw` and raw-converted `.zarr` files directory structure

Currently with the echoflow_open_raw:

  • the raw-converted .zarr files are grouped under the {working_directory}/echoflow_open_raw/{transect_name} (i.e. one folder per "transect", or subgroup txt files)
  • the .raw files are downloaded and saved to {working_directory}/echoflow_open_raw/{transect_name}_raw

We would like to change this to be such that the .raw and raw-converted .zarr files live in the following directory structure:

  • {working_directory}/echoflow_open_raw/{year}: this contains the raw-converted .zarr files
  • {working_directory}/echoflow_open_raw/{year}_raw: this contains the .raw files
    • we will move these files somewhere else later
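
The proposed layout can be expressed as a small path helper. This is a sketch only; `output_dirs` and the example working directory are hypothetical.

```python
from pathlib import PurePosixPath


def output_dirs(working_directory, year, flow_name="echoflow_open_raw"):
    """Return the directories for converted .zarr files and downloaded .raw files."""
    base = PurePosixPath(working_directory) / flow_name
    return {
        "zarr": base / str(year),      # raw-converted .zarr files
        "raw": base / f"{year}_raw",   # downloaded .raw files
    }


dirs = output_dirs("/data/work", 2017)
```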

Code Design for intermediate processing steps requiring external or dependent parameters

I was searching for sample use cases for applying masks in echoflow. Specifically, we're utilizing the frequency_differencing function and employing it as a mask. The challenge arises from our utilization of lists for freqAB and chanAB, and echoflow using YAML configuration. The question is: how should I accommodate the input, considering that these lists are generated from the "sv" output?

Here's an example of how we're currently approaching this:

freqAB = list(out_ds.frequency_nominal.values[:2])
freqdiff_da = ep.mask.frequency_differencing(source_Sv=out_ds, freqAB=freqAB, operator=">", diff=5)

This is merely a singular instance; there could exist additional functions that also rely on preceding data. An illustration of this is the add_location function, which requires both an xarray dataset and an echodata object to perform a location update.

@leewujung @lsetiawan @valentina-s
