
Weaviate connector for Apache Spark

Home Page: https://weaviate.io/developers/weaviate/tutorials/spark-connector

License: BSD 3-Clause "New" or "Revised" License

Languages: Scala 76.69%, Dockerfile 0.11%, Shell 1.30%, Python 21.90%
Topics: spark, vector-search, weaviate

spark-connector's Introduction

Weaviate Spark Connector

For use in Spark jobs to populate a Weaviate vector database.

Status: Alpha. Data might not always get written to Weaviate, so verify that your data was actually written after a job completes.
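
For example, one way to verify a write is to compare the Weaviate object count for the class against the DataFrame row count. A minimal sketch using the Weaviate Python client (v3-style API; the host and class name are illustrative):

import weaviate

client = weaviate.Client("http://localhost:8080")  # illustrative endpoint
# Aggregate query returning the number of objects in the class; compare with my_df.count() from the Spark job.
result = client.query.aggregate("MyClass").with_meta_count().do()
print(result)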

Installation

You can choose one of the following options to install the Weaviate Spark Connector:

Download JAR from GitHub

You can download the latest JAR from the GitHub releases page.

Building the JAR yourself

To use the connector in your own Spark job, first build the fat JAR of the package by running sbt assembly, which creates the artifact at ./target/scala-2.12/spark-connector-assembly-1.2.8.jar.

Using the JAR in Spark

You can configure spark-shell or tools like spark-submit to use the JAR like this:

spark-shell --jars spark-connector-assembly-1.3.2.jar
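
Equivalently, the JAR can be attached when building a SparkSession programmatically. A sketch, where the path is illustrative and should point at whatever JAR you downloaded or built above:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("weaviate-write-job")
    .config("spark.jars", "/path/to/spark-connector-assembly-1.3.2.jar")  # illustrative path
    .getOrCreate()
)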

Using the JAR in Databricks

To run on Databricks, upload the JAR file to your cluster in the Libraries tab.

After installation, the JAR should be listed on the cluster's Libraries page.

Using Maven Central Repository

You can also use Maven to include the Weaviate Spark Connector as a dependency in your Spark application, via the artifacts published to Maven Central.
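
For example, the Maven coordinates can be passed through spark.jars.packages when building a SparkSession. A sketch: the artifact suffix (_2.12 vs _2.13) must match your cluster's Scala version, and the version shown is only an example; check Maven Central for the latest release:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("weaviate-write-job")
    .config("spark.jars.packages", "io.weaviate:spark-connector_2.12:1.2.8")  # illustrative coordinates
    .getOrCreate()
)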

Using sbt

Running cross-version tests:

sbt -v +test

Building Scala 2.12 and Scala 2.13 binaries:

sbt +assembly

Usage

First create a schema in Weaviate, as the connector will not create one automatically. See the tutorial for how to do this.
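
As an illustration, a minimal class could be created with the Weaviate Python client (v3-style API); the class and property names here are placeholders and must match the columns you intend to write:

import weaviate

client = weaviate.Client("http://localhost:8080")  # illustrative endpoint

class_obj = {
    "class": "MyClass",
    "properties": [
        {"name": "title", "dataType": ["text"]},
        {"name": "content", "dataType": ["text"]},
    ],
}
client.schema.create_class(class_obj)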

Afterwards loading data from Spark is as easy as this!

(
    my_df
    .write
    .format("io.weaviate.spark.Weaviate")
    .option("scheme", "http")
    .option("host", weaviate_host)
    .option("className", "MyClass")
    .mode("append")
    .save()
)

If you already have vectors available in your dataframe (recommended for batch insert performance), you can supply them with the vector option.

(
    my_df
    .write
    .format("io.weaviate.spark.Weaviate")
    .option("scheme", "http")
    .option("host", weaviate_host)
    .option("className", "MyClass")
    .option("vector", vector_column_name)
    .mode("append")
    .save()
)
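
If your embeddings currently live in a Spark ML vector column, one way to turn them into the array<float> the connector expects is pyspark.ml.functions.vector_to_array. A sketch, where the column name "embedding" is illustrative:

from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F

# Convert a Spark ML vector column into array<float> so it can be passed via the vector option.
my_df = my_df.withColumn(
    "embedding_array",
    vector_to_array(F.col("embedding")).cast("array<float>"),
)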

By default, the Weaviate client creates document IDs for new documents, but if you already have IDs you can supply them in the dataframe.

(
    my_df
    .write
    .format("io.weaviate.spark.Weaviate")
    .option("scheme", "http")
    .option("host", weaviate_host)
    .option("className", "MyClass")
    .option("id", id_column_name)
    .mode("append")
    .save()
)
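
If your source data has a stable business key but no UUIDs, one approach is to derive deterministic UUIDv5 values from that key, so a retried partition writes the same IDs instead of creating new objects. A sketch, where the column name and namespace are illustrative:

import uuid
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

UUID_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "my-dataset")  # illustrative namespace

@F.udf(returnType=StringType())
def to_uuid(key):
    # The same key always yields the same UUID.
    return str(uuid.uuid5(UUID_NAMESPACE, key)) if key is not None else None

my_df = my_df.withColumn("uuid", to_uuid(F.col("natural_key")))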

For authenticated clusters, such as Weaviate Cloud Services (WCS), the apiKey option can be used. Further options, including OIDC and custom headers, are listed in the tutorial.

(
    my_df
    .write
    .format("io.weaviate.spark.Weaviate")
    .option("scheme", "https")
    .option("host", "demo-endpoint.weaviate.network")
    .option("apiKey", WEAVIATE_API_KEY)
    .option("className", "MyClass")
    .option("batchSize", 100)
    .option("vector", vector_column_name)
    .option("id", id_column_name)
    .mode("append")
    .save()
)

Write Modes

Currently only the append write mode is supported. Upsert and error-if-exists write semantics are not yet supported.

Both batch operations and streaming writes are supported.
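
One way to drive a streaming write is Structured Streaming's foreachBatch, which reuses the batch writer shown above. A sketch, assuming stream_df is an existing streaming DataFrame; the host, class name, and checkpoint path are illustrative:

def write_to_weaviate(batch_df, batch_id):
    # Reuse the batch writer for each micro-batch of the stream.
    (
        batch_df
        .write
        .format("io.weaviate.spark.Weaviate")
        .option("scheme", "http")
        .option("host", "localhost:8080")
        .option("className", "MyClass")
        .mode("append")
        .save()
    )

query = (
    stream_df
    .writeStream
    .foreachBatch(write_to_weaviate)
    .option("checkpointLocation", "/tmp/weaviate-checkpoint")
    .start()
)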

Spark to Weaviate DataType mappings

The connector automatically infers the correct Spark DataType from the schema of the class in Weaviate. Your DataFrame column names need to match the property names of your class in Weaviate. The table below shows how the connector maps Weaviate data types to Spark DataTypes:

| Weaviate DataType | Spark DataType | Notes |
| --- | --- | --- |
| string | StringType | |
| string[] | Array[StringType] | |
| int | IntegerType | Weaviate only supports int32 for now. More info here. |
| int[] | Array[IntegerType] | |
| boolean | BooleanType | |
| boolean[] | Array[BooleanType] | |
| number | DoubleType | |
| number[] | Array[DoubleType] | |
| date | DateType | |
| date[] | Array[DateType] | |
| text | StringType | |
| text[] | StringType | |
| geoCoordinates | StringType | |
| phoneNumber | StringType | |
| blob | StringType | Encode your blob as base64 string |
| vector | Array[FloatType] | |
| cross reference | string | Not supported yet |

Please also take a look at the Weaviate data types docs and the Spark DataType docs.
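
As a rough illustration, a DataFrame schema whose columns line up with these mappings might look like the following; the column names are illustrative and must match the Weaviate property names of your class:

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    BooleanType, DoubleType, DateType, ArrayType, FloatType,
)

schema = StructType([
    StructField("title", StringType()),                # text / string
    StructField("keywords", ArrayType(StringType())),  # string[]
    StructField("wordCount", IntegerType()),            # int
    StructField("isPublished", BooleanType()),          # boolean
    StructField("score", DoubleType()),                 # number
    StructField("publishedAt", DateType()),             # date
    StructField("vector", ArrayType(FloatType())),      # vector column passed via the vector option
])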

Developer

Compiling

This repository uses SBT to compile the code. SBT can be installed on macOS following the instructions here.

You will also need Java 8+ and Scala 2.12 installed. The easiest way to get everything set up is to install IntelliJ.

To compile the package, simply run sbt compile to ensure that you have everything needed to run the Spark connector.

Running the Tests

The unit and integration tests can be run via sbt test.

The integration tests stand up a local Weaviate instance running in Docker and then run the Apache Spark code in a separate Docker container. You will need to have Docker running to run all tests.

Trying it out Locally in Docker

sbt assembly
docker build -t spark-with-weaviate .
docker run -it spark-with-weaviate /opt/spark/bin/spark-shell

// Inside the spark-shell session (the Article class must already exist in the Weaviate schema):
case class Article (title: String, content: String)
val articles = Seq(Article("Sam", "Sam")).toDF
articles.write.format("io.weaviate.spark.Weaviate")
  .option("scheme", "http")
  .option("host", "localhost:8080")
  .option("className", "Article") // className is required by the connector
  .mode("append").save()
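
After the write, one quick way to spot-check the result is Weaviate's REST objects endpoint. A sketch, assuming Weaviate is exposed on localhost:8080 from the host:

import requests

# List a few Article objects to confirm the Spark write landed.
resp = requests.get(
    "http://localhost:8080/v1/objects",
    params={"class": "Article", "limit": 5},
)
print(resp.json())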

spark-connector's People

Contributors

aliszka, antas-marcin, domchan, dusanmilunovic, fctczi, nemanja-m, sam-h-bean, samos123, svalbuena, trengrj, zainhas


spark-connector's Issues

Class not found - catalog.TableProvider

Hey guys, my company uses Spark 2.4, and when I am trying to write to Weaviate the following error occurs:

Class not found : org.apache.spark.sql.connector.catalog.TableProvider

Is it because of my Spark version, or am I doing something wrong?

Add support for uploading vector from Spark directly

proposed UX:

case class Article (title: String, content: String, wordCount: Int, vectorCol: Array[Float] )
val articles = Seq( Article("Sam", "Sam and Sam", 3, Array(0.01f, 0.02f))).toDF

articles.write
  .format("io.weaviate.spark.Weaviate")
  .option("scheme", "http")
  .option("host", "localhost:8080")
  .option("className", "Article")
  .option("vector", "vectorCol")
  .mode("append")
  .save()

Do you normally express a vector as Array[Float] in Spark Scala? And how would you express it in PySpark?

pyspark with scala 2.13 jar doesn't work

This error will be thrown if you use the Scala 2.13 fat JAR with PySpark:

E                   py4j.protocol.Py4JJavaError: An error occurred while calling o50.save.
E                   : java.lang.NoSuchMethodError: 'scala.collection.convert.AsScalaExtensions$ListHasAsScala scala.jdk.CollectionConverters$.ListHasAsScala(java.util.List)'
E                       at io.weaviate.spark.Weaviate.inferSchema(Weaviate.scala:23)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:90)
E                       at org.apache.spark.sql.DataFrameWriter.getTable$1(DataFrameWriter.scala:280)
E                       at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:296)
E                       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)

Named vectors support

Hi,

We're trying to bring our own vectors into Weaviate using the new named vectors functionality (https://weaviate.io/developers/weaviate/release-notes/880_1_24). From looking at the docs, it doesn't seem to be supported yet, as right now only one vector can be referenced through .option("vector", vector_column_name). Is that right? And if so, are there plans to support them at some point?

Thanks in advance, and thank you for maintaining this 🙏

Cheers, Olivier

Support writing a dataframe that only contains subset of Weaviate class

Assume you have a Weaviate class:

Article (title: String, content: String)

However, if your Spark dataframe also has an author column, I should still be able to write the dataframe to Weaviate as long as it has the title and content columns.

This might simply not be possible due to the way Spark DataSource and DataWriter interfaces are implemented

fail if id is not a valid UUID

Currently there seems to be a silent failure; instead, an error message should be shown to the user that the specified ID is not a valid UUID.

Create integration tests for pyspark

This issue was reported:

Caused by: java.lang.NegativeArraySizeException
	at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:292)
	at org.apache.spark.unsafe.types.UTF8String.getByteSlice(UTF8String.java:225)
	at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1430)
	at io.weaviate.spark.WeaviateDataWriter.$anonfun$getValueFromField$1(WeaviateDataWriter.scala:91)

It's suspected that it might only be reproducible with pyspark

things to test:

  • array that is null
  • array that is empty
  • streaming write

another issue reported:

org.apache.spark.sql.catalyst.util.ArrayData.toArray(ArrayData.scala:173)
org.apache.spark.sql.catalyst.util.ArrayData.toObjectArray(ArrayData.scala:166)
io.weaviate.spark.WeaviateDataWriter.getValueFromField(WeaviateDataWriter.scala:91)
io.weaviate.spark.WeaviateDataWriter.$anonfun$buildWeaviateObject$1(WeaviateDataWriter.scala:61)
io.weaviate.spark.WeaviateDataWriter.$anonfun$buildWeaviateObject$1$adapted(WeaviateDataWriter.scala:57)
io.weaviate.spark.WeaviateDataWriter$$Lambda$2105/318474967.apply(Unknown Source)
scala.collection.immutable.List.foreach(List.scala:431)
io.weaviate.spark.WeaviateDataWriter.buildWeaviateObject(WeaviateDataWriter.scala:57)
io.weaviate.spark.WeaviateDataWriter.write(WeaviateDataWriter.scala:19)
io.weaviate.spark.WeaviateDataWriter.write(WeaviateDataWriter.scala:14)
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:452)
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$$Lambda$1808/1209048787.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1730)
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:490)
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:391)
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec$$Lambda$1005/1803912068.apply(Unknown Source)
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
org.apache.spark.scheduler.ResultTask$$Lambda$1006/401748673.apply(Unknown Source)
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
org.apache.spark.scheduler.ResultTask$$Lambda$988/1327853449.apply(Unknown Source)
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
org.apache.spark.scheduler.Task$$Lambda$970/1059610517.apply(Unknown Source)
com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
org.apache.spark.scheduler.Task$$Lambda$956/1640529726.apply(Unknown Source)
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
org.apache.spark.scheduler.Task.run(Task.scala:96)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$954/1880351682.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$917/146603570.apply$mcV$sp(Unknown Source)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)

Build Idempotence Into Batch Write

In the event that a record fails mid-batch, the partition will be retried, which will lead to duplicate data in Weaviate if users do not bring their own IDs. We should build a dedupe mechanism so users don't end up with duplicate documents.

Spark Connector throws a JSON/Gson error above a certain batch size.

Hi all,

I'm using the spark connector to import nearly 200M records. While I'd like to use bigger batches and make use of the asynchronous importing available since Weaviate version 1.22, the spark connector seems to have issues handling batch sizes beyond 200. Specifically, when going beyond 200, I often see errors like the following:


reason=ExceptionFailure(io.weaviate.spark.WeaviateResultError,error getting result and no more retries left. Error from Weaviate: [WeaviateErrorMessage(message=java.lang.IllegalStateException: Expected BEGIN_OBJECT but was STRING at line 1 column 1 path $, throwable=com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was STRING at line 1 column 1 path $), WeaviateErrorMessage(message=Failed ids: 42946687-9c7b-5a99-b5a5-60f2216e894d,...

Any help would be appreciated!

Exporting more than 100 objects from a dataframe to Weaviate gives WeaviateErrorMessage(message=invalid param 'objects': cannot be empty, need at least one object for batching)

The line of code below triggers an error if the number of objects to write is greater than 100. When I try to export 10000 objects into Weaviate from Spark, I get an error.

df.limit(10000).withColumnRenamed("id", "uuid").write.format("io.weaviate.spark.Weaviate") \
    .option("batchSize", 200) \
    .option("scheme", "http") \
    .option("host", "localhost:8080") \
    .option("id", "uuid") \
    .option("className", "Sphere") \
    .option("vector", "vector") \
    .mode("append").save()

This usage of the connector results in the following error:

23/01/26 09:56:44 ERROR WeaviateDataWriter: batch error: [WeaviateErrorMessage(message=invalid param 'objects': cannot be empty, need at least one object for batching)]
23/01/26 09:56:46 ERROR WeaviateDataWriter: batch error: [WeaviateErrorMessage(message=invalid param 'objects': cannot be empty, need at least one object for batching)]
23/01/26 09:56:48 ERROR WeaviateDataWriter: batch error: [WeaviateErrorMessage(message=invalid param 'objects': cannot be empty, need at least one object for batching)]
23/01/26 09:56:48 ERROR Utils: Aborting task

Document apiKey option

I had to dig through the source code to figure out how to pass in an API key for Weaviate Cloud. It should be added to the README or docs:

.option("apiKey", WV_TEST_API_KEY)

Getting a `java.lang.NullPointerException` error, which is not very informative

I posted a relatively detailed reproduction for this error in the Weaviate support Slack channel, but what I'm seeing is that (with relatively large images) I get a java.lang.NullPointerException for many of the tasks. This seems to be caused by my Weaviate cluster not being able to keep up with Spark, which I'm going to work on separately, but it would be helpful to have a more informative error when I run into this.

Make format "io.weaviate.spark"

The format used to write is currently io.weaviate.spark.Weaviate, which is a little wonky. It would be good to maintain convention and figure out how to shave it down to io.weaviate.spark.

Implement DataSource inferSchema with datatype mappings

inferSchema needs to map Weaviate datatypes to Spark datatypes

Weaviate has a list of datatypes for each property, which is used for crossReferences; in all other cases there is only one datatype per property. So the general case is to just take the first item of the list of datatypes.

Question: How should we map a CrossReference to a Spark datatype? Should we just ignore them?

data is not written out successfully

I tried writing the data out, but the object is completely empty. Why is that?


Here is my class_obj:

class_obj = {
    "class": "News_vectorizer",
    # "description": "Information from a Jeopardy! question",  # description of the class
    "properties": [
            {"name": "topic", "dataType": ["string"]},
            {"name": "link", "dataType": ["string"]},
            {"name": "domain", "dataType": ["string"]},
            {"name": "published_date", "dataType": ["string"]},
            {"name": "title", "dataType": ["string"]},
            {"name": "lang", "dataType": ["string"]}
    ],
    "vectorizer": "text2vec-openai",
}

# client.schema.delete_class("News_vectorizer") 
client.schema.create_class(class_obj) 

When I removed the vectorizer from the class schema, the data got populated correctly, but then I can't do text similarity search because there's no vectorizer.

Install proper log4j logging

We probably shouldn't be printing directly to stdout, and should migrate everything to the standard Spark way of logging via log4j.

Having trouble using connector post jar-install

Hey all,

I have set up my cluster to use the Spark connector JAR, with the connector code written as

weaviate_df.write.format("io.weaviate.spark.Weaviate") \
    .option("batchSize", 5000) \
    .option("scheme", "https") \
    .option("host", "my-website.com") \
    .option("id", "uuid") \
    .option("className", "PaperMetadata") \
    .option("retries", 2) \
    .option("retriesBackoff", 2) \
    .option("timeout", 300) \
    .mode("append").save()

but when I run this step from databricks, I get the error:

org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find data source: io.weaviate.spark.Weaviate. Please find packages at https://spark.apache.org/third-party-projects.html.

Am I missing a step?

Failed to write when installing via Maven on Databricks

I am using Databricks (12.1 ML Runtime) and installed the connector via Maven with the following coordinates: io.weaviate:spark-connector_2.13:1.2.5. It installed just fine, but I got the following error when trying to write. I also tried downgrading to 1.2.5 but had no luck.

Manually uploading the JAR worked fine, but I would prefer to use Maven as it is easier when configuring job clusters.

Py4JJavaError: An error occurred while calling o639.save.
: java.lang.NoSuchMethodError: scala.collection.immutable.Map$.apply(Lscala/collection/immutable/Seq;)Ljava/lang/Object;
	at io.weaviate.spark.WeaviateOptions.<init>(WeaviateOptions.scala:33)
	at io.weaviate.spark.Weaviate.inferSchema(Weaviate.scala:17)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:91)
	at org.apache.spark.sql.DataFrameWriter.getTable$1(DataFrameWriter.scala:315)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:331)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	at java.lang.Thread.run(Thread.java:750)

Class name to the Java driver

When setting up an ETL job in AWS Glue using the Weaviate Spark connector, the class name of the JDBC Java driver needs to be specified.

I couldn't seem to locate the class name for the Weaviate database. What would be the class name to enter?

