Giter Site home page Giter Site logo

jelmerk / hnswlib Goto Github PK

View Code? Open in Web Editor NEW
242.0 16.0 53.0 1.2 MB

Java library for approximate nearest neighbors search using Hierarchical Navigable Small World graphs

License: Apache License 2.0

Java 75.03% Scala 24.97%
k-nearest-neighbors knn-search java algorithm spark scala pyspark

hnswlib's Introduction

Build Status

Hnswlib

Java implementation of the the Hierarchical Navigable Small World graphs (HNSW) algorithm for doing approximate nearest neighbour search.

The index is thread safe, serializable, supports adding items to the index incrementally and has experimental support for deletes.

It's flexible interface makes it easy to apply it to use it with any type of data and distance metric.

The following distance metrics are currently pre-packaged :

  • bray curtis dissimilarity
  • canberra distance
  • correlation distance
  • cosine distance
  • euclidean distance
  • inner product
  • manhattan distance

It comes with a scala wrapper that should feel native to scala developers

Apache spark support was moved into the hnswlib-spark project.

To find out more about how to use this library take a look at the hnswlib-examples module or browse the documentation in the readme files of the submodules

Sponsors

YourKIT logo

YourKit is the creator of YourKit Java Profiler, YourKit .NET Profiler, and YourKit YouMonitor.

hnswlib's People

Contributors

ashfaq92 avatar jelmerk avatar nur1popcorn avatar ykaitao avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

hnswlib's Issues

Deadlock when Errors are thrown

Errors just like runtime exceptions can be thrown without being declared in the method's header explicitly (e.g. AssertionError, ...). I noticed that the addAll method in the Index interface only handles RuntimeExceptions however. So if an an item is added and that causes an Error to be thrown the program deadlocks. Initially I thought that this would be rather easy to fix it turns out however that the caught RuntimeException is later rethrown. Therefore fixing the bug and keeping that functionality would require that addAll throws Throwable which is not very beautiful and also breaks compatibility with existing applications.

Below is an example which reliably deadlocks:

try {
    DistanceFunction<Object, Integer> distanceFunction = (a, b) -> { throw new Error(); };
    HnswIndex<Object, Object, Item<Object, Object>, Integer> index =
        HnswIndex.newBuilder(1, distanceFunction, 2)
            .build();
    class ObjectItem implements Item<Object, Object> {
        private final Object object = new Object();
        @Override
        public Object id() { return object; }
        @Override
        public Object vector() { return object; }
        @Override
        public int dimensions() { return 1; }
    }
    index.addAll(List.of(new ObjectItem(), new ObjectItem()));
} catch (InterruptedException e) {
    e.printStackTrace();
}

not all index partitions are persisted (data loss) on k8s

Hi:)

brute_force = BruteForceSimilarity(
identifierCol='id',
queryIdentifierCol='id',
featuresCol='embedding',
distanceFunction='cosine',
k=200,
numReplicas=0,
numPartitions=15,
excludeSelf=False,
predictionCol='prediction'
)

model = brute_force.fit(q_embeddings)
model.write().overwrite().save(inference_base_path)
each time a different number of partitions is saved - in local mode this doesn't happen
working on k8s cluster pyspark 3.3

any help is very much appreciated! thanks!

Question about maxLevel

Hi,

In this code:

        Node(int id, MutableIntList[] connections, TItem item, boolean deleted) {
            this.id = id;
            this.connections = connections;
            this.item = item;
            this.deleted = deleted;
        }

        int maxLevel() {
            return this.connections.length - 1;
        }

why are you setting maxLevel to the number of connections (neighbors) of the node? The level of the node should be the layer number in which the node is stored. could you explain to me?

and what does this:

private MutableObjectIntMap<TId> lookup;

store? Is there any README explaining the data layout?

How can I add more items to the index?

I ran this:
hnsw = Hnsw(identifierCol='ID', vectorCol='features', distanceFunction='cosine', m=64, ef=5, k=5, efConstruction=200, numPartitions=100)
model = hnsw.fit(df)
Now I want to save 'model' as an index file so that later I can read that file back and add more data into it (add extra_df for example).
Can you please tell me how can I do that? I have looked up in the repo but no example is found.

There is no "select nerghbors heuristic" implemented

From the code, it seems that only "select nerghbors simple" is implemented, but has no "select nerghbors heuristic" - commonly known as "highway mechanism".

Am I right? And from the perspective of test results, it will lead to the situation that the search always lingers on the edge of clustering and the recall rate is reduced.

Do you have a plan to implement "select nerghbors heuristic"?

License

Great Library! can you please add a License to this project, hopefully MIT or similar :)

thanks!

How to find nearest neighbors of an item that does not appear in index process?

