
project-fortis-spark's Introduction

This repository is outdated and was migrated to project-fortis.





project-fortis-spark

A repository for Project Fortis's data processing pipeline, built on Apache Spark.

What's this?

This project contains a Spark Streaming job that ingests data into the Fortis system. Specifically, we:

  1. Ingest data in real time from sources such as Twitter, Facebook, Online Radio, Newspapers, Instagram, TadaWeb, and so forth.
  2. Analyze and augment the raw data with intelligence like sentiment analysis, entity extraction, place recognition, or image understanding.
  3. Narrow down the stream of events based on user-defined geo-areas, target keywords and blacklisted terms.
  4. Perform trend detection and aggregate the metrics that back Project Fortis.

At the end of the ingestion pipeline, we publish the events and various aggregations to Cassandra.
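The four stages above can be sketched, very roughly, as plain Scala transformations. All names here (`FortisEvent`, `PipelineSketch`, the stubbed analyzer) are illustrative assumptions, not the actual pipeline types; the real job runs these steps as Spark Streaming transformations.

```scala
// Illustrative model of the pipeline stages; every name is hypothetical.
case class FortisEvent(body: String,
                       keywords: Set[String] = Set.empty,
                       sentiment: Option[Double] = None)

object PipelineSketch {
  // Stage 2: augment the raw event with (stubbed) intelligence.
  def analyze(e: FortisEvent): FortisEvent =
    e.copy(sentiment = Some(if (e.body.contains("attack")) -0.8 else 0.1))

  // Stage 3: narrow the stream with watchlist keywords and blacklisted terms.
  def passesFilters(watchlist: Set[String], blacklist: Set[String])(e: FortisEvent): Boolean = {
    val words = e.body.toLowerCase.split("\\s+").toSet
    words.exists(watchlist.contains) && !words.exists(blacklist.contains)
  }

  // Stage 4: aggregate a trivial metric (event count per watchlist keyword).
  def aggregate(events: Seq[FortisEvent], watchlist: Set[String]): Map[String, Int] =
    events.flatMap(e => e.body.toLowerCase.split("\\s+").filter(watchlist.contains))
      .groupBy(identity).map { case (k, v) => k -> v.size }
}
```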

Development setup

```sh
# set up variables from deployment environment
export HA_PROGRESS_DIR="..."
export APPINSIGHTS_INSTRUMENTATIONKEY="..."
export FORTIS_FEATURE_SERVICE_HOST="..."
export FORTIS_MODELS_DIRECTORY="..."
export FORTIS_CENTRAL_ASSETS_HOST="..."
export FORTIS_SERVICEBUS_NAMESPACE="..."
export FORTIS_SERVICEBUS_CONFIG_QUEUE="..."
export FORTIS_SERVICEBUS_POLICY_NAME="..."
export FORTIS_SERVICEBUS_POLICY_KEY="..."

# compile scala, run tests, build fat jar
export JAVA_OPTS="-Xmx2048M"
sbt assembly

# run on spark
spark-submit --driver-memory 4g target/scala-2.11/project-fortis-spark-assembly-0.0.1.jar
```

project-fortis-spark's People

Contributors

c-w, erikschlegel, jcjimenez, kevinhartman, nathanielrose, smarker


project-fortis-spark's Issues

Cassandra - Add Cassandra Ingestion Tables

Define table schemas in Cassandra to support data ingestion for the following features.

  • Topics
  • Configuration Factories
  • Aggregated Tiles
  • Fortis Events
  • Trusted profiles
  • Facebook Pages
  • Site Configuration Settings

What does complete mean
Creating Cassandra tables that support the features mentioned above. These tables should be created at site creation time as part of the deployment process.

Spark Pipeline - Integrate Cassandra Site Config Schema with Spark Filter

This task encompasses the following features.

Implement the Spark integration with the site configuration definition sourced from Cassandra.

The site definition covers the following categories.

  • Topic whitelist
  • Topic blacklist
  • The geofence defined in the Fortis site config
  • Facebook page list
  • Trusted Twitter profiles

The topic whitelist should be used for the Facebook, Twitter, Bing, and Instagram data connector filters. The geofence should be used for the Facebook, Twitter, and Instagram streaming connectors. We need to use broadcast variables to keep the site configuration data structures in Spark synchronized when they're modified in the Fortis admin screen.

The filter will need to be re-applied following the completion of the feature extraction processing.
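A hedged sketch of what such a filter might look like. The `SiteConfig` shape and the bounding-box geofence are assumptions; in the real job these sets would live in Spark broadcast variables refreshed from the Cassandra site configuration.

```scala
// Hypothetical site-config filter; in Spark, whitelist/blacklist/geofence
// would be broadcast variables rebuilt when the admin screen updates them.
case class SiteConfig(whitelist: Set[String],
                      blacklist: Set[String],
                      geofence: (Double, Double, Double, Double)) // minLat, minLon, maxLat, maxLon

object SiteFilter {
  def inGeofence(cfg: SiteConfig, lat: Double, lon: Double): Boolean = {
    val (minLat, minLon, maxLat, maxLon) = cfg.geofence
    lat >= minLat && lat <= maxLat && lon >= minLon && lon <= maxLon
  }

  // Applied once on ingest and re-applied after feature extraction,
  // since the analyzers can surface new keywords.
  def keepEvent(cfg: SiteConfig, keywords: Set[String], lat: Double, lon: Double): Boolean =
    keywords.exists(cfg.whitelist.contains) &&
      !keywords.exists(cfg.blacklist.contains) &&
      inGeofence(cfg, lat, lon)
}
```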

Spark Documentation

We need to give some much-needed TLC to this repo's README. It should provide clear guidance for new Fortis developers.

Documentation should cover the following criteria

  1. Install Spark on a local machine, including where to get the install and which version a developer should use.
  2. All the necessary steps for setting up the project. This should mention that we depend on IntelliJ (specify versions and download locations), which repo to clone, how to create a new project in IntelliJ, which project file to link to, and which versions of SBT and Scala need to be installed. Include as many screenshots as necessary.
  3. Steps for installing Cassandra locally, including steps for starting it.
  4. Steps for setting up all the Cassandra objects on a local instance.
  5. Steps for testing the Cassandra Spark aggregation push.
  6. Steps for running / debugging the pipeline in IntelliJ.
  7. Steps for running all the unit tests.
  8. Steps for setting up the kube proxy to monitor the Spark job, including URL links, etc.
  9. Steps for pushing new changes to the pipeline (increment the sbt version, cut a new release in GitHub, links to the Travis build, instructions on force-starting the build, etc.).
  10. Deployment of a new release to a Kubernetes pod.
  11. Environment variables that need to be set. We should also have sample site config and trustedsource record insert statements to refer developers to.

Add pipeline for custom events

Fortis-v1 has a feature where users can upload custom events and have them analyzed via the Fortis pipeline. See publishEvents for reference.

If we want to support this feature in Fortis-v2, we need to add a pipeline via which to ingest these custom events.

One possible implementation of this is to use EventHub with the Spark pipeline acting as a consumer and the resolver that implements publishEvents acting as a producer.

We already have code to interface with EventHub from Spark (EventHubStreamFactory) so it shouldn't take more than 1-1.5 days to build all the required pieces:

(1) Define the schema for the custom events
(2) Define a name for the custom events EventHub
(3) Add a new pipeline that processes events from EventHub
(4) Set up a dummy EventHub in our Azure subscription for testing
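A possible shape for step (1), purely as a sketch: every field name below is hypothetical until the schema is actually defined, and the validation is just what an EventHub consumer might plausibly run before handing events to the analysis pipeline.

```scala
// Hypothetical schema for user-uploaded custom events; field names are
// illustrative assumptions, not the agreed-upon schema.
case class CustomEvent(title: String,
                       message: String,
                       language: String,
                       latitude: Double,
                       longitude: Double,
                       epochMillis: Long)

object CustomEvent {
  // Minimal sanity checks before the event enters the pipeline.
  def validate(e: CustomEvent): Either[String, CustomEvent] =
    if (e.message.trim.isEmpty) Left("empty message")
    else if (math.abs(e.latitude) > 90 || math.abs(e.longitude) > 180) Left("bad coordinates")
    else Right(e)
}
```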

Spark Pipeline - Add Kryo as our serialization interface

We should avoid the default serialization interface that ships with Spark due to performance issues. Kryo seems to be the industry-standard serialization solution across the Java community.

What does complete mean
Add the Kryo jar as a dependency in our build and configure the serialization interface on the SparkContext. This work also includes unit testing and performance tuning with the library.
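A configuration sketch of what this might look like. This is an untested fragment that requires Spark on the classpath; the classes to register would be the pipeline's own event types, which are left as commented placeholders here.

```scala
// Configuration sketch only (not runnable without Spark on the classpath).
// Registering classes up front avoids Kryo writing full class names per object.
val conf = new org.apache.spark.SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "false")
  .registerKryoClasses(Array[Class[_]](
    // classOf[FortisEvent], classOf[AnalyzedItem]  // hypothetical pipeline types
  ))
```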

GraphQL - Create GraphQL Service Abstraction

This is meant to be a thin wrapper that translates the newly modified GraphQL service resolver responses into the data structure that the v1 Fortis dashboard expects.

What complete means
Integrate this abstraction layer into fortis-interfaces services.js file.

Spark Pipeline - Add AppInsights

Goal: Add AppInsights to increase visibility into the Spark pipeline.

What does complete mean:

  • Add event tracking with properties and metrics
  • Log fatal exceptions
  • Add AppInsights webhooks to provide Slack notifications on fatal exceptions.

Integrate Lucene / Disable OpenNER

Problem to address: We're using OpenNER as our NLP solution for keyword and place detection in Fortis. Running the pipeline in high-volume scenarios, we're seeing OpenNER take anywhere from ~100-500ms per event. This causes the batch runtime to exceed the Spark streaming interval, which crashes Fortis after about an hour.

Solution: We decided to integrate Lucene as our in-memory search engine for detecting mentioned keywords / places.

Implementation

  • Add the 3 Lucene core jar dependencies to the Fortis build.
  • Index initial DStream pipeline events in Lucene.
  • Build the 2 Lucene search queries off of the geofenced list of places and the watchlist keyword terms. The keyword search should support terms across all supported languages. We should leverage the 30+ language models that come pre-packaged with Lucene: https://lucene.apache.org/core/7_0_0/analyzers-common/index.html
  • The results of the Lucene index search should be ranked by search relevancy, weighted by mention count.
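The last bullet's ranking rule could be sketched like this. Lucene itself would supply the relevancy score; `Hit` is an illustrative type, not a Lucene class, and the multiplicative weighting is one plausible interpretation of "weighted by mention count".

```scala
// Illustrative ranking: relevancy (from the search engine) weighted by
// how often the term/place was mentioned.
case class Hit(term: String, relevancy: Double, mentions: Int)

object LuceneRanking {
  def rank(hits: Seq[Hit]): Seq[Hit] =
    hits.sortBy(h => -h.relevancy * h.mentions)
}
```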

ML - Trend Detection Cassandra Ingestion

Create a new Cassandra table to hold the resulting dataset from the trend detection linear regression model output. The completion of this task entails 1) standing up the Cassandra trends table and 2) aggregating the model output dataset into Cassandra.

ML - Trend Detection Modelling

Add linear regression MMLSpark models for trend detection analysis.

The completion of this task entails integrating an MMLSpark linear regression model with the Fortis topical label training data, based on the aggregated mention-count time series dataset.

Local Environment Health-Check Scripts & ReadMe

Create a health-check script for Windows & Linux that verifies all correct Fortis dependencies are properly installed.

Script should include the following criteria

  1. java & javac health check for v1.8 or greater and OpenJDK version.
  2. mvn check for v3.0 or greater.
  3. node & npm check for v5.0 and v6.0 respectively.
  4. scala check for v2.0 or greater.
  5. sbt check for v0.13 or greater.
  6. cassandra check for v3.0 or greater.
  7. spark path check and verify v2.7 or greater.
  8. kubectl check.
  9. helm validation.
  10. intellij validation.
  11. kube config: validate that it exists and is non-empty.
  12. Redirect additional issues to Fortis Docs for guidance.

GraphQL- Add Crisis Lex integration

Integrate the CrisisLex lexicon category list into Fortis as the default topic list that's set up when a site is first deployed.

The lexicon list should be sourced from the data directory of the CrisisLex GitHub repo: https://github.com/sajao/CrisisLex/tree/master/data

What complete means

  • Normalize all the datasets available from the lexicon repo into a unique, consolidated, and conditioned dataset on the blob storage container that currently hosts our sentiment models.
  • This should be organized in a directory structure following the convention /fortis/{type}/defaultTopics/
  • Modify the createOrReplaceSite GraphQL endpoint to accept the siteType input parameter and reference all topics for the input {type} from blob storage.
  • Push one record into the Topics table in Cassandra for each topic sourced from our blob storage container.

Filter retweets

We should remove any events marked as retweets by the Twitter Analyzer.

Currently, we're processing messages marked as retweets, which make up a large share of Fortis events. This creates duplicate events and floods our Spark streaming job with a high volume of content.

We should enhance the Twitter Streaming Factory to filter out retweets, similar to the existing profile filter that's in place.
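A minimal sketch of the proposed filter. The `TweetLike` type is illustrative, not the factory's actual type; twitter4j's `Status` exposes a similar retweet flag that the real implementation would check.

```scala
// Illustrative stand-in for a tweet; twitter4j's Status has an isRetweet flag.
case class TweetLike(text: String, isRetweet: Boolean, userName: String)

object TwitterFilters {
  // Drop retweets before they enter the pipeline.
  def dropRetweets(tweets: Seq[TweetLike]): Seq[TweetLike] =
    tweets.filterNot(_.isRetweet)
}
```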

Duplicates in keyword extraction

If a keyword/phrase appears multiple times in the event text, the list of extracted keywords will contain an entry for each occurrence. An AnalyzedItem's tags field (along with several other collection-typed fields) should be a Set. Once that's done, the keyword extractor's interface should be made to work with Sets instead of Lists.
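A sketch of the proposed fix, with tags as a Set so repeated mentions collapse to a single entry. The names are illustrative, not the actual AnalyzedItem definition.

```scala
// Illustrative: a Set-typed tags field deduplicates repeated mentions by construction.
case class AnalyzedItemSketch(body: String, tags: Set[String])

object KeywordExtractorSketch {
  // Returning a Set means a keyword appearing N times yields one entry.
  def extract(body: String, watchlist: Set[String]): Set[String] =
    body.toLowerCase.split("\\W+").filter(watchlist.contains).toSet
}
```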

Implement ConfigurationManager interface for Cassandra

The ConfigurationManager provides configuration data used for:

  • Updating the attached streams/connections that feed the Fortis pipeline.
  • Updating transform utilities and broadcasted data (i.e. whitelist and blacklist) used by analysis.

This task captures its implementation for reading from Cassandra.

Spark Pipeline - Aggregate pipeline data into geotiles in Cassandra

Transform the filtered, normalized results from the streaming connector pipeline and aggregate them into the tiles Cassandra table, which follows the attached schema (tile_schema).

What does complete mean
Results are aggregated by the composite key outlined in the attachment and written to Cassandra.

Expose Streams via GraphQL

In v1, Fortis had GraphQL endpoints for Facebook and Twitter stream settings. Now that there are more possible stream connections (e.g. Bing, Reddit), there needs to be a way to perform CRUD operations for these new streams.

Non-base language events are being dropped in the pipeline

It looks like we're writing a mix of both English and Spanish terms to Cassandra. For example, suppose ataque is a watchlist term for a Fortis site whose primary language is Spanish with English translation support.
If ataque is mentioned in a Spanish tweet, we archive that term in Cassandra; we do the same if attack is mentioned in an English tweet. A data sample is listed below. This is a problem for the Fortis interface, as the services expect content to be aggregated on terms in the base language. We need to enhance the keyword extraction analyzer to normalize this so that attack is recorded as ataque.

      month | ataque |                   |                   |    11 |     Twitter |    EmbVZLA_enEsp |   11_917_648 | 2017-10-01 00:00:00.000000+0000 |                     0 |            1
        day | ataque |                   |                   |     6 |     Twitter |  RaicesPeronista |      6_28_20 | 2017-10-03 00:00:00.000000+0000 |                     0 |            1
        day | ataque |                   |                   |    13 |     Twitter |          sutpmcu | 13_3670_2592 | 2017-10-03 00:00:00.000000+0000 |                     0 |            1
      month |       attack |                   |                   |    13 |     Twitter |              all | 13_3670_2581 | 2017-10-01 00:00:00.000000+0000 |                     0 |            1
       hour |       attack |                   |                   |    13 |     Twitter |              all | 13_3670_2581 | 2017-10-03 01:00:00.000000+0000 |                     0 |            1
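The normalization could be sketched as a lookup from translated terms back to the site's base-language term before aggregation. The translation map here is illustrative; in Fortis it would come from the site's watchlist configuration.

```scala
// Map a detected term back to the site's base-language term; terms without
// a translation entry pass through lowercased.
object KeywordNormalizer {
  def normalize(term: String, toBaseLanguage: Map[String, String]): String =
    toBaseLanguage.getOrElse(term.toLowerCase, term.toLowerCase)
}
```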

Enable checkpointing

Some boilerplate code setup is required to enable Spark Streaming checkpointing.

Research is also required to determine how we can guarantee "at most once" behavior of incoming events. It's likely that individual connectors will need to support saving their place in respective streams in HDFS storage.

Spark Pipeline - Persist fortis events into Cassandra

Fortis events that have been transformed and have passed the logical filters should be persisted into Cassandra, abiding by the attached schema.

What does complete mean
Completing the Spark streaming integration layer to write Scala-based Fortis event objects directly into our Cassandra cluster.

fortis-message-schema

Dashboard - Refactor GraphQL Services to support cassandra schema

The GraphQL schema and resolvers will need to be refactored to read / write their data from Cassandra. Currently, our GraphQL servers read / write data from Azure Table Storage and Postgres. This needs to be refactored to read / write from Cassandra only. Postgres will only be used for news feed queries, to collect the feature IDs that intersect with the user's current map bounds from the heatmap.

We should reorganize the existing schema to consolidate the flow-based type definitions.

What complete means
Here's a listing of the schemas that will need to be refactored

  • Edges Schema(used for terms and locations) - This will also need to support both entities, data source and source publisher.
  • Messages Schema
  • Facts Schema should be consolidated into the Messages Schema
  • Settings Schema
  • Tiles Schema
  • Aggregated Edges - popular locations, popular terms, etc

A listing of the refactored resolvers that will need to be adjusted

  • Edges: terms, locations, removeKeywords, addKeywords, saveLocations, removeLocations, popularLocations, timeSeries and topSources
  • Facts(should be consolidated into Messages): list & get
  • Messages: byBbox, byLocation, byEdges, eventDetails, publishEvents, translate and translateWords
  • Settings(ie the admin interface services): sites, createOrReplaceSite, modifyFacebookPages, modifyTrustedTwitterAccounts, removeFacebookPages, removeTrustedTwitterAccounts, modifyTwitterAccounts, removeTwitterAccounts, twitterAccounts, trustedTwitterAccounts, facebookPages, facebookAnalytics, termBlacklist, modifyBlacklist, removeBlacklist
  • Tiles: fetchTilesByBBox, fetchTilesByLocations, fetchPlacesByBBox, fetchEdgesByLocations and fetchEdgesByBBox

Submit fortis package to maven

It looks like the artifacts for fortis-spark (and possibly others) aren't making it into Maven. Definition of done: all artifacts required for one-click deploy are published to Maven.

Add trusted sources to ConnectorConfig

The trustedsources table in Cassandra lists pages that the user wishes to watch. Each entry specifies the connector (i.e. "InstagramTag", "InstagramPost", "TwitterPost") that the page is applicable to. For some connectors, the list of pages to watch is needed when creating the stream.

This task captures the addition of trusted sources to ConnectorConfig so that StreamFactorys can use the list of trusted sources when creating streams/connections.
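A hedged sketch of what carrying trusted sources on the config might look like. `ConnectorConfigSketch` and its fields are assumptions for illustration, not the actual ConnectorConfig definition.

```scala
// One trusted-source entry, keyed by the connector it applies to
// (e.g. "InstagramTag", "InstagramPost", "TwitterPost").
case class TrustedSource(connector: String, sourceId: String)

// Hypothetical config shape: a StreamFactory reads only the entries
// for its own connector type when creating the stream.
case class ConnectorConfigSketch(name: String, trustedSources: Seq[TrustedSource]) {
  def sourcesFor(connector: String): Seq[String] =
    trustedSources.filter(_.connector == connector).map(_.sourceId)
}
```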

Add terms blacklist

We need to add a feature to our spark pipeline that allows us to drop any events that contain certain keywords.

The implementation can be as simple as creating a Pipeline filter that uses Tokenizer to split the title/body of the Fortis events into words and then checks each word against a blacklist set.

Like the other shared settings, the keyword blacklist should be populated from the fortis.blacklist table in Cassandra and published to the workers as a broadcast variable.
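A minimal sketch of the tokenizer-plus-set approach. This is illustrative only; in the pipeline the blacklist set would be a broadcast variable sourced from the fortis.blacklist table.

```scala
// Illustrative blacklist filter: tokenize the title/body, then check each
// word against a (broadcast) blacklist set.
object BlacklistFilter {
  def tokenize(text: String): Seq[String] =
    text.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  def containsBlacklisted(text: String, blacklist: Set[String]): Boolean =
    tokenize(text).exists(blacklist.contains)
}
```

Events for which `containsBlacklisted` returns true would simply be dropped by a pipeline filter stage.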

As mentioned in project-fortis-spark#17, this is required to implement the {modify,remove}Blacklist mutation in the SettingsSchema.

Expand Twitter filtering options

Back in May, we added support for more Twitter stream filtering options to the streaming-twitter package. We should integrate these enhancements with our TwitterStreamFactory.

Specifically, we need the following abilities:

  1. Filter by profile name
  2. Filter by locations

The Apache Bahir release process is quite slow, so if by the time that someone starts working on this, there still isn't a new release greater than 2.1.0, we'll have to use a custom build of the streaming-twitter package and drop it into our lib folder until the official release is done.

As mentioned in project-fortis-spark#17, this is a requirement for implementing the {save,remove}Locations mutation in the EdgesSchema.

Recreate SparkContext if job is cancelled unexpectedly

We hit an out-of-memory issue due to unstable streaming, which resulted in Spark cancelling the job. The driver kept running with a stopped context. We need to recreate the context if it's stopped unexpectedly.
