
iRODS Automated Ingest Framework

The automated ingest framework gives iRODS an enterprise solution that solves two major use cases: getting existing data under management and ingesting incoming data hitting a landing zone.

Based on the Python iRODS Client and Celery, this framework can scale up to match the demands of data coming off instruments, satellites, or parallel filesystems.

The example diagrams below show a filesystem scanner and a landing zone.

Automated Ingest: Filesystem Scanner Diagram

Automated Ingest: Landing Zone Diagram

Quick Start

Configure python-irodsclient environment

python-irodsclient (PRC) is used by the Automated Ingest tool to interact with iRODS. The configuration and client environment files used for a PRC application apply here as well.

If you are using PAM authentication, remember to use the Client Settings File.

iRODSSessions are instantiated using an iRODS client environment file. The client environment file used can be controlled with the IRODS_ENVIRONMENT_FILE environment variable. If no such environment variable is set, the file is expected to be found at ${HOME}/.irods/irods_environment.json. A secure connection can be made by making the appropriate configurations in the client environment file.
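For example, a minimal client environment file might look like the following (the host, zone, and user names here are placeholders, not values required by the framework):

{
    "irods_host": "irods.example.org",
    "irods_port": 1247,
    "irods_user_name": "rods",
    "irods_zone_name": "tempZone"
}

To point the tool at a file in a non-default location:

export IRODS_ENVIRONMENT_FILE=/path/to/irods_environment.json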

Starting Redis Server

Install Redis server: https://redis.io/docs/latest/get-started

Starting the Redis server with package installation:

redis-server

Or, daemonized:

sudo service redis-server start
sudo systemctl start redis

The Redis GitHub page also describes how to build and run Redis: https://github.com/redis/redis?tab=readme-ov-file#running-redis

Setting up virtual environment

You may need to upgrade pip:

pip install --upgrade pip

Install virtualenv:

pip install virtualenv

Create a virtual environment with Python 3:

python -m venv rodssync

Activate virtual environment:

source rodssync/bin/activate

Install this package

pip install irods_capability_automated_ingest

Set up environment for Celery:

export CELERY_BROKER_URL=redis://<redis_host>:<redis_port>/<redis_db> # e.g. redis://127.0.0.1:6379/0
export PYTHONPATH=`pwd`

Start celery worker(s):

celery -A irods_capability_automated_ingest worker -l error -Q restart,path,file -c <num_workers>

Note: Make sure queue names match those of the ingest job (default queue names shown here).

Using the sync script

Start sync job

python -m irods_capability_automated_ingest.irods_sync start <source_dir> <destination_collection>

List jobs

python -m irods_capability_automated_ingest.irods_sync list

Stop jobs

python -m irods_capability_automated_ingest.irods_sync stop <job_name>

Watch jobs (same as using --progress)

python -m irods_capability_automated_ingest.irods_sync watch <job_name>

System Overview

Celery tasks

The Celery application for the Automated Ingest framework handles a few different Celery tasks. The Celery tasks are affiliated with a main task which is enqueued by the irods_sync script. The main task is also called the overall sync "job".

Celery queues

There are 3 Celery queues used by the Automated Ingest framework: restart, path, and file.

The restart queue is for the main task, which enqueues the first path sync task. This is the initial task enqueued by the irods_sync script. It is called the restart queue because the main task is re-enqueued ("restarted") on each interval when the job is run periodically.

The path queue is used for the task which iterates over the entries in the source storage. For each entry in the source storage system, other asynchronous tasks are enqueued to sync the data to iRODS. In the case of a filesystem, recursively syncing a sub-directory involves enqueueing a separate, asynchronous task on the path queue. For individual files or objects, tasks are composed of syncing a number of files controlled by the --files_per_task option.

The file queue is used for tasks which sync data and directories (where applicable) from the source storage. The tasks in this queue involve syncing data from a number of files or objects controlled by the --files_per_task option.

The Celery queue names can be adjusted with some options available to the irods_sync script. This is useful if you want to use the same Redis database for multiple simultaneous sync jobs. For instance, if you have Celery workers dedicated to different parts of your infrastructure, the tasks for each system could be separated by giving them unique names and having the different Celery workers watching the queues for the system for which they are responsible.
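For example (the queue names here are arbitrary), a sync job can be given dedicated queues and a worker can be pointed at exactly those queues:

python -m irods_capability_automated_ingest.irods_sync start /source/data /tempZone/home/rods/data \
    --restart_queue instrument_restart --path_queue instrument_path --file_queue instrument_file

celery -A irods_capability_automated_ingest worker -l error -Q instrument_restart,instrument_path,instrument_file -c 4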

Celery message broker

The Automated Ingest framework uses Redis as a message broker, leveraging redis-py (see https://pypi.org/project/redis). The message broker is used to coordinate Celery workers and store the task queues.

Automated Ingest and Redis

In addition to acting as the message broker for Celery, the Redis database is used by the Automated Ingest framework as a cache of information about previously synced paths. This offers the advantage of being able to sync the same path repeatedly while only needing to do work (e.g. contact iRODS) for paths which have changed.

The Automated Ingest framework also uses Redis to store any custom event handlers which are fetched by the Celery workers to customize behavior. This is useful because the custom event handler files do not need to exist on every Celery worker node - just the host running the irods_sync script.

irods_sync

The irods_capability_automated_ingest.irods_sync script is used to manage sync jobs. The script has 4 subcommands: start, stop, list, and watch. This script does not need to be run on the same host as the Celery workers and can be used as the "starting point" for users wishing to schedule sync jobs.

Event Handler file

The Event Handler file is a Python file that is imported as a module by Celery workers and used to modify and extend the behavior provided by the Automated Ingest framework. The contents of the Python file are stored in Redis so that the file needs only to exist locally to the irods_sync script rather than on every host on which Celery workers are running. Most supported event handler methods have example implementations in the irods_capability_automated_ingest/examples directory. More information about the methods which can be implemented in an Event Handler file can be found in Event Handler behavioral overrides and Event Handlers.

meta dict

Almost all of the Celery tasks, internal functions, and event handler methods take a parameter called meta. meta is a Python dict which holds the context surrounding an ingest job as well as information for specific tasks. While meta holds a lot of different information that is useful at various times, a few important keys are worth noting.

root

The root key maps to the path in the source storage that is being synced. Let's say that the irods_sync script being run had the following options:

python3 -m irods_capability_automated_ingest.irods_sync start \
    /source/data /tempZone/home/rods/data

The value of meta["root"] would be "/source/data". This value does not change for the duration of the overall sync job or in any constituent task.

path

The path key maps to the current path on the source storage which is being synced. The value changes in different tasks depending on what is being synced. In most cases, the path value is a subdirectory or file under the path to which root is mapped.

If a file is being synced from a filesystem, the path key maps to the path of that file. If a directory is being synced from a filesystem, the path key maps to the path of that directory.

Let's say that the irods_sync script being run had the following options:

python3 -m irods_capability_automated_ingest.irods_sync start \
    /source/data /tempZone/home/rods/data

If a file at the path /source/data/subdir/file1 is being synced, /source/data/subdir/file1 would be the value of path.

If a path being synced requires modification due to a character_map Event Handler method or cannot be represented due to a UnicodeEncodeError, path will be updated accordingly.

target

The target key maps to the current logical path in iRODS to which data is being synced. The value changes in different tasks depending on what is being synced.

If a file is being synced from a filesystem, the target key maps to the logical path of the data object to which the data is being synced. If a directory is being synced from a filesystem, the target key maps to the logical path of the collection to which the data is being synced.

The iRODS logical path used as a value for target is constructed based on the path of the source storage being synced. Let's say that the irods_sync script being run had the following options:

python3 -m irods_capability_automated_ingest.irods_sync start \
    /source/data /tempZone/home/rods/data

If a file at the path /source/data/subdir/file1 is being synced (note: this would be the value of path at this point), /tempZone/home/rods/data/subdir/file1 would be the value for target.

job_name

The job_name key maps to the name of the overall sync job to which the current task being executed belongs. This can be useful for fetching information about the sync job from the Redis database, or for annotating metadata regarding the sync job to data objects or collections.
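As a minimal sketch, an Event Handler could stamp each newly created data object with the name of the sync job; the AVU attribute name here is an arbitrary example, not a framework convention:

from irods_capability_automated_ingest.core import Core

class event_handler(Core):
    @staticmethod
    def post_data_obj_create(hdlr_mod, logger, session, meta, *args, **options):
        # Annotate the new data object with the name of the overall sync job.
        obj = session.data_objects.get(meta["target"])
        obj.metadata.add("example::sync_job_name", meta["job_name"])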

restart_queue, path_queue, and file_queue

The restart_queue, path_queue, and file_queue keys map to the queue names for the restart, path, and file queues.

config

The config key maps to another Python dict which holds configuration information about the sync job. There are 3 keys in this dict.

The log key configures the Python logger used by Celery workers. See sync_logger below for more information about the log configuration.

The profile key configures information for the profile feature. Its value is a dict similar to the log configuration whose keys correspond to the --profile_filename, --profile_when, --profile_interval, and --profile_level options of the irods_sync script. See Profiling options for more information.

The redis key configures communication with the Redis database. Its value is a dict that looks like this:

{
    // ... Other stuff ...

    "config": {
        // ... Other stuff ...

        "redis": {
            "host": "<string>",
            "port": <int>,
            "db": <int>
        }

        // ... Other stuff ...
    }

    // ... Other stuff ...
}

The host, port, and db correspond to the --redis_host, --redis_port, and --redis_db options for the irods_sync script. See Redis configuration options for more information.
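As a sketch of how this configuration can be consumed, a Redis client could be constructed from the values in meta (this mirrors, but is not necessarily identical to, what the framework does internally):

import redis

def get_redis(config):
    # Build a Redis client from the "redis" dict in meta["config"].
    redis_config = config["redis"]
    return redis.StrictRedis(
        host=redis_config["host"],
        port=redis_config["port"],
        db=redis_config["db"])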

sync_logger

The sync_logger is the logger used by the Celery workers while executing tasks. The client submitting a job (e.g. the irods_sync script) configures the logger in meta. This is what the configuration looks like in meta:

{
    // ... Other stuff ...

    "config": {
        // ... Other stuff ...

        "log": {
            "filename": "<string>" or None,
            "log_when": "<string>" or None,
            "interval": "<string>" or None,
            "level": "<string>" or None
        }

        // ... Other stuff ...
    }

    // ... Other stuff ...
}

Each member of the dict corresponds with the Logging Options described below.

To instantiate the sync_logger for use, do something like this:

# Get the "config" from meta...
config = meta["config"]
# Get the "log" from the config...
log_config = config.get("log")
# Get the sync_logger based on the log_config.
logger = sync_logging.get_sync_logger(log_config)

The returned logger can then be used to log messages in a standard way. Note: This should only be done inside a Celery task or Event Handler method.

irods_sync start

irods_sync start is used to start a sync job by enqueueing the main task of the overall sync job. The main task is run asynchronously by default, but can be run synchronously with the --synchronous option.

This subcommand has 2 required positional arguments: a source path and a destination path.

The source path must point to a path for a supported storage type. Right now, filesystems (e.g. UNIX, Windows, etc.) and S3 buckets are supported.

The destination path must point to an iRODS collection. The root collection (/) cannot be used as the destination collection.

Celery task options

These options are used to control various aspects of the sync job and resultant individual Celery tasks.

In this section, wherever a word in all uppercase letters follows an option name, it is a placeholder for the value that accompanies the option. For example, if the option name is --option, then --option VALUE means that VALUE stands in for the value to be provided.

--job_name

--job_name JOB_NAME will use the name JOB_NAME as the reference name for the overall sync job which is used to track individual tasks spawned from the main task. The default name for the sync job is a generated UUID.
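For example (the job name here is arbitrary):

python -m irods_capability_automated_ingest.irods_sync start \
    /source/data /tempZone/home/rods/data --job_name nightly_scan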

--restart_queue

--restart_queue RESTART_QUEUE_NAME will use the name RESTART_QUEUE_NAME for the restart queue. The default name for the restart queue is "restart". Remember: The restart queue is where the task for the main sync job is enqueued.

--path_queue

--path_queue PATH_QUEUE_NAME will use the name PATH_QUEUE_NAME for the path queue. The default name for the path queue is "path".

--file_queue

--file_queue FILE_QUEUE_NAME will use the name FILE_QUEUE_NAME for the file queue. The default name for the file queue is "file".

--interval

--interval INTERVAL_IN_SECONDS will enqueue - or, "restart" - the main task of the sync job repeatedly with an interval of INTERVAL_IN_SECONDS seconds. If absent, the sync job will only be run once. To stop a recurring job, irods_sync stop JOB_NAME can be used on the job specified by JOB_NAME. Note: Scheduling sync jobs using a dedicated job scheduling tool such as cron has been shown to be very effective as an alternative to this option. Consider using an external tool for scheduling regularly occurring sync jobs.
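For example, a crontab entry like the following (the paths are placeholders) runs a nightly sync at 2 AM using the virtual environment created earlier:

0 2 * * * /path/to/rodssync/bin/python -m irods_capability_automated_ingest.irods_sync start /source/data /tempZone/home/rods/data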

--ignore_cache

--ignore_cache will ignore the cache maintained in Redis by the Automated Ingest framework. The sync tasks will not consult the Redis cache to "sync" data but rather will treat all data as new, even if it is not. This is useful if you wish to scan a source after having already scanned it but need to sync the data to the same destination again. Notice that this option will not work with certain Operation types (e.g. PUT).

--event_handler

--event_handler /path/to/event_handler.py will use the specified /path/to/event_handler.py Python file as a custom event handler to modify the default behavior of the Automated Ingest framework as well as allow for other custom behavior to be implemented. The event handler methods are detailed below in Event Handler behavioral overrides and Event Handlers.

Script options

These options are used to control various aspects of the irods_sync script itself.

--synchronous

--synchronous specifies that the irods_sync script should not exit until the scheduled sync job is complete. By default, the irods_sync script will exit as soon as the main sync job Celery task has been enqueued.

--progress

--progress specifies that a progress bar and Celery task counts associated with the enqueued sync job should be shown. This option requires the --synchronous option in order to have any effect.

Optimization options

These options are used for optimizing job completion for speed and resilience.

--files_per_task

--files_per_task NUMBER_OF_FILES_PER_TASK will use NUMBER_OF_FILES_PER_TASK as the number of files to sync in a given Celery task. A value of 1 means that 1 Celery task will be enqueued for every file or object that is synced. This number must be a positive integer. The default value is 50 files per task due to experiments which found that this number of files per task optimally balances the amount of work required of a worker by each task against the overhead of completing the task and fetching a new one from the queue. Your results may vary, so adjust the value based on your own use case.

--initial_ingest

--initial_ingest specifies that the destination iRODS path to which data is being synced does not need to be checked before syncing the data. Use this option the first time a sync occurs to a new destination iRODS path to avoid unnecessary calls to the iRODS API. By default, the destination iRODS path is checked to ensure that it does not already exist in order to avoid unnecessary syncs or syncs which would result in errors (e.g. a PUT operation to an existing path will not overwrite the data - it will produce an error).

--irods_idle_disconnect_seconds

--irods_idle_disconnect_seconds SECONDS_BEFORE_IDLE_DISCONNECT will disconnect an iRODS session after SECONDS_BEFORE_IDLE_DISCONNECT seconds of being idle. This is useful for refreshing iRODS connections in the pool maintained by the Automated Ingest framework. The default value is 60 seconds.

Selective syncing options

These options are used to filter out certain files and directories from being synced to iRODS. These are useful if a type of file or name of a directory / file is unwanted in iRODS for whatever reason.

--exclude_file_type

--exclude_file_type FILE_TYPE will exclude any file detected as being of type FILE_TYPE. FILE_TYPE must be one of the following strings:

  • regular: This excludes all files which are not one of the rest of the list of supported file types.
  • directory: This excludes any directories found in the source path. Note: This will exclude any subdirectories of the source filesystem, effectively making the sync non-recursive.
  • character: This excludes any character device files. See Character Special File documentation for more information.
  • block: This excludes any block device files. See Block Special File documentation for more information.
  • socket: This excludes any socket files. See Socket documentation for more information.
  • pipe: This excludes any named pipe files (FIFOs). See FIFO Special File documentation for more information.
  • link: This excludes any symbolic links. See Symbolic Links documentation for more information.

The way the Automated Ingest framework tests for these types of files follows the process described here: Testing the Type of a File

The default behavior is to include all types of files; or, put another way, the default behavior is to exclude no file types. This option is only valid for filesystem syncs.

--exclude_file_name

--exclude_file_name EXCLUDE_FILES_MATCHING_REGEX will exclude any file whose path matches the regular expression specified by EXCLUDE_FILES_MATCHING_REGEX. See the Python Regular Expression Syntax documentation for more information.
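For example, to skip temporary files ending in .tmp (the pattern is illustrative):

python -m irods_capability_automated_ingest.irods_sync start \
    /source/data /tempZone/home/rods/data --exclude_file_name '.*\.tmp$'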

--exclude_directory_name

--exclude_directory_name EXCLUDE_DIRECTORIES_MATCHING_REGEX will exclude any directory whose path matches the regular expression specified by EXCLUDE_DIRECTORIES_MATCHING_REGEX. See the Python Regular Expression Syntax documentation for more information. Note: This will exclude syncing the directory and any of its files and subdirectories. This option is only valid for filesystem syncs.

Redis configuration options

These options can be used in lieu of the CELERY_BROKER_URL environment variable to point the irods_sync script at a particular Redis instance.

--redis_host

--redis_host REDIS_HOSTNAME will use REDIS_HOSTNAME as the hostname for the Redis database with which the irods_sync script will communicate to submit the main task for the sync job. The default REDIS_HOSTNAME used in the absence of any other input is localhost.

--redis_port

--redis_port REDIS_PORT will use REDIS_PORT as the port number on which the Redis database is listening for incoming requests. The default REDIS_PORT used in the absence of any other input is 6379, which is the official default Redis port number.

--redis_db

--redis_db REDIS_DB will use REDIS_DB as the database number to which the irods_sync tasks will be submitted. The default REDIS_DB used in the absence of any other input is 0, which is the official default Redis database number.

S3 options

These options are used to specify that irods_sync is using an S3 bucket as a source. In order to sync an S3 bucket, the --s3_keypair option and a source path of the form /bucket_name/path/to/root/folder are the minimum requirements.

--s3_keypair

--s3_keypair /path/to/s3_keypair will use /path/to/s3_keypair to authenticate with the S3 endpoint where the source bucket exists. There is no default value for this option and it must be specified when syncing an S3 bucket. The keypair file must exist on a filesystem accessible from the host running the irods_sync script and should have exactly the following form:

ACCESS_KEY
SECRET_KEY

ACCESS_KEY should be the S3 access key (sometimes this is used as a username). SECRET_KEY should be the S3 secret key (sometimes this is used as a password).
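A complete S3 sync invocation might look like the following (the bucket name, paths, and keypair location are placeholders):

python -m irods_capability_automated_ingest.irods_sync start \
    /example-bucket/landing /tempZone/home/rods/data \
    --s3_keypair /secure/s3_keypair \
    --s3_endpoint_domain s3.us-east-1.amazonaws.com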

--s3_endpoint_domain

--s3_endpoint_domain S3_ENDPOINT_DOMAIN will use S3_ENDPOINT_DOMAIN as the domain name of the S3 endpoint. The default value is "s3.amazonaws.com", which would indicate an AWS bucket.

--s3_region_name

--s3_region_name S3_REGION_NAME will use S3_REGION_NAME as the region name for the S3 endpoint. The default value is "us-east-1".

--s3_proxy_url

--s3_proxy_url S3_PROXY_URL will use S3_PROXY_URL as the URL for a proxy host for S3 access. This could be useful if your organization requires S3 access through a centralized location.

--s3_insecure_connection

--s3_insecure_connection specifies that SSL should not be used when connecting to the S3 endpoint. The default value is False, which means that SSL is used for S3 endpoint communications when this option is not specified. This option is not recommended for use in production deployments.

Logging options

--log_level

--log_level LOG_LEVEL specifies that only messages of level LOG_LEVEL or higher should be logged by the irods_sync script. LOG_LEVEL must be one of the following string values (listed in ascending order):

  • NOTSET
  • DEBUG
  • INFO
  • WARNING
  • ERROR
  • CRITICAL

By default, only messages of level WARNING or higher will be logged.

Celery worker log messages are not affected by this option. To specify a log level for the Celery workers, the --loglevel option should be used when running the worker. See Celery documentation for more information.

--log_filename

--log_filename /path/to/log_file.txt will use /path/to/log_file.txt as the destination for log messages emitted by the irods_sync script. The default value is None, which means that log messages go to stdout by default.

Celery worker log messages are not affected by this option. To specify a log file for the Celery workers, the --logfile option should be used when running the worker. See Celery documentation for more information.

--log_when

--log_when WHEN will use WHEN as the value passed to the when parameter of the TimedRotatingFileHandler used for managing log files. See Python TimedRotatingFileHandler documentation for more information. This option only has an effect if --log_filename is used.

--log_interval

--log_interval INTERVAL will use INTERVAL as the value passed to the interval parameter of the TimedRotatingFileHandler used for managing log files. See Python TimedRotatingFileHandler documentation for more information. This option only has an effect if --log_when is used. The default value is 1.
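For example, to log INFO-level messages to a file which rotates at midnight each day (the log path is a placeholder; "midnight" is one of the when values accepted by TimedRotatingFileHandler):

python -m irods_capability_automated_ingest.irods_sync start \
    /source/data /tempZone/home/rods/data \
    --log_filename /var/log/ingest/sync.log --log_level INFO \
    --log_when midnight --log_interval 1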

Profiling options

--profile allows you to use vis to visualize a profile of the Celery workers over the course of an ingest job.

  • --profile_filename: Name of the profile file (JSON output). Default: None.
  • --profile_level: Minimum level of message to log for profiling. Default: None.
  • --profile_interval: Time interval with which to roll over the ingest profile file. Default: None.
  • --profile_when: Type/units of profile_interval (see TimedRotatingFileHandler). Default: None.

Event Handler behavioral overrides

The contents of the Event Handler Python file are stored in Redis so that the file needs only to exist locally to the irods_sync script rather than on every host on which Celery workers are running. Event handler implementation examples can be found in the examples directory.

Event handler methods can use logger to write logs. See structlog for available logging methods and signatures.

This section details methods which override default behaviors of the Automated Ingest framework, such as retry logic and target resources for replicas. The methods for the actual Event Handlers are detailed in the Event Handlers section.

character_map

If an application requires that iRODS logical paths produced by the ingest process exclude certain Unicode characters, a character_map method that returns a dict can be added. For example:

import re

from irods_capability_automated_ingest.core import Core

class event_handler(Core):
    @staticmethod
    def character_map():
        return {
            re.compile('[^a-zA-Z0-9]'): '_'
        }
    # ...

The returned dictionary, in this case, indicates that the ingest process should replace all non-alphanumeric (as well as non-ASCII) characters with an underscore wherever they may occur in an otherwise normally generated logical path. The substitution process also applies to the intermediate (i.e., collection name) elements in a logical path, and a suffix is appended to affected path elements to avoid potential collisions with other remapped object names.

Each key of the returned dictionary indicates a character or set of characters needing substitution. Possible key types include:

  1. character
    # substitute backslashes with underscores
    '\\': '_'
  2. tuple of characters
    # any character of the tuple is replaced by a Unicode small script x
    ('\\','#','-'): '\u2093'
  3. regular expression
    # any character outside of range 0-256 becomes an underscore
    re.compile('[\u0100-\U0010ffff]'): '_'
  4. callable accepting a character argument and returning a boolean
    # ASCII codes above 'z' become ':'
    (lambda c: ord(c) in range(ord('z')+1,128)): ':'

In the event that the order of substitution is significant, the method may instead return a list of key-value tuples, as in the sketch below.
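For example, a sketch with arbitrary substitutions, where '#' is mapped before the catch-all rule:

import re

from irods_capability_automated_ingest.core import Core

class event_handler(Core):
    @staticmethod
    def character_map():
        # A list preserves substitution order: '#' is handled first, then any
        # remaining non-alphanumeric character becomes an underscore.
        return [
            ('#', '_hash_'),
            (re.compile('[^a-zA-Z0-9_]'), '_'),
        ]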

UnicodeEncodeError handling

Any file whose filesystem path causes a UnicodeEncodeError exception to be raised during ingest (e.g. due to the inclusion of an unencodable UTF-8 sequence) will be automatically renamed using a base64 sequence that represents the original, unmodified vault path.

Additionally, data objects that have had their names remapped, whether pro forma or via a UnicodeEncodeError, will be annotated with an AVU of the form:

Attribute: "irods::automated_ingest::" + ANNOTATION_REASON
Value: A PREFIX plus the base64-converted "bad filepath"
Units: "python3.base64.b64encode(full_path_of_source_file)"

where:

  • ANNOTATION_REASON is either "UnicodeEncodeError" or "character_map", depending on why the remapping occurred.
  • PREFIX is either "irods_UnicodeEncodeError_" or blank (""), again depending on the remapping cause.

Note that the UnicodeEncodeError type of remapping is unconditional, whereas the character remapping is contingent on an event handler's character_map method being defined. Also, if a UnicodeEncodeError-style ingest is performed on a given object, this precludes character mapping being done for the object.

as_user

This method should return a pair of strings specifying the zone and the name of the user on whose behalf the sync will be performed. In other words, the Automated Ingest framework will act as a proxy for this user and all data syncing will be done as the as_user user. Here is what the method should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def as_user(meta, **options):
        # Replace "zone_name" with the user's zone name and "user_name" with the user's username.
        return "zone_name", "user_name"

The default behavior performs the sync tasks as the user authenticated in the client environment of the Celery workers.

target_path

This method should return a string specifying the physical path to register during registration operations. This allows specifying a mount path on the iRODS server which differs from the mount path used on the client (i.e. the Celery worker nodes) when registering data on the same networked filesystem. The returned value is used directly as the physical path to register in iRODS, so the path must be accessible by the iRODS server where the registration is occurring and must be an absolute path to the file being registered. Here is what the method should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def target_path(session, meta, **options):
        # Manipulate path such that the base directory uses the desired mountpoint name.
        path = meta["path"]
        return path

The default behavior is to use the path handed to it.

to_resource

This method returns a string specifying a target resource to which data will be synced. This must refer to a root resource because iRODS does not allow directly accessing child resources. Here is what the method should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def to_resource(session, meta, **options):
        # Replace "resource_name" with the name of the resource to which data will be synced.
        return "resource_name"

The default behavior is to use the default resource specified in the client environment. If no default resource is specified, resource selection is deferred to the server.

Note: This event handler method is required when using REGISTER_AS_REPLICA_SYNC.

max_retries

This method returns an integer specifying the maximum number of times a task can fail and be retried before the Celery workers should give up on it. Here is what the method should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def max_retries(hdlr_mod, logger, meta):
        # Replace 1 with the desired value, represented by a non-negative integer.
        return 1

The default retry value is 0 times, meaning that tasks will not be retried on failure by default.

delay

This method returns an integer specifying the amount of time in seconds to wait between retries of a particular Celery task. If max_retries is not defined in the event handler or returns 0, this method has no effect. The method takes a retries argument in order to let the implementer know how many retries have been attempted so far. This can be useful for implementing backoff logic based on the number of retries attempted already. Here is what the method should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def delay(hdlr_mod, logger, meta, retries):
        # Replace 0 with the desired value, represented by a non-negative integer.
        return 0

The default delay value is 0 seconds, meaning that tasks will be retried instantly, if needed, by default.
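For example, a sketch of exponential backoff using the retries argument (the retry count and the 300-second cap are arbitrary choices):

from irods_capability_automated_ingest.core import Core

class event_handler(Core):
    @staticmethod
    def max_retries(hdlr_mod, logger, meta):
        return 5

    @staticmethod
    def delay(hdlr_mod, logger, meta, retries):
        # Wait 1, 2, 4, 8, ... seconds between retries, capped at 5 minutes.
        return min(2 ** retries, 300)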

timeout

This method returns an integer specifying the number of seconds until a particular task in the sync job times out. In other words, this acts as a time limit for a task to complete. After the timeout value has been reached, the Celery task fails as timed out. This is useful for ensuring that Celery workers do not get stuck forever on tasks which cannot complete for whatever reason. Here is what the method should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def timeout(hdlr_mod, logger, meta):
        # Replace 3600 with the desired value, represented by a non-negative integer.
        return 3600

The default timeout value is 3600 seconds, which means that each task has 3600 seconds (1 hour) to complete before Celery considers it timed out.

operation

This method returns a member of the Operation enumeration specifying the type of operation the sync job should be performing on each entry in the storage being traversed. More information about the accepted values can be found in Event Handler Operations. Here is what the method should look like:

from irods_capability_automated_ingest.core import Core
from irods_capability_automated_ingest.utils import Operation
class event_handler(Core):
    @staticmethod
    def operation(session, meta, **options):
        # Replace Operation.REGISTER_SYNC with the desired Operation value.
        return Operation.REGISTER_SYNC

The default operation is Operation.REGISTER_SYNC.

Event Handler Operations

One of the most important overrides supported by the custom event handler file is the operation method. In this section, we detail the supported operation types and what they are used for. The Operation enum is defined in irods_capability_automated_ingest.utils, so be sure to import this module when using these.

Each operation has potentially different behaviors based on whether the target logical path being synced exists or not at the time of the sync.

Operation.REGISTER_SYNC

If the target logical path does not exist at the time of sync, this operation registers the physical path in the catalog as a new data object.

If the target logical path exists and has a replica on the target resource, this operation updates the system metadata in the catalog (e.g. size, modify time, etc.) for the replica in the target resource. Note: If the existing replica refers to a physical path which differs from the physical path being synced, this operation will NOT update the physical path in the catalog, leading to potentially incorrect system metadata.

If the target logical path exists but does not have a replica on the target resource, this operation emits an error. Use REGISTER_AS_REPLICA_SYNC to register new replicas for existing data objects.

REGISTER_SYNC is the default operation if no operation method is defined in the custom event handler.

Operation.REGISTER_AS_REPLICA_SYNC

This operation requires the to_resource event handler method to be implemented and return a valid resource name in order to have an effect.

If the target logical path does not exist at the time of sync, this operation registers the physical path in the catalog as the first replica of a new data object.

If the target logical path exists and has a replica on the target resource, this operation updates the system metadata in the catalog (e.g. size, modify time, etc.) for the replica in the target resource. Note: If the existing replica refers to a physical path which differs from the physical path being synced, this operation will NOT update the physical path in the catalog, leading to potentially incorrect system metadata.

If the target logical path exists but does not have a replica on the target resource, this operation registers the physical path in the catalog as a new replica of the existing data object.

Operation.PUT

This operation creates only new data objects by writing the data to the vault of the target resource and registering the data object in the catalog.

If the target logical path does not exist at the time of sync, this operation writes the data to the vault of the target resource and registers it in the catalog as a new data object.

If the target logical path exists and has a replica on the target resource, this operation emits an error. Use PUT_SYNC or PUT_APPEND to sync data to existing replicas.

If the target logical path exists but does not have a replica on the target resource, this operation emits an error. The "put"-type operations are not allowed to create new replicas of existing data objects.

Operation.PUT_SYNC

If the target logical path does not exist at the time of sync, this operation writes the data to the vault of the target resource and registers it in the catalog as a new data object.

If the target logical path exists and has a replica on the target resource, this operation overwrites the entire replica and updates the catalog accordingly.

If the target logical path exists but does not have a replica on the target resource, this operation emits an error. The "put"-type operations are not allowed to create new replicas of existing data objects.

Operation.PUT_APPEND

If the target logical path does not exist at the time of sync, this operation writes the data to the vault of the target resource and registers it in the catalog as a new data object.

If the target logical path exists and has a replica on the target resource, this operation copies only the part of the file which has been appended since the last time it was synced.

If the target logical path exists but does not have a replica on the target resource, this operation emits an error. The "put"-type operations are not allowed to create new replicas of existing data objects.

Operation.NO_OP

This operation does not sync data in any way. It exists to allow Event Handlers to execute without populating the catalog.

Event Handlers

In this section, we detail the supported hooks for user-defined code invoked around certain data syncing "events" which occur in the Automated Ingest framework. These are called Event Handlers. For those familiar with iRODS, this is analogous to Policy Enforcement Points (PEPs) implemented in a server's rulebase.

The methods which begin with pre_ are invoked before the actual event occurs, and the methods which begin with post_ are invoked after the event successfully completes. If the event fails, the post-hook will not be invoked. If an error occurs before the data syncing event occurs, the pre-hook will not be invoked. If an error occurs in the pre-hook, the operation will not be invoked (which means that the post-hook will also not be invoked).

None of the methods described are required to be defined in a custom event handler. The default behavior (that is, if no method is defined for a particular hook) is to do nothing before or after the event.

pre_data_obj_create and post_data_obj_create

A data_obj_create event occurs whenever a data object or replica is created as a result of a data syncing operation in a sync job. The following operations can trigger a data_obj_create event: NO_OP, REGISTER_SYNC, REGISTER_AS_REPLICA_SYNC, PUT, PUT_SYNC, and PUT_APPEND.

A particular Celery task can trigger the data_obj_create event multiple times as multiple paths can be synced in one task.

Here is what the methods should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def pre_data_obj_create(hdlr_mod, logger, session, meta, *args, **options):
        # Implement some actions which should occur before a data object or replica is created.
        destination_logical_path = meta["target"]
        source_physical_path = meta["path"]
        pass

    @staticmethod
    def post_data_obj_create(hdlr_mod, logger, session, meta, *args, **options):
        # Implement some actions which should occur after a data object or replica is created.
        destination_logical_path = meta["target"]
        source_physical_path = meta["path"]
        pass

pre_data_obj_modify and post_data_obj_modify

A data_obj_modify event occurs whenever an existing data object is modified as a result of a data syncing operation in a sync job. The following operations can trigger a data_obj_modify event: NO_OP, REGISTER_SYNC, REGISTER_AS_REPLICA_SYNC, PUT_SYNC, and PUT_APPEND.

A particular Celery task can trigger the data_obj_modify event multiple times as multiple paths can be synced in one task.

Here is what the methods should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def pre_data_obj_modify(hdlr_mod, logger, session, meta, *args, **options):
        # Implement some actions which should occur before a data object is modified.
        destination_logical_path = meta["target"]
        source_physical_path = meta["path"]
        pass

    @staticmethod
    def post_data_obj_modify(hdlr_mod, logger, session, meta, *args, **options):
        # Implement some actions which should occur after a data object is modified.
        destination_logical_path = meta["target"]
        source_physical_path = meta["path"]
        pass

pre_coll_create and post_coll_create

A coll_create event occurs whenever a collection is created as a result of syncing a path in a sync job. The following operations can trigger a coll_create event: NO_OP, REGISTER_SYNC, REGISTER_AS_REPLICA_SYNC, PUT, PUT_SYNC, and PUT_APPEND.

A coll_create event may not occur for every collection in a sync job if the top-level target collection already exists. Only new collections created as a result of a sync will trigger this event.

Here is what the methods should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def pre_coll_create(hdlr_mod, logger, session, meta, *args, **options):
        # Implement some actions which should occur before a collection is created.
        destination_logical_path = meta["target"]
        source_physical_path = meta["path"]
        pass

    @staticmethod
    def post_coll_create(hdlr_mod, logger, session, meta, *args, **options):
        # Implement some actions which should occur after a collection is created.
        destination_logical_path = meta["target"]
        source_physical_path = meta["path"]
        pass

pre_coll_modify and post_coll_modify

A coll_modify event occurs whenever an existing collection is synced in a sync job. The following operations can trigger a coll_modify event: NO_OP, REGISTER_SYNC, REGISTER_AS_REPLICA_SYNC, PUT, PUT_SYNC, and PUT_APPEND.

This event occurs once per directory per sync job, and only for filesystem scans. It does not occur for S3 bucket scans because S3 has no concept of "directories" that is acknowledged by the Automated Ingest framework.

Here is what the methods should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def pre_coll_modify(hdlr_mod, logger, session, meta, *args, **options):
        # Implement some actions which should occur before a collection is modified.
        destination_logical_path = meta["target"]
        source_physical_path = meta["path"]
        pass

    @staticmethod
    def post_coll_modify(hdlr_mod, logger, session, meta, *args, **options):
        # Implement some actions which should occur after a collection is modified.
        destination_logical_path = meta["target"]
        source_physical_path = meta["path"]
        pass

pre_job and post_job

The job event represents the entire overall sync job. The event occurs for every sync job regardless of operation, and it occurs only once per job. These methods are invoked by the main Celery task for the sync job.

Here is what the methods should look like:

from irods_capability_automated_ingest.core import Core
class event_handler(Core):
    @staticmethod
    def pre_job(hdlr_mod, logger, meta):
        # Implement some actions which should occur before any tasks for a sync job begin.
        pass

    @staticmethod
    def post_job(hdlr_mod, logger, meta):
        # Implement some actions which should occur after all tasks for a sync job complete.
        pass

Configuring Celery

This section gives some pointers for how to configure Celery for effective use with the Automated Ingest framework.

Memory management

The Celery documentation offers some tips on how to manage memory on the Celery worker nodes in the Memory Usage section.

Worker task prefetch

Celery workers can receive a number of tasks from the Celery coordinator up to a certain "prefetch" limit. If the tasks in a given sync job take a long time, use a lower prefetch limit. If the tasks are quick, use a higher prefetch limit to reduce the round-trip messaging between the workers and the coordinator. This can also be controlled with the --files_per_task option of the irods_sync script.

The prefetch limit can be adjusted via the CELERYD_PREFETCH_MULTIPLIER environment variable. If setting the prefetch limit to 1, the Celery documentation also recommends setting the environment variable CELERY_ACKS_LATE=True.

See Prefetch Limits in the Celery documentation for more information.
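For example, for a job with long-running tasks (a sketch based on the settings above; the worker count is arbitrary):

export CELERYD_PREFETCH_MULTIPLIER=1
export CELERY_ACKS_LATE=True
celery -A irods_capability_automated_ingest worker -l error -Q restart,path,file -c 16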

Configuring Redis

This section gives some pointers for how to configure Redis for effective use with the Automated Ingest framework.

Memory management

The Redis documentation recommends the following:

Make sure to set the Linux kernel overcommit memory setting to 1. Add vm.overcommit_memory = 1 to /etc/sysctl.conf and then reboot or run the command sysctl vm.overcommit_memory=1 for this to take effect immediately.

This allows the Linux kernel to overcommit virtual memory even if this exceeds the physical memory on the host machine. See kernel.org documentation for more information.

Networking

If running in a distributed environment, make sure Redis server accepts connections by editing the bind line in /etc/redis/redis.conf or /etc/redis.conf.

Run tests

Note: The tests start and stop their own Celery workers, and they assume a clean Redis database.

python -m irods_capability_automated_ingest.test.test_irods_sync

See docker/ingest-test/README.md for how to run tests with Docker Compose.

irods_capability_automated_ingest's Issues

add option for synchronous mode

The start command (without the -i option) would be able to block and not return control to the user until all the work is complete.

  • --synchronous

add new command-line parameter for additional redis annotation

  • should be named... --append_json or --redis_annotation?

  • should take a well-formed json object (including brackets)

    • should complain if not well-formed json
  • appended information should pass through to any derivative jobs (e.g. scanning a directory should pass annotations to new file scanning redis jobs)

CATALOG_ALREADY_HAS_ITEM_BY_THAT_NAME error in the ingest log

The ingest log has many CATALOG_ALREADY_HAS_ITEM_BY_THAT_NAME errors, even though the file path reported in the error is ingested by the ingest job. It looks like, in some cases, the same file is ingested by more than one Celery worker, which throws this error.

First, a file should not be processed by more than one Celery worker. Even if it is, the sync should update the file without throwing this error.

Issue with broken symbolic links - not consistent between ingest and iscan

The iRODS ingest tool is able to ingest a broken symbolic link, and it works fine. But after the initial ingest is run, running iscan -rd does not consider the broken symbolic link object a valid object and reports it as deleted:

Physical file on server is missing, corresponding to iRODS object

add --notification switch to send an email notification

Add a --notification switch to send an email notification, either a simple one (e.g. "ingest started", "ingest completed") or one with the log file attached.

The default should probably be disabled.

date; python -m irods_capability_automated_ingest.irods_sync start --event_handler XYZ_eventhandler --log_filename --log_level ERROR --job_name ingest --append_json '"uid:d05a0c4454b9e08ff6e4f156ec0"' --timeout 3600 --redis_host xx.xxx.xx.xx --redis_port xxxx --redis_db 0 --synchronous --all --notification

investigate on disabling prefetch for ingest performance improvement

Investigate whether disabling prefetch will improve ingest performance, using -Ofair and the environment variables below.

export CELERYD_PREFETCH_MULTIPLIER=1
export CELERY_ACKS_LATE=True

celery -A irods_capability_automated_ingest.sync_task worker -l error -Ofair -Q restart,path,file -c 32 &

Add delete detection with switch for logical delete or unregister in iRODS catalog

Currently, this ingest tool supports detecting only new and updated files in the source directory and syncing them to the target collection in iRODS.

Similarly, add delete detection with a switch for logical delete or unregister in the iRODS catalog. Based on the switch provided:

  • if --unregister false, then
    set filesystem::deleted to Y

  • if --unregister true, then
    unregister it from the iRODS catalog

This can be optimized to compare the source directory with the Redis cache itself if needed and then sync the changes to the iRODS catalog.

Remove list_dir option

Remove listdir and related codepaths in favor of scandir for filesystems. This is the standard in Python 3.7.

updating registered file fails within hierarchy

when registering into a random resource with children... the initial registration completes and the data objects are dispersed among the children correctly.

$ python irods_sync.py start allthefiles /tempZone/home/rods/3 -i 1 --event_handler examples.register_with_resc_name
$ ilsresc
demoResc:unixfilesystem
regiResc:random
├── u1:unixfilesystem
└── u2:unixfilesystem
$ ils -rL                                                   
/tempZone/home/rods:
  C- /tempZone/home/rods/3
/tempZone/home/rods/3:
  rods              0 regiResc;u1           11 2018-02-25.22:02 & a
        generic    /home/trel/repos/irods_rsync/allthefiles/a
  rods              0 regiResc;u1            5 2018-02-25.22:02 & bananas
        generic    /home/trel/repos/irods_rsync/allthefiles/bananas
  rods              0 regiResc;u1            5 2018-02-25.22:02 & foo
        generic    /home/trel/repos/irods_rsync/allthefiles/foo
  rods              0 regiResc;u2           14 2018-02-25.22:02 & newfile
        generic    /home/trel/repos/irods_rsync/allthefiles/newfile

then, if one of the source files is touched (touch allthefiles/newfile), the scanner picks up the difference, but then cannot match the data object in the catalog because it does not know the resc_name of the already registered data object.

after fixing the case itself:

diff --git a/sync_irods.py b/sync_irods.py
index 06fa200..8a58bd5 100644
--- a/sync_irods.py
+++ b/sync_irods.py
@@ -79,6 +79,7 @@ def update_metadata(hdlr_mod, session, target, path, **options):
     else:
         resc_name = None
 
+    found = False
     for row in session.query(Resource.name, DataObject.path).filter(DataObject.name == basename(target), Collection.name == dirname(target)):
         if (resc_name is None or row[Resource.name] == resc_name) and row[DataObject.path] == path:
             found = True

I end up hitting this in the log:

2018-02-25 22:06:56,598:sync_task:INFO:synchronizing file. path = /home/trel/repos/irods_rsync/allthefiles/newfile
2018-02-25 22:06:56,599:sync_task:INFO:synchronizing file. path = /home/trel/repos/irods_rsync/allthefiles/newfile, t0 = 1519614174.502373, t = 1519614416.59961, mtime = 1519614229.9204679.
2018-02-25 22:06:56,698:sync_irods:INFO:updating object: /tempZone/home/rods/3/newfile, options = {}
2018-02-25 22:06:56,702:sync_irods:ERROR:updating object: wrong resource or path, target = /tempZone/home/rods/3/newfile, path = /home/trel/repos/irods_rsync/allthefiles/newfile, options = {}
2018-02-25 22:06:56,704:sync_task:ERROR:Unexpected error: wrong resource or path

Running asynchronous ingest job is not showing up in the list job command output

A running asynchronous ingest job does not show up in the output of the list job command.

python -m irods_capability_automated_ingest.irods_sync start --event_handler --log_filename /var/lib/irods/log/automated_ingest.log --log_level ERROR --job_name ingest_system --redis_host --redis_port 6379 --redis_db 0 --irods_idle_disconnect_seconds 300 --ignore_cache

After running the ingest job, running the below command is not showing up in the output:

python -m irods_capability_automated_ingest.irods_sync list --redis_host --redis_port 6379 --redis_db 0

But running the stop job command makes it appear that the job stopped:
python -m irods_capability_automated_ingest.irods_sync stop ingest --redis_host --redis_port 6379 --redis_db 0

100% (638511 of 638511) |##########################################################################################################################################################################| Elapsed Time: 0:04:04 Time: 0:04:04

Then, running the iquest count command shows that the count of ingested files is still increasing, which indicates that the job did not stop.

iquest "SELECT COUNT(DATA_NAME) WHERE COLL_NAME like '%<collectionname%'"

ingesting file names with special characters

  • Handle register sync
  • Handle put
  • Add test for register sync
  • Add test for put

Ingesting file names with special characters throws an error: "UnicodeEncodeError('utf-8', 'lock:sync_file:

Example files:

Test_?f?[?^?x?[?X?A?b?v?f?[?g??????.doc
Test_????.zip
Test_}.ppt
?e?[?u???A?_Test.txt

Registering these files using ireg works fine.

add ability to scan s3

In addition to scanning a filesystem, add ability for the scanner to be pointed at an S3 bucket to register all files already in S3 to an S3 target resource in iRODS.

verify / validate whether the event handler exists and is valid

Currently, I can give a handler name that does not exist, either due to a typo or because it was removed when pulling the latest code from the GitHub repository with git pull. When we then run the ingest command, it does not report any error, indicating instead that it completed successfully.

date; python -m irods_capability_automated_ingest.irods_sync start --event_handler XYZ_eventhandler --log_filename --log_level ERROR --job_name ingest --append_json '"uid:d05a0c4454b9e08ff6e4f156ec0"' --timeout 3600 --redis_host xx.xxx.xx.xx --redis_port xxxx --redis_db 0 --synchronous --all

In this example, if XYZ_eventhandler does not exist, it should report an error something like "event handler XYZ_eventhandler.py does not exist."

improve logging

service logging (currently at /tmp/sync.log) should be handled by a production-quality library or service

  • probably should include rotation
  • verbosity settings

Port various improvements to rq branch

The rq branch is based on an old commit; several changes needed for the ingest tool to function properly have occurred on the icai-celery branch.

  • close connections for stability (6ae4184)
  • Add data size and modify KWs to register file (4b4aaa7)
  • Add session.cleanup() to try/finally block (ab7caa5)

subcommands

  • add --job_name
    uuid by default

  • if --interval is not provided, then do not restart

  • put_start

  • register_start

  • stop

  • unregister_start

  • unregister_stop

  • list
    jobname jobtype

Multiple errors in the ingest log

Found the following errors frequently in the ingest log while running ingest for a large number of files:

  • WRONG RESOURCE OR PATH
  • CAT_NAME_EXISTS_AS_DATAOBJ
  • COULD NOT RECEIVE SERVER RESPONSE
  • No such file or directory
  • KeyError (516002) UNIX_FILE_STAT_ERR No such file or directory

add a switch --showprogress to enable/disable

Add a --showprogress switch to enable/disable the progress bar, so that when ingest is run interactively it can be enabled with --showprogress, but when ingest is invoked from other tools, such as pipeline tools or Jenkins, the progress bar is not required and can be disabled.

The default should probably be disabled.

date; python -m irods_capability_automated_ingest.irods_sync start --event_handler XYZ_eventhandler --log_filename --log_level ERROR --job_name ingest --append_json '"uid:d05a0c4454b9e08ff6e4f156ec0"' --timeout 3600 --redis_host xx.xxx.xx.xx --redis_port xxxx --redis_db 0 --synchronous --all --showprogress

[Elapsed Time: 0:01:05] |####################################################################################################| (Time: 0:01:05) count: 5e+04 tasks: ------ failures: ------ retries: ------

Allocate N number of worker process to the given Ingest job

In order to support multiple ingest jobs at the same time, we need to allocate only N (e.g. 8 or 16) worker processes out of the available worker processes (e.g. 40 or 32) to a given ingest job.

Currently, when we run a long-running ingest job over a large volume of files (e.g. 1 million+), it consumes all worker processes for that one ingest job, and any subsequent smaller ingest job is blocked and has to wait until the first ingest completes.

handle all non-regular file-types consistently

  • link (hard or soft) (file or dir)
  • block
  • pipe
  • char
  • socket

The current code handles symlinks and socket files as special cases. Other file types error out.

Rather than including more special cases, the tool should have simpler logic and treat all non-regular files as zero-length registrations, marking them as special with AVU metadata in the catalog with the following format:

A: irods::automated_ingest::special_file
V: <type> OR <link>::<path to link target>

Add stats of ingest job at the end of the log file when it completes

Add stats of the ingest job, such as start time, end time, total number of files in the source directory, total number of new files ingested, total number of files updated, total number of files deleted, average number of worker processes used, etc., at the end of the log file when the ingest job completes.

sync tool hangs when given invalid --log_filename target

Hangs with invalid path name (/gpfs/mdadm/irods/log/) for log target:

(irodssync) [root@dtp13 irodssync]# python -m irods_capability_automated_ingest.irods_sync start "/gpfs/appfs1/test_deepak_src" "/sc1dtpZone/home/rods/gpfs/appfs1/test_deepak_src" --event_handler ingest_eventhandler --log_filename /gpfs/mdadm/irods/log/deepak_automated_ingest.log --log_level ERROR --job_name ingest --append_json '"uid:201806d05a0c4454b9e08ff6e4f156ec0"' --timeout 3600 --redis_host 10.164.67.18 --redis_port 6379 --redis_db 0 --synchronous --all

[Elapsed Time: 0:01:49] | | (ETA: --:--:--) count: 1 tasks: 1 failures: ------ retries: ------

add noop operation

Need a sixth operation to detect new/updated files, but make no calls to iRODS.

This will allow other tools to use the scanner infrastructure for orchestration.

Handle paths with special characters

Related to #40

Special characters in physical paths will need to be handled differently from the approach used for filenames. This could require revisiting the approach used for filenames.