I have this dataframe:
image
And I ran this:
hnsw = Hnsw(identifierCol='ID', vectorCol='features', distanceFunction='cosine', m=64, ef=5, k=5, efConstruction=200, numPartitions=100)
model = hnsw.fit(new_df)
model.transform(new_df).write.parquet('output', mode='overwrite')

Then I read back .parquet files and get:

image
My question is, assume that I have an item:
(103, [1, 1])
How can I find nearest neighbors of it?

Big K - output empty result

Hi, when I set a big K=50,000, the result of the model is empty.
When I set K=10,000, the result is fine.
My dataset is about 0.1b row of size.
My final goal is to set K=50,000 ~ 100,000.

Setting is below:
hnsw = HnswSimilarity(identifierCol='ITEM_ID', queryIdentifierCol='ITEM_ID',featuresCol='EMBEDDINGS', distanceFunction='eu clidean', m=128, ef=25, k=50000,efConstruction=1000, numPartitions=9000, numReplicas=50, excludeSelf=True, similarityThreshold =0.2, predictionCol='pred')

Load does not work

TypeError: 'JavaPackage' object is not callable
I see it was fixed in previous issue, but I still get it for bruteforce model
com.github.jelmerk:hnswlib-spark_3.3_2.12:1.1.0

java.lang.NoClassDefFoundError: org/eclipse/collections/api/map/primitive/MutableObjectIntMap

I run hnswlib in spark, but this error occurs:

java.lang.NoClassDefFoundError: org/eclipse/collections/api/map/primitive/MutableObjectIntMap
        at com.github.jelmerk.knn.scalalike.hnsw.HnswIndex$.apply(HnswIndex.scala:108)
        at com.github.jelmerk.spark.knn.hnsw.HnswSimilarity.createIndex(HnswSimilarity.scala:150)
        at com.github.jelmerk.spark.knn.hnsw.HnswSimilarity.createIndex(HnswSimilarity.scala:132)
        at com.github.jelmerk.spark.knn.KnnAlgorithm$$anonfun$typedFit$1.apply(KnnAlgorithm.scala:796)
        at com.github.jelmerk.spark.knn.KnnAlgorithm$$anonfun$typedFit$1.apply(KnnAlgorithm.scala:786)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1964)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1964)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.eclipse.collections.api.map.primitive.MutableObjectIntMap
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Poor results for similarity match

seeing very poor results with the algorithm and not sure what i am doing wrong. I have generated a test data set with solid color rectangles and then comparing it with a dataset of mixed images and solid color rectangles. my dimensions are 2048
hnsw = HnswSimilarity(identifierCol='id', featuresCol='decodedFeature', distanceFunction='cosine', m=90, ef=5, k=5,
efConstruction=200, numPartitions=10, similarityThreshold=0.4, excludeSelf=True, predictionCol='approximate')
i have tried it multiple times with different paramters however i am not getting good results. i have tried ef as 200 as well however increasing the ef gives lower accuracy rate than ef closer to k. will appreciate some guidance

Dynamic Graph Creation

Hi, Thanks for great work!
Please guide if there a method to make graph without specifying its maxItemCount in advance.
thanks again. and lots of love

Not working in webapp

I have a Scala application which uses an index created by hnswlib and it works great. When I call the exact same code from a play webapp, the deserialization of the TId fails for some reason

Caused by: java.lang.IllegalArgumentException: Could not read input file.
Caused by: java.lang.ClassNotFoundException: <Tid>

I can deserialize this type from elsewhere in the application, even in a webapp, just not during the HnswIndex.load in the same webapp.

Any ideas? Sometimes it is a ClassLoader problem and I vaguely recall seeing an implementation in the code of this repo or one nearby.

pyspark_hnsw with pyspark 3.0

I tried to use the pyspark_hnsw library and got an error in the illustrative example notebook.

I use python 3.7.6, java 11 and pyspark 3.0.0. I start spark session in local mode from Jupyter notebook. I added .config("spark.jars.packages", 'com.github.jelmerk:hnswlib-spark_2.3.0_2.11:0.0.48') to SparkSession.builder.

I got en error An error occurred while calling None.com.github.jelmerk.spark.conversion.VectorConverter. : java.lang.NoClassDefFoundError: org/apache/spark/ml/param/shared/HasInputCol$class which occured in line converter = VectorConverter(inputCol='features_as_vector', outputCol='features').

Could you please have a look?

traceback is below:

