Comments (5)
I have seen this issue too, where the pipeline stays in the Finalizing status when it should otherwise have aborted, but I am not certain that this is the root cause.
What this line is trying to achieve is idempotence: making sure that, from the taskqueue's perspective, we haven't already fired the same task. The goal of this code block is to prevent errors when we've actually already created and run the notify tasks.
What is an issue here, at a glance, is that task_list contains one or more tasks, but any one of those tasks being a duplicate will raise one of (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError).
There's nothing in the documentation that tells me what happens to the remaining tasks when one fails for this reason. Anyone from the Google team know for sure?
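Until that is confirmed, one way to sidestep the question is to add the tasks one at a time and treat a duplicate-name error as "already enqueued". The following is only a sketch under stated assumptions: the two exception classes and FakeQueue are stand-ins so the snippet runs without the App Engine SDK (in real code they would be the taskqueue exceptions named above and a real queue), and add_each is a hypothetical helper, not something the pipeline library provides.

```python
class TaskAlreadyExistsError(Exception):
    """Stand-in for taskqueue.TaskAlreadyExistsError (assumption)."""


class TombstonedTaskError(Exception):
    """Stand-in for taskqueue.TombstonedTaskError (assumption)."""


class FakeQueue(object):
    """Hypothetical in-memory queue that rejects reused task names."""

    def __init__(self):
        self.live = set()        # names currently enqueued
        self.tombstoned = set()  # names that already ran and were deleted

    def add(self, name):
        if name in self.tombstoned:
            raise TombstonedTaskError(name)
        if name in self.live:
            raise TaskAlreadyExistsError(name)
        self.live.add(name)


def add_each(queue, task_names):
    """Add tasks one by one so a duplicate never affects the others."""
    added, skipped = [], []
    for name in task_names:
        try:
            queue.add(name)
            added.append(name)
        except (TaskAlreadyExistsError, TombstonedTaskError):
            # The task was fired before; count it as done, not as a failure.
            skipped.append(name)
    return added, skipped
```

The trade-off is one call per task instead of a single batched add, so this is probably only worth it where the batch behavior is in doubt.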
from appengine-pipelines.
@soundofjw
thanks for your reply.
Now I agree that this block of code does not appear to be responsible for the problem. I hope someone can help with this issue:
The root pipeline stays in either the run or the finalize state forever.
In the UI I can see a number of child pipelines in the run state, but they never actually started.
There are no errors at all in the logs.
This gives me the feeling that the problem may be that a number of tasks were somehow never added to the queue.
@ymohii Can you share the root pipeline code, specifically if you've got something going on in the finalized method? Obfuscate any business logic as necessary.
I've seen this problem occur when my finalized methods throw exceptions. Do you see anything like that in the logs? (I'd guess not, by your comment.)
I wouldn't completely throw out your initial assumption - when I'm in situations like this I tend to start adding logging statements everywhere.
As an alternative debugging option, can you get your pipeline successfully running on your local environment?
I tend to choose pipe.start() over pipe.start_test() and then execute the queues via the taskqueue stub. This is a pretty intimidating process the first time, so I recommend looking at how appengine-mapreduce runs test cases. Here's a stripped-down version of a similar implementation:
https://gist.github.com/soundofjw/8bb8247e5f7d1d31d917
@soundofjw
Thanks for your feedback.
My pipeline is simply an implementation of the fan-in example in this article: http://sookocheff.com/post/appengine/pipelines/fan-in-fan-out/
I'm taking this code snippet from the article as an example:
import logging

import pipeline


class SquarePipeline(pipeline.Pipeline):
    def run(self, number):
        logging.info('Squaring: %s' % number)
        return number * number


class Sum(pipeline.Pipeline):
    def run(self, *args):
        value = sum(list(args))
        logging.info('Sum: %s', value)
        return value


class FanInPipeline(pipeline.Pipeline):
    def run(self, count):
        results = []
        for i in xrange(0, count):
            result = yield SquarePipeline(i)
            results.append(result)
        # Waits until all SquarePipeline results are complete
        yield Sum(*results)
In my own pipeline there is nothing more than this fan-in implementation.
For this example (if my problem happened to it), I logged through the loop and found that the loop called SquarePipeline successfully for all the items.
But some of the SquarePipeline pipelines do not start at all (the logging I put inside the run method of SquarePipeline is never called for the missing pipelines).
I can work around the problem by increasing the number of retries for the queue in queue.yaml, but I don't think this is a good thing to rely on without understanding the problem.
Regarding the finalize method: there is no finalize implementation for it.
Regarding the local environment: the pipelines work fine there, and also on small subsets of data. In my app the data is much bigger (the loop in FanInPipeline generates more than 100 pipelines in some cases), and that is when the problem arises. This is why I am not satisfied with the increase-the-retries solution until I understand the source of the problem.
Two comments:
- taskqueue.TombstonedTaskError and taskqueue.TaskAlreadyExistsError don't protect against all duplication of messages, i.e. a message could be received, processed, and then fail to be acknowledged. So the library code still needs to be idempotent (by updating datastore).
- It would probably be much better to manage retries externally to the task queue, i.e. set the taskqueue retry limit to infinite and manage the state in datastore. This would avoid the problem of an inconsistent state between the two resulting in the pipeline silently stopping.
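To make the first point concrete, here is a rough sketch of a handler that stays idempotent when the same message is delivered twice. Everything in it is an assumption made for illustration: StateStore is an in-memory stand-in for a datastore-backed record, and handle is a hypothetical function, not the library's actual code. With a real datastore the check and the write would presumably need to happen in a transaction.

```python
class StateStore(object):
    """Hypothetical stand-in for task state persisted in datastore."""

    def __init__(self):
        self._done = {}

    def is_done(self, task_id):
        return task_id in self._done

    def mark_done(self, task_id, result):
        self._done[task_id] = result

    def result(self, task_id):
        return self._done[task_id]


def handle(store, task_id, work):
    """Run work() once per task_id, even if the task is delivered again."""
    if store.is_done(task_id):
        # Duplicate delivery: return the recorded result, do not redo work.
        return store.result(task_id)
    result = work()
    store.mark_done(task_id, result)
    return result
```

Because the recorded state rather than the queue decides whether work runs, a task that is redelivered after a missed acknowledgement becomes a no-op instead of a second execution.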