openhft / chronicle-queue

Microsecond messaging that stores everything to disk

Home Page: http://chronicle.software/products/chronicle-queue/

License: Apache License 2.0

Chronicle Queue

Overview

Chronicle Queue is a persisted low-latency messaging framework for high performance applications.

This project covers the Java version of Chronicle Queue. A C++ version of this project is also available and supports Java/C++ interoperability plus additional language bindings, e.g. Python. If you are interested in evaluating the C++ version, please contact Chronicle Software.

At first glance Chronicle Queue can be seen as simply another queue implementation. However, it has major design choices that should be emphasised. Using off-heap storage, Chronicle Queue provides an environment where applications do not suffer from Garbage Collection (GC). When implementing high-performance, memory-intensive ("big data") applications in Java, garbage collection is one of the biggest problems.

Chronicle Queue allows messages to be added to the end of a queue ("appended"), read from the queue ("tailed"), and also supports random-access seek.

What Is Chronicle Queue?

You could consider a Chronicle Queue to be similar to a low-latency, broker-less, durable (persisted) topic that can contain messages of different types and sizes. Chronicle Queue is a distributed, unbounded, persisted queue that:

  • supports asynchronous RMI and Publish/Subscribe interfaces with microsecond latencies.

  • passes messages between JVMs in under a microsecond

  • passes messages between JVMs on different machines via replication in under 10 microseconds (Enterprise feature)

  • provides stable, soft real-time latencies into the millions of messages per second for a single thread to one queue, with total ordering of every event.

When publishing 40-byte messages, we achieve latencies under 1 microsecond a high percentage of the time. The 99th percentile latency is the worst 1 in 100, and the 99.9th percentile latency is the worst 1 in 1000.
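To make the percentile definitions concrete, here is a minimal sketch in plain Java. The class WorstOneIn is hypothetical and not part of Chronicle Queue. It sorts a set of latency samples, discards the worst 1 in N, and reports the largest remaining value, so N = 100 gives the 99%ile and N = 1000 gives the 99.9%ile.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Chronicle Queue: returns the latency that
// only 1 in `oneIn` samples exceeds (oneIn = 100 -> 99%ile, 1000 -> 99.9%ile).
final class WorstOneIn {
    static long percentile(long[] latencies, int oneIn) {
        if (oneIn <= 0 || latencies.length < oneIn) {
            throw new IllegalArgumentException("need at least oneIn samples");
        }
        long[] sorted = latencies.clone();
        Arrays.sort(sorted);
        // Drop the worst length/oneIn samples; the largest remaining one is the percentile.
        int discarded = sorted.length / oneIn;
        return sorted[sorted.length - discarded - 1];
    }

    public static void main(String[] args) {
        long[] samples = new long[1000];
        for (int i = 0; i < samples.length; i++) {
            samples[i] = i + 1; // pretend latencies of 1..1000 ns
        }
        System.out.println("99%ile   = " + percentile(samples, 100) + " ns");  // 990 ns
        System.out.println("99.9%ile = " + percentile(samples, 1000) + " ns"); // 999 ns
    }
}
```

With 1,000 samples of 1 to 1,000 ns, ten samples are worse than the 99%ile and one is worse than the 99.9%ile. Real benchmarks typically record into a histogram rather than sorting every sample, but the definition is the same.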

Table 1. Latency to send/receive on the same machine.
Percentile  10 million events/minute  60 million events/minute  100 million events/minute
99%ile      0.78 µs                   0.78 µs                   1.2 µs
99.9%ile    1.2 µs                    1.3 µs                    1.5 µs

Table 2. Latency to send/receive on a second machine.
Percentile  10 million events/minute  60 million events/minute  100 million events/minute
99%ile      20 µs                     28 µs                     176 µs
99.9%ile    901 µs                    705 µs                    5,370 µs

Note
100 million events per minute means sending an event every 600 nanoseconds, replicated and persisted.
Important
This performance is not achieved using a large cluster of machines. It is achieved using one thread to publish and one thread to consume.
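The gap between events follows directly from the rate: one minute is 60,000,000,000 ns, divided by the number of events per minute. A small sketch of that arithmetic (the EventInterval class is illustrative only):

```java
// Illustrative arithmetic only: average gap between events for a given rate.
final class EventInterval {
    static final long NANOS_PER_MINUTE = 60L * 1_000_000_000L;

    static long intervalNanos(long eventsPerMinute) {
        return NANOS_PER_MINUTE / eventsPerMinute; // integer nanoseconds
    }

    public static void main(String[] args) {
        for (long rate : new long[] {10_000_000L, 60_000_000L, 100_000_000L}) {
            System.out.println(rate + " events/minute -> one event every "
                    + intervalNanos(rate) + " ns");
        }
    }
}
```

For the three rates in the tables this gives 6,000 ns, 1,000 ns and 600 ns between events.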

Design Motivation and Features

Chronicle Queue is designed to:

  • be a "record everything store" which can be read with microsecond real-time latency. This supports even the most demanding High Frequency Trading systems. However, it can be used in any application where the recording of information is a concern.

  • support reliable replication with notification to either the appender (writer of message) or a tailer (reader of message), when a message has been successfully replicated.

Persistence

Chronicle Queue assumes disk space is cheap compared with memory. Chronicle Queue makes full use of the disk space you have, so you are not limited by the main memory of your machine. If you use spinning HDDs, you can store many TB of data for little cost.

The only extra software that Chronicle Queue needs to run is the operating system. It doesn’t have a broker; instead it uses your operating system to do all the work. If your application dies, the operating system keeps running for seconds longer, so no data is lost, even without replication.

Because Chronicle Queue stores all saved data in memory-mapped files, it has a trivial on-heap overhead, even if you have over 100 TB of data.
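The JDK exposes memory-mapped files through FileChannel.map. The sketch below is not Chronicle Queue code, only an illustration of the principle, assuming a single 4 KiB mapped region and a plain 4-byte length prefix (MappedFileDemo is a hypothetical name). The message bytes live in the mapped file, i.e. in the OS page cache, and the only heap objects are the small channel and buffer wrappers. Because the mapping is shared with the operating system, written data stays in the page cache and is written back to disk by the OS even if the JVM exits.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustration only: writes a length-prefixed message into a memory-mapped
// file and reads it back through the same mapping.
final class MappedFileDemo {
    static final int REGION_SIZE = 4096;

    static String roundTrip(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        if (payload.length > REGION_SIZE - 4) {
            throw new IllegalArgumentException("message too large for the mapped region");
        }
        try {
            Path file = Files.createTempFile("mapped-demo", ".dat");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping READ_WRITE past the current size grows the file to REGION_SIZE.
                MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, REGION_SIZE);
                map.putInt(0, payload.length); // 4-byte length prefix
                map.position(4);
                map.put(payload);              // message body after the prefix

                // Read it back: the length tells us how many bytes follow.
                byte[] read = new byte[map.getInt(0)];
                map.position(4);
                map.get(read);
                return new String(read, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```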

Efficiency

Chronicle has put significant effort into achieving very low latency. In other products that focus on supporting web applications, latencies of less than 40 milliseconds are fine, as they are faster than you can see; for example, the frame rate of cinema is 24 Hz, or about 40 ms per frame.

Chronicle Queue aims to achieve latencies of under 40 microseconds for 99% to 99.99% of the time. Using Chronicle Queue without replication, we support applications with latencies below 40 microseconds end-to-end across multiple services. Often the 99th percentile latency of Chronicle Queue is entirely dependent on the choice of operating system and disk subsystem.

Compression

Replication for Chronicle Queue supports Chronicle Wire Enterprise, which provides real-time compression that calculates the deltas for individual objects as they are written. This can reduce the size of messages by a factor of 10 or better, without the need for batching, that is, without introducing significant latency.

Chronicle Queue also supports LZW, Snappy, and GZIP compression. These formats, however, add significant latency, so they are only useful if you have strict limitations on network bandwidth.

Delivery mode semantics

Chronicle Queue supports a number of delivery semantics:

  • Every message is replayed on restart.

  • Only new messages are played on restart.

  • Restart from any known point using the index of the entry.

  • Replay only the messages you have missed. This is supported directly using the methodReader/methodWriter builders.
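The four delivery modes can be pictured with a hypothetical in-memory model of an append-only journal. Journal, Cursor, toStart, toEnd, moveToIndex and position are illustrative names for this sketch, not the Chronicle Queue API. The key property it shows is that reading never removes entries, so each reader keeps its own position and can start from the beginning, the end, or any saved index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of an append-only journal (NOT the Chronicle Queue API).
// Reading never removes entries, so every cursor can see every message.
final class Journal {
    private final List<String> entries = new ArrayList<>();

    /** Appends a message and returns its index. */
    long append(String message) {
        entries.add(message);
        return entries.size() - 1;
    }

    Cursor cursor() {
        return new Cursor();
    }

    final class Cursor {
        private long next; // index of the next entry to read; 0 = start of the journal

        /** Replay every message. */
        Cursor toStart() { next = 0; return this; }

        /** Only see messages appended from now on. */
        Cursor toEnd() { next = entries.size(); return this; }

        /** Restart from a known index, e.g. one saved earlier via position(). */
        Cursor moveToIndex(long index) { next = index; return this; }

        /** Index to persist so that a restart replays only the missed messages. */
        long position() { return next; }

        /** Returns the next message, or null if none is available yet. */
        String read() {
            return next < entries.size() ? entries.get((int) next++) : null;
        }
    }
}
```

Saving position() after each message and passing it to moveToIndex on restart gives the "replay only what you missed" behaviour.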

Using high resolution timings across machines

On most systems, System.nanoTime() is roughly the number of nanoseconds since the system last rebooted (although different JVMs may behave differently). This is the same across JVMs on the same machine, but wildly different between machines, so the absolute difference between machines is meaningless. However, the information can be used to detect outliers: you can’t determine what the best latency is, but you can determine how far from the best latencies you are. This is useful if you are focusing on the 99th percentile latencies. We have a class called RunningMinimum to obtain timings from different machines while compensating for drift in nanoTime between machines. The more often you take measurements, the more accurate this running minimum is.
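A minimal sketch of the running-minimum idea, assuming each message carries the sender's System.nanoTime() and the receiver records its own on arrival. The class OffsetMinimum and its drift parameter are hypothetical; this is not the API or the algorithm of Chronicle's RunningMinimum. The difference between the two readings is the unknown clock offset plus the latency, and its running minimum approximates the offset plus the best latency, so each difference minus the minimum tells you how far that message was from the best case. Letting the minimum creep up by a small amount per sample is one simple way to follow a drifting offset.

```java
// Hypothetical sketch of the running-minimum idea; NOT Chronicle's RunningMinimum API.
final class OffsetMinimum {
    private final long driftPerSample; // how far the minimum may creep up per sample
    private long minimum = Long.MAX_VALUE;

    OffsetMinimum(long driftPerSample) {
        this.driftPerSample = driftPerSample;
    }

    /**
     * @param remoteSendNanos   System.nanoTime() taken on the sending machine
     * @param localReceiveNanos System.nanoTime() taken on the receiving machine
     * @return how many nanoseconds this message was slower than the best seen so far
     */
    long sample(long remoteSendNanos, long localReceiveNanos) {
        // Unknown clock offset between the machines + this message's latency.
        long difference = localReceiveNanos - remoteSendNanos;
        if (minimum != Long.MAX_VALUE) {
            minimum += driftPerSample; // let the minimum follow a slowly drifting offset
        }
        minimum = Math.min(minimum, difference);
        return difference - minimum;
    }
}
```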

Compacting logs

Chronicle Queue manages storage by cycle. You can add a StoreFileListener, which will notify you when a file is added and when it is no longer retained. This lets you move, compress, or delete all the messages for a day at once.

Note
Unfortunately, on Windows, if an IO operation is interrupted, it can close the underlying FileChannel.

Avoid Interrupts

For performance reasons, we have removed checks for interrupts from the Chronicle Queue code. Because of this, we recommend that you avoid using Chronicle Queue with code that generates interrupts. If you cannot avoid generating interrupts, we suggest that you create a separate instance of Chronicle Queue per thread.

Usage

Chronicle Queue is most often used for producer-centric systems where you need to retain a lot of data for days or years. For statistics, see Usage of Chronicle-Queue.

Important
Chronicle Queue does not support operating off any network file system, be it NFS, AFS, SAN-based storage or anything else. The reason is that those file systems do not provide all the primitives required by the memory-mapped files that Chronicle Queue uses. If any networking is needed (e.g. to make the data accessible to multiple hosts), the only supported way is Chronicle Queue Replication (Enterprise feature).

What is a producer-centric system?

Most messaging systems are consumer-centric. Flow control is implemented to avoid the consumer ever getting overloaded, even momentarily. A common example is a server supporting multiple GUI users. Those users might be on different machines (OS and hardware), on networks of different quality (latency and bandwidth), and doing a variety of other things at different times. For this reason, it makes sense for the client consumer to tell the producer when to back off, delaying any data until the consumer is ready to take more.

Chronicle Queue is a producer-centric solution and does everything possible to never push back on the producer or tell it to slow down. This makes it a powerful tool, providing a big buffer between your system and an upstream producer over which you have little or no control.

Market data

Market data publishers don’t give you the option to push back on the producer for long, if at all. A few of our users consume data from CME OPRA. This produces peaks of 10 million events per minute, sent as UDP packets without any retry. If you miss or drop a packet, it is lost. You have to consume and record those packets as fast as they come to you, with very little buffering in the network adapter. For market data in particular, real time means within a few microseconds; it doesn’t mean intra-day (during the day).

Chronicle Queue is fast and efficient, and has been used to increase the speed at which data is passed between threads. In addition, it keeps a record of every message passed, allowing you to significantly reduce the amount of logging that you need to do.

Compliance systems

Compliance systems are required by more and more systems these days. Everyone has to have them, but no one wants to be slowed down by them. By using Chronicle Queue to buffer data between monitored systems and the compliance system, you don’t need to worry about the impact of compliance recording on your monitored systems. Again, Chronicle Queue can support millions of events per second, per server, and access data which has been retained for years.

Latency sensitive micro-services

Chronicle Queue supports low-latency IPC (Inter Process Communication) between JVMs on the same machine, of the order of 1 microsecond, as well as between machines, with a typical latency of 10 microseconds for modest throughputs of a few hundred thousand events per second. Chronicle Queue supports throughputs of millions of events per second, with stable microsecond latencies.

Log replacement

A Chronicle Queue can be used to build state machines. All the information about the state of those components can be reproduced externally, without direct access to the components or to their state. This significantly reduces the need for additional logging. However, any logging you do need can be recorded in great detail. This makes enabling DEBUG logging in production practical, because the cost of logging is very low: less than 10 microseconds. Logs can be replicated centrally for log consolidation. Chronicle Queue is being used to store 100+ TB of data, which can be replayed from any point in time.

Lambda Stream Processing

Non-batching streaming components are highly performant, deterministic, and reproducible. You can reproduce bugs which only show up after a million events played in a particular order, with accelerated realistic timings. This makes stream processing attractive for systems which need a high degree of quality in their outcomes.

Downloading Chronicle Queue

Releases are available on Maven Central as:

<dependency>
  <groupId>net.openhft</groupId>
  <artifactId>chronicle-queue</artifactId>
  <version><!--replace with the latest version, see below--></version>
</dependency>
Note
Classes that reside in any of the packages 'internal', 'impl', and 'main' (the latter containing various runnable main methods), and in any of their sub-packages, are not part of the public API and may become subject to change at any time for any reason. See the respective package-info.java files for details.

Chronicle Queue Versions and Remarkable Changes

Changes from Version 4 to Version 5

In Chronicle Queue v5, tailers are now read-only. In Chronicle Queue v4 we had the concept of lazy indexing, where appenders would not write indexes; instead, the indexing could be done by the tailer. We decided to drop lazy indexing in v5; making tailers read-only not only simplifies Chronicle Queue but also allows us to add optimisations elsewhere in the code.

The locking model of Chronicle Queue was changed in v5. In Chronicle Queue v4, the write lock (which prevents concurrent writes to the queue) exists in the .cq4 file. In v5, this was moved to a single file called a table store (metadata.cq4t). This simplifies the locking code internally, as only the table store file has to be inspected.

You can use Chronicle Queue v5 to read messages written with Chronicle Queue v4, but this is not guaranteed to always work. For example, if you created your v4 queue with wireType(WireType.FIELDLESS_BINARY), then Chronicle Queue v5 will not be able to read the queue’s header. We have some tests for v5 reading v4 queues, but these are limited and not all scenarios may be supported.

You cannot use Chronicle Queue v5 to write to Chronicle Queue v4 queues.

Changes from Version 3 to Version 4

Chronicle Queue v4 is a complete re-write of Chronicle Queue that solves the following issues that existed in v3.

  • Without self-describing messages, users had to create their own functionality for dumping messages and long term storage of data. With v4 you don’t have to do this, but you can if you wish to.

  • Vanilla Chronicle Queue would create a file per thread. This is fine if the number of threads is controlled, however, many applications have little or no control over how many threads are used and this caused usability problems.

  • The configuration for Indexed and Vanilla Chronicle was entirely in code so the reader had to have the same configuration as the writers and it wasn’t always clear what that was.

  • There was no way for the producer to know how much data had been replicated to a second machine. The only workaround was to replicate data back to the producers.

  • You needed to specify the size of data to reserve before you started to write your message.

  • You needed to do your own locking for the appender when using Indexed Chronicle.

Migrating from Chronicle Queue v2 and v3

In Chronicle Queue v3, everything was in terms of bytes, not wire. There are two ways to use bytes in Chronicle Queue v4: you can use the writeBytes and readBytes methods, or you can get the bytes() from the wire. For example:

Writing and reading bytes using a lambda
appender.writeBytes(b -> b.writeInt(1234).writeDouble(1.111));

// process(int, double) stands for your own handler; present is false if no message was available
boolean present = tailer.readBytes(b -> process(b.readInt(), b.readDouble()));
Writing to and reading from a queue without using a lambda
try (DocumentContext dc = appender.writingDocument()) {
    Bytes<?> bytes = dc.wire().bytes();
    // write to bytes
}

try (DocumentContext dc = tailer.readingDocument()) {
    if (dc.isPresent()) {
        Bytes<?> bytes = dc.wire().bytes();
        // read from bytes
    }
}

Chronicle Queue Enterprise Edition

Chronicle Queue Enterprise Edition is a commercially supported version of our successful open source Chronicle Queue. The open source documentation is extended by the following documents to describe the additional features that are available when you are licenced for Enterprise Edition. These are:

  • Encryption of message queues and messages. For more information see Encryption.

  • TCP/IP (and optionally UDP) Replication between hosts to ensure real-time backup of all your queue data. For more information see Replication; the queue replication protocol is covered in TCP/IP Replication Protocol.

  • Timezone support for daily queue rollover scheduling. For more information see Timezone support.

  • Async mode support to give improved performance at high throughput on slower filesystems. For more information see async mode and also performance.

  • Pretoucher: for improved outliers, see Pretoucher and its configuration.

In addition, you will be fully supported by our technical experts.

For more information on Chronicle Queue Enterprise Edition, please contact Chronicle Software.

User Guide

A Chronicle Queue is defined by SingleChronicleQueue.class that is designed to support:

  • rolling files on a daily, weekly or hourly basis,

  • concurrent writers on the same machine,

  • concurrent readers on the same machine or across multiple machines via TCP replication (With Chronicle Queue Enterprise),

  • concurrent readers and writers between Docker or other containerised workloads,

  • zero copy serialization and deserialization,

  • millions of writes/reads per second on commodity hardware.

Throughput is approximately 5 million messages/second for 96-byte messages on an i7-4790 processor.

A queue directory structure is as follows:

base-directory /
   {cycle-name}.cq4       - The default format is yyyyMMdd for daily rolling.

The format consists of size-prefixed bytes which are formatted using BinaryWire or TextWire. Chronicle Queue is designed to be driven from code. You can easily add an interface which suits your needs.

Note
Because they operate at a fairly low level, Chronicle Queue read/write operations can throw unchecked exceptions. To prevent thread death, it may be practical to catch RuntimeExceptions and log or analyze them as appropriate.
Note
For demonstrations of how Chronicle Queue can be used, see Chronicle Queue Demo, and for Java documentation, see Chronicle Queue JavaDocs.

In the following sections, first we introduce some terminology and a quick reference to use Chronicle Queue. Then, we provide a more detailed guide.

Key Concepts and Terminology

Chronicle Queue is a persisted journal of messages which supports concurrent writers and readers even across multiple JVMs on the same machine. Every reader sees every message, and a reader can join at any time and still see every message.

Note
We deliberately avoid the term consumer and instead use reader as messages are not consumed/destroyed by reading.

Chronicle Queue has the following main concepts:

  • Excerpt

An excerpt is the main data container in a Chronicle Queue. In other words, each Chronicle Queue is composed of excerpts. Writing a message to a Chronicle Queue means starting a new excerpt, writing the message into it, and finishing the excerpt at the end.

  • Appender

An appender is the source of messages; something like an iterator in the Chronicle environment. You add data by appending to the current Chronicle Queue. It can only perform sequential writes, by appending to the end of the queue. There is no way to insert or delete excerpts.

  • Tailer

A tailer is an excerpt reader optimized for sequential reads. It can perform sequential and random reads, both forwards and backwards. Tailers read the next available message each time they are called. The following are guaranteed in Chronicle Queue:

  • for each appender, messages are written in the order the appender wrote them. Messages by different appenders are interleaved,

  • for each tailer, it will see every message for a topic in the same order as every other tailer,

  • when replicated, every replica has a copy of every message.

Chronicle Queue is broker-less. If you need an architecture with a broker, please contact Chronicle Software.

  • File rolling and queue files

Chronicle Queue is designed to roll its files depending on the roll cycle chosen when the queue is created (see RollCycles). In other words, a queue file with the extension cq4 is created for each roll cycle. When the roll cycle reaches the point at which it should roll, the appender atomically writes an EOF mark at the end of the current file to indicate that no other appender should write to this file and no tailer should read further; instead, everyone should use the new file.

If the process was shut down, and restarted later when the roll cycle should be using a new file, an appender will try to locate old files and write an EOF mark in them to help tailers reading them.

  • Topics

Each topic is a directory of queue files. If you have a topic called mytopic, the layout could look like this:

mytopic/
    20160710.cq4
    20160711.cq4
    20160712.cq4
    20160713.cq4

To copy all the data for a single day (or cycle), you can copy the file for that day on to your development machine for replay testing.

  • Restrictions on topics and messages

Topics are limited to being strings which can be used as directory names. Within a topic, you can have sub-topics which can be any data type that can be serialized. Messages can be any serializable data.

Chronicle Queue supports:

  • Serializable objects, though this is to be avoided as it is not efficient

  • Externalizable objects are preferred if you wish to use standard Java APIs.

  • byte[] and String

  • Marshallable; a self-describing message which can be written as YAML, Binary YAML, or JSON.

  • BytesMarshallable which is low-level binary, or text encoding.

Quick start

This section provides a quick reference for using Chronicle Queue, briefly showing how to create a queue and how to write to and read from it.

  • Chronicle Queue construction

Creating an instance of Chronicle Queue is different from just calling a constructor. To create an instance you have to use the ChronicleQueueBuilder.

String basePath = OS.getTarget() + "/getting-started";
ChronicleQueue queue = SingleChronicleQueueBuilder.single(basePath).build();

In this example we have created a single Chronicle Queue, which stores its data in one file per roll cycle, named:

${java.io.tmpdir}/getting-started/{today}.cq4
  • Writing to a queue

// Obtains an ExcerptAppender
ExcerptAppender appender = queue.acquireAppender();

// Writes: {msg: TestMessage}
appender.writeDocument(w -> w.write("msg").text("TestMessage"));

// Writes: TestMessage
appender.writeText("TestMessage");
  • Reading from a queue

// Creates a tailer
ExcerptTailer tailer = queue.createTailer();

tailer.readDocument(w -> System.out.println("msg: " + w.read(()->"msg").text()));

assertEquals("TestMessage", tailer.readText());

Also, the ChronicleQueue.dump() method can be used to dump the raw contents as a string.

queue.dump();
  • Cleanup

Chronicle Queue stores its data off-heap, and it is recommended that you call close() once you have finished working with Chronicle Queue, to free resources.

Note
No data will be lost if you do this. This is only to clean up resources that were used.
queue.close();
  • Putting it all together

try (ChronicleQueue queue = SingleChronicleQueueBuilder.single("queue-dir").build()) {
    // Obtain an ExcerptAppender
    ExcerptAppender appender = queue.acquireAppender();

    // Writes: {msg: TestMessage}
    appender.writeDocument(w -> w.write("msg").text("TestMessage"));

    // Writes: TestMessage
    appender.writeText("TestMessage");

    ExcerptTailer tailer = queue.createTailer();

    tailer.readDocument(w -> System.out.println("msg: " + w.read(()->"msg").text()));

    assertEquals("TestMessage", tailer.readText());
}

Detailed Guide

You can configure a Chronicle Queue using its configuration parameters or system properties. In addition, there are different ways of writing to and reading from a queue, such as using proxies, or MethodReader and MethodWriter.

Queue configuration

Chronicle Queue (CQ) can be configured via a number of methods on the SingleChronicleQueueBuilder class. A few of the parameters most queried by our customers are explained below.

  • RollCycle

The RollCycle parameter configures the rate at which CQ will roll the underlying queue files. For instance, using the following code snippet will result in the queue files being rolled (i.e. a new file created) every hour:

ChronicleQueue.singleBuilder(queuePath).rollCycle(RollCycles.HOURLY).build()

Once a queue’s roll-cycle has been set, it cannot be changed at a later date. Any further instances of SingleChronicleQueue configured to use the same path should be configured to use the same roll-cycle, and if they are not, then the roll-cycle will be updated to match the persisted roll-cycle. In this case, a warning log message will be printed in order to notify the library user of the situation:

// Creates a queue with roll-cycle MINUTELY
try (ChronicleQueue minuteRollCycleQueue = ChronicleQueue.singleBuilder(queueDir).rollCycle(MINUTELY).build()) {

    // Creates a queue with roll-cycle HOURLY
    try (ChronicleQueue hourlyRollCycleQueue = ChronicleQueue.singleBuilder(queueDir).rollCycle(HOURLY).build()) {

        try (DocumentContext documentContext = hourlyRollCycleQueue.acquireAppender().writingDocument()) {
            documentContext.wire().write("somekey").text("somevalue");
        }
    }
    // Now try to append using the queue configured with roll-cycle MINUTELY
    try (DocumentContext documentContext2 = minuteRollCycleQueue.acquireAppender().writingDocument()) {
        documentContext2.wire().write("otherkey").text("othervalue");
    }
}

Console output:

[main] WARN SingleChronicleQueueBuilder - Overriding roll cycle from HOURLY to MINUTELY.

The maximum number of messages that can be stored in a queue file depends on roll cycle. See FAQ for more information on this.

In Chronicle Queue, the rollover time is based on UTC. The Timezone Rollover Enterprise feature extends Chronicle Queue’s ability to specify the time and periodicity of queue rollovers, so that they can be based on a time zone other than UTC. For more information see Timezone Queue Rollover.
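Daily cycle files are named yyyyMMdd.cq4 by default and, in the open source version, roll over based on UTC. A minimal sketch of how an instant maps to a daily file name using java.time (DailyCycleName is a hypothetical helper, not part of Chronicle Queue):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: derives the daily cycle file name for an instant, in UTC.
final class DailyCycleName {
    private static final DateTimeFormatter DAILY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    static String fileNameFor(Instant instant) {
        return DAILY.format(instant) + ".cq4";
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(Instant.parse("2016-07-10T23:30:00Z")));
        System.out.println(fileNameFor(Instant.parse("2016-07-11T00:00:00Z")));
    }
}
```

For example, 08:00 on 11 July 2016 in Tokyo (UTC+9) is 23:00 on 10 July in UTC, so in this model a message written then belongs to 20160710.cq4.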

The Chronicle Queue FileUtil class provides useful methods for managing queue files. See Managing Roll Files Directly.

  • wireType

It’s possible to configure how Chronicle Queue will store the data by explicitly setting the WireType:

// Creates a queue at "queuePath" and sets the WireType
SingleChronicleQueueBuilder.builder(queuePath, wireType)

For example:

// Creates a queue with default WireType: BINARY_LIGHT
ChronicleQueue.singleBuilder(queuePath)

// Creates a queue and sets the WireType as FIELDLESS_BINARY
SingleChronicleQueueBuilder.fieldlessBinary(queuePath)

// Creates a queue and sets the WireType as DEFAULT_ZERO_BINARY
SingleChronicleQueueBuilder.defaultZeroBinary(queuePath)

// Creates a queue and sets the WireType as DELTA_BINARY
SingleChronicleQueueBuilder.deltaBinary(queuePath)

Although it’s possible to explicitly provide WireType when creating a builder, it is discouraged as not all wire types are supported by Chronicle Queue yet. In particular, the following wire types are not supported:

  • TEXT (and essentially all based on text, including JSON and CSV)

  • RAW

  • READ_ANY

  • blockSize

When a queue is read/written, part of the file currently being read/written is mapped to a memory segment. This parameter controls the size of the memory mapping block. You can change this parameter using the method SingleChronicleQueueBuilder.blockSize(long blockSize) if it is necessary.

Note
You should avoid changing blockSize unnecessarily.

If you are sending large messages, you should set a large blockSize; that is, the blockSize should be at least four times the message size.

Warning
If you use a small blockSize for large messages, you receive an IllegalStateException and the write is aborted.

We recommend that you use the same blockSize for each queue instance when replicating queues. The blockSize is not written to the queue’s metadata, so it should ideally be set to the same value when you create each instance of Chronicle Queue (this is recommended, but you can run with a different blockSize if you wish).

Tip
Use the same blockSize for each instance of replicated queues.
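The rule of thumb that blockSize should be at least four times the largest message is easy to check before building a queue. A hypothetical helper (BlockSizeCheck is not part of Chronicle Queue):

```java
// Hypothetical helper encoding the rule of thumb from this guide:
// blockSize should be at least four times the largest message.
final class BlockSizeCheck {
    static long minimumBlockSize(long maxMessageBytes) {
        return Math.multiplyExact(4L, maxMessageBytes); // throws on overflow
    }

    static boolean isLargeEnough(long blockSize, long maxMessageBytes) {
        return blockSize >= minimumBlockSize(maxMessageBytes);
    }

    public static void main(String[] args) {
        long maxMessage = 16L << 20; // e.g. 16 MiB messages
        System.out.println("minimum blockSize = " + minimumBlockSize(maxMessage) + " bytes");
        System.out.println("32 MiB large enough? " + isLargeEnough(32L << 20, maxMessage));
    }
}
```

A value that passes this check would then be given to SingleChronicleQueueBuilder.blockSize(long blockSize).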
  • indexSpacing

This parameter sets the spacing between excerpts that are explicitly indexed. A higher number means higher sequential write performance but slower random access reads. Sequential read performance is not affected by this property. For example, the default index spacing for two of the roll cycles is:

  • 16 (MINUTELY)

  • 64 (DAILY)

You can change this parameter using the method SingleChronicleQueueBuilder.indexSpacing(int indexSpacing).
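A simplified model of why indexSpacing trades write speed for random-access speed, assuming entries within a cycle are numbered by sequence and every indexSpacing-th entry is indexed. The real index is organised as index arrays sized by indexCount; this sketch ignores that, and IndexSpacingModel is a hypothetical name.

```java
// Simplified model (not Chronicle's actual index format): every
// indexSpacing-th entry is indexed; the rest are found by scanning forward.
final class IndexSpacingModel {
    /** The closest indexed entry at or before the target sequence number. */
    static long nearestIndexedEntry(long sequence, int indexSpacing) {
        return sequence - (sequence % indexSpacing);
    }

    /** Entries to scan past after jumping to the nearest indexed entry. */
    static long entriesToScan(long sequence, int indexSpacing) {
        return sequence % indexSpacing;
    }

    /** Index entries written for entries 0..count-1 (one per indexSpacing entries). */
    static long indexEntriesWritten(long count, int indexSpacing) {
        return (count + indexSpacing - 1) / indexSpacing;
    }
}
```

With a spacing of 64, looking up entry 100 starts at entry 64 and scans 36 entries; with 16 it starts at 96 and scans 4, but for 1,000 entries 63 index entries are written instead of 16.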

  • indexCount

The size of each index array, as well as the total number of index arrays per queue file.

Note
indexCount² (indexCount squared) is the maximum number of indexed queue entries.
Note
See Section Excerpt indexing in Chronicle Queue of this User Guide for more information and examples of using indexes.
  • readBufferMode, writeBufferMode

These parameters define the BufferMode for reads or writes, which has the following options:

  • None - The default (and the only one available for open source users), no buffering;

  • Copy - used in conjunction with encryption;

  • Asynchronous - use an asynchronous buffer when reading and/or writing, provided by Chronicle Async Mode.

  • bufferCapacity

The RingBuffer capacity in bytes when using bufferMode Asynchronous.


Writing to a queue using an appender

In Chronicle Queue, we refer to the act of writing your data to the queue as storing an excerpt. This data could be made up of any data type, including text, numbers, or serialised blobs. Ultimately, all your data, regardless of what it is, is stored as a series of bytes.

Just before storing your excerpt, Chronicle Queue reserves a 4-byte header. Chronicle Queue writes the length of your data into this header. This way, when Chronicle Queue comes to read your excerpt, it knows how long each blob of data is. We refer to this 4-byte header, along with your excerpt, as a document. Strictly speaking, Chronicle Queue reads and writes documents.

Note
Within this 4-byte header we also reserve a few bits for a number of internal operations, such as locking, to make Chronicle Queue thread-safe across both processes and threads. The important thing to note is that, because of this, you can’t simply convert the 4 bytes to an integer to find the length of your data blob.
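To see why the raw header is not simply the length, here is a sketch with an assumed layout in which the top two bits are flags and the lower 30 bits hold the length. The constant names and bit positions are illustrative assumptions for this example, not a description of Chronicle Queue's on-disk format.

```java
// Illustrative 4-byte header with an ASSUMED layout: two flag bits on top,
// a 30-bit length below. Not Chronicle Queue's documented format.
final class HeaderBits {
    static final int NOT_COMPLETE = 0x8000_0000; // assumed: set while the entry is being written
    static final int META_DATA    = 0x4000_0000; // assumed: marks internal metadata entries
    static final int LENGTH_MASK  = 0x3FFF_FFFF; // low 30 bits: payload length

    static int header(int length, boolean notComplete, boolean metaData) {
        if ((length & ~LENGTH_MASK) != 0) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        int h = length;
        if (notComplete) h |= NOT_COMPLETE;
        if (metaData) h |= META_DATA;
        return h;
    }

    static int length(int header) {
        return header & LENGTH_MASK; // strip the flag bits
    }

    static boolean isNotComplete(int header) {
        return (header & NOT_COMPLETE) != 0;
    }
}
```

Reading such a header as a plain int while the flag bit is set would give a large negative number rather than the length, which is why the mask is needed.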

As stated before, Chronicle Queue uses an appender to write to the queue and a tailer to read from the queue. Unlike other Java queuing solutions, messages are not lost when they are read with a tailer. This is covered in more detail in the section below on "Reading from a queue using a tailer". To write data to a Chronicle Queue, you must first create an appender:

try (ChronicleQueue queue = ChronicleQueue.singleBuilder(path + "/trades").build()) {
   final ExcerptAppender appender = queue.acquireAppender();
}

Chronicle Queue uses the following low-level interface to write the data:

try (final DocumentContext dc = appender.writingDocument()) {
    dc.wire().write().text("your text data");
}

The close at the end of the try-with-resources block is the point at which the length of the data is written to the header. You can also use the DocumentContext to find out the index that your data has just been assigned (see below). You can later use this index to move to, or look up, this excerpt. Each Chronicle Queue excerpt has a unique index.

try (final DocumentContext dc = appender.writingDocument()) {
    dc.wire().write().text("your text data");
    System.out.println("your data was stored to index=" + dc.index());
}

The high-level methods below, such as writeText(), are convenience methods that call appender.writingDocument(), so both approaches essentially do the same thing. The actual code of writeText(CharSequence text) looks like this:

/**
 * @param text the message to write
 */
void writeText(CharSequence text) {
    try (DocumentContext dc = writingDocument()) {
        dc.wire().bytes().append8bit(text);
    }
}

So you have a choice of a number of high-level interfaces, down to a low-level API, and even raw memory.

The highest-level API, the method writer, hides the fact that you are writing to a messaging system at all. The benefit is that you can swap calls to the interface for a real component, or for an interface to a different protocol.

// using the method writer interface.
RiskMonitor riskMonitor = appender.methodWriter(RiskMonitor.class);
final LocalDateTime now = LocalDateTime.now(Clock.systemUTC());
riskMonitor.trade(new TradeDetails(now, "GBPUSD", 1.3095, 10e6, Side.Buy, "peter"));

You can write a "self-describing message". Such messages can support schema changes. They are also easier to understand when debugging or diagnosing problems.

// writing a self describing message
appender.writeDocument(w -> w.write("trade").marshallable(
        m -> m.write("timestamp").dateTime(now)
                .write("symbol").text("EURUSD")
                .write("price").float64(1.1101)
                .write("quantity").float64(15e6)
                .write("side").object(Side.class, Side.Sell)
                .write("trader").text("peter")));

You can write "raw data" which is self-describing. The types will always be correct; position is the only indication as to the meaning of those values.

// writing just data
appender.writeDocument(w -> w
        .getValueOut().int32(0x123456)
        .getValueOut().int64(0x999000999000L)
        .getValueOut().text("Hello World"));

You can write "raw data" which is not self-describing. Your reader must know what this data means, and the types that were used.

// writing raw data
appender.writeBytes(b -> b
        .writeByte((byte) 0x12)
        .writeInt(0x345678)
        .writeLong(0x999000999000L)
        .writeUtf8("Hello World"));

The lowest-level way to write data is illustrated below: you get an address to raw memory and can write whatever you want.

// Unsafe low level
appender.writeBytes(b -> {
    long address = b.address(b.writePosition());
    Unsafe unsafe = UnsafeMemory.UNSAFE;
    unsafe.putByte(address, (byte) 0x12);
    address += 1;
    unsafe.putInt(address, 0x345678);
    address += 4;
    unsafe.putLong(address, 0x999000999000L);
    address += 8;
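    byte[] bytes = "Hello World".getBytes(StandardCharsets.ISO_8859_1);
    // one-byte length prefix (only valid for strings shorter than 128 bytes), matching the 0B in the dump below
    unsafe.putByte(address, (byte) bytes.length);
    address += 1;
    unsafe.copyMemory(bytes, Jvm.arrayByteBaseOffset(), null, address, bytes.length);
    b.writeSkip(1 + 4 + 8 + 1 + bytes.length);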
});

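You can print the contents of the queue. In the output below, the first two messages are trade messages with the same structure (one written via the method writer, one as a self-describing message), and the last two messages store identical bytes (one written via writeBytes(), one via Unsafe).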

// dump the content of the queue
System.out.println(queue.dump());

prints:

# position: 262568, header: 0
--- !!data #binary
trade: {
  timestamp: 2016-07-17T15:18:41.141,
  symbol: GBPUSD,
  price: 1.3095,
  quantity: 10000000.0,
  side: Buy,
  trader: peter
}
# position: 262684, header: 1
--- !!data #binary
trade: {
  timestamp: 2016-07-17T15:18:41.141,
  symbol: EURUSD,
  price: 1.1101,
  quantity: 15000000.0,
  side: Sell,
  trader: peter
}
# position: 262800, header: 2
--- !!data #binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data #binary
000402b0       12 78 56 34 00 00  90 99 00 90 99 00 00 0B   ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F  72 6C 64                Hello Wo rld
# position: 262859, header: 4
--- !!data #binary
000402c0                                               12                 ·
000402d0 78 56 34 00 00 90 99 00  90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64                          lo World
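The last two entries can be reproduced with the plain JDK, which makes the byte layout easier to follow: the byte 0x12, then the int and the long in little-endian order, then a one-byte length (0B = 11) and the text. RawMessageLayout is a hypothetical helper, not a Chronicle API, and its single length byte assumes the text is shorter than 128 bytes, as in this example.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper reproducing the raw layout shown in the dump: NOT a Chronicle API.
class RawMessageLayout {
    static byte[] encode(byte code, int num, long num2, String text) {
        byte[] textBytes = text.getBytes(StandardCharsets.ISO_8859_1);
        if (textBytes.length > 127)
            throw new IllegalArgumentException("this sketch only writes a one-byte length");
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 8 + 1 + textBytes.length)
                .order(ByteOrder.LITTLE_ENDIAN);   // the dump shows little-endian int and long
        buf.put(code);                             // e.g. 12
        buf.putInt(num);                           // e.g. 78 56 34 00
        buf.putLong(num2);                         // e.g. 00 90 99 00 90 99 00 00
        buf.put((byte) textBytes.length);          // e.g. 0B
        buf.put(textBytes);                        // e.g. 48 65 6C 6C 6F ... ("Hello World")
        return buf.array();
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) sb.append(String.format("%02x", b & 0xFF));
        return sb.toString();
    }
}
```

Calling encode((byte) 0x12, 0x345678, 0x999000999000L, "Hello World") yields the same 25 bytes as the two raw entries in the dump.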

Reading from a queue using a tailer

Reading the queue follows the same pattern as writing, except that there may not be a message when you attempt to read one.

Start Reading
try (ChronicleQueue queue = ChronicleQueue.singleBuilder(path + "/trades").build()) {
   final ExcerptTailer tailer = queue.createTailer();
}

You can turn each message into a method call based on the content of the message, and have Chronicle Queue automatically deserialize the method arguments. Calling reader.readOne() will automatically skip over (filter out) any messages that do not match your method reader.

// reading using method calls
RiskMonitor monitor = System.out::println;
MethodReader reader = tailer.methodReader(monitor);
// read one message
assertTrue(reader.readOne());

You can decode the message yourself.

Note
The names, types, and order of the fields do not have to match.
assertTrue(tailer.readDocument(w -> w.read("trade").marshallable(
        m -> {
            LocalDateTime timestamp = m.read("timestamp").dateTime();
            String symbol = m.read("symbol").text();
            double price = m.read("price").float64();
            double quantity = m.read("quantity").float64();
            Side side = m.read("side").object(Side.class);
            String trader = m.read("trader").text();
            // do something with values.
        })));

You can read self-describing data values. This will check the types are correct, and convert as required.

assertTrue(tailer.readDocument(w -> {
    ValueIn in = w.getValueIn();
    int num = in.int32();
    long num2 = in.int64();
    String text = in.text();
    // do something with values
}));

You can read raw data as primitives and strings.

assertTrue(tailer.readBytes(in -> {
    int code = in.readByte();
    int num = in.readInt();
    long num2 = in.readLong();
    String text = in.readUtf8();
    assertEquals("Hello World", text);
    // do something with values
}));

Or, you can get the underlying memory address and access the native memory directly.

assertTrue(tailer.readBytes(b -> {
    long address = b.address(b.readPosition());
    Unsafe unsafe = UnsafeMemory.UNSAFE;
    int code = unsafe.getByte(address);
    address++;
    int num = unsafe.getInt(address);
    address += 4;
    long num2 = unsafe.getLong(address);
    address += 8;
    int length = unsafe.getByte(address);
    address++;
    byte[] bytes = new byte[length];
    unsafe.copyMemory(null, address, bytes, Jvm.arrayByteBaseOffset(), bytes.length);
    String text = new String(bytes, StandardCharsets.UTF_8);
    assertEquals("Hello World", text);
    // do something with values
}));
Note
Every tailer sees every message.

An abstraction can be added to filter messages, or to assign messages to just one message processor. However, in general you only need one main tailer for a topic, possibly with some supporting tailers for monitoring, etc.

As Chronicle Queue doesn’t partition its topics, you get total ordering of all messages within that topic. Across topics, there is no guarantee of ordering; if you want to replay deterministically from a system which consumes from multiple topics, we suggest replaying from that system’s output.


Tailers and file handle cleanup

Chronicle Queue tailers may create file handles. These are cleaned up whenever the associated Chronicle Queue's close() method is invoked, or whenever the JVM runs a garbage collection. If you are writing your code to avoid GC pauses and you explicitly want to clean up the file handles, you can call the following:

((StoreTailer) tailer).releaseResources();

Using ExcerptTailer.toEnd()

In some applications, it may be necessary to start reading from the end of the queue (e.g. in a restart scenario). For this use-case, ExcerptTailer provides the toEnd() method. When the tailer direction is FORWARD (by default, or as set by the ExcerptTailer.direction method), then calling toEnd() will place the tailer just after the last existing record in the queue. In this case, the tailer is now ready for reading any new records appended to the queue. Until any new messages are appended to the queue, there will be no new DocumentContext available for reading:

// this will be false until new messages are appended to the queue
boolean messageAvailable = tailer.toEnd().readingDocument().isPresent();

If it is necessary to read backwards through the queue from the end, then the tailer can be set to read backwards:

ExcerptTailer tailer = queue.createTailer();
tailer.direction(TailerDirection.BACKWARD).toEnd();

When reading backwards, the toEnd() method moves the tailer to the last record in the queue. If the queue is not empty, there will be a DocumentContext available for reading:

// this will be true if there is at least one message in the queue
boolean messageAvailable = tailer.direction(TailerDirection.BACKWARD).toEnd()
        .readingDocument().isPresent();

Restartable tailers

AKA named tailers.

It can be useful to have a tailer which continues from where it was up to on restart of the application.

try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(tmp).build()) {
    ExcerptTailer atailer = cq.createTailer("a");
    assertEquals("test 0", atailer.readText());
    assertEquals("test 1", atailer.readText());
    assertEquals("test 2", atailer.readText()); // (1)

    ExcerptTailer btailer = cq.createTailer("b");
    assertEquals("test 0", btailer.readText()); // (3)
}

try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(tmp).build()) {
    ExcerptTailer atailer = cq.createTailer("a");
    assertEquals("test 3", atailer.readText()); // (2)
    assertEquals("test 4", atailer.readText());
    assertEquals("test 5", atailer.readText());

    ExcerptTailer btailer = cq.createTailer("b");
    assertEquals("test 1", btailer.readText()); // (4)
}
  1. Tailer "a" last reads message 2

  2. Tailer "a" next reads message 3

  3. Tailer "b" last reads message 0

  4. Tailer "b" next reads message 1

This is from the RestartableTailerTest where there are two tailers, each with a unique name. These tailers store their index within the Queue itself and this index is maintained as the tailer uses toStart(), toEnd(), moveToIndex() or reads a message.

Note
The direction() is not preserved across restarts, only the next index to be read.
Note
The index of a tailer is only advanced when DocumentContext.close() is called. If this is prevented by an error, the same message will be read again on each restart.
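The restart behaviour and the two notes above can be modelled in a few lines of plain Java. In this hypothetical sketch, each named tailer's next index lives in a map owned by the "queue" (standing in for the index Chronicle stores in the queue itself), so it outlives the tailer objects, and it only advances when a read is committed, mirroring DocumentContext.close(). It is not how Chronicle Queue persists tailer state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of named tailers: NOT Chronicle Queue's implementation.
class NamedTailerModel {
    private final List<String> messages = new ArrayList<>();
    // Next index to read, per tailer name; kept with the "queue", so it survives "restarts".
    private final Map<String, Integer> nextIndexByName = new HashMap<>();

    void append(String text) {
        messages.add(text);
    }

    Tailer createTailer(String name) {
        return new Tailer(name);
    }

    class Tailer {
        private final String name;

        Tailer(String name) {
            this.name = name;
        }

        // Reads the next message, or returns null if there is none. The stored index only
        // advances when commit is true, mirroring a DocumentContext that closes normally.
        String read(boolean commit) {
            int next = nextIndexByName.getOrDefault(name, 0);
            if (next >= messages.size()) return null;
            if (commit) nextIndexByName.put(name, next + 1);
            return messages.get(next);
        }

        String readText() {
            return read(true);
        }
    }
}
```

Because every tailer keeps its own index, tailers "a" and "b" each see every message independently, and a fresh tailer object created with an existing name picks up where the previous one left off.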

Command line tools - reading and writing a Chronicle Queue

Chronicle Queue stores its data in binary format, with a file extension of cq4:

\��@π�header∂�SCQStoreÇE���»wireType∂�WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂�SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•��ÃindexSpacing�Àindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé������ߡˇˇˇˇˇˇˇ»recovery∂�TimedStoreRecoveryÇ����…timeStampèèèß ...

As this is difficult to read, it is better to dump the cq4 files as text. This can also help you fix production issues, as it shows you what has been stored in the queue, and in what order.

You can dump the queue to the terminal using net.openhft.chronicle.queue.main.DumpMain or net.openhft.chronicle.queue.ChronicleReaderMain. DumpMain performs a simple dump to the terminal while ChronicleReaderMain handles more complex operations, e.g. tailing a queue. They can both be run from the command line in a number of ways described below.


DumpMain

If you have a project pom file that includes the Chronicle-Queue artifact, you can read a cq4 file with the following command:

$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"

In the above command, myqueue is the directory containing your .cq4 files.

You can also set up the dependencies manually. This requires chronicle-queue.jar, from any version 4.5.3 or later, with all of its dependencies present on the class path. The dependent jars are listed below:

$ ls -ltr
total 9920
-rw-r--r--  1 robaustin  staff   112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r--  1 robaustin  staff   209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r--  1 robaustin  staff   136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r--  1 robaustin  staff    33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r--  1 robaustin  staff    33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r--  1 robaustin  staff   324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r--  1 robaustin  staff    35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r--  1 robaustin  staff   344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r--  1 robaustin  staff   124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r--  1 robaustin  staff  4198400 28 Jul 15:06 19700101-02.cq4
Tip
To find out which versions of the jars to include, please refer to the chronicle-bom.

Once the dependencies are present on the class path, you can run:

$ java -cp "*" net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4

This will dump the 19700101-02.cq4 file out as text, as shown below:

!!meta-data #binary
header: !SCQStore {
  wireType: !WireType BINARY,
  writePosition: 0,
  roll: !SCQSRoll {
    length: !int 3600000,
    format: yyyyMMdd-HH,
    epoch: !int 3600000
  },
  indexing: !SCQSIndexing {
    indexCount: !short 4096,
    indexSpacing: 4,
    index2Index: 0,
    lastIndex: 0
  },
  lastAcknowledgedIndexReplicated: -1,
  recovery: !TimedStoreRecovery {
    timeStamp: 0
  }
}

...
# 4198044 bytes remaining
Note
The example above does not show any user data, because no user data was written to this example file.

There is also a script named dump_queue.sh, located in the Chronicle-Queue/bin folder, that gathers the needed dependencies in a shaded jar and uses it to dump the queue with DumpMain. The script can be run from the Chronicle-Queue root folder like this:

$ ./bin/dump_queue.sh <file path>

Reading a queue using ChronicleReaderMain

The second tool for logging the contents of a Chronicle Queue is ChronicleReaderMain (in the Chronicle Queue project). As mentioned above, it can perform several operations beyond printing the file content to the console. For example, it can be used to tail a queue to detect whenever new messages are added (much like tail -f).

Below is the command line interface used to configure ChronicleReaderMain:

usage: ChronicleReaderMain
 -d <directory>       Directory containing chronicle queue files
 -e <exclude-regex>   Do not display records containing this regular
                      expression
 -f                   Tail behaviour - wait for new records to arrive
 -h                   Print this help and exit
 -i <include-regex>   Display records containing this regular expression
 -l                   Squash each output message into a single line
 -m <max-history>     Show this many records from the end of the data set
 -n <from-index>      Start reading from this index (e.g. 0x123ABE)
 -r <interface>       Use when reading from a queue generated using a MethodWriter
 -s                   Display index
 -w <wire-type>       Control output i.e. JSON

Just as with DumpMain, you need the classes listed in the example above present on the class path. This can again be achieved by adding them manually and then running:

$ java -cp "*" net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>

Another option is to create an Uber Jar using the Maven shade plugin. It is configured as follows:

 <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <includes>
                                    <include>net/openhft/**</include>
                                    <include>software/chronicle/**</include>
                                </includes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Once the Uber jar is present, you can run ChronicleReaderMain from the command line via:

java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"

Lastly, there is a script for running the reader named queue_reader.sh, which is also located in the Chronicle-Queue/bin folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain. The script can be run from the Chronicle-Queue root folder like this:

$ ./bin/queue_reader.sh <options>

Writing into a queue using ChronicleWriter

If you are using MethodReader and MethodWriter, you can write single-argument method calls to a queue using net.openhft.chronicle.queue.ChronicleWriterMain or the shell script queue_writer.sh, for example:

usage: ChronicleWriterMain files.. -d <directory> [-i <interface>] -m <method>
Missing required options: m, d
 -d <directory>   Directory containing chronicle queue to write to
 -i <interface>   Interface to write via
 -m <method>      Method name

For example, to write to the "doit" method below:

public interface MyInterface {
    void doit(DTO dto);
}
public class DTO extends SelfDescribingMarshallable {
    private int age;
    private String name;
}

you can call ChronicleWriterMain -d queue -m doit x.yaml with either (or both) of the YAML documents below:

{
  age: 19,
  name: Henry
}

or

!x.y.z.DTO {
  age: 42,
  name: Percy
}

If DTO makes use of custom serialisation, then you should specify the interface to write to with -i.


High level interface for reading/writing

Chronicle v4.4+ supports the use of proxies to write and read messages. You start by defining an asynchronous interface, where all methods have:

  • arguments which are only inputs

  • no return value or exceptions expected.

A simple asynchronous interface
import net.openhft.chronicle.wire.SelfDescribingMarshallable;
interface MessageListener {
    void method1(Message1 message);

    void method2(Message2 message);
}

static class Message1 extends SelfDescribingMarshallable {
    String text;

    public Message1(String text) {
        this.text = text;
    }
}

static class Message2 extends SelfDescribingMarshallable {
    long number;

    public Message2(long number) {
        this.number = number;
    }
}

To write to the queue you can call a proxy which implements this interface.

SingleChronicleQueue queue1 = ChronicleQueue.singleBuilder(path).build();

MessageListener writer1 = queue1.acquireAppender().methodWriter(MessageListener.class);

// call method on the interface to send messages
writer1.method1(new Message1("hello"));
writer1.method2(new Message2(234));

These calls produce messages which can be dumped as follows.

# position: 262568, header: 0
--- !!data #binary
method1: {
  text: hello
}
# position: 262597, header: 1
--- !!data #binary
method2: {
  number: !int 234
}

To read the messages, you can provide a reader which calls your implementation with the same calls that you made.

// a proxy which prints each method called on it
MessageListener processor = ObjectUtils.printAll(MessageListener.class);
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1.createTailer().methodReader(processor);

assertTrue(reader1.readOne());
assertTrue(reader1.readOne());
assertFalse(reader1.readOne());

Running this example prints:

method1 [!Message1 {
  text: hello
}
]
method2 [!Message2 {
  number: 234
}
]
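The writer/reader pair above behaves like a recorder and a replayer of method calls. The following plain-JDK sketch shows that idea with java.lang.reflect.Proxy. Listener, MethodProxySketch and Call are hypothetical names, and the "queue" is just an in-memory List, so nothing is serialized. It illustrates the pattern only; it is not how Chronicle Queue implements methodWriter or methodReader.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;

// Hypothetical asynchronous interface: single input arguments, no return values.
interface Listener {
    void method1(String text);

    void method2(long number);
}

class MethodProxySketch {
    // One recorded "message": the method name plus its single argument.
    static final class Call {
        final String method;
        final Object arg;

        Call(String method, Object arg) {
            this.method = method;
            this.arg = arg;
        }
    }

    // Writer side: every call on the returned proxy is appended to the log as a message.
    static <T> T methodWriter(Class<T> iface, List<Call> log) {
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface},
                (p, method, args) -> {
                    if (method.getDeclaringClass() == Object.class)
                        throw new UnsupportedOperationException(method.getName());
                    log.add(new Call(method.getName(), args[0]));
                    return null;
                });
        return iface.cast(proxy);
    }

    // Reader side: turns the next message into a method call on impl; false if none is left.
    static <T> boolean readOne(List<Call> log, int[] cursor, Class<T> iface, T impl) {
        if (cursor[0] >= log.size()) return false;
        Call call = log.get(cursor[0]++);
        for (Method m : iface.getMethods()) {
            if (m.getName().equals(call.method) && m.getParameterCount() == 1) {
                try {
                    m.invoke(impl, call.arg);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException(e);
                }
                break;
            }
        }
        return true; // a message with no matching method is consumed and skipped
    }
}
```

As with reader1 above, calling readOne twice replays method1 and method2 in the order they were written, and a third call returns false.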

Detailed tracing of timings

Chronicle Queue supports explicit, or implicit, nanosecond-resolution timing for messages as they pass end-to-end across your system. We support using nano-time across machines, without the need for specialist hardware. To enable this, set the sourceId of the queue.

Enabling high resolution timings
ChronicleQueue out = ChronicleQueue.singleBuilder(queuePath)
        ...
        .sourceId(1)
        .build();

SidedMarketDataListener combiner = out.acquireAppender()
        .methodWriterBuilder(SidedMarketDataListener.class)
        .get();

combiner.onSidedPrice(new SidedPrice("EURUSD1", 123456789000L, Side.Sell, 1.1172, 2e6));

A timestamp is added for each read and write as it passes from service to service.

Downstream message triggered by the event above
--- !!data #binary
history: {
  sources: [
    1,
    0x426700000000 # (4)
  ]
  timings: [
    1394278797664704, # (1)
    1394278822632044, # (2)
    1394278824073475  # (3)
  ]
}
onTopOfBookPrice: {
  symbol: EURUSD1,
  timestamp: 123456789000,
  buyPrice: NaN,
  buyQuantity: 0,
  sellPrice: 1.1172,
  sellQuantity: 2000000.0
}
  1. First write

  2. First read

  3. Write of the result of the read.

  4. What triggered this event.
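Each entry in timings is recorded as the message passes a point in the pipeline, so subtracting consecutive entries gives the time spent between those points. A minimal sketch, assuming all the values are nanosecond readings from comparable clocks (HistoryTimings is a hypothetical helper, not part of Chronicle):

```java
// Hypothetical helper: NOT part of Chronicle Queue.
class HistoryTimings {
    // Elapsed time between consecutive entries of a "timings" array.
    static long[] hops(long... timings) {
        long[] deltas = new long[timings.length - 1];
        for (int i = 1; i < timings.length; i++) {
            deltas[i - 1] = timings[i] - timings[i - 1];
        }
        return deltas;
    }
}
```

For the history shown above, hops(1394278797664704L, 1394278822632044L, 1394278824073475L) gives 24,967,340 ns (about 25 ms) between the first write and the first read, and 1,441,431 ns (about 1.4 ms) between that read and the write of its result.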


Excerpt indexing in Chronicle Queue

In the following section you will find how to work with the excerpt index.

  • Finding the index at the end of a Chronicle Queue

Chronicle Queue appenders are thread-local. In fact when you ask for:

final ExcerptAppender appender = queue.acquireAppender();

the acquireAppender() uses a thread-local pool to give you an appender which will be reused to reduce object creation. As such, the method call to:

long index = appender.lastIndexAppended();

will only give you the last index appended by this appender; not the last index appended by any appender. If you wish to find the index of the last record written to the queue, then you have to call:

queue.lastIndex()

This returns the index of the last excerpt present in the queue (or -1 for an empty queue). Note that if the queue is being written to concurrently, the value may be an underestimate, as subsequent entries may have been written even before it was returned.

  • The number of messages between two indexes

To count the number of messages between two indexes you can use:

((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
Note
You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system.

For more information, see:

net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts
  • Move to a specific message and read it

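The following example shows how to write 10 messages, then move to the excerpt with sequence number 5 in the current cycle and read it. Sequence numbers start at 0, so this prints "world 5", the sixth message written.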

@Test
public void read5thMessageTest() {
    try (final ChronicleQueue queue = singleBuilder(getTmpDir()).build()) {

        final ExcerptAppender appender = queue.acquireAppender();

        int i = 0;
        for (int j = 0; j < 10; j++) {

            try (DocumentContext dc = appender.writingDocument()) {
                dc.wire().write("hello").text("world " + (i++));
                long indexWritten = dc.index();
            }
        }

        // Get the current cycle
        int cycle;
        final ExcerptTailer tailer = queue.createTailer();
        try (DocumentContext documentContext = tailer.readingDocument()) {
            long index = documentContext.index();
            cycle = queue.rollCycle().toCycle(index);
        }

        long index = queue.rollCycle().toIndex(cycle, 5);
        tailer.moveToIndex(index);
        try (DocumentContext dc = tailer.readingDocument()) {
            System.out.println(dc.wire().read("hello").text());
        }
    }
}

File retention

You can add a StoreFileListener to notify you when a file is added, or no longer used. This can be used to delete files after a period of time. However, by default, files are retained forever. Our largest users have over 100 TB of data stored in queues.

Appenders and tailers are cheap as they don’t even require a TCP connection; they are just a few Java objects. The only thing each tailer retains is an index which is composed from:

  • a cycle number. For example, days since epoch, and

  • a sequence number within that cycle.

In the case of a DAILY cycle, the sequence number is 32 bits, and the index = ((long) cycle << 32) | sequenceNumber, providing up to 4 billion entries per day. If more messages per day are anticipated, the XLARGE_DAILY cycle, for example, provides up to 4 trillion entries per day using a 48-bit sequence number. Printing the index in hexadecimal is common in our libraries, as it makes these two components easier to see.
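The DAILY index arithmetic above can be sketched in a few lines. DailyIndex is a hypothetical helper that assumes the 32-bit sequence layout described here; in Chronicle Queue itself this is the job of the RollCycle, used via queue.rollCycle().toIndex(...) and toCycle(...) in the earlier test.

```java
// Hypothetical sketch of the DAILY index layout: cycle in the high bits, sequence in the low 32 bits.
class DailyIndex {
    static final int SEQUENCE_BITS = 32;
    static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;

    static long toIndex(int cycle, long sequenceNumber) {
        if (sequenceNumber < 0 || sequenceNumber > SEQUENCE_MASK)
            throw new IllegalArgumentException("sequence number out of range: " + sequenceNumber);
        return ((long) cycle << SEQUENCE_BITS) | sequenceNumber;
    }

    static int toCycle(long index) {
        return (int) (index >>> SEQUENCE_BITS);
    }

    static long toSequenceNumber(long index) {
        return index & SEQUENCE_MASK;
    }
}
```

Printed in hex, the two components are easy to spot: toIndex(16999, 5) is 0x426700000005. If the 0x426700000000 source entry in the timing history above is a DAILY index, it decodes to cycle 16999 (2016-07-17, counting days since 1970-01-01) and sequence number 0.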

Rather than partition the queue files across servers, we support each server storing as much data as it has disk space for. This is much more scalable than being limited to the amount of memory that you have: you can buy a redundant pair of 6TB enterprise disks far more cheaply than 6TB of memory.


Monitoring that you have sufficient disk space

Chronicle Queue runs a background thread to watch for low disk space (see the net.openhft.chronicle.threads.DiskSpaceMonitor class), as the JVM can crash when allocating a new memory-mapped file if disk space becomes low enough. For each FileStore on which you are using Chronicle Queues, the disk space monitor checks whether there is less than 200MB free. If so, you will see:

Jvm.warn().on(getClass(), "your disk " + fileStore + " is almost full, " +
        "warning: chronicle-queue may crash if it runs out of space.");

Otherwise, it checks disk usage against the threshold percentage and, when that is reached, logs this message:

Jvm.warn().on(getClass(), "your disk " + fileStore
        + " is " + diskSpaceFull + "% full, " +
        "warning: chronicle-queue may crash if it runs out of space.");

The threshold percentage is controlled by the chronicle.disk.monitor.threshold.percent system property. The default value is 0.
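The two checks described above can be sketched with java.nio.file.FileStore. DiskSpaceCheck is hypothetical, not Chronicle's DiskSpaceMonitor. In particular, reading the property as a whole-number percentage and treating 0 (the default) as "disabled" are assumptions, since the text above only gives the default value.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the checks described above: NOT Chronicle's DiskSpaceMonitor.
class DiskSpaceCheck {
    static final long MIN_FREE_BYTES = 200L * 1024 * 1024; // "less than 200MB free"

    // Assumption: a whole-number percentage, where 0 (the default) disables the check.
    static int thresholdPercent() {
        return Integer.getInteger("chronicle.disk.monitor.threshold.percent", 0);
    }

    // Returns the warning that would be logged, or null if there is nothing to report.
    static String check(String fileStore, long usableBytes, long totalBytes, int thresholdPercent) {
        if (usableBytes < MIN_FREE_BYTES)
            return "your disk " + fileStore + " is almost full, "
                    + "warning: chronicle-queue may crash if it runs out of space.";
        int diskSpaceFull = (int) ((totalBytes - usableBytes) * 100 / totalBytes);
        if (thresholdPercent > 0 && diskSpaceFull >= thresholdPercent)
            return "your disk " + fileStore + " is " + diskSpaceFull + "% full, "
                    + "warning: chronicle-queue may crash if it runs out of space.";
        return null;
    }

    // Checks the FileStore that holds the given queue directory.
    static String check(Path queueDir) throws IOException {
        FileStore store = Files.getFileStore(queueDir);
        return check(store.toString(), store.getUsableSpace(), store.getTotalSpace(), thresholdPercent());
    }
}
```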


File handles and flushing data to the disk

As mentioned previously, Chronicle Queue stores its data off-heap in a ‘.cq4’ file. Whenever you wish to append data to this file or read data from it, Chronicle Queue will create a file handle. Typically, Chronicle Queue creates a new ‘.cq4’ file every day. However, this can be changed so that a new file is created every hour, every minute, or even every second.

If we create a queue file every second, we refer to this as SECONDLY rolling. Creating a new file every second is a little extreme, but it is a good way to illustrate the following point. With secondly rolling, if you had written 10 seconds' worth of data and then wanted to read it back, Chronicle would have to scan across 10 files. To reduce the creation of file handles, Chronicle Queue caches them lazily. When writing data to the queue files, careful consideration must be given to when the files are closed, because on most operating systems closing a file forces any data that has been appended to it to be flushed to disk, and if we are not careful this could stall your application.
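To put numbers on the SECONDLY example, the sketch below computes which cycle a timestamp falls into and how many cycle files a time range touches. RollingCycles is a hypothetical helper, and it assumes cycles are counted from the Unix epoch in UTC with no offset; the queue header dumped earlier shows that Chronicle also records its own roll epoch, which this sketch ignores.

```java
// Hypothetical sketch: which cycle (and so which .cq4 file) a timestamp falls into.
// Assumes cycles are counted from the Unix epoch in UTC with no offset.
class RollingCycles {
    static final long SECONDLY = 1_000L;
    static final long HOURLY = 60 * 60 * 1_000L;
    static final long DAILY = 24 * HOURLY;

    static long cycleOf(long epochMillis, long rollLengthMillis) {
        return Math.floorDiv(epochMillis, rollLengthMillis);
    }

    // Number of distinct cycle files touched by data written in [fromMillis, toMillis).
    static long filesSpanned(long fromMillis, long toMillis, long rollLengthMillis) {
        return cycleOf(toMillis - 1, rollLengthMillis) - cycleOf(fromMillis, rollLengthMillis) + 1;
    }
}
```

Under these assumptions, 10 seconds of data starting on a second boundary spans 10 SECONDLY files (11 if it starts mid-second), but only one DAILY file.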


Pretoucher and its configuration

Pretoucher is a class designed to be called from a long-lived thread. The purpose of the Pretoucher is to accelerate writing in a queue. Upon invocation of the execute() method, this object will pre-touch pages in the queue’s underlying store file, so that they are resident in the page-cache (i.e. loaded from storage) before they are required by appenders to the queue. Resources held by this object will be released when the underlying queue is closed. Alternatively, the shutdown() method can be called to close the supplied queue and release any other resources. Invocation of the execute() method after shutdown() has been called will cause an IllegalStateException to be thrown.

The Pretoucher’s configuration parameters (set via the system properties) are as follows:

  • SingleChronicleQueueExcerpts.earlyAcquireNextCycle (defaults to false): Causes the Pretoucher to create the next cycle file while the queue is still writing to the current one in order to mitigate the impact of stalls in the OS when creating new files.

Warning
earlyAcquireNextCycle is off by default. If you are going to turn it on, stress test very carefully both before and after doing so, as its effect depends on your system.
  • SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs (defaults to 2,000 milliseconds): the Pretoucher will create new cycle files this amount of time in advance of them being written to. This effectively moves the Pretoucher’s notion of which cycle is "current" into the future by pretoucherPrerollTimeMs.

  • SingleChronicleQueueExcerpts.dontWrite (defaults to false): tells the Pretoucher never to create cycle files that do not already exist. By default, if the Pretoucher runs inside a cycle in which no excerpts have been written, it creates the "current" cycle file. Enabling dontWrite therefore prevents earlyAcquireNextCycle from working.

    • Pretoucher usage example

The configuration parameters of Pretoucher that were described above should be set via system properties. For example, in the following excerpt earlyAcquireNextCycle is set to true and pretoucherPrerollTimeMs to 100ms.

System.setProperty("SingleChronicleQueueExcerpts.earlyAcquireNextCycle", "true");
System.setProperty("SingleChronicleQueueExcerpts.pretoucherPrerollTimeMs", "100");

A Pretoucher is created for a given queue (in the example below, via PretouchUtil.INSTANCE.createPretoucher(q1)). Invoking its execute() method then starts the Pretoucher.

// Creates the queue q1 (or q1 is a queue that already exists)
try(final SingleChronicleQueue q1 = SingleChronicleQueueBuilder.binary("queue-storage-path").build();

    final Pretoucher pretouch = PretouchUtil.INSTANCE.createPretoucher(q1)){
    try {
        pretouch.execute();

    } catch (InvalidEventHandlerException e) {
        throw Jvm.rethrow(e);
    }
}

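The close() method closes the Pretoucher and releases its resources. In the example above this happens automatically at the end of the try-with-resources block; otherwise you can call it explicitly: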

pretouch.close();
Note
The Pretoucher is an Enterprise feature

Performance and Benchmarking

Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).

Latency Test for Chronicle Queue Replication

The following charts show how long it takes to:

  • write a 40-byte message to a Chronicle Queue

  • have the write replicated over TCP

  • have the second copy acknowledge receipt of the message

  • have a thread read the acknowledged message

The test was run for ten minutes, and the distribution of latencies plotted.

Latency to 993

Note
There is a step in latency at around 10 million messages per second; it jumps as the messages start to batch. At rates below this, each message can be sent individually.

Latencies at the 99.99 percentile and above are believed to be caused by delays in passing the message over TCP. Further research is needed to prove this. These delays are similar, regardless of the throughput. The 99.9 percentile and 99.93 percentile are a function of how quickly the system can recover after a delay. The higher the throughput, the less headroom the system has to recover from a delay.

Latency from 993

Double-buffering for contended writes

When double-buffering is disabled, all writes to the queue are serialized based on write lock acquisition. Each time ExcerptAppender.writingDocument() is called, the appender tries to acquire the write lock on the queue; if it fails to do so, it blocks until the write lock is released, and then locks the queue for itself.

When double-buffering is enabled, if the appender sees that the write lock is already held when ExcerptAppender.writingDocument() is called, it returns immediately with a context pointing to a secondary buffer, and defers lock acquisition until context.close() is called (with the usual try-with-resources pattern, at the end of the try block). This lets the user go ahead and write data without holding the lock; the serialized data is then essentially memcpy'd into the queue, so the cost of serialization is not paid while holding the write lock. By default, double-buffering is disabled. You can enable double-buffering by calling

SingleChronicleQueueBuilder.binary(path).doubleBuffer(true).build();
Note
During a write that is buffered, DocumentContext.index() will throw an IndexNotAvailableException. This is because it is impossible to know the index until the buffer is written back to the queue, which only happens when the DocumentContext is closed.

This is only useful if (the majority of) the objects being written to the queue are large AND their marshalling is not straightforward (for example, BytesMarshallable's marshalling is very efficient and quick, so double-buffering will only slow things down), and if there is heavy contention on writes (for example, two or more threads writing a lot of data to the queue at a very high rate).
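To make the mechanism concrete, here is a toy, stdlib-only model of double-buffering under stated assumptions: the "queue" is just an in-memory byte log guarded by a ReentrantLock, and DoubleBufferedLog, WriteContext and isBuffered() are hypothetical names, not Chronicle Queue's API. A write that finds the lock free writes in place; a write that finds it held serializes into a private buffer, reports no index until close(), and copies its bytes into the log once it acquires the lock.

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.locks.ReentrantLock;

// Toy model of double-buffered writes; NOT Chronicle Queue's implementation.
// The "queue" is an in-memory byte log plus a message counter, guarded by one lock.
final class DoubleBufferedLog {
    private final ReentrantLock writeLock = new ReentrantLock();
    private final ByteArrayOutputStream log = new ByteArrayOutputStream();
    private long messageCount;

    /** Starts a write without blocking: if the lock is busy, a private buffer is used. */
    WriteContext writingDocument() {
        return new WriteContext(writeLock.tryLock());
    }

    /** Snapshot of everything written so far. */
    byte[] contents() {
        writeLock.lock();
        try {
            return log.toByteArray();
        } finally {
            writeLock.unlock();
        }
    }

    final class WriteContext implements AutoCloseable {
        private final boolean direct; // true if the lock was acquired up front
        private final ByteArrayOutputStream secondary = new ByteArrayOutputStream();
        private long index = -1;      // known only once the message reaches the log
        private boolean closed;

        private WriteContext(boolean direct) {
            this.direct = direct;
            if (direct) {
                index = messageCount; // we hold the lock, so the index is fixed
            }
        }

        boolean isBuffered() {
            return !direct;
        }

        void write(byte[] bytes) {
            // Direct: serialize straight into the log while holding the lock.
            // Buffered: serialize into the secondary buffer without the lock.
            (direct ? log : secondary).write(bytes, 0, bytes.length);
        }

        long index() {
            if (index < 0) {
                // Stand-in for Chronicle's IndexNotAvailableException.
                throw new IllegalStateException("index not known until close()");
            }
            return index;
        }

        @Override
        public void close() {
            if (closed) {
                return;
            }
            closed = true;
            if (!direct) {
                writeLock.lock(); // deferred lock acquisition
            }
            try {
                if (!direct) {
                    index = messageCount;
                    byte[] bytes = secondary.toByteArray();
                    log.write(bytes, 0, bytes.length); // memcpy-like copy into the log
                }
                messageCount++;
            } finally {
                writeLock.unlock();
            }
        }
    }
}
```

The buffered path pays for an extra copy, which is why it only helps when serialization is expensive and writes are contended.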

  • Results:

Below are the benchmark results for various data sizes at a rate of 10 kHz for a cumbersome message (see net.openhft.chronicle.queue.bench.QueueContendedWritesJLBHBenchmark). Your mileage may vary; always run your own benchmarks:

  • 1 KB

    • Double-buffer disabled:

      -------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------
      Percentile   run1         run2         run3      % Variation
      50:            90.40        90.59        91.17         0.42
      90:           179.52       180.29        97.50        36.14
      99:           187.33       186.69       186.82         0.05
      99.7:         213.57       198.72       217.28         5.86
      ------------------------------------------------------------------------------------------------------------------
      -------------------------------- SUMMARY (Concurrent2) -----------------------------------------------------------
      Percentile   run1         run2         run3      % Variation
      50:           179.14       179.26       180.93         0.62
      90:           183.49       183.36       185.92         0.92
      99:           192.19       190.02       215.49         8.20
      99.7:         240.70       228.16       258.88         8.24
      ------------------------------------------------------------------------------------------------------------------
    • Double-buffer enabled:

      -------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------
      Percentile   run1         run2         run3      % Variation
      50:            86.05        85.60        86.24         0.50
      90:           170.18       169.79       170.30         0.20
      99:           176.83       176.58       177.09         0.19
      99.7:         183.36       185.92       183.49         0.88
      ------------------------------------------------------------------------------------------------------------------
      -------------------------------- SUMMARY (Concurrent2) -----------------------------------------------------------
      Percentile   run1         run2         run3      % Variation
      50:            86.24        85.98        86.11         0.10
      90:            89.89        89.44        89.63         0.14
      99:           169.66       169.79       170.05         0.10
      99.7:         175.42       176.32       176.45         0.05
      ------------------------------------------------------------------------------------------------------------------
  • 4 KB

    • Double-buffer disabled:

      -------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------
      Percentile   run1         run2         run3      % Variation
      50:           691.46       699.65       701.18         0.15
      90:           717.57       722.69       721.15         0.14
      99:           752.90       748.29       748.29         0.00
      99.7:        1872.38      1743.36      1780.22         1.39
      ------------------------------------------------------------------------------------------------------------------
      -------------------------------- SUMMARY (Concurrent2) -----------------------------------------------------------
      Percentile   run1         run2         run3      % Variation
      50:           350.59       353.66       353.41         0.05
      90:           691.46       701.18       697.60         0.34
      99:           732.42       733.95       729.34         0.42
      99.7:        1377.79      1279.49      1302.02         1.16
      ------------------------------------------------------------------------------------------------------------------
    • Double-buffer enabled:

      -------------------------------- SUMMARY (Concurrent) ------------------------------------------------------------
      Percentile   run1         run2         run3      % Variation
      50:           342.40       344.96       344.45         0.10
      90:           357.25       360.32       359.04         0.24
      99:           688.38       691.97       691.46         0.05
      99.7:        1376.77      1480.19      1383.94         4.43
      ------------------------------------------------------------------------------------------------------------------
      -------------------------------- SUMMARY (Concurrent2) -----------------------------------------------------------
      Percentile   run1         run2         run3      % Variation
      50:           343.68       345.47       346.24         0.15
      90:           360.06       362.11       363.14         0.19
      99:           694.02       698.62       699.14         0.05
      99.7:        1400.32      1510.91      1435.14         3.40
      ------------------------------------------------------------------------------------------------------------------
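The % Variation column can be checked against the run columns. The figures above are consistent with treating run1 as a warm-up and computing 100 * (max - min) / (max + min / 2) over the remaining runs; for example, the 1 KB, double-buffer disabled, Concurrent 90th percentile row gives about 36.1. This formula is inferred from the numbers, not taken from the benchmark's source. A minimal sketch with a hypothetical RunVariation class:

```java
// Reproduces the "% Variation" column from per-run percentile values.
// Assumption (inferred from the tables, not from the benchmark's source):
// run1 is excluded as warm-up, and variation = 100 * (max - min) / (max + min / 2).
final class RunVariation {
    private RunVariation() { }

    static double percentVariation(double... runs) {
        if (runs.length < 2) {
            throw new IllegalArgumentException("need at least two runs");
        }
        double max = Double.NEGATIVE_INFINITY;
        double min = Double.POSITIVE_INFINITY;
        for (int i = 1; i < runs.length; i++) { // skip run1 (warm-up)
            max = Math.max(max, runs[i]);
            min = Math.min(min, runs[i]);
        }
        return 100 * (max - min) / (max + min / 2);
    }
}
```

Small differences in the second decimal place are expected, because the tables print the run values already rounded.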

Jitter

If you wish to tune your code for ultra-low latency, you could take an approach similar to our QueueReadJitterMain:

net.openhft.chronicle.queue.jitter.QueueReadJitterMain

This code can be considered a basic stack-sampling profiler. Assuming you base your code on net.openhft.chronicle.core.threads.EventLoop, you can periodically sample the stacks to find a stall. We recommend not reducing the sampling interval below 50 microseconds, as this will produce too much noise.

It is likely to give you finer granularity than a typical profiler. Because it takes a statistical approach to locating stalls, it takes many samples to see which code has the highest grouping (in other words, the most stalls), and it outputs a trace like the following:

28	at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012)
	at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
	at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43)
	at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90)
	at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31)
	at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source)
	at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297)
	at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246)

25	at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:58)
	at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:748)

21	at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1027)
	at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
	at net.openhft.chronicle.core.util.WeakReferenceCleaner.newCleaner(WeakReferenceCleaner.java:43)
	at net.openhft.chronicle.bytes.NativeBytesStore.<init>(NativeBytesStore.java:90)
	at net.openhft.chronicle.bytes.MappedBytesStore.<init>(MappedBytesStore.java:31)
	at net.openhft.chronicle.bytes.MappedFile$$Lambda$4/1732398722.create(Unknown Source)
	at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:297)
	at net.openhft.chronicle.bytes.MappedFile.acquireByteStore(MappedFile.java:246)

14	at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain.lambda$main$1(QueueWriteJitterMain.java:54)
	at net.openhft.chronicle.queue.jitter.QueueWriteJitterMain$$Lambda$11/967627249.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:748)

From this, we can see that the largest group of samples (28 on this occasion) was captured in ConcurrentHashMap.putVal(). If we wish to get finer granularity, we will often add a call to net.openhft.chronicle.core.Jvm.safepoint() into the code, because thread dumps are only reported at safepoints.
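A simplified, stdlib-only sketch of the sampling idea, assuming you only need periodic samples of one thread: the hypothetical StackSampler below captures the target thread's stack at a fixed interval, groups identical top frames, and prints each group with its count in the same count-then-frames shape as the trace above. It does not integrate with EventLoop, so it samples unconditionally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;

// Simplified periodic stack sampler, for illustration only: it is NOT
// QueueReadJitterMain and does not integrate with EventLoop.
final class StackSampler {
    private StackSampler() { }

    /**
     * Captures the stack of {@code target} {@code samples} times, roughly
     * {@code intervalMicros} apart, and counts each distinct group of its
     * top {@code depth} frames.
     */
    static Map<String, Integer> sample(Thread target, int samples, long intervalMicros, int depth) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < samples; i++) {
            // HotSpot can only capture another thread's stack when that thread
            // reaches a safepoint/handshake poll, so samples are biased to those spots.
            StackTraceElement[] frames = target.getStackTrace();
            StringBuilder group = new StringBuilder();
            for (int f = 0; f < Math.min(depth, frames.length); f++) {
                group.append("\tat ").append(frames[f]).append('\n');
            }
            String key = group.length() == 0 ? "\t<no frames>\n" : group.toString();
            counts.merge(key, 1, Integer::sum);
            LockSupport.parkNanos(intervalMicros * 1_000);
        }
        return counts;
    }

    /** Prints the {@code top} most frequent groups as "count<TAB>at frame...". */
    static void print(Map<String, Integer> counts, int top) {
        counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(top)
                .forEach(e -> System.out.println(e.getValue() + e.getKey()));
    }
}
```

For example, StackSampler.print(StackSampler.sample(worker, 10_000, 50, 8), 5) would print the five most common stack groups for a (hypothetical) worker thread; the advice above about not going below 50 microseconds applies to the interval parameter.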

  • Results:

In the test described above, the typical latency varied between 14 and 40 microseconds. The 99th percentile varied between 17 and 56 microseconds, depending on the throughput being tested. Notably, the 99.93 percentile latency varied between 21 microseconds and 41 milliseconds, a factor of 2000.

Table 3. Possible throughput results depending on acceptable latencies

  Acceptable latency                      Throughput
  < 30 microseconds 99.3% of the time     7 million messages per second
  < 20 microseconds 99.9% of the time     20 million messages per second
  < 1 millisecond 99.9% of the time       50 million messages per second
  < 60 microseconds 99.3% of the time     80 million messages per second

Chronicle Queue vs Kafka

Chronicle Queue is designed to outperform rivals such as Kafka. Chronicle Queue supports over an order of magnitude greater throughput, together with an order of magnitude lower latency, than Apache Kafka. While Kafka is faster than many of the alternatives, it doesn't match Chronicle Queue's ability to support throughputs of over a million events per second while simultaneously achieving latencies of 1 to 20 microseconds.

Chronicle Queue handles more volume from a single thread to a single partition. This avoids the complexity, and the downsides, of partitioning.

Kafka uses an intermediate broker to access the operating system's file system and cache, while Chronicle Queue uses the operating system's file system and cache directly. For a comparison, see the Kafka Documentation.

More Information and Support

  • Big Data and Chronicle Queue - a detailed description of some techniques utilised by Chronicle Queue

  • Encryption - describes how to encrypt the contents of a Queue

  • FAQ - questions asked by customers

  • How it works - more depth on how Chronicle Queue is implemented

  • Timezone rollover - describes how to configure file-rolling at a specific time in a given time-zone

  • Utilities - lists some useful utilities for working with queue files

Chronicle Software Release Notes

Leave your e-mail to get information about the latest releases and patches to stay up-to-date.

chronicle-queue's People

Contributors

alamar, am7c7, apete, ashelleyhft, catherine-lawrey, chroniclekevinpowe, danielshaya, dependabot[bot], dpisklov, emmachronicle, epickrram, glukos, hft-team-city, j4sm1ne96, jansturenielsen, jerryshea, jozsefbartokhft, lburgazzoli, minborg, neil-clifford, nicktindall, nickward, peter-lawrey, pisfly, robaustin, scottkidder, sergebg, tgd, tomshercliff, yevgenp

chronicle-queue's Issues

DELAY constant value in MicroJitterSampler

Hi Peter,

I was checking the code of MicroJitterSampler and noticed that one of the DELAY constant values has a multiplier of 10000, whereas every other entry uses 1000.
Seems like a typo to me.

static final long[] DELAY = {
        ...
        ...
        ...
        2 * 1000 * 1000,
        5 * 1000 * 1000,
        10 * 1000 * 10000,
        20 * 1000 * 1000,
        50 * 1000 * 1000,
        ...

};

Regards,

Amitoj.

Ps. - It was nice to see your presentation at JAXLondon.
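The arithmetic behind the report: 10 * 1000 * 10000 evaluates to 100,000,000 rather than 10,000,000. Assuming the table is meant to be ascending, as the quoted neighbours 5,000,000 and 20,000,000 suggest, that entry breaks the ordering. A small check over only the quoted entries (the elided ones are not reproduced), with hypothetical class and field names:

```java
// Uses only the DELAY entries quoted in the issue; the elided "..." entries
// are deliberately not reproduced. Class and field names are hypothetical.
final class DelayTableCheck {
    private DelayTableCheck() { }

    static final long[] AS_WRITTEN = {
            2 * 1000 * 1000,
            5 * 1000 * 1000,
            10 * 1000 * 10000, // = 100_000_000, ten times the neighbouring pattern
            20 * 1000 * 1000,
            50 * 1000 * 1000,
    };

    static final long[] SUGGESTED = {
            2 * 1000 * 1000,
            5 * 1000 * 1000,
            10 * 1000 * 1000,  // = 10_000_000, as the issue proposes
            20 * 1000 * 1000,
            50 * 1000 * 1000,
    };

    /** True if every entry is larger than the one before it. */
    static boolean isStrictlyAscending(long[] values) {
        for (int i = 1; i < values.length; i++) {
            if (values[i] <= values[i - 1]) {
                return false;
            }
        }
        return true;
    }
}
```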

jvm Crash via segmentation fault in vanilla chronicle in release 3.2.1

Here is the stack trace:

Stack: [0x00007fc71c8f9000,0x00007fc71c9fa000], sp=0x00007fc71c9f7d60, free space=1019k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
V [libjvm.so+0x96c47d] Unsafe_SetOrderedInt+0xad
J sun.misc.Unsafe.putOrderedInt(Ljava/lang/Object;JI)V
j net.openhft.chronicle.LoggerChronicle$LogAppenderImpl.finish()V+18
j net.openhft.lang.io.AbstractBytes.close()V+8
j net.openhft.chronicle.LoggerChronicle$AbstractLoggerExcerpt.close()V+39
j net.openhft.chronicle.LoggerChronicle$LogAppenderImpl.close()V+1
j net.openhft.chronicle.tools.WrappedExcerpt.close()V+4
j net.openhft.chronicle.LoggerChronicle$LoggerCheckedAppender.finalize()V+1
v ~StubRoutines::call_stub
V [libjvm.so+0x5f8485] JavaCalls::call_helper(JavaValue*, methodHandle*, JavaCallArguments*, Thread*)+0x365
V [libjvm.so+0x5f6ee8] JavaCalls::call(JavaValue*, methodHandle, JavaCallArguments*, Thread*)+0x28
V [libjvm.so+0x630ba4] jni_invoke_nonstatic(JNIEnv_*, JavaValue*, jobject, JNICallType, jmethodID, JNI_ArgumentPusher*, Thread*)+0x2b4
V [libjvm.so+0x63ff31] jni_CallVoidMethod+0x171
j java.lang.ref.Finalizer.invokeFinalizeMethod(Ljava/lang/Object;)V+0
j java.lang.ref.Finalizer.runFinalizer()V+45
j java.lang.ref.Finalizer.access$100(Ljava/lang/ref/Finalizer;)V+1
j java.lang.ref.Finalizer$FinalizerThread.run()V+24
v ~StubRoutines::call_stub
V [libjvm.so+0x5f8485] JavaCalls::call_helper(JavaValue*, methodHandle*, JavaCallArguments*, Thread*)+0x365
V [libjvm.so+0x5f6ee8] JavaCalls::call(JavaValue*, methodHandle, JavaCallArguments*, Thread*)+0x28
V [libjvm.so+0x5f71b7] JavaCalls::call_virtual(JavaValue*, KlassHandle, Symbol*, Symbol*, JavaCallArguments*, Thread*)+0x197
V [libjvm.so+0x5f72d7] JavaCalls::call_virtual(JavaValue*, Handle, KlassHandle, Symbol*, Symbol*, Thread*)+0x47
V [libjvm.so+0x6731e5] thread_entry(JavaThread*, Thread*)+0xe5
V [libjvm.so+0x94d38f] JavaThread::thread_main_inner()+0xdf
V [libjvm.so+0x94d495] JavaThread::run()+0xf5
V [libjvm.so+0x815288] java_start(Thread*)+0x108

Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
J sun.misc.Unsafe.putOrderedInt(Ljava/lang/Object;JI)V
j net.openhft.chronicle.LoggerChronicle$LogAppenderImpl.finish()V+18
j net.openhft.lang.io.AbstractBytes.close()V+8
j net.openhft.chronicle.LoggerChronicle$AbstractLoggerExcerpt.close()V+39
j net.openhft.chronicle.LoggerChronicle$LogAppenderImpl.close()V+1
j net.openhft.chronicle.tools.WrappedExcerpt.close()V+4
j net.openhft.chronicle.LoggerChronicle$LoggerCheckedAppender.finalize()V+1
v ~StubRoutines::call_stub
j java.lang.ref.Finalizer.invokeFinalizeMethod(Ljava/lang/Object;)V+0
j java.lang.ref.Finalizer.runFinalizer()V+45
j java.lang.ref.Finalizer.access$100(Ljava/lang/ref/Finalizer;)V+1
j java.lang.ref.Finalizer$FinalizerThread.run()V+24
v ~StubRoutines::call_stub

LoggerChronicle is a replica of VanillaChronicle; the only change is that, in the appender, the excerpt length used by VanillaChronicle is replaced by '\n'.

ExcerptTailer: boolean index(long) method should be removed

Hi,
I create a Chronicle on an already existing file. This file already contains data (some of which has already been read). I create an ExcerptTailer, then set the index to where I left off last time (which is not the start of the file).
I should mention that the total amount of data written exceeds the size of a data block.
Reading works for a while, and then suddenly either:

  • segmentation fault
  • the read data is bigger than the one I expected (the end is actually the data I expected - but prefixed with what comes before in the file)

Using an Excerpt instead of ExcerptTailer solves the problems.
The presence of "index(long)" made me think that I could at least place myself somewhere in the file, then do only reading. If tailer is only for reading sequentially with no random access, then I guess the method should be removed.

Regards,
John

NPE in RollingChronicle

Doing the following triggers an NPE.

ChronicleConfig CONFIG = ChronicleConfig.TEST.clone();
RollingChronicle rollingChronicle = new RollingChronicle(basePath, CONFIG);
rollingChronicle.createAppender().index();

ExcerptTailer.read(ByteBuffer) fails

When writing a simple ByteBuffer out and attempting to read it back via ExcerptTailer.read, the ByteBuffer is never read.

String path = new File(System.getProperty("java.io.tmpdir"), "foo").getAbsolutePath() + "/" + UUID.randomUUID().toString();
try (IndexedChronicle chronicle = new IndexedChronicle(path, ChronicleConfig.TEST)) {
  ExcerptAppender appender = chronicle.createAppender();
  appender.startExcerpt(100);
  ByteBuffer entry = ByteBuffer.wrap("1".getBytes());
  appender.write(entry);
  appender.finish();

  ExcerptTailer tailer = chronicle.createTailer();
  ByteBuffer input = ByteBuffer.allocate(1);
  tailer.read(input);

  // Fails
  assert input.equals(entry);
}

This appears to be because ExcerptTailer.remaining() is 0, which causes the read to do nothing.

Data file corruption in VanillaChronicle

Hi,

I have discovered an issue that causes corruption in VanillaChronicle.
Occasionally an existing data file is overwritten, so that earlier indexes that refer to the existing data file become invalid (length value is corrupt).

I believe that I have tracked the issue down to the use of VanillaDataCache in VanillaChronicle. dataCache is an instance field of VanillaChronicle, so it is shared across all appender instances. This means that VanillaDataCache.dataForLast can be invoked by multiple concurrent threads, resulting in inconsistent values for lastCount.

I will attempt to create a test case that illustrates the problem.

Thanks,
Ross
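The race described here is an unsynchronized read-modify-write on shared state. A hypothetical sketch, not VanillaDataCache's code, of the racy pattern and the usual fix of performing the update under one lock:

```java
// Hypothetical illustration of the read-modify-write race described above;
// NOT VanillaDataCache's code. One counter instance is shared by all appenders.
final class SharedFileCounter {
    private int lastCount;

    // Racy: two threads can both read the same lastCount and both return the
    // same "next" value, so two appenders would pick the same data file.
    int nextRacy() {
        int next = lastCount + 1;
        lastCount = next;
        return next;
    }

    // Safe: the read and the write happen under the same monitor, so each
    // caller observes a distinct, increasing value.
    synchronized int nextSafe() {
        int next = lastCount + 1;
        lastCount = next;
        return next;
    }
}
```

An alternative that follows the reporter's diagnosis more directly is to give each appender its own cache instance, so nothing is shared.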

Leaking threads on FreeBSD / OpenJDK7

Running the Chronicle Maven tests on FreeBSD 10 with OpenJDK 7 ends with the first test passing and the remaining tests not finishing.

InProcessChronicleSource opens an Acceptor thread which loops and calls accept() on the socket. However, this thread is never closed even when close() is called on the socket. The result is the thread stays open and bound to the socket, waiting for an inbound connection.

Later tests then fail because they cannot bind to the socket (it is still bound by the previous test).
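A common way to make such a thread stoppable is to close the ServerSocket and have the loop exit when it sees a closed flag; per the ServerSocket.close() documentation, any thread blocked in accept() then throws a SocketException. A hypothetical sketch, not InProcessChronicleSource's code:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical acceptor (not InProcessChronicleSource's code) whose thread
// exits once close() has been called, instead of looping on accept() forever.
final class ClosableAcceptor implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final Thread thread;
    private volatile boolean closed;

    ClosableAcceptor(int port) throws IOException {
        serverSocket = new ServerSocket(port);
        thread = new Thread(this::acceptLoop, "acceptor");
        thread.setDaemon(true);
        thread.start();
    }

    boolean isRunning() {
        return thread.isAlive();
    }

    private void acceptLoop() {
        while (!closed) {
            try (Socket client = serverSocket.accept()) {
                // Handle the connection here (omitted in this sketch).
            } catch (IOException e) {
                // accept() throws once the socket is closed; stop instead of retrying.
                if (closed) {
                    return;
                }
            }
        }
    }

    @Override
    public void close() throws IOException, InterruptedException {
        closed = true;
        serverSocket.close(); // unblocks the pending accept()
        thread.join(1_000);   // wait for the acceptor thread to finish
    }
}
```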

VanillaChronicleSourceTest and VanillaChronicleTest fails on Windows 8.1

I was trying to install/build the entire OpenHFT suite with Maven. Everything worked fine (Java Affinity, Java Compiler, Java Lang) until the Java Chronicle tests, where I got the errors below. Is this an issue on my side? When I ignore those tests, the install process completes without any errors.

I'm running a Windows 8.1 64bit System with Java JDK 1.7.0_55 64bit.


Test set: net.openhft.chronicle.VanillaChronicleSourceTest

Tests run: 5, Failures: 3, Errors: 0, Skipped: 0, Time elapsed: 11.287 sec <<< FAILURE! - in net.openhft.chronicle.VanillaChronicleSourceTest
testSourceSinkStartResumeRollingEverySecond(net.openhft.chronicle.VanillaChronicleSourceTest) Time elapsed: 11.005 sec <<< FAILURE!
java.lang.AssertionError: expected:<1000000000> but was:<1000000050>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:555)
at org.junit.Assert.assertEquals(Assert.java:542)
at net.openhft.chronicle.VanillaChronicleSourceTest.testSourceSinkStartResumeRollingEverySecond(VanillaChronicleSourceTest.java:254)

testReplicationWithRolling(net.openhft.chronicle.VanillaChronicleSourceTest) Time elapsed: 0.018 sec <<< FAILURE!
java.lang.AssertionError: i: 0 expected:<1000000000> but was:<1000000499>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:555)
at net.openhft.chronicle.VanillaChronicleSourceTest.testReplicationWithRolling(VanillaChronicleSourceTest.java:133)

testReplicationWithRolling2(net.openhft.chronicle.VanillaChronicleSourceTest) Time elapsed: 0.11 sec <<< FAILURE!
java.lang.AssertionError: i: 0 expected:<1000000000> but was:<1000000090>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:555)
at net.openhft.chronicle.VanillaChronicleSourceTest.testReplicationWithRolling2(VanillaChronicleSourceTest.java:173)


Test set: net.openhft.chronicle.VanillaChronicleTest

Tests run: 16, Failures: 2, Errors: 1, Skipped: 1, Time elapsed: 31.166 sec <<< FAILURE! - in net.openhft.chronicle.VanillaChronicleTest
testReplicationWithRollingFilesEverySecond2(net.openhft.chronicle.VanillaChronicleTest) Time elapsed: 20.034 sec <<< FAILURE!
java.lang.AssertionError: i: 1 expected:<1000000001> but was:<1000000000>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:555)
at net.openhft.chronicle.VanillaChronicleTest.testReplicationWithRollingFilesEverySecond2(VanillaChronicleTest.java:620)

testReplicationWithRollingFilesEverySecond(net.openhft.chronicle.VanillaChronicleTest) Time elapsed: 0.008 sec <<< FAILURE!
java.lang.AssertionError: major jumped expected:<1401730659> but was:<1401730685>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:555)
at net.openhft.chronicle.VanillaChronicleTest.testReplicationWithRollingFilesEverySecond(VanillaChronicleTest.java:576)

testTailerPerf(net.openhft.chronicle.VanillaChronicleTest) Time elapsed: 0.027 sec <<< ERROR!
java.nio.BufferUnderflowException: null
at net.openhft.lang.io.AbstractBytes.returnOrThrowEndOfBuffer(AbstractBytes.java:245)
at net.openhft.lang.io.AbstractBytes.readByteOrThrow(AbstractBytes.java:240)
at net.openhft.lang.io.AbstractBytes.readUnsignedByteOrThrow(AbstractBytes.java:236)
at net.openhft.lang.io.AbstractBytes.parseLong(AbstractBytes.java:1419)
at net.openhft.chronicle.VanillaChronicleTest.testTailerPerf(VanillaChronicleTest.java:218)

Support for OpenJDK 6 and old Sun Java 6 versions

Some JDKs don't have the Unsafe.copyMemory method. For example I get this exception on FreeBSD with OpenJDK 6:

java.lang.NoSuchMethodError: sun.misc.Unsafe.copyMemory(Ljava/lang/Object;JLjava/lang/Object;JJ)V
at net.openhft.lang.io.NativeBytes.write(NativeBytes.java:227) ~[bench-1.0-SNAPSHOT.jar:na]
at net.openhft.lang.io.AbstractBytes.write(AbstractBytes.java:626) ~[bench-1.0-SNAPSHOT.jar:na]

$ java -version
openjdk version "1.6.0"
OpenJDK Runtime Environment (build 1.6.0-b23)
OpenJDK 64-Bit Server VM (build 20.0-b11, mixed mode)

Java-Chronicle should provide a work around for such cases.
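One way a library can cope with this is to detect the optional method once at startup, via reflection, and pick a fallback path instead of failing later with NoSuchMethodError. A hypothetical sketch (OptionalMethodCheck is not part of Java-Chronicle):

```java
// Hypothetical helper (not part of Java-Chronicle): detect at runtime whether
// an optional JDK method exists, so a fallback can be chosen up front instead
// of failing later with NoSuchMethodError.
final class OptionalMethodCheck {
    private OptionalMethodCheck() { }

    /** True if {@code className} has a public method {@code name} with these parameter types. */
    static boolean hasMethod(String className, String name, Class<?>... parameterTypes) {
        try {
            Class.forName(className).getMethod(name, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    // The 5-argument overload named in the NoSuchMethodError above.
    static final boolean HAS_COPY_MEMORY = hasMethod("sun.misc.Unsafe", "copyMemory",
            Object.class, long.class, Object.class, long.class, long.class);
}
```

When HAS_COPY_MEMORY is false, callers would switch to a slower path, for example System.arraycopy for on-heap byte[] data.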

Should chronicle-queue remove ChronicleConfig ?

Hi. When I updated to the 3.3 snapshot for testing, I found that the ChronicleConfig class in the net.openhft.chronicle package had been removed and moved to chronicle-sandbox (with some slight differences). This breaks backward compatibility with other libraries integrated with earlier Chronicle releases, such as the Spring Reactor Chronicle module (and others). So I had to go back to an earlier release such as 3.2.5, which means I cannot benefit from the latest upgrade, and other integration frameworks would have to change their code to accommodate the change.

So it might be a good idea to keep the class in the net.openhft.chronicle package as before, which would avoid many changes.

How To Remove an arbitrary Excerpt with the given index

Dear Team

Is there a way to permanently remove (delete) some arbitrary Excerpt, perhaps identified by their indices, without actually deleting the entire chronicle?

If yes, then please provide a simple demo application to do that.

If no, then can we implement it? Maybe I can contribute if you give me an overview / idea...

Thanks.

Using small block sizes

Hi

Fantastic library, btw. Seems like there might be an issue when writing more data than the data block size:

        ChronicleConfig cfg = ChronicleConfig.DEFAULT.clone();
        cfg.dataBlockSize( 1 * 1024 ); // works with 256 * 1024
        IndexedChronicle ic = new IndexedChronicle("/tmp/file", cfg  );

        ExcerptAppender ap = ic.createAppender();
        for(int i = 0; i < 10000; i++) {
            ap.startExcerpt(16);
            ap.writeLong(i); ap.writeLong(i);
            ap.finish();
        }

        System.out.println(ic.size()); // expecting 10000, getting more

        Excerpt exc = ic.createExcerpt();
        exc.index(5000); System.out.println( exc.readLong() ); // expecting 5000

Increasing the data block size to 256K seems to solve the problem.

Regards

IndexedChronicle TCP replication not working

Below is the code for Master.java and Slave.java. As you can see, the master just creates an IndexedChronicle wrapped by a ChronicleSource and starts publishing some data. The slave creates its own IndexedChronicle wrapped by a ChronicleSink and starts listening for data. Everything works fine until you stop the slave after it has received some data and then start it again. In that scenario, the slave is not able to reconnect and continue processing messages, because the initial index check seems to be always shifted by one. Please see the exception below:

2014-11-18 18:13:41.700 [INFO] - Connected to localhost/127.0.0.1:7777
2014-11-18 18:13:41.716 [INFO] - Lost connection to localhost/127.0.0.1:7777 retrying
java.io.StreamCorruptedException: Expected index 41 but got 40
at net.openhft.chronicle.tcp.ChronicleSink$PersistentIndexedSinkExcerpt.readNextExcerpt(ChronicleSink.java:491)
at net.openhft.chronicle.tcp.ChronicleSink$PersistentIndexedSinkExcerpt.readNextExcerpt(ChronicleSink.java:484)
at net.openhft.chronicle.tcp.ChronicleSink$AbstractPersistentSinkExcerpt.readNext(ChronicleSink.java:391)
at net.openhft.chronicle.tcp.ChronicleSink$AbstractPersistentSinkExcerpt.nextIndex(ChronicleSink.java:346)
at org.dett.tests.chronicle.Slave$Reader.run(Slave.java:31)
at java.lang.Thread.run(Thread.java:745)

Steps to reproduce:

  1. start master
  2. start slave
  3. hit enter in master process to start publishing data
  4. stop/kill slave
  5. start slave again

If we change IndexedChronicle to VanillaChronicle everything seems to work as expected.

I'm testing on Windows 8.1 x64, and tried both the latest version published to Maven Central (3.2.5) and the current snapshot (3.2.6-SNAPSHOT).

public class Master {

    public static void main(String[] args) throws IOException, InterruptedException {
        final Chronicle source = new ChronicleSource(new IndexedChronicle("./master"), 7777);                                       
        ExcerptAppender excerpt = source.createAppender();
        ConsoleUtils.waitForEnter("Hit <enter> to start sending ...");
        for (int i = 1; i <= 1000; i++) {
            // use a size which will cause mis-alignment.
            excerpt.startExcerpt();
            excerpt.writeLong(i);
            excerpt.append(' ');
            excerpt.append(i);
            excerpt.append('\n');
            excerpt.finish();
            Thread.sleep(100);                       
        }
        System.out.println("Finished writing messages ...");            
        ConsoleUtils.waitForEnter("Hit <enter> to close source ...");        
        source.close();
    }
}

public class Slave {

    private static final int PORT = 7777;   
    private static final AtomicBoolean reading = new AtomicBoolean(true);

    public static void main(String[] args) throws IOException, InterruptedException {                                                         
        Thread t = new Thread(new Reader());
        t.start();      
        ConsoleUtils.waitForEnter("Hit <enter> to stop reading ...");       
        reading.set(false);
        ConsoleUtils.waitForEnter("Hit <enter> to terminate ...");                     
    }

    private static class Reader implements Runnable {           
        @Override
        public void run() {          
            try {
                Chronicle sink = new ChronicleSink(new IndexedChronicle("./slave"), "localhost", PORT);                 
                ExcerptTailer excerpt = sink.createTailer();                
                while (reading.get()) {                 
                    while (!excerpt.nextIndex()) { }                    
                    long n = excerpt.readLong();
                    System.out.println(String.format("Message %s received", n));
                    excerpt.finish();                   
                }
                System.out.println("Reading stopped ...");
                sink.close();
            } catch (IOException e) {               
                e.printStackTrace();
            }
        }
    };
}

VanillaChronicle JVM Crash in 3.3.4

It seems this may have been the first write after midnight.

RAX=0x000000065a97e950 is an oop
net.openhft.lang.io.VanillaMappedBytes
klass: 'net/openhft/lang/io/VanillaMappedBytes'
RBX=0x000000000000001f is an unknown value
RCX=0x0000000000000020 is an unknown value
RDX=0x0000000672c7f6d0 is an oop
java.util.LinkedHashMap$Entry
klass: 'java/util/LinkedHashMap$Entry'
RSP=0x00007f20585dbf40 is pointing into the stack for thread: 0x00007f2065f2f800
RBP=0x00000000ce4f7a02 is an unknown value
RSI=0x00000006727bd010 is an oop
net.openhft.chronicle.VanillaChronicle$VanillaAppenderImpl
klass: 'net/openhft/chronicle/VanillaChronicle$VanillaAppenderImpl'
RDI=0x0000000000000016 is an unknown value
R8 =0x00007f1fe0000030 is an unknown value
R9 =0x0000000000000007 is an unknown value
R10=0x00007f1fe000000c is an unknown value
R11=0x00007f1fe000000c is an unknown value
R12=0x0000000000000000 is an unknown value
R13=0x0000000000000000 is an unknown value
R14=0x0000000000000049 is an unknown value
R15=0x00007f2065f2f800 is a thread

Stack: [0x00007f20584dd000,0x00007f20585de000], sp=0x00007f20585dbf40, free space=1019k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
J 5553 C2 Writer.put([BIIB)V (109 bytes) @ 0x00007f212152cb9f [0x00007f212152cae0+0xbf]
v ~StubRoutines::call_stub
V [libjvm.so+0x600085] JavaCalls::call_helper(JavaValue*, methodHandle*, JavaCallArguments*, Thread*)+0x365
V [libjvm.so+0x5feae8] JavaCalls::call(JavaValue*, methodHandle, JavaCallArguments*, Thread*)+0x28
V [libjvm.so+0x639564] jni_invoke_nonstatic(JNIEnv_*, JavaValue*, jobject, JNICallType, jmethodID, JNI_ArgumentPusher*, Thread*)+0x2b4
V [libjvm.so+0x648481] jni_CallVoidMethod+0x171

vm_info: Java HotSpot(TM) 64-Bit Server VM (24.71-b01) for linux-amd64 JRE (1.7.0_71-b14), built on Sep 26 2014 16:41:40 by "java_re" with gcc 4.3.0 20080428 (Red Hat 4.3.0-8)

time: Sat Jan 24 00:00:47 2015

File Descriptors not closing

I found this bug while using VanillaChronicle v3.0.1 as well as 3.1.1. I have a long-running process, and after rolling over, Chronicle still keeps references to the old file descriptors, so the disk becomes full (df shows 100% disk usage and lsof shows the old files still being referenced by the process). Restarting the process brings disk usage back to normal. Can someone point me in the right direction for fixing this issue?
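On Linux, a file that has been deleted while a process still holds an open descriptor to it keeps occupying disk space until that descriptor is closed, which matches the df/lsof symptoms above. As a diagnostic aid (not a fix for the rollover itself), the following sketch lists this process's descriptors whose targets have been deleted, assuming a Linux /proc filesystem; the class name is hypothetical. Memory mappings of deleted files show up in /proc/self/maps instead and are not covered here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

final class DeletedFdScanner {

    /** Returns the targets of this process's file descriptors that point at deleted files (Linux only). */
    static List<String> deletedButOpen() {
        List<String> result = new ArrayList<>();
        Path fdDir = Paths.get("/proc/self/fd");
        if (!Files.isDirectory(fdDir)) {
            return result; // not Linux, or /proc not mounted: nothing to report
        }
        try (DirectoryStream<Path> fds = Files.newDirectoryStream(fdDir)) {
            for (Path fd : fds) {
                try {
                    String target = Files.readSymbolicLink(fd).toString();
                    // The kernel appends " (deleted)" to the link target once the file is unlinked.
                    if (target.endsWith(" (deleted)")) {
                        result.add(target);
                    }
                } catch (IOException ignored) {
                    // The descriptor may have been closed between listing and reading it.
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        for (String target : deletedButOpen()) {
            System.out.println("still open: " + target);
        }
    }
}
```

Running this periodically inside the long-lived process (or inspecting /proc/PID/fd from outside) shows whether old chronicle files are being held after rollover.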

Support for reading/writing nio's ByteBuffer

I am using Netty to receive data which I want to dump directly into a journal. It would be great if Java-Chronicle could do zero-copy writing. For this, Java-Chronicle would need to support writing (and preferably also reading) NIO ByteBuffers directly.
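To illustrate what writing an NIO ByteBuffer directly into a memory-mapped journal involves, here is a plain-NIO sketch; it does not use or describe Chronicle's API, and the MappedJournal class and its [int length][payload] record layout are hypothetical. The append is a single bulk MappedByteBuffer.put(ByteBuffer) from the source buffer into the mapping, so strictly it is one copy rather than zero copies, but it avoids an intermediate byte[] on the heap.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical append-only journal: [int length][payload] records in one mapped region. */
final class MappedJournal implements AutoCloseable {
    private final FileChannel channel;
    private final MappedByteBuffer map;

    MappedJournal(Path file, int capacity) throws IOException {
        channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        map = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity);
    }

    /** Appends the remaining bytes of src; src's position advances to its limit. */
    void append(ByteBuffer src) {
        int len = src.remaining();
        if (map.remaining() < Integer.BYTES + len) {
            throw new IllegalStateException("journal full");
        }
        map.putInt(len);
        map.put(src); // one bulk copy from the (possibly direct) source buffer into the mapping
    }

    /** Returns a read-only view of record n (0-based); throws if n is past the last record. */
    ByteBuffer read(int n) {
        ByteBuffer view = map.duplicate();
        view.flip(); // limit = bytes written so far, position = 0
        for (int i = 0; ; i++) {
            int len = view.getInt();
            if (i == n) {
                ByteBuffer record = view.slice();
                record.limit(len);
                return record.asReadOnlyBuffer();
            }
            view.position(view.position() + len);
        }
    }

    @Override
    public void close() throws IOException {
        channel.close(); // the mapping itself is only released when it is garbage collected
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("journal", ".dat");
        file.toFile().deleteOnExit();
        try (MappedJournal journal = new MappedJournal(file, 1 << 16)) {
            ByteBuffer direct = ByteBuffer.allocateDirect(5); // e.g. a buffer filled from the network
            direct.put("hello".getBytes(StandardCharsets.US_ASCII));
            direct.flip();
            journal.append(direct);
            System.out.println(StandardCharsets.US_ASCII.decode(journal.read(0))); // hello
        }
    }
}
```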

BufferUnderflowException in Getting Started

I am trying to determine if the following wiki page is out of date or is exposing a bug in the system:

https://github.com/OpenHFT/Chronicle-Queue/blob/master/docs/HowItWorks.md#getting-started

Using Windows 8.1, Java 8u25 and Chronicle Queue 3.3.3 with no modifications other than adding a package declaration, I get the following:

Exception in thread "main" java.lang.IllegalStateException: java.nio.BufferUnderflowException
    at net.openhft.lang.io.AbstractBytes.readObject(AbstractBytes.java:1990)
    at com.test.ExampleCacheMain.main(ExampleCacheMain.java:26)
Caused by: java.nio.BufferUnderflowException
    at net.openhft.lang.io.AbstractBytes.returnOrThrowEndOfBuffer(AbstractBytes.java:195)
    at net.openhft.lang.io.AbstractBytes.readByteOrThrow(AbstractBytes.java:439)
    at net.openhft.lang.io.AbstractBytes.readUnsignedByteOrThrow(AbstractBytes.java:433)
    at net.openhft.lang.io.serialization.BytesMarshallableSerializer.readSerializable(BytesMarshallableSerializer.java:123)
    at net.openhft.lang.io.AbstractBytes.readObject(AbstractBytes.java:1988)
    ... 1 more

After modifying the example to use writeUTF and readUTF, I get:

Exception in thread "main" java.nio.BufferUnderflowException
    at net.openhft.lang.io.AbstractBytes.returnOrThrowEndOfBuffer(AbstractBytes.java:195)
    at net.openhft.lang.io.AbstractBytes.readByteOrThrow(AbstractBytes.java:439)
    at net.openhft.lang.io.AbstractBytes.readUnsignedByteOrThrow(AbstractBytes.java:433)
    at net.openhft.lang.io.AbstractBytes.readUTF0(AbstractBytes.java:202)
    at net.openhft.lang.io.AbstractBytes.readUTF(AbstractBytes.java:698)
    at com.test.ExampleCacheMain.main(ExampleCacheMain.java:26)

IndexedChronicleTest testWasPadding fails on Windows

I've cloned master (f2da0be) and I'm trying to run mvn clean package on Windows 7. With both Java 6 and Java 7 the build fails with the error:

testWasPadding(net.openhft.chronicle.IndexedChronicleTest)  Time elapsed: 0.047 sec  <<< FAILURE!
java.lang.AssertionError: null
        at net.openhft.chronicle.NativeExcerptAppender.writePaddedEntry(NativeExcerptAppender.java:99)
        at net.openhft.chronicle.NativeExcerptAppender.startExcerpt(NativeExcerptAppender.java:54)
        at net.openhft.chronicle.IndexedChronicleTest.testWasPadding(IndexedChronicleTest.java:103)

feature request: wind to the first entry

I wrote the following code to test how to check if an excerpt has more data without altering it:

import net.openhft.chronicle.Excerpt;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.IndexedChronicle;
import net.openhft.chronicle.tools.ChronicleTools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IndexedChronicleMain {
    private static final Logger LOGGER   = LoggerFactory.getLogger(IndexedChronicleMain.class);
    private static final String BASEPATH = "./data/chronicle";

    // *************************************************************************
    //
    // *************************************************************************

    private static void write(int data) throws Exception {
        IndexedChronicle ic = new IndexedChronicle(BASEPATH);
        try {
            ExcerptAppender ex = ic.createAppender();
            ex.startExcerpt(4);
            ex.writeInt(data);
            ex.finish();

            LOGGER.debug("write.index = {}", ex.index());
        } finally {
            ic.close(); // release the memory-mapped files before the next open
        }
    }

    public static void read() throws Exception {
        IndexedChronicle ic = new IndexedChronicle(BASEPATH);
        try {
            Excerpt ex = ic.createExcerpt();

            hasNext(ex);
            hasNext(ex);
            hasNext(ex);
        } finally {
            ic.close();
        }
    }

    // Intended as a non-destructive look-ahead: remember the index, advance, then restore it.
    public static boolean hasNext(Excerpt ex) throws Exception {
        long    index   = ex.index();
        boolean hasNext = ex.nextIndex();
        ex.index(index); // does not restore the position when index is -1 (before the first entry)

        LOGGER.debug("index : hasNext={},before={},after={}", hasNext, index, ex.index());

        return hasNext;
    }

    // *************************************************************************
    //
    // *************************************************************************

    public static void main(String[] args) {
        try {
            write(0);
            write(1);
            write(2);

            read();

            ChronicleTools.deleteOnExit(BASEPATH);

        } catch(Exception e) {
            LOGGER.warn("Main Exception", e);
        }
    }
}

Here the results:

13:26:10.099|DEBUG|azzoli.test.reactor.IndexedChronicleMain| write.index = 1
13:26:10.101|DEBUG|azzoli.test.reactor.IndexedChronicleMain| write.index = 2
13:26:10.105|DEBUG|azzoli.test.reactor.IndexedChronicleMain| write.index = 3
13:26:10.107|DEBUG|azzoli.test.reactor.IndexedChronicleMain| index : hasNext=true,before=-1,after=0
13:26:10.108|DEBUG|azzoli.test.reactor.IndexedChronicleMain| index : hasNext=true,before=0,after=0
13:26:10.108|DEBUG|azzoli.test.reactor.IndexedChronicleMain| index : hasNext=true,before=0,after=0

I expected to see
index : hasNext=true,before=-1,after=-1
instead of
index : hasNext=true,before=-1,after=0

The second result defeats the purpose of the hasNext method, as it alters the state of the excerpt.
The problem is that the excerpt tells me that -1 is not a valid index.

Being able to set the excerpt index to -1 would also be useful for winding back to the first entry.
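Until the excerpt itself supports this, a side-effect-free hasNext() can be approximated by remembering a successful look-ahead instead of trying to restore index -1. The sketch below is written against a hypothetical Cursor interface modelled on the nextIndex()/index() pair used above, with an in-memory ListCursor standing in for the excerpt; it is not the IndexedChronicle API.

```java
/** Hypothetical cursor modelled on the nextIndex()/index() pair used in the report. */
interface Cursor {
    boolean nextIndex(); // advance to the next entry; false (and no move) if there is none
    long index();        // current index, -1 before the first entry
}

/** In-memory stand-in for an excerpt over 'size' entries. */
final class ListCursor implements Cursor {
    private final int size;
    private long idx = -1;

    ListCursor(int size) {
        this.size = size;
    }

    public boolean nextIndex() {
        if (idx + 1 >= size) {
            return false;
        }
        idx++;
        return true;
    }

    public long index() {
        return idx;
    }
}

/** Wraps a Cursor so hasNext() can look ahead without changing what index() reports. */
final class PeekingCursor {
    private final Cursor delegate;
    private boolean peeked; // true when the delegate already sits on the next entry
    private long visible;   // the index callers see

    PeekingCursor(Cursor delegate) {
        this.delegate = delegate;
        this.visible = delegate.index();
    }

    boolean hasNext() {
        if (!peeked) {
            peeked = delegate.nextIndex(); // only a successful look-ahead is remembered
        }
        return peeked;
    }

    boolean next() {
        if (!hasNext()) {
            return false;
        }
        peeked = false;
        visible = delegate.index();
        return true;
    }

    long index() {
        return visible;
    }
}

final class PeekingCursorDemo {
    public static void main(String[] args) {
        PeekingCursor cursor = new PeekingCursor(new ListCursor(3));
        for (int i = 0; i < 3; i++) {
            // index stays -1 no matter how often hasNext() is called
            System.out.println("hasNext=" + cursor.hasNext() + ", index=" + cursor.index());
        }
        while (cursor.next()) {
            System.out.println("read index " + cursor.index());
        }
    }
}
```

The wrapper never needs to seek backwards, so it sidesteps the -1 problem, at the cost of the underlying excerpt being one entry ahead of what index() reports.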

OSGi support

chronicle/pom.xml lacks bundle packaging, so the generated jar is not OSGi-ready.

net.openhft.lang.Jvm#PID_BITS returns incorrect value for Linux

VanillaChronicle breaks because an incorrect number of bits is derived from pid_max for VanillaChronicleConfig#THREAD_ID_BITS when pid_max is not a power of 2.

In our system, /proc/sys/kernel/pid_max = 4000000,
which results in PID_BITS=21.
21 bits can only represent 2097152 distinct values (0 to 2097151).
The correct value should be 22 bits to allow for the full range of process IDs.

An easy workaround is to specify -Dos.max.pid.bits=22 in the parameters when launching the Java process.
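The numbers in the report are consistent with rounding log2(pid_max) down: floor(log2(4000000)) = 21, whereas PIDs range up to pid_max - 1 = 3999999, which needs 22 bits. The sketch contrasts that calculation with one that counts the bits of pid_max - 1; it is illustrative only and does not reproduce the actual Jvm#PID_BITS code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class PidBits {

    /** Bits needed to hold every PID in [0, pidMax - 1]; exact for any pidMax >= 2. */
    static int bitsFor(long pidMax) {
        return 64 - Long.numberOfLeadingZeros(pidMax - 1);
    }

    /** floor(log2(pidMax)): under-counts by one whenever pidMax is not a power of 2. */
    static int floorLog2(long pidMax) {
        return 63 - Long.numberOfLeadingZeros(pidMax);
    }

    public static void main(String[] args) throws IOException {
        Path p = Paths.get("/proc/sys/kernel/pid_max");
        long pidMax = Files.exists(p)
                ? Long.parseLong(new String(Files.readAllBytes(p), StandardCharsets.US_ASCII).trim())
                : 4_000_000L; // the value from the report, used when /proc is unavailable
        System.out.println("pid_max=" + pidMax
                + " floorLog2=" + floorLog2(pidMax) + " bitsFor=" + bitsFor(pidMax));
    }
}
```

For pid_max = 4000000 this gives floorLog2 = 21 and bitsFor = 22, matching the -Dos.max.pid.bits=22 workaround; for a power of two such as 32768 both give 15.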

Document file format and protocol

For interoperability purposes (e.g. having non-java producers or consumers, and/or porting to another language like C/C++) it would be great to have more documentation on what the file format is and what the protocols are for the producer, consumers, and replicator. More specifically:

  1. What is the format of the files (index, data)
  2. How are Chronicles initialized
  3. How is a new Excerpt written
  4. How does a consumer read an Excerpt
  5. How does a consumer wait for new Excerpts
  6. Documenting signaling between producers / consumers (if any)
  7. How does replication work

I would be willing to write this document up based on my investigations but I'd need help from Peter.

Designing concurrent IndexedChronicle

The current implementation of Chronicle has a single-writer, multiple-reader architecture (I think we can create multiple readers without any problem).

I am trying to enhance IndexedChronicle so that multiple writers are possible. The current implementation fits messaging-queue requirements, but to use Chronicle as the basis for a cache implementation, concurrent writes need to be supported.

Let's discuss some design issues and try to draft an architectural design. I will code it.

The current implementation has one write head, and this write head advances with each write.
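For comparison, a lock-free option that is not proposed in this thread is to keep the single write head but let each writer claim its own region atomically with AtomicLong.getAndAdd, so concurrent writers never overlap and only contend on one atomic increment. This sketch over a plain byte array is hypothetical throughout; a real journal would also need a per-entry "committed" marker so readers skip regions that are reserved but not yet written.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical shared write head: each writer reserves a disjoint [start, start + len) range. */
final class WriteHead {
    private final AtomicLong next = new AtomicLong();
    private final long capacity;

    WriteHead(long capacity) {
        this.capacity = capacity;
    }

    /** Returns the start offset of a reserved region of len bytes, or -1 if the store is full. */
    long reserve(int len) {
        long start = next.getAndAdd(len); // atomic: no two callers get overlapping ranges
        return start + len <= capacity ? start : -1;
    }

    /** Writers fill their reserved records with their own id; true if no record was shared or lost. */
    static boolean demo(int writers, int recordsPerWriter, int recordLen) throws InterruptedException {
        byte[] store = new byte[writers * recordsPerWriter * recordLen];
        WriteHead head = new WriteHead(store.length);
        Thread[] threads = new Thread[writers];
        for (int w = 0; w < writers; w++) {
            final byte id = (byte) (w + 1); // non-zero, so an untouched byte (0) is detectable
            threads[w] = new Thread(() -> {
                for (int r = 0; r < recordsPerWriter; r++) {
                    long start = head.reserve(recordLen);
                    for (int i = 0; i < recordLen; i++) {
                        store[(int) start + i] = id;
                    }
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // Every record must be written entirely by one writer, and none may be left empty.
        for (int off = 0; off < store.length; off += recordLen) {
            for (int i = 0; i < recordLen; i++) {
                if (store[off + i] == 0 || store[off + i] != store[off]) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(4, 10_000, 8)); // true
    }
}
```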

I think IndexedChronicle will be the base for such an implementation, so I will talk about it.

We have indexCache and dataCache at the lowest layer. When we put some data into an IndexedChronicle, the data is written to dataCache and the address of the data is written to indexCache.

The appender does not look for a suitable cell before a put operation, because it always points to the first available place in dataCache.

indexCache and dataCache consist of blocks of configurable size, so when required, a new block is mapped from the file into memory.

What if we want to write from two threads concurrently? The basic solution would be to define a write lock that each appender acquires before writing, but this will decrease performance and block waiting threads.

We need more than one lock. We could have a lock for each data block, and a writer that wants to write to a block must first acquire its lock. Because there are several blocks, several threads can write to different blocks at the same time. This will improve random update/write performance but decrease the performance of sequential writes from multiple threads (acquiring a lock brings some overhead).

The problem with this approach is that the cache does not have many blocks early on, so there will be a warm-up period before the cache reaches its full performance.

Another approach would be to slice blocks into several concurrency blocks and create a lock for each concurrency block. This would be a better solution, because we would not need to wait for the chronicle to grow to get high performance. The trade-off is the space for the locks: they will be kept on the heap, and the more locks we have, the more heap we use.
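The "concurrency blocks" idea is essentially lock striping. A minimal sketch, assuming fixed-size concurrency blocks and a fixed pool of ReentrantLocks, so heap use is bounded by the stripe count rather than by the size of the chronicle: an offset maps to its concurrency block, and the block maps to a lock by modulo. The class is hypothetical, and writes that cross a block boundary (which would need two locks taken in a fixed order) are not handled.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical striped locking for writes into fixed-size concurrency blocks. */
final class StripedBlockLocks {
    private final ReentrantLock[] stripes;
    private final long concurrencyBlockSize;

    StripedBlockLocks(int stripeCount, long concurrencyBlockSize) {
        this.stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
        this.concurrencyBlockSize = concurrencyBlockSize;
    }

    /** Lock guarding the concurrency block that contains the given byte offset. */
    ReentrantLock lockFor(long offset) {
        long block = offset / concurrencyBlockSize;
        return stripes[(int) (block % stripes.length)];
    }

    /** Runs a write under its stripe lock; assumes the write stays inside one concurrency block. */
    void write(long offset, Runnable writeAction) {
        ReentrantLock lock = lockFor(offset);
        lock.lock();
        try {
            writeAction.run();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StripedBlockLocks locks = new StripedBlockLocks(8, 4096);
        long[] counters = new long[4]; // one counter per concurrency block
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 100_000; i++) {
                    int block = i % counters.length;
                    locks.write(block * 4096L, () -> counters[block]++);
                }
            });
            threads[t].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(Arrays.toString(counters)); // [100000, 100000, 100000, 100000]
    }
}
```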

These ideas are just questions in my mind. Managing indexCache is another issue, and I think we shouldn't consider it separately.

Please write your comments; meanwhile I am looking for more accurate solutions.

regards.

native linked list support

Would it be possible to add linked-list support as part of the native Chronicle API?

E.g. when there are interleaved market streams in the same chronicle queue
and the user wants to replay only a single market-id without stepping through every index entry.

The use case seems common enough, with market-id of type long.
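One way to model this is to store, in every entry, the index of the previous entry with the same market id, and to keep the latest index per market id. Back-pointers suit an append-only log because nothing already written needs to change, and replaying one market visits only that market's entries. The sketch below is an in-memory model with hypothetical names, holding entries in ArrayLists rather than in a chronicle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a log whose entries point back to the previous entry of the same market. */
final class LinkedMarketLog {
    private static final long NONE = -1L;

    private final List<String> payloads = new ArrayList<>();
    private final List<Long> prevSameMarket = new ArrayList<>(); // back-pointer stored with each entry
    private final Map<Long, Long> lastIndexByMarket = new HashMap<>();

    /** Appends an entry and returns its index. */
    long append(long marketId, String payload) {
        long index = payloads.size();
        payloads.add(payload);
        prevSameMarket.add(lastIndexByMarket.getOrDefault(marketId, NONE));
        lastIndexByMarket.put(marketId, index);
        return index;
    }

    /** Replays one market oldest-first, touching only that market's entries. */
    List<String> replay(long marketId) {
        List<String> out = new ArrayList<>();
        long i = lastIndexByMarket.getOrDefault(marketId, NONE);
        while (i != NONE) {
            out.add(payloads.get((int) i));
            i = prevSameMarket.get((int) i);
        }
        Collections.reverse(out); // the chain is walked newest-first
        return out;
    }

    public static void main(String[] args) {
        LinkedMarketLog log = new LinkedMarketLog();
        log.append(1, "m1:a");
        log.append(2, "m2:a");
        log.append(1, "m1:b");
        log.append(2, "m2:b");
        log.append(1, "m1:c");
        System.out.println(log.replay(1)); // [m1:a, m1:b, m1:c]
    }
}
```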

Source-Sink issues

Hi Peter,

Indeed, reading from a sink and writing to a local chronicle on a blocking thread, and then reading from the local chronicle (non blocking) in a tight loop seems a good idea.

However, I am having trouble implementing it: it seems that in certain circumstances VolatileExcerptTailer.nextIndex() is reading two records when only one is published on its port (the second record may come from another sink on a different port?).

I have built a simplified test case that exemplifies this issue (see the attached file). The application consists of three processes: P1, P2 and P3. One needs to start P2 and P3 first in any order and then P1. The flow is as follows:

(1) in a loop, P1 writes a record to chr1 and waits to read a record from chr5.
(2) P2 reads a record from chr1 and writes it to a source chr2 on port2. On a separate thread, a NonBlockingSink reads from a source on port5 and writes to chr5.
(3) in its NonBlockingSink, on a separate thread, P3 reads from a source on port3 (same as port2) and writes to chr3. On its main thread, in a tight loop, P3 reads from chr3 and writes to source chr4 on port4 (same as port5).

P1, P2, and P3 are all running on the same machine, but the intent is to have P1 and P2 on one host and P3 on a different host (that's why P2 and P3 communicate via source/sink pairs).

P1 is supposed to send one record, wait until it reads it coming back, send another record and so on, for a total of 10 records. However, it receives only 1 record and then blocks.

I added some tracing to ChronicleSink.VolatileExcerptTailer.nextIndex() (see below) and I get the following trace in P3 (but a similar trace is obtained in P2):

excerptSize=20 Zamolxis/127.0.0.1:15311
receivedIndex=0 Zamolxis/127.0.0.1:15311 Thread-1
positionAddr=188696728
reading from sink 15311
read from sink 15311 { long=1000 string=foo bar int=0 }
wrote to local { long=1000 string=foo bar int=0 }
excerptSize=1000 Zamolxis/127.0.0.1:15311
receivedIndex=8029748840875687936 Zamolxis/127.0.0.1:15311 Thread-1

Notice the second excerptSize=1000 (I only send small messages) and receivedIndex=8029748840875687936 (while the first index was 0).

Am I doing something wrong? I am using version 3.2.6-SNAPSHOT of chronicle.

Thanks,
Vladimir

            int excerptSize = buffer.getInt();
            long receivedIndex = buffer.getLong();

            System.out.println("excerptSize="+excerptSize+" "+address);

            switch (excerptSize) {
                case ChronicleTcp.IN_SYNC_LEN:
                case ChronicleTcp.PADDED_LEN:
                case ChronicleTcp.SYNC_IDX_LEN:
                    return false;
            }

            System.out.println("receivedIndex="+receivedIndex+" "+address+" "+Thread.currentThread().getName());

            if (excerptSize > 128 << 20 || excerptSize < 0) {
                throw new StreamCorruptedException("Size was " + excerptSize);
            }

            if(buffer.remaining() < excerptSize) {
                if(!connector.read(excerptSize)) {
                    return false;
                }
            }

            index = receivedIndex;
            positionAddr = startAddr + buffer.position();
            limitAddr = positionAddr + excerptSize;
            lastSize = excerptSize;
            finished = false;
            System.out.println("positionAddr="+positionAddr);
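In the trace, the second header decodes to an implausible size and index. In the snippet above, getInt()/getLong() advance the buffer position before connector.read(excerptSize) can return false; whether that matters depends on how the connector manages the buffer. Independently of the actual cause here, a common way to keep a length-prefixed stream aligned is to peek at the header and consume nothing until the whole record is available. The sketch uses plain NIO with the same 12-byte header (int size, long index); FrameReader and tryRead are hypothetical names, and this is not ChronicleSink's code.

```java
import java.nio.ByteBuffer;

/** Hypothetical reader for [int size][long index][payload] records that never consumes a partial record. */
final class FrameReader {
    static final int HEADER_LEN = Integer.BYTES + Long.BYTES; // 12 bytes, as in the snippet above

    long index = -1; // index of the last record read
    int size = -1;   // payload size of the last record read

    /**
     * If a whole record is available, copies its payload into payloadOut, advances the buffer
     * past it and returns true. Otherwise returns false and leaves the buffer position unchanged.
     */
    boolean tryRead(ByteBuffer buffer, byte[] payloadOut) {
        if (buffer.remaining() < HEADER_LEN) {
            return false;
        }
        int start = buffer.position();
        int excerptSize = buffer.getInt(start); // absolute reads: position is not moved
        long receivedIndex = buffer.getLong(start + Integer.BYTES);
        if (excerptSize < 0 || excerptSize > payloadOut.length) {
            throw new IllegalStateException("Size was " + excerptSize);
        }
        if (buffer.remaining() < HEADER_LEN + excerptSize) {
            return false; // wait for more bytes; the header is simply re-read next time
        }
        buffer.position(start + HEADER_LEN);
        buffer.get(payloadOut, 0, excerptSize);
        index = receivedIndex;
        size = excerptSize;
        return true;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putInt(3).putLong(7L).put(new byte[] {1, 2}); // header plus only 2 of 3 payload bytes
        buf.flip();
        FrameReader reader = new FrameReader();
        byte[] payload = new byte[32];
        System.out.println(reader.tryRead(buf, payload) + " position=" + buf.position()); // false position=0
    }
}
```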

Chronicle for off heap cache

Hi,

Is there any use case where Chronicle has been used as the base for an off-heap cache implementation? I have a feeling that we could use the Chronicle architecture to build an off-heap cache library. I also saw that you have started implementing HugeCollections, which will be similar to what I am thinking of...

What do you think? Is it possible/feasible?
And if it is possible, what would be the starting point? Do you have any advice?

regards
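As a rough illustration of the idea, and not a description of HugeCollections or of any Chronicle class, the core of an off-heap cache is a hash table laid out in memory the garbage collector does not scan. This hypothetical sketch keeps 16-byte [key][value] slots in a direct ByteBuffer with linear probing; it is single-threaded, has no eviction, deletion or resizing, and reserves key 0 as the empty marker. A persistent variant could map the table from a file with FileChannel.map instead of using allocateDirect.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical fixed-capacity off-heap map from long keys to long values.
 * Slots are 16 bytes, [key][value], in a direct ByteBuffer, with linear probing.
 * Key 0 marks an empty slot (direct buffers start zero-filled), so it cannot be stored.
 */
final class OffHeapLongMap {
    private static final int SLOT = 16;
    private final ByteBuffer slots;
    private final int capacity;

    OffHeapLongMap(int capacity) {
        this.capacity = capacity;
        this.slots = ByteBuffer.allocateDirect(capacity * SLOT);
    }

    private int home(long key) {
        long h = key * 0x9E3779B97F4A7C15L; // multiplicative hash
        h ^= h >>> 32;                      // fold the high bits into the low bits
        return (int) Math.floorMod(h, (long) capacity);
    }

    void put(long key, long value) {
        if (key == 0) {
            throw new IllegalArgumentException("key 0 is reserved for empty slots");
        }
        int i = home(key);
        for (int probe = 0; probe < capacity; probe++) {
            long k = slots.getLong(i * SLOT);
            if (k == 0 || k == key) { // empty slot, or an existing entry to overwrite
                slots.putLong(i * SLOT, key);
                slots.putLong(i * SLOT + 8, value);
                return;
            }
            i = (i + 1) % capacity;
        }
        throw new IllegalStateException("map is full");
    }

    /** Returns the value stored for key, or defaultValue if it is absent. */
    long get(long key, long defaultValue) {
        if (key == 0) {
            return defaultValue;
        }
        int i = home(key);
        for (int probe = 0; probe < capacity; probe++) {
            long k = slots.getLong(i * SLOT);
            if (k == key) {
                return slots.getLong(i * SLOT + 8);
            }
            if (k == 0) {
                return defaultValue; // reached an empty slot: the key was never inserted
            }
            i = (i + 1) % capacity;
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        OffHeapLongMap map = new OffHeapLongMap(1 << 10);
        for (long k = 1; k <= 500; k++) {
            map.put(k, k * k);
        }
        System.out.println(map.get(12, -1) + " " + map.get(999, -1)); // 144 -1
    }
}
```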

OSGiBundleTest: Exporting java.* packages not allowed: java.lang (Windows 8.1)

After lburgazzoli helped me to solve the main problems when running "mvn test" for Java-Chronicle on a Windows 8.1 machine, with his fix:
https://github.com/lburgazzoli/Java-Chronicle/tree/HFT-CHRON-27

I got some other problems when running "mvn install" on Chronicle-Test. @lburgazzoli I know you now have a Windows system at hand, so you might have found the same issue already, but in case you missed it, here it is:

-------------------------------------------------------------------------------
Test set: net.openhft.chronicle.osgi.OSGiBundleTest
-------------------------------------------------------------------------------
Tests run: 2, Failures: 0, Errors: 1, Skipped: 1, Time elapsed: 0.851 sec <<< FAILURE! - in net.openhft.chronicle.osgi.OSGiBundleTest
checkInject(net.openhft.chronicle.osgi.OSGiBundleTest)  Time elapsed: 0.699 sec  <<< ERROR!
org.osgi.framework.BundleException: Exporting java.* packages not allowed: java.lang
    at org.apache.felix.framework.util.manifestparser.ManifestParser.normalizeExportClauses(ManifestParser.java:729)
    at org.apache.felix.framework.util.manifestparser.ManifestParser.<init>(ManifestParser.java:191)
    at org.apache.felix.framework.BundleRevisionImpl.<init>(BundleRevisionImpl.java:118)
    at org.apache.felix.framework.BundleImpl.createRevision(BundleImpl.java:1199)
    at org.apache.felix.framework.BundleImpl.<init>(BundleImpl.java:96)
    at org.apache.felix.framework.Felix.installBundle(Felix.java:2899)
    at org.apache.felix.framework.BundleContextImpl.installBundle(BundleContextImpl.java:165)
    at org.apache.felix.framework.BundleContextImpl.installBundle(BundleContextImpl.java:138)
    at org.ops4j.pax.exam.nat.internal.NativeTestContainer.installAndStartBundles(NativeTestContainer.java:307)
    at org.ops4j.pax.exam.nat.internal.NativeTestContainer.start(NativeTestContainer.java:175)
    at org.ops4j.pax.exam.spi.reactors.AllConfinedStagedReactor.invoke(AllConfinedStagedReactor.java:79)
    at org.ops4j.pax.exam.junit.impl.ProbeRunner$2.evaluate(ProbeRunner.java:278)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.ops4j.pax.exam.junit.impl.ProbeRunner.run(ProbeRunner.java:112)
    at org.ops4j.pax.exam.junit.PaxExam.run(PaxExam.java:93)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)

readEnum causes BufferUnderflowException in 3.2.2 release

Hi,

After updating to the 3.2.2 release, calling ExcerptTailer.readEnum() causes the exception below. The same code runs fine in 3.2.3-SNAPSHOT.

Best regards
Slawomir

java.lang.IllegalStateException: java.nio.BufferUnderflowException
at net.openhft.lang.io.AbstractBytes.readInstance(AbstractBytes.java:1885)
at net.openhft.lang.io.AbstractBytes.readEnum(AbstractBytes.java:1740)

Caused by: java.nio.BufferUnderflowException
at net.openhft.lang.io.AbstractBytes.returnOrThrowEndOfBuffer(AbstractBytes.java:246)
at net.openhft.lang.io.AbstractBytes.readByteOrThrow(AbstractBytes.java:241)
at net.openhft.lang.io.AbstractBytes.readUnsignedByteOrThrow(AbstractBytes.java:237)
at net.openhft.lang.io.AbstractBytes.readUTF0(AbstractBytes.java:396)
at net.openhft.lang.io.AbstractBytes.appendUTF0(AbstractBytes.java:389)
at net.openhft.lang.io.AbstractBytes.readUTFΔ(AbstractBytes.java:375)
at net.openhft.lang.io.serialization.impl.ClassMarshaller.read(ClassMarshaller.java:72)
at net.openhft.lang.io.serialization.impl.ClassMarshaller.read(ClassMarshaller.java:35)
at net.openhft.lang.io.serialization.BytesMarshallableSerializer.readSerializable(BytesMarshallableSerializer.java:117)
at net.openhft.lang.io.AbstractBytes.readInstance(AbstractBytes.java:1883)
... 16 more

Chronicle v2.0.1 test failures

Two of the tests fail when I try to build Chronicle v2.0.1:

testOverTCP(net.openhft.chronicle.InProcessChronicleTest) Time elapsed: 0.256 sec <<< FAILURE!
java.lang.AssertionError: ' 1' expected:<1457219> but was:<1>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:743)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:555)
at net.openhft.chronicle.InProcessChronicleTest.testOverTCP(InProcessChronicleTest.java:90)
[...]

and

multiThreaded(net.openhft.chronicle.IndexedChronicleTest) Time elapsed: 2.484 sec <<< FAILURE!
java.lang.AssertionError: index: 104798483
at net.openhft.chronicle.IndexedChronicleTest.multiThreaded(IndexedChronicleTest.java:277)
[...]

This was run on an Intel X5677 with Java version 1.7.0_40.

ChronicleConfig.TEST cannot be used with default message capacity

ChronicleConfig.TEST's default message capacity results in an exception when trying to start an excerpt via an appender:

java.lang.IllegalArgumentException: Capacity too large 131072 >= 8192
    at net.openhft.chronicle.IndexedChronicle$IndexedExcerptAppender.startExcerpt(IndexedChronicle.java:779)
    at net.openhft.chronicle.IndexedChronicle$IndexedExcerptAppender.startExcerpt(IndexedChronicle.java:768)

Workaround is to manually set a message capacity on the config that is < 8192. But ideally, the default capacity shouldn't result in an exception.

Using 3.2.2.

Thread ID larger than 16 bits leads to FileNotFoundException

Using 3.4.0 of Chronicle Queue, when starting a chronicle, if Kernel32.INSTANCE.GetCurrentThreadId() within WindowsJNAAffinity returns an integer larger than 65535, you will receive an exception such as:

Exception in thread "main" java.lang.AssertionError: java.io.FileNotFoundException: D:\Chronicle\MD\20150224\data-34788-0
at net.openhft.chronicle.VanillaChronicle$AbstractVanillaExcerpt.index(VanillaChronicle.java:409)
at net.openhft.chronicle.VanillaChronicle$AbstractVanillaExcerpt.nextIndex(VanillaChronicle.java:423)

What's quite nasty about this is that everything works as expected until you hit kernel thread ID 65536 (the uint16 max value + 1). We are not using Windows in production, but if we were, this would be very nasty, as it cannot easily be remedied.

It would appear that when determining THREAD_ID_BITS, VanillaChronicle calls Jvm.PID_BITS. While PIDs may max out at the specified value, thread IDs can and will go higher (the Windows kernel API call returns a 32-bit DWORD), so I'm not sure whether the fix is to increase the Windows value to 24 bits as per the Linux change, or whether other changes are required to separate PIDs from thread IDs.

This pull request previously addressed the issue for Linux:
#64
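The report does not show how the thread ID is packed, but if it is stored in a fixed-width field by masking, any two IDs that differ only above that width become indistinguishable. The sketch shows only that arithmetic; the names are hypothetical, and 34788 is used merely because it appears in the file name above, not because the real colliding ID is known.

```java
/** Illustrates how ids wider than a fixed bit width collide after masking. */
final class ThreadIdBits {

    /** Keeps only the low 'bits' bits of id, as a fixed-width field would. */
    static long truncate(long id, int bits) {
        return id & ((1L << bits) - 1);
    }

    /** Smallest bit width that stores id without loss (at least 1). */
    static int bitsNeeded(long id) {
        return Math.max(1, 64 - Long.numberOfLeadingZeros(id));
    }

    public static void main(String[] args) {
        long small = 34_788L;
        long large = (1L << 16) + small; // 100,324: differs from 'small' only in bit 16
        System.out.println(truncate(small, 16) + " " + truncate(large, 16)); // 34788 34788
        System.out.println(bitsNeeded(large));        // 17
        System.out.println(bitsNeeded(0xFFFF_FFFFL)); // 32, the full width of a DWORD
    }
}
```

A width derived from pid_max therefore says nothing about the range of Windows thread IDs, which can use the full 32 bits.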

chronicle-queue performance on Windows seems very slow

Hi Peter. I am trying to use the PersistentQueue from reactor (https://github.com/reactor/reactor), which integrates with chronicle-queue (see the class IndexedChronicleQueuePersistor in the reactor.queue package). I made a simple wrapper class for testing, and the performance is very slow on Windows. It looks like this:

import java.io.IOException;

import net.openhft.chronicle.ChronicleConfig;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.io.encoding.kryo.KryoCodec;
import reactor.queue.IndexedChronicleQueuePersistor;
import reactor.queue.PersistentQueue;
import reactor.queue.QueuePersistor;

import com.zjht.channel.component.util.string.RandomHelper;
import com.zjht.channel.spi.msg.transfer.ChannelTransferMsg;
import com.zjht.channel.spi.util.FilePathHelper;

public class TestPersistQueue {

    public static void main(String[] args) throws Exception {
        PersistentQueue<ChannelTransferMsg> transferQueue = new PersistentQueue<ChannelTransferMsg>(getTransferMsgPersitor());

        System.out.println("before poll1 queue===" + transferQueue.isEmpty());

        long t1 = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            transferQueue.offer(ChannelTransferMsg.createTransferMsg(RandomHelper.randomNumeric().getBytes()));
        }

        long t2 = System.currentTimeMillis();

        while (!transferQueue.isEmpty()) {
            ChannelTransferMsg transferMsg = transferQueue.poll();

            System.out.println("1===" + transferMsg.toString());
        }

        long t3 = System.currentTimeMillis();

        System.out.println("offer cost==={" + (t2 - t1) + "}ms,poll cost==={" + (t3 - t2) + "}ms");

        System.out.println("after poll1 queue===" + transferQueue.isEmpty());

        while (!transferQueue.isEmpty()) {
            ChannelTransferMsg transferMsg = transferQueue.poll();

            System.out.println("2========" + transferMsg.toString());
        }

        System.out.println("after poll2 queue===" + transferQueue.isEmpty());
    }

    private static QueuePersistor<ChannelTransferMsg> getTransferMsgPersitor() throws IOException {
        String basePath = FilePathHelper.getUserDirPath() + "/" + "persist";

        System.out.println("basePath===" + basePath);

        IndexedChronicleQueuePersistor<ChannelTransferMsg> indexChronicePersistor = new IndexedChronicleQueuePersistor<ChannelTransferMsg>(
                basePath, getReactorCodec(), false, true, ChronicleConfig.DEFAULT.clone());

        return indexChronicePersistor;
    }

    private static Codec<Buffer, ChannelTransferMsg, ChannelTransferMsg> getReactorCodec() {
        return new KryoCodec<ChannelTransferMsg, ChannelTransferMsg>();
    }

}

Here is the test result on my notebook (Windows XP 32-bit, JDK 1.7.0_65):
offer cost==={312}ms,poll cost==={109}ms
after poll1 queue===true
after poll2 queue===true

Would you mind sparing some time to help locate my problem? I did not expect the performance to drop so heavily compared with the in-memory approach.
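Before comparing against the in-memory queue, it may help to make the measurement itself fairer: the timed poll loop above also prints every message, so console output is timed together with poll(), and a single cold run of 1,000 operations includes class loading and JIT warm-up. A minimal harness sketch that warms up first, times with System.nanoTime and keeps printing outside the timed region follows; the ArrayDeque is only a stand-in, and the same helper could wrap the PersistentQueue offer/poll calls. QueueTiming and bestNanos are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.IntConsumer;

final class QueueTiming {

    /** Runs op for i = 0..ops-1, 'warmups' untimed rounds then 'rounds' timed ones; returns the best round in ns. */
    static long bestNanos(int warmups, int rounds, int ops, IntConsumer op) {
        for (int w = 0; w < warmups; w++) {
            for (int i = 0; i < ops; i++) {
                op.accept(i);
            }
        }
        long best = Long.MAX_VALUE;
        for (int r = 0; r < rounds; r++) {
            long start = System.nanoTime();
            for (int i = 0; i < ops; i++) {
                op.accept(i);
            }
            best = Math.min(best, System.nanoTime() - start);
        }
        return best;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        long offerNs = bestNanos(10, 5, 1_000, i -> queue.offer("msg-" + i));
        long pollNs = bestNanos(10, 5, 1_000, i -> queue.poll());
        // Report outside the timed loops so printing does not distort the numbers.
        System.out.printf("offer: %.3f us/op, poll: %.3f us/op, left: %d%n",
                offerNs / 1_000.0 / 1_000, pollNs / 1_000.0 / 1_000, queue.size());
    }
}
```

Reporting the best of several warmed-up rounds reduces the influence of one-off costs; for a persisted queue, OS page-cache state and disk flushing can still make Windows and Linux numbers differ.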

Problem building eclipse project

I don't know if it is an issue, but I cannot build the Eclipse project from source because
Maven cannot find the openhft/java-lang jar:

[DEBUG] Failure to find net.openhft:lang:jar:6.0.2-SNAPSHOT in https://repository.jboss.org/nexus/content/groups/public-jboss/ was cached in the local repository, resolution will not be reattempted until the update interval of jboss-public-repository-group has elapsed or updates are forced

I tried to build java-lang from source, but had some problems with the "delta" character :)

UTFDataFormatException: malformed input around byte

Test case:

The producer writes a string consisting of 65632 'a' characters, and the consumer tries to read it but fails with:
java.lang.IllegalStateException: java.io.UTFDataFormatException: malformed input around byte 65535
(To get byte offsets other than 65535, one can experiment with characters other than 'a' whose UTF-8 encoding takes more bytes.)

The produced string can be read with readUTFΔ from the source Chronicle without any problem, but not from the sink Chronicle.
This is what net.openhft.chronicle.tools.ChronicleReader shows near the end of the sink data chronicle:
aaaa 8_\0c 1_\0^A 1*\0\xe0\x80^Daaaaa ...

Failing unit test: uranium/Java-Chronicle@656b350
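For context, 65535 is the largest length that Java's standard modified-UTF-8 framing can express: java.io.DataOutputStream.writeUTF uses an unsigned 16-bit length prefix and throws UTFDataFormatException for longer encodings, and the byte offset in the error above coincides with that limit. The sketch contrasts writeUTF's limit with a variable-length (varint) length prefix that has no 64 KiB ceiling; LongUtf is a hypothetical name, it writes standard UTF-8 rather than modified UTF-8, and it is not claimed to match Chronicle's writeUTFΔ format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class LongUtf {

    /** Writes the UTF-8 length as an unsigned varint (7 bits per byte, high bit = more follow), then the bytes. */
    static void write(ByteBuffer out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        long len = utf8.length;
        while (len >= 0x80) {
            out.put((byte) ((len & 0x7F) | 0x80));
            len >>>= 7;
        }
        out.put((byte) len);
        out.put(utf8);
    }

    /** Reads a string written by write(). */
    static String read(ByteBuffer in) {
        long len = 0;
        int shift = 0;
        byte b;
        do {
            b = in.get();
            len |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        byte[] utf8 = new byte[(int) len];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    /** True if DataOutputStream.writeUTF rejects s because its encoding exceeds 65535 bytes. */
    static boolean rejectedByWriteUtf(String s) {
        try (DataOutputStream dos = new DataOutputStream(new ByteArrayOutputStream())) {
            dos.writeUTF(s);
            return false;
        } catch (UTFDataFormatException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        char[] chars = new char[65632];
        Arrays.fill(chars, 'a');
        String s = new String(chars);
        System.out.println("writeUTF rejects it: " + rejectedByWriteUtf(s)); // true
        ByteBuffer buf = ByteBuffer.allocate(70_000);
        write(buf, s);
        buf.flip();
        System.out.println("varint round trip ok: " + read(buf).equals(s)); // true
    }
}
```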

Chronicle put/get performance stability

Hi,

I am running performance tests on a simple cache implementation to see its overall performance. Below are the results of two different runs. Some variation between runs is normal, but a 3x-4x performance drop is strange.

Took 14.937 secs to get seq 20,000,000 entries
Took 4.193 secs to get seq 20,000,000 entries

Because we do not use the heap intensively, the garbage collector should not be the culprit.
What might be the reason for such a performance drop, and what should I check or monitor?

regards

Illegal state exception around datafile rollover

In v3.2.1 I am getting IllegalStateExceptions around the rollover of the Vanilla data file:
java.lang.IllegalStateException: Cannot write 224 only 216 remaining
at net.openhft.lang.io.AbstractBytes.checkWrite(AbstractBytes.java:746) ~[lang-6.4.6.jar:na]
at net.openhft.lang.io.AbstractBytes.write(AbstractBytes.java:740) ~[lang-6.4.6.jar:na]
at net.openhft.chronicle.tools.WrappedExcerpt.write(WrappedExcerpt.java:305) ~[chronicle-3.2.1.jar:na]

Calling startExcerpt() twice results in weird state for IndexedExcerptAppender

If you call startExcerpt(), write some bytes, then call startExcerpt() again (without calling finish()), you end up in an inconsistent state.

In Chronicle 1.x the second call to startExcerpt() would move the position of the Excerpt back to the start and the already written bytes would be discarded.

I liked this behaviour so I was thinking a fix would be to add something like:

if (!finished) {
    position(0);
}

We have added the above workaround to our code, but it would be easy for us to miss it somewhere.
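To make the proposed behaviour explicit, here is a hypothetical in-memory appender where calling startExcerpt() again before finish() rewinds to position 0 and discards the unfinished bytes, as the position(0) workaround does. It models the semantics only; the class and its methods are not Chronicle's IndexedExcerptAppender.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory appender modelling the Chronicle 1.x behaviour described above:
 * calling startExcerpt() again before finish() discards whatever the unfinished excerpt held.
 */
final class RestartableAppender {
    private final List<byte[]> committed = new ArrayList<>();
    private ByteBuffer excerpt;
    private boolean finished = true;

    void startExcerpt(int capacity) {
        if (!finished && excerpt.capacity() >= capacity) {
            excerpt.clear(); // unfinished excerpt: rewind to position 0, dropping its bytes
            excerpt.limit(capacity);
        } else {
            excerpt = ByteBuffer.allocate(capacity);
        }
        finished = false;
    }

    void writeByte(int b) {
        if (finished) {
            throw new IllegalStateException("startExcerpt() has not been called");
        }
        excerpt.put((byte) b);
    }

    void finish() {
        if (finished) {
            return;
        }
        excerpt.flip(); // limit = bytes written in this excerpt
        byte[] bytes = new byte[excerpt.remaining()];
        excerpt.get(bytes);
        committed.add(bytes);
        finished = true;
    }

    List<byte[]> committed() {
        return committed;
    }

    public static void main(String[] args) {
        RestartableAppender appender = new RestartableAppender();
        appender.startExcerpt(16);
        appender.writeByte(1);
        appender.writeByte(2);
        appender.startExcerpt(16); // restart before finish(): bytes 1 and 2 are discarded
        appender.writeByte(9);
        appender.finish();
        System.out.println(appender.committed().size() + " excerpt(s), first byte "
                + appender.committed().get(0)[0]); // 1 excerpt(s), first byte 9
    }
}
```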

Issue with deletion of chronicle file

I noticed that there is an issue with deleting chronicle files, which complicates testing (it is a problem when creating many tests in one unit for the same chronicle).

Here is a test example:

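import java.io.File;
import java.io.IOException;

import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.IndexedChronicle;
import org.junit.Assert;
import org.junit.Test;

public class CloseChronicleTest {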

    @Test
    public void simpleChronicleTest1() throws IOException {
        final String basePath = "simpleTest";
        deleteAndAssert(basePath);
        IndexedChronicle chronicle = new IndexedChronicle(basePath);
        chronicle.close();
        deleteAndAssert(basePath);
    }

    @Test
    public void simpleChronicleTest2() throws IOException {
        final String basePath = "simpleTest";
        deleteAndAssert(basePath);
        IndexedChronicle chronicle = new IndexedChronicle(basePath);
        ExcerptAppender appender = chronicle.createAppender();
        appender.close();
        chronicle.close();
        // it must be called because of JDK-6558368
        System.gc();
        deleteAndAssert(basePath);
    }

    private void deleteAndAssert(String basePath) throws IOException {
        for (String name : new String[] { basePath + ".data", basePath + ".index" }) {
            File file = new File(name);
            if (file.exists()) {
                Assert.assertTrue(file.getCanonicalPath(), file.delete());
            }
        }
    }
}

It seems that this could work, but we need to null out the mapped buffer references in two classes.
What do you think about it? IMHO it would make testing easier.
Check out:
https://github.com/pwielgolaski/Java-Chronicle/commit/2735c21d277c55252e9cde6a4e26fafc98997227
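Until the mapped buffers are released by Chronicle itself, a common test-side workaround is to retry the delete with a System.gc() call in between, generalising the single System.gc() in the test above. System.gc() is only a hint, so this can still fail; on Linux, deleting a mapped file normally succeeds immediately, so it mainly matters on Windows. MappedFileCleanup and deleteWithRetries are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class MappedFileCleanup {

    /**
     * Tries to delete path, calling System.gc() between attempts so that unreachable
     * MappedByteBuffers can be unmapped (see JDK-6558368). Returns true if the file no longer exists.
     */
    static boolean deleteWithRetries(Path path, int attempts, long sleepMillis) {
        for (int i = 0; i < attempts; i++) {
            try {
                Files.deleteIfExists(path);
                return true;
            } catch (IOException e) {
                System.gc(); // only a hint: the mapping may or may not be released
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return !Files.exists(path);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("chronicle-test", ".data");
        Files.write(file, new byte[] {1, 2, 3});
        System.out.println("deleted: " + deleteWithRetries(file, 5, 100));
    }
}
```

In the test above, deleteAndAssert could call this helper for basePath + ".data" and basePath + ".index" instead of File.delete().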