`---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/tmp/ipykernel_17433/2905031989.py in
----> 1 converter = VectorConverter(inputCol='features_as_vector', outputCol='features')

~/replay_tasks/hnswlib_spark/hnsw_pyspark_3/lib/python3.7/site-packages/pyspark/init.py in wrapper(self, *args, **kwargs)
108 raise TypeError("Method %s forces keyword arguments." % func.name)
109 self._input_kwargs = kwargs
--> 110 return func(self, **kwargs)
111 return wrapper
112

/tmp/spark-d50092f2-09da-45ee-8c96-294516a54348/userFiles-99f8f02a-b0f2-49fe-b9a2-202e097ede92/com.github.jelmerk_hnswlib-spark_2.3.0_2.11-0.0.48.jar/pyspark_hnsw/conversion.py in init(self, inputCol, outputCol, outputType)
22 """
23 super(VectorConverter, self).init()
---> 24 self._java_obj = self._new_java_obj("com.github.jelmerk.spark.conversion.VectorConverter", self.uid)
25 kwargs = self._input_kwargs
26 self.setParams(**kwargs)

~/replay_tasks/hnswlib_spark/hnsw_pyspark_3/lib/python3.7/site-packages/pyspark/ml/wrapper.py in _new_java_obj(java_class, *args)
67 java_obj = getattr(java_obj, name)
68 java_args = [_py2java(sc, arg) for arg in args]
---> 69 return java_obj(*java_args)
70
71 @staticmethod

~/replay_tasks/hnswlib_spark/hnsw_pyspark_3/lib/python3.7/site-packages/py4j/java_gateway.py in call(self, *args)
1567 answer = self._gateway_client.send_command(command)
1568 return_value = get_return_value(
-> 1569 answer, self._gateway_client, None, self._fqn)
1570
1571 for temp_arg in temp_args:

~/replay_tasks/hnswlib_spark/hnsw_pyspark_3/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
129 def deco(*a, **kw):
130 try:
--> 131 return f(*a, **kw)
132 except py4j.protocol.Py4JJavaError as e:
133 converted = convert_exception(e.java_exception)

~/replay_tasks/hnswlib_spark/hnsw_pyspark_3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

Py4JJavaError: An error occurred while calling None.com.github.jelmerk.spark.conversion.VectorConverter.
: java.lang.NoClassDefFoundError: org/apache/spark/ml/param/shared/HasInputCol$class
at com.github.jelmerk.spark.conversion.VectorConverter.(VectorConverter.scala:44)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)`

Small number of queries succeeds, whereas larger number fails

I have used hnswlib-pyspark to index reference data consisting of ~200M rows. The PySpark code is running on Amazon EMR Serverless. I am able to obtain results for 10 rows of query data. If I increase the number of rows, however, to a greater number, such as 100, the outputted DataFrame is empty. If I attempt to use a saved version of the fitted model pipeline, the following exception is thrown by one or more executor nodes:

java.lang.IllegalStateException: Workers failed to complete.
	at com.github.jelmerk.spark.knn.KnnModelOps$$anon$2.hasNext(KnnAlgorithm.scala:596)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:183)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:133)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Configuration:

BruteForceSimilarity(
    identifierCol=ref_identifier_col,
    queryIdentifierCol=query_identifier_col,
    featuresCol="features",
    distanceFunction="inner-product",
    k=10,
    numPartitions=30,
    numReplicas=2,
    excludeSelf=False,
    similarityThreshold=0.8,
    predictionCol="knn",
)

Spark configuration:

  • spark.emr-serverless.executor.disk – 20g
  • spark.executor.instances – 40
  • spark.dynamicAllocation.enabled – false

I am seeking advice on possible causes of this issue.

Thank you very much.

TypeError: 'JavaPackage' object is not callable. I am using Pyspark

Hi, I ran
spark-submit --py-files hnswlib-pyspark/dist/pyspark_hnsw-0.1-.egg --jars hnswlib-spark/target/hnswlib-spark--jar-with-dependencies.jar <my_script>
And get this error

File "/home/ubuntu/miniconda/envs/thu_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/init.py", line 105, in wrapper
File "/home/ubuntu/miniconda/envs/thu_env/lib/python3.7/site-packages/pyspark_hnsw/hnsw.py", line 13, in init
self._java_obj = self._new_java_obj("com.github.jelmerk.knn.spark.hnsw.Hnsw", self.uid)
File "/home/ubuntu/miniconda/envs/thu_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 63, in _new_java_obj
TypeError: 'JavaPackage' object is not callable
Exception ignored in: <function JavaParams.del at 0x7f5dbacb7620>
Traceback (most recent call last):
File "/home/ubuntu/miniconda/envs/thu_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 105, in del
File "/home/ubuntu/miniconda/envs/thu_env/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1897, in detach
AttributeError: 'NoneType' object has no attribute '_detach'

Can you please help me with this?

A method for loading a large index

I'm working with a large index (~11G and will most likely grow more in the future). It is inefficient for my use case to load the entire index into memory (as this library requires). Do you have any recommendations on how to handle a large index or any plans to add ways to not load the entire index into memory in the future?

Thanks and thanks for making this library.

Error: Cannot up cast `partition` from bigint to int.

