etsy / boundary-layer
Builds Airflow DAGs from configuration files. Powers all DAGs on the Etsy Data Platform.
License: Apache License 2.0
We have found that an error occurs under the following condition: a resource is created inside the generator workflow.
Under these conditions, boundary-layer by default inserts a sentinel node downstream of the node(s) matching the condition, in order to propagate errors past the resource-destroy step. However, inside the generator, the sentinel node that is created does not have <<item_name>> appended to its task_id. This causes errors when that node is connected to the generator's downstream dependencies, because the Airflow set_upstream() method will be called repeatedly (once per generator element) for the same (sentinel_node, generator_downstream_dependency) pair. Airflow is proactive about alerting on this, which is nice (although in theory it is not really a problem). In any case, this is a bug that we should fix.
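A fix could suffix the generator item name onto the sentinel's task_id, the same way ordinary generator nodes get <<item_name>> appended. A minimal sketch (the helper name and id scheme are hypothetical, not boundary-layer's actual code):

```python
def sentinel_task_id(base_task_id, item_name=None):
    # Hypothetical helper: inside a generator, append the item name so that
    # each generator element gets its own distinct sentinel node, instead of
    # one shared node that gets set_upstream()'d once per element.
    suffix = '-sentinel'
    if item_name is not None:
        suffix += '-' + item_name
    return base_task_id + suffix

# Distinct per-element ids avoid duplicate (upstream, downstream) pairs:
ids = {sentinel_task_id('load-job', name) for name in ('a', 'b', 'c')}
```

With unique ids per element, each (sentinel, downstream) edge is created exactly once, so Airflow's duplicate-dependency alert never fires.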
Do you provide documentation for external users on the behavior of the JSON schema entries in boundary_layer/schemas/dag.py? Thanks.
TriggerDagRunOperator is presently not supported. ExternalTaskSensor does manage to capture dependencies across DAGs, but it fails to provide the same reactive, eager triggering that TriggerDagRunOperator does.

I wrote some boundary-layer schemas for my Airflow custom operators. Following the same convention as boundary_layer_default_plugin, I bundled all the configs, the plugin.py, and the preprocessor.py files into a single folder.
Now I am wondering how boundary-layer would pick up my custom plugin and generate Python code that employs those custom operators.
For a POC, I used the hack of manually updating the entry_points.txt and top_level.txt files in pip's site-packages/boundary_layer-1.6.16.dist-info directory. It worked: I was able to generate a Python DAG-definition file that employed my custom operators.
But I still don't understand the right way to deploy my boundary-layer plugin. I could do it by modifying boundary-layer's setup.py file, but that defeats the purpose of a plugin: I would be modifying the source code of boundary-layer itself.
Hi all!
We have been using this wonderful library for the last year.
We love it a lot, and as one of our engineers keeps saying, "Boundary Layer helps us stay out of the business of generating Python code". We definitely prefer YAML.
However, in the last few weeks we have seen decreased support for PRs that we need, such as #110 and #112.
Before we take the approach of forking and applying the changes we need to our own fork, I thought it would be better to reach out to you, and specifically to add some of the folks who created the recent PRs (such as @vchiapaikeo, @dossett, @gpetroski-etsy and @mchalek).
It would be really great if we could get the minimal support we need (updating configuration to support more operators), so that other Boundary Layer users can enjoy more of the operators that exist out there in Airflow.
Thoughts?
Thank you!
Eyal
@eap, @bthomee and @jcraver1021 FYI.
Hi team,
I want to use XCom values in downstream tasks. How can I do that?
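One thing that may help, though this is an assumption rather than documented boundary-layer behavior: since boundary-layer emits ordinary Airflow operators, Airflow's own jinja templating should still apply to templated fields at runtime, so an XCom pull can be written as a plain string argument in the YAML. A sketch of such a template string ('extract' is a hypothetical upstream task id):

```python
# Airflow renders '{{ ... }}' templates at task runtime, so a downstream
# task's templated argument (e.g. a BashOperator's bash_command) can pull
# an XCom pushed by an upstream task.
bash_command = "echo {{ ti.xcom_pull(task_ids='extract') }}"
```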
Hi,
From my testing, as well as this schema in the source, it seems that only catchup, max_active_runs, concurrency, and schedule_interval are allowed in the dag_args section. However, many more parameters are available in the DAG constructor (see this). In most of our production DAGs we use dagrun_timeout and on_failure_callback. Is it possible to enable more parameters in the dag_args schema without changing the source code, maybe through a plugin? Thanks for the help!
The existing operators list (https://github.com/etsy/boundary-layer/tree/master/boundary_layer_default_plugin/config/operators) has no support for the EmailOperator (https://airflow.apache.org/docs/stable/_modules/airflow/operators/email_operator.html).
This PR adds the required support for the EmailOperator.
Can we create more than 3 operator groups?
As far as I understand, we can only create 3 groups, under the 'before', 'operators' and 'after' sections. What if we want to create, say, 4 groups? And can we route each operator to a specific group?
These are just some examples of the use cases (ignore the unused line in the last image):
With the addition of the GCP PubSub Publish Operator, its preprocessor converts the message data property to bytes. However, bytes are not supported when generating the actual Python DAGs. Below is the error seen:
...
File "/home/jenkins/workspace/bigdataconf-test-pr/venv/lib/python3.7/site-packages/boundary_layer/builders/util.py", line 132, in format_value
type(value)))
Exception: Cannot format value `b'U1VDQ0VTUw=='`: no handler for type <class 'bytes'>
The solution likely lies in boundary_layer/builders/util.py, in the function format_value. However, byte literals are treated differently in Python 2 and Python 3, so this might be a somewhat more involved change. Regardless, there may be future cases in which generating a DAG with byte literals is required, so it seems worth looking into.
Right now, if you have a parameter that uses the EnsureRenderedStringPattern preprocessor, and you pass a verbatim string to that parameter, there is a chance that the parameter value will be rejected as invalid, because the characters < and > are not matched by the provided regular expression. An example is the dataproc cluster_name parameter. We should recognize this condition and generate a warning, as we do when we encounter a jinja template that we do not know how to render.
Installing boundary-layer alongside airflow 1.10.6 produces a jsonschema version conflict: airflow 1.10.6 requires Requirement.parse('jsonschema>=3.0.1<4'), {'flask-appbuilder'}), while
boundary-layer 1.7.24 has requirement jsonschema<3.0,>=2.6.0, but you'll have jsonschema 3.2.0 which is incompatible.
Any suggestions for bypassing this by relaxing the version pin? Or which version of airflow is known to work with boundary-layer?
Hello,
I need to define the upstream dependencies from an upstream key on the items of the generator.
Something like this:
generators:
- name: bq-jobs
  type: list_object_generator
  target: bq-job
  properties:
    items:
    - name: job1
      upstream: []
    - name: job2
      upstream:
      - job1

---

name: bq-job
operators:
- name: << item['name'] >>
  type: kubernetes
  upstream_dependencies: << item['upstream'] >>
When I try this, I get the following error:
Found errors in sub dag: {'operators': {0: {'upstream_dependencies': ['Not a valid list.']}}}
Do you know how I can make this work?
Thanks!
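For background on why the error appears: the << item[...] >> expressions are plain strings at schema-validation time, so a field declared as a list rejects them before any per-item substitution happens. A toy substitution sketch (not boundary-layer's actual renderer) shows the string-in, string-out behavior:

```python
import re

def render_item_refs(template, item):
    # Toy stand-in for generator-item substitution: replaces
    # "<< item['key'] >>" markers with values from the item dict.
    return re.sub(r"<<\s*item\['(\w+)'\]\s*>>",
                  lambda m: str(item[m.group(1)]), template)

name = render_item_refs("<< item['name'] >>", {'name': 'job1'})

# The substituted result is still a string -- even when the item value is a
# list -- which is why a schema field expecting a real list complains.
upstream = render_item_refs("<< item['upstream'] >>", {'upstream': ['job0']})
```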
See: https://airflow.apache.org/docs/stable/_modules/airflow/models/dag.html#DAG:
:param tags: List of tags to help filtering DAGS in the UI.
:type tags: List[str]
Hello,
I am trying to convert an existing DAG to YAML, but I don't see options to add security_context and service_account_name details in the YAML file. Is there any workaround for adding these options? I don't see them among the available Kubernetes properties.
Thank You,
Syam
What are the best practices when passing non-primitive types as arguments to operators? Is doing so currently supported in boundary-layer?
Just dropping by to let you know that marshmallow v3 has been released.
Upgrading will provide a few benefits for boundary-layer:
- StrictSchema is no longer necessary: v3 has first-class support for unknown-field validation, and the default behavior matches StrictSchema: https://marshmallow.readthedocs.io/en/stable/quickstart.html#handling-unknown-fields
- List(Nested(...)) has been improved to have the same validation and performance characteristics as Nested(many=True) (List(Nested) was previously slower because Nested(many=True) takes an optimized path).
boundary-layer/boundary_layer/schemas/dag.py
Lines 87 to 89 in 9b09956
These uses of load_from and dump_to will need updating, since the two have been merged into a single parameter, data_key.
boundary-layer/boundary_layer_default_plugin/oozie_actions.py
Lines 86 to 87 in 5e52e1c
I've only skimmed the boundary-layer code, but it looks like the migration should be relatively straightforward:
- Use data_key instead of load_from and dump_to.
- Update .load and .dump call sites to handle ValidationError, and expect the (de)serialized data dictionary to be returned:
try:
    data = OozieWorkflowSchema(context={
        'cluster_config': cluster_config,
        'oozie_plugin': oozie_config,
        'macro_translator': JspMacroTranslator(oozie_config.jsp_macros()),
        'production': self.production,
    }).load(parsed)
except ma.ValidationError as err:
    raise Exception('Errors parsing file {}: {}'.format(
        filename,
        err.messages))
data_copy = data.copy()
- Pass **kwargs to decorated methods:
@validates_schema
def validate_template_undefined(self, data, **kwargs):
    # ...

@post_dump
def dagrun_timeout_to_timedelta(self, data, **kwargs):
    # ...
A full upgrading guide is here: https://marshmallow.readthedocs.io/en/latest/upgrading.html
It's worth knowing that marshmallow 3 only supports Python 3, so you'd need to drop support for 2.7 in order to upgrade. Join the party: https://python3statement.org/
The Airflow HTTP operator has an optional parameter named response_check that takes a function (docs).
Adding a corresponding schema named http.yaml for a boundary-layer http operator resulted in the following error:
boundary_layer.exceptions.InvalidConfig: Invalid config spec in file c:\path_to_venv\site-packages\boundary_layer_default_plugin\config\operators\http.yaml: {'parameters_jsonschema': ["Invalid JSON schema: 'function' is not valid under any of the given schemas\n\nFailed validating 'anyOf' in schema['properties']['properties']['additionalProperties']['properties']['type']:\n {'anyOf': [{'$ref': '#/definitions/simpleTypes'},\n {'items': {'$ref': '#/definitions/simpleTypes'},\n 'minItems': 1,\n 'type': 'array',\n 'uniqueItems': True}]}\n\nOn instance['properties']['response_check']['type']:\n 'function'"]}
The http.yaml content is as follows (with the problematic parameter commented out):
name: http
operator_class: SimpleHttpOperator
operator_class_module: airflow.operators.http_operator
schema_extends: base
parameters_jsonschema:
  properties:
    http_conn_id:
      type: string
    endpoint:
      type: string
    method:
      type: object
    data:
      type: object
    headers:
      type: object
    # response_check:
    #   type: function
    extra_options:
      type: object
    xcom_push:
      type: boolean
    log_response:
      type: boolean
  required:
  - http_conn_id
  - endpoint
  additionalProperties: false
Hi there, I ran into this situation:
Given yaml like:
arguments:
- --textproto_content
- 'query: "\''foo\'' = \"foo\"" x: "bar"'
Boundary layer produces Python code like (whitespace added for clarity):
arguments = [
'--textproto_content',
"""query: "\'foo\' = \"foo\"" x: "bar"""",
There are two problems with the generated """query:... code above:
1. The trailing """" causes a syntax error:
>>> print("""query: "\'foo\' = \"foo\"" x: "bar"""")
  File "<stdin>", line 1
    print("""query: "\'foo\' = \"foo\"" x: "bar"""")
          ^
SyntaxError: unterminated string literal (detected at line 1)
2. The generated string drops the \ characters that were part of the original string (and that we need, so that the resulting string passed to the operator is valid textproto):
>>> print("""query: "\'foo\' = \"foo\"" x: "bar" """)
query: "'foo' = "foo"" x: "bar"
Expected:
query: "\'foo\' = \"foo\"" x: "bar"
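A minimal sketch of one direction a fix could take (this is not the library's actual emitter): emitting string arguments through repr() instead of wrapping them in triple quotes, since repr() escapes embedded quotes and preserves backslashes.

```python
# The value the operator should ultimately receive, with its literal
# backslashes intact (this is the 'Expected' string from the issue).
original = 'query: "\\\'foo\\\' = \\"foo\\"" x: "bar"'

# repr() yields a valid Python literal that round-trips exactly, avoiding
# both the trailing-quote syntax error and the stripped backslashes.
generated = repr(original)
```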
The signature of the process_arg(...) method of Preprocessor hints that there is provision for passing and using arguments to a preprocessor:
@abc.abstractmethod
def process_arg(self, arg, node, raw_args):
    pass
More precisely, I want to supply some arguments to my preprocessor from within my DAG-definition YAML file.
I would like to figure out a way to inject some Python code at the beginning of my DAG.
The idea is that I want to use the python_callable field on a PythonOperator to call my code; however, my code is going to be 5-10 lines long, and I cannot really represent it with a lambda (or I would rather not, because it would be ugly!).
Currently I see two options, but I would like to get your feedback please :) Thanks!
One of them is a custom plugin that wraps a PythonOperator and will do the magic. Then, Boundary Layer will load the YAML file I configure in the plugin, so the output DAG.py is valid. I will also have to include the implementation itself (not the YAML) in the DAGs folder in Airflow itself.
While exploring these options, I preferred to file a bug and see what you think about it.
Thanks!
Hi Etsy Engineers!
I just wanted to start by commending your effort in putting together this project. I had the pleasure of trying it out in our airflow project, and we are very pleased by the result!
Our airflow project has many customized operators/hooks/etc. But with Airflow 2.0's removal of support for importing operators, sensors, and hooks via plugins, how do you suggest we register our custom plugins for use by boundary layer?
https://airflow.apache.org/docs/apache-airflow/stable/plugins.html#plugins
Alana
Hi,
Thanks for open-sourcing boundary-layer; this is really useful! While prototyping some plugins for other open-source operators and for internal operators, I can't find any support for Variables (https://airflow.apache.org/concepts.html#variables). Am I missing something? Is there a plan to support Variables? The use case is that we want to maintain one YAML file across different environments. Thanks for the help!
I took a hint from the existing date_string_to_datetime preprocessor and wrote my own preprocessor that converts an input string argument into an Enum.
While the conversion worked, during the dag-build phase (Python code generation) I ended up hitting this block and got Cannot format value `{}`: no handler for type {}.
We have seen behavior in which, when a resource is passed to a generator containing some operators that use the resource and others that do not, the resource is not destroyed until every operator in the generator sub-workflow is complete. This can mean keeping clusters around much longer than necessary, if the generator workflow contains long-running, resource-independent operations.
This probably also applies to sub-DAGs, because resources are attached to them using the same mechanisms.
In some cases, it may be desired to use a generator but handle the output in batches instead of individually. For example, maybe you want to list the files in a Google Cloud Storage bucket and then create BigQuery load jobs with them in batches of 10 (instead of a single load job per file). It should be possible to specify batching parameters in a generator config to achieve this result.
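The grouping itself is straightforward; a hypothetical helper (not part of boundary-layer) illustrates the intended behavior:

```python
def batched(items, batch_size):
    # Yield the generator's items in fixed-size batches, e.g. 10 GCS files
    # per BigQuery load job instead of one load job per file.
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

batches = list(batched(['gs://bucket/file-{}'.format(i) for i in range(25)], 10))
# 25 files -> batches of 10, 10, and 5
```

A batching parameter on the generator config could then feed each batch, rather than each item, to the target sub-workflow.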
This issue manifests in two ways:
First off, thank you so much for making this tool available! I am very happy using this library at my work.
For historical reasons, I version-control my generated DAGs, and I have found it easier to read the diffs when the operator arguments are sorted by name. I have tried this out locally by modifying the DagBuilderBase.render_operator method. Would there be any reason not to have the builder sort the arguments for each operator?