bulldog2011 / bigqueue
A big, fast and persistent queue based on memory mapped file.
Home Page: http://bulldog2011.github.com/bigqueue
License: Apache License 2.0
Is there a way to shuffle the values which are stored in the queue?
Hi,
I was looking for a persistent queue for my projects, and I found bigqueue, which has great performance. I was testing it with a small test program and found the following:
When I call the queue's removeAll method, it deletes all the data files, but it doesn't remove the folders created for the queue. Is there a way to remove all the files and folders of the queue?
A queue needs a minimum of 160MB of disk space, made up of a 32MB index file and a 128MB data file. In my use case I create a lot of queues, and the data in them will rarely reach 128MB, so I don't want each queue to use this much disk space unnecessarily. Is there a way to configure how much disk space a queue uses, or are the 128MB data file and 32MB index file fixed defaults?
Please respond.
Thanks
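On the page-size question: some bigqueue releases appear to expose constructors that accept a custom data page size, with a minimum around 32MB; whether your version has one is worth checking in the source. A minimal sketch, assuming such a three-argument BigQueueImpl constructor exists (the constructor and the 32MB minimum are assumptions, not confirmed here):

```java
import com.leansoft.bigqueue.BigQueueImpl;
import com.leansoft.bigqueue.IBigQueue;

public class SmallPageDemo {
    // Assumed minimum data page size of 32MB; verify against your version
    // of the library before relying on it.
    static final int PAGE_SIZE = 32 * 1024 * 1024;

    public static void main(String[] args) throws Exception {
        // Assumed constructor: (queueDir, queueName, pageSize)
        IBigQueue queue = new BigQueueImpl("/tmp/bigqueue-demo", "small-page", PAGE_SIZE);
        try {
            queue.enqueue("hello".getBytes());
            System.out.println(new String(queue.dequeue()));
        } finally {
            queue.close();
        }
    }
}
```

If the constructor exists in your version, this would cap each data page file at 32MB instead of 128MB.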
Calling BigArrayImpl.removeBeforeIndex with the headIndex seems like a valid way to remove all elements from the big array, even though you could achieve the same thing with a call to removeAll.
The following code:
public static void main(String[] args) throws Exception {
    String basePath = new File(System.getProperty("java.io.tmpdir"), "test").getPath();
    IBigArray bigArray = new BigArrayImpl(basePath, "demo");
    try {
        // append some items into the array
        for (int i = 0; i < 2; ++i) {
            bigArray.append(String.valueOf(i).getBytes());
        }
        // get current size of the array
        System.out.println("size: " + bigArray.size());
        for (long index = bigArray.getTailIndex(); index != bigArray.getHeadIndex(); index = (index == Long.MAX_VALUE ? 0 : index + 1)) {
            String item = new String(bigArray.get(index));
            System.out.println("item[" + index + "]=" + item);
        }
        bigArray.removeBeforeIndex(bigArray.getHeadIndex());
        System.out.println("size: " + bigArray.size());
        for (long index = bigArray.getTailIndex(); index != bigArray.getHeadIndex(); index = (index == Long.MAX_VALUE ? 0 : index + 1)) {
            String item = new String(bigArray.get(index));
            System.out.println("item[" + index + "]=" + item);
        }
        // empty the big array
        bigArray.removeAll();
    } finally {
        bigArray.close();
    }
}
outputs to the console:
size: 2
item[0]=0
item[1]=1
Exception in thread "main" java.lang.IndexOutOfBoundsException
at com.leansoft.bigqueue.BigArrayImpl.validateIndex(BigArrayImpl.java:458)
at com.leansoft.bigqueue.BigArrayImpl.removeBeforeIndex(BigArrayImpl.java:198)
at com.jorge.test.BigArrayMain.main(BigArrayMain.java:27)
Perhaps removeBeforeIndex should be changed to something like:
@Override
public void removeBeforeIndex(long index) throws IOException {
try {
arrayWriteLock.lock();
if (index == arrayHeadIndex.get()) {
this.indexPageFactory.deleteAllPages();
this.dataPageFactory.deleteAllPages();
this.metaPageFactory.deleteAllPages();
this.commonInit();
} else {
validateIndex(index);
long indexPageIndex = Calculator.div(index, INDEX_ITEMS_PER_PAGE_BITS);
ByteBuffer indexItemBuffer = this.getIndexItemBuffer(index);
long dataPageIndex = indexItemBuffer.getLong();
long toRemoveIndexPageTimestamp = this.indexPageFactory.getPageFileLastModifiedTime(indexPageIndex);
long toRemoveDataPageItemstamp = this.dataPageFactory.getPageFileLastModifiedTime(dataPageIndex);
if (toRemoveIndexPageTimestamp > 0L) {
this.indexPageFactory.deletePagesBefore(toRemoveIndexPageTimestamp);
}
if (toRemoveDataPageItemstamp > 0L) {
this.dataPageFactory.deletePagesBefore(toRemoveDataPageItemstamp);
}
// advance the tail to index
this.arrayTailIndex.set(index);
}
} finally {
arrayWriteLock.unlock();
}
}
Hi,
MappedPageImpl currently uses sun.misc.Cleaner to clean up the mapped byte buffers of the index/data page files.
The class sun.misc.Cleaner does not exist in JDK 11, so apps running on JDK 11 will have issues releasing disk space.
On Ubuntu 18/OpenJDK 11, the command below shows the deleted files, which are not released until the app is restarted:
lsof -p <pid> | grep DEL
Running htop shows the process with a very high VIRT (climbing past 100G).
There is sun.misc.Unsafe.invokeCleaner(java.nio.ByteBuffer directBuffer) available to resolve this issue.
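For reference, a minimal sketch of the JDK 9+ unmapping path suggested above. It calls sun.misc.Unsafe.invokeCleaner reflectively so the code still compiles on older JDKs; this is a sketch of the suggested fix, not the library's current code:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

public final class BufferCleaner {
    // Unmaps a direct/mapped byte buffer on JDK 9+ by invoking
    // sun.misc.Unsafe.invokeCleaner reflectively. On JDK 8 and earlier this
    // throws, because invokeCleaner does not exist there.
    public static void clean(ByteBuffer buffer) throws Exception {
        if (!buffer.isDirect()) {
            return; // heap buffers have nothing to unmap
        }
        Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
        Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Object unsafe = theUnsafe.get(null);
        Method invokeCleaner = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class);
        invokeCleaner.invoke(unsafe, buffer);
    }
}
```

Note that invokeCleaner must be called on the original mapped buffer, not on a slice or duplicate, and the buffer must never be accessed afterwards.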
The Maven repo mentioned in README.md:
<dependency>
    <groupId>com.leansoft</groupId>
    <artifactId>bigqueue</artifactId>
    <version>0.7.0</version>
</dependency>
<repository>
    <id>github.release.repo</id>
    <url>https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/</url>
</repository>
The URL should be:
https://raw.githubusercontent.com/bulldog2011/bulldog-repo/master/repo/releases/
Hi,
I am currently using bigqueue in one of our projects. For many "queues" I am getting a "Map failed" error because of OutOfMemory.
The data I am trying to queue isn't huge; no item is larger than a few KBs. My machine has at least a gig of RAM free, and disk space is not filled up either. It is, however, a Windows 7 machine (64-bit). Any clues as to what might be causing this?
Following is the snippet of the relevant portions of the stacktrace:
java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:748)
at com.leansoft.bigqueue.page.MappedPageFactoryImpl.acquirePage(MappedPageFactoryImpl.java:86)
at com.leansoft.bigqueue.BigArrayImpl.append(BigArrayImpl.java:325)
at com.leansoft.bigqueue.BigQueueImpl.enqueue(BigQueueImpl.java:92)
...
Caused by: java.lang.OutOfMemoryError: Map failed
at sun.nio.ch.FileChannelImpl.map0(Native Method)
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
... 36 more
LRUCacheImpl's internal executorService is never shut down. This can block the application from stopping quickly, so I had to get a reference to this variable through reflection and call shutdown() manually.
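A sketch of the reflection workaround described above. Since the internal field name is not guaranteed across versions, this searches the target's declared fields by type instead of assuming a name (the helper class and method names here are mine, not the library's):

```java
import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;

public final class ExecutorShutdownHelper {
    // Finds the first declared ExecutorService field on the given object
    // (static or instance) and shuts it down. Intended for use against a
    // LRUCacheImpl instance whose executor is reported never to be shut down.
    // Returns true if an executor was found and shut down.
    public static boolean shutdownExecutors(Object target) throws Exception {
        for (Field field : target.getClass().getDeclaredFields()) {
            if (ExecutorService.class.isAssignableFrom(field.getType())) {
                field.setAccessible(true);
                // Field.get ignores the instance argument for static fields,
                // so this handles both cases.
                ExecutorService service = (ExecutorService) field.get(target);
                if (service != null) {
                    service.shutdown();
                    return true;
                }
            }
        }
        return false;
    }
}
```

On JDK 9+ the setAccessible call may additionally require opening the library's package to your module.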
It is not consistent (so you will probably need to run it several times to catch it):
Error Message
expected: but was:
Stacktrace
java.lang.AssertionError: expected: but was:
....
at com.leansoft.bigqueue.BigArrayUnitTest.removeBeforeTest(BigArrayUnitTest.java:121)
I create a bigqueue, enqueue many items, and then dequeue them all.
But the size of the bigqueue on disk keeps growing.
String queueDir = "/Users/ym/data/bigqueue";
String queueName = "demo";
IBigQueue bigQueue = null;
try {
    bigQueue = new BigQueueImpl(queueDir, queueName);
    for (int i = 0; i < 1000000; i++) {
        String item = String.valueOf(i);
        bigQueue.enqueue(item.getBytes());
    }
    // dequeue all items
    for (int i = 0; i < 1000000; i++) {
        String item = new String(bigQueue.dequeue());
        System.out.println(item);
    }
} finally {
    bigQueue.close();
}
How can I release the space or shrink the files?
I have tried IBigQueue.gc(), but it doesn't work.
IBigQueue.removeAll() does shrink the files, but I only want to clear consumed items.
Hi @bulldog2011, @TobiasMende, @wjw465150, @illya13
We use the bigqueue implementation to buffer incoming data in our code base, with a background thread consuming the queue every 1.5 seconds. Recently we ran some load tests and published some data, and we observed the following exception when we restarted our server.
[2016-08-02 10:37:28,676] ERROR {org.wso2.carbon.analytics.dataservice.core.indexing.AnalyticsDataIndexer} - Error in processing index batch operations: null
java.lang.IndexOutOfBoundsException
at com.leansoft.bigqueue.BigArrayImpl.validateIndex(BigArrayImpl.java:458)
at com.leansoft.bigqueue.BigArrayImpl.get(BigArrayImpl.java:399)
at com.leansoft.bigqueue.BigQueueImpl.peek(BigQueueImpl.java:144)
at org.wso2.carbon.analytics.dataservice.core.indexing.LocalIndexDataStore$LocalIndexDataQueue.peekNext(LocalIndexDataStore.java:269)
I would like to know under what conditions the above exception can be thrown.
We use the bigqueue implementation here[1]
Appreciate your help!
Thanks,
Will this library work in a Linux environment?
Hi there,
We are seeing a lot of 128MB data files left behind by bigqueue under heavy load. We have a thread that calls queue.gc() every minute, but it doesn't seem to clean up the .dat files. Is there a way to check from a shell script whether those files still contain undequeued items?
Thanks!
Even though I really like your library, I get checksum warnings when downloading via Maven.
Checksums are a good thing, but only if they match. :-)
Downloading: https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases//com/leansoft/bigqueue/0.7.0/bigqueue-0.7.0.pom
1K downloaded (bigqueue-0.7.0.pom)
[WARNING] *** CHECKSUM FAILED - Checksum failed on download: local = '21fb9eb5593a93a65f8f02741ed6004b133d268a'; remote = 'ea13c4542869c9d557c3f7581a36bc1082f8025b' - RETRYING
Downloading: https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases//com/leansoft/bigqueue/0.7.0/bigqueue-0.7.0.pom
1K downloaded (bigqueue-0.7.0.pom)
[WARNING] *** CHECKSUM FAILED - Checksum failed on download: local = '21fb9eb5593a93a65f8f02741ed6004b133d268a'; remote = 'ea13c4542869c9d557c3f7581a36bc1082f8025b' - IGNORING
Downloading: https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases//com/leansoft/bigqueue/0.7.0/bigqueue-0.7.0.jar
37K downloaded (bigqueue-0.7.0.jar)
Downloading: https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases//com/leansoft/bigqueue/0.7.0/bigqueue-0.7.0-sources.jar
24K downloaded (bigqueue-0.7.0-sources.jar)
Hi,
recently I have been getting the following warnings while using bigqueue:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.leansoft.bigqueue.page.MappedPageImpl$Cleaner (file:$HOME/.m2/repository/com/leansoft/bigqueue/0.7.0/bigqueue-0.7.0.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of com.leansoft.bigqueue.page.MappedPageImpl$Cleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
After a little bit of searching I learned this might be related to long-open JDK bugs, which have since been fixed and backported:
Please be informed. Do you plan to fix it?
Btw. I am using bigqueue for a couple of years and want to thank you for your work.
Regards.
Our team works in several programming languages; besides Java we also use Go. So we developed a Go version of bigqueue whose storage files are fully compatible with the Java version, and we have now open-sourced it on GitHub.
Any questions could visit https://github.com/jhunters/bigqueue
I think it's safer to publish this project's JARs to Maven Central. Our company (logz.io) would love to help set this up. WDYT?
We observed some data corruption with bigqueue, and I can reproduce it. The workaround is to delete page files by the page file index itself, instead of checking the timestamp. I will send you a PR with a reproducible unit test.
IBigQueue.applyForEach does not work: my iterator is never called with any element. No errors are thrown; it just behaves as if the queue is empty, which it definitely is not. I enqueued and dequeued items and also printed size() and isEmpty() just before applyForEach. I think the problem is here:
for (long i = index; i < this.innerArray.size(); i++) {
    iterator.forEach(this.innerArray.get(i));
}
More specifically, it seems that innerArray.size() can return a negative value. I see it isn't used in other places, so maybe IBigArray.size() is faulty.
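A possible fix, sketched with the same index-wrapping idiom used in the console example earlier in this thread (assuming the innerArray is an IBigArray exposing getTailIndex()/getHeadIndex()/get, as used there). It iterates over the index range instead of comparing an index against size():

```java
import java.io.IOException;

import com.leansoft.bigqueue.IBigArray;
import com.leansoft.bigqueue.IBigQueue;

public final class ApplyForEachSketch {
    // Walks the array from tail to head, wrapping at Long.MAX_VALUE as the
    // earlier iteration example does, and feeds each item to the iterator.
    public static void applyForEach(IBigArray innerArray, IBigQueue.ItemIterator iterator)
            throws IOException {
        for (long i = innerArray.getTailIndex();
                i != innerArray.getHeadIndex();
                i = (i == Long.MAX_VALUE ? 0 : i + 1)) {
            iterator.forEach(innerArray.get(i));
        }
    }
}
```

This sidesteps size() entirely, so a faulty (negative) size() result cannot silently skip the loop.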
Hello!
Hope you are doing well.
I recently found your project and decided to play around. I built a really simple test:
IBigQueue bigQueue = new BigQueueImpl("bigqueue", "demo");
final String sampleData = "sdfsad hfbsfbjhsdabfha sfbsabfjhns fnhsba jbfsajdb flasb kf" +
        "sfmk basn,kbfjas dbhfsd bfs jbsadfg sahg fjhkas gfhjkasb kjhasbf askhk skajdbgfshggasa mkgnkasg jksa" +
        "js adhf ljashdjfhsajdk hfsadjh fjisahg fiusd hiusdfg ashjsa hfjkafsgasbhb gfjksagjsa gkj sag" +
        "s akjfglsahijusaf hijlsahg iujsalsailkj sagkls jghasjkg asglj" +
        "sa jkghlash gjlsahg jlksah dgj sahgjhs jghbsjka gfkjs jsa jksadghjkasghkj ashgfkjas hkjsaghjksa h" +
        " sajgbljsag jksa;aposg hijsbgji sah soag ghhas;og asi; gwir g;osagjksa hgewah 'pwaj gas" +
        " husgh ousa hgas gho;usahg o;ash fo;asgosagjh askgb ojuias ohgaho iusahgajui sghha;s g" +
        "a sjiughuoashg douisah guilsahg ouisahdg ouisa goiuash gh;oas dhghj;sh l sadgh";
final long start = System.currentTimeMillis();
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(out);
for (int i = 0; i < 10000000; i++) {
    oos.writeObject(sampleData + i);
    bigQueue.enqueue(out.toByteArray());
    out.reset();
}
long middle = System.currentTimeMillis();
System.out.println((System.currentTimeMillis() - start) / 1000);
while (bigQueue.size() > 0) {
    bigQueue.dequeue();
}
System.out.println(((System.currentTimeMillis() - middle) / 1000));
bigQueue.gc();
and it results in
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.leansoft.bigqueue.page.MappedPageFactoryImpl.acquirePage(MappedPageFactoryImpl.java:105)
at com.leansoft.bigqueue.BigArrayImpl.append(BigArrayImpl.java:323)
at com.leansoft.bigqueue.BigQueueImpl.enqueue(BigQueueImpl.java:79)
at com.queue.App.main(App.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
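One thing worth checking in this test, independent of the library: ObjectOutputStream keeps a handle table referencing every object ever written to it, so writing ten million distinct strings through a single stream grows memory without bound unless oos.reset() is called. Also, after out.reset() the later payloads lack the serialization stream header, so the dequeued byte arrays would not deserialize on their own. A hedged sketch that serializes each item independently (my helper, not part of bigqueue):

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public final class SerializeEach {
    // Serializes one item per stream: each byte[] carries its own stream
    // header and no shared handle table, so nothing accumulates between
    // items and each payload deserializes independently.
    public static byte[] toBytes(Object item) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(item);
        }
        return out.toByteArray();
    }
}
```

In the test loop above, `bigQueue.enqueue(SerializeEach.toBytes(sampleData + i))` would replace the shared oos/out pair. Whether this removes the whole OOM, or only the serialization part of it, would need to be measured.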
I wrote a program to test the performance of bigqueue, with one producer thread and one consumer thread, adding messages in one thread and reading them in the other simultaneously. I got the following results:
The producer thread wrote 9 million messages at around 125 MB/s.
The consumer thread managed to read just over 2 million records in the same time, at around 35 MB/s.
When I read separately (not concurrently), the reading speed is around 95 MB/s.
Is the significant difference between sequential and concurrent reading speed due to synchronization, or is this abnormal and I'm doing it the wrong way?
Kindly let me know.
For example, I created 20 Java processes that all initialized a big queue instance on the same files (same directory and same name): 10 processes are producers and the other 10 are consumers. Can the queue handle concurrency between those processes, in terms of both performance and data safety?
Hi,
I was trying to write to a bigqueue from one Java process and read from it in another process, but this resulted in the reading process not being able to see any data.
Is there any way to accomplish this using bigqueue?
Messages produced by a producer are not visible to a consumer in another JVM.
When the producer in another JVM on the same machine enqueues data, it is not visible to the live consumer.
We need to restart/re-initialize the queue by recreating the IBigQueue instance.
Is this the expected behavior, i.e. can BigQueue only be used within a single JVM?