When using partitionCol, we get the following error:

    org.apache.spark.sql.AnalysisException: Cannot up cast `partition` from bigint to int.
    The type path of the target object is:
    - field (class: "scala.Int", name: "_1")
    - root class: "scala.Tuple2"
    You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object
        at org.apache.spark.sql.errors.QueryCompilationErrors$.upCastFailureError(QueryCompilationErrors.scala:138)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:3565)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$37$$anonfun$applyOrElse$196.applyOrElse(Analyzer.scala:3594)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$37$$anonfun$applyOrElse$196.applyOrElse(Analyzer.scala:3572)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:323)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:323)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:328)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:413)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:249)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:411)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:364)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:328)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:328)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChild$2(TreeNode.scala:382)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$4(TreeNode.scala:443)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:443)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:249)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:411)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:364)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:328)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:94)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:76)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:249)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:94)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:85)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$37.applyOrElse(Analyzer.scala:3572)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$37.applyOrElse(Analyzer.scala:3568)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$3(AnalysisHelper.scala:90)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:76)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$1(AnalysisHelper.scala:90)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:86)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:84)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:3568)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:3559)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:215)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:89)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:212)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:204)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:204)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:197)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:191)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:348)
        at org.apache.spark.sql.Dataset.resolvedEnc$lzycompute(Dataset.scala:252)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolvedEnc(Dataset.scala:251)
        at org.apache.spark.sql.Dataset$.apply(Dataset.scala:82)
        at org.apache.spark.sql.Dataset.as(Dataset.scala:476)
        at com.github.jelmerk.spark.knn.KnnAlgorithm.typedFit(KnnAlgorithm.scala:847)
        at com.github.jelmerk.spark.knn.KnnAlgorithm.fit(KnnAlgorithm.scala:749)
        at org.apache.spark.ml.Pipeline.$anonfun$fit$5(Pipeline.scala:151)
        at org.apache.spark.ml.MLEvents.withFitEvent(events.scala:130)
        at org.apache.spark.ml.MLEvents.withFitEvent$(events.scala:123)
        at org.apache.spark.ml.util.Instrumentation.withFitEvent(Instrumentation.scala:42)
        at org.apache.spark.ml.Pipeline.$anonfun$fit$4(Pipeline.scala:151)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at org.apache.spark.ml.Pipeline.$anonfun$fit$2(Pipeline.scala:147)
        at org.apache.spark.ml.MLEvents.withFitEvent(events.scala:130)
        at org.apache.spark.ml.MLEvents.withFitEvent$(events.scala:123)
        at org.apache.spark.ml.util.Instrumentation.withFitEvent(Instrumentation.scala:42)
        at org.apache.spark.ml.Pipeline.$anonfun$fit$1(Pipeline.scala:133)
        at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
        at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:133)

The error is coming from KnnAlgorithm.scala on line 847.

      if (isDefined(partitionCol)) dataset
        .select(
          col(getPartitionCol).as("partition"),
          struct(col(getIdentifierCol).as("id"), col(getFeaturesCol).as("vector"))
        )
        .as[(Int, TItem)]
        .rdd
        .partitionBy(new PartitionIdPassthrough(getNumPartitions))
        .values
        .toDS

It should be .as[(TId, TItem)] instead of .as[(Int, TItem)]

Option for Selecting Neighbors Simply or by Heuristic

Hi, thanks for great work.
In this implementation, can we use heuristic for selecting neighbors for a point optionally? I mean, there are some cases when user want to use simple method of selecting neighbors instead of using heuristic. I did not see any option in Readme file, documentation or in examples section, please guide. As a humble suggestion, we can set a (flag or option) for which method should be used for selecting nearest neighbors while initializing the index. Thank you!

Question about retrieving 1M topK items from 100M pool?

Thanks a lot for the great library! We have been using the hnswlib for some time and quite satisfied with the retrieval quality. I have a question as posted in the title: is this possible to retrieve 1M topK items from 100M pool (vector dim = 100)? I have been told it is almost impossible to construct the search layer graph. Is this true? If so, I am curious to know where is the true bottleneck. We can sacrifice the recall in this case.

Not able to load persisted index

Hello,

First of all thanks for the library!
Can you please take a look into the following issue? When I try to load persisted index I'm getting the error below:

java.io.InvalidObjectException: ReflectiveOperationException during deserialization
  at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:280)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:568)
  at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1321)
  at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2251)
  at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1742)
  at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2584)
  at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2442)
  at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2242)
  at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1742)
  at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:514)
  at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
  at com.github.jelmerk.knn.hnsw.HnswIndex.readObject(HnswIndex.java:744)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:568)
  at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1231)
  at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2408)
  at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2242)
  at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1742)
  at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:514)
  at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
  at com.github.jelmerk.knn.hnsw.HnswIndex.load(HnswIndex.java:867)
  at com.github.jelmerk.knn.hnsw.HnswIndex.load(HnswIndex.java:831)
  at com.github.jelmerk.knn.scalalike.hnsw.HnswIndex$.load(HnswIndex.scala:38)
  ... 36 elided
