quixio / quix-streams
Quix Streams - A library for data streaming and Python Stream Processing
License: Apache License 2.0
Is your feature request related to a problem? Please describe.
As a software developer I wish to have tests exercising the code where I use either of these clients.
Describe the solution you'd like
Add an interface to both. Check if a common interface is possible, but it is not mandatory.
Describe alternatives you've considered
Use of
Additional context
This interface might need to be treated primarily as a testing aid rather than an unbreakable contract. Avoiding breaking changes is still strongly preferred, however.
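One way the "testing aid" interface could look, as a minimal sketch: define a structural protocol that both real clients and an in-memory fake satisfy. All names here (`StreamPublisher`, `publish`, `process_and_publish`) are hypothetical, not the real client API:

```python
from typing import Any, List, Protocol, Tuple


class StreamPublisher(Protocol):
    """Hypothetical common surface for the producer clients (assumed, not the real API)."""
    def publish(self, stream_id: str, payload: Any) -> None: ...


class FakePublisher:
    """In-memory stand-in for unit tests; records what would have been sent."""
    def __init__(self) -> None:
        self.published: List[Tuple[str, Any]] = []

    def publish(self, stream_id: str, payload: Any) -> None:
        self.published.append((stream_id, payload))


def process_and_publish(publisher: StreamPublisher, stream_id: str, value: float) -> None:
    # Business logic under test depends only on the protocol, not a concrete client.
    publisher.publish(stream_id, {"value": value})
```

Because `Protocol` checks structurally, the real clients would not need to inherit from anything, which keeps the change non-breaking.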
Is your feature request related to a problem? Please describe.
As part of the migration from 0.4.x, the StreamPackage class was not yet fully implemented. This issue is to track that functionality restoration.
Describe the solution you'd like
Work as in 0.4.x
Additional context
For the time being it is not included in the automatic imports, so previous code using it has to reference it explicitly, like this: https://quix.discourse.group/t/streampackage-class-not-recognized/57
Tell us about the bug
When a timezone is not specified, the current timezone is assumed (correctly), but the unit tests are hard-coded to specific expected nanosecond values. This results in failing tests when the timezone isn't exactly UTC, for example under daylight saving time or in locations not using UTC.
What did you expect to see?
Not failing unit tests
Is your feature request related to a problem? Please describe.
The following use case is otherwise very difficult:

```python
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):
    for row in data.timestamps:
        if row.parameters["Speed"].numeric_value > 200:
            topic_producer.get_or_create_stream(stream_consumer.stream_id).timeseries.publish(row)
```
Describe the solution you'd like
Add overload for this class.
Is your feature request related to a problem? Please describe.
```python
old_way = row.parameters["EngineRPM"].numeric_value
new_way = row.parameters["EngineRPM"]
```
The timestamp column name standard seems to be "time" (for instance when reading parameter data as a dataframe), but when I query that same data from InfluxDB it's called "Timestamp".
It's fine to be flexible and understand several names, but can we have the same one for SDK-produced timestamps and InfluxDB?
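Until the names are unified, a small caller-side normalization shim is a workable stopgap. This is a sketch; `normalize_time_column` and the alias list are assumptions, not part of the SDK:

```python
import pandas as pd

# Spellings seen in the wild: "time" from the SDK, "Timestamp" from InfluxDB.
TIME_ALIASES = ("time", "Timestamp", "timestamp")


def normalize_time_column(df: pd.DataFrame, target: str = "time") -> pd.DataFrame:
    """Rename whichever timestamp column is present to a single canonical name."""
    for alias in TIME_ALIASES:
        if alias in df.columns:
            return df.rename(columns={alias: target})
    raise KeyError(f"no timestamp column found among {TIME_ALIASES}")
```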
Is your feature request related to a problem? Please describe.
Need to add an Apple silicon build to releases. Whether it is a pre-built or built-from-source Python package is open to debate, but each release should include an Apple silicon compatible package.
Documentation for installing on apple silicon will need to be updated according to this comment: #93 (comment)
Describe the solution you'd like
A pre-baked wheel would be preferable to have the simple pip-install experience. Availability of build agents might make this more difficult than a source build.
Describe alternatives you've considered
Source build, but we have no example of that yet, so would be a first. Also not the most optimal experience for quick install.
Additional context
The pre-req .net 8 compatibility was introduced in this PR: #93
Tell us about the bug
On Ubuntu 20.04 with the latest Python 3 (3.8.10) installed, a segfault occurs when an exception is set via the Python C API from the C# codebase.
Example code to produce segfault:
```python
from quixstreams import QuixStreamingClient
client = QuixStreamingClient("some-invalid-token")  # throws an exception via the Python C API in C#, but segfaults
```
What did you expect to see?
No segfault; instead, a Python exception thrown about the invalid token.
What tools and platforms are you using (e.g. iOS, Chrome, Desktop)
Ubuntu 20.04 (including Windows WSL, Google Colab)
Anything else we should know?
Works fine in other distributions or when python is compiled from source.
Is your feature request related to a problem? Please describe.
The API for RAW events in the custom schema is very hidden in the current implementation. It is also not used by the Quix platform, so the data is not surfaced in SaaS.
Describe the solution you'd like
RAW events surfaced with existing events API.
Describe alternatives you've considered
Flattening messages into the existing dataframe solution; this will come later.
Is your feature request related to a problem? Please describe.
I would like us to have generated documentation similar to other libraries that is automatically updated based on code docs rather than always relying on manual documentation being up-to-date.
Describe the solution you'd like
Whatever is acceptable for the given language, such as the Microsoft style for c#.
Is your feature request related to a problem? Please describe.
Our platform requires the Quix custom schema. If it is not used, the platform is not that helpful.
Describe the solution you'd like
Custom events get materialised as tabular data.
Additional context
https://quix.notion.site/Merge-events-and-time-series-endpoints-d5ea49e51492498e90d2935decc64c24?pvs=4
Is your feature request related to a problem? Please describe.
Would like to have the option of Redis as an alternative to the existing LocalFileStateStorage.
Describe the solution you'd like
Implement the IStateStorage interface and expose C#/Python bindings to use it where IStateStorage can be used.
Is your feature request related to a problem? Please describe.
If a producer is idle for longer than ~8,000,000 ms (possibly only in Azure environments), it logs a "Kafka Producer Exception". This can incorrectly lead to the assumption that the connection is broken for good and that it won't publish the next time it needs to.
However, this is not true: automatic reconnection happens underneath.
Describe the solution you'd like
Tune down the exception, so it is only shown when relevant, such as failure on next publish.
Describe alternatives you've considered
A keep-alive, but that is excessive and isn't a guarantee either. We have only been able to reproduce this with either the producer or Kafka running in Azure cloud.
If you send timeseries messages through a buffer and then immediately call the flush function, the messages most recently added to the buffer may not get sent.
However, if I sleep for even 0.1 seconds before calling the flush function, all messages are sent as expected.
The same behaviour happens in both the C# and Python clients.
Reproducible example in Benchmark kafka libraries repo
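The race can be modeled without the client at all. The toy buffer below (all names hypothetical, not the real API) shows the behaviour the report expects: `flush()` blocking until the intake queue is fully drained, rather than returning while messages are still queued behind a background worker:

```python
import queue
import threading
import time


class ToyBuffer:
    """Toy model of the reported race: publishes go to an intake queue that a
    background worker drains. A correct flush() must wait until that queue is
    empty, not just until the worker's current message is out."""

    def __init__(self) -> None:
        self._intake: "queue.Queue" = queue.Queue()
        self.sent = []
        worker = threading.Thread(target=self._drain, daemon=True)
        worker.start()

    def publish(self, msg) -> None:
        self._intake.put(msg)

    def _drain(self) -> None:
        while True:
            msg = self._intake.get()
            time.sleep(0.01)  # simulate network latency
            self.sent.append(msg)
            self._intake.task_done()

    def flush(self) -> None:
        # join() blocks until every queued item has been processed -- the
        # semantics the bug report expects, making the sleep workaround unnecessary.
        self._intake.join()
```

With a flush that only signals the worker without waiting for the queue to empty, the last messages would be lost exactly as described.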
Let's move the rolling-window Python code out of the benchmark so anybody can use it.
Move it to QuixStreamingClient if anything there is not already present.
Is your feature request related to a problem? Please describe.
The goal is to not have to have code changes like this: #85
Describe the solution you'd like
Just building from a tag should set these according to the tag name.
I understand that in streaming it is quite uncommon to publish big chunks (batches) of data, so a long dataframe with lots of rows is unlikely.
Anyhow, right now the dataframe-to-TimeseriesDataRaw conversion is done row by row and column by column, which is inefficient. I'll be proposing a new vectorized version to improve speed (especially with big dataframes).
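For illustration, the difference between per-cell and vectorized extraction can be sketched in plain pandas. The function names are mine, not the proposed API:

```python
import pandas as pd


def to_columns_slow(df: pd.DataFrame) -> dict:
    """Row-by-row, column-by-column: one Python-level access per cell."""
    out = {col: [] for col in df.columns}
    for _, row in df.iterrows():
        for col in df.columns:
            out[col].append(row[col])
    return out


def to_columns_fast(df: pd.DataFrame) -> dict:
    """Vectorized: one NumPy array extraction per column, no per-cell loop."""
    return {col: df[col].to_numpy() for col in df.columns}
```

Both produce the same column-oriented data, but the vectorized form avoids the per-row Python overhead that dominates with large dataframes.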
Is your feature request related to a problem? Please describe.
Add support for this in state helpers:
```python
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):
    stream_state = stream_consumer.get_scalar_state("total_rpm", int, lambda x: 0)  # default value for state name
    for row in data.timestamps:
        rpm = row.parameters["EngineRPM"].numeric_value
        stream_state += rpm
        row.add_value("total_rpm", stream_state)
```
Ok, that title is bad, but the idea around stream properties is that they are delayed a bit before being sent, to avoid sending them multiple times when several changes occur in a very short interval.
Once this interval elapses (or someone calls .Flush()), they get set as the latest set of properties and sent to the wire.
In addition to this, we have a periodic properties re-broadcast. To avoid re-broadcasting on its own and filling up the topic with nothing else, we have logic to only do this when there is another message type to send, such as parameter data.
In this scenario, the properties get sent first, then the actual data, if the re-broadcast condition is fulfilled.
Now, the problem: when a stream is initially created, properties are set, and data gets sent, the re-broadcast logic is triggered, but with the initial null stream properties instead of the configured ones, because the properties flush has not yet been triggered.
Use the schema registry for stream metadata, so we don't have to send it periodically and it is immediately available.
@PatrickMiraP and @peter-quix to pair on this.
Tell us about the bug
from_dataframe() breaks when dealing with columns with complex numbers:
```
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
in
in from_dataframe(data_frame, epoch)
    111     isnumeric = (label_type == int or label_type == float or label_type == complex)
    112     if isnumeric:
--> 113         if math.isnan(content):
    114             continue  # ignore it, as pandas uses NaN instead of None, unable to detect difference
    115         if panda_col_label.startswith('TAG__'):  # in case user of lib didn't put it in quotes, don't throw err
TypeError: can't convert complex to float
```
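A possible fix, sketched outside the library: a NaN check that special-cases complex values, since `math.isnan` raises `TypeError` on them while `cmath.isnan` accepts them. The helper name is mine:

```python
import cmath
import math


def is_missing(value) -> bool:
    """NaN check that also tolerates complex values and non-numeric types."""
    if isinstance(value, complex):
        # True if the real or imaginary part is NaN
        return cmath.isnan(value)
    try:
        return math.isnan(value)
    except TypeError:
        # Strings, None, etc. are not NaN floats
        return False
```

Swapping such a check in for the bare `math.isnan(content)` on line 113 of the traceback would let complex columns pass through instead of raising.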
One could (by mistake) add the parent multiple times, and it then travels in the message multiple times. This is completely unnecessary.
Is your feature request related to a problem? Please describe.
```python
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):
    stream_state = stream_consumer.get_rolling_window_state("average_rpm_10s_state", duration_ms=10000)
    for row in data.timestamps:
        stream_state.add_and_purge(row)
        rpm_values = list(map(lambda x: x.parameters["EngineRPM"], stream_state.items))
        row.add_value("average_rpm_10s", sum(rpm_values) / len(rpm_values))
```
Hi! I'm getting an error when adding an empty tag.
The code actually does .add_tags(row.tags), so it's copying the tags from the received TimeseriesData into the outgoing timeseries buffer.
```
Listening to streams. Press CTRL-C to exit.
Time: 1678882097309000000
Tags: {'room': 'foo', 'role': 'Customer', 'name': 'steve', 'phone': '', 'email': ''}
Params:
  chat-message: ii
{'label': 'POSITIVE', 'score': 0.9442011713981628}
Exception: System.ArgumentNullException: Tag (phone) value can't be null or empty (Parameter 'tagValue')
   at QuixStreams.Streaming.Models.TimeseriesDataTimestamp.AddTag(String, String) + 0x141
   at QuixStreams.Streaming.Models.StreamProducer.Interop.TimeseriesDataBuilderInterop.AddTag(IntPtr, IntPtr, IntPtr) + 0x11b
```
Discussed here: https://stream-processing.slack.com/archives/C04SDM5UG9F/p1678883613307439
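Until the producer tolerates empty values, a caller-side workaround is to drop them before forwarding. A sketch; `clean_tags` is a hypothetical helper, not SDK API:

```python
def clean_tags(tags: dict) -> dict:
    """Drop tags whose value is None or an empty string, since the producer
    rejects those with ArgumentNullException."""
    return {k: v for k, v in tags.items() if v not in (None, "")}
```

With it, the copy becomes `.add_tags(clean_tags(row.tags))`, which would survive the `'phone': ''` tag in the log above.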
Maybe in the same go, rename Quix.Process to Quix.Telemetry.
Essentially your layer 2 introduces telemetry; it is a whole lot more descriptive than "Process".
Is your feature request related to a problem? Please describe.
Would like to have the option of RocksDB as an alternative to the existing LocalFileStateStorage.
Describe the solution you'd like
Implement the IStateStorage interface and expose C#/Python bindings to use it where IStateStorage can be used.
Tell us about the bug
from_dataframe() conversion seems to break if there are nulls in the time column of the dataframe.
What did you expect to see?
Here, as with any null-handling situation, there is a decision to make. Either drop the rows with nulls in the time column:

```python
dataframe = dataframe.dropna(subset=time_label)
```

Or interpolate the nulls in the time column using the surrounding values:

```python
dataframe[time_label] = dataframe[time_label].interpolate(method='linear')
```
This is verifying that the QuixStreamingClient is properly picking up the workspace based on the credential provided (either from normal token + WS_ID env or SDK Token)
Is your feature request related to a problem? Please describe.
Python (and, in odd places, C#) code documentation — not the .md files — is incorrect, outdated, or grammatically wrong. Some of this resulted from the 0.4.x -> 0.5.0 migration.
Describe the solution you'd like
Code documentation is updated to reflect the latest version of the actual code it is documenting.
Is your feature request related to a problem? Please describe.
When doing streaming processing, I find myself needing to keep windows of data in memory very VERY often. Like, if I want to calculate average speed over the last 5 min, I have to keep the last 5 mins of speed data in memory (speed + timestamp) and update that window on every new message (both appending the new data to it and trimming out the now older than 5 mins data). I do this so often that I've ended up creating a python class that I keep using on my projects. This is fine, but I'm sure you guys can do it better :)
Describe the solution you'd like
I'd love that the sdk had some window functionality. Maybe as a TimeseriesData method. As a first feature, I propose that the window is created with:
Describe alternatives you've considered
I was trying to think whether it was worth doing this using Kafka pointers and some of its characteristics (segments, etc.), but probably not. Even if there were a way to create a rolling window with pointers, you would be querying a big chunk of data every time. Doing it from the client side seems a better idea.
Additional context
I can share my current python class for reference:
```python
import pandas as pd


class Window:
    def __init__(self, app_abv: str, window_nanoseconds: int):
        self.df_window = pd.DataFrame()
        self.window_size = window_nanoseconds

    def calculate(self, df: pd.DataFrame):
        # Add new data to the window
        self._update_mini_df_window(df)
        # Here mathematical operations are done with the updated window
        # Fill df with new values
        df = pd.merge(df, self.df_window[["time", "new_col"]], how='left', on=["time"])
        return df

    def _update_mini_df_window(self, df: pd.DataFrame):
        # If df_window is empty, start it with df
        if self.df_window.empty:
            self.df_window = df
        # If df_window has more recent data than what we are getting in real time,
        # something has been messed up, maybe with replays :S.
        # Erase any data newer than the newly received df
        elif self.df_window["time"].iloc[-1] > df["time"].iloc[-1]:
            self.df_window = self.df_window[self.df_window["time"] <= df["time"].iloc[-1]]
            self.df_window = pd.concat([self.df_window, df]).drop_duplicates()
            self.df_window = self.df_window.sort_values("time", ascending=True).reset_index(drop=True)
            # Interpolate to fill nulls in key columns (cat_col / num_col are defined elsewhere)
            self.df_window[cat_col] = self.df_window[cat_col].fillna(method="ffill", axis=0)
            self.df_window[num_col] = self.df_window.set_index('time')[num_col].interpolate('index').values
        else:
            # Else, concat data and ensure chronological order
            self.df_window = pd.concat([self.df_window, df]).drop_duplicates()
            self.df_window = self.df_window.sort_values("time", ascending=True).reset_index(drop=True)
            # Interpolate to fill nulls in key columns
            self.df_window[cat_col] = self.df_window[cat_col].fillna(method="ffill", axis=0)
            self.df_window[num_col] = self.df_window.set_index('time')[num_col].interpolate('index').values
        # TRIM WINDOW
        """
        # If the df window is getting big in size, we just trim it
        bytes_in_a_GB = 1073741824
        if self.df_window.memory_usage(deep=True).sum() / bytes_in_a_GB > 0.25:  # if bigger than 0.25 GB
            self.df_window = self.df_window.iloc[-1000:]  # restart window to last 1000 rows
            if self.df_window.memory_usage(deep=True).sum() / bytes_in_a_GB > 0.25:  # if still big, restart
                self.df_window = self.df_window.iloc[[-1]]
        """
        # Keep only needed rows
        self.df_window = self.df_window[self.df_window["time"] > (self.df_window['time'].iloc[-1] - self.window_size)]
        self.df_window["TEMP_timeDelta"] = self.df_window["time"] - self.df_window["time"].shift(1)
```
Is your feature request related to a problem? Please describe.
In case of a restart, no message should get sent twice to the output topic.
Describe the solution you'd like
TBD
Tell us about the bug
What did you expect to see?
A correct type hint, and EventDataBuilder having add_tags similarly to TimeseriesDataBuilder.
Confluent.Kafka.KafkaException: Broker: Specified group generation id is not valid
tulios/kafkajs#1009
Maybe we can provide a workaround/handling for users.
Is there anything we need to add regarding licensing @PatrickMiraP?
Is your feature request related to a problem? Please describe.
Make QuixStreams available via Conda.
Tell us about the bug
The Python implementation in 0.4.x used to convert types via pythonnet in a more automated fashion.
The new code results in an "Invalid type passed as parameter value." exception when passing values such as None or numpy.int64.
What did you expect to see?
Investigate what pythonnet did and how it picked the preferred type; maybe that is something we can do.
Alternatively, None values could be silently ignored when an attempt is made to add them. Maybe better ideas in the comments?
More details
Previous code:
https://github.com/quixio/quix-streams/blob/v0.4.10/src/PythonClient/lib/quixstreaming/models/parameterdatatimestamp.py#L103
new code:
https://github.com/quixio/quix-streams/blob/v0.5.1/src/PythonClient/src/quixstreams/models/timeseriesdatatimestamp.py#L130
Any workaround?
For the time being, the only workaround is to make sure the given value falls into one of str, float, int, bytes, bytearray. So, for example, None would have to be converted (e.g. to a NaN float) beforehand.
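The workaround can be wrapped in a small helper, hypothetical and not part of the library, that coerces values to a plain float and returns None for missing values so the caller can skip the add:

```python
import math


def sanitize_numeric(value):
    """Coerce a value to a plain Python float the binding accepts.

    Returns None for None/NaN inputs so the caller can skip adding them.
    numpy scalars such as numpy.int64 coerce cleanly through float().
    """
    if value is None:
        return None
    value = float(value)
    return None if math.isnan(value) else value
```

Usage would look like: `v = sanitize_numeric(raw)` followed by `if v is not None: ts.add_value(name, v)`.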
We should interrogate the broker for the maximum allowed message size and automatically configure the splitter accordingly, rather than relying on a value configured on the client.
A potential improvement could be adding staging support to IStateStorage: stage with OnCommitting and finalize with OnCommitted, to reduce the chance of an unsuccessful storage save to the minimum the storage's own limitations allow.
When a consumed stream is closed, revocation doesn't include it in the list. This can cause state to be saved even when it shouldn't be.
There are possible threading issues when modifying a state from multiple threads. Attempted to reproduce using unit tests in C# and Python without luck. Left without locking implemented for now.
Enable Avro support - https://github.com/apache/avro
Is your feature request related to a problem? Please describe.
Depending on my business logic I might want to guarantee my data is sent without closing down the topic for future publishing.
Describe the solution you'd like
Having the option to flush a topic would be beneficial, similar to how there is a flush for a stream or parts of a stream. With this I wouldn't be forced to close/dispose the topic to guarantee a send.
Describe alternatives you've considered
Other ideas that occurred to me are less mainstream. Flush sounds like the ideal thing considering it is already how it is done elsewhere in the lib.
Is your feature request related to a problem? Please describe.
Future builds now support Apple silicon. As we're not yet using GitHub for our builds, this is not reflected as a code change.
Describe the solution you'd like
Update all aspects of the Apple silicon support documentation, such as the readme (landing page for the PyPI wheel) and the M1 install guide readme.
This support will be available in 0.5.4; until then Rosetta must be used (unless we backfill the builds).
Is your feature request related to a problem? Please describe.
When creating the same callback to react to different topics, I'd sometimes like to have the topic name available within the callback function (for example, to create a TAG in the output recording which input topic a specific message came from).
Describe the solution you'd like
I'd like to have access to the topic name property through the InputTopic, OutputTopic, StreamReader and StreamWriter objects. So, just as I can do input_stream.stream_id, I'd also like input_topic.topic_id and input_stream.topic_id to be possible.
Is your feature request related to a problem? Please describe.
While the title sounds counterintuitive, the problem is real. When no consumer group is in use, librdkafka still requires a consumer group to connect to Kafka, though it will not commit anything. Because of this quirk of the underlying library, we need to be able to specify a prefix to use when a random consumer group is generated for this purpose.
Describe the solution you'd like
Add consumer_prefix="" in Python, and the equivalent in C#, when opening a topic for consumption.
Describe alternatives you've considered
Not an alternative solution, but a poor workaround: specify a consumer group with your prefix and a random id. It won't behave exactly the same, as commit is enabled by default, but future runs won't reuse it if you always generate a new one.
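The workaround amounts to generating a prefixed random group id per run; a sketch, with names of my choosing:

```python
import uuid


def make_consumer_group(prefix: str) -> str:
    """Combine the caller's prefix with a random suffix so each run gets a
    fresh consumer group and commits from prior runs are never reused."""
    return f"{prefix}-{uuid.uuid4().hex[:8]}"
```

The generated value would then be passed wherever the client accepts a consumer group name; the proposed `consumer_prefix` option would do this internally.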
The .NET 8 preview is causing some issues, due to its level of support, when onboarding developers who only work with the C# codebase. Given we only use .NET 8 features (Native AOT) for Python, this should not be a problem.
The suggestion is to use LTS for C#, and only use .NET 8 for the Native AOT builds for now.
Is your feature request related to a problem? Please describe.
Stream properties are de facto state, but they are not treated as state in the library, and therefore using them is pretty hard.
Describe the solution you'd like
Store them internally in state, so that after a restart they remain as they were before the restart.