yahoo / oak
A Scalable Concurrent Key-Value Map for Big Data Analytics
License: Apache License 2.0
Oak/core/src/main/java/com/oath/oak/NativeAllocator/BlocksPool.java
Lines 47 to 50 in 5b3b44c
Double-Checked Locking is widely cited and used as an efficient method for implementing lazy initialization in a multithreaded environment.
Unfortunately, it will not work reliably in a platform independent way when implemented in Java, without additional synchronization.
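A minimal sketch of the usual remedy, assuming Java 5 or later: declaring the lazily initialized field `volatile` supplies the additional synchronization that double-checked locking needs. `LazyPool` is a hypothetical class used only for illustration; it is not the code in BlocksPool.java.

```java
// Hypothetical class; illustrates the volatile-based fix, not Oak's BlocksPool.
final class LazyPool {
    // volatile guarantees that a fully constructed instance is visible to other threads
    private static volatile LazyPool instance;

    private LazyPool() { }

    static LazyPool getInstance() {
        LazyPool local = instance;          // first check, without the lock
        if (local == null) {
            synchronized (LazyPool.class) {
                local = instance;           // second check, under the lock
                if (local == null) {
                    local = new LazyPool();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

Without `volatile`, another thread may observe a non-null reference to a partially constructed object, which is the failure mode described above.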
I use OakMap instead of ConcurrentSkipListMap in my project, and I found something strange.
When I no longer use an OakMap, the Java heap memory used by this OakMap is released, but the off-heap memory does not seem to be released.
I use NMT to track the use of off-heap memory, and I find something strange.
The heap memory occupied by the current process is very small, but the Linux command "top" shows that the process occupies a huge amount of memory. I also analyzed the memory distribution in the heap and found that the heap memory used by the current process is very small. By the way, there is no OakMap object in heap memory.
OakMap.containsKey currently calls AbstractMap.containsKey, which scans the entire map. It is possible to work around this by using zc().get(key) != null; however, it would make more sense if OakMap did that on its own.
In all the lines above, the operation on the handle may return false, meaning that the value is deleted and the operation should restart.
In particular, in line 309, if the operation returns false, it means that the value was deleted, and thus the put should restart in order to actually take effect.
In lines 383 and 421, the value exists, and in the non-ZC API we need to return the previous value.
If the value managed to be deleted before we read it, the result would be null, which does not match the putIfAbsent API.
In Druid I need to know whether the new key was added or aggregated, to avoid an additional read for each insert.
Oak/core/src/main/java/com/oath/oak/Chunk.java
Lines 708 to 710 in d369170
In line 708 we read the next entry in the list, and then the handle index.
If the read entry is not NONE, we again read the same handle index in line 694.
If the entry is NONE, however, we read the handle index of an illegal entry, resulting in an invalid handle index which sometimes causes the copying of additional entries.
A simple fix seems to be to comment out line 709.
The first Oak issue (!)
Rearranging the Oak Memory Management so it supports any Oak size and can get an external Memory Allocator.
Hereby please find the new design:
Memory Management Structure.pdf
The design is still subject to further changes.
Add more tests to increase test coverage. Run jacoco maven plugin, fix the results, add maven constant rule for test coverage. This is an important step for safe contribution in our community.
@liran-funaro can you please add more details to this issue?
For more explanations feel free to add questions in this issue.
I got the following exception when running 16 (independent) instances of Oak concurrently on 16 different threads.
java.lang.NullPointerException
at com.oath.oak.NativeAllocator.BlocksPool.returnBlock(BlocksPool.java:105)
at com.oath.oak.NativeAllocator.OakNativeMemoryAllocator.close(OakNativeMemoryAllocator.java:215)
at com.oath.oak.NovaManager.close(NovaManager.java:31)
at com.oath.oak.InternalOakMap.close(InternalOakMap.java:107)
at com.oath.oak.OakMap.close(OakMap.java:597)
at org.apache.druid.segment.incremental.OakIncrementalIndex$OakFactsHolder.close(OakIncrementalIndex.java:537)
at org.apache.druid.segment.incremental.OakIncrementalIndex.close(OakIncrementalIndex.java:145)
at org.apache.druid.benchmark.indexing.ConcurrentFullScaleIngestionBenchmark.add(ConcurrentFullScaleIngestionBenchmark.java:234)
at org.apache.druid.benchmark.indexing.generated.ConcurrentFullScaleIngestionBenchmark_add_jmhTest.add_AverageTime(ConcurrentFullScaleIngestionBenchmark_add_jmhTest.java:181)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:453)
at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:437)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Change all tests to use Executors instead of Threads, in order to propagate raised exceptions to the tests. Currently we might miss some exceptions.
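One way this could look, as a sketch rather than the project's actual test code: tasks are submitted to an `ExecutorService`, and `Future.get()` rethrows anything a worker threw, so the failure reaches the test thread. `ExecutorTestUtil` and `runAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical test helper, not part of Oak's test suite.
final class ExecutorTestUtil {
    private ExecutorTestUtil() { }

    // Runs all tasks on a fixed pool and fails in the calling thread if any task failed.
    static void runAll(List<Callable<Void>> tasks, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Void>> futures = new ArrayList<>();
            for (Callable<Void> task : tasks) {
                futures.add(executor.submit(task));
            }
            for (Future<Void> future : futures) {
                try {
                    future.get(); // rethrows the worker's failure as ExecutionException
                } catch (ExecutionException e) {
                    throw new AssertionError("worker task failed", e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new AssertionError("interrupted while waiting", e);
                }
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With plain `Thread` objects, an exception thrown in `run()` only terminates that thread, which is why the test can still pass.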
@liran-funaro can you please add more details to this issue?
For more explanations feel free to add questions in this issue.
This class is a singleton, so it keeps a reference to all blocks/ByteBuffers at all times. As a result, off-heap memory is never released, even if Oak is closed and no longer referenced.
OakBuffer hierarchical structure involves multiple classes. To take care of the future changes in the OakBuffers structure, we need to measure their performance with each change. Let's start by measuring the performance.
Add an OakBuffer usage benchmark for performance evaluation. It should be a quite exhaustive usage (for example, matrix multiplication). The new benchmark should evaluate the performance of some OakBuffer variants.
@liran-funaro can you please add more details to this issue?
For more explanations feel free to add questions in this issue.
If I create a new builder like this:
OakMapBuilder builder = new OakMapBuilder()
    .setKeySerializer(stringSer)
    .setValueSerializer(stringSer)
    .setComparator(comparator)
I get an exception:
java.lang.NullPointerException
at com.oath.oak.Insert$BenchmarkState$1.calculateSize(Insert.java:75)
at com.oath.oak.Insert$BenchmarkState$1.calculateSize(Insert.java:52)
at com.oath.oak.InternalOakMap.<init>(InternalOakMap.java:66)
at com.oath.oak.OakMap.<init>(OakMap.java:74)
at com.oath.oak.OakMapBuilder.build(OakMapBuilder.java:95)
If this field is mandatory, it should be part of the OakMapBuilder constructor.
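A sketch of that suggestion, under the assumption that the missing setting can be passed at construction time. `SketchMapBuilder`, `mandatoryValue` and `chunkMaxItems` are hypothetical names and do not reflect the real OakMapBuilder.

```java
import java.util.Objects;

// Hypothetical builder; names and fields are illustrative, not Oak's OakMapBuilder.
final class SketchMapBuilder<T> {
    private final T mandatoryValue;     // required: supplied through the constructor
    private int chunkMaxItems = 1024;   // optional: has a default and a setter

    // A mandatory setting is a constructor argument, so it can never be left unset.
    SketchMapBuilder(T mandatoryValue) {
        this.mandatoryValue = Objects.requireNonNull(mandatoryValue, "mandatoryValue is required");
    }

    SketchMapBuilder<T> setChunkMaxItems(int chunkMaxItems) {
        this.chunkMaxItems = chunkMaxItems;
        return this;
    }

    T getMandatoryValue() {
        return mandatoryValue;
    }

    int getChunkMaxItems() {
        return chunkMaxItems;
    }
}
```

The failure then happens at the call site that omitted the value, with a descriptive message, instead of deep inside the map constructor.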
Add a simple example application that uses Oak. It is a really open-ended issue. We would love to see any variation of usage, and of course this can be an umbrella for many contributors to participate!
For more explanations feel free to add questions in this issue.
It would be nice to allow some of the properties of BlocksPool to be configurable.
Either via a configuration file and/or programmatically.
Specifically, it would be nice that the preallocated blocks would be a different setting than the number of new blocks to allocate, and that the number of excess blocks could be set via a ratio and/or via a specific number.
Also, the excess number of blocks should have a high and low threshold to prevent releasing blocks one at a time.
It might also help to set the number of excess blocks with proportion to the total number of blocks.
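The high and low thresholds could work roughly as follows. This is a sketch under simplifying assumptions: `SketchBlocksPool` and its parameters are hypothetical, and a `byte[]` stands in for an off-heap block.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical pool; byte[] stands in for an off-heap block.
final class SketchBlocksPool {
    private final Deque<byte[]> free = new ArrayDeque<>();
    private final int blockSize;
    private final int lowThreshold;   // free blocks kept after a release batch
    private final int highThreshold;  // free-block count that triggers a release batch

    SketchBlocksPool(int blockSize, int preallocated, int lowThreshold, int highThreshold) {
        if (lowThreshold < 0 || lowThreshold > highThreshold) {
            throw new IllegalArgumentException("need 0 <= lowThreshold <= highThreshold");
        }
        this.blockSize = blockSize;
        this.lowThreshold = lowThreshold;
        this.highThreshold = highThreshold;
        for (int i = 0; i < preallocated; i++) {
            free.push(new byte[blockSize]);
        }
    }

    synchronized byte[] getBlock() {
        byte[] block = free.poll();
        return block != null ? block : new byte[blockSize];
    }

    synchronized void returnBlock(byte[] block) {
        free.push(block);
        // Release in one batch down to the low threshold, not one block at a time.
        if (free.size() > highThreshold) {
            while (free.size() > lowThreshold) {
                free.pop(); // a real pool would free the off-heap memory here
            }
        }
    }

    synchronized int freeBlocks() {
        return free.size();
    }
}
```

The gap between the two thresholds is what prevents the pool from releasing and reallocating a block on every return.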
In InternalOakMap there is
private final AtomicInteger size;
This size is never decremented.
Add design and implementation of new OakHash API. API only, without the underlying HashMap. Probably the merge can go into a separate branch.
Similar to OakMap, OakHash should extend ConcurrentHashMap, but must also support the Zero-Copy API methods. Most likely the new ZeroCopyHash should extend or be similar to ZeroCopyMap, and OakHash should return a ZeroCopyHash, having all interfaces.
For more explanations feel free to add questions in this issue.
Oak/core/src/test/java/com/oath/oak/IteratorModificationTest.java
Lines 194 to 236 in d369170
In line 211, next() sometimes returns null instead of throwing a ConcurrentModificationException.
In the picture attached that scenario can be seen.
To make Oak available to other projects, Oak's releases should be available in some Maven repository.
Here is an example how to publish.
loInclusive is never used and its functionality is not implemented
hiInclusive is used and implemented
I've noticed that the project demands specific standards to be met after every commit (e.g. running mvn clean package). This can be automated by using either:
CircleCI
Here's a guide for creating GitHub Actions with Maven:
https://docs.github.com/en/actions/guides/building-and-testing-java-with-maven
In InternalOakMap.Iter next() function we do:
public T next() {
    try {
        memoryManager.startOperation();
        return internalNext();
    } finally {
        memoryManager.stopOperation();
    }
}
So why do we need start and stop operation in object constructor and close function?
There are 2 problems with this:
As it seems a user obtains an OakWBufferImpl when calling a compute().
Before returning this buffer to the user a writelock is taken.
However, OakWBuffer offers a transform() operation which acquires a readlock.
This cannot be done since a writelock is already held.
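To illustrate the general hazard, here is a sketch that uses the JDK's non-reentrant `StampedLock` as a stand-in. This is an assumption for demonstration only and is not Oak's lock implementation; `NonReentrantLockDemo` is a hypothetical name.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical demo; StampedLock stands in for any non-reentrant read/write lock.
final class NonReentrantLockDemo {
    private NonReentrantLockDemo() { }

    // Returns true if a read lock could be acquired while the write lock is held.
    static boolean canReadWhileWriting() {
        StampedLock lock = new StampedLock();
        long writeStamp = lock.writeLock();
        try {
            long readStamp = lock.tryReadLock(); // a stamp of 0 means "not available"
            if (readStamp != 0) {
                lock.unlockRead(readStamp);
                return true;
            }
            return false;
        } finally {
            lock.unlockWrite(writeStamp);
        }
    }
}
```

With a blocking `readLock()` call in place of `tryReadLock()`, the thread would wait on itself indefinitely.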
Many functions in oak api return a new resource that is supposed to be closed.
This is not documented and not clear from the user point of view.
For example Oak::subMap() returns a new oakmap that is supposed to be closed
JUnit does not fail the test when exceptions (including assertion errors) occur in spawned threads. The test does have exceptions, which should be addressed:
Exception in thread "Thread-4" java.util.ConcurrentModificationException
at com.oath.oak.InternalOakMap$Iter.initAfterRebalance(InternalOakMap.java:917)
at com.oath.oak.InternalOakMap$Iter.advance(InternalOakMap.java:938)
at com.oath.oak.InternalOakMap$EntryTransformIterator.next(InternalOakMap.java:1164)
at com.oath.oak.OffHeapOakTest$RunThreads.run(OffHeapOakTest.java:119)
at java.lang.Thread.run(Thread.java:748)
Add a flag to Oak, such that if turned on, OakMap is optimized for a single thread performance.
A flag should be added to OakBuilder to signal that the OakMap being created will be used by a single thread. The flag should propagate to the internal Oak usage of CAS, where the latter should be wrapped so that, based on the flag, either a CAS or a simple write is used.
The issue is not very complex but still requires understanding of OakMap design and internal classes structure. In addition to the running and correct version of single threaded Oak, the outcome of the issue should also be Oak benchmarks showing performance of single- versus multi-threaded OakMap.
For more explanations feel free to add questions in this issue.
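The wrapping described above might be sketched like this. All names (`IntCell`, `AtomicIntCell`, `PlainIntCell`, `IntCells`) are hypothetical, and the single-threaded variant is safe only when exactly one thread touches the cell.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical abstraction over "CAS or simple write"; not an Oak class.
interface IntCell {
    boolean compareAndSet(int expected, int update);

    int get();
}

// Multi-threaded variant: a real atomic compare-and-set.
final class AtomicIntCell implements IntCell {
    private final AtomicInteger value = new AtomicInteger();

    public boolean compareAndSet(int expected, int update) {
        return value.compareAndSet(expected, update);
    }

    public int get() {
        return value.get();
    }
}

// Single-threaded variant: plain compare and plain write, no atomic instruction.
final class PlainIntCell implements IntCell {
    private int value;

    public boolean compareAndSet(int expected, int update) {
        if (value != expected) {
            return false;
        }
        value = update;
        return true;
    }

    public int get() {
        return value;
    }
}

final class IntCells {
    private IntCells() { }

    // The flag is evaluated once, at creation, so the hot path has no extra branch.
    static IntCell create(boolean singleThreaded) {
        return singleThreaded ? new PlainIntCell() : new AtomicIntCell();
    }
}
```

Choosing the implementation once at construction keeps both variants behind the same interface, which also makes a single- versus multi-threaded benchmark straightforward to set up.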
Currently, Oak code standards are enforced by the maven code-style plugin.
The code-style rules are detailed in codestyle/checkstyle.xml.
However, these rules are currently not enforced on the tests and benchmarks.
Please change the pom.xml file to enforce code style check on all modules and fix all the existing code-style issues.
When a Block is released, the "allocated" field is not set back to zero, so once a block is returned it can never be used again. After some Oak allocation iterations in which no block appears free (even if blocks really are free), the JVM throws an out-of-memory exception.
Fix:
In Block.reset(), call allocated.set(0);
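A simplified model of the problem and the fix, assuming a bump-pointer block whose `allocated` counter is an `AtomicInteger`. `SketchBlock` is hypothetical and far smaller than Oak's Block.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bump-pointer block; only models the "allocated" counter.
final class SketchBlock {
    private final int capacity;
    private final AtomicInteger allocated = new AtomicInteger(0);

    SketchBlock(int capacity) {
        this.capacity = capacity;
    }

    // Returns the offset of the new allocation, or -1 if the block is full.
    int allocate(int size) {
        while (true) {
            int current = allocated.get();
            if (current + size > capacity) {
                return -1;
            }
            if (allocated.compareAndSet(current, current + size)) {
                return current;
            }
        }
    }

    // Called when the block goes back to the pool; without this the block stays "full".
    void reset() {
        allocated.set(0);
    }
}
```

If `reset()` leaves the counter untouched, every later `allocate` on a recycled block fails even though all of its memory is free.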
When the rebalancer scans the source chunk, it skips entries with Handle index of -1, meaning that deleted Handles are copied to the newly created chunk.
If a rebalancer runs concurrently to the remove which caused the deletion of a Handle, this remove will not change the respective entry to have a Handle index of -1.
In such a case the remove would call removeRebalance, and do the main loop again.
The remove then returns without changing the Handle index to -1 because Chunk::lookup treats the entry as removed without ever completing its removal.
Add a test for the existing, known bug of accessing a deleted slice from a released chunk (causing an infinite loop). This will require long and massive concurrent put and delete usage, including some delays in the middle. This will make it easier to check the fix and help us never fall into this problem again.
For more explanations feel free to add questions in this issue.
Create a map and call .clear():
java.lang.UnsupportedOperationException: remove
at java.base/java.util.Iterator.remove(Iterator.java:102)
at java.base/java.util.AbstractCollection.clear(AbstractCollection.java:431)
at java.base/java.util.AbstractMap.clear(AbstractMap.java:297)
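The trace shows the inherited `AbstractMap.clear()` falling back to `Iterator.remove()`. The sketch below reproduces that mechanism with a hypothetical `ReadOnlyIterMap` (not an Oak class) and shows one possible remedy, overriding `clear()`, in the equally hypothetical `ClearableMap`.

```java
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical map whose iterators do not support remove(), like the one in the trace.
class ReadOnlyIterMap extends AbstractMap<Integer, String> {
    protected final TreeMap<Integer, String> backing = new TreeMap<>();

    @Override
    public String put(Integer key, String value) {
        return backing.put(key, value);
    }

    @Override
    public Set<Map.Entry<Integer, String>> entrySet() {
        return new AbstractSet<Map.Entry<Integer, String>>() {
            @Override
            public Iterator<Map.Entry<Integer, String>> iterator() {
                final Iterator<Map.Entry<Integer, String>> it = backing.entrySet().iterator();
                // remove() is inherited from Iterator and throws UnsupportedOperationException
                return new Iterator<Map.Entry<Integer, String>>() {
                    @Override
                    public boolean hasNext() {
                        return it.hasNext();
                    }

                    @Override
                    public Map.Entry<Integer, String> next() {
                        return it.next();
                    }
                };
            }

            @Override
            public int size() {
                return backing.size();
            }
        };
    }
}

// Possible remedy: override clear() so it no longer depends on Iterator.remove().
final class ClearableMap extends ReadOnlyIterMap {
    @Override
    public void clear() {
        backing.clear();
    }
}
```

In this sketch the inherited `clear()` only throws when the map is non-empty, because the default implementation removes entries one by one through the iterator.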
In the current implementation 2 threads may return the same value of
(Thread.currentThread().getId() % Chunk.MAX_THREADS)
Causing everything to break.
Fix:
Each thread will get assigned an index on demand.
getThreadIndex becomes non-static, and each thread that enters gets its index from a map that maps threadId to index. The first time a thread enters, its index is taken from an atomic counter.
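The proposed fix could be sketched as follows. `SketchThreadIndexCalculator`, `indexFor` and `maxThreads` are hypothetical names, and the overflow policy (throwing once all indices are taken) is an assumption.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical calculator; assigns each thread a unique index on first use.
final class SketchThreadIndexCalculator {
    private final int maxThreads;
    private final AtomicInteger nextIndex = new AtomicInteger(0);
    private final ConcurrentHashMap<Long, Integer> indices = new ConcurrentHashMap<>();

    SketchThreadIndexCalculator(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    // Non-static, as suggested above: each instance owns its own mapping.
    int getThreadIndex() {
        return indexFor(Thread.currentThread().getId());
    }

    // Separated out so the mapping can be exercised without spawning threads.
    int indexFor(long threadId) {
        return indices.computeIfAbsent(threadId, id -> {
            int index = nextIndex.getAndIncrement();
            if (index >= maxThreads) {
                throw new IllegalStateException("more than " + maxThreads + " threads");
            }
            return index;
        });
    }
}
```

Unlike the modulo scheme, two thread ids that are congruent modulo the maximum, for example 7 and 11 with a maximum of 4, receive different indices here.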
Java's collections generally don't allow modifying one while iterating over it, resulting in a ConcurrentModificationException. If I want to remove some entries from an OakMap while iterating through them, what is the proper way of achieving it?
For instance, I created a head map, tried to call clear() on it, and failed with UnsupportedOperationException. Iterator.remove is not implemented for the iterators returned from an OakMap. And glancing at the source code, I'm not quite sure if I can safely invoke remove on an OakMap while iterating over its content.
Currently I'm iterating over the entries and calling remove immediately after I'm done with each entry. Is it ok to do so?
OakMap requires less memory than other ConcurrentNavigableMap implementations. To emphasize this, we would like to have a benchmark presenting memory consumption. This should also help us to compare different variants of memory management (MM).
Add a benchmark that measures the memory allocation under a high churn of deletions and allocations, to show the advantage of Oak MM.
For more explanations feel free to add questions in this issue.
Currently the serializers' code is in the test/ source directory, therefore it does not get packaged in the jar. It would be great to:
Oak requests its users to provide serializers for their objects representing keys and values.
Oak has an internal common package that includes sub-packages with serializers for common objects: IntBuffer, Integer and String. It would be good to have more of these. For which kinds of objects? That is up to the implementer.
@liran-funaro can you please add more details to this issue?
For more explanations feel free to add questions in this issue.
env
os : macosx aarch64
jdk: OpenJDK 64-Bit Server VM Zulu11.48+21-CA (11.0.11+9-LTS, mixed mode, tiered, compressed oops, g1 gc, bsd-aarch64)
oak version
<dependency>
<groupId>com.yahoo.oak</groupId>
<artifactId>oak</artifactId>
<version>0.2.3.1</version>
</dependency>
reproduce code
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.junit.Assert;
import org.junit.Test;
import com.yahoo.oak.OakMap;
import com.yahoo.oak.OakMapBuilder;
import com.yahoo.oak.common.integer.OakIntComparator;
import com.yahoo.oak.common.integer.OakIntSerializer;
import com.yahoo.oak.common.string.OakStringSerializer;
public class XOakMapTest {
    /**
     *
     */
    @Test
    public void test1() {
        //
        final OakMap<Integer, String> m;
        m = oakmap1(256); Assert.assertTrue(m.size() == 0);
        m.put(1, "123456789"); Assert.assertTrue(m.size() == 1);
        String v1 = m.remove(1); assertEquals (v1, "123456789");
        //
        m.put(2, "654321"); Assert.assertTrue(m.size() == 1);
        m.put(2, "654321"); Assert.assertTrue(m.size() == 1);
        String v2 = m.get(2); Assert.assertEquals(v2, "654321");
        m.close();
    }
    /**
     *
     */
    private static OakMap<Integer, String> oakmap1(int n) {
        final int block = 1024 * 1024;
        final OakMapBuilder<Integer, String> builder;
        final OakIntComparator c = new OakIntComparator();
        final OakIntSerializer ks = new OakIntSerializer();
        final OakStringSerializer vs = new OakStringSerializer();
        builder = new OakMapBuilder<>(c, ks, vs, Integer.MIN_VALUE);
        builder.setPreferredBlockSize(block); return builder.build();
    }
}
hs_err_pid74815.log
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGBUS (0xa) at pc=0x00000001056ffb8c, pid=74815, tid=9731
#
# JRE version: OpenJDK Runtime Environment Zulu11.48+21-CA (11.0.11+9) (build 11.0.11+9-LTS)
# Java VM: OpenJDK 64-Bit Server VM Zulu11.48+21-CA (11.0.11+9-LTS, mixed mode, tiered, compressed oops, g1 gc, bsd-aarch64)
# Problematic frame:
# V [libjvm.dylib+0x6ffb8c] Unsafe_CompareAndSetLong(JNIEnv_*, _jobject*, _jobject*, long, long, long)+0x110
#
# No core dump will be written. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please visit:
# http://www.azulsystems.com/support/
#
For more details, see the attachment:
hs_err_pid74815.log
It works fine on Windows 10, Linux, and macOS x86_64.
Looking for ideas for Oak usage/improvement. Any idea (hopefully concrete) of where else Oak can be used is welcome. Also, please let us know where you think it is most important for Oak to improve, and why.
To participate in this issue it is enough to learn about Oak and to leave here a comment with your thoughts! Easy!
Add code comments and wiki
Base all of the off-heap memory management on unsafe allocation and access, without ByteBuffer intermediate layer (in addition to existing memory management). The reason for the change is the possibility to gain better throughput without spending CPU cycles and memory on ByteBuffer internals.
That means a new memory manager to be based on using addresses instead of ByteBuffers and allocating the address from unsafe:
Unsafe().allocateMemory(size);
That requires significant code writing: new (alternative) Block, BlockPool, MemoryAllocator, MemoryManager etc.
Oak/core/src/main/java/com/oath/oak/InternalOakMap.java
Lines 739 to 756 in d369170
Oak/core/src/main/java/com/oath/oak/Handle.java
Lines 205 to 218 in d369170
In the replace functions, the write lock is acquired once when calling Handle::mutatingFunction and another time when calling Handle::put inside.
It works in the production code because the lock is reentrant, however, this assumption is limiting.
Base all of the off-heap memory management on the new Java14/15 VarHandle API, without unsafe usage. Other than memory management, the rest of the code should be easily transferred to JDK14. The reason for the change is to allow much wider use of Oak.
Implementing this issue requires significant code writing: new (alternative) Block, BlockPool, MemoryAllocator, MemoryManager etc.
For more explanations feel free to add questions in this issue.
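As a rough illustration of atomic off-heap access without `Unsafe`, the sketch below uses `MethodHandles.byteBufferViewVarHandle`, which has been available since Java 9. This is an assumption about one possible direction: it still sits on top of a direct `ByteBuffer` and is not the incubating foreign-memory API of JDK 14/15. `VarHandleOffHeapDemo` is a hypothetical name.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper; performs CAS on off-heap memory through a VarHandle.
final class VarHandleOffHeapDemo {
    // Views a ByteBuffer as longs; the index coordinate is a byte offset.
    private static final VarHandle LONG_VIEW =
            MethodHandles.byteBufferViewVarHandle(long[].class, ByteOrder.nativeOrder());

    private VarHandleOffHeapDemo() { }

    // Atomic access requires the byte offset to be 8-byte aligned.
    static boolean casLong(ByteBuffer buffer, int byteOffset, long expected, long update) {
        return LONG_VIEW.compareAndSet(buffer, byteOffset, expected, update);
    }

    static long readLong(ByteBuffer buffer, int byteOffset) {
        return (long) LONG_VIEW.getVolatile(buffer, byteOffset);
    }
}
```

A caller could obtain a suitably aligned buffer with `ByteBuffer.allocateDirect(64).alignedSlice(8)`. A misaligned offset makes the atomic access modes throw `IllegalStateException` rather than crash the JVM.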
java version
openjdk 11
oak version
<dependency>
<groupId>com.yahoo.oak</groupId>
<artifactId>oak</artifactId>
<version>0.2.3.1</version>
</dependency>
reproduce code
package cn.nextop.gadget.fma;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.junit.Assert;
import org.junit.Test;
import com.yahoo.oak.OakMap;
import com.yahoo.oak.OakMapBuilder;
import com.yahoo.oak.common.integer.OakIntComparator;
import com.yahoo.oak.common.integer.OakIntSerializer;
import com.yahoo.oak.common.string.OakStringSerializer;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
* @author Jingqi Xu
*/
public class XOakMapTest {
    /**
     *
     */
    @Test
    public void test1() {
        //
        final int size = 1024;
        final Random random = ThreadLocalRandom.current();
        final OakMap<Integer, String> m1 = oakmap3(size);
        final Map<Integer, String> m2 = new HashMap<>(size);
        //
        for(int i = 0; i < 100000; i++) {
            final int k = random.nextInt(size);
            final String v = String.valueOf(k);
            switch (random.nextInt(12)) {
                case 0:
                    Assert.assertEquals(m2.get(k), m1.get(k)); break;
                case 2:
                    final String w2 = m2.put (k, v);
                    Assert.assertEquals (w2, m1.put(k, v));
                    Assert.assertEquals(m2.size(), m1.size()); break;
                case 3:
                    m2.put(k, v); m1.zc().put(k, v);
                    Assert.assertEquals(m2.size(), m1.size()); break;
                case 4:
                    final String w4 = m2.putIfAbsent(k, v);
                    assertEquals(w4, m1.putIfAbsent(k, v));
                    Assert.assertEquals(m2.size(), m1.size()); break;
                case 5:
                    boolean w5 = m2.putIfAbsent(k, v) == null;
                    assertEquals(w5, m1.zc().putIfAbsent(k, v));
                    Assert.assertEquals(m2.size(), m1.size()); break;
                case 6:
                    final String w6 = m2.remove(k);
                    Assert.assertEquals (w6, m1.remove(k));
                    Assert.assertEquals(m2.size(), m1.size()); break;
                case 7:
                    final boolean w7 = m2.remove(k) != null;
                    Assert.assertEquals(w7, m1.zc().remove(k));
                    Assert.assertEquals(m2.size(), m1.size()); break;
                case 8:
                    final String w8 = m1.replace(k, v);
                    assertEquals(w8, m2.replace(k, v));
                    Assert.assertEquals(m2.size(), m1.size()); break;
            }
        }
        m1.close();
    }
    private static OakMap<Integer, String> oakmap3(int n) {
        final int block = 1024 * 1024;
        final OakMapBuilder<Integer, String> builder;
        final OakIntComparator c = new OakIntComparator();
        final OakIntSerializer ks = new OakIntSerializer();
        final OakStringSerializer vs = new OakStringSerializer();
        builder = new OakMapBuilder<>(c, ks, vs, Integer.MIN_VALUE);
        return builder.setPreferredBlockSize(block).build();
    }
}
exception stack
java.lang.AssertionError
at com.yahoo.oak.ValueUtilsImpl.lockWrite(ValueUtilsImpl.java:249)
at com.yahoo.oak.ValueUtilsImpl.exchange(ValueUtilsImpl.java:156)
at com.yahoo.oak.InternalOakMap.replace(InternalOakMap.java:939)
at com.yahoo.oak.OakMap.replace(OakMap.java:230)
at cn.nextop.gadget.fma.XOakMapTest.test1(XOakMapTest.java:83)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
It would be great if you could add a check for a null comparator to the OakBuilder.build() method. The requirement is stated in the documentation, but the check would still be good to have to save developer time.
Currently the iterators in Oak may return null (as value or entry) for a key that is concurrently deleted or not fully inserted. For example, look at the internalNext() method of class EntryTransformIterator. I am partially fixing this problem under branch OAK--MEMORY_ALLOC--V01. It needs to be checked fully.
Find and Solve a bug we haven't spotted in existing issues yet!
If you encountered a bug while working on the Hackathon and, better still, have a way to solve it, please tell us here!
This is a collective issue where there can be more than one contributor/reporter.