zypp-io / df_to_azure

Repository for automatically creating pipelines with a Copy activity from Blob storage to SQL. The main functionality is fast uploading of pandas DataFrames to a SQL database using Azure Data Factory.

License: Apache License 2.0

Python 100.00%
bulk-insert blob-storage azure sql incremental dataframe upsert sql-table

df_to_azure's Introduction


DF to Azure

Python module for fast upload of a pandas DataFrame to an Azure SQL database, using automatically created pipelines in Azure Data Factory.

Introduction

The purpose of this project is to upload large datasets using Azure Data Factory combined with an Azure SQL Server. The process runs in the following steps:

1. The data will be uploaded as a .csv file to Azure Blob storage.
2. A SQL table is prepared based on pandas DataFrame types, which will be converted to the corresponding SQLAlchemy types.
3. A pipeline is created in Azure Data Factory for copying the .csv from Blob storage into the SQL table.
4. The pipeline is triggered, so that the .csv file is bulk inserted into the SQL table.

How it works

With the following arguments, it is possible to bulk insert your dataframe into the SQL database:

from df_to_azure import df_to_azure

df_to_azure(df=df, tablename="table_name", schema="schema", method="create")
  1. df: dataframe you wish to export
  2. tablename: desired name of the table
  3. schema: desired sql schema
  4. method: option for "create", "append" or "upsert"
  5. id_field: id field of the table. Necessary if method is set to "upsert"

Important: the CSV files are uploaded to a container called dftoazure, so create this container in your storage account before using this module.

Upsert / create or append

It is possible to upsert the SQL table with new or changed records from the dataframe you upload. Based on the id_field, the SQL table is checked for overlapping values: existing records are updated in the SQL table, and new records are appended to it.
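For example, an upsert call might look like this (the column name col_a is only illustrative):

from df_to_azure import df_to_azure

# update rows whose col_a value already exists in the table, append the rest
df_to_azure(df=df, tablename="table_name", schema="schema", method="upsert", id_field="col_a")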

Settings

To use this module, you need to add the Azure subscription settings and Azure Data Factory settings to your environment variables. We recommend working with .env files (or, even better, loading them automatically from Azure Key Vault) and loading them at runtime. This is optional; they can also be set as system environment variables. Use the following template when using .env:

# --- ADF SETTINGS ---

# data factory settings
rg_name: ""
rg_location: "westeurope"
df_name: ""

# blob settings
ls_blob_account_name: ""
ls_blob_container_name: ""
ls_blob_account_key: ""

# SQL settings
SQL_SERVER: ""
SQL_DB: ""
SQL_USER: ""
SQL_PW: ""

# --- AZURE SETTINGS ---
# azure credentials for connecting to the azure subscription
client_id: ""
secret: ""
tenant: ""
subscription_id: ""

Parquet

Since version 0.6.0, uploading the dataframe to parquet is supported. Simply add the argument parquet=True to upload the dataframe to the Azure storage container parquet. The arguments tablename and schema will be used to create the folder structure. If parquet is set to True, the dataset will not be uploaded to a SQL database.
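For example, a parquet upload could look like this (a minimal sketch based on the description above):

from df_to_azure import df_to_azure

# writes the dataframe as parquet to the "parquet" container, under schema/table_name,
# and skips the SQL upload entirely
df_to_azure(df=df, tablename="table_name", schema="schema", method="create", parquet=True)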

Maintained by Zypp:

Support:

For support on using this module, you can reach us at [email protected]


Testing

To run the test suite, use:

pytest df_to_azure

To run pytest for a single test:

pytest df_to_azure/tests/test_df_to_azure.py::test_duplicate_keys_upsert

df_to_azure's People

Contributors

erfannariman, jrnkng, melvinfolkers, timvdheijden


df_to_azure's Issues

BUG: Number of characters is different between the Copy action CSV and dataframe

When writing a dataframe to a SQL database, ADF gives an error because the maximum number of characters is exceeded (Operation on target Copy all_boards to SQL failed: Failure happened on 'Sink' side. ErrorCode=SqlBulkCopyInvalidColumnLength,...). This was the case with a column containing a string version of a list of dicts.
The maximum character length in Python was 1870, so the schema definition in the database became varchar(1870). In the CSV, the maximum length of the values was also 1870, both when checked as a txt file and after loading it again in Python.

However, in ADF this causes the error during the copy activity. The error does not occur if we manually set the text_length parameter of df_to_azure() to 1873.
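As a workaround, a call along these lines was used (a sketch based on the report above; the exact text_length signature may differ):

from df_to_azure import df_to_azure

# force a wider varchar than the maximum length measured in pandas
df_to_azure(df=df, tablename="all_boards", schema="schema", method="create", text_length=1873)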

BUG: error for wrong method

We have to add an exception for the case where a method is used that is not supported by this package.

For instance, this method does not exist:

method="insert"

Instead of throwing an error, it currently appends the dataset to the SQL database.
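A minimal sketch of such a check (the names are illustrative, not the package's actual internals):

SUPPORTED_METHODS = {"create", "append", "upsert"}

def validate_method(method: str) -> None:
    # raise early instead of silently falling back to append
    if method not in SUPPORTED_METHODS:
        raise ValueError(f"method must be one of {sorted(SUPPORTED_METHODS)}, got '{method}'")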

BUG: invalid characters in resource name

Output from the console:

azure.core.exceptions.HttpResponseError: The specifed resource name contains invalid characters.
RequestId:1fe04a1b-501e-0019-2190-221067000000
Time:2022-02-15T17:20:16.7288425Z
ErrorCode:InvalidResourceName

Output from pip list, regarding the azure packages:

azure-common           1.1.26
azure-core             1.11.0
azure-identity         1.5.0
azure-keyvault-secrets 4.2.0
azure-mgmt-core        1.2.2
azure-mgmt-datafactory 1.0.0
azure-mgmt-resource    15.0.0
azure-storage-blob     12.7.1

ENH: Export to parquet instead of .csv

In df_to_azure/export.py on line 200 we export the file to a .csv file:

data = self.df.to_csv(index=False, sep="^", quotechar='"', lineterminator="\n")

We should change this to .parquet for optimization. It requires changes in the Linked Services in adf.py on line 173:

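A rough sketch of what the export side could look like (an assumption, not the package's implementation; to_parquet writes bytes, so a buffer replaces the string returned by to_csv):

import io

# serialize the dataframe to parquet bytes before uploading to blob storage
buffer = io.BytesIO()
self.df.to_parquet(buffer, index=False)
data = buffer.getvalue()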

[ENH] create first version

There are already some scripts in another repository. The plan is to copy those scripts here and adjust them where needed.

ENH: create container

Add functionality for creating a container if it does not exist. The only problem is that you need the account_key of the storage account to create the container.

BUG: container name does not exist

Problem: when the container "dftoazure" does not exist, df_to_azure exits with a ContainerNotFound error.

Code from adf.py:

def create_blob_container():
    blob_service_client = create_blob_service_client()
    try:
        blob_service_client.create_container("dftoazure")
    except:
        logging.info("CreateContainerError: Container already exists.")

Suggested solution:

Check in Azure whether the container "dftoazure" is present; if not, create it.

blob_service_client = create_blob_service_client()
containers = [x.get("name") for x in blob_service_client.list_containers()]
if "dftoazure" not in containers:
    blob_service_client.create_container(name="dftoazure")
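An alternative sketch, assuming create_blob_service_client returns an azure.storage.blob.BlobServiceClient: skip the listing call and let the service signal that the container already exists.

import logging

from azure.core.exceptions import ResourceExistsError

def create_blob_container():
    blob_service_client = create_blob_service_client()
    try:
        blob_service_client.create_container("dftoazure")
    except ResourceExistsError:
        # only swallow the "already exists" case, let real errors propagate
        logging.info("Container dftoazure already exists.")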

BUG: add categorical dtype for type conversion

When there is a Categorical dtype column present in the dataframe, the type conversion errors, because the Categorical dtype is missing from the conversion dictionary:

type_conversion = {
    dtype("O"): string,
    StringDtype(): string,
    dtype("int64"): Integer(),
    dtype("int32"): Integer(),
    dtype("int16"): Integer(),
    dtype("int8"): Integer(),
    Int64Dtype(): Integer(),
    dtype("float64"): numeric,
    dtype("float32"): numeric,
    dtype("float16"): numeric,
    dtype("<M8[ns]"): DateTime(),
    dtype("bool"): Boolean(),
    BooleanDtype(): Boolean(),
}
col_types = {col_name: type_conversion[col_type] for col_name, col_type in self.df.dtypes.to_dict().items()}
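One possible fix, as a sketch (it reuses the type_conversion dict and the string fallback from the snippet above, which are defined elsewhere in the module): map a Categorical column to the type of its underlying categories.

from pandas import CategoricalDtype

def resolve_type(col_type):
    if isinstance(col_type, CategoricalDtype):
        # CategoricalDtype is not a key in type_conversion, so look up the dtype
        # of the categories instead and fall back to string
        return type_conversion.get(col_type.categories.dtype, string)
    return type_conversion[col_type]

col_types = {col_name: resolve_type(col_type) for col_name, col_type in self.df.dtypes.to_dict().items()}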

ENH: add upsert method for parquet

Now that we have parquet support, we should add logic to do upserts on parquet files.

In general this will be DataFrame.combine_first. We probably need some good testing for this, for example how it deals with NaN values and datetime indices:

df1 = pd.DataFrame(
    {
        "id": ["A", "B", "C"],
        "val1": [10, 20, 30],
        "val2": [40, 50, 60]
    }
)

df2 = pd.DataFrame(
    {
        "id": ["A", "B", "C", "D"],
        "val1": [15, 20, 30, 35],
        "val2": [40, 52, 60, 70]
    }
)

print(df1, "\n")
print(df2, "\n")
print(df2.set_index("id").combine_first(df1.set_index("id")).reset_index())

  id  val1  val2
0  A    10    40
1  B    20    50
2  C    30    60 

  id  val1  val2
0  A    15    40
1  B    20    52
2  C    30    60
3  D    35    70 

  id  val1  val2
0  A    15    40
1  B    20    52
2  C    30    60
3  D    35    70

ENH: add method to upload dataframe as parquet file storage

The main functionality now is uploading a pandas dataframe to a SQL database through Data Factory. In many cases we use parquet files instead of a database.

The proposal is to add a method that exports a dataframe to parquet and uploads it to Azure storage. When method="append" is used, we need to download the existing parquet file and append the newest data (see the sketch after the proposed call below).

df_to_azure(df=df, tablename="table_name", folder="schema_name", container="containername", method="create", parquet=True)
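A rough sketch of that append flow (blob_client and the buffer-based round trip are assumptions, not the package's current API):

from io import BytesIO

import pandas as pd

def append_parquet(blob_client, new_df: pd.DataFrame) -> None:
    # download the existing parquet file, append the new rows, and upload the result
    existing = pd.read_parquet(BytesIO(blob_client.download_blob().readall()))
    combined = pd.concat([existing, new_df], ignore_index=True)
    buffer = BytesIO()
    combined.to_parquet(buffer, index=False)
    blob_client.upload_blob(buffer.getvalue(), overwrite=True)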

ENH: move auth azure out of module

Currently the database connection is created inside the module, which means users have to define even more environment variables for it. I think it would be better if we accept the connection as an argument in the functions that interact with the database, and keep that part outside this module.

@melvinfolkers what do you think?

[FIX] fix relative import

In order to use this repository as a submodule, changes have to be made to the import statements.
Currently, scripts are imported as follows:

from src.functions import my_function

In order for it to work as a submodule, the references have to be changed to:

from .functions import my_function

Documentation method and id_field

Your documentation states that id_field should only be used with upsert, yet the example on your page shows id_field with the create method:

from df_to_azure import df_to_azure

df_to_azure(df=df, tablename="table_name", schema="schema", method="create", id_field="col_a")
