
Spark + Python + Gherkin

This repo is an example of how you can wrap PySpark scripts in BDD, and how you can use Gherkin to bring less technical people along for the ride.

Why PySpark?

PySpark is great! If you're a developer looking at this page, maybe go and have a look at PySpark before cloning the repo ;)

If you're just generally interested, PySpark provides a means to create and deploy simple batch data-processing jobs quickly and cheaply. As a data engineer, sometimes you need a heavyweight solution with Spark on the pure JVM (Scala, Java, etc.), but other times you just need to develop an ETL job quickly.

  • Perhaps to extract data from a data lake for reporting: merging a day's worth of purchases into a finance report.
  • Perhaps to create models or summaries from large datasets: aggregating millions of rows of transactions into a daily summary view.
  • Maybe for training a machine learning model or keeping your feature store up to date.
  • You might want to throw these jobs into Airflow or Luigi for the ultimate ETL zen.

Building jobs quickly is great - you get your data from one place to another really quickly. The downside is that it can feel a bit hacky, which is where BDD/Gherkin comes in.

Why Gherkin?

Behaviour Driven Development is pretty ubiquitous these days. For the uninitiated, BDD is about testing the behaviour of a process or app, rather than its internal workings. While TDD might test across the boundaries of a class or function, BDD focuses on the inputs and outputs of a whole application - testing all the code as it does so. BDD is less brittle than TDD (you can change the implementation without rewriting your tests) and tends to be more closely related to the acceptance criteria (the given... when... thens) on your story cards.

Gherkin is a human-readable language for BDD - it abstracts code away behind almost-natural-language statements. These map to code which can be run just like unit tests, giving you instant feedback on whether your app's behaviour meets the spec. Many developers see Gherkin as being 'old hat' these days - needless syntactic sugar which makes defining tests slower and more cumbersome. I happen to agree with them... BUT I also feel that Gherkin has some advantages in the data world that are less obvious to people writing 'normal' software:

  • Gherkin is great for defining tables of data for tests
  • Gherkin is accessible to non-coders - data analysts, BAs and suchlike

Define a Scenario:

  Scenario: Simple filter on a table
    Given a spark session
    And a table called "students" containing
      | name:String | age:Int |
      | Fred        | 9       |
      | Mary        | 10      |
      | Bob         | 10      |
    When I select rows from "students" where "age" greater than "9" into table "results"
    Then the table "results" contains
      | name:String | age:Int |
      | Mary        | 10      |
      | Bob         | 10      |

Run the 'tests', get feedback:

$ behave
...
2 features passed, 0 failed, 0 skipped
9 scenarios passed, 0 failed, 0 skipped
41 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m25.693s
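
For reference, a step like the one above might be implemented roughly as follows (a minimal sketch, assuming the step wording shown in the scenario; the repo's actual step code in features/steps may differ):

from behave import when

@when(u'I select rows from "{source}" where "{column}" greater than "{value}" into table "{target}"')
def step_impl(context, source, column, value, target):
    # Filter the named temp view and register the result as a new temp view
    df = context.spark.sql(f"select * from {source} where {column} > {value}")
    df.createOrReplaceTempView(target)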

Data engineers are often tasked with writing code to transform data for reporting and analytics. However, it's often others in the business who have the domain knowledge to define how the transformation should work - the specific joins, filters and aggregations can be pretty complex. The expert might define the logic in a user story or discuss the task with an engineer round a whiteboard - but there's still plenty of scope for misinterpretation and confusion.

This is where Gherkin is useful - it allows the Expert to define what they want in an unambiguous way and gives the Engineer confidence that their code is doing the right thing.

The Code

The following is a brief walk-through of the code. I'm using the behave and pyspark libraries. Here's a good tip for making behave work in the community edition of IntelliJ.

In most cases I am using temporary tables in Spark's SqlContext to pass data around between steps. This means the business logic can be separated from the read and write operations without the need to create mocks. In your BDD steps you throw the data into tables with given names, your code runs and creates more tables which you can test. In your production app you read the data from whatever source you have and throw it into the same set of tables - no mocks, class hierarchies or other nastiness.

What? No mocks?
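
As a rough illustration of that wiring (the paths and the run_filter_business_logic function below are hypothetical, not the repo's actual code), a production entry point just registers its real source data under the same table names and calls the same transform:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("student-report").getOrCreate()

# Register the real source data under the same table name the BDD steps use
spark.read.parquet("s3://my-bucket/students/").createOrReplaceTempView("students")

# Call the same transform the 'When' step exercises in the tests
run_filter_business_logic(spark)  # hypothetical name for the transform under test

# The transform leaves its output in the "results" temp view
spark.table("results").write.parquet("s3://my-bucket/reports/results/")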

Simple Filters and Joins

Examples of simple filters and joins can be found in spark.feature. Implementation of the steps is spread across the various .py files in the features/steps directory.

Here's an example Scenario with a simple join:

  Scenario: Simple join of two tables
    Given a spark session
    And a table called "students" containing
      | id:Int | name:String | subject_id:Int |
      | 1      | Fred        | 1              |
      | 2      | Sally       | 1              |
      | 3      | Susan       | 2              |
    And a table called "subjects" containing
      | id:Int | name:String |
      | 1      | Maths       |
      | 2      | Physics     |
    When I do the join business logic
    Then the table "results" contains
      | id:Int | student_name:String | subject_name:String |
      | 1      | Fred                | Maths               |
      | 2      | Sally               | Maths               |
      | 3      | Susan               | Physics             |

The input tables are parsed and converted to named tables in the SqlContext like this:

@given(u'a table called "{table_name}" containing')
def step_impl(context, table_name):
    df = table_to_spark(context.spark, context.table)
    df.createOrReplaceTempView(table_name)
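
The table_to_spark helper converts behave's context.table (with its "name:Type" column headings) into a DataFrame. The repo's actual utility isn't shown here, but a minimal sketch handling just String and Int columns might look like this:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Minimal sketch of a table_to_spark-style helper: split each heading like
# "age:Int" into a column name and a Spark type, then build a DataFrame.
SPARK_TYPES = {"String": StringType(), "Int": IntegerType()}

def table_to_spark(spark, table):
    names, types = zip(*(heading.split(":") for heading in table.headings))
    schema = StructType([StructField(n, SPARK_TYPES[t], True) for n, t in zip(names, types)])
    rows = [[int(v) if t == "Int" else v for v, t in zip(row.cells, types)]
            for row in table.rows]
    return spark.createDataFrame(rows, schema)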

The code under test, which is executed behind the When I do the join business logic step, is pretty simple. Note that data is passed in and out based on named tables only. This convention-over-configuration approach makes the transform logic very easy to decouple from the read (extract) and write (load) operations.

@when(u'I do the join business logic')
def step_impl(context):
    df = context.spark.sql("""
        select s.id, s.name as student_name, c.name as subject_name
        from students s
        join subjects c on (s.subject_id == c.id)
    """)
    df.createOrReplaceTempView("results")

Automatic Data Generation

Writing Gherkin tables is boring; there are no two ways about it. Plus, sometimes you just need some rows - you don't want to dream up fake names and addresses for your test 'customer' table. So I wrote a few handy utilities for generating data automatically.

Wildcards

Here we want to test the age filtering, but don't care about the names of the students.

  Scenario: Semi-random records
    Given a spark session
    And a table called "students" containing
      | name:String | age:Int |
      | %RAND%      | 9       |
      | %RAND%      | 10      |
      | %RAND%      | 10      |
    When I select rows from "students" where "age" greater than "9" into table "results"
    Then the table "results" has "2" rows
    And the value "%RAND%" is not present in the field "name" of table "results"

Random Rows

Here we just want 10 rows with a given schema. Random values are inserted into every cell.

  Scenario: Random records
    Given a spark session
    And a table called "random_students" containing "10" rows with schema
      | name       | type   | mode |
      | id         | int    | RAND |
      | name       | string | RAND |
      | subject_id | long   | RAND |
    Then the table "random_students" has "10" rows
    And the table "random_students" has "3" columns

Sequences

Particularly useful for ids. Here we just want some rows with a sequential integer id:

  Scenario: Sequences
    Given a spark session
    And a table called "random_students" containing "3" rows with schema
      | name       | type   | mode |
      | id         | int    | SEQ  |
      | name       | string | RAND |
      | subject_id | long   | RAND |
    Then the min of field "id" in table "random_students" is "1"
    And the max of field "id" in table "random_students" is "3"
    And the sum of field "id" in table "random_students" is "6"

Sequences are also possible with strings. English words are used for sequence values ('one', 'two', 'three'...).

  Scenario: String based sequences
    Given a spark session
    And a table called "string_table" containing "1000" rows with schema
      | name       | type   | mode |
      | name       | string | SEQ  |
    Then the table "string_table" has "1000" rows
    And the value "one" is present in the field "name" of table "string_table"
    And the value "one thousand" is present in the field "name" of table "string_table"
    And the value "thirty-six" is present in the field "name" of table "string_table"

Slightly More Complex Example

In real life you might have lots of tables which need to be joined, filtered and aggregated to create the data you need. Here's an example of that - maybe not as complex as real life, but you get the idea...

  Scenario: Three way join and filter: Hourly report on sales volume of strong beers in shops
    Given a spark session
    And a table called "beers" containing
      | id:int | name:string | abv:double |
      | 1      | Weak Beer   | 3.2        |
      | 2      | Strong Beer | 6.4        |
      | 3      | Medium Beer | 4.6        |
    And a table called "channels" containing
      | id:int | name:string |
      | 1      | Web         |
      | 2      | Shop        |
      | 3      | Pub         |
    And a table called "sales" containing "1000" rows with schema
      | name       | type   | mode       |
      | id         | int    | SEQ        |
      | beer_id    | int    | RAND(1-3)  |
      | channel_id | int    | RAND(1-3)  |
      | quantity   | int    | RAND(1-5)  |
      | hour       | int    | RAND(0-23) |
    When I generate a summary in table "my_summary" of beers over 5 percent, sold in the shop by hour
    And I execute the following SQL into table "test_results"
    """
    select * from my_summary where channel_name != "Shop" or beer_name != "Strong Beer"
    """
    Then the table "my_summary" is not empty
    And the table "test_results" has "0" rows
