
azure / azure-cosmosdb-spark


Apache Spark Connector for Azure Cosmos DB

License: MIT License

Scala 81.45% Java 11.88% JavaScript 6.67%
azure-cosmos-db cosmos-db spark connector jupyter-notebook databricks azure-databricks lambda-architecture changefeed databricks-notebooks

azure-cosmosdb-spark's Introduction

NOTE: There is a new Cosmos DB Spark Connector for Spark 3 available

--------------------------------------------------------------------

The new Cosmos DB Spark connector has been released. The Maven coordinates (which can be used to install the connector in Databricks) are "com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.0.0"

The source code for the new connector is located here: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/cosmos/azure-cosmos-spark_3_2-12

A migration guide to change applications which used the Spark 2.4 connector is located here: https://aka.ms/azure-cosmos-spark-3-migration

Quick start introduction: https://aka.ms/azure-cosmos-spark-3-quickstart

Config reference: https://aka.ms/azure-cosmos-spark-3-config

End-to-end samples: https://aka.ms/azure-cosmos-spark-3-sample-nyc-taxi-data/01_Batch.ipynb

---------------------------------------------------------------------

  Azure Cosmos DB Connector for Apache Spark


azure-cosmosdb-spark is the official connector for Azure Cosmos DB and Apache Spark. The connector allows you to read from and write to Azure Cosmos DB via Apache Spark DataFrames in Python and Scala. It also allows you to create a lambda architecture for batch processing, stream processing, and a serving layer while being globally replicated and minimizing the latency involved in working with big data.


Jump Start

Reading from Cosmos DB

Below are excerpts in Python and Scala showing how to create a Spark DataFrame that reads from Cosmos DB.

# Read Configuration
readConfig = {
  "Endpoint" : "https://doctorwho.documents.azure.com:443/",
  "Masterkey" : "<YourMasterKey>",
  "Database" : "DepartureDelays",
  "preferredRegions" : "Central US;East US2",
  "Collection" : "flights_pcoll",
  "SamplingRatio" : "1.0",
  "schema_samplesize" : "1000",
  "query_pagesize" : "2147483647",
  "query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

Scala excerpt:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Configure connection to your collection
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "<YourMasterKey>",
  "Database" -> "DepartureDelays",
  "PreferredRegions" -> "Central US;East US2;",
  "Collection" -> "flights_pcoll",
  "SamplingRatio" -> "1.0",
  "query_custom" -> "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
))

// Connect via azure-cosmosdb-spark to create Spark DataFrame
val flights = spark.read.cosmosDB(readConfig)
flights.count()
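
The read configuration above embeds the master key directly in the notebook. As a minimal sketch (not part of the connector), the same options can be assembled from environment variables instead. The helper name `build_read_config` and the variable names `COSMOS_ENDPOINT` and `COSMOS_MASTERKEY` are assumptions made for this illustration; only the option keys come from the excerpts above.

```python
import os


def build_read_config(database, collection, query=None, env=os.environ):
    """Assemble connector read options, taking the secrets from `env`."""
    config = {
        "Endpoint": env["COSMOS_ENDPOINT"],    # hypothetical variable name
        "Masterkey": env["COSMOS_MASTERKEY"],  # hypothetical variable name
        "Database": database,
        "Collection": collection,
    }
    if query is not None:
        # Optional custom query, as in the README excerpt
        config["query_custom"] = query
    return config


# Stand-in environment so the sketch runs without real credentials
fake_env = {
    "COSMOS_ENDPOINT": "https://doctorwho.documents.azure.com:443/",
    "COSMOS_MASTERKEY": "<YourMasterKey>",
}
read_config = build_read_config(
    "DepartureDelays",
    "flights_pcoll",
    query="SELECT c.date, c.delay FROM c WHERE c.origin = 'SEA'",
    env=fake_env,
)
print(sorted(read_config))

# With a live SparkSession and the connector installed, the dict is used as before:
# flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**read_config).load()
```

Passing `env` explicitly keeps the helper easy to exercise without touching the real process environment.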

Writing to Cosmos DB

Below are excerpts in Python and Scala showing how to write a Spark DataFrame to Cosmos DB.

# Write configuration
writeConfig = {
 "Endpoint" : "https://doctorwho.documents.azure.com:443/",
 "Masterkey" : "<YourMasterKey>",
 "Database" : "DepartureDelays",
 "Collection" : "flights_fromsea",
 "Upsert" : "true"
}

# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()

Scala excerpt:

// Configure connection to the sink collection
val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "<YourMasterKey>",
  "Database" -> "DepartureDelays",
  "PreferredRegions" -> "Central US;East US2;",
  "Collection" -> "flights_fromsea",
  "WritingBatchSize" -> "100"
))

// Upsert the dataframe to Cosmos DB
import org.apache.spark.sql.SaveMode
flights.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)

 

Requirements

Review supported component versions

Component       Versions Supported
------------    -------------------
Apache Spark    2.2.1, 2.3.X, 2.4.X
Scala           2.11
Python          2.7, 3.6

 

Working with the connector

You can either build the connector yourself or use its Maven coordinates to work with azure-cosmosdb-spark.

Review the connector's maven versions

Spark    Scala    Latest version
-----    -----    -------------------------------------
2.4.0    2.11     azure-cosmosdb-spark_lkg_version
2.3.0    2.11     azure-cosmosdb-spark_2.3.0_2.11_1.3.3
2.2.0    2.11     azure-cosmosdb-spark_2.2.0_2.11_1.1.1
2.1.0    2.11     azure-cosmosdb-spark_2.1.0_2.11_1.2.2

Using Databricks notebooks

Create a library within your Databricks workspace by following the guidance in the Azure Databricks Guide > Use the Azure Cosmos DB Spark connector.

Note that the Databricks documentation at docs.azuredatabricks.net is not up to date. Instead of downloading the six separate jars into six different libraries, you can download the uber jar from Maven at https://search.maven.org/artifact/com.microsoft.azure/azure-cosmosdb-spark_2.4.0_2.11/1.3.5/jar and install this one jar as a single library.

Using spark-cli

To work with the connector using the spark-cli (i.e. spark-shell, pyspark, spark-submit), you can use the --packages parameter with the connector's maven coordinates.

spark-shell --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"
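
The coordinate passed to --packages follows the pattern visible in the version table and the command above: group `com.microsoft.azure`, an artifact name carrying the Spark and Scala versions, then the connector version. A small sketch that composes it, assuming that pattern holds for the versions you target (the helper names are hypothetical):

```python
def connector_coordinate(spark_version, scala_version, connector_version):
    """Compose the Maven coordinate in the form used by the README examples."""
    artifact = "azure-cosmosdb-spark_{}_{}".format(spark_version, scala_version)
    return "com.microsoft.azure:{}:{}".format(artifact, connector_version)


def spark_shell_command(coordinate, master="yarn"):
    """Build the spark-shell invocation shown above for a given coordinate."""
    return 'spark-shell --master {} --packages "{}"'.format(master, coordinate)


coordinate = connector_coordinate("2.4.0", "2.11", "1.3.5")
print(spark_shell_command(coordinate))
```

For these inputs the printed command matches the one shown above; whether a given version combination is actually published should be checked on Maven.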

Using Jupyter notebooks

If you're using Jupyter notebooks within HDInsight, you can use the spark-magic %%configure cell to specify the connector's Maven coordinates.

{ "name":"Spark-to-Cosmos_DB_Connector",
  "conf": {
    "spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5",
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
   ...
}

Note that spark.jars.excludes is included specifically to avoid potential conflicts between the connector, Apache Spark, and Livy.
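
If the cell body is generated rather than typed by hand, a sketch like the following produces the same JSON structure. It covers only the keys shown above; the settings elided by "..." in the README are not reproduced, and the function name `configure_payload` is an assumption for this example.

```python
import json


def configure_payload(coordinate, name="Spark-to-Cosmos_DB_Connector"):
    """Return the %%configure body with only the keys shown in the README."""
    return {
        "name": name,
        "conf": {
            "spark.jars.packages": coordinate,
            # Excluded to avoid conflicts between the connector, Spark, and Livy
            "spark.jars.excludes": "org.scala-lang:scala-reflect",
        },
    }


payload = configure_payload(
    "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"
)
# Print a cell that can be pasted into the notebook
print("%%configure")
print(json.dumps(payload, indent=2))
```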

Build the connector

This connector project currently uses Maven. To build it without dependencies, run:

mvn clean package

 

Working with our samples

This GitHub repository includes a number of sample notebooks and scripts that you can use.

 

More Information

We have more information in the azure-cosmosdb-spark wiki including:

Configuration and Setup

Troubleshooting

Performance

Change Feed

Monitoring

 

Contributing & Feedback

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact [email protected] with any additional questions or comments.

See CONTRIBUTING.md for contribution guidelines.

To give feedback and/or report an issue, open a GitHub Issue.

Apache®, Apache Spark, and Spark® are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.

azure-cosmosdb-spark's People

Contributors

alfonsorr, aliuy, anfeldma-ms, arramac, bharathsreenivas, cjsingh8512, dciborow, dennyglee, dependabot[bot], fabianmeiswinkel, firemonk9, heyyjudes, kevlangdo, khdang, manjeetchayel, microsoft-github-policy-service[bot], mimig1, moderakh, nican, nomiero, revinjchalil, sapinderpalsingh, snehagunda, tknandu


azure-cosmosdb-spark's Issues

Invalid resource id 'CollectionName' while using spark-docdb sample

I am running into an “Invalid resource id ‘CollectionName’” exception when I try to run a simple Spark app with the spark-cosmosdb connector, either against the local Cosmos DB emulator or against an Azure instance of DocumentDB.
Is there something else I need to do other than what is mentioned in the documentation?
I am able to query the same database and collection from a regular Scala app (not via the Spark driver, but using DocumentClient).
My code reads as follows (I have TestDb and TestCollection created):
val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder().appName(this.getClass.getName).getOrCreate()
val readConfig2 = Config(Map(
  "Endpoint" -> "https://localhost:8081",
  "Masterkey" -> "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
  "Database" -> "TestDb",
  // "preferredRegions" -> "Central US;East US2;",
  "Collection" -> "TestCollection",
  "SamplingRatio" -> "1.0"))

// This line throws
val coll = sparkSession.sqlContext.read.cosmosDB(readConfig2)

Exception:

java.lang.IllegalArgumentException: Invalid resource id TestCollection

            at com.microsoft.azure.documentdb.internal.ResourceId.parse(ResourceId.java:49)
            at com.microsoft.azure.documentdb.internal.PathsHelper.generatePath(PathsHelper.java:40)
            at com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache.getRoutingMapForCollection(PartitionKeyRangeCache.java:95)
            at com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache.access$000(PartitionKeyRangeCache.java:22)
            at com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache$1.call(PartitionKeyRangeCache.java:52)
            at com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache$1.call(PartitionKeyRangeCache.java:49)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:748)

Upsert documents

I'm using the below command to write documents from spark:

myDataframe.coalesce(40).write.mode(SaveMode.Append).documentDB(readConfig2)

This simply inserts documents into DocDB.

Is it possible to do an UPSERT with this connector (i.e. for each document, insert it if it does not exist, or update the existing document with the same id if it does)?
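
For reference, the Python write excerpt in the README sets an "Upsert" option to "true". The sketch below only assembles such an options dict; the helper name `build_write_config` is hypothetical, and passing "false" to disable the option is an assumption rather than something the README states.

```python
def build_write_config(endpoint, master_key, database, collection, upsert=True):
    """Assemble write options; "Upsert" mirrors the README's Python excerpt."""
    return {
        "Endpoint": endpoint,
        "Masterkey": master_key,
        "Database": database,
        "Collection": collection,
        # The connector options are strings, so the flag is serialized as text
        "Upsert": "true" if upsert else "false",
    }


write_config = build_write_config(
    "https://doctorwho.documents.azure.com:443/",
    "<YourMasterKey>",
    "DepartureDelays",
    "flights_fromsea",
)
print(write_config["Upsert"])

# With a DataFrame `df` and the connector installed:
# df.write.format("com.microsoft.azure.cosmosdb.spark").options(**write_config).save()
```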

java.net.SocketException: Connection reset errors

When reading from or writing to Cosmos DB through the Spark connector, we get intermittent java.net.SocketException: Connection reset errors, which significantly slow the overall query or write performance for the one or two threads affected.

Writing a lot of documents is very slow

I am trying to write a lot of documents to DocDB from a DataFrame, but I am getting very poor performance. I checked the DocDB metrics and the database can still handle a lot more ingress; Spark is simply not sending the rows fast enough (I'm only seeing about 150-200 rows per second being written to the database, but I have millions of rows in the DataFrame).

As a comparison, writing the same DataFrame to Azure Blob storage with .write.mode(SaveMode.Overwrite).save(...) takes only 1 minute. Inserting these millions of rows into DocDB will take far too long at roughly 200 rows per second.

Is this a limitation of the connector, or am I doing something wrong? Here is an excerpt from my code:


val config2 = Config(Map(
  "Endpoint" -> "https://xxxx",
  "Masterkey" -> "xxxxxxx",
  "Database" -> "Statistics",
  "preferredRegions" -> "West US",
  "Collection" -> "Statistics",
  "SamplingRatio" -> "1.0"))

dataframeRows.write.mode(SaveMode.Append).documentDB(config2)
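
To put the reported rate in perspective, here is a back-of-the-envelope estimate. It assumes exactly one million rows (the report only says "millions") and a constant write rate, so it is a rough lower bound rather than a measurement.

```python
def estimated_write_seconds(row_count, rows_per_second):
    """Time to write `row_count` rows at a constant rate, in seconds."""
    return row_count / float(rows_per_second)


ROWS = 1000000  # assumption: one million rows

for rate in (150, 200):  # rows per second, as reported in the issue
    seconds = estimated_write_seconds(ROWS, rate)
    print("{} rows/s -> {:.0f} s (~{:.0f} min)".format(rate, seconds, seconds / 60))
```

At these rates a single million rows already takes well over an hour, compared with the one minute reported for the blob write.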

Sample of azure-documentdb-spark

I don't think the example provided in azure-documentdb-spark works. spark.sqlContext cannot be resolved (and even if it can be, should the example use SparkSession given that sqlContext has been deprecated?)

I'm still confused about how to use this to read from and write to DocumentDB. Here's what I'm doing:

val documentDBSpark = DocumentDBSpark.load(sparkSession, readConfig2)
documentDBSpark.rdd.saveAsTextFile("DocumentDB/" + System.currentTimeMillis())

Interestingly, I'm getting the right number of documents back (3 in my DocumentDB collection), but their content is empty.

[]
[]
[]

I don't think I'm using the right command. Could anyone please help advise?

Thank you.

Incompatible Jackson version: 2.8.3

Hi,
I am using your most recent version and I am hitting a version incompatibility issue at runtime.
Below is the list of the dependencies I include.
Can you look into this and help me sort it out?
scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"
libraryDependencies += "com.eed3si9n" % "sbt-assembly_2.8.1" % "sbt0.10.1_0.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % sparkVersion % "provided"
libraryDependencies += "com.microsoft.azure" % "azure-documentdb-spark" % "0.0.2"
libraryDependencies += "org.mongodb.scala" % "mongo-scala-driver_2.11" % "1.0.1"

Connector does not work with Spark 2.2

Note, the connector currently does not work with Spark 2.2. We will need to update the pom.xml and add to the releases folder the bits specific to Spark 2.2.

jars provided in releases directory don't seem to work.

We could not build the jars using mvn clean package because of this issue: https://github.com/Azure/azure-cosmosdb-spark/issues/48

So we tried the jars provided in the releases directory and got the error below when running the example given in the README.

This is how I load the jars:

ubuntu@devesh-170:~/hadoop_experiment/spark-2.1.1-bin-hadoop2.7$ bin/spark-shell --jars /home/ubuntu/hadoop_experiment/azure-cosmosdb-spark/releases/azure-cosmosdb-spark-0.0.3_2.0.2_2.11/azure-cosmosdb-spark-0.0.3-SNAPSHOT.jar,/home/ubuntu/hadoop_experiment/azure-cosmosdb-spark/releases/azure-cosmosdb-spark-0.0.3_2.0.2_2.11/azure-documentdb-1.10.0.jar

This is the error:

scala> val coll = spark.sqlContext.read.cosmosDB(readConfig2)
java.lang.NoClassDefFoundError: org/json/JSONException
  at java.lang.Class.getDeclaredFields0(Native Method)
  at java.lang.Class.privateGetDeclaredFields(Class.java:2509)
  at java.lang.Class.getDeclaredField(Class.java:1959)

How to query change feed?

Hi Team, the change feed documentation says that we can query it using Spark. Is there an example I can look at, or is there another way to get the latest changes in DocumentDB?

Thanks,
Ankush Reddy.

Improve push down predicates

Make better use of DocumentDB's native capabilities (e.g. aggregations, ORDER BY, LIMIT, etc.) so a more optimized dataset is returned to Apache Spark. For example:

  • For cumulative aggregations, DocumentDB should return those aggregations and Spark should then aggregate accordingly.
  • For ORDER BY, DocumentDB can return the data already ordered.
  • For LIMIT, the connector should execute a TOP query to DocumentDB
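
To illustrate the LIMIT and ORDER BY items, the sketch below builds the kind of query string a push-down would send to DocumentDB, using the TOP and ORDER BY clauses of its SQL dialect. It is an illustration of the idea only, not the connector's implementation, and the function name `push_down_query` is hypothetical.

```python
def push_down_query(select_list, limit=None, where=None, order_by=None):
    """Build a DocumentDB SQL query with LIMIT mapped to TOP."""
    top = "TOP {} ".format(int(limit)) if limit is not None else ""
    query = "SELECT {}{} FROM c".format(top, select_list)
    if where:
        query += " WHERE " + where
    if order_by:
        # Let the database return rows already sorted
        query += " ORDER BY " + order_by
    return query


query = push_down_query(
    "c.origin, c.delay",
    limit=10,
    where="c.origin = 'SEA'",
    order_by="c.delay",
)
print(query)
```

With such a query the database returns at most ten ordered rows instead of shipping the full collection to Spark for sorting and truncation.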

Schema-less query bug

Incorrect data types will result in Apache Spark returning an exception. For example, when querying the DoctorWho database of flight information, the below query executes properly:

SELECT c.delay+1 FROM c WHERE c.id in ('test', 'test1', 'test2', 'test3')

But the following query results in an error:

SELECT c.delay+1 FROM c WHERE c.id in ('test', 'test1', 'test2', 'test3', 'test4')

with the error being:

Caused by: java.lang.NumberFormatException: For input string: "abc"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Integer.parseInt(Integer.java:580)
  at java.lang.Integer.parseInt(Integer.java:615)
  at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)

The exception is a NumberFormatException. It occurs because the document with id ‘test4’ has a string value for the ‘delay’ property instead of a number.
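
One possible workaround, sketched here under assumptions, is to filter out non-numeric values on the database side with the IS_NUMBER type-checking function before the rows reach Spark. The helper name `guarded_query` is hypothetical, and whether this fully avoids the exception in the connector has not been verified here.

```python
def guarded_query(select_list, numeric_field, where=None):
    """Build a query that only returns documents where the field is a number."""
    conditions = ["IS_NUMBER({})".format(numeric_field)]
    if where:
        # Parenthesize the caller's condition so AND binds as intended
        conditions.append("({})".format(where))
    return "SELECT {} FROM c WHERE {}".format(select_list, " AND ".join(conditions))


ids = ["test", "test1", "test2", "test3", "test4"]
id_list = ", ".join("'{}'".format(i) for i in ids)
query = guarded_query("c.delay + 1", "c.delay", "c.id IN ({})".format(id_list))
print(query)

# The string could then be supplied as the "query_custom" read option.
```

The trade-off is that documents such as ‘test4’ are silently skipped rather than reported, which may or may not be acceptable for a given job.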

Save data to DocumentDB

The 0.0.1 version of azure-documentdb-spark can read from DocumentDB; we also need to save data from Apache Spark back to DocumentDB.

`mvn clean package` fails with missing gremlin artifacts

ubuntu@devesh-170:~/hadoop_experiment/azure-cosmosdb-spark$ mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building azure-cosmosdb-spark 0.0.3-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[WARNING] The POM for org.apache.tinkerpop:spark-gremlin:jar:3.3.0-SNAPSHOT is missing, no dependency information available
[WARNING] The POM for org.apache.tinkerpop:tinkergraph-gremlin:jar:3.3.0-SNAPSHOT is missing, no dependency information available
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.327s
[INFO] Finished at: Tue Jul 11 12:45:13 PDT 2017
[INFO] Final Memory: 14M/173M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project azure-cosmosdb-spark: Could not resolve dependencies for project com.microsoft.azure:azure-cosmosdb-spark:jar:0.0.3-SNAPSHOT: The following artifacts could not be resolved: org.apache.tinkerpop:spark-gremlin:jar:3.3.0-SNAPSHOT, org.apache.tinkerpop:tinkergraph-gremlin:jar:3.3.0-SNAPSHOT: Could not find artifact org.apache.tinkerpop:spark-gremlin:jar:3.3.0-SNAPSHOT -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