Caused by: java.lang.reflect.InvocationTargetException: java.lang.IllegalArgumentException: too many arguments
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:568)
  at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:278)
  ... 63 more
Caused by: java.lang.IllegalArgumentException: too many arguments
  at java.base/java.lang.invoke.LambdaMetafactory.altMetafactory(LambdaMetafactory.java:511)
  at scala.runtime.LambdaDeserializer$.makeCallSite$1(LambdaDeserializer.scala:105)
  at scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:114)
  at scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
  at com.github.jelmerk.knn.scalalike.package$.$deserializeLambda$(package.scala)
  ... 68 more

Here is a minimal code to reproduce the issue:

    import com.github.jelmerk.knn.scalalike._
    import com.github.jelmerk.knn.scalalike.hnsw._
    import java.io.File

    @SerialVersionUID(1L) case class Word(id: String, vector: Array[Float]) extends Item[String, Array[Float]]

    val hnswIndex = HnswIndex[String, Array[Float], Word, Float](floatCosineDistance, 10)
    hnswIndex.save(new File("./indexfile"))
    HnswIndex.load(new File("./indexfile"))

My sbt dependency:

"com.github.jelmerk" %% "hnswlib-scala" % "0.0.19",

Sbt version 1.3.3, scala version 2.12.10

how to save the hnsw_index to local disk? and does the cosine distance is right?

Q1: I have tried many tools to save the "index",but not success, when read the saved "index" from the disk, the "find_neareast"result is "0", can you give the code to save the "index" to disk?

Q2: I test the cosine distance, found that the cosine distance between two same vector is "0", I supose the result should be "1", does it wrong?

scala 2.12 port running very slowly

When this library was used for computing KNN with scala 2.11, spark 2.3.0, it used to take 2.5 hrs.
With the same settings, (input data size, executors, executory memory etc.), with scala 2.12, spark 3.1.1, it takes 53 hrs and still running (killed the job).

I believe there is some .rdd conversion that is causing all the contents to be flushed to disk. Could you please help narrow down the root cause, so we may continue to use this library.

Here is the code being used:

    val converter = new VectorConverter()
      .setInputCol("inputFeatures")
      .setOutputCol("features")

    val normalizer = new Normalizer()
      .setInputCol("features")
      .setOutputCol("normalizedFeatures")

    val hnsw =  new HnswSimilarity()
        .setIdentifierCol("myid")
        .setQueryIdentifierCol("myid")
        .setFeaturesCol("normalizedFeatures")
        .setNumPartitions(numPartitions)
        .setNumReplicas(numReplicas)
        .setM(m)
        .setEf(ef)
        .setEfConstruction(efConstruction)
        .setK(topK)
        .setDistanceFunction(distanceFunction)
        .setSimilarityThreshold(similarityThreshold)
        .setPredictionCol("pred")
        .setExcludeSelf(excludeSelf)

    val pipeline = new Pipeline()
      .setStages(Array(converter, normalizer, hnsw))

    val hnswModel = pipeline.fit(indexItems)
    val output = hnswModel.transform(indexItems)

Language Level and Working Example

Hi! thanks for the great work.
I was implementing this project in intellijIdea but it fails everytime. The error is (something) is not supported at Language Level13. Then I have to change language level (from 7,8) then the problem arises in other parts of code.
These imports don't work too (in intellijIdea):

import org.eclipse.collections.api.list.primitive.MutableIntList;
import org.eclipse.collections.api.map.primitive.MutableObjectIntMap;
import org.eclipse.collections.api.map.primitive.MutableObjectLongMap;
import org.eclipse.collections.api.tuple.primitive.ObjectIntPair;
import org.eclipse.collections.api.tuple.primitive.ObjectLongPair;
import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList;
import org.eclipse.collections.impl.map.mutable.primitive.ObjectIntHashMap;
import org.eclipse.collections.impl.map.mutable.primitive.ObjectLongHashMap;

Even if we open the hnswlib-core project in eclipse its pom gives error

Kindly specify in which language level this project works and give a small java interface example to algorithm (like this in python) (this in R) so that beginners may use this into their works.

A small compact working example would be highly appreciated.
Thanks.

Loading model in pyspark fails

Using Spark 2.4.7, I get this error when I try to load a pipeline model but hnswlib stage fails:
image

This is the best resource that I found online that attempts to fix this problem but hasn't worked: microsoft/SynapseML#614 (switched the line alt_clazz = clazz.replace('com.microsoft.ml.spark', 'mmlspark') with alt_clazz = clazz.replace('com.github.jelmerk.spark', 'pyspark_hnsw')).

cannot calculate resize correctly leading to SizeLimitExceededException

Hi --

HnswIndex method size returns the size of the underlying data structure lookup , but throwing a SizeLimitExceededException is contingent on whether nodeCount is greater that the maxItemCount when calling add. This is an issue when removals occur since nodeCount stays the same yet the size of lookup decreases.

