
spark-session-enricher

Enriches events with user sessions and computes statistics on top of them for an imaginary e-commerce site, using Spark SQL and aggregations.

Config

{
  session = 5 minute
  debug = false
  topN = 10
  inputFile = "example1.csv"
}

session - the maximum amount of time between consecutive events in a user session

debug - if true, output is printed to the console rather than written to a file

topN - the number of products to report for each category, ranked by the time users spent on product pages

inputFile - the name of the input file
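
The config is in HOCON format. Below is a minimal sketch of reading it, assuming the Typesafe Config library and that the settings live in application.conf on the classpath; variable names are illustrative, not taken from the project.

import com.typesafe.config.ConfigFactory

// load application.conf / reference.conf from the classpath
val config = ConfigFactory.load()

val sessionGap = config.getDuration("session")   // "5 minute" parses into a java.time.Duration
val debug      = config.getBoolean("debug")
val topN       = config.getInt("topN")
val inputFile  = config.getString("inputFile")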

SessionEnricherAggregator

This file contains an implementation based on a Spark Aggregator. The idea is to group incoming events by userId and category, then aggregate each group using SessionAggregator, which has the following signature:

class SessionAggregator extends Aggregator[Event, List[SessionBuffer], List[EventSession]]

The input type is Event, the raw data read from the input file. The second type parameter, List[SessionBuffer], is the buffer type. Finally, the last type parameter, List[EventSession], is the output type.

case class Event(category: String, product: String, userId: String, eventTime: Timestamp, eventType: String)

case class SessionBuffer(events: List[Event], sessionId: String, sessionStartTime: Timestamp, sessionEndTime: Timestamp)

case class EventSession(category: String, product: String, userId: String, eventTime: Timestamp, eventType: String,
                          sessionId: String, sessionStartTime: Timestamp, sessionEndTime: Timestamp)

SessionAggregator implements several methods; let's take a look at the most important ones: reduce and merge.

Reduce

Since the data that arrives at the reduce method is not guaranteed to be sorted, it implements an algorithm that does not rely on event order.

The reduce method has the following signature:

reduce(b: List[SessionBuffer], a: Event): List[SessionBuffer]

Let's describe all possible states of b:

  1. empty
  2. already contains some data

In case #1 we simply add the event to the buffer, creating a new session. In case #2 we go through all sessions and look for one that the current event a can belong to. That is, if the current session is s and the event is e, then:

e.eventTime in (s.sessionStartTime - N, s.sessionEndTime + N)

where N is the session value from the configuration file described above. If such a session is found, the event is added to its events list and sessionStartTime and sessionEndTime are recalculated. Otherwise, we create a new session.
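
A minimal sketch of this algorithm, not the exact implementation: sessionGap (the configured N) is assumed to be in scope as a java.time.Duration, and the sessionId generation is illustrative.

def reduce(b: List[SessionBuffer], a: Event): List[SessionBuffer] = {
  val gapMs = sessionGap.toMillis
  // look for a session whose (start - N, end + N) window contains the event
  val idx = b.indexWhere { s =>
    a.eventTime.getTime > s.sessionStartTime.getTime - gapMs &&
    a.eventTime.getTime < s.sessionEndTime.getTime + gapMs
  }
  if (idx < 0) {
    // no matching session: start a new one from this event
    SessionBuffer(List(a), java.util.UUID.randomUUID().toString, a.eventTime, a.eventTime) :: b
  } else {
    // add the event to the matching session and recalculate its bounds
    val s     = b(idx)
    val start = if (a.eventTime.before(s.sessionStartTime)) a.eventTime else s.sessionStartTime
    val end   = if (a.eventTime.after(s.sessionEndTime)) a.eventTime else s.sessionEndTime
    b.updated(idx, s.copy(events = a :: s.events, sessionStartTime = start, sessionEndTime = end))
  }
}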

Merge

The reduced data is passed to the merge method, which has the following signature:

merge(b1: List[SessionBuffer], b2: List[SessionBuffer]): List[SessionBuffer]

Let's consider a situation where simple concatenation of the arguments will not work. Suppose we have the following list of events (only eventTimes, for simplicity):

e = [35, 40, 45]

Event 35 comes first; the session buffer is empty, so a new session is created. Then 45 comes; the buffer is not empty, but the two events are too far apart, so another new session is created. Finally, 40 comes; it could belong to either of the sessions created earlier, but it is added to only one of them. As a result, we end up with two sessions:

(35, 40) and (45, 45)

The idea of the merge method is to:

  1. concatenate incoming list of buffers
  2. sort sessions by sessionStartTime
  3. merge sessions like the ones described above
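
A minimal sketch of that logic, again assuming sessionGap is in scope; the real implementation may differ in details such as how events are combined.

def merge(b1: List[SessionBuffer], b2: List[SessionBuffer]): List[SessionBuffer] = {
  val gapMs  = sessionGap.toMillis
  // 1. concatenate and 2. sort by sessionStartTime
  val sorted = (b1 ++ b2).sortBy(_.sessionStartTime.getTime)
  // 3. fold over the sorted sessions, merging any that overlap within the gap
  sorted.foldLeft(List.empty[SessionBuffer]) {
    case (prev :: rest, next)
        if next.sessionStartTime.getTime <= prev.sessionEndTime.getTime + gapMs =>
      val end = if (next.sessionEndTime.after(prev.sessionEndTime)) next.sessionEndTime
                else prev.sessionEndTime
      prev.copy(events = prev.events ++ next.events, sessionEndTime = end) :: rest
    case (acc, next) =>
      next :: acc
  }.reverse
}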

SessionEnricherSql

This file contains implementation based on Spark SQL.

There are two methods for calculating user sessions: eventWithSession and eventWithProductSession. The rest of the methods are used for calculating statistics:

  1. For each category, find the median session duration
  2. For each category, find the number of unique users spending less than 1 minute, 1 to 5 minutes, and more than 5 minutes
  3. For each category, find the top 10 products ranked by the time users spent on product pages

eventWithSession

Since this implementation must group consecutive events that are no further apart than session minutes, we need to calculate that time difference. The main idea is to use a window over userId and category to obtain the previous eventTime. The next step is calculating isNewSession, a column indicating whether a new session has started or the old one continues. Finally, we sum up isNewSession over the window, obtaining a sequential id as the sessionId.
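
A minimal DataFrame sketch of this approach; events stands for the raw input and gapMinutes for the configured session length, and the column names are illustrative rather than taken from the project.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val byUserAndCategory = Window.partitionBy("userId", "category").orderBy("eventTime")

val eventWithSession = events
  .withColumn("prevEventTime", lag("eventTime", 1).over(byUserAndCategory))
  .withColumn("isNewSession",
    when(col("prevEventTime").isNull ||
         unix_timestamp(col("eventTime")) - unix_timestamp(col("prevEventTime")) > gapMinutes * 60, 1)
      .otherwise(0))
  .withColumn("sessionId", sum("isNewSession").over(byUserAndCategory))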

eventWithProductSession

This method is used for calculating the topN products. In this case a session contains all consecutive events of a user performing actions on a single product page. The idea is similar to eventWithSession, with the main difference that the previous product is calculated instead of the previous eventTime.
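
A sketch of the variation, reusing the window and imports from the previous snippet: a new session starts whenever the user switches to a different product.

val eventWithProductSession = events
  .withColumn("prevProduct", lag("product", 1).over(byUserAndCategory))
  .withColumn("isNewSession",
    when(col("prevProduct").isNull || col("prevProduct") =!= col("product"), 1).otherwise(0))
  .withColumn("sessionId", sum("isNewSession").over(byUserAndCategory))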

medianSessionDuration

For calculating the median session duration for each category, we do the following:

  1. group events by category and sessionId
  2. sum up sessionDuration, i.e. (sessionEnd - sessionStart)
  3. calculate percentile_approx(sessionDuration, 0.5)
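
A minimal sketch of these steps, reusing eventWithSession from above and deriving the session bounds as min/max eventTime per session; userId is included in the grouping because the sessionId from the sketch above is only unique per user. The real code may carry dedicated sessionStart/sessionEnd columns.

val medianSessionDuration = eventWithSession
  .groupBy("category", "userId", "sessionId")
  .agg((max(unix_timestamp(col("eventTime"))) -
        min(unix_timestamp(col("eventTime")))).as("sessionDuration"))
  .groupBy("category")
  .agg(expr("percentile_approx(sessionDuration, 0.5)").as("medianSessionDuration"))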

userSpentTimeStat

For each category, find the number of unique users spending:

  1. less than 1 min
  2. 1 to 5 mins
  3. more than 5 mins.

We do the following:

  1. group events by category and sessionId
  2. sum up sessionDuration, i.e. (sessionEnd - sessionStart)
  3. use the countDistinct and when functions to implement all three cases described above
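
A minimal sketch in the same spirit as the previous snippet; thresholds are in seconds, and when without otherwise yields null, which countDistinct ignores.

val userSpentTimeStat = eventWithSession
  .groupBy("category", "userId", "sessionId")
  .agg((max(unix_timestamp(col("eventTime"))) -
        min(unix_timestamp(col("eventTime")))).as("sessionDuration"))
  .groupBy("category")
  .agg(
    countDistinct(when(col("sessionDuration") < 60, col("userId"))).as("lessThan1Min"),
    countDistinct(when(col("sessionDuration").between(60, 300), col("userId"))).as("from1To5Min"),
    countDistinct(when(col("sessionDuration") > 300, col("userId"))).as("moreThan5Min")
  )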

topNProducts

For each category, find the top 10 products ranked by the time users spent on product pages. We do the following:

  1. group events by category and product
  2. sum up sessionDuration, i.e. (sessionEnd - sessionStart)
  3. rank records using dense_rank() over category, ordered by duration
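
A minimal sketch, reusing eventWithProductSession from above and the topN value from the config.

val byCategory = Window.partitionBy("category").orderBy(col("totalDuration").desc)

val topNProducts = eventWithProductSession
  .groupBy("category", "product", "userId", "sessionId")
  .agg((max(unix_timestamp(col("eventTime"))) -
        min(unix_timestamp(col("eventTime")))).as("sessionDuration"))
  .groupBy("category", "product")
  .agg(sum("sessionDuration").as("totalDuration"))
  .withColumn("rank", dense_rank().over(byCategory))
  .filter(col("rank") <= topN)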

