
d6tflow's People

Contributors

carlosml27, citynorman, d6tdev, fire-hound, gebakaev, hfwittmann, matthieuloustaunau, mozin, paralax, stepheweffie, yigitaras


d6tflow's Issues

luigi related signal only works in main thread

Within a Flask app that exposes a REST API with POST functionality, I am trying to post parameters to a d6tflow task and run the task with the posted params. Luigi throws ValueError: signal only works in main thread when calling luigi/worker.py, which in turn calls Python 3.7's signal.py.
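A minimal repro sketch (my own illustration; the route and task names are hypothetical): Flask serves requests off the main thread, and luigi's worker tries to install signal handlers, which Python only allows on the main thread.

import d6tflow
from flask import Flask, request

app = Flask(__name__)

class TaskExample(d6tflow.tasks.TaskCache):
    param = d6tflow.Parameter()

    def run(self):
        self.save({'param': self.param})

@app.route('/run', methods=['POST'])
def run_task():
    # raises ValueError: signal only works in main thread,
    # because this handler runs on a worker thread, not the main thread
    d6tflow.run(TaskExample(param=request.json['param']))
    return 'ok'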

Reuse all later tasks to keep the DRY principle?

I have a few tasks of the following nature, which are pretty standard: import a processed dataset, set up the input data, split it, then pass it to a model.

# _tasks.py

import d6tflow

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    # loads a processed dataframe (probably pickled)
    pass

@d6tflow.requires(TaskLoadDataframe)
class TaskSetupExogEndogData(d6tflow.tasks.TaskCache):
    # do stuff. Saves data and labels
    pass

@d6tflow.requires({'inputs': TaskSetupExogEndogData})
class TaskSplitData(d6tflow.tasks.TaskCache):
    # do more stuff. Splits data and labels and saves to dictionary
    pass

# _tasks_sklearn.py

import _tasks

import d6tflow
from sklearn import svm

@d6tflow.requires(_tasks.TaskSplitData)
class TaskTrainSklearnSVM(d6tflow.tasks.TaskCache):
    kernel = d6tflow.Parameter(default = 'linear')
    
    def run(self):
        data = self.inputLoad()
        model_svm = svm.SVR(kernel = self.kernel)
        model_svm.fit(data['train_data'], data['train_labels'])
        model_svm.score(data['valid_data'], data['valid_labels'])
        # TODO: self.save model artifacts

Context: I would obviously want to reuse this as much as possible.

Question 1: Is it possible to create several independent dataset-processing tasks that I can set as the "initial task" of this workflow?
Question 2: If yes, how would I call that as a dynamic requirement in TaskLoadDataframe?


Solution A:
It seems the best way to handle this within the scope of this package is to not create a TaskA and instead do the following (see the sketch after this list):

  1. Preprocess the dataframe
  2. Export it to a pickle (or csv)
  3. Read the path to the exported file in as a parameter to TaskLoadDataframe, so I can run the workflow and continue on
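A minimal sketch of Solution A (my own illustration; the parameter name and path are hypothetical):

import d6tflow
import pandas as pd

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    path = d6tflow.Parameter()  # path to the pre-exported pickle

    def run(self):
        df = pd.read_pickle(self.path)  # load the externally preprocessed dataframe
        self.save(df)

# downstream tasks inherit the parameter via @d6tflow.requires, so e.g.:
# d6tflow.run(TaskTrainSklearnSVM(path='data/processed.pkl'))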

Solution B:
I know Luigi doesn't allow passing dataframes as parameters, but could I pass a dataframe into a task's run as a means of reducing or completely removing the file I/O in step 2?

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):

    def run(self, dataframe):
        self.save(dataframe)

I don't think the source code allows for this; what would the syntax be to run that as a workflow?

Solution B (reprise): Alternatively, I could save the processed dataframe in a dictionary and pass it into TaskLoadDataframe as a d6tflow-defined parameter.

Thoughts? Great work on this by the way.

task not complete run option for data load

I would like an option for the data load function to automatically run the task if it is not marked complete. I find myself writing if/else statements like the one below to get the desired effect:

if TaskExample().complete():
    df = TaskExample().output()['df'].load()
else:
    d6tflow.run(TaskExample())
    df = TaskExample().output()['df'].load()
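A minimal helper along the lines of what is being requested might look like this (a sketch; load_or_run is hypothetical, not part of d6tflow):

import d6tflow

def load_or_run(task, key):
    # run the task first if it isn't marked complete, then load the target
    if not task.complete():
        d6tflow.run(task)
    return task.output()[key].load()

df = load_or_run(TaskExample(), 'df')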

Fail to run the example

I am trying to run the d6tflow example (unmodified):

import d6tflow
import luigi
import sklearn, sklearn.datasets, sklearn.svm, sklearn.preprocessing
import pandas as pd

class TaskGetData(d6tflow.tasks.TaskPqPandas):  # save dataframe as parquet

    def run(self):
        iris = sklearn.datasets.load_iris()
        df_train = pd.DataFrame(iris.data, columns=['feature{}'.format(i) for i in range(4)])
        df_train['y'] = iris.target
        self.save(df_train)  # quickly save dataframe


class TaskPreprocess(d6tflow.tasks.TaskCachePandas):  # save data in memory
    do_preprocess = luigi.BoolParameter(default=True)  # parameter for preprocessing yes/no

    def requires(self):
        return TaskGetData()  # define dependency

    def run(self):
        df_train = self.input().load()  # quickly load required data
        if self.do_preprocess:
            df_train.iloc[:, :-1] = sklearn.preprocessing.scale(df_train.iloc[:, :-1])
        self.save(df_train)


class TaskTrain(d6tflow.tasks.TaskPickle):  # save output as pickle
    do_preprocess = luigi.BoolParameter(default=True)

    def requires(self):
        return TaskPreprocess(do_preprocess=self.do_preprocess)

    def run(self):
        df_train = self.input().load()
        model = sklearn.svm.SVC()
        model.fit(df_train.iloc[:, :-1], df_train['y'])
        self.save(model)

if __name__ == "__main__":
    d6tflow.run(TaskTrain()) 

but this fails at the last line: self.save(model):

Welcome to d6tflow!
d:\apps\miniconda3\envs\py35\lib\site-packages\luigi\parameter.py:284: UserWarning: Parameter "task_process_context" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
INFO: Informed scheduler that task   TaskTrain_True_e00389f8b2   has status   PENDING
INFO: Informed scheduler that task   TaskPreprocess_True_e00389f8b2   has status   PENDING
INFO: Informed scheduler that task   TaskGetData__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) running   TaskPreprocess(do_preprocess=True)
INFO: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) done      TaskPreprocess(do_preprocess=True)
INFO: Informed scheduler that task   TaskPreprocess_True_e00389f8b2   has status   DONE
INFO: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) running   TaskTrain(do_preprocess=True)
ERROR: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) failed    TaskTrain(do_preprocess=True)
Traceback (most recent call last):
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\luigi\worker.py", line 199, in run
    new_deps = self._run_get_new_deps()
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\luigi\worker.py", line 139, in _run_get_new_deps
    task_gen = self.task.run()
  File "d:\dev\py\luigi\d6t_example_2.py", line 44, in run
    self.save(model)
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\d6tflow\tasks\__init__.py", line 135, in save
    self.output().save(data)
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\d6tflow\targets\__init__.py", line 288, in save
    pickle.dump(obj, open(self.path, "wb"), **kwargs)
TypeError: invalid file: WindowsPath('data/TaskTrain/TaskTrain_True_e00389f8b2-data.pkl')
INFO: Informed scheduler that task   TaskTrain_True_e00389f8b2   has status   FAILED
INFO: Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 1 complete ones were encountered:
    - 1 TaskGetData()
* 1 ran successfully:
    - 1 TaskPreprocess(do_preprocess=True)
* 1 failed:
    - 1 TaskTrain(do_preprocess=True)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

Traceback (most recent call last):
  File "c:\Users\User\.vscode\extensions\ms-python.python-2019.6.24221\pythonFiles\ptvsd_launcher.py", line 43, in <module>
    main(ptvsdArgs)
  File "c:\Users\User\.vscode\extensions\ms-python.python-2019.6.24221\pythonFiles\lib\python\ptvsd\__main__.py", line 434, in main
    run()
  File "c:\Users\User\.vscode\extensions\ms-python.python-2019.6.24221\pythonFiles\lib\python\ptvsd\__main__.py", line 312, in run_file
    runpy.run_path(target, run_name='__main__')
  File "d:\apps\miniconda3\envs\py35\lib\runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "d:\apps\miniconda3\envs\py35\lib\runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "d:\apps\miniconda3\envs\py35\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "d:\dev\py\luigi\d6t_example_2.py", line 48, in <module>
    d6tflow.run(TaskTrain())  # , local_scheduler=False)  # single task
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\d6tflow\__init__.py", line 101, in run
    raise RuntimeError('Exception found running flow, check trace')
RuntimeError: Exception found running flow, check trace

For some reason it cannot create the file. The 'data' directory is created and contains two subdirectories: TaskGetData (containing a pq file, as expected) and TaskTrain (which is empty).

I am running on Windows 10 with Python 3.5.

Any idea what might be wrong?
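One hedged observation on the trace above: the failing call is pickle.dump(obj, open(self.path, "wb")) with a WindowsPath, and the built-in open() only accepts pathlib objects from Python 3.6 onward (PEP 519), so on Python 3.5 it raises exactly this "TypeError: invalid file". A minimal demonstration:

from pathlib import Path

p = Path('data/example.pkl')
open(str(p), 'wb')   # works on Python 3.5: explicit str() conversion
open(p, 'wb')        # TypeError: invalid file ... on 3.5; fine on 3.6+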

Setting execution_summary to False causes an error in d6tflow.run()

I am using d6tflow version 0.2.2 and Luigi version 3.0.2. I set d6tflow.settings.execution_summary to False and ran into an error when trying to run a task using d6tflow.run().

The error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "[My Package Path]\venv\lib\site-packages\d6tflow\__init__.py", line 116, in run
    if abort and not result.scheduling_succeeded:
AttributeError: 'bool' object has no attribute 'scheduling_succeeded'

I investigated the source code at \d6tflow\__init__.py and found that when execution_summary is False, detailed_summary=True is not passed to luigi.build, so result is only a boolean object and thus has no scheduling_succeeded attribute (the luigi.build source code is here). It could be fixed by simply removing the execution_summary condition from the first if statement, but perhaps there's a more appropriate solution.

Source code of interest:

if execution_summary and luigi.__version__ >= '3.0.0':
    opts['detailed_summary'] = True
result = luigi.build(tasks, **opts)
if abort and not result.scheduling_succeeded:
    raise RuntimeError('Exception found running flow, check trace. For more details see https://d6tflow.readthedocs.io/en/latest/run.html#debugging-failures')
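A sketch of one possible fix (my assumption, not an official patch): always request the detailed summary so result is a summary object, and fall back gracefully when it is a plain bool:

if luigi.__version__ >= '3.0.0':
    opts['detailed_summary'] = True  # always get a summary object, regardless of execution_summary
result = luigi.build(tasks, **opts)
# result is a LuigiRunResult when detailed_summary is set, otherwise a bool
ok = result.scheduling_succeeded if hasattr(result, 'scheduling_succeeded') else result
if abort and not ok:
    raise RuntimeError('Exception found running flow, check trace')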

Luigi >= 2.8.3: AttributeError: 'str' object has no attribute 'exists'

I get an error if I have luigi>=2.8.4 installed.

When I reinstall luigi==2.8.3, I do not get this error when running my tasks.

Traceback (most recent call last):
  File "/home/chris/Code/cloud_storage/combined_tasks.py", line 21, in <module>
    d6tflow.show(TaskCombineData())
  File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tflow/__init__.py", line 165, in show
    preview(task)
  File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tflow/__init__.py", line 53, in preview
    print(luigi.tools.deps_tree.print_tree(t))
  File "/home/chris/anaconda3/lib/python3.7/site-packages/luigi/tools/deps_tree.py", line 48, in print_tree
    is_task_complete = task.complete()
  File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tcollect/__init__.py", line 107, in wrapper
    raise e
  File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tcollect/__init__.py", line 101, in wrapper
    return func(self, *args, **kwargs)
  File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tflow/tasks/__init__.py", line 48, in complete
    complete = super().complete()
  File "/home/chris/anaconda3/lib/python3.7/site-packages/luigi/task.py", line 572, in complete
    return all(map(lambda output: output.exists(), outputs))
  File "/home/chris/anaconda3/lib/python3.7/site-packages/luigi/task.py", line 572, in <lambda>
    return all(map(lambda output: output.exists(), outputs))
  File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tflow/targets/__init__.py", line 58, in exists
    return self.path.exists()
AttributeError: 'str' object has no attribute 'exists'

Too many open files

Hello,

I've been using d6tflow to run multiple ML models with different parameters. Some executions trained over 20K models.

I'm using 10 workers in a Linux environment, and after some time I get a "too many open files" error. I do not open any files in the custom code inside my tasks; I'm using TaskPqPandas and TaskPickle targets to do this for me.

Any help on this issue would be appreciated.

Thanks!

Potential Typo in the Docs - Define Upstream Dependency Tasks

https://d6tflow.readthedocs.io/en/latest/tasks.html

The following code defines a single-output task and calls it as a dependency of other tasks. Yet TaskSingleOutput1 and TaskSingleOutput2 are not defined anywhere on this page.

# quick save one output
class TaskSingleOutput(d6tflow.tasks.TaskPqPandas):

    def run(self):
        self.save(data_output)

# no dependency
class TaskSingleInput(d6tflow.tasks.TaskPqPandas):
    #[...]

# single dependency
@d6tflow.requires(TaskSingleOutput)
class TaskSingleInput(d6tflow.tasks.TaskPqPandas):
    #[...]

# multiple dependencies
@d6tflow.requires({'input1':TaskSingleOutput1, 'input2':TaskSingleOutput2})
class TaskMultipleInput(d6tflow.tasks.TaskPqPandas):
    #[...]

Also, an example like the following should make it clear that the child keys are labeled in the persist attribute, while the parent keys are defined in the dependency call.

# multiple dependencies, single & multiple outputs
@d6tflow.requires({'input1':TaskSingleOutput, 'input2':TaskMultipleOutput})
class TaskMultipleInput(d6tflow.tasks.TaskPqPandas):
    def run(self):
        data = self.inputLoad(as_dict=True)
        data1a = data['input1'] # We reference the key defined in the dependency call
        data2a = data['input2']['output1']  # 'output1' is a persist label defined in TaskMultipleOutput
        data2b = data['input2']['output2']

Add `verbose` flag to disable all the output from `d6tflow.run(Task())`

If I run

d6tflow.run(Task())

I get lots of output (the 'Luigi execution summary').

Granted, this is useful information, but sometimes I'd like to be able to just run a task without getting so much output.

Could we add a flag (e.g. verbose) which, if set to False, just runs the task without the Luigi execution summary getting displayed?

This is something I could work on, if you'd consider it worthwhile.
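Until such a flag exists, one interim workaround (an assumption on my part; nothing d6tflow-specific) is to raise the level of luigi's loggers before running:

import logging
import d6tflow

# luigi logs its scheduling chatter and the execution summary via these loggers
logging.getLogger('luigi').setLevel(logging.WARNING)
logging.getLogger('luigi-interface').setLevel(logging.WARNING)

d6tflow.run(Task())  # runs with far less console output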

Is d6tflow recommended without ML?

Hi all,

I am a beginner, so I have a very naive question: is d6tflow recommended even if I am not using any machine-learning tools? I am asking because all the examples I came across involve some kind of ML.

I just want a framework with guidelines to keep my code organized.

Thanks,
Uendert

Configure logging

Hi, I would like your help figuring out how to configure logging so that an example like the one below outputs the log message to the console. The only mention of logging configuration in the docs is changing the logging level.

import d6tflow
import logging

log = logging.getLogger()


class TaskTest(d6tflow.tasks.TaskCache):

    def run(self):
        log.info("Hello!")
        self.save({"output": 0})


d6tflow.run(TaskTest())
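For what it's worth, a standard-library configuration is usually enough here (a sketch; nothing d6tflow-specific assumed): attach a console handler to the root logger before running. The message then only appears when the task actually runs; a task that is already complete is skipped.

import logging

logging.basicConfig(level=logging.INFO)  # root logger now emits INFO records to the console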

Reconfigure save() option to point to external store

Is there a way to configure d6tflow to save task output somewhere other than the local filesystem? For example, if you do not have the space to save large datasets from Spark in your local development environment, can d6tflow be configured to reference that environment instead of its local path?
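One low-tech option (a sketch, assuming the documented d6tflow.set_dir API) is to point d6tflow's data directory at a mounted network or object-store filesystem:

import d6tflow

# all targets are then written beneath this directory instead of the local ./data
d6tflow.set_dir('/mnt/shared-store/d6tflow-data')  # hypothetical mount, e.g. NFS or a FUSE-mounted bucket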

Child tasks

Could you please tell me how I can create child tasks? Suppose Task1 has a list (A) as its output.
Then Task2 must be executed for each value of A, and it will output a list (B).
Then I want to run Task3 for each element of B. How can I do that?
I've created the aggregator:

class TaskAggregatorJavaFiles(d6tflow.tasks.TaskAggregator):
    dir_to_search = d6tflow.Parameter()
    dir_to_save = d6tflow.Parameter()

    def run(self):
        test_files = set(Path(self.dir_to_search).glob('**/*Test*.java'))
        not_test_files = set(Path(self.dir_to_search).glob('**/*.java'))
        files_without_tests = list(not_test_files.difference(test_files))
        for _, file in enumerate(files_without_tests):
            print(_)
            yield TaskPreprocessJavaFile(file=str(file))

I can see that it iterates over the files. But then the child task is not spawned:

class TaskPreprocessJavaFile(d6tflow.tasks.TaskPickle):
    file = d6tflow.Parameter()

    def run(self):
        # it is not executed in parallel
        self.save({'bla': 'bla', "text": self.file})

The problem is that they are not executed in parallel. How can I run the child tasks asynchronously (in parallel)?
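A hedged sketch of what usually unblocks this in luigi (d6tflow tasks are luigi tasks): yield the dynamic requirements as one list instead of one at a time, and run with several workers. Yielding tasks individually forces luigi to complete each before resuming the generator, which serializes them.

from pathlib import Path
import d6tflow

class TaskAggregatorJavaFiles(d6tflow.tasks.TaskAggregator):
    dir_to_search = d6tflow.Parameter()

    def run(self):
        test_files = set(Path(self.dir_to_search).glob('**/*Test*.java'))
        files = list(set(Path(self.dir_to_search).glob('**/*.java')) - test_files)
        # yield the whole batch at once so several workers can pick tasks up concurrently
        yield [TaskPreprocessJavaFile(file=str(f)) for f in files]

# run with multiple workers (d6tflow forwards this to luigi):
# d6tflow.run(TaskAggregatorJavaFiles(dir_to_search='src/'), workers=4)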

How to catch specific errors when running a task?

I would like to catch an AssertionError raised within my task, but it seems I can only catch generic RuntimeErrors, and I can't figure out how to programmatically see which other errors were raised before it.

For example, this will not work:

import d6tflow

class TaskExample(d6tflow.tasks.TaskCache):
    def run(self):
        result = 5 + 5
        assert result > 100, "result must be greater than 100"
        self.save(result)

try:
    d6tflow.run(TaskExample())
except AssertionError:
    # do some special logic here for this particular error
    print("Exception caught")

This works, but it's not desirable to catch all generic RuntimeErrors:

import d6tflow

class TaskExample(d6tflow.tasks.TaskCache):
    def run(self):
        result = 5 + 5
        assert result > 100, "result must be greater than 100"
        self.save(result)

try:
    d6tflow.run(TaskExample())
except RuntimeError:
    print("Exception caught")

I tried using the traceback library to get the stack trace, but I could only get the stack trace of the RuntimeError, which is not very useful. Is there any way to catch a specific error, or at least get the last error raised before the RuntimeError?

Edit: Also, when you except a RuntimeError, the stack trace still prints; I'm wondering if there's a way to suppress that.
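A hedged workaround sketch (assumption: standard luigi event hooks, which d6tflow tasks inherit): register a FAILURE event handler to capture the original exception, then inspect it after the run. TaskExample here is the class from the snippets above.

import luigi
import d6tflow

failures = []

@luigi.Task.event_handler(luigi.Event.FAILURE)
def remember_failure(task, exception):
    failures.append(exception)  # the original error, e.g. the AssertionError

try:
    d6tflow.run(TaskExample())
except RuntimeError:
    if failures and isinstance(failures[-1], AssertionError):
        # special logic for this particular error
        print("Exception caught:", failures[-1])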

Support for Tasks that output different types, and other extensions...

I started using d6tflow for a data science project but, after several weeks, I got somewhat frustrated with this limitation: I needed Tasks able to generate outputs of different types (for example, CSVs and pickles).

Finally, we decided to write our own "version" of d6tflow by extending the luigi library.

I share how we implemented this, in case anyone is interested in making it public (I am a newbie on GitHub).

Comments and names are sometimes in Spanish, but I think the logic is easy to follow.

Apart from this improvement (a dict-like persist that states the type of each target), there are more differences that made life easier for us:

  • Integration with the cookiecutter template for a data science project.
  • Generation of a single folder per Task, leaving the TaskId for the file names.
  • Auto-generation of a txt file with the parameters, to record for each file how it was generated.
  • A common_params function that returns the common parameters between a dict of parameters and a Task class.
  • Support for docx targets (we use them to generate documentation of the process).
  • MidasTask requires a get_path method stating where the files will be located.

I know this is "too much" for an issue, but in case someone finds something interesting and worth integrating into d6tflow, I share it with the community.

import os
import pickle
from pathlib import Path

import luigi
import pandas as pd
from docx import Document                    # python-docx
from pandas_profiling import ProfileReport
import sweetviz as sv

cached_targets = {}  # module-level in-memory cache used by MidasCacheTarget


class MidasLocalTarget(luigi.LocalTarget):

    '''
    path: full path of the final folder where the object is stored (a pathlib object)
    task_id: identifier of the task that owns the target
    name: object name (key of the persist dictionary)
    extension: file extension
    '''
    def __init__(self, path=None, name=None, task_id=None, extension=None):
        self.path = path
        self.task_id = task_id
        self.name = name
        self.extension = extension
        self.path_completo = self.path / (self.task_id + '_' + self.name + '.{}'.format(self.extension))
        super().__init__(self.path_completo)
        # Restore the path in case the parent constructor modified it:
        self.path = path
       
    def exists(self):
        return self.path_completo.exists()

    def invalidate(self):
        if self.exists():
            self.path_completo.unlink()
        return not self.exists()
   
    def write_params_file(self, path, parameters):
        path_file = path / (self.task_id + '_' + 'parameters.txt')
        file = open(path_file,"w")
        file.write(parameters)
        file.close()
   
class MidasPickleTarget(MidasLocalTarget):  
   
       
    def load(self, **kwargs):
       
        if self.exists():
            with open(self.path_completo,"rb" ) as fhandle:
                data = pickle.load(fhandle)
            return data
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')
       
    def save(self, obj, parameters, **kwargs):
       
        self.path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        with open(self.path_completo, "wb") as fhandle:
            pickle.dump(obj, fhandle, **kwargs)
        return self.path_completo

class MidasPandasTarget(MidasLocalTarget):
    def generate_profiling(self, reports_path, parameters):
        (reports_path).mkdir(parents=True, exist_ok=True)
        self.write_params_file(reports_path, parameters)
        pd_profile_name = reports_path / (self.task_id + '_' + self.name + '_' + 'pandas_profiling.html')
        df = self.load()
        profile = ProfileReport(df, title="Pandas Profiling Report", minimal=True)
        profile.to_file(pd_profile_name)
        if 'target' in df.columns:
            sv_profile_name = reports_path / (self.task_id + '_' + self.name + '_' + 'sweetviz.html')
            my_report = sv.analyze(df, target_feat='target', pairwise_analysis = 'off')
            my_report.show_html(sv_profile_name, open_browser=False)

class MidasCSVTarget(MidasPandasTarget):
   
       
    def load(self, **kwargs):
        if self.exists():
            opts = {**{'sep':';','decimal':','},**kwargs}
            df = pd.read_csv(self.path_completo, **opts)
            return df
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')
   
   
    def save(self, df, parameters, save_index=True,**kwargs):
       
        (self.path).mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        opts = {**{'sep':';','decimal':',', 'compression':'gzip', 'index': save_index},**kwargs}
        df.to_csv(self.path_completo,**opts)
        return self.path_completo


class MidasPqTarget(MidasPandasTarget):
   
       
    def load(self, **kwargs):
        if self.exists():
            df = pd.read_parquet(self.path_completo)
            return df
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')
       
   
    def save(self, df, parameters, save_index=True,**kwargs):
       
        (self.path).mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        opts = {**{'compression':'gzip', 'index': save_index, 'engine': 'pyarrow'},**kwargs}
        df.to_parquet(self.path_completo,**opts)
        return self.path_completo

class MidasDocxTarget(MidasLocalTarget):
   
       
    def load(self, **kwargs):
        if self.exists():
            docx = Document(self.path_completo)
            return docx
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')
   
   
    def save(self, docx, parameters, **kwargs):
         (self.path).mkdir(parents=True, exist_ok=True)
         self.write_params_file(self.path, parameters)
         docx.save(self.path_completo)
         
         
         
class MidasCacheTarget(luigi.LocalTarget):

    '''
    task_id: identifier of the task that owns the target
    name: object name (key of the persist dictionary)
    '''
    def __init__(self, name=None, task_id=None):
        super().__init__(Path(os.path.abspath(os.getcwd())).parent)
        self.task_id = task_id
        self.name = name
        self.clave = task_id + name
   
    def exists(self):
        return self.clave in cached_targets

    def invalidate(self):
        if self.clave in cached_targets:
            cached_targets.pop(self.clave)

    def load(self):
       
        if self.exists():
            return cached_targets.get(self.clave)
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')

    def save(self, o):
        """
        Save object to in-memory cache

        Args:
            df (obj): pandas dataframe

        Returns: filename

        """
        cached_targets[self.clave] = o
        return self.clave
       
class MidasTask(luigi.Task):
    """
    Custom class that adds functionality on top of the generic Luigi task class. Features:

        - A save method, which must be invoked at the end of run(). This save method stores all the
        objects defined in the persist attribute. Classes extending MidasTask do not need to
        implement it.

        - A str_parameters method, which returns a string with all the (significant) parameters of
        the class. It is used internally, and to save a txt alongside every object written to disk,
        so the creation parameters of each object are close at hand. Classes extending MidasTask do
        not need to implement it.

        - A get_path method must be implemented which, given the location type, returns the Path
        where the object will be stored. Note: one more level with the task name is appended to that
        Path, but that happens automatically.

        - A method to generate reports with PandasProfiling and SweetViz for all csv-type targets.
        The targets must exist for it to work.

    The persist dictionary must be created in every concrete class extending MidasTask. It must
    contain one item per object to be saved to disk, and the values are a two-element list: the
    format and the location type. Examples:
    persist = {
        'dataset_salida': ['csv','data_processed']
        }
    persist = {
        'modelo': ['pkl','modelos']
        }
    Supported formats:
        csv (saved compressed to save space)
        docx
        pkl
        pq (for parquet)
        cache (not saved to disk)
    Supported location types:
        'data_interim'
        'data_processed'
        'docs'
        'modelos'
        'reports'
        'resultados'
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.task_id_hash = self.task_id.split('_')[-1]

       
    def save(self, data, save_index=True,**kwargs):

        targets = self.output()
        if not set(data.keys())==set(targets.keys()):
            print(data.keys())
            print(targets.keys())
            raise ValueError('The saved dictionary must be consistent with the persist attribute')
        for k, v in data.items():
            if isinstance(targets[k],MidasCSVTarget) or isinstance(targets[k],MidasPqTarget):    
                targets[k].save(v, self.str_parameters(), save_index=save_index, **kwargs)
            elif isinstance(targets[k],MidasCacheTarget):    
                targets[k].save(v, **kwargs)
            else:
                targets[k].save(v, self.str_parameters(), **kwargs)

    def output(self):

        output = {}
        for k,v in self.persist.items():
            if v[0] == 'csv':
                extension = 'csv.gz'
            elif v[0] == 'pq':
                extension = 'pq.gz'
            else:
                extension = v[0]
           
            save_path = self.get_path(v[1]) / self.get_task_family()      
           
            if v[0] == 'csv':
                output[k] = MidasCSVTarget(path=save_path, name = k, task_id=self.task_id_hash, extension = extension)
            elif v[0] == 'pq':
                output[k] = MidasPqTarget(path=save_path, name = k, task_id=self.task_id_hash, extension = extension)
            elif v[0] == 'pkl':
                output[k] = MidasPickleTarget(path=save_path, name = k, task_id=self.task_id_hash, extension = extension)
            elif v[0] == 'docx':
                output[k] = MidasDocxTarget(path=save_path, name = k, task_id=self.task_id_hash, extension = extension)
            elif v[0] == 'cache':
                output[k] = MidasCacheTarget(name = k, task_id=self.task_id_hash)
            else:
                raise ValueError('Object format not implemented: ' + v[0])
               
        return output
   
    def generate_profiling(self):
        reports_path = self.get_path('reports')
        for k,v in self.output().items():
            if isinstance(v,MidasPandasTarget):
                v.generate_profiling(reports_path,self.str_parameters())
   
    def get_path(self, tipo_ubicacion):
        """
        This method must be implemented in every final class extending MidasTask. It must define
        where each type of object is stored. Do not append the task name (that is done automatically
        afterwards). It must return a Path object.
        Example implementation:
            project_root = Path(os.path.abspath(os.getcwd())).parent
            if tipo_ubicacion == 'data_processed':
                return project_root / 'data' / 'processed'
            ...
        """
        raise Exception("get_path() not implemented")
   
    def str_parameters(self):
        params_text = ""
        for k in self.param_kwargs.keys():
            params_text = params_text + '{key} has the value {value}\n'.format(key=k, value=self.param_kwargs[k])
        return params_text
   
   
   
def common_params(dict_params, task_cls):
    """
    Grab all the values in dict_params that are found in task_cls.
    Similar to luigi.util.common_params, but between a dict and a class rather than
    between an instance and a class.
    """
   
    dict_param_names = dict_params.keys()
    task_cls_params_dict = dict(task_cls.get_params())
    task_cls_param_names = task_cls_params_dict.keys()
    common_param_names = set(dict_param_names).intersection(set(task_cls_param_names))
    common_param_vals = {key : dict_params[key] for key in common_param_names}
    return common_param_vals
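To make the persist and get_path conventions above concrete, here is a minimal hypothetical subclass (my own illustration, not part of the shared code):

class TaskExportDataset(MidasTask):
    persist = {'dataset_salida': ['csv', 'data_processed']}

    def get_path(self, tipo_ubicacion):
        project_root = Path(os.path.abspath(os.getcwd())).parent
        if tipo_ubicacion == 'data_processed':
            return project_root / 'data' / 'processed'
        if tipo_ubicacion == 'reports':
            return project_root / 'reports'
        raise ValueError('Unknown location type: ' + tipo_ubicacion)

    def run(self):
        df = pd.DataFrame({'feature': [1.0, 2.0], 'target': [0, 1]})
        # keys of the saved dict must match the persist dict exactly
        self.save({'dataset_salida': df})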

Task type to output txt files.

I am currently working with AWS SageMaker, and it requires a text file to be uploaded as the data.

Would we be able to implement a Task that will output a simple text file?
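This could presumably be built by following the pattern of the existing d6tflow targets; a sketch under that assumption (none of these names exist in d6tflow today, and the base-class API is my reading of the source):

import d6tflow
import d6tflow.targets
import d6tflow.tasks

class TxtTarget(d6tflow.targets.DataTarget):
    def load(self):
        with open(self.path) as f:
            return f.read()

    def save(self, text):
        self.path.parent.mkdir(parents=True, exist_ok=True)  # ensure the task folder exists
        with open(self.path, 'w') as f:
            f.write(text)
        return self.path

class TaskTxt(d6tflow.tasks.TaskData):
    target_class = TxtTarget
    target_ext = 'txt'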

object has no attribute 'loadInputs'

Hi! Just started trying out d6tflow.

I seem to have run into a problem:
When running:

class TaskCombineData(d6tflow.tasks.TaskPqPandas):

    def requires(self):
        return TaskGetData1(), TaskGetData2()

    def run(self):
        df_data1, df_data2 = self.loadInputs()
        data = pd.concat([df_data1, df_data2])
        self.save(data)

I get the following error:
'TaskCombineData' object has no attribute 'loadInputs'

Have I misunderstood the usage mentioned here?

d6tcollect==1.0.5
d6tflow==0.1.5
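For reference, in recent d6tflow versions the loader is spelled inputLoad() (my understanding; worth checking against the docs for 0.1.5), so the equivalent line would be:

df_data1, df_data2 = self.inputLoad()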

Unable to reuse Task in different file

Hello!

Suppose I create a TaskData instance, execute it, and save the outputs. Then I'd like to reuse its results in some other file (say, a Jupyter notebook for analyzing results) by creating an instance of that task there and calling output() and then load(). But I am unable to do so, as the data directory won't be found there; I would have to re-execute the task.

My suggestion: use an absolute path in the output method. Instead of settings.dirpath (simply data), use something like
pathlib.Path(__file__).parent / "data", which would make the results accessible from everywhere.

What do you think?
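In the meantime, the existing d6tflow.set_dir API can serve as a workaround (a sketch; the path is hypothetical): point both the pipeline and the notebook at the same absolute data directory before touching any task.

import pathlib
import d6tflow

d6tflow.set_dir(pathlib.Path('/home/user/project/data'))  # same absolute dir in both places
df = TaskData().output().load()  # now resolves regardless of the working directory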

TaskCSVPandas loses the index in the csv output file

An example:

import d6tflow
import pandas as pd
import numpy as np

class test(d6tflow.tasks.TaskCSVPandas):

    def run(self):
        data = np.random.randn(2, 2)
        data = pd.DataFrame(data, index=['a', 'b'], columns=['c', 'd'])
        self.save(data)

d6tflow.run(test())

The actual data in the output csv file is:
c,d
1.5490553923182304,-0.3279984496021263
0.7946535471877705,0.5790784973358706

However, the expected data in the output file should be:
,c,d
a,0.20200720089562157,0.5134288778567592
b,2.918867471040273,-0.5393324706416279

The index is lost when using TaskCSVPandas to save data.
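A hedged workaround until this is fixed (nothing d6tflow-specific assumed): materialize the index as a column before saving, and restore it after loading.

class test(d6tflow.tasks.TaskCSVPandas):

    def run(self):
        data = pd.DataFrame(np.random.randn(2, 2), index=['a', 'b'], columns=['c', 'd'])
        self.save(data.reset_index())  # the index survives as a regular column

d6tflow.run(test())
df = test().output().load().set_index('index')  # restore the index on load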
