canimus / cuallee


Possibly the fastest DataFrame-agnostic quality check library in town.

Home Page: https://canimus.github.io/cuallee/

License: Apache License 2.0

Python 97.64% Makefile 0.23% Dockerfile 0.34% Shell 0.16% TeX 1.63%
bigdata performance-metrics pyspark python3 unit-testing pydeequ pandas snowpark dataquality data-quality

cuallee's Introduction

cuallee


Meaning good in Aztec (Nahuatl), pronounced: QUAL-E

This library provides an intuitive API to describe data quality checks, initially built for PySpark DataFrames (v3.3.0) and since extended to pandas, Snowpark, DuckDB, Daft and more. It is a replacement, written in pure Python, for the pydeequ framework.

I gave up on deequ because, after extensive use, the API is not user-friendly, the Python callback servers incur additional costs in our compute clusters, and it lacks support for the newest versions of PySpark.

As a result, cuallee was born.

This implementation goes hand in hand with the latest PySpark API and uses the Observation API to collect metrics at the lowest computation cost. When benchmarked against pydeequ, cuallee loads fewer than ~3k Java classes underneath and uses remarkably less memory.

Support

cuallee is a truly DataFrame-agnostic data quality framework.

Provider API Versions
snowflake snowpark 1.11.1, 1.4.0
databricks pyspark & spark-connect 3.5.x, 3.4.0, 3.3.x, 3.2.x
bigquery bigquery 3.4.1
pandas pandas 2.0.2, 1.5.x, 1.4.x
duckdb duckdb 1.0.0, 0.10.2, 0.9.2, 0.8.0
polars polars 1.0.0, 0.19.6
daft daft 0.2.24, 0.2.19

Logos are trademarks of their respective brands.

Install

pip install cuallee

Checks

The most common checks for data integrity validation are completeness and uniqueness; an example of these dimensions is shown below:

from cuallee import Check, CheckLevel # WARN:0, ERR: 1

# Nulls on column Id
check = Check(CheckLevel.WARNING, "Completeness")
(
    check
    .is_complete("id")
    .is_unique("id")
    .validate(df)
).show() # Returns a pyspark.sql.DataFrame

Dates

Perhaps one of the most useful features of cuallee is its extensive number of checks for Date and Timestamp values. These include validation of ranges, set operations like inclusion, and even a verification that confirms continuity of dates using the is_daily check.

# Check that dates fall within a range
check = Check(CheckLevel.WARNING, "CheckIsBetweenDates")
df = spark.sql(
    """
    SELECT
        explode(
            sequence(
                to_date('2022-01-01'),
                to_date('2022-01-10'),
                interval 1 day)) as date
    """)
assert (
    check.is_between("date", "2022-01-01", "2022-01-10")
    .validate(df)
    .first()
    .status == "PASS"
)
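
The is_daily check mentioned above follows the same pattern. Below is a minimal, illustrative sketch reusing the dataframe from the previous example; the default schedule is assumed to be Mon-Fri, as listed in the catalogue further down:

# Continuity of dates: a sketch reusing df from the example above.
# By default is_daily expects Mon-Fri continuity ([2,3,4,5,6] in PySpark day-of-week terms);
# a custom schedule can be passed for other continuity patterns.
check_daily = Check(CheckLevel.WARNING, "DateContinuity")
check_daily.is_daily("date").validate(df).show()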

Membership

Another common test is validation against a list of values, as part of the multiple integrity checks required for better quality data.

df = spark.createDataFrame([[1, 10], [2, 15], [3, 17]], ["ID", "value"])
check = Check(CheckLevel.WARNING, "is_contained_in_number_test")
check.is_contained_in("value", (10, 15, 20, 25)).validate(df)

Regular Expressions

When it comes to flexible matching, regular expressions come to the rescue. cuallee uses regular expressions to validate that fields of type String conform to specific patterns.

df = spark.createDataFrame([[1, "is_blue"], [2, "has_hat"], [3, "is_smart"]], ["ID", "desc"])
check = Check(CheckLevel.WARNING, "has_pattern_test")
check.has_pattern("desc", r"^is.*t$") # only matches "is_smart", i.e. 33% of rows
check.validate(df).first().status == "FAIL"

Anomalies

Statistical tests are a great aid for detecting anomalies in data. Here is an example that will PASS only when at least 40% of the data lies inside the interquartile range.

df = spark.range(10)
check = Check(CheckLevel.WARNING, "IQR_Test")
check.is_inside_interquartile_range("id", pct=0.4)
check.validate(df).first().status == "PASS"

+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
|id |timestamp          |check|level  |column|rule                         |value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
|1  |2022-10-19 00:09:39|IQR  |WARNING|id    |is_inside_interquartile_range|10000|10  |4         |0.6      |0.4           |PASS  |
+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+

Workflows (Process Mining)

Besides the common citizen-like checks, cuallee offers out-of-the-box real-life checks. For example, suppose that you are working in a Salesforce or SAP environment. Very likely your business processes will be driven by a lifecycle:

  • Order-To-Cash
  • Request-To-Pay
  • Inventory-Logistics-Delivery
  • Others. In this scenario, cuallee offers the ability to verify that the events registered over time follow an expected sequence, like the example below:
import pandas as pd
import pyspark.sql.functions as F
from cuallee import Check, CheckLevel

data = pd.DataFrame({
   "name":["herminio", "herminio", "virginie", "virginie"],
   "event":["new","active", "new", "active"],
   "date": ["2022-01-01", "2022-01-02", "2022-01-03", "2022-02-04"]}
   )
df = spark.createDataFrame(data).withColumn("date", F.to_date("date"))

# Cuallee Process Mining
# Testing that the edges of the workflow follow the expected sequence
check = Check(CheckLevel.WARNING, "WorkflowViolations")

# Validate that 50% of data goes from new => active
check.has_workflow("name", "event", "date", [("new", "active")], pct=0.5)
check.validate(df).show(truncate=False)

+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
|id |timestamp          |check             |level  |column                   |rule        |value               |rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
|1  |2022-11-07 23:08:50|WorkflowViolations|WARNING|('name', 'event', 'date')|has_workflow|(('new', 'active'),)|4   |2.0       |0.5      |0.5           |PASS  |
+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+

Controls

[2023-12-28] ✨ New feature to simplify the validation of an entire dataframe in a particular dimension.

import pandas as pd
from cuallee import Control
df = pd.DataFrame({"X":[1,2,3], "Y": [10,20,30]})
# Runs the is_complete check on all columns of the dataframe
Control.completeness(df)

cuallee VS pydeequ

The test folder contains Docker containers with the requirements to run the tests. A perftest.py is also available in the root folder for those interested.

# 1000 rules / # of seconds

cuallee: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 162.00
pydeequ: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 322.00

Catalogue

Check Description DataType
is_complete Zero nulls agnostic
is_unique Zero duplicates agnostic
is_primary_key Zero duplicates agnostic
are_complete Zero nulls on group of columns agnostic
are_unique Composite primary key check agnostic
is_composite_key Zero duplicates on multiple columns agnostic
is_greater_than col > x numeric
is_positive col > 0 numeric
is_negative col < 0 numeric
is_greater_or_equal_than col >= x numeric
is_less_than col < x numeric
is_less_or_equal_than col <= x numeric
is_equal_than col == x numeric
is_contained_in col in [a, b, c, ...] agnostic
is_in Alias of is_contained_in agnostic
not_contained_in col not in [a, b, c, ...] agnostic
not_in Alias of not_contained_in agnostic
is_between a <= col <= b numeric, date
has_pattern Matching a pattern defined as a regex string
is_legit String not null & not empty ^\S$ string
has_min min(col) == x numeric
has_max max(col) == x numeric
has_std ฯƒ(col) == x numeric
has_mean ฮผ(col) == x numeric
has_sum ฮฃ(col) == x numeric
has_percentile %(col) == x numeric
has_cardinality count(distinct(col)) == x agnostic
has_infogain count(distinct(col)) > 1 agnostic
has_max_by A utility predicate for max(col_a) == x for max(col_b) agnostic
has_min_by A utility predicate for min(col_a) == x for min(col_b) agnostic
has_correlation Finds correlation between 0..1 on corr(col_a, col_b) numeric
has_entropy Calculates the entropy of a column entropy(col) == x for classification problems numeric
is_inside_interquartile_range Verifies column values reside inside limits of interquartile range Q1 <= col <= Q3 used on anomalies. numeric
is_in_millions col >= 1e6 numeric
is_in_billions col >= 1e9 numeric
is_t_minus_1 For date fields confirms 1 day ago t-1 date
is_t_minus_2 For date fields confirms 2 days ago t-2 date
is_t_minus_3 For date fields confirms 3 days ago t-3 date
is_t_minus_n For date fields confirms n days ago t-n date
is_today For date fields confirms day is current date t-0 date
is_yesterday For date fields confirms 1 day ago t-1 date
is_on_weekday For date fields confirms day is between Mon-Fri date
is_on_weekend For date fields confirms day is between Sat-Sun date
is_on_monday For date fields confirms day is Mon date
is_on_tuesday For date fields confirms day is Tue date
is_on_wednesday For date fields confirms day is Wed date
is_on_thursday For date fields confirms day is Thu date
is_on_friday For date fields confirms day is Fri date
is_on_saturday For date fields confirms day is Sat date
is_on_sunday For date fields confirms day is Sun date
is_on_schedule For date fields confirms time windows i.e. 9:00 - 17:00 timestamp
is_daily Verifies daily continuity on date fields. By default uses [2,3,4,5,6], which represents Mon-Fri in PySpark; custom schedules can be used for other date continuity date
has_workflow Adjacency matrix validation on 3-column graph, based on group, event, order columns. agnostic
is_custom User-defined custom function applied to dataframe for row-based validation. agnostic
satisfies An open SQL expression builder to construct custom checks agnostic
validate The ultimate transformation of a check with a dataframe input for validation agnostic
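
Most catalogue entries follow the same fluent pattern. Below is a minimal, illustrative sketch chaining a handful of them on a pandas dataframe; the column names and values are made up, and keyword details may differ slightly per check:

import pandas as pd
from cuallee import Check, CheckLevel

df = pd.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})

check = Check(CheckLevel.WARNING, "CatalogueSampler")
results = (
    check
    .is_complete("id")              # zero nulls
    .is_unique("id")                # zero duplicates
    .is_positive("amount")          # amount > 0
    .is_between("amount", (5, 50))  # 5 <= amount <= 50
    .validate(df)
)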

Controls pyspark

Check Description DataType
completeness Zero nulls agnostic
information Zero nulls and cardinality > 1 agnostic
intelligence Zero nulls, zero empty strings and cardinality > 1 agnostic
percentage_fill % rows not empty agnostic
percentage_empty % rows empty agnostic
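
The control methods in the table above follow the same calling convention as Control.completeness shown earlier. A minimal sketch on a small PySpark dataframe, assuming each method takes the dataframe as its only required argument:

from pyspark.sql import SparkSession
from cuallee import Control

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, None), (2, "a"), (3, "b")], ["x", "y"])

# Completeness of every column (shown earlier for pandas)
Control.completeness(df)

# Assumption: the remaining controls from the table are invoked the same way
Control.percentage_fill(df)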

ISO Standard

A new module was incorporated in cuallee==0.4.0 which allows verifying that columns in data frames comply with International Organization for Standardization (ISO) standards. Simply access the check.iso interface to add the set of checks as shown below.

Check Description DataType
iso_4217 currency compliant ccy string
iso_3166 country compliant country string
df = spark.createDataFrame([[1, "USD"], [2, "MXN"], [3, "CAD"], [4, "EUR"], [5, "CHF"]], ["id", "ccy"])
check = Check(CheckLevel.WARNING, "ISO Compliant")
check.iso.iso_4217("ccy")
check.validate(df).show()
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
| id|          timestamp|        check|  level|column|           rule|               value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
|  1|2023-05-14 18:28:02|ISO Compliant|WARNING|   ccy|is_contained_in|{'BHD', 'CRC', 'M...|   5|       0.0|      1.0|           1.0|  PASS|
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+

Snowflake Connection

In order to establish a connection to your Snowflake account, cuallee relies on the following environment variables being available in your environment, as sketched after the list below:

  • SF_ACCOUNT
  • SF_USER
  • SF_PASSWORD
  • SF_ROLE
  • SF_WAREHOUSE
  • SF_DATABASE
  • SF_SCHEMA
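
A minimal sketch of how this can look; the credentials, table name and Snowpark session creation are illustrative assumptions, and only the SF_* variable names come from the list above:

import os
from snowflake.snowpark import Session
from cuallee import Check, CheckLevel

# Environment variables expected by cuallee (values are placeholders)
for key in ["SF_ACCOUNT", "SF_USER", "SF_PASSWORD", "SF_ROLE",
            "SF_WAREHOUSE", "SF_DATABASE", "SF_SCHEMA"]:
    os.environ.setdefault(key, "<fill-me>")

# Assumption: a Snowpark DataFrame is passed to validate(), as with the other providers
session = Session.builder.configs({
    "account": os.environ["SF_ACCOUNT"],
    "user": os.environ["SF_USER"],
    "password": os.environ["SF_PASSWORD"],
    "role": os.environ["SF_ROLE"],
    "warehouse": os.environ["SF_WAREHOUSE"],
    "database": os.environ["SF_DATABASE"],
    "schema": os.environ["SF_SCHEMA"],
}).create()

df = session.table("MY_TABLE")
check = Check(CheckLevel.WARNING, "SnowflakeCompleteness")
check.is_complete("ID").validate(df)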

Spark Connect

Just set the environment variable SPARK_REMOTE for your remote session, and cuallee will connect using

spark_connect = SparkSession.builder.remote(os.getenv("SPARK_REMOTE")).getOrCreate()

and convert all checks to select statements, as opposed to Observation API compute instructions.

Databricks Connection

By default, cuallee searches for a SparkSession available in the globals, so there is literally no need to call SparkSession.builder. When working in a local environment, it will automatically find an available session or start one.
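
For illustration, a minimal sketch of what this looks like in a Databricks notebook, where the spark session is already in scope; the table and column names are placeholders:

from cuallee import Check, CheckLevel

# No SparkSession.builder needed: cuallee picks up the notebook's `spark` session
df = spark.table("my_catalog.my_schema.my_table")  # placeholder table name
check = Check(CheckLevel.WARNING, "DatabricksCompleteness")
check.is_complete("id").validate(df).show()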

DuckDB

For testing on DuckDB, simply pass your table name to your check et voilà.

import duckdb
from cuallee import Check, CheckLevel
conn = duckdb.connect(":memory:")
check = Check(CheckLevel.WARNING, "DuckDB", table_name="temp/taxi/*.parquet")
check.is_complete("VendorID")
check.is_complete("tpep_pickup_datetime")
check.validate(conn)

   id            timestamp check    level                column         rule value      rows  violations  pass_rate  pass_threshold status
0   1  2022-10-31 23:15:06  test  WARNING              VendorID  is_complete   N/A  19817583         0.0        1.0             1.0   PASS
1   2  2022-10-31 23:15:06  test  WARNING  tpep_pickup_datetime  is_complete   N/A  19817583         0.0        1.0             1.0   PASS

Roadmap

100% data frame agnostic implementation of data quality checks. Define once, run everywhere

  • [x] PySpark 3.5.0
  • [x] PySpark 3.4.0
  • [x] PySpark 3.3.0
  • [x] PySpark 3.2.x
  • [x] Snowpark DataFrame
  • [x] Pandas DataFrame
  • [x] DuckDB Tables
  • [x] BigQuery Client
  • [x] Polars DataFrame
  • [*] Dagster Integration
  • [x] Spark Connect
  • [x] Daft
  • [-] PDF Report
  • Metadata check
  • Help us in a discussion?

Whilst expanding the functionality can feel a bit like overkill, since you can most likely connect Spark via its drivers to whatever DBMS you choose, in the desire to make it even more user-friendly we aim to make cuallee portable to all the providers above.

Authors

Contributors

Guidelines

Documentation

Paper

cuallee has been published in the Journal of Open Source Software

Vazquez et al., (2024). cuallee: A Python package for data quality checks across multiple DataFrame APIs. Journal of Open Source Software, 9(98), 6684, https://doi.org/10.21105/joss.06684

If you use cuallee please consider citing this work. Citation

License

Apache License 2.0. Free for commercial use, modification, distribution, patent use, and private use. Just preserve the copyright and license.

Made with ❤️ in Utrecht 🇳🇱
Maintained over ⌛ from Ljubljana 🇸🇮
Extended 🚀 by contributions all over the 🌎

cuallee's People

Contributors

canimus, dcodeyl, dependabot[bot], devarops, dsaad68, herminio-iovio, jbytecode, maltzsama, minzastro, runkelcorey, ryanjulyan, stuffbyyuki, vestalisvirginis


cuallee's Issues

Show results when running 1000 rules are not in order

It appears that when running a test with 1000 rules, the results are not shown in order.
Test scenario is with the Taxi NYC data set with 20M Rows.

df = spark.read.parquet("temp/data/*.parquet")
c = Check(CheckLevel.Warning, "NYC")
for i in range(1000):
  c.is_greater_than("fare_amount", i)
c.validate(spark, df).show(n=1000, truncate=False)

# Displayed dataframe contains wrong order in rows
# in 995 there is a discrepancy because 10% of the rows are certainly not with `fare_amount > 995`

[JOSS Review] duckdb v0.9.2 does not work

Describe the bug
With duckdb v0.9.2 I get the error:
Exception: Cuallee is not ready for this data structure. You can log a Feature Request in Github.

To Reproduce
Steps to reproduce the behavior:

  1. Install duckdb v0.9.2
  2. Read the taxi parquet file to duckdb and pandas
  3. Define some check
  4. Make sure that the check runs successfully for the pandas df
  5. Try to run the check with duckdb, get the error

Desktop (please complete the following information):

  • OS: Windows
  • python: 3.11.9
  • cuallee: 0.10.2

This issue is part of a JOSS Review

[JOSS Review] Performance Tests

For the joss review, I need to validate the performance test:
Performance: If there are any performance claims of the software, have they been confirmed? (If there are no claims, please check off this item.)
Are the temp/taxi/ parquet files open data? If yes, could you provide them somehow?

This issue is part of a JOSS Review

daft_validation - AttributeError: module 'statistics' has no attribute 'correlation'

Describe the bug
When attempting to use the correlation function within a custom defined function decorated with @daft.udf, an AttributeError is raised due to the absence of the correlation attribute in the statistics module. This error occurs because the correlation function was only introduced in Python 3.10. Currently the minimum Python version supported by cuallee is 3.8, where this error can be reproduced.

To Reproduce
To reproduce the behavior:

  1. Use the correlation function within a custom defined function decorated with @daft.udf.
  2. Execute the code.
    @daft.udf(return_dtype=daft.DataType.float64())
    def correlation(x, y):
>       return [statistics.correlation(x.to_pylist(), y.to_pylist())]
E       AttributeError: module 'statistics' has no attribute 'correlation'

cuallee/daft_validation.py:211: AttributeError

Expected behavior
The correlation function should be successfully called within the custom defined function decorated with @daft.udf.

Additional context
This bug arises due to the fact that the correlation function was introduced in Python version 3.10, and the environment where the code is being executed is likely using an older version of Python where this function does not exist.

[JOSS Review] Show documentation page prominently

Great that you have updated your documentation page. Could you now refer to it prominently, for example in the Readme and/or in the project description on github (upper right corner of the github repository page).

This issue is part of a JOSS Review

[JOSS REVIEW] Community guidelines

Hi @canimus,

I can see a link to the contributing guidelines on a sidebar titled Helpful resources when creating a new issue.

However, I need help finding a link to these guidelines within the documentation.
Can you please point me to the section in the README with the guidelines for third parties wishing to:

  1. Contribute to the software.
  2. Report issues or problems with the software.
  3. Seek support

Thank you!

Exception thrown when whole dataset fails validation using Polars

Describe the bug
When using Polars, if the given dataset fails validation, instead of getting a dataframe with the validation results, an exception is thrown: TypeError: '>=' not supported between instances of 'NoneType' and 'float'

The example below shows the use of the is_unique check, but this has been replicated with other checks as well.

To Reproduce
Steps to reproduce the behavior:

  1. Create a new Jupyter notebook / Python file
  2. Paste the following lines in:
import polars as pl
from cuallee import Check, CheckLevel

df = pl.DataFrame(
    {
        "id": [1, 1, 2, 3, 4],
        "bar": [6, 7, 8, 9, 10],
        "ham": ["a", "b", "c", "d", "e"],
    }
)

id_check = Check(CheckLevel.WARNING, "ID unique")
display(id_check.is_unique("id").validate(df))

Expected behavior
Code returns a dataframe with validation results.

Actual behavior
Code throws exception below:


TypeError Traceback (most recent call last)
Cell In[28], line 13
4 df = pl.DataFrame(
5 {
6 "id": [1, 1, 2, 3, 4],
(...)
9 }
10 )
12 id_check = Check(CheckLevel.WARNING, "ID unique")
---> 13 display(id_check.is_unique("id").validate(df))

File ~/Source/replicate-issue/.venv/lib/python3.11/site-packages/cuallee/init.py:586, in Check.validate(self, dataframe)
581 self.compute_engine = importlib.import_module("cuallee.polars_validation")
583 assert self.compute_engine.validate_data_types(
584 self.rules, dataframe
585 ), "Invalid data types between rules and dataframe"
--> 586 return self.compute_engine.summary(self, dataframe)

File ~/Source/replicate-issue/.venv/lib/python3.11/site-packages/cuallee/polars_validation.py:414, in summary(check, dataframe)
410 return "FAIL"
412 rows = len(dataframe)
--> 414 computation_basis = [
415 {
416 "id": index,
417 "timestamp": check.date.strftime("%Y-%m-%d %H:%M:%S"),
418 "check": check.name,
419 "level": check.level.name,
420 "column": str(rule.column),
421 "rule": rule.method,
422 "value": rule.value,
423 "rows": rows,
424 "violations": _calculate_violations(first(unified_results[hash_key]), rows),
425 "pass_rate": _calculate_pass_rate(first(unified_results[hash_key]), rows),
426 "pass_threshold": rule.coverage,
427 "status": _evaluate_status(
428 _calculate_pass_rate(first(unified_results[hash_key]), rows),
429 rule.coverage,
430 ),
431 }
432 for index, (hash_key, rule) in enumerate(check._rule.items(), 1)
433 ]
434 pl.Config.set_tbl_cols(12)
435 return pl.DataFrame(computation_basis)

File ~/Source/replicate-issue/.venv/lib/python3.11/site-packages/cuallee/polars_validation.py:427, in (.0)
410 return "FAIL"
412 rows = len(dataframe)
414 computation_basis = [
415 {
416 "id": index,
417 "timestamp": check.date.strftime("%Y-%m-%d %H:%M:%S"),
418 "check": check.name,
419 "level": check.level.name,
420 "column": str(rule.column),
421 "rule": rule.method,
422 "value": rule.value,
423 "rows": rows,
424 "violations": _calculate_violations(first(unified_results[hash_key]), rows),
425 "pass_rate": _calculate_pass_rate(first(unified_results[hash_key]), rows),
426 "pass_threshold": rule.coverage,
--> 427 "status": _evaluate_status(
428 _calculate_pass_rate(first(unified_results[hash_key]), rows),
429 rule.coverage,
430 ),
431 }
432 for index, (hash_key, rule) in enumerate(check._rule.items(), 1)
433 ]
434 pl.Config.set_tbl_cols(12)
435 return pl.DataFrame(computation_basis)

File ~/Source/replicate-issue/.venv/lib/python3.11/site-packages/cuallee/polars_validation.py:407, in summary.._evaluate_status(pass_rate, pass_threshold)
405 def _evaluate_status(pass_rate, pass_threshold):
--> 407 if pass_rate >= pass_threshold:
408 return "PASS"
410 return "FAIL"

TypeError: '>=' not supported between instances of 'NoneType' and 'float'

Desktop (please complete the following information):

  • OS: Linux
  • cuallee = "^0.4.5"
  • ipykernel = "^6.24.0"
  • polars = "0.18.7"

Additional context
None.

Implementation of `has_workflow` on snowpark

The has_sum method collapses a column in a dataframe by adding up all its elements. It has been successfully implemented in pandas, pyspark and duckdb; the implementation on snowpark is missing.

Executing validation tasks through Spark Connect is failing

Issue

I've been using Spark Connect for both testing and data validation tasks. Despite following the provided documentation closely, I encountered errors with every example I attempted.

These issues occurred on Apache Spark version 3.5.1. Below, I provide detailed steps to reproduce two specific errors, along with the corresponding error messages.

Environment:

  • Python Version: 3.11.8
  • Apache Spark Versions Tested: 3.5.1
  • Scala Version: 2.12
  • Operating System: Windows

Steps to Reproduce:

Setup

  1. Single node Spark cluster initiated via Docker using the command:
docker run -ti --name spark -p 15002:15002 bitnami/spark:latest /opt/bitnami/spark/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1
  1. Spark Session creation:
spark = SparkSession.builder.appName("PySpark Test") \
            .remote("sc://localhost:15002") \
            .getOrCreate()

1. Error with Dates Example

Issue Reproduction:

Executing the Dates Example as provided in the documentation:

# Unique values on id
check = Check(CheckLevel.WARNING, "CheckIsBetweenDates")
df = spark.sql(
    """
    SELECT 
        explode(
            sequence(
                to_date('2022-01-01'), 
                to_date('2022-01-10'), 
                interval 1 day)) as date
    """)
assert (
    check.is_between("date", "2022-01-01", "2022-01-10")
    .validate(df)
    .first()
    .status == "PASS"
)

Error Message:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[8], line 14
      3 df = spark.sql(
      4     """
      5     SELECT 
   (...)
     10                 interval 1 day)) as date
     11     """)
     12 df = df.toPandas()
     13 assert (
---> 14     check.is_between("date", "2022-01-01", "2022-01-10")
     15     .validate(df)
     16     .first()
     17     .status == "PASS"
     18 )

File c:\Users\Dsaad\GitHub\pyspark-tools\.venv\Lib\site-packages\cuallee\__init__.py:403, in Check.is_between(self, column, value, pct)
    401 def is_between(self, column: str, value: Tuple[Any], pct: float = 1.0):
    402     """Validation of a column between a range"""
--> 403     Rule("is_between", column, value, CheckDataType.AGNOSTIC, pct) >> self._rule
    404     return self

File <string>:13, in __init__(self, method, column, value, data_type, coverage, options, status, violations, pass_rate, ordinal)

File c:\Users\Dsaad\GitHub\pyspark-tools\.venv\Lib\site-packages\cuallee\__init__.py:100, in Rule.__post_init__(self)
     99 def __post_init__(self):
--> 100     if (self.coverage <= 0) or (self.coverage > 1):
    101         raise ValueError("Coverage should be between 0 and 1")
    103     if isinstance(self.column, List):

TypeError: '<=' not supported between instances of 'str' and 'int'

2. Error with Completeness Check Example:

Issue Reproduction:

Executing an example that checks for null values and uniqueness:

from datetime import date
from pyspark.sql import Row

df = spark.createDataFrame([
        Row(user_id=1111, order_id=4343, preferred_store='string1', birthdate=date(1999, 1, 1), joined_date=date(2022, 6, 1)),
        Row(user_id=2222, order_id=5454, preferred_store='string2', birthdate=date(2000, 2, 1), joined_date=date(2022, 7, 2)),
        Row(user_id=3333, order_id=6565, preferred_store='string3', birthdate=date(2001, 3, 1), joined_date=date(2022, 8, 3))
        ])

# Nulls on column Id
check = Check(CheckLevel.WARNING, "Completeness")
(   check
    .is_complete("user_id")
    .is_unique("user_id")
    .validate(df)
).show()

Error Message:

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
Cell In[7], line 13
      7 # Nulls on column Id
      8 check = Check(CheckLevel.WARNING, "Completeness")
      9 (   check
     10     .is_complete("user_id")
     11     .is_unique("user_id")
---> 12     .validate(df)
     13 ).show()

File c:\Users\Dsaad\GitHub\pyspark-tools\.venv\Lib\site-packages\cuallee\__init__.py:703, in Check.validate(self, dataframe)
    700     self.compute_engine = importlib.import_module("cuallee.polars_validation")
    702 else:
--> 703     raise Exception(
    704         "Cuallee is not ready for this data structure. You can log a Feature Request in Github."
    705     )
    707 assert self.compute_engine.validate_data_types(
    708     self.rules, dataframe
    709 ), "Invalid data types between rules and dataframe"
    711 return self.compute_engine.summary(self, dataframe)

Exception: Cuallee is not ready for this data structure. You can log a Feature Request in Github.

I would appreciate any guidance or updates on resolving these errors. Thank you for your assistance.

[JOSS REVIEW] Great Expectations and Soda Core references are not rendering properly

We see that in the article, we are trying to cite Soda Core and Great Expectations:

On the other hand, `great-expectations` [@Gong_Great_Expectations] and `soda` [@soda_core] additionaly to an open-source platform also offer commercial options that require registration and issuing of keys for cloud reporting capabilities.

but it is not rendering properly:


Maybe the @software option is not working.

cuallee/paper/paper.bib

Lines 141 to 152 in e4c1242

@software{Gong_Great_Expectations,
author = {Gong, Abe and Campbell, James and {Great Expectations}},
license = {Apache-2.0},
title = {{Great Expectations}},
url = {https://github.com/great-expectations/great_expectations}
}
@software{soda_core,
license = {Apache-2.0},
title = {Soda Core},
url = {https://github.com/sodadata/soda-core}
}

We are confident that you can find a solution. Could you please consider using the @misc option or any other alternative that you think might work?


This issue is part of a JOSS REVIEW

[JOSS Review] State of the field in `paper.md`

This is probably my last issue of this review, as I have finished reviewing both the functionality and the documentation page. Only one minor thing about the recent changes in the paper:

  • Both soda and great-expectations have a core component that is open source and additional features / cloud stuff where you need to pay. Could you correct this in your paper where you write:
    On the other hand, great-expectations and soda are commercial options that require registration and issuing of keys for cloud reporting capabilities.
  • Could you add references to the two repos? great-expectation has a citation.cff file for that purpose

This issue is part of a JOSS Review

Number of overall violations divided by number of columns in `are_complete`

Describe the bug
The issue is probably noticeable with other are_* validations. It seems that when passing a number of columns to be checked, the total number of violations found will be divided by the number of columns given. For example, if a check will be done over 3 columns, and 12 violations are found, only 4 will be reported. This is OK if all violations were on the same row, but will underreport when the violations are on different rows.

To Reproduce
Run the following code:

from pyspark.sql import SparkSession
from cuallee import Check, CheckLevel
spark = SparkSession.builder.getOrCreate()

check = Check(CheckLevel.WARNING, "Not NULL").are_complete(["col_a", "col_b", "col_c"])
# This is fine

df1 = spark.createDataFrame([
    {"col_a": 1, "col_b": 1, "col_c": 1}, {"col_a": 2, "col_b": 2, "col_c": 2}, {"col_a": 3, "col_b": 3, "col_c": 3}
], schema="struct<col_a: int, col_b: int, col_c: int>")
df1.show(truncate=False)
check.validate(df1).show(truncate=False)
+-----+-----+-----+
|col_a|col_b|col_c|
+-----+-----+-----+
|1    |1    |1    |
|2    |2    |2    |
|3    |3    |3    |
+-----+-----+-----+

+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+---------+--------------+------+
|id |timestamp          |check   |level  |column                     |rule        |value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+---------+--------------+------+
|1  |2023-08-07 12:23:32|Not NULL|WARNING|('col_a', 'col_b', 'col_c')|are_complete|N/A  |3   |0.0       |1.0      |1.0           |PASS  |
+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+---------+--------------+------+
# This is also fine

df2 = spark.createDataFrame([
    {"col_a": None, "col_b": None, "col_c": None}, {"col_a": 2, "col_b": 2, "col_c": 2}, {"col_a": 3, "col_b": 3, "col_c": 3}
], schema="struct<col_a: int, col_b: int, col_c: int>")
df2.show(truncate=False)
check.validate(df2).show(truncate=False)
+-----+-----+-----+
|col_a|col_b|col_c|
+-----+-----+-----+
|null |null |null |
|2    |2    |2    |
|3    |3    |3    |
+-----+-----+-----+

+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+------------------+--------------+------+
|id |timestamp          |check   |level  |column                     |rule        |value|rows|violations|pass_rate         |pass_threshold|status|
+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+------------------+--------------+------+
|1  |2023-08-07 12:23:32|Not NULL|WARNING|('col_a', 'col_b', 'col_c')|are_complete|N/A  |3   |1.0       |0.6666666666666666|1.0           |FAIL  |
+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+------------------+--------------+------+
# This should show 3 violations, but it seems that the number of violations outputted is divided by the number of columns

df3 = spark.createDataFrame([
    {"col_a": None, "col_b": 1, "col_c": 1}, {"col_a": None, "col_b": 2, "col_c": 2}, {"col_a": None, "col_b": 3, "col_c": 3}
], schema="struct<col_a: int, col_b: int, col_c: int>")
df3.show(truncate=False)
check.validate(df3).show(truncate=False)
+-----+-----+-----+
|col_a|col_b|col_c|
+-----+-----+-----+
|null |1    |1    |
|null |2    |2    |
|null |3    |3    |
+-----+-----+-----+

+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+------------------+--------------+------+
|id |timestamp          |check   |level  |column                     |rule        |value|rows|violations|pass_rate         |pass_threshold|status|
+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+------------------+--------------+------+
|1  |2023-08-07 12:23:32|Not NULL|WARNING|('col_a', 'col_b', 'col_c')|are_complete|N/A  |3   |1.0       |0.6666666666666666|1.0           |FAIL  |
+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+------------------+--------------+------+
# This should also show 3 violations and a 0% pass rate

df4 = spark.createDataFrame([
    {"col_a": None, "col_b": 1, "col_c": 1}, {"col_a": 2, "col_b": None, "col_c": 2}, {"col_a": 3, "col_b": 3, "col_c": None}
], schema="struct<col_a: int, col_b: int, col_c: int>")
df4.show(truncate=False)
check.validate(df4).show(truncate=False)
+-----+-----+-----+
|col_a|col_b|col_c|
+-----+-----+-----+
|null |1    |1    |
|2    |null |2    |
|3    |3    |null |
+-----+-----+-----+

+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+------------------+--------------+------+
|id |timestamp          |check   |level  |column                     |rule        |value|rows|violations|pass_rate         |pass_threshold|status|
+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+------------------+--------------+------+
|1  |2023-08-07 12:23:32|Not NULL|WARNING|('col_a', 'col_b', 'col_c')|are_complete|N/A  |3   |1.0       |0.6666666666666666|1.0           |FAIL  |
+---+-------------------+--------+-------+---------------------------+------------+-----+----+----------+------------------+--------------+------+

Expected behavior
Ideally, the number of violations should reflect the number of offending rows (unless I'm misunderstanding something).

Desktop (please complete the following information):

  • OS: Linux
  • Browser: Visual Studio Code / Jupyter Notebooks
  • Version cuallee==0.4.7

[JOSS Review] Extend Docstring of central class

The class Check seems like the central entry point to the package, however it lacks a documentation. Could you add docstrings to it to explain the purpose of the function and of all parameters?

In case you have not worked with docstrings yet, they are super helpful in modern IDEs to get info about the function when hovering over it. There are some docstring format conventions and also tools like autodocstring that help to create docstrings.

This issue is part of a JOSS Review

[JOSS Review] `Paper` Report on state of the field

JOSS wants the software papers to contain a comparison to the state of the field, i.e.

State of the field: Do the authors describe how this software compares to other commonly-used packages?

Hence, could you add a section in your paper where you shortly compare cuallee to other data testing frameworks, like soda, great_expectations, or others?

This issue is part of a JOSS Review
