atc-net / atc-dataplatform

A common set of Python libraries for Databricks

Home Page: https://atc-net.github.io/repository/atc-dataplatform

License: MIT License

Python 99.81% Jinja 0.19%
databricks

atc-dataplatform's People

Contributors

andersbjernaa, christianhelle, davidkallesen, farbo, frederikgjensen, gnk-delegate, gustavnk, jeppeblixen, jeppedg, lajdelegate, lasseaj, laujohansson, martinboge, mrmasterplan, okami1, perkops, radekbuczkowski, tbtdg


atc-dataplatform's Issues

Create merge_df_into with custom AND argument

At the moment, atc.transformations.merge_df_into_target uses "when matched update *" and "when not matched insert *". We should consider making a function that supports custom WHEN MATCHED AND ... AND ... OR and WHEN NOT MATCHED AND ... AND conditions.
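
A minimal sketch of what such a function could look like, assuming the Delta Lake Python API (delta.tables.DeltaTable); the function name and the condition parameters are illustrative, not the existing atc API:

# Illustrative sketch; the optional condition strings refer to the
# "source" and "target" aliases used in the merge.
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession


def merge_df_into_target_with_conditions(
    spark: SparkSession,
    df: DataFrame,
    table_name: str,
    join_cols: list,
    matched_condition: str = None,      # e.g. "source.updated_at > target.updated_at"
    not_matched_condition: str = None,  # e.g. "source.is_deleted = false"
):
    target = DeltaTable.forName(spark, table_name)
    merge_cond = " AND ".join(f"target.{c} = source.{c}" for c in join_cols)
    (
        target.alias("target")
        .merge(df.alias("source"), merge_cond)
        .whenMatchedUpdateAll(condition=matched_condition)
        .whenNotMatchedInsertAll(condition=not_matched_condition)
        .execute()
    )

Passing None for both conditions falls back to the current unconditional behaviour.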

Appinsight reporting

Set up a class that takes as its one parameter the instrumentation key of an Azure Application Insights resource.

https://docs.microsoft.com/en-us/azure/azure-monitor/app/opencensus-python

You can use the snippets from here to set up the app-insights and get that key:

https://github.com/atc-net/atc-snippets/tree/main/azure-cli/monitor

The new class, maybe named AtcInsightsReporter, should take any arguments that are JSON-serializable and add them as properties to a log entry that is sent to Application Insights. Importantly, the class should use the job introspection available in atc-dataplatform to also add a link to the correct workspace and job run, so that one can find the way back to the line that caused the log.

The class should be usable as a context manager, where it catches all exceptions and reports them to log analytics.
This will allow for developer attention, while also letting the job run to successful completion.
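
A rough sketch of such a reporter as a context manager, using the opencensus Azure exporter from the linked docs; the class name, constructor parameters, and the omitted job-link lookup are illustrative only:

# Illustrative sketch only; the workspace/job-run link from atc job
# introspection is marked as a TODO.
import json
import logging

from opencensus.ext.azure.log_exporter import AzureLogHandler


class AtcInsightsReporter:
    def __init__(self, instrumentation_key: str, **properties):
        self.logger = logging.getLogger("AtcInsightsReporter")
        self.logger.addHandler(
            AzureLogHandler(
                connection_string=f"InstrumentationKey={instrumentation_key}"
            )
        )
        # keep only JSON-serializable property values
        self.properties = json.loads(json.dumps(properties, default=str))
        # TODO: add the workspace/job-run link from atc job introspection

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.logger.exception(
                "Job step failed",
                extra={"custom_dimensions": self.properties},
            )
        return True  # swallow the exception so the job can still complete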

Cosmos handle

Create a handle class for Cosmos. It should inherit from ThHandle and therefore conform to the methods of DbHandle and SqlHandle.
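
An illustrative skeleton, assuming the handle interface exposes roughly the methods below; the exact abstract methods to implement are defined by ThHandle in atc-dataplatform:

# Skeleton only; the real implementation would inherit ThHandle and use
# the Cosmos Spark connector for the actual reads and writes.
from pyspark.sql import DataFrame


class CosmosHandle:
    def __init__(self, database: str, container: str):
        self.database = database
        self.container = container

    def read(self) -> DataFrame:
        raise NotImplementedError

    def append(self, df: DataFrame) -> None:
        raise NotImplementedError

    def overwrite(self, df: DataFrame) -> None:
        raise NotImplementedError

    def truncate(self) -> None:
        raise NotImplementedError

    def drop(self) -> None:
        raise NotImplementedError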

Dataframe Assertions

To assist unit testing, we want to be able to assert on the contents of dataframes; see the sketch after the list below.
Assertions may concern:

  • the column names
  • the full schema (column names and types)
  • the data contents of some, or all columns
  • smart matching of some types
    • timestamps
      • handling of time zones
      • approximate matching
  • ordered vs unordered matching
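
A minimal sketch of such an assertion helper, covering only unordered, full-schema comparison; the remaining options above would become flags on top of this:

# Sketch of an unordered dataframe comparison for unit tests.
from pyspark.sql import DataFrame


def assert_dataframe_equal(expected: DataFrame, actual: DataFrame) -> None:
    assert expected.schema == actual.schema, (
        f"Schema mismatch:\n{expected.schema}\n!=\n{actual.schema}"
    )
    # sort by string representation to avoid comparing None against values
    expected_rows = sorted(expected.collect(), key=repr)
    actual_rows = sorted(actual.collect(), key=repr)
    assert expected_rows == actual_rows, "Dataframe contents differ"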

Cleanup SPN in deployment

There are the following SPNs:

$mountSpnName                 = "AtcMountSpn"
$dbDeploySpnName              = "AtcDbSpn"
$cicdSpnName                  = "AtcGithubPipe"

Do we need them all?

Dataframe input to Orchestrator

It would be useful to have the possibility of inputs on the ETL Orchestrator.

I have a case where I need to combine streaming and batch jobs like this:
[image]

This would require that I can pass a dataframe to the Orchestrator:

# Extractor, Transformer, Loader and the Delegating* variants are the
# existing atc classes; only the imports for the added types are shown.
from abc import abstractmethod
from typing import Union

from pyspark.sql import DataFrame


class Orchestrator:
    extractor: Union[Extractor, DelegatingExtractor]
    transformer: Union[Transformer, DelegatingTransformer]
    loader: Union[Loader, DelegatingLoader]

    @abstractmethod
    def execute(self, df: DataFrame) -> DataFrame:
        pass

AutoLoader framework

Since full loading is very time-consuming and often expensive, it is necessary to introduce Auto Loader.
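
For reference, a minimal Auto Loader sketch using the Databricks cloudFiles source (assumes a Databricks spark session); paths, format, and table name are placeholders:

# Incrementally pick up new files from a landing folder and append them
# to a bronze delta table.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/mnt/landing/_schemas/my_source")
    .load("/mnt/landing/my_source/")
)

(
    df.writeStream
    .option("checkpointLocation", "/mnt/bronze/_checkpoints/my_source")
    .trigger(once=True)  # run as an incremental batch job
    .toTable("bronze.my_source")
)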

Date truncate datetime columns to SQL server

In the simple SQL server transformer, if a column is of datetime format, it should be truncated to whole seconds. Otherwise the conversion between the Spark and Azure SQL timestamp formats will not work properly.

f.date_trunc("second", f.col(...))
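
A self-contained sketch of the intended fix, truncating every timestamp column to whole seconds before the dataframe is written to SQL server:

from pyspark.sql import DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import TimestampType


def truncate_timestamps(df: DataFrame) -> DataFrame:
    # truncate all timestamp columns to whole seconds
    for field in df.schema.fields:
        if isinstance(field.dataType, TimestampType):
            df = df.withColumn(field.name, f.date_trunc("second", f.col(field.name)))
    return df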

Using multiple instances of same generic extractors in elt orchestrator

It will be an issue to use multiple instances of the same generic extractor in ETL orchestrators (one possible mitigation is sketched below the example).

Orchestration
.extract_from(ReadDataFromDeltaTable(name=Table1))
.extract_from(ReadDataFromDeltaTable(name=Table2))
.transform_with(EventTransformer())
.load_into(WriteDataToDeltaTable(name=Table3))
.build()

This will create a dataset like this:
{
"ReadDataFromDeltaTable": df,
"ReadDataFromDeltaTable": df
}
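
One possible mitigation, assuming extractors could accept an explicit dataset key (this parameter does not necessarily exist in the current API):

# Hypothetical sketch: let each extractor instance carry its own key so
# the dataset dict gets one entry per instance.
class ReadDataFromDeltaTable:
    def __init__(self, name: str, dataset_key: str = None):
        self.name = name
        # falling back to the class name is what causes the collision above
        self.dataset_key = dataset_key or type(self).__name__

With explicit keys, the dataset would contain one entry per extractor instance, e.g. {"table1": df, "table2": df}.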

Integration Pipeline

We need a full integration test pipeline with the following properties:

  1. will not run automatically on PR until a reviewer has approved. It is the reviewer's responsibility to check for malicious code.
  2. the pipeline shall have access to a service principal in azure with the necessary rights for all the following steps.
  3. the pipeline shall be based on a deployment script controlled in the .github folder to limit access to changes.
  4. the pipeline shall do the following steps (once approved):
    i. deploy the full integration environment including resource group, storage account, event hubs, sql server, databricks workspace
    ii. set up all keys and secrets so that databricks can interact with all of these
    iii. set up the mounts in databricks so that it is possible to access eventhub capture files
    iv. run all integration tests in an on-cluster test pipeline, return and collect all necessary test results.
    v. tear down and delete the entire deployment so that it does not run up costs when it is not used.

Concerns

  • The pipeline code has access to a lot of keys. This is addressed with the approval step in the pipeline.
  • Running costs: should be kept low by the fact that we delete the environment every time.

First step is probably to get a service principal and a suitable subscription to do this with.

orchestration parameters

Idea: the orchestrator holds a dict and gives each step a reference to it, which can be modified and added to (see the sketch below).
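
A minimal sketch of the idea; class and method names are illustrative:

# The orchestrator owns a single dict and hands the same reference to
# every step, so steps can read, modify, and add parameters.
class Orchestrator:
    def __init__(self, steps: list):
        self.parameters: dict = {}
        self.steps = steps

    def execute(self) -> None:
        for step in self.steps:
            step.run(self.parameters)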

Upsert loader

Since full loading is very time-consuming and often expensive, it is necessary to introduce upsert logic.

Connect SQL server with service principal

It should be possible to connect SqlServer with a service principal. It requires some adjustments in the SqlServer class. Also, to test it out, the deployment pipelines should be able to create a DB user from the AD. This requires adjustments to the rights given to the SQL server instance; maybe the deploying SPN needs to be admin, and the connection to Invoke-Sql should use access tokens.

https://www.thedataswamp.com/blog/databricks-connect-to-azure-sql-with-service-principal
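
A sketch of the approach from the linked post: acquire an AAD token for the service principal and hand it to the Spark SQL connector. All names and values are placeholders, and a Databricks spark session with the SQL Spark connector installed is assumed:

# Acquire a token for the SPN with msal and read via the connector.
import msal

app = msal.ConfidentialClientApplication(
    client_id="<spn-app-id>",
    client_credential="<spn-secret>",
    authority="https://login.microsoftonline.com/<tenant-id>",
)
token = app.acquire_token_for_client(
    scopes=["https://database.windows.net/.default"]
)["access_token"]

df = (
    spark.read.format("com.microsoft.sqlserver.jdbc.spark")
    .option("url", "jdbc:sqlserver://<server>.database.windows.net")
    .option("dbtable", "dbo.MyTable")
    .option("accessToken", token)
    .load()
)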

custom yml format?

What if someone wants to create another field in the .yml files?

Is this possible? SHOULD it even be possible?

Something:
name:
partitioning:
customfield:

See also issue #123

Feature suggestion: Integrate library configuration with deployment of secret values.

For example, in the case of a Cosmos database: the deployed library needs to know what the database is called, its endpoint, and its key. The developer has to select where to store the key and the endpoint, and how to pass the two between the deployment pipeline and the deployed library.

suggestion:

  • Add a configuration to the CosmosDb item that states its parent resource group, and a default generated name and scope for getting the key and endpoint.
  • Add a feature to atc-dataplatform-tools that reads this same information from your local library, makes the necessary Azure calls to get the values, and afterwards calls Databricks secrets to set the values.

Result:
Configure your cosmos once in your library, and deploy, connect and use it directly.

Incremental EventHub json to delta orchestrator

A standard orchestrator can be written that takes as input

  1. a reference to an EventHub capture file location (see also how to set up the EventHubCapture class)
  2. a reference to a delta table that has certain properties:
    • column EnqueuedTimeUtc as TIMESTAMP
    • column pdate as TIMESTAMP
    • all other columns are expected to appear as part of the JSON string that encodes the body of the EventHub event.
    • partitioned by pdate

The orchestrator will incrementally extract the capture files, unpack the JSON, and load it into the delta table.
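
A rough sketch of the unpack-and-load step, assuming Avro capture files and using the target table's schema to parse the body; incremental selection of capture files is left out and all names and paths are placeholders:

from pyspark.sql import functions as f
from pyspark.sql import types as t

full_schema = spark.table("my_db.my_events").schema
# the body only contains the columns that are not part of the event envelope
body_schema = t.StructType(
    [fld for fld in full_schema.fields if fld.name not in ("EnqueuedTimeUtc", "pdate")]
)

raw = spark.read.format("avro").load("/mnt/eventhub-capture/my_hub/*/*.avro")

unpacked = (
    raw.withColumn(
        "EnqueuedTimeUtc",
        f.to_timestamp("EnqueuedTimeUtc", "M/d/yyyy h:mm:ss a"),  # adjust to your capture format
    )
    .withColumn("body", f.from_json(f.col("Body").cast("string"), body_schema))
    .select("EnqueuedTimeUtc", "body.*")
    .withColumn("pdate", f.date_trunc("day", f.col("EnqueuedTimeUtc")))
)

unpacked.write.format("delta").mode("append").partitionBy("pdate").saveAsTable("my_db.my_events")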

DacPac SQL deployment

Consider using a DacPac deployment of the SQL database.

In the same deployment pipeline an idea is:

  • Let table configurator alter .sql files
  • use dacpac deployment before python deployment

Even if the tables are not controlled by DacPac, at least the creation of schemas/users could be.

Drop support for python 3.7 and spark 7.3

We intend to drop support for versions older than the latest LTS version in databricks. This means dropping support for python 3.7 and spark version 7.3.x-scala2.1.

A comment period has hereby started to give users the possibility to object. Unless substantial objections are received, we intend to end support for these versions at the end of March 2022.

A new major version will be set to mark this change.

SqlExecutor split on semicolon - remove?

The SqlExecutor splits the sql file into multiple SQL queries, split on semicolons.

Why is this necessary?

When using the semicolon separator, I think the executor can just run the entire sql file without sql_statement.split(";").

Cached Loading

In some circumstances we want to only send data to external systems that is

  • new
  • changed
  • or too old (refresh mechanism)

This can be achieved in a general way by hashing the rows and comparing the hash to the last transmitted hash, stored in a cache table; a sketch follows the list below.
Further, the following functions can be achieved:

  • refresh of rows from a certain age
    • soft refresh (respecting a rate limit)
    • hard refresh (rows in danger of timing out wrt TTL)
  • Concurrent runs not sending the same data can be implemented by actively using table versions
  • transmit batch tracking to invalidate the rows that failed later in a transmission pipeline
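
A sketch of the hashing idea only: hash the payload columns of each outgoing row and keep the rows whose hash differs from the one stored in the cache table. Table and column names are illustrative:

from pyspark.sql import DataFrame
from pyspark.sql import functions as f


def rows_to_send(spark, df: DataFrame) -> DataFrame:
    key_cols = ["id"]                  # business key of the rows
    payload_cols = ["name", "amount"]  # changes here should trigger a resend

    hashed = df.withColumn(
        "row_hash", f.sha2(f.concat_ws("||", *[f.col(c) for c in payload_cols]), 256)
    )
    # cache table columns: id, row_hash, sent_at
    cache = spark.table("my_db.transmission_cache")

    return (
        hashed.alias("src")
        .join(cache.alias("c"), on=key_cols, how="left")
        .where(
            f.col("c.row_hash").isNull()
            | (f.col("src.row_hash") != f.col("c.row_hash"))
        )
        .select("src.*")
    )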

Table branching support

Use-case

The life-cycle of a table is such that it is produced in a long-running computation once its logic has been developed. Once the table is produced, it gets maintained with incremental updates in periodic intervals. Sometimes, the transformations that produce a set of tables change. The change of the transforming logic can be controlled with python library versions. The subject of this github issue is library support for the versioning of the underlying data product.

Data products (tables) are sometimes used by users in other departments, who may be disconnected from the discussions about possible wipe-and-rebuild plans. Even worse, some users may not be able to accept data product down-times. If such cases apply for data products that take a long time to rebuild, data product versioning may be a solution.

This story describes how table versioning may solve the issue:

  • A table is produced by transformation v1, the table is called version alpha
  • after full-loading, transformation v1 is deployed in a daily job to incrementally update version alpha
  • developer Amy needs to change the transformation to a v2 in such a way that a rebuilding of the table will be necessary. rebuilding takes three days
  • the table is in use in version alpha, and business depends on access to the table and on the table being up-to-date every day.
  • the daily job is tagged with v1, like the transformation that it executes. Job v1 is not stopped and updates table alpha.
  • Meanwhile, Amy deploys job v2 side-by-side with job v1. Job v2 runs a full-load of table version beta.
  • After three days job v2 is done and now runs daily to maintain the state of table version beta.
  • Amy verifies that table version beta meets the quality requirements.
  • Amy now directs the users of the table to use version beta as the new standard version.
  • Amy now stops and deletes job v1.
  • After a few days the users of the data product are fully satisfied that table version beta meets their needs and that the daily update succeeds. Amy may now choose to delete table version alpha, which is no longer needed.

TableManager class

What we want (a skeleton sketch follows this list):

  • a singleton class
  • that knows all tables, including
    • their DB
    • name
    • path
    • type
    • schema
    • partitioning
  • can be configured with SQL statements
  • can be configured with PySpark statements
  • can automatically modify the path and name for release and debug uses
    • in release situation, the name and path are 'plain' i.e. close to the configured string, (minus markup)
    • in debug situations
      • the path changes to a tmp location
      • the name becomes something unique to allow parallel test runs
  • can assert and validate the deployed properties and schemas
  • can clean up and delete test tables
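
A skeleton sketch of the singleton and the debug-name mangling; method names are illustrative, not a final API:

import uuid


class TableManager:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # key -> dict(db, name, path, type, schema, partitioning)
            cls._instance._tables = {}
            cls._instance._debug_suffix = ""
        return cls._instance

    def register(self, key: str, **details) -> None:
        self._tables[key] = details

    def set_debug(self) -> None:
        # one unique suffix per test run so parallel runs do not collide
        self._debug_suffix = "__test_" + uuid.uuid4().hex[:8]

    def table_name(self, key: str) -> str:
        return self._tables[key]["name"] + self._debug_suffix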

Cosmos delete

Add methods to the cosmos class to delete rows using the Cosmos API. It is not possible with the SQL API.
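
A sketch using the azure-cosmos Python SDK, which does support point deletes; endpoint, key, and names are placeholders:

from azure.cosmos import CosmosClient

client = CosmosClient(
    url="https://<account>.documents.azure.com:443/", credential="<key>"
)
container = client.get_database_client("<database>").get_container_client("<container>")


def delete_rows(ids_and_partition_keys):
    # each element is a tuple of (item id, partition key value)
    for item_id, pk in ids_and_partition_keys:
        container.delete_item(item=item_id, partition_key=pk)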

Test dataframe creation

We want a quick and easy way to create test dataframes; see the sketch after this list:

  • schema specified from the table manager
  • specify data for some columns
  • maybe auto generate data based on random seed
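
A minimal sketch of the desired helper; fetching the schema from the configured table is assumed, the rest is plain Spark:

from pyspark.sql import DataFrame, SparkSession


def create_test_df(spark: SparkSession, schema, rows) -> DataFrame:
    # rows: list of tuples matching the schema's column order
    return spark.createDataFrame(rows, schema=schema)

# usage: schema from an existing table, data specified inline
# df = create_test_df(spark, spark.table("my_db.my_table").schema, [(1, "a"), (2, "b")])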

Change UpsertLoader to use tablehandle

Since DeltaHandle now has an upsert method, the UpsertLoader as it stands is redundant and will require extra maintenance.

The UpsertLoader should instead take a table handle and execute its .upsert() function, as sketched below.
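
A sketch of the proposed delegation, assuming the handle exposes upsert(df, join_cols) as described in the related issue:

from pyspark.sql import DataFrame


class UpsertLoader:
    def __init__(self, handle, join_cols: list):
        self.handle = handle
        self.join_cols = join_cols

    def save(self, df: DataFrame) -> None:
        # all merge logic lives in the handle
        self.handle.upsert(df, self.join_cols)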

Faster, parallel pipelines

This is a proposal to significantly speed up the PR pipelines. It involves the following steps:

  1. We need a system of distributed semaphores, that is, a shared state where different users of the Azure resources can register their desire to use the resources (a sketch of the claim/expiry logic follows this list).
    • A user in this context is one execution of the PR pipeline on GitHub, or a local developer anywhere.
    • I imagine we use resource tags for this, where each user adds a tag with the following key-value pair:
      • key: "user-{my_unique_user_string}"
        • for a human, the user string should be a meaningful name
        • for a GitHub pipeline, it would be useful to be able to give the identity of the executing pipeline
      • value: the timestamp in ISO format for the last expected use time.
        • no more than 60 minutes should be reserved here. It is possible to update the value.
  2. The deployment pipeline shall use the identity string at run-time to set up unique versions of resources that cannot be shared, so as to enable parallel execution:
    • cosmos container
    • sql tables
    • delta tables
  3. An Azure function will regularly (every 30 minutes) wake up and check if anyone is still using the resources (has a registered claim that has not timed out). If nobody is using the resources, the function will delete the resource group.
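
A sketch of the claim and expiry logic only; how the tags are actually read from and written to the Azure resources (SDK or CLI) is left out:

from datetime import datetime, timedelta, timezone


def make_claim_tag(user: str, minutes: int = 60) -> dict:
    # reserve at most 60 minutes; the value can be updated later
    expiry = datetime.now(timezone.utc) + timedelta(minutes=min(minutes, 60))
    return {f"user-{user}": expiry.isoformat()}


def resources_still_claimed(tags: dict) -> bool:
    now = datetime.now(timezone.utc)
    for key, value in tags.items():
        if not key.startswith("user-"):
            continue
        try:
            expiry = datetime.fromisoformat(value)
        except ValueError:
            continue  # a malformed timestamp counts as expired
        if expiry.tzinfo is None:
            expiry = expiry.replace(tzinfo=timezone.utc)
        if expiry > now:
            return True
    return False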

Advantages:

  • The step of freeing the resources is no longer executed inside the GitHub pipeline, making it faster.
  • Resource creation for each new run is skipped entirely if the next run starts soon after the last or if the developer manually puts in a claim to keep the resources alive.

Disadvantages (compared to current implementation of non-concurrent runs):

  • Increased cost due to parallel runs.
  • Increased test pipeline complexity
  • Fundamental changes to the way that resources are deployed need to be tested on a different resource base-name to not conflict with concurrent test runs.

Automatic diagrams

I am working on a proof of concept for automatic diagram creation, using the diagrams package to automatically generate useful (if not beautiful) diagrams directly from source code. I am not ready to share the (very rough) code yet, but I am looking for feedback on features.

The idea is that each transformation can define its inputs and outputs independently. The library then takes responsibility for connecting the stages into a larger flow diagram. Here is a minimal use case:

File tables.py:

from atc.doc.diagram import Table

orders = Table("orders")
guitars = Table("guitars")
products = Table("products")
customers = Table("customers")

File GuitarOrchestrator.py:

from atc.doc import diagram
import tables

@diagram.input(tables.guitars)
@diagram.output(tables.products)
class GuitarOrchestrator:
    pass

File CustomerOrchestrator.py:

from atc.doc import diagram
import tables

@diagram.input(tables.products)
@diagram.input(tables.customers)
@diagram.output(tables.orders)
class CustomerOrchestrator:
    pass

The library is then able to parse this and create the following combined diagram:
[combined diagram image]

Of course, the individual diagrams can also be created, and the idea is that one call with default arguments will create each sub-diagram as well as the combined diagram. Example:
[CustomerOrchestrator sub-diagram image]

These two diagrams were created by my POC implementation, which is not ready for a PR yet.

My plans for future work are these:

  • Support for clustering (e.g. bronze layer box)
  • Support for nested sub diagrams (e.g. each stage in the order flow separate from every stage in the inventory flow) (probably with some extra markup like for inputs, or perhaps for free using source file folder structure)

Please give me some feedback on this.

PR #54 broke my pipeline

Hi @JeppeBlixen and @christianhelle, PR #54 that you merged contained an incompatible renaming:
MultipleExtractOrchestrator became MultipleExtractorOrchestrator. This broke our pipelines and our production system. Please reserve breaking changes and renamings for major version updates. I will now need to implement more careful dependency version control in my system, so actually thank you for bringing this to my attention :)

/rant

You can close the issue.

Upsert method for DeltaHandle

In order to use the UpsertLoader across Delta and Azure SQL, the UpsertLoader should use the ThHandle class.
It already does.

But before that, we need a new "merge" method added to both DeltaHandle and SqlHandle.
We introduce an upsert method on DeltaHandle instead.

Introducing this merging could actually make the UpsertLoader redundant, and it might then be deprecated.

Databricks test pipeline

We need a pipeline for unit testing all functions in atc, including

  • features that need a databricks cluster
  • features not supported by databricks-connect.

We therefore need a complete deployment of databricks in azure, ideally set up using code from atc-snippets.
We only need one environment since we are not going to use it in production.
The environment should contain an event-hub and a storage account so that we can fully test streaming and external tables.

assert is data frame equal

When making unit tests, it would be beneficial to have an assert function that compares two dataframes and assesses whether they are the same.
