
quix-streams's Issues

Add interface for test purposes to Quix/KafkaStreamingClient

Is your feature request related to a problem? Please describe.
As a software developer I wish to have tests exercising the code where I use either of these clients.

Describe the solution you'd like
Add an interface to both. Check if a common interface is possible, but it is not mandatory.

Describe alternatives you've considered
Use of

is possible for some testing, but it is not always the best option.
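
For illustration, a minimal sketch of what testing looks like today without an interface, using a standard mock in place of the concrete client (process() and the topic name are hypothetical, not part of the library):

```python
from unittest.mock import MagicMock

def process(client):
    # code under test that accepts a streaming client
    return client.get_topic_producer("my-topic")

def test_process_requests_topic_producer():
    client = MagicMock()  # stands in for QuixStreamingClient / KafkaStreamingClient
    process(client)
    client.get_topic_producer.assert_called_once_with("my-topic")
```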

Additional context
This interface might need to be treated primarily as a testing aid rather than an unbreakable contract. Avoiding breaking changes is still vastly preferred, however.

Restore StreamPackage functionality from 0.4.x in python

Is your feature request related to a problem? Please describe.
As part of the migration from 0.4.x, the StreamPackage class was not yet fully implemented. This issue is to track restoring that functionality.

Describe the solution you'd like
Work as in 0.4.x

Additional context
For the time being it is not included in the automatic imports, so previous code using it has to reference it explicitly, as discussed here: https://quix.discourse.group/t/streampackage-class-not-recognized/57

Daylight saving time or non-UTC timezones break unit tests

Tell us about the bug
When a timezone is not specified, the current timezone is assumed (correctly), but the unit tests are hard-coded to expect specific nanosecond values. This results in failing tests whenever the local timezone isn't exactly UTC, for example during daylight saving time or in non-UTC locations.

What did you expect to see?
Not failing unit tests
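
A minimal sketch of how such an expectation can be made timezone-independent: derive the expected nanoseconds from an explicit UTC datetime instead of hard-coding the number (the values below are illustrative).

```python
from datetime import datetime, timezone

# Build the expectation from an explicit UTC datetime rather than a literal,
# so the test passes regardless of the machine's local timezone or DST.
dt = datetime(2023, 3, 15, 12, 0, 0, tzinfo=timezone.utc)
expected_ns = int(dt.timestamp()) * 1_000_000_000
assert expected_ns == 1678881600000000000
```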


Add publish for individual timestamp.

Is your feature request related to a problem? Please describe.
The following use case is otherwise very difficult:
```python
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):
    for row in data.timestamps:
        if row.parameters["Speed"].numeric_value > 200:
            topic_producer.get_or_create_stream(stream_consumer.stream_id).timeseries.publish(row)
```

Describe the solution you'd like
Add an overload for this, so a single timestamp row can be published directly.

Consistency for Timestamp name

The timestamp name standard seems to be "time" (for instance, when reading parameter data as a dataframe), but when I query that same data from InfluxDB it is called "Timestamp".

It's fine to be flexible and understand several names, but can we have the same one for SDK-produced timestamps and InfluxDB?
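
In the meantime, a hedged one-line workaround on the query side, assuming the InfluxDB result comes back as a pandas dataframe:

```python
import pandas as pd

# Illustrative data: InfluxDB labels the column "Timestamp", the SDK expects "time"
df = pd.DataFrame({"Timestamp": [1678881600000000000], "Speed": [210.0]})
df = df.rename(columns={"Timestamp": "time"})  # align with the SDK's label
```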

Add python Apple silicon build

Is your feature request related to a problem? Please describe.
We need to add an Apple silicon build to releases. Whether it is a pre-built or build-from-source Python package is up for debate, but each release should include an Apple silicon compatible package.
Documentation for installing on apple silicon will need to be updated according to this comment: #93 (comment)

Describe the solution you'd like
A pre-baked wheel would be preferable, to keep the simple pip-install experience. Availability of build agents might make this more difficult than a source build.

Describe alternatives you've considered
A source build, but we have no example of that yet, so it would be a first. It is also not the optimal experience for a quick install.

Additional context
The prerequisite .NET 8 compatibility was introduced in this PR: #93

CheckIfClosed improvement

This could have very serious performance implications on writing, because it is executed for every single packet sent to the topic.

We should probably avoid taking the lock every time, and only lock when we are actually in a Closing state.
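
For illustration, a minimal sketch of the double-checked pattern in Python (the actual code in question is C#, and the names below are illustrative, not the library's):

```python
import threading

class Stream:
    def __init__(self):
        self._lock = threading.Lock()
        self._closing = False  # hypothetical flag standing in for the Closing state

    def check_if_closed(self):
        if not self._closing:      # fast path: no lock on the hot publish path
            return
        with self._lock:           # slow path: only lock once we may be closing
            if self._closing:
                raise RuntimeError("Stream is closed")
```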


Segfault with Ubuntu 20.04

Tell us about the bug
On Ubuntu 20.04 with the latest Python 3 (3.8.10) installed, a segfault occurs when an exception is set via the Python C API in the C# codebase.

Example code to produce segfault:

```python
from quixstreams import QuixStreamingClient

client = QuixStreamingClient("some-invalid-token")  # should raise a Python exception (set via the Python C API from C#), but segfaults
```

What did you expect to see?
No segfault; instead, a Python exception about the invalid token.


What tools and platforms are you using (e.g. iOS, Chrome, Desktop)
Ubuntu 20.04 (including Windows WSL and Google Colab)

Anything else we should know?
It works fine on other distributions, or when Python is compiled from source.

Integrate RAW events with streaming context

Is your feature request related to a problem? Please describe.
The API for RAW events with a custom schema is very hidden in the current implementation. It is also not used by the Quix platform, so the data is not surfaced in SaaS.

Describe the solution you'd like
RAW events surfaced through the existing events API.

Describe alternatives you've considered
Flattening messages into the existing dataframe solution; that will come later.

C# and Python API docs to be generated based on code documentation

Is your feature request related to a problem? Please describe.
I would like us to have generated documentation, similar to other libraries, that is automatically updated from code docs rather than relying on manual documentation being kept up to date.

Describe the solution you'd like
Whatever is acceptable for the given language, such as the Microsoft style for C#.

Redis as alternative to other state stores

Is your feature request related to a problem? Please describe.
Would like to have the option to use Redis as an alternative to the currently existing LocalFileStateStorage.

Describe the solution you'd like
Implement the IStateStorage interface and expose C#/Python bindings to use it where IStateStorage can be used.
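
A hedged sketch of the shape such a store could take; the method names mirror a hypothetical IStateStorage surface rather than the library's actual interface:

```python
from typing import Optional

import redis

class RedisStateStorage:
    """Illustrative Redis-backed key/value store in a LocalFileStateStorage-like role."""

    def __init__(self, prefix: str, client: Optional[redis.Redis] = None):
        self._r = client or redis.Redis()  # defaults to localhost:6379
        self._prefix = prefix

    def _key(self, key: str) -> str:
        return f"{self._prefix}:{key}"

    def set(self, key: str, value: bytes) -> None:
        self._r.set(self._key(key), value)

    def get(self, key: str) -> Optional[bytes]:
        return self._r.get(self._key(key))

    def remove(self, key: str) -> None:
        self._r.delete(self._key(key))

    def contains_key(self, key: str) -> bool:
        return bool(self._r.exists(self._key(key)))
```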

Tune down unnecessary producer exception

Is your feature request related to a problem? Please describe.
If a producer is idle for longer than ~8_000_000 ms (roughly 2.2 hours; possibly only in an Azure environment), it logs a "Kafka Producer Exception". This can incorrectly lead to the assumption that the connection is broken for good and that it won't publish the next time it needs to.
However, this is not true, as automatic reconnection happens underneath.

Describe the solution you'd like
Tune down the exception so it is only shown when relevant, such as on failure of the next publish.

Describe alternatives you've considered
A keep-alive, but that is excessive and isn't a guarantee either. We have only been able to reproduce this with either the producer or Kafka in the Azure cloud.


Feature Improvement - from_dataframe() -> 'TimeseriesDataRaw':

I understand that in streaming it is quite uncommon to have big chunks (batches) of data being published, so a long dataframe with lots of rows is unlikely.
Still, right now the dataframe-to-TimeseriesDataRaw conversion is done row by row and column by column, which is inefficient. I'll be proposing a new vectorized version to improve speed (especially with big dataframes).
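
A hedged sketch of the direction (names are illustrative, not the library's internals): pull each column out as a NumPy array in one vectorized call instead of visiting every cell.

```python
import numpy as np
import pandas as pd

def split_columns(df: pd.DataFrame, time_label: str = "time"):
    # One vectorized read per column instead of a Python-level loop per cell
    timestamps = df[time_label].to_numpy(dtype=np.int64)
    numeric_cols = [c for c in df.select_dtypes(include=[np.number]).columns if c != time_label]
    values = {col: df[col].to_numpy() for col in numeric_cols}
    return timestamps, values
```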

Add scalar state type

Is your feature request related to a problem? Please describe.
Add support for this in state helpers:

```python
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):
    stream_state = stream_consumer.get_scalar_state("total_rpm", int, lambda x: 0)  # default value for the state name

    for row in data.timestamps:
        rpm = row.parameters["EngineRPM"].numeric_value
        stream_state += rpm
        row.add_value("total_rpm", stream_state)
```

Updated stream properties are not sent before other data until it is flushed

Ok, that title is bad, but the idea behind stream properties is that they are sent with a bit of delay, to avoid sending them multiple times when several changes happen within a very short interval.

Once this interval elapses (or someone calls .Flush()), the current values are taken as the latest set of properties and sent to the wire.

In addition to this, we periodically re-broadcast the properties. To avoid re-broadcasting them on their own and filling up the topic with nothing else, there is logic to only do this when there is another message type to send, like parameter data.
In this scenario, the properties get sent first, then the actual data, if the re-broadcast condition is fulfilled.

Now, the problem: when a stream is initially created, properties are set, and data gets sent, the re-broadcast logic is triggered, but with the initial null stream properties instead of the configured ones, because the properties flush has not yet been triggered.

Schema Registry support

Use a schema registry for stream metadata, so we don't have to send it periodically and it is immediately available.

from_dataframe() cannot handle complex numbers

Tell us about the bug
from_dataframe() breaks when dealing with columns containing complex numbers:

```
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
in

in from_dataframe(data_frame, epoch)
    111     isnumeric = (label_type == int or label_type == float or label_type == complex)
    112     if isnumeric:
--> 113         if math.isnan(content):
    114             continue  # ignore it, as panda uses NaN instead of None, unable to detect difference
    115         if panda_col_label.startswith('TAG__'):  # in case user of lib didn't put it in quote don't throw err

TypeError: can't convert complex to float
```
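
The root cause is that math.isnan() rejects complex values outright; a minimal demonstration, with cmath.isnan as one possible fix:

```python
import cmath
import math

try:
    math.isnan(1 + 2j)        # the check from_dataframe performs internally
except TypeError as e:
    print(e)                  # can't convert complex to float

print(cmath.isnan(1 + 2j))    # False: cmath.isnan accepts complex values
```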

KeyboardInterrupt exception in python when using app.run

Tell us about the bug
When a Python application using app.run() is interrupted with CTRL-C (or the OS equivalent), it raises an exception, even though this is supposed to be handled in the codebase.

What did you expect to see?
No exception
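
A hedged interim workaround, assuming the App.run entry point from the report:

```python
import quixstreams as qx

# ... topic and handler setup goes here ...

try:
    qx.App.run()  # blocks until shutdown
except KeyboardInterrupt:
    pass  # expected on CTRL-C until the library handles this internally
```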


Add rolling window state helper

Is your feature request related to a problem? Please describe.

```python
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):
    stream_state = stream_consumer.get_rolling_window_state("average_rpm_10s_state", duration_ms=10000)

    for row in data.timestamps:
        stream_state.add_and_purge(row)
        rpm_values = [x.parameters["EngineRPM"].numeric_value for x in stream_state.items]
        row.add_value("average_rpm_10s", sum(rpm_values) / len(rpm_values))
```

Error adding empty tag

Hi Hi! I'm getting an error when adding an empty tag.

The code actually does this: .add_tags(row.tags), so it is copying the tags from the received TimeseriesData into the outgoing timeseries buffer.

```
Listening to streams. Press CTRL-C to exit.
Time:1678882097309000000
  Tags: {'room': 'foo', 'role': 'Customer', 'name': 'steve', 'phone': '', 'email': ''}
  Params:
    chat-message: ii
{'label': 'POSITIVE', 'score': 0.9442011713981628}
Exception: System.ArgumentNullException: Tag (phone) value can't be null or empty (Parameter 'tagValue')
   at QuixStreams.Streaming.Models.TimeseriesDataTimestamp.AddTag(String, String) + 0x141
   at QuixStreams.Streaming.Models.StreamProducer.Interop.TimeseriesDataBuilderInterop.AddTag(IntPtr, IntPtr, IntPtr) + 0x11b
```

Discussed here: https://stream-processing.slack.com/archives/C04SDM5UG9F/p1678883613307439
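
A hedged workaround sketch: drop empty tag values before copying them into the outgoing buffer (the data below is taken from the log above; the variable names are illustrative):

```python
received_tags = {'room': 'foo', 'role': 'Customer', 'name': 'steve', 'phone': '', 'email': ''}
clean_tags = {k: v for k, v in received_tags.items() if v}  # drops 'phone' and 'email'
assert clean_tags == {'room': 'foo', 'role': 'Customer', 'name': 'steve'}
# clean_tags can then be passed to .add_tags(...) safely
```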

RocksDB as alternative to other state stores

Is your feature request related to a problem? Please describe.
Would like to have the option to use RocksDB as an alternative to the currently existing LocalFileStateStorage.

Describe the solution you'd like
Implement the IStateStorage interface and expose C#/Python bindings to use it where IStateStorage can be used.

from_dataframe() breaks with nulls in time column

Tell us about the bug
from_dataframe() conversion seems to break if there are nulls in the time column of the dataframe.

What did you expect to see?
Here, as with any null-handling situation, there is a decision to make (a combined runnable sketch follows this list):

  • Drop the rows with nulls in the time column:
    dataframe = dataframe.dropna(subset=time_label)

  • Interpolate the nulls in the time column using surrounding values:
    dataframe[time_label] = dataframe[time_label].interpolate(method='linear')
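
A runnable sketch of both options, assuming the time column is labeled "time":

```python
import pandas as pd

time_label = "time"
dataframe = pd.DataFrame({time_label: [1.0, None, 3.0], "Speed": [10, 20, 30]})

# Option 1: drop rows whose time is null
dropped = dataframe.dropna(subset=[time_label])

# Option 2: interpolate the nulls from surrounding values
interpolated = dataframe.copy()
interpolated[time_label] = interpolated[time_label].interpolate(method="linear")
```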

API documentation out of date at places

Is your feature request related to a problem? Please describe.
The Python (and in odd places C#) code documentation (not the .md files) is incorrect, outdated, or grammatically wrong. Some of this resulted from the 0.4.x -> 0.5.0 migration.

Describe the solution you'd like
Code documentation is updated to reflect the latest version of the actual code it is documenting.

Window Operations

Is your feature request related to a problem? Please describe.
When doing stream processing, I find myself needing to keep windows of data in memory very, VERY often. For example, if I want to calculate average speed over the last 5 minutes, I have to keep the last 5 minutes of speed data in memory (speed + timestamp) and update that window on every new message (both appending the new data and trimming out the data now older than 5 minutes). I do this so often that I've ended up creating a Python class that I keep reusing in my projects. This is fine, but I'm sure you guys can do it better :)

Describe the solution you'd like
I'd love the SDK to have some window functionality, maybe as a TimeseriesData method. As a first feature, I propose that the window is created with:

  • Parameter: the parameter you want to create a window on (speed in my initial example)
  • Window size: either a number of messages or a time period

On later iterations, if these windows allowed several parameters, they could be used to perform enriching operations (table joins in batch). However, iteration over nulls and other issues would have to be solved first.

Describe alternatives you've considered
I was trying to think whether it was worth doing this using Kafka pointers and some of its characteristics (segments, etc.), but probably not. Even if there were a way to create a rolling window with pointers, you would be querying a big chunk of data every time. Doing it from the client side seems a better idea.

Additional context
I can share my current Python class for reference:

```python
class Window:
    def __init__(self, app_abv: str, window_nanoseconds: int):
        self.df_window = pd.DataFrame()
        self.window_size = window_nanoseconds

    def calculate(self, df: pd.DataFrame):
        # Add new data to the window
        self._update_mini_df_window(df)

        # Here mathematical operations are done with the updated window

        # Fill df with new values
        df = pd.merge(df, self.df_window[["time", "new_col"]], how='left', on=["time"])

        return df

    def _update_mini_df_window(self, df: pd.DataFrame):
        # If df_window is empty, start it with df
        if self.df_window.empty:
            self.df_window = df

        # If df_window has more recent data than what we are getting in real time,
        # something got messed up, maybe with replays :S.
        # Erase any data newer than the newly received df
        elif self.df_window["time"].iloc[-1] > df["time"].iloc[-1]:
            self.df_window = self.df_window[self.df_window["time"] <= df["time"].iloc[-1]]
            self.df_window = pd.concat([self.df_window, df]).drop_duplicates()
            self.df_window = self.df_window.sort_values("time", ascending=True).reset_index(drop=True)

            # Interpolate to fill nulls in key columns
            # (cat_col and num_col are column lists defined elsewhere in the module)
            self.df_window[cat_col] = self.df_window[cat_col].fillna(method="ffill", axis=0)
            self.df_window[num_col] = self.df_window.set_index('time')[num_col].interpolate('index').values

        else:
            # Else, concat data and ensure chronological order
            self.df_window = pd.concat([self.df_window, df]).drop_duplicates()
            self.df_window = self.df_window.sort_values("time", ascending=True).reset_index(drop=True)

            # Interpolate to fill nulls in key columns
            self.df_window[cat_col] = self.df_window[cat_col].fillna(method="ffill", axis=0)
            self.df_window[num_col] = self.df_window.set_index('time')[num_col].interpolate('index').values

        # TRIM WINDOW
        """
        # If the window is getting big in size, we just trim it
        bytes_in_a_GB = 1073741824
        if self.df_window.memory_usage(deep=True).sum() / bytes_in_a_GB > 0.25:  # if bigger than 0.25 GB
            self.df_window = self.df_window.iloc[-1000:]  # restart window to the last 1000 rows
            if self.df_window.memory_usage(deep=True).sum() / bytes_in_a_GB > 0.25:  # if still big, restart
                self.df_window = self.df_window.iloc[[-1]]
        """

        # Keep only needed rows
        self.df_window = self.df_window[self.df_window["time"] > (self.df_window['time'].iloc[-1] - self.window_size)]
        self.df_window["TEMP_timeDelta"] = self.df_window["time"] - self.df_window["time"].shift(1)
```

Exactly-once processing semantics

Is your feature request related to a problem? Please describe.
In case of a restart, no message gets sent twice to the output topic.

Describe the solution you'd like
TBD

EventDataBuilder add_tags missing and some builder typehints are wrong

Tell us about the bug

  • TimeseriesDataBuilder has the EventDataBuilder typehint for add_tags
  • EventDataBuilder's add_tags is not implemented at all, while TimeseriesDataBuilder's is

What did you expect to see?
A correct type hint, and EventDataBuilder having add_tags just as TimeseriesDataBuilder does.

Automatic conversion of types improved when adding value to timeseries

Tell us about the bug
The 0.4.x Python implementation used to convert types via pythonnet in a more automated fashion.
The new implementation results in an "Invalid type passed as parameter value." exception when passing values such as None or numpy.int64.

What did you expect to see?
Investigate what pythonnet did and how it picked the preferred type; maybe that is something we can do too.
Alternatively, None values could be silently ignored when added. Maybe better ideas in the comments?

More details
Previous code:
https://github.com/quixio/quix-streams/blob/v0.4.10/src/PythonClient/lib/quixstreaming/models/parameterdatatimestamp.py#L103
New code:
https://github.com/quixio/quix-streams/blob/v0.5.1/src/PythonClient/src/quixstreams/models/timeseriesdatatimestamp.py#L130

Any workaround?
For the time being, the only workaround is to make sure the given value falls into one of str, float, int, bytes, bytearray. So, for example, None would have to be converted to NaN for a float parameter.
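
A hedged helper sketch for that workaround: coerce values into the supported set before calling add_value (the helper name is illustrative):

```python
import numpy as np

def to_supported(value):
    """Coerce a value into one of str, float, int, bytes, bytearray."""
    if value is None:
        return float("nan")            # represent a missing numeric as NaN
    if isinstance(value, np.generic):  # numpy.int64, numpy.float32, ...
        return value.item()            # convert to the matching built-in type
    return value
```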

Move splitter to broker

We should interrogate the broker for the maximum allowed message size and automatically configure the splitter accordingly, rather than using a value configured on the client.
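
A hedged sketch of the idea using confluent-kafka's admin API (an assumption for illustration; the library's actual broker interrogation would live in the C# client): read the broker's message.max.bytes and size the splitter from it.

```python
from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
resource = ConfigResource(ConfigResource.Type.BROKER, "1")  # broker id as a string
configs = admin.describe_configs([resource])[resource].result()
max_message_bytes = int(configs["message.max.bytes"].value)  # feed this to the splitter
```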

State handling improvements

A potential improvement could be adding staging support to IStateStorage: stage with OnCommitting, finalize with OnCommitted, to reduce the chance of an unsuccessful storage save to the minimum allowed by storage limitations.

When a consumed stream is closed, revocation doesn't include it in the list. This can cause state to be saved even when it shouldn't be.

There are possibly threading issues when modifying a state from multiple threads. We attempted to reproduce this with unit tests in C# and Python without luck; it is left without locking implemented for now.

Introduce Flush for topic producer

Is your feature request related to a problem? Please describe.
Depending on my business logic, I might want to guarantee my data is sent without closing down the topic for future publishing.

Describe the solution you'd like
Having the option to flush a topic would be beneficial, similar to how there is a flush for a stream or parts of a stream. With this, I wouldn't be forced to close/dispose the topic to guarantee a send.

Describe alternatives you've considered
Other ideas that occurred to me are less mainstream. Flush sounds ideal, considering it is already how this is done elsewhere in the lib.

Update Apple silicon install guide to reflect support for it

Is your feature request related to a problem? Please describe.
Future builds now support Apple silicon. As we're not yet using GitHub for our builds, this is not reflected as a code change.

Describe the solution you'd like
Update all aspects of the Apple silicon support documentation, such as the readme (landing page for the PyPI wheel) and the M1 install guide readme.

This support will be available in 0.5.4; until then, Rosetta must be used (unless we backfill the builds).

Topic name property available in topic and stream objects

Is your feature request related to a problem? Please describe.
When using the same callback to react to different topics, I'd sometimes like to have the topic name available within the callback function (for example, to create a TAG in the output recording which input topic is behind that specific message).

Describe the solution you'd like
I'd like to have access to the topic name property through the InputTopic, OutputTopic, StreamReader and StreamWriter objects. So, just as I can do input_stream.stream_id, I'd also like input_topic.topic_id and input_stream.topic_id to be possible.

Allow using consumer group prefix when not using consumer group

Is your feature request related to a problem? Please describe.
While the title sounds counterintuitive, the problem is real. When no consumer group is in use, librdkafka still requires a consumer group to connect to Kafka, though it will not commit anything. Because of this quirk of the underlying library, we need to be able to specify a prefix to use when a random consumer group is generated for this purpose.

Describe the solution you'd like
Add a consumer_prefix="" option in Python, and the equivalent in C#, when opening a topic for consumption.

Describe alternatives you've considered
Not an alternative solution, but a bad workaround: specify a consumer group with your prefix plus a random id (see the sketch below). It won't behave exactly the same, since commits are enabled by default, but future runs won't use the committed offsets if you always generate a new group.
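
A minimal sketch of that workaround: a prefixed, throwaway consumer group generated per run ("my-prefix" is illustrative):

```python
import uuid

consumer_group = f"my-prefix-{uuid.uuid4()}"
```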

Use .NET LTS for C# and only use .NET 8 for Python builds

The .NET 8 preview is causing some issues, due to its level of support, when onboarding developers who only work with the C# codebase. Given we only use .NET 8 features (Native AOT) for Python, this should not be an issue.

The suggestion is to use LTS for C#, and only use .NET 8 for the Native AOT builds for now.

Stream properties and metadata are internally saved using the state store

Is your feature request related to a problem? Please describe.
Stream properties are de facto state, but they are not treated as state in the library, and therefore using them is pretty hard.

Describe the solution you'd like
Internally store them in state, so that after a restart they remain as they were before the restart.