Callers cannot accurately calculate whether or not to resize an index due to misleading index size.

Error saving Pipeline model on s3 via Pyspark

I get the error below when I try to save the pipeline on s3, when providing a uri with s3:// (note replaced certain info with ellipsis):

[Stage 56:=======================================================>(87 + 1) / 88]21/10/04 18:48:57 WARN TaskSetManager: Lost task 87.0 in stage 56.0 (TID 2541, ip-10-195-26-28.ec2.internal, executor 7): java.lang.IllegalArgumentException: Wrong FS: s3://../stages/3_HnswSimilarity_7c6781618491/indices/0, expected: hdfs://..
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:782)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:213)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1524)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1521)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1536)
at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:527)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:376)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
at com.github.jelmerk.spark.knn.KnnModelWriter$$anonfun$saveImpl$1.apply$mcVJ$sp(KnnAlgorithm.scala:290)
at com.github.jelmerk.spark.knn.KnnModelWriter$$anonfun$saveImpl$1.apply(KnnAlgorithm.scala:283)
at com.github.jelmerk.spark.knn.KnnModelWriter$$anonfun$saveImpl$1.apply(KnnAlgorithm.scala:283)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.SparkContext$$anonfun$range$1$$anonfun$28$$anon$1.foreach(SparkContext.scala:765)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

21/10/04 18:48:58 ERROR TaskSetManager: Task 87 in stage 56.0 failed 4 times; aborting job
0%| | 0/1 [00:22<?, ?it/s]


Py4JJavaError Traceback (most recent call last)
/tmp/ipykernel_113378/3755559733.py in
1 nn_run(...
----> 2 save_model=True)

/tmp/ipykernel_113378/66834136.py in nn_run(input_cols, tbl_name, nn_dir, output_tbl, save_model, num_neighbors)
40
41 if save_model:
---> 42 nn_model.write().overwrite().save(os.path.join(nn_dir, f'{name}_nn.model'))
43
44 # pivoted_data.write.format("parquet").mode("append").save(os.path.join(nn_dir, f'{output_tbl}.parquet'))

/usr/lib/spark/python/pyspark/ml/util.py in save(self, path)
181 if not isinstance(path, basestring):
182 raise TypeError("path should be a basestring, got type %s" % type(path))
--> 183 self._jwrite.save(path)
184
185 def overwrite(self):

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in call(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

Py4JJavaError: An error occurred while calling o768.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 87 in stage 56.0 failed 4 times, most recent failure: Lost task 87.3 in stage 56.0 (TID 2544, ip-10-195-26-28.ec2.internal, executor 7): java.lang.IllegalArgumentException: Wrong FS: s3://../stages/3_HnswSimilarity_7c6781618491/indices/0, expected: hdfs://
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:782)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:213)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1524)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1521)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1536)
at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:527)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:376)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
at com.github.jelmerk.spark.knn.KnnModelWriter$$anonfun$saveImpl$1.apply$mcVJ$sp(KnnAlgorithm.scala:290)
at com.github.jelmerk.spark.knn.KnnModelWriter$$anonfun$saveImpl$1.apply(KnnAlgorithm.scala:283)
at com.github.jelmerk.spark.knn.KnnModelWriter$$anonfun$saveImpl$1.apply(KnnAlgorithm.scala:283)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.SparkContext$$anonfun$range$1$$anonfun$28$$anon$1.foreach(SparkContext.scala:765)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2080)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2068)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2067)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2067)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:988)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:988)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:988)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2301)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2250)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2239)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:799)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
at com.github.jelmerk.spark.knn.KnnModelWriter.saveImpl(KnnAlgorithm.scala:283)
at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)
at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$saveImpl$1.apply(Pipeline.scala:254)
at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$saveImpl$1.apply(Pipeline.scala:253)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.ml.Pipeline$SharedReadWrite$.saveImpl(Pipeline.scala:253)
at org.apache.spark.ml.PipelineModel$PipelineModelWriter.saveImpl(Pipeline.scala:338)
at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Wrong FS: s3://../3_HnswSimilarity_7c6781618491/indices/0, expected: hdfs://
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:782)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:213)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1524)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1521)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1536)
at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:527)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:376)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:316)
at com.github.jelmerk.spark.knn.KnnModelWriter$$anonfun$saveImpl$1.apply$mcVJ$sp(KnnAlgorithm.scala:290)
at com.github.jelmerk.spark.knn.KnnModelWriter$$anonfun$saveImpl$1.apply(KnnAlgorithm.scala:283)
at com.github.jelmerk.spark.knn.KnnModelWriter$$anonfun$saveImpl$1.apply(KnnAlgorithm.scala:283)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.SparkContext$$anonfun$range$1$$anonfun$28$$anon$1.foreach(SparkContext.scala:765)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

