atc-net / atc-dataplatform
A common set of python libraries for Databricks
Home Page: https://atc-net.github.io/repository/atc-dataplatform
License: MIT License
At the moment, atc.transformations.merge_df_into_target uses WHEN MATCHED UPDATE * and WHEN NOT MATCHED INSERT *. We should consider making a function for adding custom WHEN MATCHED AND .... AND .... OR and WHEN NOT MATCHED AND ... AND conditions.
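As a sketch of what such custom conditions could look like, here is the delta-spark merge builder; the table name, join condition, and column names below are hypothetical, and `spark` and `df` are assumed to be in scope:

```python
from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "my_db.target")  # hypothetical table name
(
    target.alias("t")
    .merge(df.alias("s"), "t.id = s.id")  # hypothetical join condition
    # custom WHEN MATCHED AND ... condition instead of a bare update *
    .whenMatchedUpdateAll(condition="s.updated_at > t.updated_at")
    # custom WHEN NOT MATCHED AND ... condition instead of a bare insert *
    .whenNotMatchedInsertAll(condition="s.is_active = true")
    .execute()
)
```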
Set up a class that takes as its one parameter the instrumentation key of an Azure Application Insights instance.
https://docs.microsoft.com/en-us/azure/azure-monitor/app/opencensus-python
You can use the snippets from here to set up the app insights and get that key:
https://github.com/atc-net/atc-snippets/tree/main/azure-cli/monitor
The new class, maybe named AtcInsightsReporter, should take any arguments that are serializable to JSON and add them as properties to a log entry that is sent to the app insights. Importantly, the class should use the job introspection available in atc-dataplatform to also add a link to the correct workspace and job-run, so that one can find the way back to the line that caused the log.
The class should be usable as a context manager, where it catches all exceptions and reports them to log analytics. This will allow for developer attention, while also letting the job run to successful completion.
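A minimal sketch of what such a class could look like, using the opencensus-ext-azure exporter; the class name comes from the issue text, but the method names and the job-link handling are assumptions:

```python
import json
import logging

from opencensus.ext.azure.log_exporter import AzureLogHandler


class AtcInsightsReporter:
    def __init__(self, instrumentation_key: str):
        self.logger = logging.getLogger("AtcInsightsReporter")
        self.logger.addHandler(
            AzureLogHandler(
                connection_string=f"InstrumentationKey={instrumentation_key}"
            )
        )

    def report(self, **properties) -> None:
        # round-trip through json to enforce that all properties are serializable
        dimensions = json.loads(json.dumps(properties))
        # TODO: add the workspace/job-run link from atc-dataplatform job introspection
        self.logger.info("atc report", extra={"custom_dimensions": dimensions})

    def __enter__(self) -> "AtcInsightsReporter":
        return self

    def __exit__(self, exc_type, exc_value, tb) -> bool:
        if exc_type is not None:
            self.logger.exception(
                "unhandled exception", exc_info=(exc_type, exc_value, tb)
            )
        # returning True swallows the exception so the job completes successfully
        return True
```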
Create a handle class for cosmos. It should inherit ThHandle, and therefore conform to the methods of DbHandle and SqlHandle.
To assist unit testing, we want to be able to assert on the contents of dataframes.
Assertions may concern:
Create a delta handle class
There are the following SPNs:
$mountSpnName = "AtcMountSpn"
$dbDeploySpnName = "AtcDbSpn"
$cicdSpnName = "AtcGithubPipe"
Do we need them all?
Having the possibility of inputs on the etl Orchestrator.
I have a case where I need to combine streaming and batch jobs like this:
This would require that I can pass a dataframe to the Orchestrator:
```python
from abc import abstractmethod
from typing import Union

from pyspark.sql import DataFrame

# Extractor, Transformer, Loader and their delegating variants are
# atc-dataplatform classes, assumed to be importable here.


class Orchestrator:
    extractor: Union[Extractor, DelegatingExtractor]
    transformer: Union[Transformer, DelegatingTransformer]
    loader: Union[Loader, DelegatingLoader]

    @abstractmethod
    def execute(self, df: DataFrame) -> DataFrame:
        pass
```
What if you don't want to have a name? See also issue #118
Extend or create a class for loading tables to the delivery SQL
Since full loading is very time consuming and often expensive, it is necessary to introduce Autoloader.
In the simple SQL server transformer, if the format is datetime, the value should be truncated to whole seconds. Otherwise the conversion between Spark and Azure SQL timestamp formats will not work properly.
f.date_trunc("second", f.col(...))
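A minimal sketch of the truncation; the column name is hypothetical:

```python
from pyspark.sql import functions as f

# truncate a timestamp column to whole seconds before writing to Azure SQL;
# "updated_at" is a hypothetical column name
df = df.withColumn("updated_at", f.date_trunc("second", f.col("updated_at")))
```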
It will be an issue to use multiple generic extractors in etl orchestrators.
```python
(
    Orchestration
    .extract_from(ReadDataFromDeltaTable(name=Table1))
    .extract_from(ReadDataFromDeltaTable(name=Table2))
    .transform_with(EventTransformer())
    .load_into(WriteDataToDeltaTable(name=Table3))
    .build()
)
```
This will create a dataset like this:

```python
{
    "ReadDataFromDeltaTable": df,
    "ReadDataFromDeltaTable": df,
}
```

Since both entries are keyed by the extractor's class name, the second overwrites the first.
Acceptance criteria: Nice documentation for how to use the cosmos class.
We need a full integration test pipeline with the following properties:
The pipeline definition should live in the .github folder to limit access to changes.
The first step is probably to get a service principal and a suitable subscription to do this with.
Idea: the orchestrator holds a dict and gives each step a reference to it, which can be modified and added to.
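A minimal sketch of that idea; the step interface and method names are assumptions, not the atc-dataplatform API:

```python
from typing import Dict

from pyspark.sql import DataFrame


class Orchestrator:
    def __init__(self, steps):
        self.steps = steps
        # the shared dataset that every step can read, modify and add to
        self.dataset: Dict[str, DataFrame] = {}

    def execute(self) -> None:
        for step in self.steps:
            # each step receives a reference to the same dict
            step.run(self.dataset)
```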
Since full loading is very time consuming and often expensive, it is necessary to introduce upsert logic.
In order to improve performance when writing to SQL server, consider this.
See this run: https://github.com/atc-net/atc-dataplatform/actions/runs/3005506556

```
HTTPError: 400 Bad Request from https://upload.pypi.org/legacy/
File already exists. See https://pypi.org/help/#file-name-reuse for more information.
```
The table configurator introduced in #77 is missing documentation. Please fix.
It should be possible to connect SqlServer with a service principal. It requires some adjustments in the SqlServer class. Also, to test it out, the deployment pipelines should be able to create a db user from the AD. This requires adjustment in the rights given to the SQL server instance; maybe the deploying SPN needs to be admin, and the connection to Invoke-Sql should use access tokens.
https://www.thedataswamp.com/blog/databricks-connect-to-azure-sql-with-service-principal
What if someone wants to create another field in the .yml files?
Is this possible? SHOULD it even be possible?

```yaml
Something:
  name:
  partitioning:
  customfield:
```

See also issue #123
For example, in the case of a cosmos: the deployed library needs to know what the database is called, its endpoint, and its key. The developer has to select where to store the key and the endpoint, and how to pass the two between the deployment pipeline and the deployed library.
suggestion:
Result:
Configure your cosmos once in your library, and deploy, connect and use it directly.
A standard orchestrator can be written that takes as input:
The orchestrator will incrementally extract the capture files, unpack the JSON, and load it to the delta table.
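A rough sketch of what such an orchestrator's core could do, assuming Databricks Autoloader over EventHub avro capture files; the paths, payload schema, and table name are all hypothetical:

```python
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# hypothetical schema of the json payload inside each captured event
event_schema = StructType(
    [StructField("id", StringType()), StructField("value", StringType())]
)

capture = (
    spark.readStream.format("cloudFiles")  # Databricks Autoloader
    .option("cloudFiles.format", "avro")
    .load("/mnt/landing/eventhub-capture/")  # hypothetical capture path
)

# EventHub capture stores the event payload in the binary "Body" column
events = capture.select(
    f.from_json(f.col("Body").cast("string"), event_schema).alias("event")
).select("event.*")

(
    events.writeStream.format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/events")  # hypothetical
    .trigger(once=True)  # incremental, batch-style processing
    .toTable("events")  # hypothetical delta table
)
```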
Create a sql executor which can be used for both delta tables and Azure SQL databases
Consider using a DacPac deployment of the SQL database.
In the same deployment pipeline, an idea is:
Even if the tables are not controlled by DacPac, at least the creation of schemas/users could be.
We intend to drop support for versions older than the latest LTS version in Databricks. This means dropping support for python 3.7 and spark version 7.3.x-scala2.1.
A comment period has hereby started to give users the possibility to object. Unless substantial objections are received, we intend to end support for these versions at the end of March 2022.
A new major version will be set to mark this change.
The sql executor splits the sql file into multiple sql queries, split by semicolons.
Why is this necessary?
When using the semicolon separator, I think the executor can just run the entire sql file without sql_statement.split(";").
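For reference, the behavior being questioned is roughly this; `spark` and `sql_statement` are assumed to be in scope:

```python
# current behavior as described: split the file on semicolons
# and run each non-empty statement separately
for statement in sql_statement.split(";"):
    if statement.strip():
        spark.sql(statement)
```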
In some circumstances we want to only send data to external systems that is new or changed.
This can be achieved in a general way by hashing the rows, and comparing the hash to the last transmitted hash, stored in a cache table.
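A minimal sketch of the hashing step; the key column, cache table name, and column handling are hypothetical:

```python
from pyspark.sql import functions as f

# hash all payload columns into one fingerprint per row
hashed = df.withColumn("row_hash", f.sha2(f.concat_ws("||", *df.columns), 256))

# compare against the last transmitted hash per key ("id" is hypothetical)
cache = spark.table("transmission_hash_cache")  # hypothetical cache table
changed = (
    hashed.join(cache.select("id", "last_hash"), on="id", how="left")
    .where(f.col("last_hash").isNull() | (f.col("row_hash") != f.col("last_hash")))
)
```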
Further, the following functions can be achieved:
The life-cycle of a table is such that it is produced in a long-running computation once its logic has been developed. Once the table is produced, it gets maintained with incremental updates in periodic intervals. Sometimes, the transformations that produce a set of tables change. The change of the transforming logic can be controlled with python library versions. The subject of this github issue is library support for the versioning of the underlying data product.
Data products (tables) are sometimes used by users in other departments, who may be disconnected from the discussions about possible wipe-and-rebuild plans. Even worse, some users may not be able to accept data product down-times. If such cases apply for data products that take a long time to rebuild, data product versioning may be a solution.
This story describes how table versioning may solve the issue:
Consider using docstrings in the code base: https://peps.python.org/pep-0257/
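For illustration, a PEP 257 style docstring on a hypothetical function:

```python
def merge_df_into_target(df, table_name):
    """Merge the given dataframe into the target table.

    PEP 257: a one-line summary, a blank line, then any further detail.
    """
```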
What we want: tmp location
Add methods to the cosmos class to delete rows using the cosmos api. It is not possible with the sql-api.
We want a quick and easy way to create test dataframes.
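A minimal sketch of how that could look with plain PySpark; the rows and schema string are just examples:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# a DDL schema string plus a list of tuples is already fairly compact
df = spark.createDataFrame(
    [(1, "a"), (2, "b")],
    schema="id int, value string",
)
```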
Since DeltaHandle now has an upsert method, the upsertloader should be removed. It is redundant and will require extra maintenance.
The upsertloader should take a tablehandle.
Then it should execute the .upsert() function.
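A sketch of that shape; the constructor arguments and method names are assumptions about the atc-dataplatform API, not its actual definition:

```python
from typing import List

from pyspark.sql import DataFrame


class UpsertLoader:
    def __init__(self, handle, join_cols: List[str]):
        # handle is assumed to be a table handle exposing .upsert()
        self.handle = handle
        self.join_cols = join_cols

    def save(self, df: DataFrame) -> None:
        # delegate the actual merge to the handle
        self.handle.upsert(df, self.join_cols)
```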
This is a proposal to significantly speed up the PR pipelines. It involves the following steps:
Advantages:
Disadvantages (compared to current implementation of non-concurrent runs):
I am working on a proof of concept for automatic diagram creation, using the diagrams package to automatically create useful (if not beautiful) diagrams directly from source code. I am not ready to share the (very rough) code yet, but I am looking for feedback on features.
The idea is that each transformation can define its inputs and outputs independently. The library then takes the responsibility to connect the stages into a larger flow diagram. Here is a minimal use-case:
File tables.py:

```python
from atc.doc.diagram import Table

orders = Table("orders")
guitars = Table("guitars")
products = Table("products")
customers = Table("customers")
```
File GuitarOrchestrator.py:

```python
from atc.doc import diagram

import tables


@diagram.input(tables.guitars)
@diagram.output(tables.products)
class GuitarOrchestrator:
    pass
```
File CustomerOrchestrator.py:

```python
from atc.doc import diagram

import tables


@diagram.input(tables.products)
@diagram.input(tables.customers)
@diagram.output(tables.orders)
class CustomerOrchestrator:
    pass
```
The library is then able to parse this and create the following combined diagram:
Of course, the individual diagrams can also be created, and the idea is that one call with default arguments will create each sub-diagram as well as the combined diagram. Example:
These two diagrams were created by my POC implementation which is not ready for a PR, yet.
My plans for future work are these: grouping by inputs, or perhaps for free using the source file folder structure.
Please give me some feedback on this.
Using Bicep we can define the infrastructure we want to deploy.
Explained simply: we just tell Azure what we need, the resource manager checks that it is possible, and we get it.
https://docs.microsoft.com/en-us/azure/azure-resource-manager/bicep/overview?tabs=bicep
pros:
concise syntax
reliable type safety
support for code reuse
(Should be faster)
After executing a SQL statement, close the pyodbc connection.
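A minimal sketch of the pattern; the connection string and statement are placeholders:

```python
import pyodbc

conn = pyodbc.connect(connection_string)  # connection_string is a placeholder
try:
    cursor = conn.cursor()
    cursor.execute(sql)  # sql is a placeholder statement
    conn.commit()
finally:
    conn.close()  # always release the connection, even if execution fails
```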
Hi @JeppeBlixen and @christianhelle, the PR 54 that you merged had a non-compatible renaming in it: MultipleExtractOrchestrator became MultipleExtractorOrchestrator. This broke our pipelines and our production system. Please reserve breaking changes and renaming for major version updates. I will now need to implement more careful dependency version control in my system. So actually thank you for bringing this to my attention :)
/rant
You can close the issue.
Use pyodbc with retries
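A sketch of how retries could be wrapped around pyodbc; the retryable error class, attempt count, and backoff are assumptions:

```python
import time

import pyodbc


def execute_with_retries(connection_string: str, sql: str, attempts: int = 3) -> None:
    for attempt in range(attempts):
        try:
            conn = pyodbc.connect(connection_string)
            try:
                conn.execute(sql)
                conn.commit()
                return
            finally:
                conn.close()
        except pyodbc.OperationalError:
            # assumed transient network/server error: back off and retry
            if attempt == attempts - 1:
                raise
            time.sleep(5 * (attempt + 1))
```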
In order to use the upsertloader across Delta and Azure SQL, the upsertloader should use the ThHandle class.
It already does.
But before that, we need to have a new "merge" method added to both DeltaHandle and SqlHandle.
We introduce an upsert method to DeltaHandle instead.
Introducing this merging could actually make UpsertLoader redundant, and it might be deprecated.
We need a pipeline for unit testing all functions in atc, including:
We therefore need a complete deployment of databricks in azure, ideally set up using code from atc-snippets.
We only need one environment, since we are not going to use it in production.
The environment should contain an event-hub and a storage account, so that we can fully test streaming and external tables.
On June 3, an update was made which corresponded to the newest dbx update (#102). That update has been reverted, so we need to revert too.
When making unittests, it would be beneficial to have an assert function that compares two dataframes and assesses whether they are the same.
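A minimal sketch of such an assertion, treating dataframes as equal when their schemas match and their rows match as multisets; the function name is illustrative:

```python
from pyspark.sql import DataFrame


def assert_dataframes_equal(expected: DataFrame, result: DataFrame) -> None:
    assert expected.schema == result.schema, "schemas differ"
    # exceptAll respects duplicates, so both directions must be empty
    assert expected.exceptAll(result).count() == 0, "rows missing from result"
    assert result.exceptAll(expected).count() == 0, "unexpected rows in result"
```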