
kcache's Introduction

KCache - An In-Memory Cache Backed by Apache Kafka


KCache is a client library that provides an in-memory cache backed by a compacted topic in Kafka. It is one of the patterns for using Kafka as a persistent store, as described by Jay Kreps in the article It's Okay to Store Data in Apache Kafka.

Maven

Releases of KCache are deployed to Maven Central.

<dependency>
    <groupId>io.kcache</groupId>
    <artifactId>kcache</artifactId>
    <version>5.0.0</version>
</dependency>

For Java 11 or above, use 5.x; otherwise, use 4.x.

Usage

An instance of KafkaCache implements the java.util.SortedMap interface. Here is an example usage:

import io.kcache.*;
import org.apache.kafka.common.serialization.Serdes;

String bootstrapServers = "localhost:9092";
Cache<String, String> cache = new KafkaCache<>(
    bootstrapServers,
    Serdes.String(),  // for serializing/deserializing keys
    Serdes.String()   // for serializing/deserializing values
);
cache.init();   // creates topic, initializes cache, consumer, and producer
cache.put("Kafka", "Rocks");
String value = cache.get("Kafka");  // returns "Rocks"
cache.remove("Kafka");
cache.close();  // shuts down the cache, consumer, and producer

One can also use RocksDB to back the KafkaCache:

Properties props = new Properties();
props.put(KafkaCacheConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(KafkaCacheConfig.KAFKACACHE_BACKING_CACHE_CONFIG, "rocksdb");
props.put(KafkaCacheConfig.KAFKACACHE_DATA_DIR_CONFIG, "/tmp");
Cache<String, String> cache = new KafkaCache<>(
    new KafkaCacheConfig(props),
    Serdes.String(),  // for serializing/deserializing keys
    Serdes.String()   // for serializing/deserializing values
);
cache.init();

Basic Configuration

KCache has a number of configuration properties that can be specified.

  • kafkacache.bootstrap.servers - A list of host and port pairs to use for establishing the initial connection to Kafka.
  • kafkacache.group.id - The group ID to use for the internal consumer. Defaults to kafkacache.
  • kafkacache.client.id - The client ID to use for the internal consumer. Defaults to kafka-cache-reader-<topic>.
  • kafkacache.topic - The name of the compacted topic. Defaults to _cache.
  • kafkacache.topic.replication.factor - The desired replication factor for the compacted topic. Defaults to 3.
  • kafkacache.topic.num.partitions - The desired number of partitions for the compacted topic. Defaults to 1.
  • kafkacache.topic.partitions - A list of partitions to consume, or all partitions if not specified.
  • kafkacache.topic.partitions.offset - The offset to start consuming all partitions from, one of beginning, end, a positive number representing an absolute offset, a negative number representing a relative offset from the end, or @<value>, where <value> is a timestamp in ms. Defaults to beginning. (See the example after this list.)
  • kafkacache.init.timeout.ms - The timeout for initialization of the Kafka cache, including creation of the compacted topic. Defaults to 300 seconds.
  • kafkacache.timeout.ms - The timeout for an operation on the Kafka cache. Defaults to 60 seconds.
  • kafkacache.backing.cache - The backing cache for KCache, one of memory (default), bdbje, caffeine, lmdb, mapdb, rdbms, or rocksdb.
  • kafkacache.data.dir - The root directory for backing cache storage. Defaults to /tmp.
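
For example, the variants of kafkacache.topic.partitions.offset described above can be expressed as follows (the values shown are illustrative, not defaults):

props.setProperty("kafkacache.topic.partitions.offset", "beginning");  // from the start (the default)
// props.setProperty("kafkacache.topic.partitions.offset", "end");     // only records produced from now on
// props.setProperty("kafkacache.topic.partitions.offset", "100");     // absolute offset 100
// props.setProperty("kafkacache.topic.partitions.offset", "-1000");   // 1000 records before the end
// props.setProperty("kafkacache.topic.partitions.offset", "@1577836800000");  // from a timestamp in ms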

Configuration properties can be passed as follows:

Properties props = new Properties();
props.setProperty("kafkacache.bootstrap.servers", "localhost:9092");
props.setProperty("kafkacache.topic", "_mycache");
Cache<String, String> cache = new KafkaCache<>(
    new KafkaCacheConfig(props),
    Serdes.String(),  // for serializing/deserializing keys
    Serdes.String()   // for serializing/deserializing values
);
cache.init();
...

Security

KCache supports both SSL authentication and SASL authentication to a secure Kafka cluster. See the JavaDoc for more information.
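
For illustration, here is a minimal sketch of an SSL setup. The kafkacache.security.protocol key appears elsewhere in this document; the kafkacache.ssl.* keys are assumed to mirror the standard Kafka client settings with the kafkacache. prefix, so verify the exact names against the JavaDoc:

Properties props = new Properties();
props.setProperty("kafkacache.bootstrap.servers", "broker1:9093");
props.setProperty("kafkacache.security.protocol", "SSL");
// Assumed kafkacache.-prefixed analogues of the standard Kafka SSL client settings:
props.setProperty("kafkacache.ssl.truststore.location", "/path/to/truststore.jks");
props.setProperty("kafkacache.ssl.truststore.password", "changeit");
props.setProperty("kafkacache.ssl.keystore.location", "/path/to/keystore.jks");
props.setProperty("kafkacache.ssl.keystore.password", "changeit");
Cache<String, String> cache = new KafkaCache<>(
    new KafkaCacheConfig(props),
    Serdes.String(),
    Serdes.String()
);
cache.init();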

Using KCache as a Replicated Cache

KCache can be used as a replicated cache, with some caveats. To ensure that updates are processed in the proper order, one instance of KCache should be designated as the sole writer, with all writes being forwarded to it. If the writer fails, another instance can then be elected as the new writer.
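
As an illustration only, here is a hypothetical wrapper sketching this sole-writer pattern; the leader flag and the forwarding mechanism are application-level assumptions, not part of KCache:

// Hypothetical sketch of the sole-writer pattern described above. How leadership
// is established (e.g. via ZooKeeper or Kafka group coordination) and how writes
// are forwarded (e.g. an RPC to the leader) are left to the application.
public class SoleWriterCache {
    private final Cache<String, String> cache;  // local KafkaCache instance; reads are always local
    private volatile boolean isLeader;          // set by your leader-election mechanism

    public SoleWriterCache(Cache<String, String> cache) {
        this.cache = cache;
    }

    public String get(String key) {
        return cache.get(key);                  // every replica can serve reads
    }

    public void put(String key, String value) {
        if (isLeader) {
            cache.put(key, value);              // only the elected writer mutates the topic
        } else {
            forwardToLeader(key, value);        // all other instances forward writes
        }
    }

    private void forwardToLeader(String key, String value) {
        // Application-specific: send the write to the current leader.
        throw new UnsupportedOperationException("forwarding not implemented in this sketch");
    }
}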

For an example of a highly-available service that wraps KCache, see Keta.

kcache's People

Contributors

dependabot-preview[bot], dependabot[bot], eikemeier, rayokota


kcache's Issues

Dependencies

I think scala-library was added by mistake:

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

And I believe it would be better to make rocksdb optional for the project, since the binary is around 20 MB.

Connection to node failed authentication due to: Failed to process post-handshake messages

Hi!

I am a fan of KCache, and have lately been using it quite extensively! Great work!

However, my application occasionally outputs some Kafka-related error messages. More precisely, the most common error message is

Connection to node failed authentication due to: Failed to process post-handshake messages

with the following stack trace

io.kcache.exceptions.CacheTimeoutException: Put operation timed out while waiting for an ack from Kafka
	at io.kcache.KafkaCache.put(KafkaCache.java:533)
	at io.kcache.KafkaCache.put(KafkaCache.java:490)
	at Cache.insert(Cache.kt:50)
	at Cache.insert(Cache.kt:54)
	at ConsumerThread.addToCache(ConsumerThread.kt:56)
	at ConsumerThread.addToCache$default(ConsumerThread.kt:55)
	at ConsumerThread.run(ConsumerThread.kt:35)
Caused by: java.util.concurrent.TimeoutException: Timeout after waiting for 60000 ms.
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:78)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
	at io.kcache.KafkaCache.put(KafkaCache.java:522)
	... 6 common frames omitted

To clarify: for my application I have implemented a wrapper class Cache, containing simple logic, around the KCache.

Do you have any tips on how to overcome/solve these errors?

KAFKACACHE_BOUNDED_CACHE_EXPIRY_SECS_CONFIG is not working

I am using 4.0.11 and trying to create a cache where entries expire after a few seconds. But even when I am doing multiple writes to different keys, the old keys are not expiring, even though expiry is expected from the backing Guava cache. I am using cleanup policy delete and the config below:

    Properties props = new Properties();
    props.put(KafkaCacheConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(KafkaCacheConfig.KAFKACACHE_GROUP_ID_CONFIG, "cache-test");
    props.put(KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG, "cache-test-2");
    props.put(KafkaCacheConfig.KAFKACACHE_CLIENT_ID_CONFIG, "test-cache-client-2");
    props.put(KafkaCacheConfig.KAFKACACHE_TOPIC_REPLICATION_FACTOR_CONFIG, 1);
    props.put(KafkaCacheConfig.KAFKACACHE_TOPIC_REQUIRE_COMPACT_CONFIG, false);

    props.put(KafkaCacheConfig.KAFKACACHE_BACKING_CACHE_CONFIG, "memory");
    props.put(KafkaCacheConfig.KAFKACACHE_BOUNDED_CACHE_EXPIRY_SECS_CONFIG, 1);

    // (cache construction elided in the original report)
    cache.put("1", "first");
    Thread.sleep(3000);
    cache.put("2", "2");
    System.out.println(cache.get("1")); // not empty, although it should have expired

Query with different keys

I really love KCache. We have been using Kafka with processors, but we want to switch to your library. We have around a thousand items and hold them as key-value pairs in our in-memory caches. However, there are some cases where we need to query not only by id but also by other keys. Would you please add a feature to query by other keys?
For example, we have around 1000 shops. We can query them by id with your library, but we would like to query by name too.
Thanks
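
Until such a feature exists, one hypothetical workaround (application code, not a KCache API) is to maintain a secondary name-to-id index next to the cache:

import io.kcache.Cache;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ShopLookup {
    // Hypothetical domain class, for illustration only.
    public static class Shop {
        final String id;
        final String name;
        Shop(String id, String name) { this.id = id; this.name = name; }
    }

    private final Cache<String, Shop> byId;                        // the KafkaCache, keyed by id
    private final Map<String, String> idByName = new ConcurrentHashMap<>();

    public ShopLookup(Cache<String, Shop> byId) {
        this.byId = byId;
    }

    public void put(Shop shop) {
        byId.put(shop.id, shop);
        idByName.put(shop.name, shop.id);                          // keep the index in step with the cache
    }

    public Shop getByName(String name) {
        String id = idByName.get(name);
        return id == null ? null : byId.get(id);
    }
}

With around 1000 entries, rebuilding such an index by iterating the cache at startup is cheap.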

question about topic setup

Hi, thanks for the great work on this library.

Question: what do you recommend regarding the number of topics to create?

Initially I thought to have one global topic, with each key in an event defined uniquely by each app. But this would eventually lead to a big memory footprint.

So I guess a better approach would be to create a topic per application and environment?

compilation error in ClusterTestHarness.java

Hi, what's the issue here? ClusterTestHarness.java, line 85:

servers.get(i).config().advertisedListeners().head().host(),

java.lang.Error: Unresolved compilation problem:
The method head() is ambiguous for the type Seq

at io.kcache.utils.ClusterTestHarness.setUp(ClusterTestHarness.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:539)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:761)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:461)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:207)

io.kcache.exceptions.CacheInitializationException: Failed trying to create or validate topic my-topic

Hi!

I have an application running KCache. The application consists of several threads, each of which polls from a topic and then saves the records to its own KCache. Periodically, each thread reads a set of the stored records and analyzes them. The application is deployed on OpenShift as a container and runs fine for a good period of time.

However, after about twelve hours of runtime, the application crashes with the following error message:

Exception in thread "main" io.kcache.exceptions.CacheInitializationException: Failed trying to create or validate my-topic-kcache
	at io.kcache.KafkaCache.createOrVerifyTopic(KafkaCache.java:350)
	at io.kcache.KafkaCache.init(KafkaCache.java:267)
	at utils.Cache.<init>(Cache.kt:40)
	at consumers.MyThread.<init>(MyThread.kt:39)
	at ApplicationKt.module(Application.kt:88)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at kotlin.reflect.jvm.internal.calls.CallerImpl$Method.callMethod(CallerImpl.kt:97)
	at kotlin.reflect.jvm.internal.calls.CallerImpl$Method$Static.call(CallerImpl.kt:106)
	at kotlin.reflect.jvm.internal.KCallableImpl.call(KCallableImpl.kt:108)
	at kotlin.reflect.jvm.internal.KCallableImpl.callDefaultMethod$kotlin_reflection(KCallableImpl.kt:159)
	at kotlin.reflect.jvm.internal.KCallableImpl.callBy(KCallableImpl.kt:112)
	at io.ktor.server.engine.internal.CallableUtilsKt.callFunctionWithInjection(CallableUtils.kt:119)
	at io.ktor.server.engine.internal.CallableUtilsKt.executeModuleFunction(CallableUtils.kt:36)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading$launchModuleByName$1.invoke(ApplicationEngineEnvironmentReloading.kt:332)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading$launchModuleByName$1.invoke(ApplicationEngineEnvironmentReloading.kt:331)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading.avoidingDoubleStartupFor(ApplicationEngineEnvironmentReloading.kt:356)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading.launchModuleByName(ApplicationEngineEnvironmentReloading.kt:331)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading.access$launchModuleByName(ApplicationEngineEnvironmentReloading.kt:30)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading$instantiateAndConfigureApplication$1.invoke(ApplicationEngineEnvironmentReloading.kt:312)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading$instantiateAndConfigureApplication$1.invoke(ApplicationEngineEnvironmentReloading.kt:310)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading.avoidingDoubleStartup(ApplicationEngineEnvironmentReloading.kt:338)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading.instantiateAndConfigureApplication(ApplicationEngineEnvironmentReloading.kt:310)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading.createApplication(ApplicationEngineEnvironmentReloading.kt:143)
	at io.ktor.server.engine.ApplicationEngineEnvironmentReloading.start(ApplicationEngineEnvironmentReloading.kt:277)
	at io.ktor.server.netty.NettyApplicationEngine.start(NettyApplicationEngine.kt:174)
	at io.ktor.server.netty.EngineMain.main(EngineMain.kt:26)
	at ApplicationKt.main(Application.kt:33)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1632808693387, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
	at io.kcache.KafkaCache.createOrVerifyTopic(KafkaCache.java:336)
	... 29 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1632808693387, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics
{"@timestamp":"2021-09-28T07:57:57.759+02:00","@version":"1","message":"Thread kafka-cache-reader-thread-thread1-kcache exiting with uncaught exception: ","logger_name":"io.kcache.utils.ShutdownableThread","thread_name":"kafka-cache-reader-thread-thread1-kcache","level":"ERROR","level_value":40000,"stack_trace":"java.lang.OutOfMemoryError: Java heap space\n"}
Exception in thread "thread1" java.lang.OutOfMemoryError: Java heap space
Exception in thread "kafka-cache-reader-thread-thread1-kcache" java.lang.OutOfMemoryError: Java heap space

The caches have the following configuration:

val cacheProps = Properties()
            cacheProps[KafkaCacheConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG] =
                appConfig.propertyOrNull("kafka.bootstrapServers")?.getString()
            cacheProps[KafkaCacheConfig.KAFKACACHE_GROUP_ID_CONFIG] =
                appConfig.propertyOrNull("kafka.consumerGroupIdPrefix")?.getString() + "-$name-kcache"
            cacheProps[KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG] =
                appConfig.propertyOrNull("applicationId")?.getString() + "$name-kcache"
            cacheProps[KafkaCacheConfig.KAFKACACHE_SECURITY_PROTOCOL_CONFIG] =
                appConfig.propertyOrNull("kafka.securityProtocol")?.getString()

Has anyone experienced anything similar, or have a solution to this problem?

Property kafkacache.bounded.cache.expiry.secs doesn't seem to work

I'm using 4.0.2, and it seems the property kafkacache.bounded.cache.expiry.secs doesn't work.
To simplify further, I directly created an InMemoryBoundedCache instance (since that is what KafkaCache uses when an expiry time is defined), put an item into it, and expected the item to be evicted after waiting longer than the defined expiry time:

Cache<String, String> cache = new InMemoryBoundedCache<>(null, Duration.ofSeconds(1), null);
cache.put("1", "first");
Thread.sleep(3000);
assertTrue(cache.isEmpty()); // <- it fails

Using KCache as a replicated cache

I am thinking of using KCache as a replicated cache in my application, where every instance would hold a whole copy of the cache. The docs say that KCache may have problems as a replicated cache in terms of the ordering of updates, but in my application, within any window of a few seconds, even if there are updates from multiple instances, they are for the same key-value pair. So I think it won't be a problem for me. I wrote some test code, and all key-value pairs get replicated to each instance correctly. Could I still face issues with this in production?

I am adding my test code below

String bootstrapServers = "localhost:9092";
Properties props = new Properties();
props.put(KafkaCacheConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(KafkaCacheConfig.KAFKACACHE_GROUP_ID_CONFIG, "cache-test");
props.put(KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG, "cache-test");
props.put(KafkaCacheConfig.KAFKACACHE_CLIENT_ID_CONFIG, "test-cache-client");
Cache<String, String> cache = new KafkaCache<>(new KafkaCacheConfig(props), Serdes.String(), Serdes.String());
cache.init();
while (true) {
    System.out.println("ENTER KEY");
    Scanner in = new Scanner(System.in);
    String s = in.nextLine();
    System.out.println("ENTER VALUE");
    String value = in.nextLine();
    LocalDateTime start = LocalDateTime.now();
    String output = cache.get(s);
    System.out.println("DURATION GET:: " + Duration.between(start, LocalDateTime.now()).toMillis());
    start = LocalDateTime.now();
    cache.put(s, value);
    System.out.println(output);
    System.out.println("DURATION PUT:: " + Duration.between(start, LocalDateTime.now()).toMillis());
}

[Question]: Generic initialization

I have a problem making the initialization of a cache generic. The code is as follows:

class MyCache(name: String, test: Boolean = true) {
    private val cache: SortedMap<String, MyAvroObject>
    init {
        if (test) {
            cache = sortedMapOf()
        }
        else {
            val cacheProps = Properties()
            cacheProps[KafkaCacheConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG] =
                appConfig.propertyOrNull("kafka.bootstrapServers")?.getString()
            cacheProps[KafkaCacheConfig.KAFKACACHE_GROUP_ID_CONFIG] =
                appConfig.propertyOrNull("applicationId")?.getString()
            cacheProps[KafkaCacheConfig.KAFKACACHE_CLIENT_ID_CONFIG] = "GeneratingUnitsCache"
            cacheProps[KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG] = "app.$name.kcache"
            val serdeConfig = Collections.singletonMap(
                AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                appConfig.property("kafka.schemaRegistryUrl").getString(),
            )
            //*** WISH TO MAKE THIS PART GENERIC
            val avroSerdes: Serde<MyAvroObject> = SpecificAvroSerde()
            avroSerdes.configure(serdeConfig, false)
            cache = KafkaCache(
                KafkaCacheConfig(cacheProps),
                Serdes.String(),
                avroSerdes
            )
            cache.init()
            //***
        }
    }
}

Depending on which topic I consume from, the cache will contain different objects. For example, the cache could contain MyAvroObject, String, or MyAvroObject2. The problem then becomes initializing the cache with the correct Serdes (serializer/deserializer) for the kind of objects the cache will contain.

I have tried to make the actual definition of the cache generic:

class MyCache<T>(name: String, test: Boolean = true){
    private val cache: SortedMap<String, T>
...

but then I struggle to get the type of T when I have to choose the right Serdes. Is it possible to make the initialization generic?
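
One possible approach (a sketch in Java, mirroring the README examples) is to invert the problem: let the call site, where the concrete type is known, supply the Serde, so no runtime recovery of T is needed:

import io.kcache.*;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;

public class Caches {
    // Sketch: the caller passes the value Serde<T> explicitly.
    public static <T> Cache<String, T> createCache(String topic, Serde<T> valueSerde, Properties props) {
        props.setProperty("kafkacache.topic", topic);
        Cache<String, T> cache = new KafkaCache<>(
            new KafkaCacheConfig(props),
            Serdes.String(),   // keys
            valueSerde);       // values, chosen where T is concrete
        cache.init();
        return cache;
    }
}

A caller that stores MyAvroObject would pass a configured SpecificAvroSerde, while a String cache would pass Serdes.String().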

JAAS Configuration options

How do we provide JAAS configuration options from the KafkaCacheConfig setup? The JAAS config string is of the form "com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;".
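
A hedged sketch of what this might look like, assuming (as the 4.0.3 issue below suggests) kafkacache.-prefixed analogues of the standard sasl.* client settings; verify the exact keys against KafkaCacheConfig:

Properties props = new Properties();
props.setProperty("kafkacache.bootstrap.servers", "broker1:9093");
props.setProperty("kafkacache.security.protocol", "SASL_SSL");
// Assumed keys, mirroring the standard Kafka client sasl.* settings:
props.setProperty("kafkacache.sasl.mechanism", "GSSAPI");
props.setProperty("kafkacache.sasl.kerberos.service.name", "kafka");
props.setProperty("kafkacache.sasl.jaas.config",
    "com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;");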

Does there exist a changelog or release notes?

Hi

I tried to find out what had changed between my currently installed version of KCache and the newest version, which differ by several minor versions. I couldn't find a changelog or release notes, so I was forced to do a git diff between the tag of my version and the current one. So I am wondering: is there a changelog or release notes somewhere that I didn't find?

What's the proper way to handle a subset of data?

If I want my cache to maintain only a subset of the data in the topic it's consuming, what's the appropriate way to handle this?

Have the serde return null values for records that aren't applicable?
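
A sketch of that serde idea, assuming KCache skips or tombstones records whose value deserializes to null (verify this against the reader logic before relying on it):

import org.apache.kafka.common.serialization.Deserializer;
import java.util.function.Predicate;

// Hypothetical wrapper: delegates to an inner deserializer and nulls out
// any record that does not match the predicate.
public class FilteringDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;
    private final Predicate<T> keep;

    public FilteringDeserializer(Deserializer<T> inner, Predicate<T> keep) {
        this.inner = inner;
        this.keep = keep;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        T value = inner.deserialize(topic, data);
        return (value != null && keep.test(value)) ? value : null;
    }
}

Paired with the original serializer via Serdes.serdeFrom(serializer, new FilteringDeserializer<>(deserializer, predicate)), this keeps only applicable records in the local cache.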

issues while connecting topics (SASL_SSL) to KCache (4.0.3)

hi @rayokota
I have started using 4.0.3, which has the parameter to provide a JAAS config.

However, there is an issue: if I use a topic that is already created:
props.setProperty(KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG, "<Existing_topic>");
we get the error below:
io.kcache.exceptions.CacheInitializationException: Failed trying to create or validate topic <Existing_topic>

......

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
at io.kcache.KafkaCache.createTopic(KafkaCache.java:396)
at io.kcache.KafkaCache.createOrVerifyTopic(KafkaCache.java:350)

Note:
The calling method's class, io.kcache.KafkaCache, was loaded from the following location:

jar:file:/Users/breddy3/.m2/repository/io/kcache/kcache/4.0.3/kcache-4.0.3.jar!/io/kcache/KafkaCache.class
The called method's class, org.apache.kafka.clients.admin.DescribeTopicsResult, is available from the following locations:

jar:file:/Users/breddy3/.m2/repository/org/apache/kafka/kafka-clients/3.0.1/kafka-clients-3.0.1.jar!/org/apache/kafka/clients/admin/DescribeTopicsResult.class
The called method's class hierarchy was loaded from the following locations:

org.apache.kafka.clients.admin.DescribeTopicsResult: file:/Users/breddy3/.m2/repository/org/apache/kafka/kafka-clients/3.0.1/kafka-clients-3.0.1.jar
If we use a topic that is non-existing:
props.setProperty(KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG, "");
io.kcache.exceptions.CacheInitializationException: Failed trying to create or validate topic
at io.kcache.KafkaCache.createOrVerifyTopic(KafkaCache.java:360)
at io.kcache.KafkaCache.init(KafkaCache.java:276)

Need some understanding on writing data from Multiple JVMs

Hey @rayokota ,

Thanks for the awesome library. We are planning to use it in production.

However, in our case we want to sync data across various JVMs (each running an instance of KCache).
My question is: what is the downside? Is it that we may get an older version of the data as the latest?

Secondly, what happens when we use a separate producer to send data to the same Kafka topic KCache is reading from? Will that work?

Regards

KCache vs KTable / Replicated caches

Thank you very much for KCache. I have learned quite a lot by looking into the code. Actually, this is not an issue, but rather some questions for better understanding.

I was wondering how KCache compares to a KTable / GlobalKTable. Like KCache, a KTable can be backed by a store such as RocksDB, which allows me to query for a specific value. Updating values is a bit more cumbersome with a KTable than with KCache, as one needs to write to the corresponding topic. Are there other differences I am not aware of?

Further, I am not sure I understand the limitation of using KCache as a replicated cache. Does it mean that there should be only one instance/application writing to the cache, while there can be multiple consumers? In my use case, I would like to write some configuration properties to a log-compacted topic with multiple readers. For resilience purposes I wanted to replicate the writer. Does this mean I need to do the leader election on my own?

Check for compacted topic does not account for "cleanup.policy=compact,delete"

Use case:

  • An expiring local cache implementation, such as Caffeine or Guava, can be paired with a cleanup.policy=compact,delete topic that expires old entries. This yields the option of a Kafka-backed LRU-style cache.

Issue:

  • KafkaCache.verifyTopic(...) throws (or logs a WARN) when a cache topic is configured with cleanup.policy=compact,delete

Documentation:

A string that is either "delete" or "compact" or both

Proposed Solution:

  • Since the policy can include both compact and delete, the topic verification should validate that the policy includes compact, not only that it equals compact. The current check:

String retentionPolicy = topicConfigs.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
if (!TopicConfig.CLEANUP_POLICY_COMPACT.equals(retentionPolicy)) {
    String message = "The retention policy of the topic " + topic + " is not 'compact'. "
        + "You must configure the topic to 'compact' cleanup policy to avoid Kafka "
        + "deleting your data after a week. "
        + "Refer to Kafka documentation for more details on cleanup policies.";
    if (requireCompact) {
        log.error(message);
        throw new CacheInitializationException("The retention policy of the topic " + topic
            + " is incorrect. Expected cleanup.policy to be "
            + "'compact' but it is " + retentionPolicy);
    } else {
        log.warn(message);
    }
}
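
A minimal sketch of that proposed check, treating the policy as a comma-separated list (java.util.Arrays assumed imported; retentionPolicy as in the snippet above):

// Accept any cleanup.policy list that includes "compact",
// e.g. "compact" or "compact,delete".
boolean isCompacted = Arrays.stream(retentionPolicy.split(","))
    .map(String::trim)
    .anyMatch(TopicConfig.CLEANUP_POLICY_COMPACT::equals);
if (!isCompacted) {
    // ... same error/warn handling as above ...
}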

Add a possibility to work with topics where cleanup.policy is delete not only compact

Currently, for the purpose of data safety, there is a strict check that the Kafka topic has cleanup.policy=compact; otherwise, an exception is thrown and the application fails. I think it would be a great idea to add a flag so that, if I accept the possible data loss, I can work with a topic that has cleanup.policy=delete too. This would be good for educational purposes, because you cannot create topics with the compact policy on free online Kafka services.

Missing space in log message

--- src/main/java/io/kcache/KafkaCache.java
+++ src/main/java/io/kcache/KafkaCache.java
@@ -636,7 +636,7 @@ public class KafkaCache<K, V> implements Cache<K, V> {
                         }
                         updateOffset(record.partition(), record.offset());
                     } catch (Exception se) {
-                        log.error("Failed to add record from the Kafka topic"
+                        log.error("Failed to add record from the Kafka topic "
                             + topic
                             + " to the local cache", se);
                     }

Horizontal scalability

Is it possible to use it in distributed mode?
Sorry if I missed something, but at first glance it looks like a single-instance-only cache.
Do you have a plan to write a client that can route queries to the appropriate cache instance based on the key?

Kafka taking a lot of CPU resources

Kafka is taking a lot of CPU resources when the number of messages being written to KCache is high, probably due to frequent log compaction happening in the background. Is there a way to reduce the CPU utilization?

Setting cache topic replication factor

I want the cache topic to be replicated, so I have set props.put(KafkaCacheConfig.KAFKACACHE_TOPIC_REPLICATION_FACTOR_CONFIG, 3). Yet in the logs it says: Creating the topic cache-test using a replication factor of 1, which is less than the desired one of 3.

kcache fails if SASL_SSL is enabled

I'm experiencing an error where KCache times out when connecting to a SASL_SSL-enabled cluster. I suspect it's due to the way the clients are configured:

io.kcache.exceptions.CacheInitializationException: Timed out trying to create or validate topic configuration
	at io.kcache.KafkaCache.createOrVerifyTopic(KafkaCache.java:207)
	at io.kcache.KafkaCache.init(KafkaCache.java:146)

I'm seeing that we create new Properties objects and pass those into the respective clients, which then assume that all clients (producer, consumer, and admin) use plaintext by default for communication.