It's probably analogous to: JohnSnowLabs/spark-nlp#121

Unable to update index

Hello,

There is a way to store a fitted HnswSimilarity model but I haven't found a way to update it with new vectors.

Is there a way or work around, plans to add this soon?

Thank you!

Pyspark >3.0

hi!
Am I right that your package works only with pyspark 2.+? if so, could you recommend any other approximate nearest neighbour search libraries for pyspark 3?

mvn test hnsw-spark NormalizerSpec java.lang.NoClassDefFoundError: org/apache/spark/ml/param/shared/HasInputCol

Can you help me look at the problem?

I just started learning to use spark and spark-hnsw.
When I mvn test hnsw-spark, NormalizerSpec throw out this error java.lang.NoClassDefFoundError: org/apache/spark/ml/param/shared/HasInputCol.
"mvn clean install -dskiptests" has been done.
I have set dependency> spark-mllib_2.11 compile ,but it doesn`t work.

submit nomalizerSpectest.py --packages 'com.github.jelmerk:hnswlib-spark_2.3_2.11:0.0.49' or 0.50
donnt work,throw out above problem

What I shound do?
Thanks!

env

spark 2.3.0 local model
OpenJDK 64-Bit Server VM, 1.8.0_232
Scala version 2.11.12

Repeatability

Is the reason that the index differs when I run it twice on the exact same data the threading which introduces some change in ordering? Is it just impossible to avoid it? Performance is way down if I use just one thread, but at least it (seems to) give the same answer each time.

hnswlib-spark slow on the retrieval of millions items

In our use case, we have 20M ~ 100M items with 100d embedding for query top_k (20 ~ 1000). I am seeking any suggestion for speeding up the spark job.

one example used in our case,

val hnsw = new HnswSimilarity()
        .setIdentifierCol("id")
        .setQueryIdentifierCol("id")
        .setFeaturesCol("normalizedFeatures")
        .setNumPartitions(10)
        .setM(16)
        .setEf(200)
        .setEfConstruction(200)
        .setK(20)
        .setDistanceFunction("inner-product")
        .setSimilarityThreshold(0.3)
        .setPredictionCol("approximate")
        .setExcludeSelf(true)

Thanks a lot!

Unable to load the HnswSimilarity model

Hi can you please help me loading the model . I am using pyspark --packages 'com.github.jelmerk:hnswlib-spark_3.3_2.12:1.0.1' command

creating instance

hnsw = HnswSimilarity(identifierCol='id', queryIdentifierCol='id' , featuresCol='company_industry_embeddings', distanceFunction='cosine', m=48, ef=5, k=200, efConstruction=200, numPartitions=50, excludeSelf=True)
test = hnsw.fit(df)
test.save(path)

reading the model

model = hnsw.load(path) ## gives error

Traceback (most recent call last):
File "", line 1, in
File "/usr/lib/spark/python/pyspark/ml/util.py", line 353, in load
return cls.read().load(path)
File "/usr/lib/spark/python/pyspark/ml/util.py", line 365, in read
return JavaMLReader(cls)
File "/usr/lib/spark/python/pyspark/ml/util.py", line 296, in init
self._jread = self._load_java_obj(clazz).read()
TypeError: 'JavaPackage' object is not callable

Illustrative Example

Hi, thank you for the great work (especially adding license :)

We are conducting research for a software testing method that works on the top of your HNSW implementation. I know this seems like an odd request, but, I was wondering if you could please show the inner working of the algorithm via an illustrative example.
For example: When an HNSW index is initialized:

HnswIndex<Integer, double[], Point, Double> hnswIndex = HnswIndex
                .newBuilder(DistanceFunctions.DOUBLE_EUCLIDEAN_DISTANCE, graphSize).withM(givenM).withEf(ef)
                .withEfConstruction(efConst).build();

We would be really grateful if you provide what is being initialised under the hood. We have tried to use debug and print statements to print parameters in your HnswIndex.java file, but each time, we add a node to it, it gets refreshed and previous record is gone.
Also, how the hnswIndex.add(node) and hnswIndex.findNearest(node.vector(), 1).get(0).distance() work ?
It would be nice if you add some nodes to HnswIndex (say up to 3) and then perform a nearest neighbor search for a query point and reveal the internal working of the algorithm.

Note: The paper proposing the HNSW describes the theory and the algorithm, but there is not practical illustration on how the data flows between algorithms for adding/searching nodes to the graph

Cannot understand the definition of curdist in getNeighborsByHeuristic2?

HnswIndex.java > getNeighborsByHeuristic2:

for (NodeIdAndDistance<TDistance> secondPair : returnList) {
                TDistance curdist = distanceFunction.distance(
                        nodes.get(secondPair.nodeId).item.vector(),
                        nodes.get(currentPair.nodeId).item.vector()
                );

                if (lt(curdist, distToQuery)) {
                    good = false;
                    break;
                }
            }

according to HNSW's paper Algorithm 4
i think,curdist = distance(q, every element in returnList),not curdist = distance(e, every element in returnList),the definition of q and e is same as Algorithm 4, am i right?

java.lang.NegativeArraySizeException

when i found the algo on 1.5 billion items, I got the following exception

Caused by: java.lang.NegativeArraySizeExceptionat 
com.github.jelmerk.knn.hnsw.HnswIndex.add(HnswIndex.java:212)at 
com.github.jelmerk.knn.Index.lambda$addAll$0(Index.java:125)at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

My configuration is

    val hnsw = new HnswSimilarity()
      .setIdentifierCol("item_id")
      .setQueryIdentifierCol("item_id")
      .setFeaturesCol("embeddings")
      .setNumPartitions(150)
      .setNumReplicas(5)
      .setK(5)
      .setEf(20)
      .setSimilarityThreshold(0.5)
      .setDistanceFunction("inner-product")
      .setPredictionCol("approximate")
      .setExcludeSelf(true)
      .setM(16)
      .setEfConstruction(200)

excludeSelf parameter in hnswlib class not working

When I set excludeSelf to true, it still shows the id's of self in the results. Below is a sample (converted to a pandas dataframe) from what I'm using to experiment:

image

From above, the id's are on the left while the right contains a list of tuples (id and distance as returned from hnswlib). You'll notice id's of self appearing in the results.

how to delete an item from the index

hi ,thanks for your greate work!
how to delete an item from the index,i know there is a method remove(),but i dont know the second parameters means and how can i get the parametres :version

hnswindex,remove(String ids,Long version)

PySpark OOM advice

I have the following situation in PySpark:
Cluster with 256GB RAM per node (9GB reserved with rest available to spark) running on 10 nodes with 1 container per node,
~8M rows (candidates df) with a 1024-dim feature vector column with ~10k partitions,
~500M rows (query df) with 1024-dim feature vector column with ~10k partitions,
(edit: they each have ~20k partitions)

An hnsw index using parameters m=12, efConstruction=16, ef=25, the index that gets created is about ~30GB.
I am consistently losing nodes due to memory issues at [KnnAlgorithm.scala:510] or [KnnAlgorithm.scala:521].
It appears it's at the query stage based on the code at those line numbers.
I increased partition size, reduced the m and efConstruction quite low, and gave generous amounts of RAM.

My only next thought of what to try next was to split up the query df into 1000 dfs and model.transform(df) each one and union them, but I would have thought the large partitioning (10k) would have had the same effect. I also think that would be quite slow if it would even work. After that, I was going to abandon trying to do ANN purely in spark -- perhaps I should create a service that has an nmslib hnsw index and call it with pandas_udf from spark. But before I abandon it, I figured I would see if you have any ideas.

Do you have any suggestions?
Thank you

Uncorrect distance for hnswlib-spark

Hi, I use hnswlib-spark for ANN search, and use inner-product for my normalized 128-dim embedding. But I found if I fit the full-size query item(> 10M) to the model, the similarity (1-inner-product) returned by the algorithm is not correct.

cosine_sim actual_cosine_sim
0.9999999 0.8632334353951592
1.0 0.6545347158171353
0.99999976 0.5148564692935906
0.9999996 0.9999995576617948

But when only fit one query item , the similarity is right, and the model return totally different top-K items

The following code is my model configuration

new HnswSimilarity()
      .setIdentifierCol("item_id")
      .setQueryIdentifierCol("item_id")
      .setFeaturesCol("embeddings")
      .setNumPartitions(150)
      .setNumReplicas(5)
      .setK(30)
      .setEf(128)
      .setSimilarityThreshold(0.90)
      .setDistanceFunction("inner-product")
      .setPredictionCol("approximate")
      .setExcludeSelf(false)
      .setM(64)
      .setEfConstruction(200)

The version is hnswlib-spark_2.3.0_2.11:0.0.46

Brute Force vs hnsw Problem

When I tried to compared the recall between brute force function and hnsw, I found the recall is zero.

I randomly picked 1000 data for the testing for both brute force and hnsw and tried to see how many pairs are the same from both functions.

I am curious about your brute force function. Is it calculating all of the pairs and then rank them right?

Here's my code
hnsw = HnswSimilarity(identifierCol='ITEM_ID',featuresCol='EMBEDDINGS', distanceFunction='euclidean', m=32, ef=5, k=10,ef Construction=200, numPartitions=100, excludeSelf=False, similarityThreshold=-1.0, predictionCol='pred')
brute_force = BruteForceSimilarity(identifierCol='ITEM_ID', featuresCol='EMBEDDINGS',distanceFunction='euclidean',k=10, n umPartitions=100, excludeSelf=False, similarityThreshold=-1.0, predictionCol='pred')

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.