
appengine-pipelines's Introduction

Google App Engine Pipeline API
==============================

Official site: https://github.com/GoogleCloudPlatform/appengine-pipelines

The Google App Engine Pipeline API connects together complex
workflows (including human tasks). The goals are flexibility,
workflow reuse, and testability.

A primary use-case of the API is connecting together various
App Engine MapReduces into a computational pipeline.

appengine-pipelines's People

Contributors

aizatsky-at-google, angrybrock, aozarov, billy1380, bslatkin, csilvers, dependabot[bot], eshlox, ksookocheff-va, lucena, ludoch, maciekrb, mattjo, sadovnychyi, someone1, soundofjw, svpino, tkaitchuck


appengine-pipelines's Issues

Move pipeline URL from '/_ah/pipeline' to something else

Hello,

The Pipeline library will not work on App Engine using Managed VMs. I opened a support case on the matter and was immediately shut down and pointed to this document: https://cloud.google.com/appengine/docs/python/config/appconfig?hl=en#Python_app_yaml_Reserved_URLs

Ironically, on App Engine itself this caveat doesn't seem to affect the Pipeline API; however, it does on Managed VMs. I don't know if Google will ever allow the Pipeline API to work on Managed VMs using the '/_ah/pipeline' path, but for now, I think moving the Pipeline API to default to another path is required.

To get the Pipeline API working on Managed VMs, I had to update all my code that starts pipelines to pass the optional base_path parameter to the start() function. If the code is updated to reflect a new default path (e.g. I used /_pipeline), then the Pipeline API should work fine on both App Engine and Managed VMs.
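The workaround above can be sketched as a small wrapper. This is illustrative only: `start_with_base_path` and the `/_pipeline` default are assumptions, not part of the library.

```python
def start_with_base_path(stage, base_path='/_pipeline', **kwargs):
    # Hypothetical helper: always start pipeline stages on a non-reserved
    # path, since '/_ah/*' URLs are blocked for user code on Managed VMs.
    # `stage` is assumed to be a pipeline.Pipeline instance.
    return stage.start(base_path=base_path, **kwargs)
```

Routing every start() call through one helper keeps the path in a single place if the default ever changes again.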

exception during /_ah/pipeline/output: ancestor argument should match namespace ("'101970'" != "'102550'")

This may look like issue #42, but it isn't related. We've been running successfully with the pipeline library for a very long while now, and this just happened tonight: somehow a task seemed to "hop" namespaces during output. We have many thousands of these happen daily and have never seen this before. I had to kill the task, and all I have is the stack trace at the moment.

This seems really scary. It makes us wonder whether another part of the pipeline could change to a different namespace and run code against the wrong data.

taskName: "20523397474617268241"
21:16:34.624
ancestor argument should match namespace ("'101970'" != "'102550'")
Traceback (most recent call last):
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1511, in __call__
    rv = self.handle_exception(request, response, e)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1505, in __call__
    rv = self.router.dispatch(request, response)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1253, in default_dispatcher
    return route.handler_adapter(request, response)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1077, in __call__
    return handler.dispatch()
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 547, in dispatch
    return self.handle_exception(e, self.app.debug)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 545, in dispatch
    return method(*args, **kwargs)
  File "/base/data/home/apps/s~batterii-server-prod3/tasks:v8-26-hf1.392086534787330564/pipeline/pipeline.py", line 2652, in post
    use_barrier_indexes=self.request.get('use_barrier_indexes') == 'True')
  File "/base/data/home/apps/s~batterii-server-prod3/tasks:v8-26-hf1.392086534787330564/pipeline/pipeline.py", line 1553, in notify_barriers
    barrier_index_list = query.fetch(max_to_notify)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/ext/db/__init__.py", line 2161, in fetch
    return list(self.run(limit=limit, offset=offset, **kwargs))
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/ext/db/__init__.py", line 2080, in run
    iterator = raw_query.Run(**kwargs)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/datastore.py", line 1682, in Run
    itr = Iterator(self.GetBatcher(config=config))
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/datastore.py", line 1661, in GetBatcher
    return self.GetQuery().run(_GetConnection(), query_options)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/datastore.py", line 1535, in GetQuery
    group_by=self.__group_by)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 105, in positional_wrapper
    return wrapped(*args, **kwds)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 1933, in __init__
    ancestor=ancestor)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 105, in positional_wrapper
    return wrapped(*args, **kwds)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 1728, in __init__
    (ancestor.name_space(), namespace))
BadArgumentError: ancestor argument should match namespace ("'101970'" != "'102550'")

callbacks should be namespace aware

get_callback_url() contains only the pipeline ID but no information about the current namespace. The default _CallbackHandler will do the pipeline lookup in the default namespace, so if the original pipeline was running within a non-default namespace, the callback handler won't be able to find it.
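One possible shape of a fix is to carry the namespace in the callback URL so the handler can restore it before the datastore lookup. This is a sketch, not the library's API; `namespaced_callback_url` is a hypothetical helper.

```python
try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2, the App Engine runtime

def namespaced_callback_url(base_path, pipeline_id, namespace=None):
    # Include the originating namespace as a query parameter; the callback
    # handler would restore it (e.g. via namespace_manager.set_namespace)
    # before looking up the pipeline record.
    params = [('pipeline_id', pipeline_id)]
    if namespace:
        params.append(('namespace', namespace))
    return '%s/callback?%s' % (base_path, urlencode(params))
```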

How to run Python tests?

I also saw this question on the Google Groups.

I can't find a way to run the tests for Python:

$ cd python/test
$ python pipeline_test.py

WARNING:root:Could not load Pipeline API. Will fix path for testing. ImportError: No module named yaml
Traceback (most recent call last):
  File "pipeline_test.py", line 40, in <module>
    from pipeline import common
  File "../src/pipeline/__init__.py", line 49, in <module>
    from pipeline import *
  File "../src/pipeline/pipeline.py", line 43, in <module>
    from google.appengine.api import files
  File "/usr/local/google_appengine/google/appengine/api/files/__init__.py", line 30, in <module>
    import blobstore
  File "/usr/local/google_appengine/google/appengine/api/files/blobstore.py", line 39, in <module>
    from google.appengine.api.files import file as files
ImportError: cannot import name file

_fix_path prevents module from importing if PYTHONPATH is not set

If PYTHONPATH is not set, an AttributeError is thrown:

  File "snip", line 8, in <module>
    from pipeline import pipeline
  File "/Users/grw/src/cms/env/lib/python2.7/site-packages/pipeline/__init__.py", line 47, in <module>
    _fix_path()
  File "/Users/grw/src/cms/env/lib/python2.7/site-packages/pipeline/__init__.py", line 26, in _fix_path
    all_paths = os.environ.get('PYTHONPATH').split(os.pathsep)
AttributeError: 'NoneType' object has no attribute 'split'

Bump PyPI version?

There have been a number of fixes since the last release that should be of use to developers. I know at least our team requires the target propagation fix.

Thank you.

Noisy logging.error: "Bad state for purpose"

When debugging aborted pipelines, we tend to look at error logs filtered by path:pipeline on the console.

In this search, you will find errors like the following:
Pipeline ID "23377674aae54a8b883cb75636a8fcf6" in bad state for purpose "abort": "aborted"

This specific message seems a bit verbose to me, as I'd expect this to be an okay situation.


There are perhaps 2 points of discussion worth having here:

  1. The obvious, why is an aborted pipeline being triggered for abortion?
  2. The more helpful, can we disable this particular log from emitting?

If the community finds it appropriate, I've tackled item 2 in #51 and it's ready to be merged.
If not, we can keep the change in our fork at github.com/Loudr/appengine-pipelines.

TaskQueue behavior for duplicate tasks in a list during Queue.add()

I have some questions about the behavior of taskqueue.Queue().add(task_list) when one of the tasks is a duplicate but the others are not.

I am not aware of any documentation for the taskqueue API that explains this behavior; as such, it seems undefined.

My concern is that adding a list of multiple tasks, where one may already have been tombstoned/created, may cause the other legitimate tasks to fail.

This affects the pipeline library in the following code block:

if task_list:
  try:
    taskqueue.Queue(self.queue_name).add(task_list)
  except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
    pass
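If the batch semantics really are undefined, one defensive fallback is to add tasks one at a time, so a single tombstoned task cannot affect its batch-mates. This is a sketch under that assumption; the stand-in exception class and `add_tasks_individually` are illustrative names, not the taskqueue API.

```python
class TombstonedTaskError(Exception):
    """Stand-in for taskqueue.TombstonedTaskError, for illustration only."""

def add_tasks_individually(add_fn, task_list,
                           skip_errors=(TombstonedTaskError,)):
    # Add each task on its own so one duplicate/tombstoned task cannot
    # cause the other legitimate tasks in the list to be dropped.
    added = []
    for task in task_list:
        try:
            add_fn(task)
            added.append(task)
        except skip_errors:
            pass
    return added
```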

Any clues? This may be a good question for @tkaitchuck.
This was brought up in issue #54.

GAE/GCE "python" runtime support?

I've been investigating the feasibility of migrating from python-compat to python as defined on https://cloud.google.com/appengine/docs/managed-vms/python/migrating-an-existing-app (to get a more standard GCE environment, Google Cloud Debugger, etc).

But doing so means no more google.appengine.*. I was curious whether there is any plan/possibility of this library working on GCE (through gcloud.datastore, etc.), or if the reliance on push task queues (which are apparently unavailable on GCE?) means this is a nonstarter that will never happen, and I should get used to my GAE-Managed-VM python-compat world for the long term...

Thanks!

Starting a task on a local module without an explicit version fails

The root cause of this is a GAE dev_appserver issue: when running a module locally with dev_appserver.py whose app.yaml has no version:, os.environ['CURRENT_VERSION_ID'] will contain something like "None.175924092260391774". This results in util._get_task_target() returning something like "None.modulename", which then raises an InvalidModuleError when trying to enqueue the task in _PipelineContext.start.

One possible solution would be:

  • hopefully the GAE dev_server bug gets fixed quickly
  • to migrate util._get_task_target() over to the modules API (since it's currently using undocumented APIs). Something like:

from google.appengine.api import modules

# ...

def _get_task_target():
  # ...
  module = modules.get_current_module_name()
  version = modules.get_current_version_name()
  # workaround until https://code.google.com/p/googleappengine/issues/detail?id=13178 gets fixed
  if version == "None":
    version = None

  if module == "default":
    return version
  if version is None:
    return module
  return "%s.%s" % (version, module)

exception: "ancestor argument should match namespace" during /_ah/pipeline/output

The root of this particular pipeline starts out in the default namespace, then its immediate children each change to a different namespace (one for each of our customers), and those continue to spawn other pipeline children within that namespace. Something is happening when the pipeline tries to finalize and roll up to the top: it looks like the slot_key being used as the ancestor in the BarrierIndex query has the default namespace ("", from our root pipeline), but the BarrierIndex query is using the namespace from the child pipeline ("1", because new tasks, by default, take on the namespace of the task that spawned them), and thus we see a BadArgumentError.

This is my best guess at what is happening. I don't understand the details of the code well enough to try to render a fix, though I'm going to try.

0.1.0.2 - - [17/Jun/2015:09:59:23 -0700] "POST /_ah/pipeline/output HTTP/1.1" 500 0 "http://1.tasks.some-app-id.appspot.com/_ah/pipeline/run" "AppEngine-Google; (+http://code.google.com/appengine)" "1.tasks.some-app-id.appspot.com" ms=15 cpu_ms=2 queue_name=notify task_name=0992829243632586978 instance=0 app_engine_release=1.9.22 
ancestor argument should match namespace ("''" != "'1'")
Traceback (most recent call last):
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1511, in __call__
    rv = self.handle_exception(request, response, e)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1505, in __call__
    rv = self.router.dispatch(request, response)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1253, in default_dispatcher
    return route.handler_adapter(request, response)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1077, in __call__
    return handler.dispatch()
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 547, in dispatch
    return self.handle_exception(e, self.app.debug)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 545, in dispatch
    return method(*args, **kwargs)
  File "/base/data/home/apps/s~some-app-id/tasks:1.385065318471689233/pipeline/pipeline.py", line 2647, in post
    use_barrier_indexes=self.request.get('use_barrier_indexes') == 'True')
  File "/base/data/home/apps/s~some-app-id/tasks:1.385065318471689233/pipeline/pipeline.py", line 1538, in notify_barriers
    barrier_index_list = query.fetch(max_to_notify)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/ext/db/__init__.py", line 2161, in fetch
    return list(self.run(limit=limit, offset=offset, **kwargs))
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/ext/db/__init__.py", line 2080, in run
    iterator = raw_query.Run(**kwargs)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/datastore.py", line 1681, in Run
    itr = Iterator(self.GetBatcher(config=config))
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/datastore.py", line 1660, in GetBatcher
    return self.GetQuery().run(_GetConnection(), query_options)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/datastore.py", line 1534, in GetQuery
    group_by=self.__group_by)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 105, in positional_wrapper
    return wrapped(*args, **kwds)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 1934, in __init__
    ancestor=ancestor)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 105, in positional_wrapper
    return wrapped(*args, **kwds)
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 1729, in __init__
    (ancestor.name_space(), namespace))
BadArgumentError: ancestor argument should match namespace ("''" != "'1'")

Create a Mixin to work with testbed in unit tests

Check comments on PullRequest #28 for more information.

Copied from a comment posted from @aozarov:

Would have been nicer if that was a Mixin (and maybe using a constant for providing the list of queues to create, if we care to have different ones in different tests). Regardless, we should provide a way to deactivate the testbed and call it in tearDown (explicitly or by mixin).

App Engine Pipeline not aborting all the way to root

I recently updated to the latest version of the App Engine Pipelines Python library and I'm seeing a problem where a pipeline abort is not bubbling all the way to the root. This used to work fine. Can anyone shed light on it? I don't know what other info to provide.

(screenshot attached: 2015-09-01, 2:40 PM)

Create promise from handle

I have a generator task that creates a promise then spawns further generator tasks that rely on the same promise but there is no way to share that promised value with the child jobs. I cannot pass it as a job parameter because that will cause the children to wait for the promise.

Nor can I pass the promise itself and hold onto it when the child jobs are created, because promised values are not serialisable.

Ideally I would like to be able to pass the child jobs a handle to the promise and then be able to convert that handle into a PromisedValue when I need it.

Python: Bad target when running in development environments

Deployment of App Engine resources now dictates that a version is no longer specified in app.yaml. As a consequence, attempts to get the current version via os.environ["CURRENT_VERSION_ID"] return "None.", and extracting the "None" yields a bad target.

The solution is to conditionally use the version in the task target.

Avoid using the task queue for immediate values.

Currently the pipeline library triggers many more taskqueue requests than it really needs.
E.g. creating a child job with N immediate values and 1 future value will trigger N taskqueue "VALUE FILLED" requests, where actually none are needed in this case.

We could optimize this by simply detecting that they are immediate results and inlining the call.
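The proposed optimization can be sketched abstractly (all names here are illustrative stand-ins, not the library's internals): fill immediate slots in-process and only enqueue a notification task for genuinely future values.

```python
class Slot(object):
    """Minimal stand-in for a pipeline output slot (illustrative)."""
    def __init__(self):
        self.filled = False
        self.value = None

def fill_slot(slot, value, enqueue_notify, is_immediate):
    # Immediate results are filled inline; only future values pay for a
    # "VALUE FILLED" task queue round trip via enqueue_notify.
    slot.value = value
    slot.filled = True
    if not is_immediate:
        enqueue_notify(slot)
```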


Edit: Question moved to mapreduce repo.

Missing image in User Guide

The "ComplexJob" child graph image is missing. When I attempt to load the image URL directly, it returns an HTTP 404 not found.

This is the user guide page URL: https://github.com/GoogleCloudPlatform/appengine-pipelines/wiki/Java

This is the missing image URL: https://camo.githubusercontent.com/1890ec0c618ae97fa60048f8f5cc3276fba29b25/68747470733a2f2f617070656e67696e652d706970656c696e652e676f6f676c65636f64652e636f6d2f73766e2f696d616765732f636f6d706c65786a6f622e706e67

UI does not accommodate jobs with many children

We have jobs that can have 500 children, which themselves can have 500 children, etc. This leads to a pipeline comprised of up to 100,000 jobs.

In that situation, the UI attempts to load all the objects in the pipeline tree in one go and exceeds the memory available to the App Engine instance.

A better option would be to progressively load the child jobs (and the attached slots, error entities etc) as they are required, starting with the jobs that are direct children of the pipeline. The child jobs would only be loaded when the user tries to expand part of the UI.

For information, the use case for those 100,000 jobs is iterating over all the users in a Google Apps domain. We can iterate in pages of 500 users, and we spawn a job for each user.
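The progressive-loading idea can be sketched generically (an assumption about how the UI might fetch, not existing code): a lazy iterator over pages of children, where the supplied `fetch_page(cursor, limit)` returns `(items, next_cursor)`.

```python
def iter_children_paged(fetch_page, page_size=100):
    # Yield children one page at a time instead of materializing the whole
    # 100,000-job tree in memory; fetch_page returns next_cursor=None when
    # the result set is exhausted.
    cursor = None
    while True:
        items, cursor = fetch_page(cursor, page_size)
        for item in items:
            yield item
        if cursor is None:
            return
```

The UI would call this only when the user expands a node, so memory use scales with what is on screen, not with the size of the pipeline tree.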

ModulesService#getVersionHostname causes pipelines to fail often

Hi,
We are seeing a number of cases where pipelines seem to die because their tasks fail (and thus hit the max failures in the queue). The main offender seems to be calls to ModulesService#getVersionHostname, which are made for each task instead of being set pipeline-wide.
It seems a similar change (i.e. calling it once per pipeline) has already been implemented in mapreduce2.

We are using the Java version.

Allow users to name jobs

There is a job display name, and yet it is always the class name.

Add a method to allow the user to set the job display name, rather than having to override getJobDisplayName in every job.

Sources for 0.2.13

Hi!

I just tried to compile current master, which is marked as 0.3-SNAPSHOT, but it looks like it is missing some features from 0.2.13 (the latest in the Maven repos), like JobSetting.OnModuleVersion.

I checked the git log for JobSetting.java and haven't found JobSetting.OnModuleVersion either. And there are no version tags in the repository to check out and look into a specific version.

Is master branch outdated or am I missing something?

When the pipeline info exceeds 32MB the UI fails

Because the UI attempts to load all the information needed to display the outline in a single HTTP request, and App Engine caps returned data at 32MB, exceeding this limit results in "Unable to parse JSON" being displayed, and nothing is shown.

_write_json_blob can fail when default cloud storage bucket is not set

There is a bug in _write_json_blob when app_identity.get_default_gcs_bucket_name() returns None.

This only affects pipelines that need to store large data when yielding children.
This was probably introduced with #18 when removing the Files API.

Traceback

Bad child arguments. AttributeError: 'NoneType' object has no attribute 'startswith'
Traceback (most recent call last):
  File "/base/data/home/apps/s~xx/back:29.385854452397669105/lib/pipeline/pipeline.py", line 2283, in evaluate
    _generate_args(sub_stage, future, self.queue_name, self.base_path)
  File "/base/data/home/apps/s~xx/back:29.385854452397669105/lib/pipeline/pipeline.py", line 1403, in _generate_args
    params_blob = _write_json_blob(params_encoded, pipeline.pipeline_id)
  File "/base/data/home/apps/s~xx/back:29.385854452397669105/lib/pipeline/pipeline.py", line 1256, in _write_json_blob
    file_name = posixpath.join(*path_components)
  File "/base/data/home/runtimes/python27/python27_dist/lib/python2.7/posixpath.py", line 75, in join
    if b.startswith('/'):
AttributeError: 'NoneType' object has no attribute 'startswith'
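The failing call is posixpath.join receiving None for the bucket name; a guard that fails early with an actionable message is one possible fix. This is a sketch with an illustrative helper name, not the library's `_write_json_blob`.

```python
import posixpath

def build_blob_path(bucket_name, pipeline_id, file_name):
    # posixpath.join(None, ...) is exactly the AttributeError in the
    # traceback above; raise something actionable instead when
    # app_identity.get_default_gcs_bucket_name() would have returned None.
    if not bucket_name:
        raise ValueError(
            'No default Cloud Storage bucket is configured for this app; '
            'enable one or pass an explicit bucket name.')
    return posixpath.join('/', bucket_name, pipeline_id, file_name)
```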

Use of im_func.

Hi,

It is in src/pipeline/common.py#L325.

Using staticmethod would be a better way, I suppose:

class Log(pipeline.Pipeline):
    _log_method = staticmethod(logging.log)

    def run(self, level, message, *args):
        # Log._log_method(level, message, *args)
        self._log_method(level, message, *args)

In notify_barriers when queue task_retry_limit exceeded the pipeline never finalize

In this line:
https://github.com/GoogleCloudPlatform/appengine-pipelines/blob/master/python/src/pipeline/pipeline.py#L1672

The library simply passes on the taskqueue.TombstonedTaskError exception, which happens often in our application at this point in the library.
But when the retries exceed task_retry_limit, the pipelines are not notified of the filled slots, and the pipeline stays in the Run or Finalize status forever.

Is this kind of behavior intended, or is there a way to overcome this issue?

cloud sdk 123 causes PipelineSetupError in dev server

I have a mapreduce pipeline that worked in SDK 122 that now fails in the dev server with the following error after updating to SDK 123:

Traceback (most recent call last):
  File "/vl/local/google-cloud-sdk/platform/google_appengine/lib/webapp2-2.5.2/webapp2.py", line 570, in dispatch
    return method(*args, **kwargs)
  File "/Users/stanton/src/vlshare/appserver/src/vl/base.py", line 645, in wrapper
    return handler(*args, **kwargs)
  File "/Users/stanton/src/vlshare/appserver/src/vlops/handlers.py", line 319, in any
    result = func(self.request)
  File "/Users/stanton/src/vlshare/appserver/src/vlops/admin/files.py", line 301, in getZone
    result = launchPipeline(GetZonePipeline(zone.key.urlsafe()))
  File "/Users/stanton/src/vlshare/appserver/src/vl/pipe_utils.py", line 62, in launchPipeline
    pipeline.start(base_path='/ops/pipeline')
  File "/Users/stanton/src/vlshare/appserver/src/lib/pipeline/pipeline.py", line 673, in start
    self, idempotence_key, str(e)))
PipelineSetupError: Error starting vlops.admin.files.GetZonePipeline(
('aghkZXZ-Tm9uZXIXCxIKWm9uZUVudGl0eRiAgICAgMCvCgyiAQZnbG9iYWw',), *
{})#342c4ceab7084adfb389b35b45960fdd:
As far as I can tell, the pipeline is having problems submitting tasks to the task queue. Everything works as expected in the production environment, so this is specifically a problem with the dev server.

Any ideas? What additional logging would be helpful here?

[JAVA] Cannot find the slot: pipeline-slot("56da1731-d616-4c7c-98e3-d21fe9e386b0"). Ignoring the task.

Hello,

Sometimes, when I start several jobs with the pipeline service, I get this error in my logs:

com.google.appengine.tools.pipeline.impl.PipelineManager querySlotOrAbandonTask: Cannot find the slot: pipeline-slot("56da1731-d616-4c7c-98e3-d21fe9e386b0"). Ignoring the task.
com.google.appengine.tools.pipeline.NoSuchObjectException: pipeline-slot("56da1731-d616-4c7c-98e3-d21fe9e386b0")

This error kills the job.

Thanks for your help

waitFor doesn't support FutureList

If you create a job waiting on a list of other jobs, it will throw an exception.
Example: futureCall(new myJob(), waitFor(futureList(previousJobsFutureValues))).
Here is the message I get in the UI:
com.google.appengine.tools.pipeline.FutureList cannot be cast to com.google.appengine.tools.pipeline.impl.FutureValueImpl

At a first and very quick glance, I think the issue is located in the JobRecord#applySetting function, at the following line of code:
FutureValueImpl fv = (FutureValueImpl) wf.getValue();

[BUG] Pipeline.get_status_tree returns no slots for aborted pipeline with no children

We have a following pipeline:

class SomePipeline(pipeline.Pipeline):
    output_names = ['status']

    def run(self, execution_plan):
        status, error = ...  # do external call

        self.fill(self.outputs.status, json.dumps({
            'status': str(status),
            'error': error
        }))

        # ...

        if status == ERROR:
            raise Abort(error)

        if status == DONE:
            yield pipeline.common.Return(str(status))

Now, let's assume we're running this single pipeline, the Abort exception is thrown, and the pipeline is aborted, but the slot was filled before that happened. Unfortunately, when I ask for Pipeline.get_status_tree() I get something like:

{
  "pipelines": {
    "eac5905d74e042aeae9ff30f60d5e09d": {
      "outputs": {
        "default": "ahpkZXZ-ZGV2LWF0bS1ldS1kYXRhbWFuYWdlcnJwCxITX0FFX1BpcGVsaW5lX1JlY29yZCIgZWFjNTkwNWQ3NGUwNDJhZWFlOWZmMzBmNjBkNWUwOWQMCxIRX0FFX1BpcGVsaW5lX1Nsb3QiIGMyNmQ3Nzg0MzNhMTRmYmQ4ZTBkYmI5Njc3NWIyZmZjDA",
        "status": "ahpkZXZ-ZGV2LWF0bS1ldS1kYXRhbWFuYWdlcnJwCxITX0FFX1BpcGVsaW5lX1JlY29yZCIgZWFjNTkwNWQ3NGUwNDJhZWFlOWZmMzBmNjBkNWUwOWQMCxIRX0FFX1BpcGVsaW5lX1Nsb3QiIGI1NTFkYjExOWFiMjQxMTRhZmRlMGQ2NGZiZWMxYmIyDA"
      },
       // other elements here
      "children": [],
      "status": "aborted"
    }
  },
  "slots": {},
  "rootPipelineId": "eac5905d74e042aeae9ff30f60d5e09d"
}

Observe that "slots" is an empty object, although we have the appropriate keys in the "outputs" section.

Status issue when running on App Engine module

I am running MapReduce on a "module", and when I generate a pipeline status URL, I always get this kind of error:

/mapreduce/pipeline/status?root=dc2fb2204e404116ac2a6f2dbff3e8f2

Traceback (most recent call last):
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 1535, in __call__
    rv = self.handle_exception(request, response, e)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 1529, in __call__
    rv = self.router.dispatch(request, response)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 1278, in default_dispatcher
    return route.handler_adapter(request, response)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 1102, in __call__
    return handler.dispatch()
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 572, in dispatch
    return self.handle_exception(e, self.app.debug)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 570, in dispatch
    return method(*args, **kwargs)
  File "/base/data/home/apps/s~realmassive-mr/tasks:mr.381800281691572294/distlib/pipeline/status_ui.py", line 73, in get
    self.redirect(users.create_login_url(self.request.url))
  File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/users.py", line 258, in create_login_url
    raise NotAllowedError
NotAllowedError

However, when I go to an individual task's status URL, everything works fine:

/mapreduce/status
/mapreduce/detail?mapreduce_id=1575755193601C7155E4D

Support post-pending a tag on the urls used in a pipeline

For DevOps purposes, we added functionality to the deferred library to allow a task to be queued with additional information appended to the task URL (e.g. a normal task queues to "/_ah/queue/deferred"; we add a tag that queues it to the URL "/_ah/queue/deferred/BackendTaskOne"). This works because the deferred library routes are defined to accept ".*" in the deferred mapping. In an application that uses a lot of deferred tasks, this makes the path a very useful means of determining which deferred work is causing issues.

I would like similar functionality in the pipeline library. All calls to a pipeline result in the path "/mapreduce/pipeline/run", which makes it difficult to track on the App Engine dashboard which pipeline is causing errors. The pipeline routes are defined as "/pipeline/run". I would propose they be defined as "/pipeline/run.*", which would allow a custom tag to be appended to the path, making tracking easier.
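The proposal amounts to appending an optional diagnostic tag to the run URL. A sketch, assuming routes registered as "/pipeline/run.*"; the helper name and behavior are assumptions about how such a feature might look, not existing library code.

```python
def tagged_pipeline_url(base_url, tag=None):
    # With a wildcard route, a per-pipeline tag can be appended so the
    # App Engine dashboard shows which pipeline a request belongs to.
    if not tag:
        return base_url
    return '%s/%s' % (base_url.rstrip('/'), tag)
```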
