Comments (10)
Hello @agolovenko, thanks for the PR.
We investigated the issue further, and the core of the problem seems to be in CatalystDataToAvro. To answer your question: yes, the warning shouldn't be logged if everything works as it should. I'm already working on a fix for both of these issues; it should be finished tomorrow.
from abris.
Hi @agolovenko,
First of all, you're doing everything correctly.
Regarding the logs, they seem to be generated by Spark, but should not harm performance. You can disable them in the logging configs. Are you using log4j?
Regarding performance, it might depend on several factors. What is the latency of your network? What is the size of your message? What is the record rate on the production side? What is the latency introduced by the security layer?
To get a better understanding of the performance factors, can you run the same workload on a different cluster or turn off security?
Cheers.
Hi @felipemmelo, thanks for the prompt reply.
Regarding the log messages: they seem to originate here in Abris: https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/read/confluent/SchemaManager.scala#L183. Sure, I can set the log level or disable them, but before doing so I'd like to make sure the code works as it is supposed to. And if it does, why log a warning?
Regarding performance:
- Network & security latency: we tried the exact same setup but with a dummy stream that just reads and writes binary data without any Avro parsing, and throughput increased roughly 1000x:
val outputDF = inputDF.select(col("value"))
- We generate records ourselves, but to make sure that this is not the bottleneck, I ran the stream on a topic that already held 1M messages, set startingOffsets to earliest, and cleaned the checkpoints folder.
- Regarding the message, it was the same in both cases:
  - payload:
  {"firstname": "Ashot", "lastname": "Golovenko", "country":"RU"}
  - schema:
  {"type":"record","name":"ProofOfConcept","fields":[{"name":"firstname","type":"string"},{"name":"lastname","type":"string"},{"name":"country","type":"string"}]}
Thanks!
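To put the message-size question in perspective, the sketch below hand-encodes the sample record following the Avro binary encoding rules: each string is a zig-zag varint length followed by its UTF-8 bytes, and a record is just its fields concatenated in schema order. It is an illustration of the spec only (in practice to_avro or the Avro library does this), and TinyAvro and encodeProofOfConcept are hypothetical names. At about 19 bytes per record, message size seems unlikely to be the bottleneck.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

// Hypothetical helper: hand-rolled Avro binary encoding for all-string records.
object TinyAvro {
  // Avro `long`: zig-zag mapping, then little-endian base-128 varint.
  def writeLong(out: ByteArrayOutputStream, value: Long): Unit = {
    var n = (value << 1) ^ (value >> 63) // zig-zag: small magnitudes -> small unsigned values
    while ((n & ~0x7FL) != 0) {
      out.write(((n & 0x7F) | 0x80).toInt) // 7 payload bits + continuation bit
      n >>>= 7
    }
    out.write(n.toInt)
  }

  // Avro `string`: length as a long, then the raw UTF-8 bytes.
  def writeString(out: ByteArrayOutputStream, s: String): Unit = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    writeLong(out, bytes.length.toLong)
    out.write(bytes)
  }

  // ProofOfConcept record: firstname, lastname, country, in schema order,
  // with no field names or delimiters in the output.
  def encodeProofOfConcept(firstname: String, lastname: String, country: String): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    Seq(firstname, lastname, country).foreach(writeString(out, _))
    out.toByteArray
  }
}

val encoded = TinyAvro.encodeProofOfConcept("Ashot", "Golovenko", "RU")
println(s"Avro binary size: ${encoded.length} bytes") // (1 + 5) + (1 + 9) + (1 + 2) = 19
```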
Update: we've isolated the performance problem to writing in the Confluent Avro format using to_confluent_avro.
This:
val outputDF = inputDF
  .select(from_confluent_avro(col("value"), schemaRegistryConfigIn) as "parsed_message")
  .select("parsed_message.*")
  .select(to_avro(struct("firstname", "lastname", "country")) as "value")
vs. this:
val outputDF = inputDF
  .select(from_confluent_avro(col("value"), schemaRegistryConfigIn) as "parsed_message")
  .select("parsed_message.*")
  .select(
    to_confluent_avro(struct("firstname", "lastname", "country"), schemaRegistryConfigOut) as "value"
  )
The first variant (plain to_avro) is roughly 1000x faster than the second (to_confluent_avro). Still no idea why, though.
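The two variants differ only in the output encoding. The Confluent wire format prepends a magic byte (0x0) and a 4-byte big-endian schema ID to the plain Avro payload, and that ID is assigned by Schema Registry. The sketch below shows just the framing (ConfluentFraming and the ID 42 are hypothetical; this is not how ABRiS implements it internally). Since the framing costs a handful of bytes, a large slowdown more likely comes from how the schema ID is obtained, i.e. from talking to the registry.

```scala
import java.nio.ByteBuffer

// Hypothetical helper illustrating the Confluent wire format:
// [magic byte 0x0][4-byte big-endian schema ID][Avro binary payload]
object ConfluentFraming {
  val MagicByte: Byte = 0x0

  def frame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(1 + 4 + avroPayload.length) // big-endian by default
    buf.put(MagicByte)
    buf.putInt(schemaId)
    buf.put(avroPayload)
    buf.array()
  }

  def unframe(message: Array[Byte]): (Int, Array[Byte]) = {
    val buf = ByteBuffer.wrap(message)
    require(buf.get() == MagicByte, "not in Confluent wire format")
    val schemaId = buf.getInt()
    val payload = new Array[Byte](buf.remaining())
    buf.get(payload)
    (schemaId, payload)
  }
}

// Hypothetical schema ID and payload bytes, for illustration only.
val framed = ConfluentFraming.frame(schemaId = 42, avroPayload = Array[Byte](1, 2, 3))
val (id, payload) = ConfluentFraming.unframe(framed)
println(s"framed length = ${framed.length}, schema id = $id")
```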
Update 2:
I'm quite confident I've found the source of the low throughput.
While debugging the execution, I found that the following call chain runs for every record and causes large delays:
- https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/sql/CatalystDataToAvro.scala#L44
- https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/sql/CatalystDataToAvro.scala#L76
- https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/parsing/utils/AvroSchemaUtils.scala#L102
The last line calls two checks, SchemaManager.exists(subject) and SchemaManager.isCompatible(schema, subject), which, although they use CachedSchemaRegistryClient, are not cached and query the Schema Registry REST API. So every record triggers two calls to the registry:
https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/read/confluent/SchemaManager.scala#L216-L241
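A rough bound shows how per-record registry calls can plausibly account for a gap of this size: if each record waits on N sequential round trips of latency t, one task can process at most 1/(N·t) records per second, no matter how cheap the rest of the work is. The sketch below plugs in an assumed 2 ms round trip; that figure is hypothetical, not something measured in this thread, and RegistryBound is an illustrative name.

```scala
// Hypothetical helper: upper bound on per-task throughput when every record makes
// `callsPerRecord` blocking, sequential round trips of `rttMillis` each.
// All other work is ignored, so real throughput can only be lower.
object RegistryBound {
  def maxRecordsPerSecond(callsPerRecord: Int, rttMillis: Double): Double =
    1000.0 / (callsPerRecord * rttMillis)
}

// Assumed 2 ms round trip to Schema Registry (not a measured value).
val bound = RegistryBound.maxRecordsPerSecond(callsPerRecord = 2, rttMillis = 2.0)
println(f"at most $bound%.0f records/s per task") // 1000 / (2 * 2) = 250
```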
Removing these checks brings performance to the desired level (I patched a snapshot version). To me, it looks like these checks should be done only once per schema.
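One way to get "once per schema" without dropping the validation is to memoize the result of both checks per (subject, schema) pair. The sketch below uses a hypothetical RegistryChecks trait as a stand-in for the two calls above; it is not the ABRiS API and not necessarily how the eventual fix works. A counting stub makes the effect visible without a running registry, and the subject name is a hypothetical placeholder.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for SchemaManager.exists / SchemaManager.isCompatible.
// This is NOT the real ABRiS API.
trait RegistryChecks {
  def exists(subject: String): Boolean
  def isCompatible(schema: String, subject: String): Boolean
}

final case class CheckResult(subjectExists: Boolean, compatible: Boolean)

// Runs both checks once per (subject, schema) pair and reuses the answer for every
// later record. Under concurrent first access the checks may run more than once,
// which is harmless as long as they are read-only queries.
final class MemoizedChecks(underlying: RegistryChecks) {
  private val cache = TrieMap.empty[(String, String), CheckResult]

  def check(subject: String, schema: String): CheckResult =
    cache.getOrElseUpdate(
      (subject, schema),
      CheckResult(underlying.exists(subject), underlying.isCompatible(schema, subject))
    )
}

// Counting stub standing in for the REST calls.
final class CountingStub extends RegistryChecks {
  var calls = 0
  def exists(subject: String): Boolean = { calls += 1; true }
  def isCompatible(schema: String, subject: String): Boolean = { calls += 1; true }
}

val stub = new CountingStub
val checks = new MemoizedChecks(stub)
// Hypothetical subject name and schema placeholder.
val results = (1 to 100000).map(_ => checks.check("output-topic-value", "<schema json>"))
println(s"registry calls: ${stub.calls}") // 2 in total, instead of 2 per record
```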
Hi @agolovenko, yes, that definitely makes sense. We'll look into this and try to release a new version asap.
In the meantime, since you've already done the work, would you like to submit the change as a PR?
There's quite a difference between a quick patch and "doing it the right way": I simply commented out the checks that caused the slowdown.
I opened a quick PR showing how I think the fix could look, but it breaks the tests, and I'm not sure why since I haven't had time to dig deep into the code. Suggestions appreciated: #86
My other question still stands, though: if the code is working as designed, then why is it logging warnings? https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/read/confluent/SchemaManager.scala#L183
Hello @agolovenko, the issues have been fixed. Would you mind trying the fix yourself before we release a new version of Abris?
The fix is already in the master branch, so you can just build it from there with Maven. By default, Abris is built for Scala 2.12, but here is a guide on how to build it for 2.11 in case you need it.
Hi @cerveada! Just checked the fix and it works great. The performance is back to the expected level now. Thanks a lot!