eventing-natss's People

Contributors

adrcunha, aliok, astelmashenko, chaodaig, chizhg, creydr, devguyio, dprotaso, dubee, evankanderson, grantr, harwayne, knative-automation, knative-prow-robot, lberk, lionelvillard, markusthoemmes, mattmoor, mattmoor-sockpuppet, matzew, n3wscott, nachocano, nicolaferraro, pierdipi, slinkydeveloper, srvaroa, syedriko, tzununbekov, vaikas, zhaojizhuang

eventing-natss's Issues

Update nats streaming image version to 0.22.0

Problem
Update the NATS Streaming image before it is deprecated: bump from 0.11.0 to 0.22.0.

The list of available NATS Streaming image versions is published upstream.
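
For illustration, a minimal sketch of the change, assuming the Streaming server container in the broker config is deployed from the official nats-streaming image (the exact manifest and field layout are assumptions):

containers:
  - name: nats-streaming
    # previously nats-streaming:0.11.0
    image: nats-streaming:0.22.0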

Persona:

Exit Criteria

Time Estimate (optional):
How many developer-days do you think this may take to resolve?

0.5

Additional context (optional)
Add any other context about the feature request here.

Deleting the last `NatsJetStreamChannel` in a namespace when namespace-scoped can result in orphaned resources

Describe the bug
If you have a NatsJetStreamChannel in a namespace with the eventing.knative.dev/scope: namespace annotation set, a dispatcher is deployed to that namespace. When the channel is deleted, the controller deletes the dispatcher deployment, but the dispatcher doesn't always have time to delete the underlying stream and/or consumers first.

We can't move the stream/consumer management to the controller, because the credentials for the NATS cluster are scoped to the dispatcher (and we have different credentials in different namespaces).

What we really need to do is have multiple finalizers on the channel, so the actual channel isn't deleted until both reconcilers have time to finalize the resource. Once all finalizers are removed, it will then be deleted and the controller can remove the dispatcher deployment.
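
For illustration, a rough sketch of what the channel metadata would look like while both reconcilers hold a finalizer (the finalizer names below are hypothetical, not the ones the code registers):

apiVersion: messaging.knative.dev/v1alpha1
kind: NatsJetStreamChannel
metadata:
  name: my-channel
  finalizers:
    # hypothetical: removed by the controller once the dispatcher deployment can go
    - natsjetstreamchannels.messaging.knative.dev
    # hypothetical: removed by the dispatcher once the stream/consumers are deleted
    - natsjetstreamchannels.dispatcher.messaging.knative.dev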

Expected behavior
Any underlying resources are cleaned up when a channel is deleted.

To Reproduce
See above.

Knative release version
Latest

Additional context
I already have the start of this fix in a big WIP which includes #542 and #543

fix: The broker trigger based on NatsJetStreamChannel leads to "No TTL seen, dropping".

Describe the bug
The event ends up in the dead-letter sink with the reason "No TTL seen, dropping".

Expected behavior
An event that passes through a Knative Trigger backed by a NatsJetStreamChannel should be displayed in the event-display pod.

To Reproduce

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-natjsm-channel
  labels:
    eventing.knative.dev/release: devel
data:
  channelTemplateSpec: |
    apiVersion: messaging.knative.dev/v1alpha1
    kind: NatsJetStreamChannel
    spec:
      stream:
        overrideName: mindwm_user_mindwm-dev1
        config:
          additionalSubjects: 
            - "mindwm.user.mindwm-dev1.>"
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: mindwmbroker
  namespace: user
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: config-natjsm-channel
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dead-letter-broker
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: all-events
spec:
  broker: mindwmbroker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dead-letter
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: dead-letter-broker
  labels:
    contrib.eventing.knative.dev/release: devel
spec:
  template:
    spec:
      containers:
      - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: dead-letter
  labels:
    contrib.eventing.knative.dev/release: devel
spec:
  template:
    spec:
      containers:
      - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  labels:
    contrib.eventing.knative.dev/release: devel
spec:
  template:
    spec:
      containers:
      - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
$ kubectl get NatsJetStreamChannel
NAME                       READY   REASON   URL                                                                         AGE
mindwmbroker-kne-trigger   True             http://mindwmbroker-kne-trigger-kn-jsm-channel.ksandbox.svc.cluster.local   16m
$ kubectl get broker
NAME           URL                                                                              AGE   READY   REASON
mindwmbroker   http://broker-ingress.knative-eventing.svc.cluster.local/ksandbox/mindwmbroker   16m   True    
$ kubectl get trigger
NAME         BROKER         SUBSCRIBER_URI                                    AGE   READY   REASON
all-events   mindwmbroker   http://event-display.ksandbox.svc.cluster.local   14m   True    
$ nats stream info mindwm_user_mindwm-dev1  
Information for Stream mindwm_user_mindwm-dev1 created 2024-03-08 20:20:37

              Subjects: ksandbox.mindwmbroker-kne-trigger._knative, mindwm.user.mindwm-dev1.>
              Replicas: 1
              Storage: File
 $ cat<<EOF | nats pub mindwm.user.mindwm-dev1.test-topic
> 
{
    "specversion" : "1.0",
    "type" : "com.github.pull.create",
    "source" : "/cloudevents/spec/pull",
    "subject" : "123",
    "id" : "A234-1234-1234",
    "time" : "2018-04-05T17:31:00Z",
    "comexampleextension1" : "value",
    "comexampleothervalue" : 5,
    "datacontenttype" : "text/xml",
    "data" : "<much wow=\"xml\"/>"
}
> EOF
20:39:11 Reading payload from STDIN
20:39:11 Published 342 bytes to "mindwm.user.mindwm-dev1.test-topic"
$ kubectl get pods
NAME                                         READY   STATUS    RESTARTS   AGE
dead-letter-00001-deployment-dc546cf-c88h8   2/2     Running   0          4s
$ kubectl logs dead-letter-00001-deployment-dc546cf-c88h8
Defaulted container "user-container" out of: user-container, queue-proxy
2024/03/08 20:41:02 Failed to read tracing config, using the no-op default: empty json tracing config
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: com.github.pull.create
  source: /cloudevents/spec/pull
  subject: 123
  id: A234-1234-1234
  time: 2018-04-05T17:31:00Z
  datacontenttype: text/xml
Extensions,
  comexampleextension1: value
  comexampleothervalue: 5
Data,
  <much wow="xml"/>
$ kubectl -n knative-eventing logs --tail=1 mt-broker-filter-c46948dfd-8mf7d
{
  "level": "warn",
  "ts": "2024-03-08T20:41:01.114Z",
  "logger": "mt_broker_filter",
  "caller": "filter/filter_handler.go:187",
  "msg": "No TTL seen, dropping",
  "commit": "cb5dfda",
  "triggerRef": "ksandbox/all-events",
  "event": "Context Attributes,\n  specversion: 1.0\n  type: com.github.pull.create\n  source: /cloudevents/spec/pull\n  subject: 123\n  id: A234-1234-1234\n  time: 2018-04-05T17:31:00Z\n  datacontenttype: text/xml\nExtensions,\n  comexampleextension1: value\n  comexampleothervalue: 5\nData,\n  <much wow=\"xml\"/>\n"
}

Knative release version
1.12.3; the same thing also occurs with 1.13.0.

Additional context
I tried the latest eventing-natss from the master branch with Knative 1.13, and also eventing-natss 1.10.7 with Knative 1.12.3.

Is it possibly related to knative/eventing#2443?
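
For context, the broker filter drops events that are missing the knativebrokerttl extension attribute, which the broker ingress normally adds; events published straight onto the stream with nats pub bypass the ingress. Assuming the filter only checks for that attribute (as in the payload used in the panic report further down this page), a possible workaround is to include it in the published JSON:

{
    "knativebrokerttl" : "255",
    "specversion" : "1.0",
    "type" : "com.github.pull.create",
    "source" : "/cloudevents/spec/pull",
    "id" : "A234-1234-1234",
    "datacontenttype" : "text/xml",
    "data" : "<much wow=\"xml\"/>"
}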

1.3.5, 1.4.0, 1.10.2: dispatcher does not send response to broker

Describe the bug
After upgrading from 1.3.0 to 1.3.5, a Knative Sequence combined with a Broker no longer works correctly. The dispatcher does not accept the content-type "application/cloudevents+json" and generates the error: dispatcher/consumer.go:73 failed to handle message failed to forward reply to http://broker-ingress.knative-eventing.svc.cluster.local/xxxxxx, specversion: no specversion

As a result, the response message is not sent to the broker.

If the content-type is application/json, the dispatcher reports no error, but the message is still not sent to the broker.

I used the appender app (https://github.com/knative/eventing/blob/main/cmd/appender/main.go), the same as in the documentation.

Expected behavior
The response message is sent to the broker.

To Reproduce
Create a Knative service that replies with a CloudEvent response.

Knative release version
1.2.2

Make installable without "ko"

Problem
I am not familiar with "ko", but it seems to be used only to resolve the image names. I cannot install without it, as applying the manifests produces errors about unresolved image names.

Persona:
System Integrator

Exit Criteria
I can use kubectl apply -f *.. to install and it works without error messages. (May

Time Estimate (optional):
0

Additional context (optional)
Maybe I am wrong and everyone except me uses ko as a daily tool, but I think it should not be required.
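
For reference, one possible stopgap until plain manifests are published: ko can render the manifests to a regular YAML file once, which can then be applied with kubectl. This assumes a local clone of the repo and a KO_DOCKER_REPO pointing at a registry you can push to:

# from a checkout of eventing-natss
export KO_DOCKER_REPO=registry.example.com/knative   # any registry you can push images to
ko resolve -f config/ > eventing-natss.yaml
kubectl apply -f eventing-natss.yaml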

Duplication of events sent when NATS slow consumer error appears

Describe the bug
Currently I am using JMeter to send varying loads directly to a JetStream server pod. The server accepts all messages quickly, and the NATS JetStream dispatcher subscribes to the stream and sends the messages to a simple Knative service that has a Trigger listening to all events and persisting them in a PostgreSQL database.

At low load, we see messages persisted to the database quickly.
The bug only appears when we send 25,000 messages to the JetStream server; duplication of events is then seen
(Prometheus shows 30k+ messages sent by the broker filter pod).

The jetstream-ch-dispatcher logs show:
ERROR MESSAGE: nats: slow consumer, messages dropped on connection [95700] for subscription on "_INBOX.vNtVZS4IIQ2XCN5w06S5uS"

Expected behavior
No duplication of messages; each message is sent only once by the JetStream dispatcher.

To Reproduce
Running in a 4 Core, 16 GB RAM Ubuntu VM
Installed Knative serving with net-contour
Installed MT-channel broker
Used the yaml file included on knative site: https://github.com/knative-sandbox/eventing-natss/releases/download/knative-v1.4.0/eventing-natss.yaml

Increased the limits of the Jetstream server to use 1000m CPU and 500M Memory
Publishing to Jetstream server via NodePort

Knative release version
1.4

Additional context
Understood that a new version of Jetstream Dispatcher is in the works, but wondering if this issue will persist in future versions

Please update if more information is needed on my end, thanks.

Trigger resets NatsJetStreamChannel config

Describe the bug
With a Broker that uses a custom channelTemplateSpec config, creating a Trigger resets the configuration of the NatsJetStreamChannel.

stream becomes empty and consumerConfigTemplate disappears.

Expected behavior
The custom config persists.

To Reproduce
Create a broker with a custom config:

kind: ConfigMap
apiVersion: v1
metadata:
  name: config-natjsm-channel-cfg
  namespace: cfg-events
data:
  channelTemplateSpec: |-
    apiVersion: messaging.knative.dev/v1alpha1
    kind: NatsJetStreamChannel
    spec:
      consumerConfigTemplate:
        deliverPolicy: Last
      stream:
        config:
          maxAge: 30m0s
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: test
  namespace: cfg-events
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: config-natjsm-channel-cfg
    namespace: knative-eventing
  delivery:
    backoffDelay: PT30S
    backoffPolicy: exponential
    retry: 2

Create Trigger

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: display-trigger
  namespace: cfg-events
  labels:
    eventing.knative.dev/broker: test
spec:
  broker: test
  filter:
    attributes:
      type: samples.http.mod3
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: sequence-display
      namespace: cfg-events

The spec generated for the NatsJetStreamChannel after the Broker is created:

spec:
  consumerConfigTemplate:
    ackWait: 0s
    deliverPolicy: Last
    replayPolicy: ''
  delivery:
    backoffDelay: PT30S
    backoffPolicy: exponential
    retry: 2
  stream:
    config:
      duplicateWindow: 0s
      maxAge: 30m0s

After trigger creation

  delivery:
    backoffDelay: PT30S
    backoffPolicy: exponential
    retry: 2
  stream: {}
  subscribers:
    - delivery:
        backoffDelay: PT30S
        backoffPolicy: exponential
        retry: 2
      generation: 1
      replyUri: 'http://broker-ingress.knative-eventing.svc.cluster.local/cfg-events/test'
      subscriberUri: >-
        http://broker-filter.knative-eventing.svc.cluster.local/triggers/cfg-events/display-trigger/3c4023d4-56bb-434b-a17d-d0856e1bcee5
      uid: 3adf1d09-02f5-4daa-a3b5-cb297afb16ca

Knative release version
1.9.0

Additional context
The jsm dispatcher version is 1.10.3.

The channel is patched by the Knative eventing subscription reconciler: https://github.com/knative/eventing/blob/d89300ce34fa0d757d4193837f7b0de88a04f2e2/pkg/reconciler/subscription/subscription.go#L439C22-L439C39

No webhook for natss channel => no validation, nor defaulting

Describe the bug
While debugging #62 I noticed that the reason the Channel doesn't work is that it does not have the correct duck shape defined in the webhook, and then realized there is no webhook at all, so the validation doesn't even run.

Expected behavior
A validating / defaulting webhook runs for natss channel.

To Reproduce
Create an MT broker targeting the natss channel. It will subscribe to it using the old v1alpha1 Channelable duck, and since the reconciler is using the v1beta1 shape, it does not see the subscriptions.

Knative release version
head

Additional context
Add any other context about the problem here such as proposed priority

Documentation

Problem
There's currently no eventing-natss specific documentation (as far as I can tell)

Persona:
System Operator
Event consumer
System Integrator
Contributors

Exit Criteria
Unsure

Time Estimate (optional):
How many developer-days do you think this may take to resolve?
1-5 depending on how in depth the documentation is.

Additional context (optional)
Even just some example resources would be very beneficial.

JetStream consumer does not properly propagate tracing context

Describe the bug

This bit of code attaches the new span to c.ctx rather than ctx, but ctx is what is later passed to SendMessage(), so the span never accompanies the outgoing request:

sc, ok := tracing.ParseSpanContext(event)
var span *trace.Span
if !ok {
	c.logger.Warn("Cannot parse the spancontext, creating a new span")
	c.ctx, span = trace.StartSpan(c.ctx, jsmChannel+"-"+string(c.sub.UID))
} else {
	c.ctx, span = trace.StartSpanWithRemoteParent(c.ctx, jsmChannel+"-"+string(c.sub.UID), sc)
}
defer span.End()

te := TypeExtractorTransformer("")
dispatchExecutionInfo, err := c.dispatcher.SendMessage(
	ctx,
	message,
	c.sub.Subscriber,
	kncloudevents.WithReply(c.sub.Reply),
	kncloudevents.WithDeadLetterSink(c.sub.DeadLetter),
	kncloudevents.WithRetryConfig(c.sub.RetryConfig),
	kncloudevents.WithTransformers(&te),
	kncloudevents.WithHeader(additionalHeaders),
)
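
A minimal sketch of the kind of fix implied above: start the span from the per-message ctx and pass that same context to SendMessage instead of mutating c.ctx (this mirrors the snippet above and is not the actual patch):

sc, ok := tracing.ParseSpanContext(event)
var span *trace.Span
if !ok {
	c.logger.Warn("Cannot parse the spancontext, creating a new span")
	ctx, span = trace.StartSpan(ctx, jsmChannel+"-"+string(c.sub.UID))
} else {
	ctx, span = trace.StartSpanWithRemoteParent(ctx, jsmChannel+"-"+string(c.sub.UID), sc)
}
defer span.End()

te := TypeExtractorTransformer("")
// ctx now carries the span, so the dispatcher propagates the tracing context.
dispatchExecutionInfo, err := c.dispatcher.SendMessage(
	ctx,
	message,
	c.sub.Subscriber,
	kncloudevents.WithReply(c.sub.Reply),
	kncloudevents.WithDeadLetterSink(c.sub.DeadLetter),
	kncloudevents.WithRetryConfig(c.sub.RetryConfig),
	kncloudevents.WithTransformers(&te),
	kncloudevents.WithHeader(additionalHeaders),
)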

broker failed SubscriptionNotMarkedReadyByChannel

Describe the bug

[root@k8s-master ~]# kubectl get triggers.eventing.knative.dev
NAME                 BROKER       SUBSCRIBER_URI                                AGE     READY     REASON
testtrigger          testbroker   http://my-service.default.svc.cluster.local   20m     Unknown   SubscriptionNotMarkedReadyByChannel

To Reproduce

  1. Install NATS Streaming following the steps in https://github.com/knative-sandbox/eventing-natss/blob/release-0.19/config/broker/README.md

  2. Install eventing-natss

wget https://github.com/knative-sandbox/eventing-natss/releases/download/v0.19.0/eventing-natss.yaml
kubectl apply -f  eventing-natss.yaml
  3. Create a broker and trigger following the e2e steps at https://github.com/knative-sandbox/eventing-natss/tree/master/test/e2e/config/direct
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: testbroker
  annotations:
    eventing.knative.dev/broker.class: MTChannelBasedBroker
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: config-natss-channel
    namespace: natss

trigger

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: testtrigger
spec:
  broker: testbroker
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: recorder

config-natss-channel

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-natss-channel
  namespace: natss
  labels:
    eventing.knative.dev/release: devel
data:
  channelTemplateSpec: |
    apiVersion: messaging.knative.dev/v1beta1
    kind: NatssChannel

Knative release version
v0.19.0

Additional context
Add any other context about the problem here such as proposed priority

Get rid of the removed DeadLetterChannel in ChannelableStatus

Problem
This PR knative/eventing#6722 removes DeadLetterChannel in ChannelableStatus

We will have to remove it from the following places

Persona:
Which persona is this feature for?

Exit Criteria
A measurable (binary) test that would indicate that the problem has been resolved.

Time Estimate (optional):
How many developer-days do you think this may take to resolve?

Additional context (optional)
Add any other context about the feature request here.

Performance using a single stream and MaxPending of 256

Problem
(I have not experienced a problem yet. I am pretty new to NATS, so excuse me if my question is obvious.)

Looking at the code, I'm worried about scaling; specifically about how many messages can be processed per second. In my use case, the order of magnitude is many thousands per second (to give an upper bound, let's say a maximum of 100,000 messages/second).

My question is about this specific part of the code:

https://github.com/knative-sandbox/eventing-natss/blob/6c90ad0ecd9356f85977163bf3c1847fb00c6d13/pkg/natsutil/jetstreamutil.go#L10-L14

As far as I understand, and from my tests, I have seen that only one stream is created (K-ORDERS), with a restriction of 256 messages in flight at the same time. I'm wondering whether this is enough to handle a maximum of 100,000 messages/second. If I understand the comment above MaxPending correctly, 256 is the number of in-flight messages at one time. I believe that limit can easily be reached at 100,000 messages/second. What happens when the 256 limit is reached? Are messages delayed?

It would be great to be able to customize the MaxPending value. Also, is it possible/necessary to define multiple streams?
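
Not an answer from the maintainers, but for context: with the nats.go client the number of unacknowledged messages allowed in flight is the consumer's MaxAckPending setting, and it can be raised per subscription. A generic sketch, assuming the repo's MaxPending maps onto MaxAckPending (subject, value and URL are placeholders):

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL) // placeholder URL
	if err != nil {
		log.Fatal(err)
	}
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}
	// Allow up to 1024 unacknowledged messages in flight instead of 256.
	if _, err := js.Subscribe("orders.>", func(m *nats.Msg) {
		m.Ack()
	}, nats.MaxAckPending(1024)); err != nil {
		log.Fatal(err)
	}
	select {} // keep the subscriber running
}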

Persona:
Which persona is this feature for?
System Operator

Thanks for your help! 🙏

fix: Jetstream dispatcher fails with panic: runtime error: invalid memory address or nil pointer dereference

Describe the bug
Dispatcher fails with panic:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x8 pc=0x1803016]

goroutine 133 [running]:
knative.dev/eventing-natss/pkg/channel/jetstream/utils.parseBackoffFuncAndDelay(0x1a498a0?)
        knative.dev/eventing-natss/pkg/channel/jetstream/utils/consumerconfig.go:148 +0x16
knative.dev/eventing-natss/pkg/channel/jetstream/utils.CalculateNakDelayForRetryNumber(0x1f70ac0?, 0xc0008dc090?)
        knative.dev/eventing-natss/pkg/channel/jetstream/utils/consumerconfig.go:109 +0x1b
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*NatsMessageDispatcherImpl).processDispatchResult(0xc0001289a8, {0x1f8eab8, 0xc0007bd8c0}, 0xc0001289a8?, 0x1f8eab8?, 0xc0007bd710?, 0xc0007bd890?, {0x1f6ece0?, 0xc0004d3120?})
        knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/message_dispatcher.go:229 +0x75a
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*NatsMessageDispatcherImpl).DispatchMessageWithNatsRetries(0xc0001289a8, {0x1f8eab8, 0xc0007bd710}, {0x1f8ea48, 0xc00096b2a0}, 0xc0007bd770, 0xc0000d73b0, 0xc0000d7440, 0xc0000d74d0, 0xc0004e1f40, ...)
        knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/message_dispatcher.go:142 +0xf8a
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).doHandle(0xc0003961e0, {0x1f8eab8, 0xc0007bd710}, 0xc0001bc000)
        knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:119 +0x659
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).MsgHandler.func1()
        knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:78 +0x13d
created by knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).MsgHandler in goroutine 119
        knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:74 +0x67

Expected behavior
The message should be delivered to Knative Eventing.

To Reproduce

bob.json

{ "knativebrokerttl":"255", "specversion" : "1.0", "type" : "io-document", "source" : "/apis/v1/namespaces/default/ping", "subject" : "bob.laptop.tmux.default.foobar.42", "id" : "A234-1234-1234", "time" : "2018-04-05T17:31:00Z", "datacontenttype" : "application/json", "data" : { "ps1_start": "user@mindwm-dev1:/home/user$", "ps1_end": "mindwm-client-e3168b3e-dc6f-11ee-a11d-8f9d41316c00", "input": "docker ps", "output": "CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES\nc262a8b3ffe9 catthehacker/ubuntu:act-22.04 \"tail -f /dev/null\" 17 hours ago Up 17 hours act-build-act-yaml-apply-cfea7ea99cb09115e5339bf380d4d4d4e801c8485e41b7af87a5afe0f52bdce5\nb0dca6d6255f 16baf1d54ec7 \"/usr/bin/docker-ent…\" 12 days ago Up 17 hours minio\n" } }

$ cat bob.json | jq | nats pub mindwm.user.mindwm-dev1.abc
13:07:50 Reading payload from STDIN
13:07:50 Published 902 bytes to "mindwm.user.mindwm-dev1.abc"
$ cat bob.json | jq | nats pub mindwm.user.mindwm-dev1.abc
13:07:51 Reading payload from STDIN
13:07:51 Published 902 bytes to "mindwm.user.mindwm-dev1.abc"
$ cat bob.json | jq | nats pub mindwm.user.mindwm-dev1.abc
13:07:51 Reading payload from STDIN
13:07:51 Published 902 bytes to "mindwm.user.mindwm-dev1.abc"

Knative release version
knative: 1.12.3
eventing-jsm.yaml: 1.10.7

Additional context
jetstream-ch-dispatcher pod lifecycle while the error occurs (kubectl get pods -w):

jetstream-ch-controller-877d7f697-fggk8    1/1     Running   0               16h
jetstream-ch-dispatcher-674c49f67b-fwmpd   1/1     Running   9 (5m17s ago)   15h
jetstream-ch-dispatcher-674c49f67b-fwmpd   0/1     Error     9 (5m52s ago)   15h
jetstream-ch-dispatcher-674c49f67b-fwmpd   0/1     CrashLoopBackOff   9 (13s ago)     15h
jetstream-ch-dispatcher-674c49f67b-fwmpd                   1/1     Running             10 (5m9s ago)   15h
jetstream-ch-dispatcher-674c49f67b-fwmpd                   0/1     Error               10 (5m14s ago)   15h
jetstream-ch-dispatcher-674c49f67b-fwmpd                   0/1     CrashLoopBackOff    10 (14s ago)     15h
jetstream-ch-dispatcher log

{"level":"warn","ts":"2024-03-09T12:59:28.883Z","logger":"jetstream-channel-dispatcher","caller":"dispatcher/consumer.go:109","msg":"Cannot parse the spancontext, creating a new span","commit":"44752ad-dirty","knative.dev/pod":"jetstream-ch-dispatcher-674c49f67b-fwmpd","knative.dev/controller":"knative.dev.eventing-natss.pkg.channel.jetstream.dispatcher.Reconciler","knative.dev/kind":"messaging.knative.dev.NatsJetStreamChannel","knative.dev/traceid":"de6cf57a-c425-4590-af73-e4cb7a6ef985","knative.dev/key":"ksandbox/mindwmbroker-kne-trigger","is_leader":false,"is_leader":false,"sub_uid":"026196f7-fe3b-491c-a852-949e605d3bce"}
{"level":"info","ts":"2024-03-09T12:59:28.883Z","logger":"jetstream-channel-dispatcher","caller":"controller/controller.go:550","msg":"Reconcile succeeded","commit":"44752ad-dirty","knative.dev/pod":"jetstream-ch-dispatcher-674c49f67b-fwmpd","knative.dev/controller":"knative.dev.eventing-natss.pkg.channel.jetstream.dispatcher.Reconciler","knative.dev/kind":"messaging.knative.dev.NatsJetStreamChannel","knative.dev/traceid":"de6cf57a-c425-4590-af73-e4cb7a6ef985","knative.dev/key":"ksandbox/mindwmbroker-kne-trigger","duration":0.002935707}
{"level":"warn","ts":"2024-03-09T12:59:28.884Z","logger":"jetstream-channel-dispatcher","caller":"dispatcher/consumer.go:109","msg":"Cannot parse the spancontext, creating a new span","commit":"44752ad-dirty","knative.dev/pod":"jetstream-ch-dispatcher-674c49f67b-fwmpd","knative.dev/controller":"knative.dev.eventing-natss.pkg.channel.jetstream.dispatcher.Reconciler","knative.dev/kind":"messaging.knative.dev.NatsJetStreamChannel","knative.dev/traceid":"de6cf57a-c425-4590-af73-e4cb7a6ef985","knative.dev/key":"ksandbox/mindwmbroker-kne-trigger","is_leader":false,"is_leader":false,"sub_uid":"29c2eb36-0a62-44db-8459-8f5e9db05029"}
{"level":"error","ts":"2024-03-09T12:59:32.827Z","logger":"jetstream-channel-dispatcher","caller":"dispatcher/message_dispatcher.go:210","msg":"failed to execute message","commit":"44752ad-dirty","knative.dev/pod":"jetstream-ch-dispatcher-674c49f67b-fwmpd","error":"unexpected HTTP response, expected 2xx, got 502","dispatch_resp_code":502,"stacktrace":"knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*NatsMessageDispatcherImpl).processDispatchResult\n\tknative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/message_dispatcher.go:210\nknative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*NatsMessageDispatcherImpl).DispatchMessageWithNatsRetries\n\tknative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/message_dispatcher.go:142\nknative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).doHandle\n\tknative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:119\nknative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).MsgHandler.func1\n\tknative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:78"}
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x8 pc=0x1803016]

goroutine 99 [running]:
knative.dev/eventing-natss/pkg/channel/jetstream/utils.parseBackoffFuncAndDelay(0x1a498a0?)
knative.dev/eventing-natss/pkg/channel/jetstream/utils/consumerconfig.go:148 +0x16
knative.dev/eventing-natss/pkg/channel/jetstream/utils.CalculateNakDelayForRetryNumber(0x1f70ac0?, 0xc00030ad80?)
knative.dev/eventing-natss/pkg/channel/jetstream/utils/consumerconfig.go:109 +0x1b
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*NatsMessageDispatcherImpl).processDispatchResult(0xc0000103a8, {0x1f8eab8, 0xc0006131a0}, 0xc0000103a8?, 0x1f8eab8?, 0xc000612ff0?, 0xc000613170?, {0x1f6ece0?, 0xc00040ae10?})
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/message_dispatcher.go:229 +0x75a
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*NatsMessageDispatcherImpl).DispatchMessageWithNatsRetries(0xc0000103a8, {0x1f8eab8, 0xc000612ff0}, {0x1f8ea48, 0xc00040a750}, 0xc000613050, 0xc000484a20, 0xc000484ab0, 0xc000484b40, 0xc000527680, ...)
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/message_dispatcher.go:142 +0xf8a
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).doHandle(0xc0000d23c0, {0x1f8eab8, 0xc000612ff0}, 0xc000180930)
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:119 +0x659
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).MsgHandler.func1()
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:78 +0x13d
created by knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).MsgHandler in goroutine 145
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:74 +0x67

Upgrade to latest JetStream SDK

Problem
We're using a legacy SDK for interacting with NATS. I have been observing some performance issues in our environment, and although I'm not 100% certain the issue is in this repo, things got slightly better after restarting the dispatcher. I believe it's related to the number of goroutines we have running, and the new SDK makes it much easier to control how many messages we fetch in parallel.

I'm proposing this as a feature rather than a bug, because as I said, I'm not actually 100% certain there is a performance issue.

I already have some WIP changes which use the new SDK whilst fixing some other minor issues, any PR is probably going to come addressing multiple issues (sorry! but doing all the things I want to do in separate PRs is going to take too long and I'm pushed for time in this repo)

Persona:

  • System Integrator
  • System Operator

Exit Criteria
Library is updated and performance is improved.

Time Estimate (optional):
1 day

Additional context (optional)
My WIP code makes changes so that we use the new FetchNoWait(batchSize) method, and convert the jetstream.Msg to a new struct which implements binding.Message (since the CE SDK doesn't support this library yet). This struct also contains a Context() method which is configured with a timeout based on AckWait so that we can successfully cancel requests as soon as they expire.

We then write this Message implementation to a local channel which we handle in a fixed number of goroutines, rather than spawning a new goroutine for every message.

I think this will make things much easier to tweak for performance issues, and should be more performant out of the box.
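
A rough sketch of the fixed-worker-pool shape described above, with the NATS-specific types stubbed out (the Message interface and handler below are placeholders, not the WIP code):

package main

import (
	"fmt"
	"sync"
)

// Message stands in for the binding.Message wrapper around jetstream.Msg
// described above; the real type also carries a Context() bounded by AckWait.
type Message interface {
	Data() []byte
}

type rawMessage struct{ payload []byte }

func (m rawMessage) Data() []byte { return m.payload }

// dispatch drains a shared channel with a fixed number of workers instead of
// spawning a new goroutine for every message.
func dispatch(msgs <-chan Message, workers int, handle func(Message)) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgs {
				handle(m)
			}
		}()
	}
	wg.Wait()
}

func main() {
	msgs := make(chan Message)
	go func() {
		defer close(msgs)
		for i := 0; i < 10; i++ {
			msgs <- rawMessage{payload: []byte(fmt.Sprintf("event-%d", i))}
		}
	}()
	dispatch(msgs, 4, func(m Message) { fmt.Println(string(m.Data())) })
}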

Make proper use of Stats reporter.

Problem
Function signatures changed here:
knative/eventing#4259

I've fixed things so that natss compiles, but we need to properly make use of the stats, just like in the above PR.

Persona:
Which persona is this feature for?

Exit Criteria
A measurable (binary) test that would indicate that the problem has been resolved.

Time Estimate (optional):
1-2

Additional context (optional)
Add any other context about the feature request here.

Scaling dispatchers for higher throughput

Problem
I'm trying to integrate the NATS channel into my application, and with the default configuration only one event can be handled at a time. This severely limits the throughput of my application. Increasing the number of dispatcher pods should increase the throughput.

Persona:
Developer

Exit Criteria
Developers can scale the number of dispatchers for the NATS channel. The number of events processed at a time should scale with the number of dispatchers.

Time Estimate (optional):
How many developer-days do you think this may take to resolve?

Additional context (optional)
Add any other context about the feature request here.

JetStream release combined YAML assets contain namespaces on cluster level resources

Describe the bug
The eventing-jsm.yaml asset in all releases contains namespaces on ClusterRoleBinding resources, as well as RoleBinding resources that reference a ClusterRole resource with a namespace.
Two examples:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: jetstream-ch-dispatcher
  namespace: knative-eventing
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: jetstream-ch-dispatcher
subjects:
  - kind: ServiceAccount
    name: jetstream-ch-dispatcher
    namespace: knative-eventing

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: jetstream-ch-dispatcher-configmap-reader
  namespace: knative-eventing
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: eventing-config-reader
subjects:
  - kind: ServiceAccount
    name: jetstream-ch-dispatcher
    namespace: knative-eventing

Expected behavior
Applying the manifest works and allows version-locking deployments of the NATS extension to a specific release if required.

To Reproduce
N/A

Knative release version
1.10.7 (appears to be in all versions though)

Additional context
N/A

CrashLoopBackOff v1.10.3

Describe the bug

panic: runtime error: index out of range [1024] with length 1024
goroutine 215 [running]:
encoding/base64.(*Encoding).Encode(0x1bfe220?, {0xc00087b400?, 0x3dc?, 0x4?}, {0xc00087bc00?, 0xc000669198?, 0xc000669388?})
	encoding/base64/base64.go:158 +0x26f
knative.dev/eventing/pkg/channel.(*MessageDispatcherImpl).dispatchExecutionInfoTransformers(0xc0005de150, 0x1?, 0xc0009eca80)
	knative.dev/[email protected]/pkg/channel/message_dispatcher.go:324 +0x4fe
knative.dev/eventing/pkg/channel.(*MessageDispatcherImpl).DispatchMessageWithRetries(0xc0005de150, {0x1f63a60, 0xc0009ec900}, {0x1f639f0, 0xc00086e170}, 0xc0009ec960, 0xc0007fad80?, 0xc0007fae10, 0xc0007faea0, 0xc00059cd80, ...)
	knative.dev/[email protected]/pkg/channel/message_dispatcher.go:140 +0xf28
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).doHandle(0xc000397b90, {0x1f63a60, 0xc0009ec900}, 0xc00050a1c0)
	knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:168 +0x628
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).MsgHandler.func2.1(0xc000083db0?, 0xc000669de0, 0x1f44340?, {0x1f63a60?, 0xc0009ec900?}, 0x10?)
	knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:107 +0x48
knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).MsgHandler.func2()
	knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:108 +0x6e
created by knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher.(*Consumer).MsgHandler in goroutine 161
	knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/consumer.go:101 +0x2f3

Knative release version

1.9

Incorrect number of retries when MaxDeliver is bigger than 1

Describe the bug
Retries are configured both via Channel.consumerConfigTemplate.maxDeliver and at the trigger/subscriber level. The dispatcher sends messages via DispatchMessageWithRetries based on the trigger/subscriber config.

If I set retries both at the trigger/subscriber level and via Channel.consumerConfigTemplate.maxDeliver, and the target service times out on every request, then I get (maxDeliver * retryMax) retries.
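For example, with maxDeliver: 3 and retry: 3 as in the configuration below, a subscriber that times out on every request receives roughly 3 × 3 = 9 attempts instead of being capped at 3.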

Expected behavior
In case of a bad request or timeout, maxDeliver should apply and the message should then be terminated.

To Reproduce
Setup a broker and trigger (or channel and subscription)

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: "default"

broker config:

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-br-default-channel
  namespace: knative-eventing
data:
  channel-template-spec: |-
    apiVersion: messaging.knative.dev/v1alpha1
    kind: NatsJetStreamChannel
    spec:
      stream:
        config:
          retention: Limits
          maxBytes: 1000000000
          replicas: 1
      consumerConfigTemplate:
        deliverPolicy: New
        maxAckPending: 100
        maxDeliver: 3
        ackWait: 300s
      delivery:
        backoffDelay: PT0.2S
        backoffPolicy: exponential
        retry: 0
        timeout: PT300S

trigger

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: display-trigger
spec:
  broker: default
  filter:
    attributes:
      type: fun.display
  delivery:
    backoffDelay: PT2S
    backoffPolicy: exponential
    retry: 3
    timeout: PT5S
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

Then create a function that delays for more than 5s and send an event to the broker.

Knative release version
1.10.x

cc @dan-j

eventing-natss cannot send messages after natss-streaming restarts

Describe the bug
I noticed that if natss-streaming restarts, natss-ch-dispatcher can only receive messages but cannot send messages to eventing. Is there any solution for this?

Expected behavior
I hope to still be able to send messages after natss-streaming restarts
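
Not a fix in the dispatcher itself, but for reference, the stan.go client can detect a lost Streaming connection via pings and a connection-lost callback, at which point the process can reconnect and resubscribe. A minimal sketch (cluster ID, client ID and URL are placeholders for this environment):

package main

import (
	"log"

	stan "github.com/nats-io/stan.go"
)

func main() {
	sc, err := stan.Connect(
		"knative-nats-streaming",
		"natss-ch-dispatcher-debug",
		stan.NatsURL("nats://nats-streaming.natss.svc.cluster.local:4222"),
		// Ping every 10s; after 5 missed pings the connection is declared lost.
		stan.Pings(10, 5),
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Printf("streaming connection lost: %v; reconnect and resubscribe here", reason)
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("connected to NATS Streaming")
	_ = sc
	select {} // keep the process alive so the handler can fire
}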

Remove bespoke SendMessage implementation

Problem
Copying the kncloudevents.Dispatcher smells a bit, and I have a proposal to remove it.

We can implement our own WithRetryConfig and WithDeadLetterSink options which control the dispatcher based on current state. The one thing I have a solution for, but 50/50 if I like it, is signalling when to nak/term messages...

I have this bit of code in WIP:

func ReportWithoutRetry(result *error, attempt int, cfg *kncloudevents.RetryConfig) *kncloudevents.RetryConfig {
	if cfg == nil {
		return nil
	}

	next := *cfg

	next.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
		retry, err := cfg.CheckRetry(ctx, resp, err)

		if retry {
			if cfg.Backoff != nil {
				multierr.AppendInto(result, NewDelayNak(cfg.Backoff(attempt, resp)))
			} else {
				multierr.AppendInto(result, protocol.ResultNACK)
			}
		}

		return false, err
	}

	return &next
}

We take in the RetryConfig defined on the subscription, and decorate it with our own CheckRetry function which always returns false but sets a protocol.ResultNACK error. You can then use it like so:

var retryError error

defer func() {
	err = msg.Finish(multierr.Append(err, retryError))
}()

...

dispatchExecutionInfo, err := c.dispatcher.SendMessage(
	ctx,
	internal.WithoutFinish(msg),
	c.sub.Subscriber,
	internal.WithRetryReporter(&retryError, attempt, c.sub.RetryConfig),
	internal.WithDeadLetterOnLastAttempt(attempt, c.sub.RetryConfig, c.sub.DeadLetter),
	kncloudevents.WithReply(c.sub.Reply),
	kncloudevents.WithTransformers(&te),
)

The finish method of the message (see #542) then understands how to check the errors to determine whether to nack, and if so what delay.

At the moment, my dead-letter handling will just ack the message. I'm not sure if this should result in a Term or not?

Persona:

  • System Integrator

Exit Criteria
No copy/paste of kncloudevents.

Time Estimate (optional):
1 day

Channel has two reconcilers updating its status. That can lead to badness.

Describe the bug
While debugging an issue with the natss channel, I noticed that the controller reconciler updates the status of the channel to indicate the channel resources have been created, but the dispatcher reconciler also updates the status. I think this is a bug, similar to what we saw in IMC; some of the details of the issue are reported here:
knative/eventing#3686

I think what should be done is to patch the status from the dispatcher instead of using a genreconciler style ReconcileKind method.
Something like this:
https://github.com/knative/eventing/blob/master/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go#L94

A clear and concise description of what the bug is.

Expected behavior
A single reconciler only does a Status update.

To Reproduce
Steps to reproduce the behavior.

Knative release version

Additional context
Add any other context about the problem here such as proposed priority

Support Nats JetStream

Problem
Related to a question in the knative-users Google Group.

We are currently using NATS Streaming Server, which NATS has deprecated; they suggest using JetStream instead.

So we should support JetStream in eventing-natss's dispatcher.

This is a long-term task: we will gradually replace NATS Streaming by deprecating its support, with JetStream support landing at about version 0.26 or later.

steps

  • add natsjetstreamchannel support #182
  • support jetstream in webhook #207
  • add ut e2e
  • add docs
  • rename the repo to eventing-nats knative/community#666

Persona:
Which persona is this feature for?

People who use jetstream for Event Channel

Exit Criteria
A measurable (binary) test that would indicate that the problem has been resolved.

Time Estimate (optional):
How many developer-days do you think this may take to resolve?

maybe 2 or 3 days?

Additional context (optional)

Update the NATS client package from stan.go to nats.go: https://github.com/nats-io/nats.go

NATS reconnect jitter incorrect

Describe the bug
A clear and concise description of what the bug is.

We configure the NATS connection with a reconnect jitter like so:

opts = append(opts, nats.ReconnectJitter(1000, time.Millisecond))

This option on the NATS SDK is described as so:

// ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait.
// Defaults to 100ms and 1s, respectively.
func ReconnectJitter(jitter, jitterForTLS time.Duration) Option {
	return func(o *Options) error {
		o.ReconnectJitter = jitter
		o.ReconnectJitterTLS = jitterForTLS
		return nil
	}
}

I believe our implementation is assuming the first parameter is the number of "units" to jitter by, and the second param is the unit, but it's actually two separate jitters for plaintext and TLS connections. The defaults are 100ms and 1s respectively, but our implementation is set to 1000ms and 1ms.
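
If the intent was simply to keep the documented defaults, the call would presumably look something like this (whether to keep 1000ms instead is a maintainer decision, per the question below):

opts = append(opts, nats.ReconnectJitter(100*time.Millisecond, time.Second))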

Expected behavior

These should be configurable by the user as separate configuration items.

@astelmashenko this was added in #408. Was there any intentional rationale for changing the plaintext jitter from the default 100ms to 1000ms? I'm happy to keep 1000ms as the default if this isn't configured, but that feels rather large, and I'd like to reduce the default if there was no purpose to the increase.

docs: Improve docs around `examples/`

Problem
There seem to be some great examples in the repository, but without a deep understanding of knative-eventing or NATS it is not clear what purpose they serve or what can be learned from them.

Persona:

All

Exit Criteria
Testing for the existence of documentation doesn't really tell us whether it is useful or coherent; I'm not sure there is a binary (yet) that we could run against documentation to give us that signal.

Time Estimate (optional):
A few hours at best.

Additional context (optional)
Add any other context about the feature request here.

x509 certificate signed by unknown authority

Describe the bug
After deleting a broker, the jetstream controller cannot finalize the underlying channel because of a communication error with nats-webhook.

Expected behavior
Broker/channel deletion works.

Knative release version
1.3.2

Additional context
eventing-natss version is 1.3.5

I created a broker and then deleted it, then observed that the channel had not been deleted, along with error logs.
jetstream-channel-controller:

{
    "level": "error",
    "ts": "2022-12-23T11:30:23.039Z",
    "logger": "jetstream-channel-controller",
    "caller": "controller/controller.go:559",
    "msg": "Reconcile error",
    "knative.dev/pod": "jetstream-ch-controller-57c65d84fb-5p5pm",
    "knative.dev/controller": "knative.dev.eventing-natss.pkg.channel.jetstream.controller.Reconciler",
    "knative.dev/kind": "messaging.knative.dev.NatsJetStreamChannel",
    "knative.dev/traceid": "b83875e4-319c-447c-ae15-0cff0e8ab9e3",
    "knative.dev/key": "viax/internal-kne-trigger",
    "duration": 0.049331877,
    "error": "failed to clear finalizers: Internal error occurred: failed calling webhook \"webhook.nats.messaging.knative.dev\": Post \"https://nats-webhook.knative-eventing.svc:443/defaulting?timeout=2s\": x509: certificate signed by unknown authority (possibly because of \"x509: ECDSA verification failure\" while trying to verify candidate authority certificate \"nats-webhook.knative-eventing.svc\")",
    "stacktrace": "knative.dev/pkg/controller.(*Impl).handleErr\n\tknative.dev/[email protected]/controller/controller.go:559\nknative.dev/pkg/controller.(*Impl).processNextWorkItem\n\tknative.dev/[email protected]/controller/controller.go:536\nknative.dev/pkg/controller.(*Impl).RunContext.func3\n\tknative.dev/[email protected]/controller/controller.go:484"
}

and nats-webhook logs:

{"level":"info","ts":"2022-12-23T13:40:55.322Z","logger":"nats-webhook","caller":"webhook/admission.go:90","msg":"Webhook ServeHTTP request=&http.Request{Method:\"POST\", URL:(*url.URL)(0xc000a02cf0), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Accept\":[]string{\"application/json, */*\"}, \"Accept-Encoding\":[]string{\"gzip\"}, \"Content-Length\":[]string{\"37445\"}, \"Content-Type\":[]string{\"application/json\"}, \"User-Agent\":[]string{\"kube-apiserver-admission\"}}, Body:(*http.body)(0xc0008b7700), GetBody:(func() (io.ReadCloser, error))(nil), ContentLength:37445, TransferEncoding:[]string(nil), Close:false, Host:\"nats-webhook.knative-eventing.svc:443\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"172.212.1.64:38352\", RequestURI:\"/defaulting?timeout=2s\", TLS:(*tls.ConnectionState)(0xc00019d6b0), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.cancelCtx)(0xc0008b7740)}"}
{"level":"info","ts":"2022-12-23T13:40:55.341Z","logger":"nats-webhook","caller":"defaulting/defaulting.go:158","msg":"Kind: \"messaging.knative.dev/v1alpha1, Kind=NatsJetStreamChannel\" PatchBytes: null","knative.dev/kind":"messaging.knative.dev/v1alpha1, Kind=NatsJetStreamChannel","knative.dev/namespace":"viax","knative.dev/name":"internal-kne-trigger","knative.dev/operation":"UPDATE","knative.dev/resource":"messaging.knative.dev/v1alpha1, Resource=natsjetstreamchannels","knative.dev/subresource":"","knative.dev/userinfo":"{system:serviceaccount:knative-eventing:jetstream-ch-controller 1d7b066d-347a-4466-9374-fcf4d8529081 [system:serviceaccounts system:serviceaccounts:knative-eventing system:authenticated] map[authentication.kubernetes.io/pod-name:[jetstream-ch-controller-57c65d84fb-fw8td] authentication.kubernetes.io/pod-uid:[a05e83d6-0041-4728-b9d6-aab1ead4b94c]]}"}
{"level":"info","ts":"2022-12-23T13:40:55.341Z","logger":"nats-webhook","caller":"webhook/admission.go:133","msg":"remote admission controller audit annotations=map[string]string(nil)","knative.dev/kind":"messaging.knative.dev/v1alpha1, Kind=NatsJetStreamChannel","knative.dev/namespace":"viax","knative.dev/name":"internal-kne-trigger","knative.dev/operation":"UPDATE","knative.dev/resource":"messaging.knative.dev/v1alpha1, Resource=natsjetstreamchannels","knative.dev/subresource":"","knative.dev/userinfo":"{system:serviceaccount:knative-eventing:jetstream-ch-controller 1d7b066d-347a-4466-9374-fcf4d8529081 [system:serviceaccounts system:serviceaccounts:knative-eventing system:authenticated] map[authentication.kubernetes.io/pod-name:[jetstream-ch-controller-57c65d84fb-fw8td] authentication.kubernetes.io/pod-uid:[a05e83d6-0041-4728-b9d6-aab1ead4b94c]]}","admissionreview/uid":"f32148bf-86b0-465a-b048-133d866910d3","admissionreview/allowed":true,"admissionreview/result":"nil"}
{"level":"debug","ts":"2022-12-23T13:40:55.341Z","logger":"nats-webhook","caller":"webhook/admission.go:134","msg":"AdmissionReview patch={ type: JSONPatch, body: null }","knative.dev/kind":"messaging.knative.dev/v1alpha1, Kind=NatsJetStreamChannel","knative.dev/namespace":"viax","knative.dev/name":"internal-kne-trigger","knative.dev/operation":"UPDATE","knative.dev/resource":"messaging.knative.dev/v1alpha1, Resource=natsjetstreamchannels","knative.dev/subresource":"","knative.dev/userinfo":"{system:serviceaccount:knative-eventing:jetstream-ch-controller 1d7b066d-347a-4466-9374-fcf4d8529081 [system:serviceaccounts system:serviceaccounts:knative-eventing system:authenticated] map[authentication.kubernetes.io/pod-name:[jetstream-ch-controller-57c65d84fb-fw8td] authentication.kubernetes.io/pod-uid:[a05e83d6-0041-4728-b9d6-aab1ead4b94c]]}","admissionreview/uid":"f32148bf-86b0-465a-b048-133d866910d3","admissionreview/allowed":true,"admissionreview/result":"nil"}
2022/12/23 13:40:55 http: TLS handshake error from 172.212.1.64:50934: remote error: tls: bad certificate

Is it really a certificate problem? One strange thing is the message AdmissionReview patch={ type: JSONPatch, body: null } in the last debug log before the remote error: tls: bad certificate error.

Any thoughts?

cc
@dan-j
@lionelvillard
@zhaojizhuang

Broker field `spec.delivery.retry` is ignored

Describe the bug
When configuring a broker with fields under spec.delivery (retry, backoffPolicy, backoffDelay, ...), those fields seem to be ignored. There are no retries if the Trigger fails to deliver the message to the subscriber. In my case the subscriber is a ksvc, and there are no retries if that ksvc returns an error.

Expected behavior
When configuring a broker with fields under spec.delivery, the message should be redelivered (multiple retries) if the receiver (ksvc) returns an error.

To Reproduce

  1. Have a fresh Kubernetes cluster with Knative Serving and Eventing installed via YAML files (version 1.4.0).

  2. Install components to work with NATS (natsjsm.yaml, channel messaging layer eventing-natss.yaml, broker layer mt-channel-broker.yaml):

# https://github.com/knative-sandbox/eventing-natss/blob/release-1.4/config/broker/README.md
kubectl apply -f https://raw.githubusercontent.com/knative-sandbox/eventing-natss/knative-v1.4.0/config/broker/natsjsm.yaml

# https://knative.dev/docs/install/yaml-install/eventing/install-eventing-with-yaml/#optional-install-a-default-channel-messaging-layer
kubectl apply -f https://github.com/knative-sandbox/eventing-natss/releases/download/knative-v1.4.0/eventing-natss.yaml

# https://knative.dev/docs/install/yaml-install/eventing/install-eventing-with-yaml/#optional-install-a-broker-layer
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.4.0/mt-channel-broker.yaml
  3. Use NatsJetStreamChannel as default channel:
kubectl patch configmap/config-br-default-channel \
  --namespace knative-eventing \
  --patch '{"data":{"channel-template-spec": "apiVersion: messaging.knative.dev/v1alpha1\nkind: NatsJetStreamChannel"}}'
  4. Deploy a simple ksvc (kn service create error-service --image=myimage --port=8080) that always returns a 503 error, to test delivery failure handling. The code is the following:
package main

import (
	"log"
	"net/http"
)

func handler(w http.ResponseWriter, req *http.Request) {
	log.Println("It does not work, returning 503")
	http.Error(w, "Does not work", 503)
}

func main() {
	http.HandleFunc("/", handler)
	http.ListenAndServe(":8080", nil)
}
  5. Create a Broker, a Trigger pointing to that ksvc, and a PingSource to send an event every minute:
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  namespace: default
  annotations:
    eventing.knative.dev/broker.class: MTChannelBasedBroker
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: config-br-default-channel # NatsJetStreamChannel defined in the ConfigMap
    namespace: knative-eventing
  delivery:
    retry: 5
    backoffPolicy: exponential
    backoffDelay: "PT1S"
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
  namespace: default
spec:
  broker: default
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: error-service
---
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: ping
  namespace: default
spec:
  schedule: "* * * * *"
  contentType: "application/json"
  data: '{"msg": "ping"}'
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
  6. Wait for the logs of error-service:
2022/05/29 14:22:00 It does not work, returning 503
2022/05/29 14:23:00 It does not work, returning 503
2022/05/29 14:24:00 It does not work, returning 503

As you can see in the logs of the error-service pod, there is no sign that the message is sent again after an error (there is one log line per minute, not 1 + 5 attempts as defined by the Broker's spec.delivery.retry field).

Knative release version
v1.4.0

Additional context
I have also tried with NatssChannel (https://github.com/knative-sandbox/eventing-natss/blob/release-1.4/config/broker/README.md#1-nats-streaming-deprecation-notice-) and got the same result.

The docs (https://github.com/knative-sandbox/eventing-natss/blob/knative-v1.4.0/config/README.md#nats-streaming-channels) state:

If downstream rejects an event, that request is attempted again.

And the Knative docs state that NATS channels do not support any of the delivery fields (https://knative.dev/docs/eventing/event-delivery/#channel-support):

Nats Channel does not support delivery fields

So I don't know if this is the normal behavior or if I'm doing something wrong or misunderstanding something.

But what I would like is for the PingSource event to be retried if the ksvc returns an error.

Thanks for your help 🙌

Add the eventing source label to indicate the source type

Describe the bug
All eventing sources should have a label to indicate the source type for all the released resources.
A similar label, networking.knative.dev/ingress-provider, exists for all the serving ingresses.
I propose to add the label contrib.knative.dev/source-provider for all the available resources.
Since this repository is for eventing-natss, we will use contrib.knative.dev/source-provider: natss.

Expected behavior
Add the label contrib.knative.dev/source-provider: natss for all resources.
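
For example, a released resource would then carry something like the following (the Deployment shown is just illustrative; the label would be stamped onto every resource in the release YAML):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: natss-ch-dispatcher
  namespace: knative-eventing
  labels:
    contrib.knative.dev/source-provider: natss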

Knative release version
main

Additional context
This facilitates support in the Knative operator.
