yomorun / yomo
🦖 Stateful Serverless Framework for Geo-distributed Edge AI Infra. With function calling support; write once, run on any model.
Home Page: https://yomo.run
License: Apache License 2.0
Feedback from the community: developers would rather write Serverless functions than implement and maintain microservices on the edge. Consider building a version on the feat/sl branch for this.
In order to decrease code complexity, we need to remove the customized QUIC interfaces in core/quic and use quic-go directly as the QUIC client & server. See the skeleton project for reference.
After running yomo dev for 2 minutes, CPU usage reaches 100%.
Currently, zipper and sfn create an individual QUIC stream for each piece of data; we need to replace this with one persistent QUIC stream. stream.Write() should also take a lock to prevent concurrent writes to the same QUIC stream.
Getting rid of Go plugins is friendly to developers: no $GOPATH setup is required, just yomo dev xxx.go.
The current logger is inconvenient to use; we need to replace it with the one from the skeleton project.
Add a CONTRIBUTING file for the yomo project.
Running yomo dev on Manjaro fails with a "failed to sufficiently increase receive buffer size" error.
~/.../yomo/example >>> ../bin/yomo dev app.go
2020/12/21 13:01:37 Building the Serverless Function File...
2020/12/21 13:01:42 failed to sufficiently increase receive buffer size (was: 208 kiB, wanted: 2048 kiB, got: 416 kiB). See https://github.com/lucas-clemente/quic-go/wiki/UDP-Receive-Buffer-Size for details.
2020/12/21 13:01:42 QUIC Server listens on 0.0.0.0:4242
^C
~/.../yomo/example >>> cat /etc/lsb-release
DISTRIB_ID=ManjaroLinux
DISTRIB_RELEASE=20.2
DISTRIB_CODENAME=Nibia
DISTRIB_DESCRIPTION="Manjaro Linux"
Running sudo sysctl -w net.core.rmem_max=2500000 solves this problem, as described in the quic-go wiki.
Count the number of data frames when zipper receives data frames from source, and record it in a local file.
on 170ca40
Currently, zipper uses Rx to dispatch data from source to stream-functions; we need to remove Rx from zipper and use a cleaner way to dispatch the data. When developers write Rx code, logic can be implemented easily and quickly, and when they run yomo dev they get test data streams directly, which is especially helpful for user onboarding. After v2.0, we will consider providing a way to support custom data types.
Currently, zipper and sfn communicate with raw bytes; we need to replace this with a Data Frame. Streaming is not like HTTP's Request/Response: by introducing ReactiveX, developers can write concise code.
Consider rewriting the YoMo official website with Nextra.
In order to debug the QUIC transport information, we need to set the QUIC_GO_LOG_LEVEL environment variable in the CLI.
yomo-zipper should be immutable; the stream functions shouldn't change the raw stream. flow and sink run as stream functions and are executed one by one by zipper.
When running the CLI, we need to print the UDP receive buffer size of the current OS and validate whether it meets the recommended size.
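A sketch of such a check, assuming Linux and using the 2048 KiB figure from the quic-go warning in the log above; the threshold constant and the CheckRmemMax function name are illustrative, not an official yomo or quic-go API.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// recommendedBytes is taken from the quic-go log message
// ("wanted: 2048 kiB"); it is an assumption, not a yomo constant.
const recommendedBytes = 2048 * 1024

// CheckRmemMax parses the value of net.core.rmem_max (as read from
// /proc/sys/net/core/rmem_max) and reports whether it meets the
// recommendation.
func CheckRmemMax(raw string) (int, bool, error) {
	v, err := strconv.Atoi(strings.TrimSpace(raw))
	if err != nil {
		return 0, false, err
	}
	return v, v >= recommendedBytes, nil
}

func main() {
	raw, err := os.ReadFile("/proc/sys/net/core/rmem_max")
	if err != nil {
		fmt.Println("cannot read rmem_max (non-Linux?):", err)
		return
	}
	v, ok, _ := CheckRmemMax(string(raw))
	fmt.Printf("UDP receive buffer size: %d bytes (recommended >= %d): ok=%v\n",
		v, recommendedBytes, ok)
}
```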
Also from the community, related to #58: developers can save more time when they can run yomo run app.go.
Also from the community, related to #58: developers can chain up their yomo-flows with yomo run workflow.yaml.
As YoMo Server can be started with the -m parameter to load a remote configuration, a YoMo Server without a functions section should work. When a functions section is present, only the listed yomo-functions are allowed to connect.
When I use the command yomo dev, I run into a problem:
1. mkdir demo
2. open my demo project in my IDE (GoLand)
3. go mod tidy
4. yomo dev
Stream processing uses io.CopyN; when a small copy size is set, messages get blocked and truncated.
In Prj/Yomo >> run.go >> RunDev, change the io.CopyN parameter value:
io.CopyN(pluginStream.Writer, deStream1.Reader, 64)
In Prj/yomo-echo-plugin, go run echo.go shows blocked and truncated messages:
name:yomo!-new
name:yom
The smaller the value, the worse the problem; sometimes it blocks right from the start.
Follow-up testing: the default value of 1024 also shows message blocking and truncation; it seems this happens once the message volume reaches a certain level.
In some cases, when the connection between yomo-zipper and yomo-flow is interrupted, a new stream is created. Currently, yomo-zipper keeps multiple streams for every yomo-flow, yomo-sink, and yomo-source.
This causes a performance issue when there are too many streams between yomo-zipper and yomo-flow/sink.
But actually, yomo-zipper should only support multiple streams for yomo-source, and a single stream for each yomo-flow / yomo-sink.
The source/flow/sink concepts in YoMo are difficult and confusing for users to understand. It's better to simplify the naming of source/flow/sink/zipper.
Connectors provide the common and reusable services for input/output data in YoMo. Users can import the YoMo SDK to implement an input/output connector, add a main() function, and then run the connector with the Go CLI, e.g. go run main.go.
An Input connector sends the input data from an external system (e.g. IoT sensors) to yomo. Rename source to input connector.
An Output connector sends the output data from yomo to an external system (e.g. a database). Rename sink to output connector.
Stream Functions are the streaming Serverless functions in YoMo; each contains a Handler function where users write their business logic. Users can use the yomo CLI to run a function locally or deploy it to YCloud. Rename flow to stream function.
YoMo Server is an orchestrator which receives data from sources (input connectors) and routes it through the Stream Functions one by one. The result of the real-time stream computing is sent to the Output Connectors. Rename zipper to yomo server.
There are two scenarios in sfn: sfn receives data from zipper, and in this case we don't need to use Rx. We need to remove Rx from sfn and use a different way to support both raw bytes and rx stream.
I thought it would be straightforward, but I'm facing some difficulties...
I have a data source app that sends 5 packets encoded with 5 different keys:
func generateAndSendData(stream quic.Stream) {
	keys := []byte{0x10, 0x11, 0x13, 0x7E, 0x7F}
	for i, key := range keys {
		time.Sleep(100 * time.Millisecond)
		codec := y3.NewCodec(key)
		sendingBuf, _ := codec.Marshal(int64(i))
		_, err := stream.Write(sendingBuf)
		if err != nil {
			log.Printf("Couldn't send buffer with i=%v", i)
		} else {
			fmt.Print(".")
		}
	}
}
And the flow app that has 5 streams that are subscribed to each individual key and then zipped together:
var zipper = func(_ context.Context, a interface{}, b interface{}) (interface{}, error) {
	accumulator, ok := a.([]interface{})
	if !ok {
		fmt.Printf("No accumulator: %v + %v\n", a, b)
		return []interface{}{a, b}, nil
	}
	fmt.Printf("With accumulator: %v + %v\n", accumulator, b)
	accumulator = append(accumulator, b)
	return accumulator, nil
}

var convert = func(v []byte) (interface{}, error) {
	fmt.Printf("Got: %v\n", v)
	return y3.ToInt64(v)
}

// Handler will handle data in Rx way
func Handler(rxstream rx.RxStream) rx.RxStream {
	streamA := rxstream.Subscribe(0x10).OnObserve(convert)
	streamB := rxstream.Subscribe(0x11).OnObserve(convert)
	streamC := rxstream.Subscribe(0x13).OnObserve(convert)
	streamD := rxstream.Subscribe(0x7E).OnObserve(convert)
	streamE := rxstream.Subscribe(0x7F).OnObserve(convert)
	return streamA.
		ZipFromIterable(streamB, zipper).
		ZipFromIterable(streamC, zipper).
		ZipFromIterable(streamD, zipper).
		ZipFromIterable(streamE, zipper).
		StdOut().
		Encode(0x11)
}
The problem is that only the first two packets sent by the source are received by the flow. The order of zipping doesn't matter; only the order of sending by the source matters, as far as I can see...
What can I do? (Also, I'm zipping sequentially because there's no Merge operator... Is it really missing?)
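A Merge operator emits items from all inputs as they arrive instead of pairing them like Zip, so one stream stalling doesn't block the rest. Since it appears absent from the rx package here, below is a plain-Go channel fan-in that sketches the Merge semantics; the names and shapes are mine, not the yomo rx API.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge fans-in several channels into one, forwarding items as they
// arrive; the output closes once every input has closed.
func merge(ins ...<-chan int64) <-chan int64 {
	out := make(chan int64)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(c <-chan int64) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	go func() { wg.Wait(); close(out) }()
	return out
}

func main() {
	a := make(chan int64, 2)
	b := make(chan int64, 2)
	a <- 1
	a <- 3
	close(a)
	b <- 2
	b <- 4
	close(b)

	var got []int64
	for v := range merge(a, b) {
		got = append(got, v)
	}
	// Arrival order is nondeterministic, so sort before printing.
	sort.Slice(got, func(i, j int) bool { return got[i] < got[j] })
	fmt.Println(got) // [1 2 3 4]
}
```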
on 9a46852: for the WebSocket / Crawler scenario, applications always run on a JSON codec; implement basic support and give examples so users can get started easily.
As many CLIs already do this, developers love copy & paste 😄
The quick start demo makes the CPU hit 100%.
yomo.Run
We will rename source/flow/sink in #173, and it's also not needed to place sinks in workflow.yaml.
- Remove sinks
- Rename flows to functions
- output connectors connect to yomo and receive the output data in parallel

name: Service
host: localhost
port: 9000
functions:
  - name: func1
  - name: func2
The input/output connectors in YoMo.
RunDev supports OutputFormatter: HexString/PacketPrinter/EchoData/ThermometerData.
For the edge-mesh feature, YoMo-Zipper will be geo-distributed and closer to end users, so all YoMo-Zippers need to communicate with each other. Each YoMo-Zipper is both a Sender and a Receiver:
- Sender: sends the output stream to other yomo-zippers in the edge-mesh network.
- Receiver: receives the input stream from other yomo-zippers.
HandshakeFrame
Replace PayloadFrame with MetaFrame and DataFrame
Add TransactionID to MetaFrame to improve traceability
About Server-Sent Events: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
Unable to run echo.go from the demo:
1. In $HOME/go/yomo, run go mod init yomo
2. Run go get -u github.com/yomorun/yomo
3. Create echo.go following the documentation
4. Run go run echo.go
./echo.go:16:14: cannot use &EchoPlugin literal (type *EchoPlugin) as type plugin.YomoObjectPlugin in argument to yomo.RunDev:
*EchoPlugin does not implement plugin.YomoObjectPlugin (missing Mold method)
Developers can install yomo via brew install yomo.
When source, sfn, and an upstream zipper connect to or disconnect from zipper, we need to log these events at the Info level.