Comments (5)
for now I'm forcing use_barrier_indexes = False so that it uses the legacy code-path... then I can at least get things deployed
from appengine-pipelines.
Good call using use_barrier_indexes for now - this is a potentially tricky one.
If we move to fix this, it may be surprising behavior for others.
We use namespaces as well, and I think this change should be ok from my perspective, because we don't manipulate the namespaces through the pipeline. For us, a pipeline starts completely in a namespace, and stays on that namespace through appengine_config.namespace_manager_default_namespace_for_request.
With all of that noted, I want to make sure I understand the problem:
The datastore is complaining because you're using an ancestor query where the namespace of the ancestor does not match the namespace passed to the query (defaults to empty string). (https://cloud.google.com/appengine/docs/python/ndb/queryclass)
I believe the fix may be as simple as changing this line in notify_barriers:
_BarrierIndex.all(cursor=cursor, keys_only=True)
=>
_BarrierIndex.all(cursor=cursor, keys_only=True, namespace=ancestor_key.namespace())
from appengine-pipelines.
when you say "defaults to empty" I don't think that's what is happening... instead I believe the query is defaulting to the namespace of the task that called it (child task which was in namespace "1" called /output which triggered BarrierHandler), but the ancestor key is from the root pipeline's namespace (which is '')... that's how it looks from the last line printed out in the trace:
(ancestor.name_space(), namespace)) ---> ("''" != "'1'")
when I grab the string keys from the headers and build datastore Keys out of them, it appears the ancestor has the root pipeline ID in it, but that ID doesn't exist in the current namespace (even in the console if you open the detail barrier record, you can't click on the ancestor because it doesn't exist)
I'm guessing, given how you explained you normally use namespaces, that the Key path is just generated from a list of kind/id pairs on the assumption that they are all in the same namespace... but in my case the top-level kind/id isn't, so that makes the key technically invalid
I can probably re-engineer our stuff to kick off a pipeline per namespace... but let me know if you think you can get it working! Thanks!
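To make the mismatch concrete, here is a small SDK-free model of the check described above. Everything in it (ToyKey, toy_ancestor_query, the kind name and ID) is a hypothetical stand-in, not the datastore or pipeline API. It only assumes what this thread states: the query falls back to the namespace of the calling task unless one is passed explicitly, and the ancestor's namespace must match it.

```python
class ToyKey(object):
    """Hypothetical stand-in for a datastore key: a namespace plus a kind/id path."""

    def __init__(self, namespace, *path):
        self._namespace = namespace
        self.path = path

    def namespace(self):
        return self._namespace


def toy_ancestor_query(ancestor, current_namespace, namespace=None):
    """Mimic the namespace check performed on an ancestor query.

    When no namespace is passed, the query uses the namespace of the
    request that issued it (assumption taken from the discussion above).
    """
    query_namespace = current_namespace if namespace is None else namespace
    if ancestor.namespace() != query_namespace:
        raise ValueError(
            "ancestor argument should match namespace (%r != %r)"
            % (ancestor.namespace(), query_namespace))
    return query_namespace


# Root pipeline record lives in the default ('') namespace.
root = ToyKey("", "_PipelineRecord", "root-id")

# The child task runs in namespace "1", so the implicit query namespace
# disagrees with the ancestor's namespace and the check fails.
try:
    toy_ancestor_query(root, current_namespace="1")
    mismatch = None
except ValueError as error:
    mismatch = str(error)

# Passing the ancestor's own namespace keeps the two in agreement.
fixed = toy_ancestor_query(
    root, current_namespace="1", namespace=root.namespace())
```

In this model the explicit namespace argument plays the role of the namespace=ancestor_key.namespace() change suggested earlier; whether that alone is enough in the real library is not confirmed here.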
from appengine-pipelines.
Obviously, long term you don't want to kick off one pipeline per namespace permanently: you lose the major benefits of pipeline fan-out abortion and success if you aren't yielding child pipelines.
You are also correct about the default namespace - it would be the namespace of the process that created the task, for any of the pipeline tasks: complete, fanout, abort, etc.
A much more comfortable solution, now that I'm seeing a larger scope here, would involve keeping ALL pipeline entities in the namespace of the root pipeline. Then, if you need namespace switching, you would explicitly achieve this per pipeline.
This also simplifies the answer to "How do I find a pipeline with a given pipeline_id?".
One paradigm I use a lot is class inheritance for pipelines with a common setup function, to prepare for any common variables. This is good for your larger pipeline chains, like I believe you may have.
from google.appengine.api import namespace_manager
from pipeline import Pipeline


class MyRootPipeline(Pipeline):

    def setup(self, **kwargs):
        """Perform setup for MyRootPipeline and all derivative pipes."""
        # Get pipeline information, and do setup.
        self.namespace = kwargs.get('namespace', None)
        self.kwargs = kwargs.copy()  # changes to kwargs shouldn't affect local copy
        if self.namespace:
            # Sets the namespace for the current HTTP request.
            namespace_manager.set_namespace(self.namespace)

    def run(self, **kwargs):
        # Do your setup
        self.setup(**kwargs)
        # Do stuff
        # Yield child with same kwargs, and any additional args.
        kwargs['namespace'] = "other_namespace"
        yield ChildPipeline(child_wants_candy=True, **kwargs)


class ChildPipeline(MyRootPipeline):
    """Subclassed from MyRootPipeline for common setup procedure."""

    def run(self, child_wants_candy, **kwargs):
        # Performs setup, switches namespace, ...
        self.setup(**kwargs)
        # Stuff and things in the new namespace.
This won't work now, until the pipeline knows to explicitly use the namespace for the yielded child pipeline and callback tasks, etc. - but that's the support I'd consider targeting for this issue.
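One way to keep "explicit namespace switching per pipeline" contained is to scope the switch to a block and restore the previous namespace afterwards. The sketch below is only an illustration: use_namespace and FakeNamespaceManager are hypothetical names, and the fake manager exists so the example runs without the App Engine SDK. In real App Engine code the getter and setter would presumably be namespace_manager.get_namespace and namespace_manager.set_namespace.

```python
import contextlib


@contextlib.contextmanager
def use_namespace(namespace, get_namespace, set_namespace):
    """Switch namespace for the duration of a block, then restore the old one."""
    previous = get_namespace()
    set_namespace(namespace)
    try:
        yield
    finally:
        # Restore even if the block raises, so later work in the same
        # request is not left in the wrong namespace.
        set_namespace(previous)


class FakeNamespaceManager(object):
    """Hypothetical in-memory stand-in for the SDK's namespace manager."""

    def __init__(self):
        self._namespace = ""

    def get_namespace(self):
        return self._namespace

    def set_namespace(self, namespace):
        self._namespace = namespace


manager = FakeNamespaceManager()
seen = []
with use_namespace("other_namespace",
                   manager.get_namespace, manager.set_namespace):
    # Work that belongs in the other namespace would go here.
    seen.append(manager.get_namespace())
# Back in the namespace that was active before the block.
seen.append(manager.get_namespace())
```

Scoping the switch this way would leave the pipeline's own bookkeeping in the root namespace while only the body of the block runs elsewhere, which is the separation proposed above.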
I hope this all makes sense!!! 🐻
from appengine-pipelines.
Funny... that's pretty much exactly our pipeline subclass paradigm as well
class MigrateFiles(MigrationPipeline):

    def run(self):
        self.setup()
        ...
As for our existing structure, we don't need to run a single pipeline across namespaces, we just had it set up that way. We could definitely restructure it to be one per namespace... but as you stated, long term, it would be better to not have to think about it... and it would indeed be easier to figure out where the pipeline records are (and clear out the old ones) if they were always stored in the default namespace
So yeah, this all makes sense... and I really appreciate your input!
from appengine-pipelines.