Hi,
I'm not sure if this is a bug or a feature request.
I have a workflow that is very memory intensive but also very well suited to decomposition to a DAG.
My problem is that if I keep all intermediate outputs in memory I will quickly exceed the capacity of one computer to hold the data in RAM.
I had hoped pyungo would be clever enough to allow intermediate states to be garbage collected, but it doesn't seem so.
See a sample program:
```python
from pyungo import Graph
import numpy as np
import gc
@Profile
def main():
graph = Graph()
@graph.register()
def calc_a():
a = np.random.rand(8192,8192)
return a
@graph.register()
def calc_b():
b = np.random.rand(8192,8192)
return b
@graph.register()
def calc_c(a,b):
gc.collect()
c = a * b
print("c")
return c
@graph.register()
def calc_d():
gc.collect()
d = np.random.rand(8192,8192)
print("d")
return d
@graph.register()
def calc_pfd(c,d):
gc.collect()
e = c * d
return e
gc.collect()
res = graph.calculate(data={})
gc.collect()
print(res)
del res
gc.collect()
del graph
gc.collect()
main()
```
Output:
```
(venv) zenbook% python -m memory_profiler memtest.py
INFO:root:Starting calculation...
INFO:root:Ran Node(08f958eb-84ff-49ad-a2fb-a2ada5788705, <calc_a>, [], ['a']) in 0:00:02.127759
d
INFO:root:Ran Node(9cd9ce4e-16d7-4a43-83b7-a8e01e8bd8ba, <calc_d>, [], ['d']) in 0:00:01.618884
INFO:root:Ran Node(ea8b8c8f-d8fc-4967-a7c5-c3ba0dbcd550, <calc_b>, [], ['b']) in 0:00:01.519026
c
INFO:root:Ran Node(ac6d7004-7cb1-41ff-8476-a2a1ce9e64d6, <calc_c>, ['a', 'b'], ['c']) in 0:00:01.029356
INFO:root:Ran Node(7786d30e-7796-4d19-a692-07f2904ea6c8, <calc_pfd>, ['c', 'd'], ['e']) in 0:00:00.853072
INFO:root:Calculation finished in 0:00:07.152394
[[0.32979496 0.00617538 0.01675385 ... 0.08284045 0.03303956 0.09351132]
[0.00268712 0.20226707 0.06033366 ... 0.07918911 0.01333745 0.15655172]
[0.0007408 0.01337496 0.17597583 ... 0.19520472 0.0274126 0.07911974]
...
[0.00958562 0.00919059 0.10846052 ... 0.01235475 0.02207799 0.26674223]
[0.06822633 0.03539608 0.08139489 ... 0.08097827 0.10901089 0.02113664]
[0.01915152 0.00518849 0.34347554 ... 0.04939359 0.48837681 0.11771939]]
Filename: memtest.py
Line # Mem usage Increment Line Contents
5 29.688 MiB 29.688 MiB @profile
6 def main():
7 29.688 MiB 0.000 MiB graph = Graph()
8
9 29.691 MiB 0.000 MiB @graph.register()
10 29.691 MiB 0.004 MiB def calc_a():
11 541.562 MiB 511.871 MiB a = np.random.rand(8192,8192)
12 541.562 MiB 0.000 MiB return a
13
14 1053.578 MiB 0.000 MiB @graph.register()
15 29.691 MiB 0.000 MiB def calc_b():
16 1565.590 MiB 512.012 MiB b = np.random.rand(8192,8192)
17 1565.590 MiB 0.000 MiB return b
18
19 1565.590 MiB 0.000 MiB @graph.register()
20 29.691 MiB 0.000 MiB def calc_c(a,b):
21 1565.590 MiB 0.000 MiB gc.collect()
22 2077.605 MiB 512.016 MiB c = a * b
23 2077.605 MiB 0.000 MiB print("c")
24 2077.605 MiB 0.000 MiB return c
25
26 541.562 MiB 0.000 MiB @graph.register()
27 29.691 MiB 0.000 MiB def calc_d():
28 541.562 MiB 0.000 MiB gc.collect()
29 1053.578 MiB 512.016 MiB d = np.random.rand(8192,8192)
30 1053.578 MiB 0.000 MiB print("d")
31 1053.578 MiB 0.000 MiB return d
32
33 2077.605 MiB 0.000 MiB @graph.register()
34 29.691 MiB 0.000 MiB def calc_pfd(c,d):
35 2077.605 MiB 0.000 MiB gc.collect()
36 2589.621 MiB 512.016 MiB e = c * d
37 2589.621 MiB 0.000 MiB return e
38
39 29.691 MiB 0.000 MiB gc.collect()
40 2589.621 MiB 0.000 MiB res = graph.calculate(data={})
41 2589.621 MiB 0.000 MiB gc.collect()
42 2589.621 MiB 0.000 MiB print(res)
43 2589.621 MiB 0.000 MiB del res
44 2589.621 MiB 0.000 MiB gc.collect()
45 29.730 MiB 0.000 MiB del graph
46 29.730 MiB 0.000 MiB gc.collect()
```
After calc_c has run, a and b should be able to be garbage collected, but it seems a reference is held by graph to every output.