polarstreams / polar
Lightweight & elastic kubernetes-native event streaming system
License: GNU Affero General Public License v3.0
Currently, the response does not include the offset of the payload: https://github.com/barcostreams/barco/blob/main/docs/developer/NETWORK_FORMATS.md#consumer-poll-response
The poll response should contain the offset, after the topic name, for bookkeeping.
We read ahead from the segment file reusing the same buffer, which is fine, but when returning a single chunk (readSingleChunk()) we allocate new buffers instead of using a pool.
Similar to register, stateless consumers should be able to manually commit on all brokers with a single call.
There's currently a minimum of 2 consumer ranges per token. This minimum exists because it simplified calculations during token splits/joins, but from the user's perspective it can seem arbitrary.
We should support setting the value to 1 consumer range per token.
I am testing Barco in OpenShift version 4.11 (Kubernetes version v1.24.0+4f0dd4d) but the Barco pods are not starting successfully. Following the installation (where I found some warnings reported here #17), the pods are in the following state:
$ oc get pod
NAME READY STATUS RESTARTS AGE
barco-0 0/1 CrashLoopBackOff 7 (21s ago) 11m
barco-1 0/1 Error 7 (5m21s ago) 11m
Checking the pod logs:
$ oc logs -f barco-0
{"level":"info","time":"2022-09-19T06:31:55Z","message":"Starting Barco"}
{"level":"info","time":"2022-09-19T06:31:55Z","message":"Using home dir as /var/lib/barco"}
{"level":"info","time":"2022-09-19T06:31:55Z","message":"Initializing local db from /var/lib/barco/data/local.db"}
{"level":"fatal","error":"unable to open database file: no such file or directory","time":"2022-09-19T06:31:55Z","message":"Exiting"}
Steps to reproduce
We should pass deadlines on all the replication communications to avoid unbounded request times.
We should set the prometheus.io/scrape and prometheus.io/port annotations in the k8s resource files to automatically scrape Brokers.
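These are conventional annotations that many Prometheus scrape configs key on. A sketch of the pod template metadata; the metrics port shown is an assumption:

```yaml
# Pod template metadata in the StatefulSet definition
metadata:
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9253"   # assumed metrics port, adjust to the broker's
```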
The Discovery API's GET /v1/brokers returns the service name without it being fully qualified with the namespace. It should return "my-svc.my-namespace" to work in all environments.
It would be nice to support existing Kafka client API to leverage the existing Kafka ecosystem.
The goal is to support protocol translation for interoperability, not as a storage format.
Protocol documentation: https://kafka.apache.org/protocol.html
Tasks:
We should proactively close consumer connections that remain idle after the timeout has elapsed.
The documentation describes how to use HTTP/2 commands (via the curl CLI) to produce messages; however, it only describes using a client to consume them.
Given that the documentation states Barco supports HTTP/2 to produce and consume:
Barco supports producing and consuming events using [HTTP/2](https://en.wikipedia.org/wiki/HTTP/2) APIs. HTTP/2 provides solid low level features that are required in a client-server protocol like framing, request pipelining, ping, compression, etc. without the performance pitfalls of HTTP 1.
and includes an example to produce, it would be great to add an example to consume the same message (following the steps to subscribe, poll, and commit). Otherwise, it seems that a client is always needed to consume messages.
We recently introduced the consumer_id querystring parameter, which uses snake case, but the API uses camelCase. We should change it to consumerId in v0.5.0, while still supporting the snake case version until v0.7.0.
Support receiving a commit message with the last consumed offset and commit it.
When the container restarts, the ALTER TABLE ...
command generates the error: duplicate column name: cluster_size
We should run tests on GitHub actions for arm64.
We could use something like https://github.com/uraimo/run-on-arch-action, which uses QEMU, and run go test ./..., or any similar approach.
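A sketch of what the workflow step could look like; the action inputs follow the run-on-arch-action README, and the distro and versions are assumptions:

```yaml
jobs:
  test-arm64:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: uraimo/run-on-arch-action@v2
        with:
          arch: aarch64
          distro: ubuntu20.04
          # Installing Go inside the container is omitted here for brevity
          run: go test ./...
```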
We should add support for uploading and downloading files from S3 and other object storage providers.
Ideally, the configuration should be done in a way that facilitates budgeting for short period of time on a broker volume and longer periods on object storage.
For example, if a user wants to have 7 day retention, they could set 1 day local volume and 6 days on S3, that way the local volume should only be large enough to hold 1 day of events.
We should create a basic HPA resource file for users to have as a reference; it should be conservative in scaling down, with a long stabilizationWindowSeconds.
Scaling up should be set at 100% (2x) and scaling down should be set at 50% (1/2x).
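A conservative starting point, assuming a CPU metric and illustrative replica counts and stabilization window:

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: barco
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: barco
  minReplicas: 3
  maxReplicas: 12
  behavior:
    scaleUp:
      policies:
      - type: Percent
        value: 100          # at most 2x at a time
        periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 1800  # illustrative; err on the long side
      policies:
      - type: Percent
        value: 50           # at most 1/2x at a time
        periodSeconds: 60
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
```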
There's a flaky test "should scale down"
. After looking into the root cause of the failure, it seems the max offset can't be retrieved from the peers.
This only happens in CI and only occasionally, but it probably hints at a larger issue.
A Java client could be a great feature to allow producing and consuming messages from this language. The client should be published to the Maven Central Repository so it can be integrated as a dependency in the most common Java frameworks such as Quarkus, Spring Boot, and Micronaut.
This client should be aligned with the current Go client.
We should use a dedicated type for representing the group name, so that it doesn't get mixed up with other string parameters.
For example:
type Group string
and then, use the Group
type for method signatures:
type OffsetState interface {
Initializer
fmt.Stringer
// Here: Use Group instead of string
Get(group Group, ...) (offset *Offset)
// Use Group instead of string
GetAllWithDefaults(group Group, t...) []Offset
// Use Group instead of string
Set(group Group, ...) bool
// ...
}
We should provide arm64 images.
There are a couple of tasks for providing both amd64 and arm64 images under a single tag:
Building the image (./build/container/Dockerfile) with the following env variables for arm64: CGO_ENABLED=1 CC=aarch64-linux-gnu-gcc. We could create a shell script or something with the steps.
After scaling up/down, the offset of a consumer group that hasn't been tracked yet can't be properly initialized.
To have a simpler API for stateless consumers, we should support relaying the register message to all the peers.
Relates to #65
We stream the request body directly into the compressed writer, which is good for saving memory, but since coalescing multiple requests into a single chunk is single-threaded, one slow client can add latency to other unrelated clients.
We should use intermediate buffers to read the bodies in parallel and avoid introducing latency to other requests when a client is slow.
This can have an impact on performance so we should benchmark the brokers as part of this task.
We should provide static binaries of PolarStreams and document how to install it manually on a linux instance.
To support both amd64
and arm64
we could build them inside containers and then copy them using:
docker create --name dummy IMAGE_NAME
docker cp dummy:/path/to/file /dest/to/file
docker rm -f dummy
On amd64, we can just disable CGO_ENABLED
:
CGO_ENABLED=0 go build -ldflags '-s -linkmode external -extldflags=-static' .
But arm64 requires musl.
We should document how to setup a cluster on plain Linux instances.
This task depends on #79.
HTTP/2 breaks requests and responses into frames on multiple stream ids.
Unlike other protocols like the Cassandra protocol, it can interleave partial requests/responses.
On the producing server side this is a problem, as we can't independently move a request body forward without moving the whole stream forward.
It uses incorrect ranges to refer to a dev mode offset value.
When writing at high throughput rates, it's possible that the file name of the replicated data does not match.
We should support both HTTP/1 and HTTP/2 on the consuming interface.
Relates to #65.
There's no ServeConn() on net/http yet, though: golang/go#36673
PolarStreams performs CRC validation of chunks when reading. Currently, it will panic when finding a corrupted chunk. We should retrieve the data from a replica when this occurs.
We should add more metrics related to the broker producing interface:
Creating new generations due to ownership changes (scaling / failover) should be tested more thoroughly, and the implementation should be revisited to make sure it resists sudden process kills (power failure) and that the system will self-heal.
Consumer assignment was made with 1 consumer range per token in mind.
We have to account for different consumer ranges when assigning consumers to token ranges.
Support creating and removing topics, allowing disabling automatic creation when producing.
This includes topic validation when producing and other metadata.
We should track the ability to have different consumer ranges by topic on a separate ticket.
Similar to Kafka’s auto.offset.reset
, we should support setting the strategy when there’s no offset for a given consumer group.
We could expose the following setting:
“On new consumer group: <start from earliest | start from latest>“
Currently we support producing in a stateless manner; for example, you can send a message:
curl -X POST -d '{"hello":"world"}' \
-H "Content-Type: application/json" \
"http://barco.streams:9251/v1/topic/my-topic/messages"
It would be awesome if we could also support the same level of client statelessness for consumers, to enable things like curl for consuming, for example:
curl -X PUT "http://barco.streams:9252/v1/consumer/register?consumer_id=1"
curl -X POST -H "Accept: application/vnd.barco.consumermessage+json" \
"http://barco.streams:9252/v1/consumer/poll?consumer_id=1"
This would represent a registration of a consumer with id "1", belonging to the "default" consumer group, to get data from all topics, followed by a request to poll the data. The poll response would contain multiple messages:
[
{
"topic": "my-topic",
"token": "-9223372036854775808",
"rangeIndex": "1",
"version": "1",
"startOffset": 123
"values": [
{"hello": 1},
{"hello": 2},
{"hello": 3}
]
}
]
A Node.js client could be a great feature to allow producing and consuming messages from this language. The client should be published to the npm registry to be used by the Node.js developer community.
This client should be aligned with the current Go client.
I am testing Barco in OpenShift version 4.11 (Kubernetes version v1.24.0+4f0dd4d) but I am getting some PodSecurity violation warnings.
Following the instructions, I found the following warning when the kustomization is applied:
❯ kubectl apply -k .
namespace/streams created
serviceaccount/barco created
role.rbac.authorization.k8s.io/barco created
clusterrole.rbac.authorization.k8s.io/barco created
rolebinding.rbac.authorization.k8s.io/barco created
clusterrolebinding.rbac.authorization.k8s.io/barco created
service/barco created
Warning: would violate PodSecurity "restricted:v1.24": allowPrivilegeEscalation != false (container "barco" must set securityContext.allowPrivilegeEscalation=false), unrestricted capabilities (container "barco" must set securityContext.capabilities.drop=["ALL"]), runAsNonRoot != true (pod or container "barco" must set securityContext.runAsNonRoot=true), seccompProfile (pod or container "barco" must set securityContext.seccompProfile.type to "RuntimeDefault" or "Localhost")
statefulset.apps/barco created
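The warning itself lists the missing fields. A securityContext sketch for the container spec that should satisfy the restricted profile (runAsNonRoot and seccompProfile may also be set at pod level):

```yaml
securityContext:
  runAsNonRoot: true
  allowPrivilegeEscalation: false
  capabilities:
    drop: ["ALL"]
  seccompProfile:
    type: RuntimeDefault
```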
Steps to reproduce
The current k8s definition of the barco service does not include nodePorts, so it is impossible to execute the sample commands to test and verify producing messages.
Steps to reproduce
❯ TOPIC="my-topic"
curl -X POST -i -d '{"hello":"world"}' \
-H "Content-Type: application/json" --http2-prior-knowledge \
"http://barco.streams:9251/v1/topic/${TOPIC}/messages"
curl: (6) Could not resolve host: barco.streams
❯ minikube service barco --url -n streams
😿 service streams/barco has no node port
Suggestions
I would like to suggest improving the Getting Started guide by adding some references on setting up a local Kubernetes cluster (or using the local development environment) before trying to execute the commands to produce or consume. Otherwise, it is a bit complicated to follow the instructions.
While the barco service is not exposed, there is an alternative way to execute the commands using the exec option of the kubectl CLI:
❯ k exec barco-0 -- curl -X POST -i -d '{"hello":"world"}' -H "Content-Type: application/json" --http2-prior-knowledge "http://barco:9251/v1/topic/my-topic/messages"
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 19 100 2 100 17 1 16 0:00:02 0:00:01 0:00:01 18
HTTP/2 200
content-type: text/html; charset=utf-8
content-length: 2
date: Mon, 19 Sep 2022 07:12:15 GMT
OK
After restarting with BARCO_DEV_MODE=true while leaving the data dir intact, the broker can error when trying to propose ownership of the token range.
We currently send the compressed chunks to the consumers as a way to reduce traffic between brokers and clients.
We should also support having the broker uncompress and adapt the response so it can be used directly by simple clients like curl.
Stateless consumers should be able to avoid manual committing on "goodbye".
We should include a page in the documentation detailing how fanout works in a Barco Streams cluster.
We should consider publishing metrics via the custom.metrics.k8s.io API, including disk usage and other metrics that can be useful for consumption by an HPA.
We support HTTP/2, which features framing and request pipelining. In order to support packing more messages into the same request, we should support ndjson, which is a streaming-friendly format.
GroupReadQueue streams the consumer response and handles manual commits.
We should also commit on the "goodbye" consumer message but without affecting the response.
We should refactor GroupReadQueue to only write the response on some occasions.
We should add integrity checks for the interbroker message header.