d6t / d6tflow
Python library for building highly effective data science workflows
Home Page: https://d6tflow.readthedocs.io/en/latest/
License: MIT License
Within a Flask app that serves a REST API with a POST endpoint, I am trying to post parameters to a d6tflow task and run the task with the posted params. Luigi throws ValueError: signal only works in main thread when calling luigi/worker.py, which in turn calls Python 3.7's signal.py.
I have a few tasks of the following nature, which are pretty standard: import a processed dataset, set up input data, split, then pass to a model.
# _tasks.py
import d6tflow

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    # loads a processed dataframe (probably pickled)
    pass

@d6tflow.requires(TaskLoadDataframe)
class TaskSetupExogEndogData(d6tflow.tasks.TaskCache):
    # do stuff. Saves data and labels
    pass

@d6tflow.requires({'inputs': TaskSetupExogEndogData})
class TaskSplitData(d6tflow.tasks.TaskCache):
    # do more stuff. Splits data and labels and saves to dictionary
    pass
# _tasks_sklearn.py
import _tasks
import d6tflow
from sklearn import svm

@d6tflow.requires(_tasks.TaskSplitData)
class TaskTrainSklearnSVM(d6tflow.tasks.TaskCache):
    kernel = d6tflow.Parameter(default='linear')

    def run(self):
        data = self.inputLoad()
        model_svm = svm.SVR(kernel=self.kernel)
        model_svm.fit(data['train_data'], data['train_labels'])
        model_svm.score(data['valid_data'], data['valid_labels'])
        # TODO: self.save model artifacts
Context: I would obviously want to reuse this as much as possible.
Question 1: Is it possible to create several independent dataset-processing tasks that I can set as the "initial task" of this workflow?
Question 2: If yes, how would I call that as a dynamic requirement in TaskLoadDataframe?
Solution A:
It seems the best way to handle this within the scope of this package is not to create a TaskA but just to do the following:
Solution B:
I know Luigi doesn't allow passing dataframes as parameters, but could I pass a dataframe into the run of a task as a means of reducing or completely removing the file I/O in step 2?
class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    def run(self, dataframe):
        self.save(dataframe)
I don't think the source code allows for this; what would be the syntax to run that as a workflow?
Solution B (reprise): Alternatively, I could save the processed dataframe in a dictionary and pass it into TaskLoadDataframe as a d6tflow-defined parameter.
Thoughts? Great work on this by the way.
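Regarding the signal error: a common workaround (a hedged sketch, not from this thread) is to run the flow in a child process, so Luigi installs its signal handlers in that process's main thread rather than in a Flask worker thread. The endpoint and helper names are illustrative:

import multiprocessing as mp
from flask import Flask, request, jsonify
import d6tflow
import _tasks_sklearn  # the module sketched above

app = Flask(__name__)

def _run_flow(kernel):
    # runs in the child process's main thread, where signal.signal() is allowed
    d6tflow.run(_tasks_sklearn.TaskTrainSklearnSVM(kernel=kernel))

@app.route('/train', methods=['POST'])
def train():
    kernel = request.get_json().get('kernel', 'linear')
    p = mp.Process(target=_run_flow, args=(kernel,))
    p.start()
    p.join()
    return jsonify(status='done')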
Would like to see an option for the data load function to automatically run the task if it is not marked complete. I find myself writing if/else statements like the below to get the desired effect:
if TaskExample().complete():
    df = TaskExample().output()['df'].load()
else:
    d6tflow.run(TaskExample())
    df = TaskExample().output()['df'].load()
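In the meantime, a small helper gives the same effect (a hedged sketch; run_and_load is an illustrative name, not a d6tflow API):

def run_and_load(task, key):
    # run the task only if its outputs are missing, then load the requested one
    if not task.complete():
        d6tflow.run(task)
    return task.output()[key].load()

df = run_and_load(TaskExample(), 'df')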
I am trying to run the d6tflow example (unmodified):
# imports implied by the example
import d6tflow
import luigi
import pandas as pd
import sklearn.datasets, sklearn.preprocessing, sklearn.svm

class TaskGetData(d6tflow.tasks.TaskPqPandas):  # save dataframe as parquet
    def run(self):
        iris = sklearn.datasets.load_iris()
        df_train = pd.DataFrame(iris.data, columns=['feature{}'.format(i) for i in range(4)])
        df_train['y'] = iris.target
        self.save(df_train)  # quickly save dataframe

class TaskPreprocess(d6tflow.tasks.TaskCachePandas):  # save data in memory
    do_preprocess = luigi.BoolParameter(default=True)  # parameter for preprocessing yes/no

    def requires(self):
        return TaskGetData()  # define dependency

    def run(self):
        df_train = self.input().load()  # quickly load required data
        if self.do_preprocess:
            df_train.iloc[:, :-1] = sklearn.preprocessing.scale(df_train.iloc[:, :-1])
        self.save(df_train)

class TaskTrain(d6tflow.tasks.TaskPickle):  # save output as pickle
    do_preprocess = luigi.BoolParameter(default=True)

    def requires(self):
        return TaskPreprocess(do_preprocess=self.do_preprocess)

    def run(self):
        df_train = self.input().load()
        model = sklearn.svm.SVC()
        model.fit(df_train.iloc[:, :-1], df_train['y'])
        self.save(model)

if __name__ == "__main__":
    d6tflow.run(TaskTrain())
but this fails at the last line: self.save(model):
Welcome to d6tflow!
d:\apps\miniconda3\envs\py35\lib\site-packages\luigi\parameter.py:284: UserWarning: Parameter "task_process_context" with value "None" is not of type string.
warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
INFO: Informed scheduler that task TaskTrain_True_e00389f8b2 has status PENDING
INFO: Informed scheduler that task TaskPreprocess_True_e00389f8b2 has status PENDING
INFO: Informed scheduler that task TaskGetData__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) running TaskPreprocess(do_preprocess=True)
INFO: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) done TaskPreprocess(do_preprocess=True)
INFO: Informed scheduler that task TaskPreprocess_True_e00389f8b2 has status DONE
INFO: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) running TaskTrain(do_preprocess=True)
ERROR: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) failed TaskTrain(do_preprocess=True)
Traceback (most recent call last):
File "d:\apps\miniconda3\envs\py35\lib\site-packages\luigi\worker.py", line 199, in run
new_deps = self._run_get_new_deps()
File "d:\apps\miniconda3\envs\py35\lib\site-packages\luigi\worker.py", line 139, in _run_get_new_deps
task_gen = self.task.run()
File "d:\dev\py\luigi\d6t_example_2.py", line 44, in run
self.save(model)
File "d:\apps\miniconda3\envs\py35\lib\site-packages\d6tflow\tasks\__init__.py", line 135, in save
self.output().save(data)
File "d:\apps\miniconda3\envs\py35\lib\site-packages\d6tflow\targets\__init__.py", line 288, in save
pickle.dump(obj, open(self.path, "wb"), **kwargs)
TypeError: invalid file: WindowsPath('data/TaskTrain/TaskTrain_True_e00389f8b2-data.pkl')
INFO: Informed scheduler that task TaskTrain_True_e00389f8b2 has status FAILED
INFO: Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 1 complete ones were encountered:
- 1 TaskGetData()
* 1 ran successfully:
- 1 TaskPreprocess(do_preprocess=True)
* 1 failed:
- 1 TaskTrain(do_preprocess=True)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
Traceback (most recent call last):
File "c:\Users\User\.vscode\extensions\ms-python.python-2019.6.24221\pythonFiles\ptvsd_launcher.py", line 43, in <module>
main(ptvsdArgs)
File "c:\Users\User\.vscode\extensions\ms-python.python-2019.6.24221\pythonFiles\lib\python\ptvsd\__main__.py", line 434, in main
run()
File "c:\Users\User\.vscode\extensions\ms-python.python-2019.6.24221\pythonFiles\lib\python\ptvsd\__main__.py", line 312, in run_file
runpy.run_path(target, run_name='__main__')
File "d:\apps\miniconda3\envs\py35\lib\runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "d:\apps\miniconda3\envs\py35\lib\runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "d:\apps\miniconda3\envs\py35\lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "d:\dev\py\luigi\d6t_example_2.py", line 48, in <module>
d6tflow.run(TaskTrain()) # , local_scheduler=False) # single task
File "d:\apps\miniconda3\envs\py35\lib\site-packages\d6tflow\__init__.py", line 101, in run
raise RuntimeError('Exception found running flow, check trace')
RuntimeError: Exception found running flow, check trace
For some reason it cannot create the file. The 'data' directory is created and contains 2 subdirectories: TaskGetData (containing a pq file as expected) and TaskTrain (which is empty).
I am running on Windows 10 with Python 3.5.
Any idea what might be wrong?
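One likely cause (my hedged reading, not confirmed in the thread): Python 3.5's built-in open() does not accept pathlib objects; os.PathLike support was only added in Python 3.6 (PEP 519). So pickle.dump(obj, open(self.path, "wb"), **kwargs) fails when self.path is a WindowsPath. A sketch of the fix inside d6tflow/targets/__init__.py, short of upgrading to Python 3.6+:

# convert the Path to str so Python 3.5's open() accepts it
pickle.dump(obj, open(str(self.path), "wb"), **kwargs)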
pyarrow can't save duplicate columns
SSL Error on Python 3.6 on Mac OSX while calling the api.register method
I am using d6tflow version 0.2.2 and Luigi version 3.0.2. I tried setting d6tflow.settings.execution_summary to False and ran into an error when trying to run a task using d6tflow.run().
The error:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "[My Package Path]\venv\lib\site-packages\d6tflow\__init__.py", line 116, in run
if abort and not result.scheduling_succeeded:
AttributeError: 'bool' object has no attribute 'scheduling_succeeded'
I investigated the source code at d6tflow/__init__.py and found that if execution_summary is False, detailed_summary=True is not passed to luigi.build, which causes result to be only a boolean object and thus not have the attribute scheduling_succeeded (see the luigi.build source code). It could be fixed by simply removing the execution_summary condition in the first if statement, but perhaps there's a more appropriate solution.
Source code of interest:
if execution_summary and luigi.__version__ >= '3.0.0':
    opts['detailed_summary'] = True
result = luigi.build(tasks, **opts)
if abort and not result.scheduling_succeeded:
    raise RuntimeError('Exception found running flow, check trace. For more details see https://d6tflow.readthedocs.io/en/latest/run.html#debugging-failures')
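One possible fix (a hedged sketch, not an official patch) is to tolerate the plain bool that luigi.build returns when detailed_summary was not requested:

if execution_summary and luigi.__version__ >= '3.0.0':
    opts['detailed_summary'] = True
result = luigi.build(tasks, **opts)
# result is a rich result object only when detailed_summary was requested;
# otherwise it is a plain bool
scheduling_ok = result if isinstance(result, bool) else result.scheduling_succeeded
if abort and not scheduling_ok:
    raise RuntimeError('Exception found running flow, check trace. For more details see https://d6tflow.readthedocs.io/en/latest/run.html#debugging-failures')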
I get an error if I have luigi>=2.8.4 installed.
When I reinstall luigi==2.8.3 I do not get this error when trying to run my tasks.
Traceback (most recent call last):
File "/home/chris/Code/cloud_storage/combined_tasks.py", line 21, in <module>
d6tflow.show(TaskCombineData())
File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tflow/__init__.py", line 165, in show
preview(task)
File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tflow/__init__.py", line 53, in preview
print(luigi.tools.deps_tree.print_tree(t))
File "/home/chris/anaconda3/lib/python3.7/site-packages/luigi/tools/deps_tree.py", line 48, in print_tree
is_task_complete = task.complete()
File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tcollect/__init__.py", line 107, in wrapper
raise e
File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tcollect/__init__.py", line 101, in wrapper
return func(self, *args, **kwargs)
File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tflow/tasks/__init__.py", line 48, in complete
complete = super().complete()
File "/home/chris/anaconda3/lib/python3.7/site-packages/luigi/task.py", line 572, in complete
return all(map(lambda output: output.exists(), outputs))
File "/home/chris/anaconda3/lib/python3.7/site-packages/luigi/task.py", line 572, in <lambda>
return all(map(lambda output: output.exists(), outputs))
File "/home/chris/anaconda3/lib/python3.7/site-packages/d6tflow/targets/__init__.py", line 58, in exists
return self.path.exists()
AttributeError: 'str' object has no attribute 'exists'
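The likely mechanism (my hedged reading of the traceback): newer luigi versions hand self.path back as a plain string, while d6tflow's target expects a pathlib object. A defensive sketch of the failing method; the class name is illustrative, not d6tflow's actual source:

from pathlib import Path
import luigi

class PathTolerantTarget(luigi.LocalTarget):
    def exists(self):
        # normalize so both str paths (luigi>=2.8.4) and pathlib paths work
        return Path(self.path).exists()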
Hello,
I've been using d6tflow to run multiple ML models with different parameters. In some executions I trained over 20K models.
I'm using 10 workers in a Linux environment, and after some time I get a "too many open files" error. I do not open any files in the custom code inside my tasks; I'm using TaskPqPandas and TaskPickle targets to do this for me.
Any help on this issue would be appreciated.
Thanks!
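Not a root-cause fix, but a common stopgap on Linux (a hedged sketch using only the standard library) is to raise the process's open-file soft limit to its hard limit before running the flow:

import resource

# raise the soft RLIMIT_NOFILE up to the hard limit for this process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))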
https://d6tflow.readthedocs.io/en/latest/tasks.html
The following code defines a single-output task and calls it as a dependency of other tasks, yet TaskSingleOutput1 and TaskSingleOutput2 are not defined anywhere on this page.
# quick save one output
class TaskSingleOutput(d6tflow.tasks.TaskPqPandas):
    def run(self):
        self.save(data_output)

# no dependency
class TaskSingleInput(d6tflow.tasks.TaskPqPandas):
    #[...]

# single dependency
@d6tflow.requires(TaskSingleOutput)
class TaskSingleInput(d6tflow.tasks.TaskPqPandas):
    #[...]

# multiple dependencies
@d6tflow.requires({'input1':TaskSingleOutput1, 'input2':TaskSingleOutput2})
class TaskMultipleInput(d6tflow.tasks.TaskPqPandas):
    #[...]
Also, an example like the following should make it clear that the child keys are the labels defined in the child task's persist list, while the parent keys are defined in the dependency call.
# multiple dependencies, single & multiple outputs
@d6tflow.requires({'input1':TaskSingleOutput, 'input2':TaskMultipleOutput})
class TaskMultipleInput(d6tflow.tasks.TaskPqPandas):
    def run(self):
        data = self.inputLoad(as_dict=True)
        data1a = data['input1']  # we reference the key defined in the dependency call
        data2a = data['input2']['output1']  # 'output1' is a persist label defined in TaskMultipleOutput
        data2b = data['input2']['output2']
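For completeness, a sketch of the multi-output parent the example assumes (df1/df2 are placeholders; the persist list is d6tflow's documented way to declare multiple outputs):

class TaskMultipleOutput(d6tflow.tasks.TaskPqPandas):
    persist = ['output1', 'output2']  # these labels become the child keys above

    def run(self):
        # save one dataframe per persist label
        self.save({'output1': df1, 'output2': df2})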
If I run d6tflow.run(Task()) I get lots of output (the 'Luigi execution summary').
Granted, this is useful information, but sometimes I'd like to be able to just run a task without getting so much output.
Could we add a flag (e.g. verbose) which, if set to False, just runs the task without displaying the Luigi execution summary?
This is something I could work on, if you'd consider it worthwhile.
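Until such a flag exists, quieting Luigi's loggers before running has a similar effect (a hedged workaround; it mutes Luigi's INFO output rather than only the summary):

import logging

# 'luigi' and 'luigi-interface' are the logger names Luigi logs under
logging.getLogger('luigi').setLevel(logging.WARNING)
logging.getLogger('luigi-interface').setLevel(logging.WARNING)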
Hi all,
I am a beginner, so I have a very naive question: is d6tflow recommended even if I am not using any machine-learning tools? I am asking because all the examples I came across involve some kind of ML.
I just wanted a framework with guidelines to get my code organized.
Thanks,
Uendert
Hi, I would like your help figuring out how to configure the logging so that an example like this outputs the log message in the console. The only mention of logging configuration in the docs is about changing the logging level.
import d6tflow
import logging

log = logging.getLogger()

class TaskTest(d6tflow.tasks.TaskCache):
    def run(self):
        log.info("Hello!")
        self.save({"output": 0})

d6tflow.run(TaskTest())
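A minimal way to get the message onto the console (a hedged sketch using only the standard library; the root logger defaults to WARNING, so INFO records are dropped until a handler and level are configured):

import logging

logging.basicConfig(level=logging.INFO)  # attach a console handler at INFO
d6tflow.run(TaskTest())  # now prints INFO:root:Hello!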
Is there a method by which you could configure d6tflow to save task output outside of the local filesystem? For example, if you do not have the memory to save large datasets from Spark in your local development environment, is there a way to configure d6tflow to reference that environment instead of its local path?
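d6tflow does let you repoint where targets are written (hedged: this changes the base directory, it is not a remote-storage backend). Pointing it at a mounted network path keeps outputs off the local disk; the path below is illustrative:

import d6tflow

d6tflow.set_dir('/mnt/shared/flow-data')  # all task outputs now land here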
Guys, could you please tell me how I can create child tasks? Suppose Task1 has a list as its output (A).
Then Task2 must be executed for each value of A, and it will have a list as its output (B).
Then I want to run Task3 for each element of list B. How can I do it?
I've created the aggregator:
from pathlib import Path
import d6tflow

class TaskAggregatorJavaFiles(d6tflow.tasks.TaskAggregator):
    dir_to_search = d6tflow.Parameter()
    dir_to_save = d6tflow.Parameter()

    def run(self):
        test_files = set(Path(self.dir_to_search).glob('**/*Test*.java'))
        not_test_files = set(Path(self.dir_to_search).glob('**/*.java'))
        files_without_tests = list(not_test_files.difference(test_files))
        for i, file in enumerate(files_without_tests):
            print(i)
            yield TaskPreprocessJavaFile(file=str(file))
I see that it iterates over the files, but then the child task is not spawned:
class TaskPreprocessJavaFile(d6tflow.tasks.TaskPickle):
    file = d6tflow.Parameter()

    def run(self):
        # it is not executed in parallel
        self.save({'bla': 'bla', "text": self.file})
The problem is that the children are not executed in parallel. How can I run the child tasks asynchronously (in parallel)?
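A hedged sketch, not from this thread: with Luigi's dynamic dependencies, yielding the whole list at once lets the scheduler see all children together, and parallelism then comes from running with several workers (assuming d6tflow.run forwards workers to luigi.build). A modified run for TaskAggregatorJavaFiles:

def run(self):
    test_files = set(Path(self.dir_to_search).glob('**/*Test*.java'))
    not_test_files = set(Path(self.dir_to_search).glob('**/*.java'))
    files_without_tests = list(not_test_files - test_files)
    # yield one list instead of one task at a time
    yield [TaskPreprocessJavaFile(file=str(f)) for f in files_without_tests]

d6tflow.run(TaskAggregatorJavaFiles(dir_to_search='src', dir_to_save='out'),
            workers=4)  # parallelism comes from the worker processes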
I would like to catch an AssertionError that is raised within my task, but it seems I can only catch generic RuntimeErrors, and I can't figure out how to programmatically see what other errors were raised before it.
For example, this will not work:
import d6tflow

class TaskExample(d6tflow.tasks.TaskCache):
    def run(self):
        result = 5 + 5
        assert result > 100, "result must be greater than 100"
        self.save(result)

try:
    d6tflow.run(TaskExample())
except AssertionError:
    # do some special logic here for this particular error
    print("Exception caught")
This will work, but it's not desirable to catch all generic RuntimeErrors:
import d6tflow

class TaskExample(d6tflow.tasks.TaskCache):
    def run(self):
        result = 5 + 5
        assert result > 100, "result must be greater than 100"
        self.save(result)

try:
    d6tflow.run(TaskExample())
except RuntimeError:
    print("Exception caught")
I tried using the traceback library to get the stack trace, but I could only get the stack trace of the RuntimeError, which is not very useful. Is there any way to catch a specific error, or at least get the last error that was raised before the RuntimeError?
Edit: Also, when you except a RuntimeError, the stack trace still prints; I'm wondering if there's a way to suppress that.
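One hedged workaround sketch: run the flow once, and on failure re-execute the failing task's run() in-process so the original AssertionError propagates (this assumes the task body is cheap to repeat and its dependencies completed):

task = TaskExample()
try:
    d6tflow.run(task)
except RuntimeError:
    try:
        task.run()  # re-raises the original exception in this process
    except AssertionError as e:
        print("Exception caught:", e)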
I started using d6tflow for a data science project but, after several weeks, I got somewhat frustrated with this limitation: I needed tasks able to generate outputs of different types (for example, CSVs and pickles).
Finally, we decided to write our own "version" of d6tflow by extending the luigi library.
I share how we implemented this, just in case anyone is interested in making it public (I am a newbie on GitHub).
Identifiers are sometimes in Spanish, but I think the logic is easy to understand.
Apart from this improvement (having a dict-like persist that states the type of each target), there are more differences that made life easier for us.
I know this is "too much" for an issue, but just in case someone finds something interesting and worth integrating into d6tflow, I share it with the community.
# assumed imports (the original post did not include them)
import os
import pickle
from pathlib import Path
import luigi
import pandas as pd
from docx import Document                   # python-docx
from pandas_profiling import ProfileReport
import sweetviz as sv

cached_targets = {}  # module-level in-memory cache used by MidasCacheTarget

class MidasLocalTarget(luigi.LocalTarget):
    '''
    path: full path of the final folder where the object is stored; a pathlib object
    task_id: identifier of the task that owns the target
    name: object name (key of the persist dictionary)
    extension: file extension
    '''
    def __init__(self, path=None, name=None, task_id=None, extension=None):
        self.path = path
        self.task_id = task_id
        self.name = name
        self.extension = extension
        self.path_completo = self.path / (self.task_id + '_' + self.name + '.{}'.format(self.extension))
        super().__init__(self.path_completo)
        # Restore the path in case it was modified:
        self.path = path

    def exists(self):
        return self.path_completo.exists()

    def invalidate(self):
        if self.exists():
            self.path_completo.unlink()
        return not self.exists()

    def write_params_file(self, path, parameters):
        path_file = path / (self.task_id + '_' + 'parameters.txt')
        file = open(path_file, "w")
        file.write(parameters)
        file.close()
class MidasPickleTarget(MidasLocalTarget):
    def load(self, **kwargs):
        if self.exists():
            with open(self.path_completo, "rb") as fhandle:
                data = pickle.load(fhandle)
            return data
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')

    def save(self, obj, parameters, **kwargs):
        self.path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        with open(self.path_completo, "wb") as fhandle:
            pickle.dump(obj, fhandle, **kwargs)
        return self.path_completo
class MidasPandasTarget(MidasLocalTarget):
    def generate_profiling(self, reports_path, parameters):
        reports_path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(reports_path, parameters)
        pd_profile_name = reports_path / (self.task_id + '_' + self.name + '_' + 'pandas_profiling.html')
        df = self.load()
        profile = ProfileReport(df, title="Pandas Profiling Report", minimal=True)
        profile.to_file(pd_profile_name)
        if 'target' in df.columns:
            sv_profile_name = reports_path / (self.task_id + '_' + self.name + '_' + 'sweetviz.html')
            my_report = sv.analyze(df, target_feat='target', pairwise_analysis='off')
            my_report.show_html(sv_profile_name, open_browser=False)
class MidasCSVTarget(MidasPandasTarget):
    def load(self, **kwargs):
        if self.exists():
            opts = {**{'sep': ';', 'decimal': ','}, **kwargs}
            df = pd.read_csv(self.path_completo, **opts)
            return df
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')

    def save(self, df, parameters, save_index=True, **kwargs):
        self.path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        opts = {**{'sep': ';', 'decimal': ',', 'compression': 'gzip', 'index': save_index}, **kwargs}
        df.to_csv(self.path_completo, **opts)
        return self.path_completo
class MidasPqTarget(MidasPandasTarget):
    def load(self, **kwargs):
        if self.exists():
            df = pd.read_parquet(self.path_completo)
            return df
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')

    def save(self, df, parameters, save_index=True, **kwargs):
        self.path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        opts = {**{'compression': 'gzip', 'index': save_index, 'engine': 'pyarrow'}, **kwargs}
        df.to_parquet(self.path_completo, **opts)
        return self.path_completo
class MidasDocxTarget(MidasLocalTarget):
    def load(self, **kwargs):
        if self.exists():
            docx = Document(self.path_completo)
            return docx
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')

    def save(self, docx, parameters, **kwargs):
        self.path.mkdir(parents=True, exist_ok=True)
        self.write_params_file(self.path, parameters)
        docx.save(self.path_completo)
class MidasCacheTarget(luigi.LocalTarget):
    '''
    task_id: identifier of the task that owns the target
    name: object name (key of the persist dictionary)
    '''
    def __init__(self, name=None, task_id=None):
        super().__init__(Path(os.path.abspath(os.getcwd())).parent)
        self.task_id = task_id
        self.name = name
        self.clave = task_id + name

    def exists(self):
        return self.clave in cached_targets

    def invalidate(self):
        if self.clave in cached_targets:
            cached_targets.pop(self.clave)

    def load(self):
        if self.exists():
            return cached_targets.get(self.clave)
        else:
            raise RuntimeError('Target does not exist, make sure task is complete')

    def save(self, o):
        """
        Save object to in-memory cache
        Args:
            o (obj): object to cache
        Returns: cache key
        """
        cached_targets[self.clave] = o
        return self.clave
class MidasTask(luigi.Task):
    """
    Custom class that adds functionality on top of Luigi's generic Task class. Features:
    - It has a save method, which must be invoked at the end of run(). This save method stores all the objects
      defined in the persist attribute. Classes extending MidasTask do not need to implement this method.
    - It has a str_parameters method, which returns a string with all the (significant) parameters of the class.
      It is used instrumentally to save a txt file alongside each object written to disk, so that the creation
      parameters of each object are close at hand. Classes extending MidasTask do not need to implement this method.
    - A get_path method must be implemented which, depending on the location type, returns the Path where the object
      will be stored. Note: another level with the task name is appended to that Path, but this happens automatically.
    - It has a method to generate reports with PandasProfiling and SweetViz for all csv-type targets.
      The targets must exist for this to work.
    The persist dictionary must be created in each concrete class extending MidasTask. It must contain one item per
    object to be saved to disk; the values are a two-element list: the format and the location type. Examples:
    persist = {
        'dataset_salida': ['csv', 'data_processed']
    }
    persist = {
        'modelo': ['pkl', 'modelos']
    }
    It supports the following formats:
        csv (stored compressed to save space)
        docx
        pkl
        pq (for parquet)
        cache (not saved to disk)
    As for locations, it supports the following values:
        'data_interim'
        'data_processed'
        'docs'
        'modelos'
        'reports'
        'resultados'
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.task_id_hash = self.task_id.split('_')[-1]
    def save(self, data, save_index=True, **kwargs):
        targets = self.output()
        if not set(data.keys()) == set(targets.keys()):
            print(data.keys())
            print(targets.keys())
            raise ValueError('The saved dictionary must be consistent with the persist attribute')
        for k, v in data.items():
            if isinstance(targets[k], MidasCSVTarget) or isinstance(targets[k], MidasPqTarget):
                targets[k].save(v, self.str_parameters(), save_index=save_index, **kwargs)
            elif isinstance(targets[k], MidasCacheTarget):
                targets[k].save(v, **kwargs)
            else:
                targets[k].save(v, self.str_parameters(), **kwargs)
    def output(self):
        output = {}
        for k, v in self.persist.items():
            if v[0] == 'csv':
                extension = 'csv.gz'
            elif v[0] == 'pq':
                extension = 'pq.gz'
            else:
                extension = v[0]
            save_path = self.get_path(v[1]) / self.get_task_family()
            if v[0] == 'csv':
                output[k] = MidasCSVTarget(path=save_path, name=k, task_id=self.task_id_hash, extension=extension)
            elif v[0] == 'pq':
                output[k] = MidasPqTarget(path=save_path, name=k, task_id=self.task_id_hash, extension=extension)
            elif v[0] == 'pkl':
                output[k] = MidasPickleTarget(path=save_path, name=k, task_id=self.task_id_hash, extension=extension)
            elif v[0] == 'docx':
                output[k] = MidasDocxTarget(path=save_path, name=k, task_id=self.task_id_hash, extension=extension)
            elif v[0] == 'cache':
                output[k] = MidasCacheTarget(name=k, task_id=self.task_id_hash)
            else:
                raise ValueError('Object format not implemented: ' + v[0])
        return output
    def generate_profiling(self):
        reports_path = self.get_path('reports')
        for k, v in self.output().items():
            if isinstance(v, MidasPandasTarget):
                v.generate_profiling(reports_path, self.str_parameters())
    def get_path(self, tipo_ubicacion):
        """
        This method must be implemented in each final class extending MidasTask. It defines where each type of
        object is stored. Do not append the task name (that is done automatically afterwards).
        It must return a Path object.
        Example implementation:
            project_root = Path(os.path.abspath(os.getcwd())).parent
            if tipo_ubicacion == 'data_processed':
                return project_root / 'data' / 'processed'
            ...
        """
        raise Exception("get_path() not implemented")

    def str_parameters(self):
        params_text = ""
        for k in self.param_kwargs.keys():
            params_text = params_text + '{key} has value {value}\n'.format(key=k, value=self.param_kwargs[k])
        return params_text
def common_params(dict_params, task_cls):
    """
    Grab all the values in dict_params that are found in task_cls.
    Similar to luigi.util.common_params, but between a dictionary and a class rather than between an instance and a class.
    """
    dict_param_names = dict_params.keys()
    task_cls_params_dict = dict(task_cls.get_params())
    task_cls_param_names = task_cls_params_dict.keys()
    common_param_names = set(dict_param_names).intersection(set(task_cls_param_names))
    common_param_vals = {key: dict_params[key] for key in common_param_names}
    return common_param_vals
TaskC().child().load()
I am currently working with AWS SageMaker, and it requires a text file to be uploaded as the data.
Would we be able to implement a Task that outputs a simple text file?
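In the meantime, a hedged sketch of a custom text target, assuming d6tflow's internal pattern of a target class with load/save plus a task class declaring target_class and target_ext (TextTarget and TaskText are my names, not d6tflow API):

import d6tflow
import d6tflow.targets

class TextTarget(d6tflow.targets.DataTarget):
    def load(self):
        with open(self.path, 'r') as f:
            return f.read()

    def save(self, text):
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.path, 'w') as f:
            f.write(text)
        return self.path

class TaskText(d6tflow.tasks.TaskData):
    target_class = TextTarget
    target_ext = 'txt'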
Hi! Just started trying out d6tflow.
I seem to have run into a problem when running:
class TaskCombineData(d6tflow.tasks.TaskPqPandas):
    def requires(self):
        return TaskGetData1(), TaskGetData2()

    def run(self):
        df_data1, df_data2 = self.loadInputs()
        data = pd.concat([df_data1, df_data2])
        self.save(data)
I get the following error:
'TaskCombineData' object has no attribute 'loadInputs'
Have I misunderstood the usage mentioned here?
d6tcollect==1.0.5
d6tflow==0.1.5
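For what it's worth, the method name in the docs excerpt quoted earlier on this page is inputLoad(), not loadInputs() (hedged: the API may differ in d6tflow 0.1.5):

def run(self):
    df_data1, df_data2 = self.inputLoad()  # instead of self.loadInputs()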
Hello!
Suppose that I created a TaskData instance, executed it and saved the outputs. Then I'd like to reuse its results in some other file (say, a Jupyter notebook to analyze results) by creating an instance of that task there and calling output and then load. But I would be unable to do so, as the data directory won't be found there; I'd have to re-execute the task.
My suggestion: use an absolute path in the output method. Instead of settings.dirpath (simply data), use something like pathlib.Path(__file__).parent / "data"; that would make it possible to access results from everywhere.
What do you think?
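A hedged workaround until then: repoint d6tflow's data directory from the notebook before loading (the path below is illustrative):

import d6tflow

d6tflow.set_dir('/path/to/project/data')  # where the flow originally saved outputs
df = TaskData().output().load()  # assumes a single unnamed output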
An example:
import d6tflow
import pandas as pd
import numpy as np
class test(d6tflow.tasks.TaskCSVPandas):
    def run(self):
        data = np.random.randn(2, 2)
        data = pd.DataFrame(data, index=['a', 'b'], columns=['c', 'd'])
        self.save(data)

d6tflow.run(test())
The actual data in the output csv file is:
c,d
1.5490553923182304,-0.3279984496021263
0.7946535471877705,0.5790784973358706
However, the expected data in the output file should be:
,c,d
a,0.20200720089562157,0.5134288778567592
b,2.918867471040273,-0.5393324706416279
The index is lost when using TaskCSVPandas to save data.
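A hedged workaround sketch until the CSV target saves the index: turn the index into a regular column before saving:

class test(d6tflow.tasks.TaskCSVPandas):
    def run(self):
        data = pd.DataFrame(np.random.randn(2, 2), index=['a', 'b'], columns=['c', 'd'])
        self.save(data.reset_index())  # index 'a','b' becomes an ordinary column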