quinn's Introduction

Quinn


Pyspark helper methods to maximize developer productivity.

Quinn provides DataFrame validation functions, useful column functions / DataFrame transformations, and performant helper functions.


Documentation

You can find official documentation here.

Setup

Quinn is uploaded to PyPI and can be installed with this command:

pip install quinn

Quinn Helper Functions

import quinn

DataFrame Validations

validate_presence_of_columns()

Raises an exception unless source_df contains the name, age, and fun columns.

quinn.validate_presence_of_columns(source_df, ["name", "age", "fun"])
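For example, with an illustrative DataFrame that is missing the fun column, the call fails with an error naming the missing column (a hedged sketch; the exact exception type comes from quinn):

df = spark.createDataFrame([("jose", 3), ("li", 2)], ["name", "age"])

try:
    quinn.validate_presence_of_columns(df, ["name", "age", "fun"])
except Exception as e:
    print(e)  # reports that the fun column is missing from the DataFrame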

validate_schema()

Raises an exception unless source_df contains all the StructFields defined in the required_schema.

quinn.validate_schema(source_df, required_schema)
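Here's a hedged sketch of what a required_schema could look like (the fields are illustrative):

from pyspark.sql.types import StructType, StructField, StringType, LongType

required_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
])

quinn.validate_schema(source_df, required_schema)  # raises if a required StructField is missing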

validate_absence_of_columns()

Raises an exception if source_df contains age or cool columns.

quinn.validate_absence_of_columns(source_df, ["age", "cool"])

Functions

single_space()

Replaces all runs of multiple spaces with single spaces (e.g. changes "this has   some" to "this has some").

actual_df = source_df.withColumn(
    "words_single_spaced",
    quinn.single_space(col("words"))
)

remove_all_whitespace()

Removes all whitespace in a string (e.g. changes "this has some" to "thishassome").

actual_df = source_df.withColumn(
    "words_without_whitespace",
    quinn.remove_all_whitespace(col("words"))
)

anti_trim()

Removes all inner whitespace, but doesn't delete leading or trailing whitespace (e.g. changes " this has some " to " thishassome ").

actual_df = source_df.withColumn(
    "words_anti_trimmed",
    quinn.anti_trim(col("words"))
)

remove_non_word_characters()

Removes all non-word characters from a string (e.g. changes "si%$#@!#$!@#mpsons" to "simpsons").

actual_df = source_df.withColumn(
    "words_without_nonword_chars",
    quinn.remove_non_word_characters(col("words"))
)

multi_equals()

multi_equals returns true if s1 and s2 are both equal to "cat".

source_df.withColumn(
    "are_s1_and_s2_cat",
    quinn.multi_equals("cat")(col("s1"), col("s2"))
)

approx_equal()

This function takes three arguments: two PySpark Columns and a numeric threshold. It returns a Boolean column indicating whether the values in the two columns are equal within the threshold.

Suppose the columns contain the following values:

col1 = [1.2, 2.5, 3.1, 4.0, 5.5]
col2 = [1.3, 2.3, 3.0, 3.9, 5.6]
threshold = 0.2

df.select(quinn.approx_equal(col("col1"), col("col2"), threshold).alias("value")).show()

+-----+
|value|
+-----+
| true|
|false|
| true|
| true|
| true|
+-----+

array_choice()

This function takes a Column as a parameter and returns a PySpark Column that contains a random value drawn from the input column.

df = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["values"])
result = df.select(array_choice(col("values")))

The output is:
+--------------+
|array_choice()|
+--------------+
|             2|
+--------------+

regexp_extract_all()

regexp_extract_all takes two parameters: a string s and a regular expression regexp. It returns all substrings of s that match the regular expression.

print(regexp_extract_all("this is a example text message for testing application", r"\b\w*a\w*\b"))

The output is:
['a', 'example', 'message', 'application']

Here the pattern r"\b\w*a\w*\b" matches words containing the letter a.

week_start_date()

It takes two parameters, column and week_start_day, and returns a DataFrame column containing the start date of the week. By default, week_start_day is set to "Sun".

For input ["2023-03-05", "2023-03-06", "2023-03-07", "2023-03-08"] the Output is

result = df.select("date", week_start_date(col("date"), "Sun"))
result.show()
+----------+----------------+
|      date|week_start_date |
+----------+----------------+
|2023-03-05|      2023-03-05|
|2023-03-07|      2023-03-05|
|2023-03-08|      2023-03-05|
+----------+----------------+

week_end_date()

It also takes two parameters, a column and week_end_day, and returns a DataFrame column containing the end date of the week. By default, week_end_day is set to "Sat".

+----------+-------------+
|      date|week_end_date|
+----------+-------------+
|2023-03-05|   2023-03-05|
|2023-03-07|   2023-03-12|
|2023-03-08|   2023-03-12|
+----------+-------------+

uuid5()

This function generates a UUIDv5 string from the passed column, with an optional namespace and an optional extra salt. By default, the namespace is the NAMESPACE_DNS UUID and no extra string is used to reduce hash collisions.


df = spark.createDataFrame([("lorem",), ("ipsum",)], ["values"])
result = df.select(quinn.uuid5(F.col("values")).alias("uuid5"))
result.show(truncate=False)

The output is:
+------------------------------------+
|uuid5                               |
+------------------------------------+
|35482fda-c10a-5076-8da2-dc7bf22d6be4|
|51b79c1d-d06c-5b30-a5c6-1fadcd3b2103|
+------------------------------------+

Transformations

snake_case_col_names()

Converts all the column names in a DataFrame to snake_case. It's annoying to write SQL queries when columns aren't snake cased.

quinn.snake_case_col_names(source_df)
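For instance, with illustrative column names (a sketch; the exact casing rules are quinn's):

df = spark.createDataFrame([("jose", 1)], ["First Name", "Zip Code"])
quinn.snake_case_col_names(df).columns  # expected: ['first_name', 'zip_code']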

sort_columns()

Sorts the DataFrame columns in alphabetical order, including nested columns if sort_nested is set to True. Wide DataFrames are easier to navigate when they're sorted alphabetically.

quinn.sort_columns(df=source_df, sort_order="asc", sort_nested=True)

DataFrame Helpers

with_columns_renamed()

Renames all or multiple columns in a DataFrame by applying a common renaming function to the column names.

Consider you have the following two dataframes for orders coming from a source A and a source B:

order_a_df.show()

+--------+---------+--------+
|order_id|order_qty|store_id|
+--------+---------+--------+
|     001|       23|    45AB|
|     045|        2|    98HX|
|     021|      142|    09AA|
+--------+---------+--------+

order_b_df.show()

+--------+---------+--------+
|order_id|order_qty|store_id|
+--------+---------+--------+
|     001|       23|    47AB|
|     985|        2|    54XX|
|    0112|       12|    09AA|
+--------+---------+--------+

Now you need to join these two DataFrames. However, when two DataFrames with identical column names are joined in Spark, you may run into ambiguous column name issues because the resulting DataFrame contains multiple columns with the same name. So it's a best practice to rename these columns to reflect which DataFrame they originate from:

def add_suffix(s):
    return s + '_a'

order_a_df_renamed = quinn.with_columns_renamed(add_suffix)(order_a_df)

order_a_df_renamed.show()
+----------+-----------+----------+
|order_id_a|order_qty_a|store_id_a|
+----------+-----------+----------+
|       001|         23|      45AB|
|       045|          2|      98HX|
|       021|        142|      09AA|
+----------+-----------+----------+

column_to_list()

Converts a column in a DataFrame to a list of values.

quinn.column_to_list(source_df, "name")
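For example, with an illustrative source_df:

source_df = spark.createDataFrame([("jose", 1), ("li", 2), ("liz", 3)], ["name", "age"])
quinn.column_to_list(source_df, "name")  # ['jose', 'li', 'liz']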

two_columns_to_dictionary()

Converts two columns of a DataFrame into a dictionary. In this example, name is the key and age is the value.

quinn.two_columns_to_dictionary(source_df, "name", "age")
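Continuing with the same illustrative source_df as above:

quinn.two_columns_to_dictionary(source_df, "name", "age")  # {'jose': 1, 'li': 2, 'liz': 3}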

to_list_of_dictionaries()

Converts an entire DataFrame into a list of dictionaries.

quinn.to_list_of_dictionaries(source_df)
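Again with the same illustrative source_df:

quinn.to_list_of_dictionaries(source_df)
# [{'name': 'jose', 'age': 1}, {'name': 'li', 'age': 2}, {'name': 'liz', 'age': 3}]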

show_output_to_df()

quinn.show_output_to_df(output_str, spark)

Parses a spark DataFrame output string into a spark DataFrame. Useful for quickly pulling data from a log into a DataFrame. In this example, output_str is a string of the form:

+----+---+-----------+------+
|name|age|     stuff1|stuff2|
+----+---+-----------+------+
|jose|  1|nice person|  yoyo|
|  li|  2|nice person|  yoyo|
| liz|  3|nice person|  yoyo|
+----+---+-----------+------+
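For example, the captured show() output can be pasted into a triple-quoted string and parsed back into a DataFrame:

output_str = """+----+---+-----------+------+
|name|age|     stuff1|stuff2|
+----+---+-----------+------+
|jose|  1|nice person|  yoyo|
|  li|  2|nice person|  yoyo|
| liz|  3|nice person|  yoyo|
+----+---+-----------+------+"""

df = quinn.show_output_to_df(output_str, spark)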

Schema Helpers

schema_from_csv()

Converts a CSV file into a PySpark schema (aka StructType). The CSV must contain the column name and type. The nullable and metadata columns are optional.

quinn.schema_from_csv("schema.csv")

Here's an example CSV file:

name,type
person,string
address,string
phoneNumber,string
age,int

Here's how to convert that CSV file to a PySpark schema using schema_from_csv():

schema = schema_from_csv(spark, "some_file.csv")

StructType([
    StructField("person", StringType(), True),
    StructField("address", StringType(), True),
    StructField("phoneNumber", StringType(), True),
    StructField("age", IntegerType(), True),
])

Here's a more complex CSV file:

name,type,nullable,metadata
person,string,false,{"description":"The person's name"}
address,string
phoneNumber,string,TRUE,{"description":"The person's phone number"}
age,int,False

Here's how to read this CSV file into a PySpark schema:

another_schema = schema_from_csv(spark, "some_file.csv")

StructType([
    StructField("person", StringType(), False, {"description": "The person's name"}),
    StructField("address", StringType(), True),
    StructField("phoneNumber", StringType(), True, {"description": "The person's phone number"}),
    StructField("age", IntegerType(), False),
])

print_schema_as_code()

Converts a Spark DataType to a string of Python code that can be evaluated with eval(). If the DataType is a StructType, this can be used to print an existing schema in a format that can be copy-pasted into a Python script, logged to a file, etc.

For example:

# Consider the below schema for fields
fields = [
    StructField("simple_int", IntegerType()),
    StructField("decimal_with_nums", DecimalType(19, 8)),
    StructField("array", ArrayType(FloatType()))
]
schema = StructType(fields)

printable_schema: str = quinn.print_schema_as_code(schema)
print(printable_schema)
StructType(
	fields=[
		StructField("simple_int", IntegerType(), True),
		StructField("decimal_with_nums", DecimalType(19, 8), True),
		StructField(
			"array",
			ArrayType(FloatType()),
			True,
		),
	]
)

Once evaluated, the printable schema is a valid schema that can be used in dataframe creation, validation, etc.

from chispa.schema_comparer import assert_basic_schema_equality

parsed_schema = eval(printable_schema)
assert_basic_schema_equality(parsed_schema, schema) # passes

print_schema_as_code() can also be used to print other DataType objects.

ArrayType

array_type = ArrayType(FloatType())
printable_type: str = quinn.print_schema_as_code(array_type)
print(printable_type)
ArrayType(FloatType())

MapType

map_type = MapType(StringType(), FloatType())
printable_type: str = quinn.print_schema_as_code(map_type)
print(printable_type)
MapType(
       StringType(),
       FloatType(),
       True,
)

IntegerType, StringType etc.

integer_type = IntegerType()
printable_type: str = quinn.print_schema_as_code(integer_type)
print(printable_type)
IntegerType()

Pyspark Core Class Extensions

from quinn.extensions import *

Column Extensions

isFalsy()

Returns True if has_stuff is None or False.

source_df.withColumn("is_stuff_falsy", F.col("has_stuff").isFalsy())

isTruthy()

Returns True unless has_stuff is None or False.

source_df.withColumn("is_stuff_truthy", F.col("has_stuff").isTruthy())

isNullOrBlank()

Returns True if blah is null or blank (the empty string or a string that only contains whitespace).

source_df.withColumn("is_blah_null_or_blank", F.col("blah").isNullOrBlank())

isNotIn()

Returns True if fun_thing is not included in the bobs_hobbies list.

source_df.withColumn("is_not_bobs_hobby", F.col("fun_thing").isNotIn(bobs_hobbies))

nullBetween()

Returns True if age is between lower_age and upper_age. If lower_age is populated and upper_age is null, it will return True if age is greater than or equal to lower_age. If lower_age is null and upper_age is populated, it will return True if age is lower than or equal to upper_age.

source_df.withColumn("is_between", F.col("age").nullBetween(F.col("lower_age"), F.col("upper_age")))

Contributing

We are actively looking for feature requests, pull requests, and bug fixes.

Any developer that demonstrates excellence will be invited to be a maintainer of the project.

Code Style

We are using the PySpark code style and the Sphinx docstring format. For more details about the Sphinx format, see this tutorial. A short example of a Sphinx-formatted docstring is placed below:

"""[Summary]

:param [ParamName]: [ParamDescription], defaults to [DefaultParamVal]
:type [ParamName]: [ParamType](, optional)
...
:raises [ErrorType]: [ErrorDescription]
...
:return: [ReturnDescription]
:rtype: [ReturnType]
"""

quinn's People

Contributors

afranzi, ajitkshirsagar, bjornjorgensen, breaka84, dependabot[bot], dinjazelena, ewellinger, fpvmorais, gund0wn151, indexseek, jamesfielder, jeffbrennan, kunaljubce, mrjsj, mrpowers, naif-w-alharthi, nathanlim45, nikhilgupta178, nsphung, palmperez, puneetsharma04, semyonsinchenko, sidharth1805, squerez, yevign


quinn's Issues

Checking PySpark function argument types to create readable error messages

The quinn library defines a simple function that collapses all multispaces in a string into single spaces:

def single_space(col):
    return F.trim(F.regexp_replace(col, " +", " "))

If the function is invoked with a non-column argument (e.g. quinn.single_space(42)), it spits out a very unreadable error message:

def get_return_value(answer, gateway_client, target_id=None, name=None):
    """Converts an answer received from the Java gateway into a Python object.

    For example, string representation of integers are converted to Python
    integer, string representation of objects are converted to JavaObject
    instances, etc.

    :param answer: the string returned by the Java gateway
    :param gateway_client: the gateway client used to communicate with the Java
        Gateway. Only necessary if the answer is a reference (e.g., object,
        list, map)
    :param target_id: the name of the object from which the answer comes from
        (e.g., *object1* in `object1.hello()`). Optional.
    :param name: the name of the member from which the answer comes from
        (e.g., *hello* in `object1.hello()`). Optional.
    """
    if is_error(answer)[0]:
        if len(answer) > 1:
            type = answer[1]
            value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
            if answer[1] == REFERENCE_TYPE:
                raise Py4JJavaError(
                    "An error occurred while calling {0}{1}{2}.\n".
                    format(target_id, ".", name), value)
            else:
                raise Py4JError(
                    "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
                   format(target_id, ".", name, value))

E py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
E py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
E at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
E at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
E at py4j.Gateway.invoke(Gateway.java:274)
E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at py4j.commands.CallCommand.execute(CallCommand.java:79)
E at py4j.GatewayConnection.run(GatewayConnection.java:214)
E at java.lang.Thread.run(Thread.java:745)

The PySpark withColumn method explicitly checks the input type with assert isinstance(col, Column), "col should be Column" to give a readable error message.

We could refactor the single_space method as follows:

def single_space(col):
    assert isinstance(col, Column), "col should be Column"
    return F.trim(F.regexp_replace(col, " +", " "))

I added some type hints and I'm still getting the same error:

def single_space(col: Column) -> Column:
    return F.trim(F.regexp_replace(col, " +", " "))

@capdevc @smack1 @eclosson @FavioVazquez - What's the best way to write this function?

Refactor the DataFrame#transform method to be more elegant

This library defines a DataFrame.transform method to chain DataFrame transformations as follows:

from pyspark.sql.functions import lit

def with_greeting(df):
    return df.withColumn("greeting", lit("hi"))

def with_something(df, something):
    return df.withColumn("something", lit(something))

data = [("jose", 1), ("li", 2), ("luisa", 3)]
source_df = spark.createDataFrame(data, ["name", "age"])

actual_df = source_df\
    .transform(lambda df: with_greeting(df))\
    .transform(lambda df: with_something(df, "crazy"))

The Spark Scala API has a built-in transform method that lets users chain DataFrame transformations more elegantly, as described in this blog post.

Here's an interface I'd prefer (this is what we do in Scala and I know this will need to be changed around for Python, but I'd like something like this):

def with_greeting()(df):
    return df.withColumn("greeting", lit("hi"))

def with_something(something)(df):
    return df.withColumn("something", lit(something))

data = [("jose", 1), ("li", 2), ("luisa", 3)]
source_df = spark.createDataFrame(data, ["name", "age"])

actual_df = source_df\
    .transform(with_greeting())\ # the transform method magically knows that self should be passed into the second parameter list
    .transform(with_something("crazy"))

Here is the code that needs to be changed.

If we can figure out a better interface, we should consider making a pull request to the Spark source code. I use the transform method every day when writing Spark/Scala code and think this is a major omission in the PySpark API.

If my ideal interface isn't possible, is there anything that's better?! I really don't like my current solution that requires a lambda.

@pirate - help!

Python 2.7 Compatibility

Hey @MrPowers,

First off, this package looks pretty sweet and something we could definitely work into our analytics pipeline, so nicely done! Unfortunately we have some system requirements that are holding us back in the Python 2.7 realm (mostly the cloud infrastructure and a few pesky Python 2 only packages), and I was wondering what your opinion would be on adapting this code to also work in Python 2?

To be clear, I would be doing this work and getting a PR in for adding that functionality, mostly wondering if there are any outstanding difficulties that exist off the top of your head. Let me know as I'd love to get this incorporated into our production processes!

Best,
Erich

Create some "schema safe append" functionality

The function should append the data if the append_df has a schema that matches the df exactly.

If the schema doesn't match exactly, then it should error out.

This "schema safe append" could prevent bad appends.

Document show_output_to_df in README

Document the show_output_to_df method in the README.

Also check through the rest of the public interface and make sure all other methods are documented in the README.

Run tests for different PySpark versions

We should update the GitHub actions to run the tests for both PySpark 2.x and PySpark 3.x.

I am only running the tests with PySpark 3.x locally when I am merging changes. We should also run tests with PySpark 2.x to make sure we're not making any breaking changes.

[Feature request] Functionality for working with FileSystem with PySpark

By default Spark does not provide functionality to read and write files on HDFS/S3/LocalFS. In Scala one can do it via org.apache.hadoop.fs.FileSystem but in PySpark it is not so easy to implement. I want to have a class like HadoopFileSystem or SparkWriter / SparkReader which provides top level read_bytes / write_bytes / glob / mkdir / touch functionality and uses org.apache.hadoop.fs.FileSystem via py4j under the hood. Something like dbutils.fs from Databricks but open source and with zero-dependencies.

I see it as a class with static init methods like fs = quinn.fs.FileSystem.fromPath("s3a://...", session=spark) which under the hood uses org.apache.hadoop.fs.Path.toUri() and org.apache.hadoop.fs.FileSystem.get(uri, configuration) to choose the right implementation of FileSystem (HDFS, S3 or Local) automatically. This is especially important in unit testing because it lets users avoid patching the class and just use the local FS for tests.

I know how to implement it (I did the same thing with Scala already, and for PySpark I should just wrap all the Scala calls into Py4J). The question is mostly about design: which additional methods should I implement? Maybe some additional things like cp, mv, createIfNotExists and so forth? Maybe also some top-level syntax sugar like writePickle, etc.?

Also with such a class we can easily implement functionality which write and read Excel files from FileSystem with PySpark (but under the hood there will be Pandas and it will work only for small files).

Brainstorming functions to make PySpark easier

One of the overall goals of this project is to make PySpark easier. The purpose of this issue is to brainstorm additional functionality that should be added to quinn to make PySpark easier.

Let's look at the Ruby on Rails web framework that has a well known ActiveSupport module to make common web development tasks easy.

When you use Rails and search "rails beginning of week", you're immediately directed to the beginning_of_week function, which is easy.

When you type in "spark beginning of week", you're redirected to this blog post I wrote, which provides a spark-daria Scala function for beginningOfWeek, which is also easy. I can see that users read this blog post every day from the Google Analytics traffic.

When you type in "pyspark beginning of week", you're redirected to this Stackoverflow question, which is scary, complicated, and makes this seem hard. I need to write a blog post that ranks #1 for "pyspark beginning of week" searches that show the quinn.week_start_date() function, so Python users think this is easy. We have a function that's easy, but it's not findable yet.

What are your thoughts for new functions we can add to quinn that will help make PySpark easy?

QA 0.4.0 release

@afranzi - I just released quinn v0.4.0 in PyPI. Can you please let me know how the latest wheel file works for you?

If you have a sec, can you try to add some best practices guidelines on how to use the SparkProvider class in the README? Think it'd be best to help the community as much as possible on this one because it's a common source of confusion!

Make create_df better

quinn lets you do this right now:

spark.create_df(
    [("jose", "a"), ("li", "b"), ("sam", "c")],
    [("name", StringType(), True), ("blah", StringType(), True)]
)

It'd also be cool if this was possible:

spark.create_df(
    [("jose", "a"), ("li", "b"), ("sam", "c")],
    ["name", "blah"]
)

printAthenaCreateTable() support

Howdy! I came across this project via your blog post.

printAthenaCreateTable() looks like a very handy utility function, but it doesn't seem to be implemented here. Are there plans to add it?

More generally, is this project intended to be a wrapper around spark-daria, or a stand-alone project with a similar API?

Deprecation warning due to invalid escape sequences

Deprecation warnings are raised due to invalid escape sequences. This can be fixed by using raw strings or escaping the literals. pyupgrade also helps in automatic conversion : https://github.com/asottile/pyupgrade/

find . -iname '*.py' | grep -Ev 'vendor|example|doc|tools|sphinx' | xargs -P4 -I{} python3.8 -Wall -m py_compile {}
./quinn/scala_to_pyspark.py:49: DeprecationWarning: invalid escape sequence \s
  m = re.search('(\s*)def (\w+).*\((.*)\)', s)
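For example, the flagged line can be fixed with a raw string, which leaves the regex unchanged but silences the warning:

import re

s = "  def fooBar(a, b)"  # illustrative input line
m = re.search(r'(\s*)def (\w+).*\((.*)\)', s)  # raw string: no invalid \s or \w escapes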

Add pre-commit to the project

Continuation of the #66

  • remove the current flake8 GitHub action
  • add a Ruff GitHub action
  • add Ruff as a pre-commit hook
  • add PyTest as a pre-commit hook

Figure out how to deal with the PySpark 2 extensions

The DataFrame#transform extension is useful for PySpark 2 users but should not be run for PySpark 3 users (cause it's built into the API).

When a user runs from quinn.extensions import * we can either use the spark.version variable to programmatically skip over modules that shouldn't be imported for Spark 3, or we can design a separate import interface.

I'm still not sure which approach is better.
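A rough sketch of the version-gated approach (not necessarily what the project will settle on):

import pyspark
from pyspark.sql import DataFrame

if int(pyspark.__version__.split(".")[0]) < 3:
    # only add DataFrame.transform on PySpark 2; Spark 3 ships it natively
    def _transform(self, f):
        return f(self)

    DataFrame.transform = _transform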

Create abstraction to split into multiple columns easily

Suppose you have this DataFrame:

+------------+---------------+-------+
|student_name|graduation_year|  major|
+------------+---------------+-------+
| chrisXXborg|           2025|    bio|
|davidXXcross|           2026|physics|
|sophiaXXraul|           2022|    bio|
|    fredXXli|           2025|physics|
|someXXperson|           2023|   math|
|     liXXyao|           2025|physics|
+------------+---------------+-------+

Here is how to clean the DataFrame:

from pyspark.sql.functions import col, split

clean_df = (
    df.withColumn("student_first_name", split(col("student_name"), "XX").getItem(0))
    .withColumn("student_last_name", split(col("student_name"), "XX").getItem(1))
    .drop("student_name")
)

It'd be nice to have a function that would do this automatically:

quinn.split_col(df, col_name="student_name", delimiter="XX", new_col_names=["student_first_name", "student_last_name"])

The current syntax is tedious.

QA the v0.10.0 release

I just made the v0.10.0 release. The first release in a really long time.

Feel free to QA the release and let me know if there are any issues. Appreciate the help!

Move Spark session code outside of this repo

There's a class that creates a SparkSession that's used by the test suite: https://github.com/MrPowers/quinn/blob/master/quinn/spark.py

The quinn methods don't depend on this SparkSession, which is good. We want quinn to be easily runnable on all different types of platforms and we generally want the execution platform to define the SparkSession, not quinn.

It's probably best to simply move this class to the tests/ directory. That way we can still access it and use it as a good example for the community, but it'll be clear that we shouldn't use it for code in the quinn/ directory.

It might be cool to monkey patch a transform method on the SparkSession class so it's easier to write code that leverages the existing SparkSession without creating a new one (or running getOrCreate). Passing the spark session as an argument and solving the problem with dependency injection is always an option, but if we go this route, we should make conventions, like always having the spark session as the first argument when it's injected.

Update the validate methods to return a boolean value, so they can be used for control flow

The validate_presence_of_columns, validate_absence_of_columns, and validate_schema methods currently throw exceptions when they fail and return nothing otherwise.

It'd be nice to add a flag for these methods to return boolean values. That'd make it easier for folks to add schema safe append logic like this:

if quinn.validate_schema(df, bad_append_df.schema):
    bad_append_df.write.format("parquet").mode("append").save("tmp/parquet1")

Right now, quinn.validate_schema will return None even if the schemas match, and None is falsy in a boolean context.

Add a flatten DataFrame method

This was suggested as an addition to chispa here: MrPowers/chispa#47

I think this would be an even better addition to quinn because it's also useful for general data engineering (not just testing).

This function would help a lot of users.

[Feature Request] Random numbers generation functions

What we have

Currently PySpark provides only two methods of generating random numbers:

  1. rand which generates numbers from $\sim U(0, 1)$
  2. randn which generates numbers from $\sim N(0, 1)$

What I want to have?

  • Random integers generation
  • Samples from $\sim U(\alpha, \beta)$
  • Samples from $\sim N(\alpha, \beta)$
  • Samples from $\sim L(\alpha, \beta)$ (Laplace distribution)
  • Samples from $\sim \Gamma (\alpha, \beta)$ (Gamma distribution)

Motivation

  1. Random integers are important for a lot of reasons, and re-partitioning is one of them.
  2. Gaussian, Laplace and Gamma distributions are a key part to implement Differential Privacy Driven aggregations (for example, Additive noise mechanism and especially Laplace mechanism).

How to do it?

All of these distributions can be generated from the uniform distribution. The idea is to provide top-level functions.
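For example, Laplace samples could be derived from rand() with the inverse-CDF method (a sketch; mu and b are the location and scale parameters, and the function name is hypothetical):

from pyspark.sql import functions as F

def rand_laplace(mu: float, b: float, seed: int = None):
    u = F.rand(seed) - F.lit(0.5)
    # inverse CDF of Laplace(mu, b): mu - b * sign(u) * ln(1 - 2|u|)
    return F.lit(mu) - F.lit(b) * F.signum(u) * F.log(F.lit(1.0) - F.lit(2.0) * F.abs(u))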

Using GH-Pages for hosting documentation of API

My suggestion is to use GH-Pages for API Docs (instead of README.md).

There are a few options:

  1. Use mkdocs: deploying MKDocs to GHP
  2. Use sphinx: GH Action to deploy Sphinx to GHP

Anyway, there will be a day when the number of public functions and helpers is so big that they won't fit into a single README page...

P.S. It will also solve the problem of outdated documentation, because we will use function/method signatures and docstrings to update the documentation as a CI/CD step after each push to the main branch.
