openfaas / connector-sdk
SDK for connecting events to functions
License: MIT License
For controller authors it would be nice to provide some guidance on sensible defaults for RebuildInterval and UpstreamTimeout. Currently, if neither of these is provided, it's unclear what the defaults are (in the case of UpstreamTimeout there is no timeout at all, which could cause blocking issues during network/API problems).
Provide guidance (docs) on sensible defaults or check provided values (empty) in the code and enforce timeouts respectively.
If neither of these intervals/timeouts are specified the behavior of the connector using the SDK is undetermined, especially in cases of network/API interruptions.
Check/Update connector-sdk so that it is namespace-aware
Linked issue openfaas/faas-netes#511
Right now, when using the connector SDK, if the connector client cannot reach the OpenFaaS gateway, it fails with a log.Fatal error that is not very user-friendly:
2019/02/18 22:16:44 Get http://127.0.0.1:8080/system/functions: dial tcp 127.0.0.1:8080: connect: connection refused
In larger and more complex programs using the SDK this can get tricky to debug.
Instead, it should return an error (or <-chan error since it's spawning a goroutine) so users of the SDK can handle the error appropriately (retry, wrap the error, use different logging [related #6 ]).
When a controller is configured to print the response body, the response body will not actually appear in the logs until the next newline is printed to stdout.
The best explanation is probably an example. The following is taken from running a version of the kafka-connector built with the connector SDK with a simple function hello that returns "Hello" every time it is invoked.
1. | 2019/02/02 05:45:33 Syncing topic map
2. | 2019/02/02 05:45:36 Invoke function: hello
3. | [#1] Received on [faas-request,0]: 'a'
4. | [200] faas-request => hello
5. | 2019/02/02 05:45:36 connector-sdk got result: [200] faas-request => hello (6) bytes
6. | 2019/02/02 05:45:36 Syncing topic map
7. | 2019/02/02 05:45:39 Syncing topic map
8. | 2019/02/02 05:45:42 Syncing topic map
9. | "Hello\n"[#2] Received on [faas-request,0]: 'b'
10. | 2019/02/02 05:45:42 Invoke function: hello
11. | 2019/02/02 05:45:42 connector-sdk got result: [200] faas-request => hello (6) bytes
12. | [200] faas-request => hello
13. | 2019/02/02 05:45:45 Syncing topic map
Line 4 is part of printing the response body, and the expected behavior is that the "Hello\n" message would be printed on the next line. However, the message is not printed until line 9. This is the next point at which a newline is printed to stdout. Note that the message for the second invocation is not printed at all.
To verify that the problem is indeed related to printing a newline to stdout, I created a ResponseSubscriber whose implementation was simply fmt.Println() and subscribed it to the controller. The result exhibits the expected behavior:
1. | 2019/02/02 06:24:08 Syncing topic map
2. | [#1] Received on [faas-request,0]: 'a'
3. | 2019/02/02 06:24:09 Invoke function: hello
4. | [200] faas-request => hello
5. | "Hello\n"
6. | 2019/02/02 06:24:09 connector-sdk got result: [200] faas-request => hello (6) bytes
7. | [#2] Received on [faas-request,0]: 'b'
8. | 2019/02/02 06:24:11 Invoke function: hello
9. | 2019/02/02 06:24:11 connector-sdk got result: [200] faas-request => hello (6) bytes
10. | [200] faas-request => hello
11. | "Hello\n"
12. | 2019/02/02 06:24:11 Syncing topic map
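A minimal sketch of one possible fix, assuming (as the logs suggest but the thread does not confirm) that the body is printed with a format string lacking a trailing newline. The hypothetical formatBody helper keeps the quoted %q rendering seen above but always ends the line, so the body cannot be glued onto the next line of output.

```go
package main

import (
	"fmt"
	"os"
)

// formatBody renders a response body quoted (as in the logs above) and
// always terminates the line.
func formatBody(body []byte) string {
	return fmt.Sprintf("%q\n", body)
}

func main() {
	fmt.Fprint(os.Stdout, formatBody([]byte("Hello\n"))) // prints "Hello\n" on its own line
}
```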
I did a pair session with Alex some weeks ago about the [cloudnative bot], and I added a check to validate the content type: https://github.com/cpanato/cloudnative-bot/blob/main/functions/k8s-patch-release/handler.go#L59
I disabled that because it looks like the connector-sdk does not set the content type.
I've made a simple project with the connector-sdk and a very simple function, and I can see that it is not set.
I checked the code for the connector-sdk and did not see anything that sets this header.
Set the Content-Type header.
Not setting the Content-Type header.
Add an option when invoking the function to pass a set of headers that the user wants to forward to the function.
Maybe add another parameter to the invoke function: https://github.com/openfaas/connector-sdk/blob/master/cmd/tester/main.go#L50
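package function

import (
	"fmt"
	"log"
	"os"
)

// Handle rejects any request whose Content-Type (exposed to the function
// as the Http_Content_Type environment variable) is not application/json.
func Handle(req []byte) string {
	contentType := os.Getenv("Http_Content_Type")
	log.Printf("Content Type: %s", contentType)
	if contentType != "application/json" {
		return "Invalid Content Type"
	}

	if len(req) == 0 {
		log.Println("nothing to process")
		return "nothing to process"
	}
	log.Println(string(req))

	return fmt.Sprintf("Hello, Go. You said: %s", string(req))
}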
FaaS-CLI version (full output from: faas-cli version): 0.13.10-2-g6aa5d832
Docker version (docker version, e.g. Docker 17.0.05): 20.10.6
Are you using Docker Swarm or Kubernetes (FaaS-netes)? no
Operating System and version (e.g. Linux, Windows, MacOS): linux/macos
Code example or link to GitHub repo or gist to reproduce problem: --
Other diagnostic information / logs from troubleshooting guide
If we move to sync.RWMutex, then we can use RLock() when constructing the topic list for external callers.
Alex
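A sketch of the RWMutex idea on a simplified, hypothetical TopicMap (topic name to function names): rebuilding takes the exclusive Lock, while Match and the topic list for external callers share RLock, so concurrent readers do not block each other. The field and method names are illustrative, not the SDK's exact internals.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// TopicMap is a simplified, hypothetical version of the SDK's topic map.
type TopicMap struct {
	lock   sync.RWMutex
	lookup map[string][]string
}

func NewTopicMap() *TopicMap {
	return &TopicMap{lookup: map[string][]string{}}
}

// Sync replaces the whole map; writers take the exclusive lock.
func (t *TopicMap) Sync(updated map[string][]string) {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.lookup = updated
}

// Match returns a copy of the functions for a topic under a shared read lock.
func (t *TopicMap) Match(topic string) []string {
	t.lock.RLock()
	defer t.lock.RUnlock()
	return append([]string(nil), t.lookup[topic]...)
}

// Topics builds the sorted topic list for external callers under RLock.
func (t *TopicMap) Topics() []string {
	t.lock.RLock()
	defer t.lock.RUnlock()
	topics := make([]string, 0, len(t.lookup))
	for topic := range t.lookup {
		topics = append(topics, topic)
	}
	sort.Strings(topics)
	return topics
}

func main() {
	tm := NewTopicMap()
	tm.Sync(map[string][]string{"payment": {"stripe-fn"}, "order": {"audit-fn"}})
	fmt.Println(tm.Topics(), tm.Match("payment")) // [order payment] [stripe-fn]
}
```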
cron-connector queries the list of functions (code) and calls functions using the REST API (code), it can probably be improved by including these functionalities inside the connector SDK.
Add the following functions to connector SDK
Currently, a ResponseSubscriber gets no contextual information whatsoever about a given request in its Response method. This means that if, for example, I am writing a connector for a given message broker, I cannot mark a given message as having been processed / requiring a retry based on the response(s), because I have no way of correlating responses with whatever message/... originated them.
As a (hopefully) more concrete example, we are building a connector for AWS SQS queue that receives messages from a queue and invokes a function with each message's payload, and we need to...
I thought about two ways of working around this, both of which require code changes.
A correlationID parameter which would be passed to Invoke and be echoed back in each response.
I chose to go with solution (2) as #25 because it seems to be the most flexible one to me.
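A sketch of the correlationID approach, assuming a hypothetical response type that echoes the ID back and a connector that keeps its in-flight messages (for example SQS receipt handles) keyed by that ID. Names are illustrative; this is not the API proposed in #25, and it assumes one function per topic.

```go
package main

import "fmt"

// InvokerResponse is a hypothetical response type that echoes back the
// correlation ID passed to invoke.
type InvokerResponse struct {
	CorrelationID string
	Function      string
	Status        int
}

// invoke simulates invoking every function bound to a topic and echoes
// correlationID in each response.
func invoke(functions []string, correlationID string, status func(string) int) []InvokerResponse {
	out := make([]InvokerResponse, 0, len(functions))
	for _, fn := range functions {
		out = append(out, InvokerResponse{CorrelationID: correlationID, Function: fn, Status: status(fn)})
	}
	return out
}

func main() {
	// In-flight messages keyed by correlation ID, e.g. an SQS receipt handle.
	inflight := map[string]string{"msg-1": "receipt-handle-1"}
	responses := invoke([]string{"hello"}, "msg-1", func(string) int { return 200 })
	for _, r := range responses {
		if handle, ok := inflight[r.CorrelationID]; ok && r.Status == 200 {
			fmt.Println("delete", handle) // the original message can now be acknowledged
			delete(inflight, r.CorrelationID)
		}
	}
}
```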
It seems that the connector-sdk maps all functions regardless of the namespace they are in. This is a problem, especially in multi-tenant environments.
Configuration should accept a namespace in order to get only the functions deployed in that namespace.
At this moment, all functions are mapped without taking the function's namespace into account.
Adding a Namespace attribute to ControllerConfig would allow filtering functions by namespace. If the namespace is not set, its value would be empty (the default) and everything would work as usual, so it wouldn't be a breaking change but a feature. Connectors (mqtt-connector, rabbitmq-connector...) will need to add support for setting a namespace, e.g. using an environment variable.
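A sketch of how an optional namespace could change the function-listing call, assuming the provider accepts a namespace query parameter on /system/functions (as faas-netes does; other providers may not). With an empty namespace the URL is unchanged, which keeps the feature non-breaking. The functionsURL helper is hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
)

// functionsURL builds the URL used to list functions. It replaces any path
// on gatewayURL, and adds ?namespace= only when namespace is set.
func functionsURL(gatewayURL, namespace string) (string, error) {
	u, err := url.Parse(gatewayURL)
	if err != nil {
		return "", err
	}
	u.Path = "/system/functions"
	if namespace != "" {
		q := u.Query()
		q.Set("namespace", namespace)
		u.RawQuery = q.Encode()
	}
	return u.String(), nil
}

func main() {
	a, _ := functionsURL("http://127.0.0.1:8080", "")
	b, _ := functionsURL("http://127.0.0.1:8080", "tenant-a")
	fmt.Println(a) // http://127.0.0.1:8080/system/functions
	fmt.Println(b) // http://127.0.0.1:8080/system/functions?namespace=tenant-a
}
```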
I added a pull request to add support for namespace filtering that shows how I've tried to solve this problem.
I'm working on a multi-tenant platform that allows users to deploy their own functions. In order to group functions by user, they are deployed in different namespaces. In the same way, users should be able to deploy connectors that only invoke the functions deployed in the user's namespace.
FaaS-CLI version (full output from: faas-cli version):
commit: c12d57c39ac4cc6eef3c9bba2fb45113d882432f
version: 0.12.14
Docker version (docker version, e.g. Docker 17.0.05): Docker 19.03.12
Are you using Docker Swarm or Kubernetes (FaaS-netes)? No
Operating System and version (e.g. Linux, Windows, MacOS): Linux
Currently the controller does not offer a way for coordinated/graceful shutdown. There's also a risk of go routine leaks, e.g. here: https://github.com/openfaas-incubator/connector-sdk/blob/bb7e84c27a54ab641317d26961300ed92aea739e/types/controller.go#L61
Proposal: accept a context and propagate/react to ctx.Done():
// ctx is accepted here so that the controller's goroutines can react to
// ctx.Done(); how it is stored or propagated is the open question below.
func NewController(ctx context.Context, credentials *auth.BasicAuthCredentials, config *ControllerConfig) *Controller {
	invoker := Invoker{
		PrintResponse: config.PrintResponse,
		Client:        MakeClient(config.UpstreamTimeout),
		GatewayURL:    config.GatewayURL,
	}
	topicMap := NewTopicMap()
	return &Controller{
		Config:      config,
		Invoker:     &invoker,
		TopicMap:    &topicMap,
		Credentials: credentials,
	}
}
Please comment on whether this would be an improvement, and how we should make use of context, e.g. embedding it into the controller struct, changing downstream function signatures to enforce the context, using a <-stopCh or bool to signal shutdown, etc.
Issue: In case the OpenFaaS gateway is down or a function does not scale from 0 to 1 in time, messages might be lost.
Proposal: When calling the gateway, would it be a good idea to add retries and, after multiple failures, add the message back to the queue?
The kafka-connector project added a Topics function to TopicMap after the code was pulled into the connector-sdk project. I am currently working on migrating the Kafka connector onto the connector SDK and would like this function to be available as part of the SDK. I think the function would also be useful to other connectors.
However, I do think there is one problem with pulling the function in as-is. To access the current list of topics, a connector would use controller.TopicMap.Topics(). I would argue that this promotes inappropriate intimacy between the connector and the TopicMap, and that an additional function should be added to Controller to get the current list of topics, perhaps also named Topics. This was not an issue in the kafka-connector because it interacted with a TopicMap directly.
Do you think this is an appropriate change for the SDK?
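A sketch of the suggested delegation, with trimmed-down hypothetical types: the controller exposes Topics() and forwards to its topic map, so connectors never reach into the map itself. In the SDK the TopicMap field is currently exported, so making it private would be a separate decision; locking is omitted here for brevity.

```go
package main

import (
	"fmt"
	"sort"
)

// TopicMap is a trimmed-down stand-in for the SDK's TopicMap.
type TopicMap struct {
	lookup map[string][]string
}

// Topics returns the known topic names in sorted order.
func (t *TopicMap) Topics() []string {
	topics := make([]string, 0, len(t.lookup))
	for topic := range t.lookup {
		topics = append(topics, topic)
	}
	sort.Strings(topics)
	return topics
}

// Controller keeps its topic map unexported here, so connectors depend
// only on Controller.Topics and not on the map's internals.
type Controller struct {
	topicMap *TopicMap
}

func (c *Controller) Topics() []string {
	return c.topicMap.Topics()
}

func main() {
	c := &Controller{topicMap: &TopicMap{lookup: map[string][]string{"faas-request": {"hello"}}}}
	fmt.Println(c.Topics()) // [faas-request]
}
```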
At the moment there's no way to add HTTP header fields in the invoke function.
I've found this issue addressing the same feature, but it was solved in a way I don't understand (as far as I know, the http package does not allow you to set header fields via the Context) and probably only solved that user's own problem (tracing). Additionally, there was already a PR with a possible solution, but it died halfway.
Our case: We're working with CloudEvents and need to set the Content-Type to application/cloudevents+json so that the related SDK (https://github.com/cloudevents/sdk-go) works properly in the function.
I think the custom header fields could be set in the ControllerConfig, but maybe that's too general, or passed as an argument to the Invoke and InvokeWithContext methods. Another option to keep the methods' signatures lean would be to support custom header fields in the Context passed to InvokeWithContext (a bit diverting though), or to establish some kind of InvocationOptions type as an argument which holds custom headers and maybe other settings in the future.
/system/namespaces is implemented.
/system/namespaces is not implemented (returns 404 page not found).
/system/namespaces.
This prevents the latest release of https://github.com/openfaas-incubator/connector-sdk from working at all, as it cannot list namespaces:
This causes applications consuming the SDK to crash:
https://github.com/openfaas-incubator/connector-sdk/blob/master/types/controller.go#L156-L159
Docker version (docker version, e.g. Docker 17.0.05): N/A
What version and distribution of Kubernetes are you using? kubectl version
Client Version: version.Info{Major:"1", Minor:"14", GitVersion:"v1.14.6", GitCommit:"96fac5cd13a5dc064f7d9f4f23030a6aeface6cc", GitTreeState:"clean", BuildDate:"2019-08-19T11:13:49Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"14+", GitVersion:"v1.14.6-eks-5047ed", GitCommit:"5047edce664593832e9b889e447ac75ab104f527", GitTreeState:"clean", BuildDate:"2019-08-21T22:32:40Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"linux/amd64"}
MacOS 10.14.6
N/A
Enable support for adding multiple topic values to a function's topic annotation.
Multiple topics could be defined using a CSV value so that it follows the pattern for specifying multiple topics in the connector.
The Build method, which compiles a map of topic names and the functions that have advertised to receive messages on said topic, would need to be changed so that, rather than taking the full annotation value as the topic name, it splits the annotation value on commas and uses each of the resulting values as a topic name.
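A sketch of the proposed Build change: a hypothetical parseTopics splits the annotation on commas (trimming spaces and skipping empty entries), and build registers the function under each resulting topic. The annotations map (function name to topic annotation) is an assumed input shape.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTopics splits a topic annotation such as "payment, refunds" into
// individual topic names, trimming whitespace and skipping empty entries.
func parseTopics(annotation string) []string {
	var topics []string
	for _, t := range strings.Split(annotation, ",") {
		if t = strings.TrimSpace(t); t != "" {
			topics = append(topics, t)
		}
	}
	return topics
}

// build maps each topic to the functions that declared it.
func build(annotations map[string]string) map[string][]string {
	lookup := map[string][]string{}
	for fn, annotation := range annotations {
		for _, topic := range parseTopics(annotation) {
			lookup[topic] = append(lookup[topic], fn)
		}
	}
	return lookup
}

func main() {
	fmt.Println(parseTopics("payment, refunds,,audit")) // [payment refunds audit]
	fmt.Println(build(map[string]string{"stripe-fn": "payment,refunds"}))
}
```

A single-topic annotation yields a one-element slice, so existing functions keep working unchanged.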
I would like to propose adding a Read/GetFromEnv to the connector-sdk here https://github.com/openfaas-incubator/connector-sdk/blob/df5d76475412b74c3516ba912ae4522793780994/types/controller.go#L17, similar to how we did it in the faas-provider https://github.com/openfaas/faas-provider/blob/d6579bdcf7c85f4d01f398d65ea0cab37e9633d0/types/read_config.go#L77, so that PRs like openfaas/nats-connector#3, which copy the config from one location to another, are not needed. If the SDK is updated and the config changes, the new config should propagate automatically. Copying config from one struct to another is going to be bug-prone.
This would make configuration consistent in the various connectors and simplify upgrades downstream.
Is there a reason why the SDK is not using Go Modules? Or are we free to try migrating it?
As a user I would like to consume events from a queue and then have them processed by my existing asynchronous queue workers over NATS Streaming so that I can have one single way to process all events and asynchronous code.
Several users have requested the ability to "retry" invocations, but I think that retrying is made more difficult by the 1:* relationship functions have to topics.
Conceptual diagram of invocations
As a developer using the connector-sdk, I want to be able to retry invocations that failed
If a message comes in on a topic payment, and only one function, stripe-fn, has a matching annotation of topic: payment.
In the case of a failure, the connector could just retry. This would probably be OK depending on the function, but some caution should be applied here.
The Linkerd2 documentation talks about the dangers of retrying automatically and about retry budgets. See: "How Retries can Go wrong" https://linkerd.io/2/features/retries-and-timeouts/
This is the assumed default operating mode.
If a message comes in on a topic payment, and two functions exist, stripe-fn and datalake-fn, both with topic: payment.
In the case that datalake-fn fails and stripe-fn passes, following the logic above, retrying the whole batch may issue a second payment via stripe-fn.
The proposed solution in #26 would act this way.
#26 proposes that any response should indicate whether to acknowledge, delete, or do something else with a message. This works for a 1:1 mapping, but fails for a 1:* mapping.
Have the status of all the invocations returned as a batch
The Invoke method, which is called by all connectors when they receive messages, could be extended to return a set of results. By returning a set of results, the caller has to wait for N*max_timeout to decide whether a set of invocations was successful or not. This may be offset by using a Go channel, but the connector may still have to wait for all of the results to come in before making a decision.
This is what 2) would look like with the example in the connector-sdk GitHub repo:
Before:
After:
I would welcome input from anyone who is considering conditional acking, retrying or doing anything else with the results from the invocations.
Allow cancellation of topic map go routine
Once the topic map is being built in its own go routine, there should be some way to cancel / pause that work.
Why? Perhaps it needs to back-off for some reason, or the broker isn't available. Perhaps it should pause and restart at a later time.
Once started, there is no way to stop the topic map from being rebuilt.
Introduce a Context or cancellation channel to the SDK.
The map is rebuilt in this go routine using a timer:
https://github.com/openfaas/connector-sdk/blob/master/types/controller.go#L141
Then each result is published to the subscribers via this Go routine:
https://github.com/openfaas/connector-sdk/blob/master/types/controller.go#L102
It's a best practice for Go APIs to provide cancellation.
Users have requested this in the past.
invokefunction to return the response headers
Invoke to return the body, statusCode and errors
Suggested by Simon Pezcler and Rishabh Gupta so that connectors can do better debugging and potentially use the result for something.
Right now the result is abandoned.
Trying to run the default tester fails due to a type mismatch on the auth creds.
It does whatever it's supposed to do when it runs successfully
$ go run cmd/tester/main.go
# command-line-arguments
cmd/tester/main.go:37:35: cannot use creds (type *"connector-sdk/vendor/github.com/openfaas/faas-provider/auth".BasicAuthCredentials) as type *"github.com/openfaas-incubator/connector-sdk/vendor/github.com/openfaas/faas-provider/auth".BasicAuthCredentials in argument to "github.com/openfaas-incubator/connector-sdk/types".NewController
I haven't had a chance to look into this in detail, but as the other connectors don't have this issue, I assume it's just a dependency issue in the connector-sdk tree.
I ran into this while developing a new connector but ultimately was able to get going with the MQTT and Kafka connectors as a reference instead. While it hasn't had a great impact, I don't believe there is anything extraordinary about my development environment, so others are likely to hit this as well.
I read all the docs but this was never mentioned. How are execution failures handled? What happens after the function that (for example) reacts to a Kafka message fails due to a runtime error? Will the message be lost? Do we have retries? What's the best practice for handling such situations?
Since the SDK is used as a library and dependency for building connectors, it should leave logging and error handling to the user of the SDK, e.g. by avoiding log.Fatal calls such as here.
It would be a blast to have the option to add HTTP headers to the Invoke function (to use OpenTracing via headers).
Currently Invoke() only prints out errors but does not return any errors to the caller, i.e. the initiating connector. This makes it at-most-once messaging, which the docs are not clear about.
func (i *Invoker) Invoke(topicMap *TopicMap, topic string, message *[]byte) {
	// ...
	log.Printf("Invoke function: %s", matchedFunction)
	gwURL := fmt.Sprintf("%s/function/%s", i.GatewayURL, matchedFunction)
	reader := bytes.NewReader(*message)
	body, statusCode, doErr := invokefunction(i.Client, gwURL, reader)
	if doErr != nil {
		// The error is only logged; the calling connector never sees it.
		log.Printf("Unable to invoke from %s, error: %s\n", matchedFunction, doErr)
		return
	}
	// ...
}
There could be many reasons why an Invoke would fail, e.g. OpenFaaS down/maintenance, network hiccups, issues with the receiving function (downstream down, bug), etc.
Returning an error would help the initiating connector to queue the event/call and retry as per its own needs.
I am using the mqtt-connector to send async requests, but it lacks some features, at least compared to HTTP async requests.
My test scenario is that a client (IoT device) invokes a function asynchronously by sending some data, say a temperature value, and its identifier. The client expects the function to run some processing based on that value and return the response to the client. However, the mqtt-connector can only receive the value as a message, and no callback can be introduced, because there are no headers as in HTTP. In HTTP, the gateway passes the request to Nats with headers (we can also include additional headers in HTTP). I think the problem is that MQTT messages go directly to Nats and do not pass through the gateway, where messages would be augmented with such features (callback, etc.).
Async MQTT requests are expected to let Nats (or the gateway; I'm not clear about the design) know about the callback and pass it to the function. Moreover, in HTTP, I needed to send a request identifier along with the request, which was achievable by setting a header, but in MQTT this is also not possible. The identifier in the header was sent back along with the callback to the client, and the client was able to recognize which request the response was for.
The MQTT-connector needs to be able to receive callbacks and custom data as well as messages.
The Nats server needs to treat async MQTT requests as it does for HTTP request, I mean understanding callback and headers.
I am unsure about my understanding of the OpenFaaS design, so, just to give an idea, here are some possible solutions.
Solution 1: Simulating HTTP headers in MQTT messages. For instance, a string pattern like "---header-X-Callback-Url:url---" inside the MQTT message. Then, once Nats receives an async MQTT message, it searches for the pattern and does the routine it does for HTTP requests, including passing the headers to queue-worker (if the routine is implemented in Nats server, not gateway).
Solution 2: Enable direct communication with Nats, where MQTT clients do not need an MQTT broker and can just publish the request to Nats with a Nats client, as encouraged in MQTT Nats. But sending callbacks and custom data would still remain unsolved. In this case, Nats can get the function execution output and publish it on a special subject in case the client needs it. For instance, if the invocation subject is function/func1, then the reply from Nats is function/func1/reply.
Solution 3: As this missing callback (and other headers) might appear for other connectors (I have not tested), perhaps the best idea would be to give developers the async requests handling procedure in OpenFaaS so they can implement their own solutions. For example, instruction on how the requests are augmented by gateway (defining callback and other headers) when delivering to Nats (if a gateway to Nats communication exists in OpenFaaS); then how the Nats server is storing and augmenting the request and sending it to the queue-worker...
Solution 3
I really think giving a demonstration on how users can implement a procedure (from client to nats --> queue --> function --> queue --> client) for handling async requests will be helpful. For instance, how I can run a nats client that sends async requests to openfaas and receives the callback.
Mqtt-connector deployment
helm upgrade --install mqtt-connector mqtt-connector/ -n openfaas --namespace openfaas --set broker=tcp://10.0.0.90:1883 --set gateway_url=http://10.0.0.90:31112 --set upstream_timeout=35s --set asyncInvoke=true --set nodeSelector."kubernetes\.io/hostname"=master --set topic="/function/#"
Function annotation (my function name is 'w1-short' and does some random math and returns a string text)
topic: /function/w1-short
Queue-worker log (I can see that X-Callback-URL header is null)
Smart Agriculture and Smart Manufacturing, where IoT devices send requests with identifiers and expect a corresponding reply from the function. Devices also care about each request and need to know which one is missed, timeout, etc. In HTTP mode, they achieve these using headers, but in MQTT they are unable to do so.
* FaaS-CLI version ( Full output from: `faas-cli version` ):
CLI:
commit: d94600d2d2be52a66e0a15c219634f3bcac27318
version: 0.14.1
Gateway
uri: http://10.0.0.90:31112
version: 0.21.1
sha: a9a77f0ecaa8d5a242606ceaf6deed5d8703249b
Provider
name: openfaas-operator
orchestration: kubernetes
version: 0.14.1
sha: 3ab2d57899704498183c8fcb30a42b35d41de8a1
Docker version (docker version, e.g. Docker 17.0.05):
Are you using OpenFaaS on Kubernetes or faasd?
OpenFaaS on Kubernetes K3s
Operating System and version (e.g. Linux, Windows, MacOS):
Linux Arm v7
Code example or link to GitHub repo or gist to reproduce problem:
Other diagnostic information / logs from troubleshooting guide
package types makes use of the errors.Wrap function, which is an external dependency:
https://github.com/openfaas-incubator/connector-sdk/blob/ed620ed7ce564746683ad24664a791eda6b1f693/types/invoker.go#L58
Neither Gopkg.toml nor *.lock specify this package.
This leads to problems contributing to this package as contributors could potentially add wrong package versions:
dep status
Gopkg.lock is out of sync with imports and/or Gopkg.toml. Run `dep check` for details.
PROJECT MISSING PACKAGES
github.com/pkg/errors [github.com/pkg/errors]
input-digest mismatch
If a provider doesn't support namespaces, then the previous behaviour or a default behaviour should be used
As outlined by @bmcstdio, the code will throw a fatal error instead.
https://github.com/openfaas-incubator/connector-sdk/blob/master/types/controller.go#L156-L159
faas-netes is unaffected, but users wanting to bring in the latest release of the connector-sdk will encounter issues when used with the operator or faas-swarm.
Currently, when invoking a function on a topic with controller.Invoke() and there are no matching functions (probably because the function is still being loaded or the topic map has not been refreshed yet), there is no way to acknowledge upstream. Consider AMQP with manual acknowledgements (auto-ack disabled): there is no way to detect the failure and send a negative ack.
Add support for multiple namespaces in Kubernetes
I would like to provide a wildcard in my function topic annotation.
When creating a function, I would like to get all events with some sort of grouping, i.e. adding order.* in the topic annotation should match a message that has a subject of order.Purchased.
You have to be exact with your topic / subject so that the topicMap matches it.
Add basic globbing syntax in the topicMap.Match function.
This feature would be great when doing things such as Catch All + Message Observing as well as supporting my lazy programmer needs so I can listen to all messages from a particular Bounded Context.
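A sketch of basic globbing using Go's path.Match, which is one possible choice of syntax, not something the SDK currently does. A pattern with no metacharacters matches only itself, so exact topics keep working; note that "*" does not cross "/", which matters for slash-separated MQTT topics. matchTopics is hypothetical.

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// matchTopics returns the functions whose annotated topic pattern matches
// subject, using path.Match glob syntax.
func matchTopics(lookup map[string][]string, subject string) ([]string, error) {
	var matched []string
	for pattern, functions := range lookup {
		ok, err := path.Match(pattern, subject)
		if err != nil {
			return nil, err // malformed pattern
		}
		if ok {
			matched = append(matched, functions...)
		}
	}
	sort.Strings(matched) // map iteration order is random
	return matched, nil
}

func main() {
	lookup := map[string][]string{
		"order.*":         {"audit-fn"},
		"order.Purchased": {"billing-fn"},
		"payment":         {"stripe-fn"},
	}
	fns, err := matchTopics(lookup, "order.Purchased")
	fmt.Println(fns, err) // [audit-fn billing-fn] <nil>
}
```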
Just tried with the latest "connector-sdk" library in my fork of nats-connector.