
nats-io / nats.go


Golang client for NATS, the cloud native messaging system.

Home Page: https://nats.io

License: Apache License 2.0

Go 99.92% Shell 0.06% Dockerfile 0.02%
go golang nats microservices microservices-architecture pub-sub cloud-native cloud-native-architectures cloud-native-microservices

nats.go's People

Contributors

0xflotus, aricart, bruth, codegangsta, colinsullivan1, derekcollison, gcolliso, ido-gold-apolicy, jarema, jduhamel, jnmoyne, josephwoodward, kozlovic, krobertson, matthewdevenny, matthiashanel, mkorolyov, msoap, neilalexander, nsurfer, nussjustin, piotrpio, pires, ripienaar, scottf, tbeets, teh-cmc, tylertreat, variadico, wallyqs

nats.go's Issues

Unsubscribe does not allow consumption of already fetched messages

Perhaps there is a good reason for this, but it appears that Unsubscribe sets its message channel to nil, which is potentially a destructive action.

Unless there is another way to consume messages that the client has already fetched, I would expect the channel to be closed to prevent future writes but to remain available for reads.

s.unsubscribe()
...consume remaining messages in channel

I am working on a PR that addresses this. I am curious though if this was intended behavior and if I am misusing it.
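
The behavior the reporter expects matches how plain Go channels work: closing a buffered channel stops further sends, but values already in the buffer can still be received. A minimal sketch, not the library's code; `mch` and `drain` are names made up for this illustration:

```go
package main

import "fmt"

// drain reads every value still buffered in ch and returns them in order.
// It returns once ch is closed and empty.
func drain(ch <-chan string) []string {
	var out []string
	for m := range ch {
		out = append(out, m)
	}
	return out
}

func main() {
	mch := make(chan string, 4) // stand-in for a subscription's message channel
	mch <- "a"
	mch <- "b"
	close(mch) // no further writes, but buffered values remain readable
	fmt.Println(drain(mch))
}
```

Setting the channel to nil instead would make any later receive block forever, which is why a nil channel cannot be drained this way.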

ACK subscription hitting consumer too slow in pub-req

Hello,

This is more of an FYI and follow up question, but I expect others will run into this as well.

The docs show an example like this:

// Requests
var response string
err := nc.Request("help", "help me", &response, 10*time.Millisecond)

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
    c.Publish(reply, "I can help!")
})

The request and reply use the same connection, which generally works fine. But in my stress tests the consumer wasn't firing fast enough, and the publisher couldn't send any more messages because of "Slow Consumer Detected".

The solution was to have two separate connections to the nats server, one for the producers and one for the consumers. This solved my problem, but it makes me wonder whether the nats client should do connection pooling.

reconnect problem and fix

First of all, sorry for my English; I am not a native English speaker.

This bug happens when a client tries to reconnect to a new server. I have 20 clients connected to 3 gnatsd servers. When I cycle the gnatsd service on the 3 servers (killing and restarting the one the clients are connected to), the number of client connections keeps dropping.

On reconnect, the client will:

  1. run readLoop; this function checks the health of the connection between the client and the cluster.
  2. check whether nc is closed or reconnecting and whether nc.conn == nil. Because of latency, creating the connection sometimes takes a bit longer, and then this code:

    if sb || conn == nil {
        break
    }

    makes readLoop exit before the reconnect succeeds.

I changed "break" to "continue". I then cycled the service between the 3 gnatsd servers again, and the reconnect succeeded 100% of the time.

Order or lack thereof of callbacks

It seems a bit weird that in most cases I will see a ClosedCB and maybe a ReconnectedCB before the DisconnectedCB. Because DCB is async, there is no order guarantee and it makes it difficult to actually do anything when you do see it since you cannot easily determine a reliable sequencing between the various callbacks.
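
One way to get a reliable sequence is to push every callback through a single queue served by one goroutine, so callbacks run in the order they were queued. This is only a sketch of that idea; `dispatcher`, `post` and `stop` are hypothetical names, not part of the client:

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher runs queued callbacks one at a time, in the order they were queued.
type dispatcher struct {
	queue chan func()
	wg    sync.WaitGroup
}

func newDispatcher(size int) *dispatcher {
	d := &dispatcher{queue: make(chan func(), size)}
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		for cb := range d.queue {
			cb()
		}
	}()
	return d
}

// post queues cb; callbacks posted from one goroutine run in FIFO order.
func (d *dispatcher) post(cb func()) { d.queue <- cb }

// stop closes the queue and waits for the queued callbacks to finish.
func (d *dispatcher) stop() {
	close(d.queue)
	d.wg.Wait()
}

func main() {
	d := newDispatcher(8)
	d.post(func() { fmt.Println("disconnected") })
	d.post(func() { fmt.Println("reconnected") })
	d.post(func() { fmt.Println("closed") })
	d.stop()
}
```

The callbacks stay asynchronous with respect to the connection code, but a disconnect queued before a close is always observed first.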

goroutine leak when calling BindRecvChan

In netchan.go, the caller can pass a channel that receives data via Subscribe on a topic.
https://github.com/apcera/nats/blob/master/netchan.go#L54

func (c *EncodedConn) bindRecvChan(subject, queue string, channel interface{}) error {

In the underlying implementation, nats defines a callback (i.e. cb) and then calls subscribe on the Conn:
https://github.com/apcera/nats/blob/master/netchan.go#L81

_, err := c.Conn.subscribe(subject, queue, cb)

In conn.subscribe, nats spawns a goroutine to deliver the messages:
https://github.com/apcera/nats/blob/master/nats.go#L1198

go nc.deliverMsgs(sub.mch)

That goroutine terminates only when sub.mch is closed, which happens via conn.Unsubscribe():
https://github.com/apcera/nats/blob/master/nats.go#L1244

func (nc *Conn) unsubscribe(sub *Subscription, max int) error {

The bug shows up when we look at BindRecvChan again: even after the caller has closed the channel, the goroutine that serves deliverMsgs is never destroyed. We end up with an additional zombie goroutine every time we call BindRecvChan.

Possible problem with flusher() not exiting

I saw an issue where flusher() seems not to exit. A test case running with "-race" also often shows this:

Ivans-MacBook-Pro:nats ivan$ go test -race -v -run AuthServers

=== RUN TestAuthServers

WARNING: DATA RACE
Write by goroutine 6:
github.com/nats-io/nats.(*Conn).processConnectInit()
/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:517 +0x1ad
github.com/nats-io/nats.(*Conn).connect()
/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:546 +0x23e
github.com/nats-io/nats.Options.Connect()
/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:263 +0x1e5
github.com/nats-io/nats.TestAuthServers()
/Users/ivan/dev/go/src/github.com/nats-io/nats/cluster_test.go:113 +0x265
testing.tRunner()
/usr/local/Cellar/go/1.5/libexec/src/testing/testing.go:456 +0xdc

Previous read by goroutine 11:
github.com/nats-io/nats.(*Conn).flusher()
/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:1066 +0x7a

Goroutine 6 (running) created at:
testing.RunTests()
/usr/local/Cellar/go/1.5/libexec/src/testing/testing.go:561 +0xaa3
testing.(*M).Run()
/usr/local/Cellar/go/1.5/libexec/src/testing/testing.go:494 +0xe4
main.main()
github.com/nats-io/nats/_test/_testmain.go:268 +0x20f

Goroutine 11 (running) created at:
github.com/nats-io/nats.(*Conn).spinUpSocketWatchers()
/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:466 +0xa0

--- FAIL: TestAuthServers (2.02s)
cluster_test.go:120: Wrong error, wanted Auth failure, got 'nats: Timeout'
FAIL
exit status 1
FAIL github.com/nats-io/nats 2.030s

Should we have a CONNECTING status?

When attempting to connect, the CONNECTED status is set early on. If for any reason an error is triggered during the connect (by processOpErr()), this may trigger a reconnect attempt while we are still in the middle of a connect. If we used a CONNECTING status, processOpErr() could check for it and simply return without trying to reconnect, since an error during connect() would already be reported to the caller (which then moves on to the next server in the pool).
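
The proposal can be sketched as a small state check. Everything here is hypothetical: the `Status` values, the `conn` type and this `processOpErr` only illustrate the idea and are not the client's implementation:

```go
package main

import (
	"errors"
	"fmt"
)

// Status is a hypothetical connection state; CONNECTING is the proposed addition.
type Status int

const (
	DISCONNECTED Status = iota
	CONNECTING
	CONNECTED
	RECONNECTING
	CLOSED
)

type conn struct {
	status Status
}

// processOpErr reports whether a reconnect should be started for err.
// While the initial connect is in progress it declines, so the error can
// surface from the connect call instead.
func (c *conn) processOpErr(err error) bool {
	if c.status == CONNECTING || c.status == CLOSED {
		return false
	}
	c.status = RECONNECTING
	return true
}

func main() {
	c := &conn{status: CONNECTING}
	fmt.Println(c.processOpErr(errors.New("auth failure"))) // false: still connecting
	c.status = CONNECTED
	fmt.Println(c.processOpErr(errors.New("read error"))) // true: reconnect starts
}
```

A real implementation would also need to guard `status` with the connection lock; that is left out to keep the sketch short.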

Expose ability to reconnect

I'd like to be able to get the client to reconnect to the server. Our use case is to rebalance connections to a cluster that's behind HAProxy.

Rename to nats-client-go

Just a thought: if the repo only has a client in it, perhaps include that in the project name. If it is a go client, perhaps include that in the project name.

Or are you also rewriting the nats server in Go? Going to make it distributed?

Better capture pending writes when getting disconnected

Imagine doing nc.Publish() in a loop. The server is stopped/restarted and the library tries to reconnect. The pending buffer is created only in doReconnect(). That leaves a window where nc.bw.write() calls may accumulate to the point of causing a flush (socket write), which will likely result in an error returned from the nc.Publish() call.
Also, the use of a buffered writer may cause situations where the buffer is flushed and some leftover data is then appended to the now empty buffer. If a reconnect occurs at the "right" time, this data will then be sent to the server, which may cause a parse error (if what was in the buffer was a partial protocol message), which would result in another reconnect.

TLS/SSL Support

Client should support secure communications to the server.

Panic on 386 systems

2015/09/14 15:19:26 http: panic serving 127.0.0.1:34640: runtime error: invalid memory address or nil pointer dereference
goroutine 19 [running]:
net/http.(*conn).serve.func1(0x18902060, 0xf76aa860, 0x188fc040)
/opt/go/src/net/http/server.go:1287 +0xa2
sync/atomic.AddUint64(0x1887b964, 0x1, 0x0, 0x188fe240, 0x4)
/opt/go/src/sync/atomic/asm_386.s:112 +0xc
github.com/nats-io/nats.(*Conn).subscribe(0x1887b880, 0x1892c0f0, 0x21, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0)
/home/philipp/projects/go/src/github.com/nats-io/nats/nats.go:1298 +0x23d
github.com/nats-io/nats.(*Conn).Request(0x1887b880, 0x188ce620, 0x19, 0x188f6320, 0x12, 0x20, 0x540be400, 0x2, 0x0, 0x0, ...)
/home/philipp/projects/go/src/github.com/nats-io/nats/nats.go:1254 +0x6f

This is a known issue on 386 systems when handling 64-bit numbers: golang/go#6404
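
The usual workaround follows from the sync/atomic documentation: on 32-bit platforms the caller must keep atomically accessed 64-bit words 64-bit aligned, and the first word of an allocated struct can be relied on to be aligned. A small sketch with a made-up `stats` struct (not the client's Conn):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// stats keeps its 64-bit counter as the first field. The first word of an
// allocated struct is 64-bit aligned, which atomic 64-bit operations need
// on 32-bit platforms such as 386.
type stats struct {
	msgs   uint64 // accessed atomically; must stay first
	closed bool
	name   string
}

// inc atomically increments the counter and returns the new value.
func (s *stats) inc() uint64 { return atomic.AddUint64(&s.msgs, 1) }

// msgsOffset returns the byte offset of msgs inside stats.
func msgsOffset() uintptr { return unsafe.Offsetof(stats{}.msgs) }

func main() {
	s := new(stats) // allocated, so the first word is aligned
	fmt.Println(s.inc(), msgsOffset())
}
```

Moving a bool or other small field in front of `msgs` is what can break the alignment on 386.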

Error in Publish after JSON unmarshal error in subscribed message

When using EncodedConn with JSON encoding, a decoding error will set Conn.err: https://github.com/nats-io/nats/blob/ef5165913ac88c53c2e2be39f73f8314490ea5dd/enc.go#L196

After err is set, publishing messages will not succeed because publish() will abort if a global error exists:
https://github.com/nats-io/nats/blob/ef5165913ac88c53c2e2be39f73f8314490ea5dd/nats.go#L1216

Use this test code to reproduce:

package main

import (
    "fmt"
    "github.com/nats-io/nats"
    "time"
)

func main() {
    nc, _ := nats.Connect("nats://localhost:4222")
    c, _ := nats.NewEncodedConn(nc, "json")

    c.Subscribe("test", func(msg interface{}) {
        fmt.Println("Received", msg)
    })
    time.Sleep(500 * time.Millisecond)

    toPublish := [][]byte{
        []byte(`{"msg": 1}`),
        []byte(`{"msg": 2}`),
        []byte(`foo`),
        []byte(`{"msg": 4}`),
        []byte(`{"msg": 5}`),
    }

    for i, data := range toPublish {
        err := nc.PublishMsg(&nats.Msg{
            Subject: "test",
            Data:    data,
        })
        fmt.Printf("Published #%d %s, error: %v\n", i, data, err)
        time.Sleep(500 * time.Millisecond)

    }

}

Also, I wonder how exactly the error handling is meant to work. If I call Publish() (or any other method), I'd expect to receive only errors regarding the current operation, not any global errors. On the other hand, when the connection to gnatsd goes away, a Publish() while the client is disconnected succeeds without returning an error, although the message was not sent to gnatsd.

ReadLine() does not guarantee that the full line is read, which may cause parser to error out

When receiving the INFO protocol message, the call to ReadLine() assumes that the line will be received entirely, or that the "pre" boolean will be true should the buffer not be big enough.
Actually, ReadLine() may return with a partial read which, if it contains at least "INFO", would make processExpectedInfo() return ok, but would then make the readLoop fail when processing the remainder of the protocol.
This can be demonstrated by having gnatsd's sendInfo() send part of the buffer, pause for a second and then send the rest of the buffer.
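
One way to avoid depending on how the bytes arrive is to read up to the line terminator explicitly. The sketch below uses bufio's ReadString, which keeps reading until it sees the delimiter, and simulates a server that sends the INFO line in two pieces with io.MultiReader. `readProtoLine` and `readSplitInfo` are hypothetical helpers, and the INFO payload is invented for the example:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readProtoLine reads up to and including '\n' and returns the line without
// its trailing "\r\n". ReadString only returns early on an error.
func readProtoLine(br *bufio.Reader) (string, error) {
	s, err := br.ReadString('\n')
	if err != nil {
		return "", err
	}
	return strings.TrimRight(s, "\r\n"), nil
}

// readSplitInfo feeds the INFO line to the reader in two separate chunks,
// the way a paused sender would, and returns the reassembled line.
func readSplitInfo() (string, error) {
	src := io.MultiReader(
		strings.NewReader(`INFO {"max_pay`),
		strings.NewReader("load\":1048576}\r\nPING\r\n"),
	)
	return readProtoLine(bufio.NewReader(src))
}

func main() {
	line, err := readSplitInfo()
	fmt.Println(line, err)
}
```

The PING that follows stays buffered in the reader, so the normal read loop would have to keep using the same bufio.Reader.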

Anonymous sync.Mutex in Conn structure

The Conn structure embeds an anonymous sync.Mutex, so Conn.Lock() is exposed and shows up in godoc. It should not be exposed if it is meant to be used only by the internal implementation and not by the user application.
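
For illustration, a named unexported field keeps the lock out of the public method set, whereas an embedded sync.Mutex promotes Lock and Unlock. The `conn` type and `addSub` method below are made up for this sketch:

```go
package main

import (
	"fmt"
	"sync"
)

// conn keeps its mutex in an unexported named field, so Lock and Unlock are
// not promoted into the type's method set the way an embedded sync.Mutex is.
type conn struct {
	mu   sync.Mutex
	subs int
}

// addSub is the only way callers change subs; locking stays internal.
func (c *conn) addSub() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.subs++
	return c.subs
}

func main() {
	c := &conn{}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.addSub()
		}()
	}
	wg.Wait()
	fmt.Println(c.addSub()) // 11
}
```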

Subscriber cannot receive more than one message

Note that this issue is related to #22.

The commit (c0bed13) actually solves the goroutine leak problem, but we have another problem now:
we cannot receive more than one message using "BindRecvChan".
I also found that in c0bed13#diff-386dae9aaed25b55f29b68c9c0015aa6R79
you actually do

defer func() {
  m.Sub.Unsubscribe()
  recover()
}()

So as far as I understand, you unsubscribe first and then call recover (this recover is supposed to recover from the panic caused by sending a message to a closed channel).

How about this:

defer func(){
    if r := recover(); r != nil {
      m.Sub.Unsubscribe()
    }
 }()

That way, you unsubscribe only once the channel has actually been closed.
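
The pattern behind this suggestion can be tried in isolation. In Go, a send on a closed channel panics, and a deferred recover can turn that panic into a signal. `safeSend` is a hypothetical helper written for this sketch; in the library, the place where it returns true is where the unsubscribe would go:

```go
package main

import "fmt"

// safeSend tries to send v on ch and reports whether the channel was closed.
// A send on a closed channel panics; the deferred recover turns that panic
// into a return value, which is the moment a caller could unsubscribe.
func safeSend(ch chan<- string, v string) (closed bool) {
	defer func() {
		if r := recover(); r != nil {
			closed = true
		}
	}()
	ch <- v
	return false
}

func main() {
	ch := make(chan string, 2)
	fmt.Println(safeSend(ch, "one")) // false: delivered, keep the subscription
	close(ch)
	fmt.Println(safeSend(ch, "two")) // true: channel closed, time to unsubscribe
}
```

Because the cleanup only runs when recover returns a non-nil value, a normal delivery leaves the subscription in place and further messages can arrive.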

Reconnect logic flawed when servers in the list require authentication

As I was trying to change the C client to connect asynchronously, it occurred to me that there is a problem in the Go client caused by the asynchronous nature of the reconnect process.
If the client is connected to a server, and another one in the list requires authentication that the client does not have (or that does not match), then when the client is disconnected and tries to connect to that server, it will assume that it is connected as long as the TCP connection succeeds and the expected INFO is ok. In reality, once the readLoop is started, it will get a -ERR with Authorization Violation, which will cause the connection to be closed. If we did the (re)connect process synchronously, we would handle this situation better.

TestAuthFailToReconnect.txt

All async subscribers share Go routine

All async subscribers share a single Go routine for callbacks, making it fairly easy to starve the msg delivery if you are inside a callback waiting for another callback. They should probably be bound to a Subscription.
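
A sketch of the suggested binding, with one delivery goroutine per subscription. The `sub` type and its fields are invented for this example. The callback of the first subscription blocks until the callback of the second one has run, which would deadlock if both shared a single goroutine:

```go
package main

import "fmt"

// sub owns its message channel and its delivery goroutine.
type sub struct {
	mch  chan string
	done chan struct{}
}

// newSub starts a goroutine that calls cb for every message sent to mch.
func newSub(cb func(string)) *sub {
	s := &sub{mch: make(chan string, 8), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for m := range s.mch {
			cb(m)
		}
	}()
	return s
}

// close stops delivery and waits for the goroutine to exit.
func (s *sub) close() {
	close(s.mch)
	<-s.done
}

// nestedWait has subscription a wait, inside its callback, for b's callback.
func nestedWait() string {
	replies := make(chan string, 1)
	result := make(chan string, 1)
	b := newSub(func(m string) { replies <- "reply to " + m })
	a := newSub(func(m string) {
		b.mch <- m          // hand the message to the second subscription
		result <- <-replies // block here until b's callback has run
	})
	a.mch <- "help"
	out := <-result
	a.close()
	b.close()
	return out
}

func main() {
	fmt.Println(nestedWait()) // reply to help
}
```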

Reconnect Logic

The client should be able to auto-reconnect similar to the ruby client with options that can suppress the behavior and dictate how long between attempts and how many attempts total before giving up.
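
The three options asked for here (suppress reconnecting, wait between attempts, maximum number of attempts) could look roughly like the sketch below. The option names and the `reconnect` function are illustrative assumptions, and `dial` stands in for whatever actually opens the connection:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// reconnectOpts mirrors the knobs the issue asks for; the names are illustrative.
type reconnectOpts struct {
	AllowReconnect bool
	MaxAttempts    int
	Wait           time.Duration
}

var errNoServers = errors.New("no servers available")

// reconnect calls dial until it succeeds or the attempts run out.
// It returns the number of attempts made.
func reconnect(o reconnectOpts, dial func() error) (int, error) {
	if !o.AllowReconnect {
		return 0, errNoServers
	}
	for attempt := 1; attempt <= o.MaxAttempts; attempt++ {
		if err := dial(); err == nil {
			return attempt, nil
		}
		time.Sleep(o.Wait)
	}
	return o.MaxAttempts, errNoServers
}

// failNTimes returns a dial function that fails n times and then succeeds.
func failNTimes(n int) func() error {
	return func() error {
		if n > 0 {
			n--
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	opts := reconnectOpts{AllowReconnect: true, MaxAttempts: 5, Wait: 10 * time.Millisecond}
	attempts, err := reconnect(opts, failNTimes(2))
	fmt.Println(attempts, err) // 3 <nil>
}
```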

make nats.Conn as an interface

[ actually this is not an issue, more of a feature request ]

The current design is very clean, and I can easily trace it.
However, I have a requirement to use my own nats.Conn inside the EncodedConn.
The current implementation prevents me from doing this.
Specifically, nats.Conn is implemented as a concrete structure.

// A Conn represents a bare connection to a nats-server. It will send and receive
// []byte payloads.
type Conn struct {
...
}

An EncodedConn is based on that Conn and adds encoding support such as JSON.

// EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
// a nats server and have an extendable encoder system that will encode and decode messages
// from raw Go types.
type EncodedConn struct {
Conn *Conn
Enc Encoder
}

Since EncodedConn uses the nats.Conn type directly, I cannot wrap it this way.

Therefore, my idea is to make this change and send you a PR, if you have such a requirement as well.
Any suggestions?
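
To make the request concrete, here is a minimal sketch of what depending on an interface could look like. The `publisher` interface, `encodedConn` and `recorder` are all hypothetical and far smaller than the real types; JSON encoding is done with encoding/json directly:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// publisher is the small slice of connection behavior this sketch needs.
type publisher interface {
	Publish(subject string, data []byte) error
}

// encodedConn depends on the interface instead of a concrete connection type.
type encodedConn struct {
	conn publisher
}

// Publish encodes v as JSON and hands the bytes to the underlying publisher.
func (c *encodedConn) Publish(subject string, v interface{}) error {
	data, err := json.Marshal(v)
	if err != nil {
		return err
	}
	return c.conn.Publish(subject, data)
}

// recorder is a custom publisher that just remembers the last payload.
type recorder struct{ last string }

func (r *recorder) Publish(subject string, data []byte) error {
	r.last = subject + " " + string(data)
	return nil
}

func main() {
	r := &recorder{}
	ec := &encodedConn{conn: r}
	if err := ec.Publish("help", map[string]int{"msg": 1}); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(r.last) // help {"msg":1}
}
```

Any type with a matching Publish method can then be plugged in, including a wrapper around the real connection.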

Client is stuck in the readLoop

Looks like we might need to set a read deadline in the client. All we know is that this client is dead to NATS, but it's not budging off the read.

goroutine 55 [IO wait, 317 minutes]:
net.runtime_pollWait(0x7f4827bc75f8, 0x72, 0x0)
/usr/local/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc208085100, 0x72, 0x0, 0x0)
/usr/local/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
net.(*pollDesc).WaitRead(0xc208085100, 0x0, 0x0)
/usr/local/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
net.(*netFD).Read(0xc2080850a0, 0xc208122000, 0x8000, 0x8000, 0x0, 0x7f4827bc6418, 0xb)
/usr/local/go/src/pkg/net/fd_unix.go:242 +0x34c
net.(*conn).Read(0xc20802c120, 0xc208122000, 0x8000, 0x8000, 0x0, 0x0, 0x0)
/usr/local/go/src/pkg/net/net.go:122 +0xe7
github.com/apcera/nats.(*Conn).readLoop(0xc20802ec00)
/var/vcap/packages/stager/src/github.com/apcera/nats/nats.go:865 +0x24c
created by github.com/apcera/nats.(*Conn).spinUpSocketWatchers
/var/vcap/packages/stager/src/github.com/apcera/nats/nats.go:428 +0x9f
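
The read deadline mentioned at the top of this issue is available on any net.Conn through SetReadDeadline. The sketch below shows a read that gives up instead of blocking forever, using an in-memory net.Pipe whose peer never writes. `readWithDeadline`, `isTimeout` and `silentPeerTimesOut` are helpers invented for this example, and the 50 ms value is arbitrary:

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// readWithDeadline reads once from c and gives up after d.
func readWithDeadline(c net.Conn, buf []byte, d time.Duration) (int, error) {
	if err := c.SetReadDeadline(time.Now().Add(d)); err != nil {
		return 0, err
	}
	return c.Read(buf)
}

// isTimeout reports whether err is a network timeout.
func isTimeout(err error) bool {
	ne, ok := err.(net.Error)
	return ok && ne.Timeout()
}

// silentPeerTimesOut reads from an in-memory pipe whose peer never writes.
func silentPeerTimesOut() bool {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()
	_, err := readWithDeadline(client, make([]byte, 64), 50*time.Millisecond)
	return isTimeout(err)
}

func main() {
	fmt.Println(silentPeerTimesOut()) // true
}
```

In a read loop, a timeout like this could be the cue to send a ping or to treat the connection as stale, depending on how long the peer has been silent.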

No token based auth support

The gnatsd server supports token based auth via a token_auth json field, however the client does not support this. PR incoming...

Need to seed random to create unique INBOXes

By default, Go seeds its random number generator as if by Seed(1). We need to seed it with a unique value before using rand() to create INBOXes.

Additionally, it may be a good idea to embed IP and PID into INBOX to ensure uniqueness and not rely on randoms only.
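
An alternative that sidesteps seeding altogether is to take the random part from crypto/rand. This is a sketch of that substitute technique, not what the issue proposes verbatim; `newInbox` is a hypothetical helper and the "_INBOX." prefix is an assumption of the example:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newInbox returns an inbox subject built from 16 random bytes taken from
// crypto/rand, which needs no seeding. The "_INBOX." prefix is assumed here.
func newInbox() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	return "_INBOX." + hex.EncodeToString(b[:]), nil
}

func main() {
	a, err := newInbox()
	if err != nil {
		fmt.Println("random source failed:", err)
		return
	}
	fmt.Println(len(a)) // 39: 7 prefix characters plus 32 hex digits
}
```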

check if key exists

Can I check whether a piece of data exists in the system?

example: "func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (m *Msg, err error)"
Request("subject", string("data"),...) check if "data" exists.

Proper Locking

Current locking scheme protects writes to the socket and buffer but does not protect subscription and other state in case they are changed in multiple Go routines (subscribe, flush, etc).

Readme.md code has a bug.

While I know the Readme sample code is just a sample, it is incorrect. The line that reads
err := nc.Request("help", "help me", &response, 10*time.Millisecond)

should be
err := c.Request("help", "help me", &response, 10*time.Millisecond)

Also (a very minor nit), err is defined but not used. ;)

Close on Stale Connection

Currently all errors are treated as hard closes for the client. I can see where it makes sense for most of the failures, e.g., Authn/Authz. However, for something like Stale Connection it seems a bit extreme. I was wondering if it would be ok to reconnect on a Stale Connection.

Payload size

The gnatsd enforces a maximum payload size. In looking at the gnatsd debugging output, I can see that a message I am publishing exceeds this limit. But the call to PublishMsg I'm making does not return an error.

Am I correct that nats doesn't detect when the server rejects a message (at least based on size)? If so, is there some reason that the client doesn't/can't check for this?

Callbacks are triggered when satisfied but not guaranteed to still be true

Both the reconnected and disconnected callbacks are invoked outside the Connection lock. By doing so, the state of the connection may be different when actually invoked. In most cases, the connection has been closed which leads to no harm. However, in the case of the reconnect, another reconnect can occur since the socketwatchers have been spun up already.

"go test" for nats: 5 test failures against gnatsd

5e5b726 : on OSX 10.10.2/amd64. go1.5.1.

Five nats tests fail out of the box against gnatsd. Details below.

NB, I didn't want to mess with ruby and the ruby nats server, so I changed conn_test to use gnatsd. Shouldn't this be the default since the ruby version is deprecated?

failed tests:

--- FAIL: TestServerSecureConnections (10.04s)
    conn_test.go:47: Timed out trying to connect to gnatsd
...
--- FAIL: TestServerAutoUnsub (0.00s)
    sub_test.go:29: Received 0 msgs, wanted only 10
...
--- FAIL: TestClientASyncAutoUnsub (0.00s)
    sub_test.go:75: Received 0 msgs, wanted only 10
...
--- FAIL: TestAsyncErrHandler (2.54s)
    sub_test.go:197: Failed to call async err handler
...
--- FAIL: TestAsyncSubscribersOnClose (0.01s)
    sub_test.go:259: Expected only one callback, received 0 callbacks
...

my modification to use gnatsd:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ git diff
WARNING: terminal is not fully functional
-  (press RETURN) 
diff --git a/conn_test.go b/conn_test.go
index 883ff76..619c461 100644
--- a/conn_test.go
+++ b/conn_test.go
@@ -10,7 +10,7 @@ import (
        "time"
 )

-const natsServer = "nats-server"
+const natsServer = "gnatsd"

 type server struct {
        args []string
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $

the full test run:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ go test -v
=== RUN   TestAuthServerStart
--- PASS: TestAuthServerStart (0.06s)
=== RUN   TestAuthConnectionFail
--- PASS: TestAuthConnectionFail (0.00s)
=== RUN   TestAuthConnectionSuccess
--- PASS: TestAuthConnectionSuccess (0.00s)
=== RUN   TestAuthServerStop
--- PASS: TestAuthServerStop (0.00s)
=== RUN   TestServersRandomize
--- PASS: TestServersRandomize (0.00s)
=== RUN   TestServersOption
--- PASS: TestServersOption (0.13s)
=== RUN   TestAuthServers
--- PASS: TestAuthServers (0.12s)
=== RUN   TestSelectNextServer
--- PASS: TestSelectNextServer (0.00s)
=== RUN   TestBasicClusterReconnect
--- PASS: TestBasicClusterReconnect (0.12s)
=== RUN   TestHotSpotReconnect
--- PASS: TestHotSpotReconnect (5.35s)
=== RUN   TestProperReconnectDelay
--- PASS: TestProperReconnectDelay (1.07s)
=== RUN   TestProperFalloutAfterMaxAttempts
--- PASS: TestProperFalloutAfterMaxAttempts (0.82s)
=== RUN   TestTimeoutOnNoServers
--- PASS: TestTimeoutOnNoServers (0.17s)
=== RUN   TestPingReconnect
--- PASS: TestPingReconnect (1.08s)
=== RUN   TestDefaultConnection
--- PASS: TestDefaultConnection (0.06s)
=== RUN   TestConnectionStatus
--- PASS: TestConnectionStatus (0.00s)
=== RUN   TestConnClosedCB
--- PASS: TestConnClosedCB (0.00s)
=== RUN   TestCloseDisconnectedCB
--- PASS: TestCloseDisconnectedCB (0.00s)
=== RUN   TestServerStopDisconnectedCB
--- PASS: TestServerStopDisconnectedCB (0.00s)
=== RUN   TestRestartServer
--- PASS: TestRestartServer (0.06s)
=== RUN   TestServerSecureConnections
--- FAIL: TestServerSecureConnections (10.04s)
    conn_test.go:47: Timed out trying to connect to gnatsd
=== RUN   TestClosedConnections
--- PASS: TestClosedConnections (0.00s)
=== RUN   TestErrOnConnectAndDeadlock
--- PASS: TestErrOnConnectAndDeadlock (0.00s)
=== RUN   TestErrOnMaxPayloadLimit
--- PASS: TestErrOnMaxPayloadLimit (0.00s)
=== RUN   TestConstructorErrs
--- PASS: TestConstructorErrs (0.00s)
=== RUN   TestMarshalString
--- PASS: TestMarshalString (0.00s)
=== RUN   TestMarshalBytes
--- PASS: TestMarshalBytes (0.00s)
=== RUN   TestMarshalInt
--- PASS: TestMarshalInt (0.00s)
=== RUN   TestMarshalInt32
--- PASS: TestMarshalInt32 (0.00s)
=== RUN   TestMarshalInt64
--- PASS: TestMarshalInt64 (0.00s)
=== RUN   TestMarshalFloat32
--- PASS: TestMarshalFloat32 (0.00s)
=== RUN   TestMarshalFloat64
--- PASS: TestMarshalFloat64 (0.00s)
=== RUN   TestMarshalBool
--- PASS: TestMarshalBool (0.00s)
=== RUN   TestExtendedSubscribeCB
--- PASS: TestExtendedSubscribeCB (0.00s)
=== RUN   TestExtendedSubscribeCB2
--- PASS: TestExtendedSubscribeCB2 (0.00s)
=== RUN   TestRawMsgSubscribeCB
--- PASS: TestRawMsgSubscribeCB (0.00s)
=== RUN   TestEncRequest
--- PASS: TestEncRequest (0.00s)
=== RUN   TestEncRequestReceivesMsg
--- PASS: TestEncRequestReceivesMsg (0.00s)
=== RUN   TestAsyncMarshalErr
--- PASS: TestAsyncMarshalErr (0.00s)
=== RUN   TestEncodeNil
--- PASS: TestEncodeNil (0.00s)
=== RUN   TestDecodeDefault
--- PASS: TestDecodeDefault (0.00s)
=== RUN   TestGobMarshalString
--- PASS: TestGobMarshalString (0.00s)
=== RUN   TestGobMarshalInt
--- PASS: TestGobMarshalInt (0.00s)
=== RUN   TestGobMarshalStruct
--- PASS: TestGobMarshalStruct (0.00s)
=== RUN   TestJsonMarshalString
--- PASS: TestJsonMarshalString (0.00s)
=== RUN   TestJsonMarshalInt
--- PASS: TestJsonMarshalInt (0.00s)
=== RUN   TestJsonMarshalStruct
--- PASS: TestJsonMarshalStruct (0.00s)
=== RUN   TestNotMarshableToJson
--- PASS: TestNotMarshableToJson (0.00s)
=== RUN   TestFailedEncodedPublish
--- PASS: TestFailedEncodedPublish (0.00s)
=== RUN   TestDecodeConditionals
--- PASS: TestDecodeConditionals (0.00s)
=== RUN   TestCloseLeakingGoRoutines
--- PASS: TestCloseLeakingGoRoutines (0.02s)
=== RUN   TestConnectedServer
--- PASS: TestConnectedServer (0.00s)
=== RUN   TestMultipleClose
--- PASS: TestMultipleClose (0.00s)
=== RUN   TestBadOptionTimeoutConnect
--- PASS: TestBadOptionTimeoutConnect (0.00s)
=== RUN   TestSimplePublish
--- PASS: TestSimplePublish (0.00s)
=== RUN   TestSimplePublishNoData
--- PASS: TestSimplePublishNoData (0.00s)
=== RUN   TestAsyncSubscribe
--- PASS: TestAsyncSubscribe (0.00s)
=== RUN   TestSyncSubscribe
--- PASS: TestSyncSubscribe (0.00s)
=== RUN   TestPubSubWithReply
--- PASS: TestPubSubWithReply (0.00s)
=== RUN   TestFlush
--- PASS: TestFlush (0.00s)
=== RUN   TestQueueSubscriber
--- PASS: TestQueueSubscriber (0.00s)
=== RUN   TestReplyArg
--- PASS: TestReplyArg (0.00s)
=== RUN   TestSyncReplyArg
--- PASS: TestSyncReplyArg (0.00s)
=== RUN   TestUnsubscribe
--- PASS: TestUnsubscribe (0.00s)
=== RUN   TestDoubleUnsubscribe
--- PASS: TestDoubleUnsubscribe (0.00s)
=== RUN   TestRequestTimeout
--- PASS: TestRequestTimeout (0.01s)
=== RUN   TestRequest
--- PASS: TestRequest (0.00s)
=== RUN   TestRequestNoBody
--- PASS: TestRequestNoBody (0.00s)
=== RUN   TestFlushInCB
--- PASS: TestFlushInCB (0.00s)
=== RUN   TestReleaseFlush
--- PASS: TestReleaseFlush (0.00s)
=== RUN   TestInbox
--- PASS: TestInbox (0.00s)
=== RUN   TestStats
--- PASS: TestStats (0.00s)
=== RUN   TestRaceSafeStats
--- PASS: TestRaceSafeStats (0.20s)
=== RUN   TestBadChan
--- PASS: TestBadChan (0.02s)
=== RUN   TestSimpleSendChan
--- PASS: TestSimpleSendChan (0.00s)
=== RUN   TestFailedChannelSend
--- PASS: TestFailedChannelSend (0.00s)
=== RUN   TestSimpleRecvChan
--- PASS: TestSimpleRecvChan (0.00s)
=== RUN   TestQueueRecvChan
--- PASS: TestQueueRecvChan (0.00s)
=== RUN   TestDecoderErrRecvChan
--- PASS: TestDecoderErrRecvChan (0.00s)
=== RUN   TestRecvChanPanicOnClosedChan
--- PASS: TestRecvChanPanicOnClosedChan (0.00s)
=== RUN   TestRecvChanAsyncLeakGoRoutines
--- PASS: TestRecvChanAsyncLeakGoRoutines (0.05s)
=== RUN   TestRecvChanLeakGoRoutines
--- PASS: TestRecvChanLeakGoRoutines (0.10s)
=== RUN   TestRecvChanMultipleMessages
--- PASS: TestRecvChanMultipleMessages (0.01s)
=== RUN   TestReconnectTotalTime
--- PASS: TestReconnectTotalTime (0.00s)
=== RUN   TestReconnectDisallowedFlags
--- PASS: TestReconnectDisallowedFlags (0.06s)
=== RUN   TestReconnectAllowedFlags
--- PASS: TestReconnectAllowedFlags (0.56s)
=== RUN   TestBasicReconnectFunctionality
--- PASS: TestBasicReconnectFunctionality (0.68s)
=== RUN   TestExtendedReconnectFunctionality
--- PASS: TestExtendedReconnectFunctionality (0.17s)
=== RUN   TestParseStateReconnectFunctionality
--- PASS: TestParseStateReconnectFunctionality (0.68s)
=== RUN   TestQueueSubsOnReconnect
--- PASS: TestQueueSubsOnReconnect (0.23s)
=== RUN   TestIsClosed
--- PASS: TestIsClosed (0.12s)
=== RUN   TestIsReconnectingAndStatus
--- PASS: TestIsReconnectingAndStatus (0.16s)
=== RUN   TestServerAutoUnsub
--- FAIL: TestServerAutoUnsub (0.00s)
    sub_test.go:29: Received 0 msgs, wanted only 10
=== RUN   TestClientSyncAutoUnsub
--- PASS: TestClientSyncAutoUnsub (0.00s)
=== RUN   TestClientASyncAutoUnsub
--- FAIL: TestClientASyncAutoUnsub (0.00s)
    sub_test.go:75: Received 0 msgs, wanted only 10
=== RUN   TestCloseSubRelease
--- PASS: TestCloseSubRelease (0.01s)
=== RUN   TestIsValidSubscriber
--- PASS: TestIsValidSubscriber (0.00s)
=== RUN   TestSlowSubscriber
--- PASS: TestSlowSubscriber (0.06s)
=== RUN   TestSlowAsyncSubscriber
--- PASS: TestSlowAsyncSubscriber (0.05s)
=== RUN   TestAsyncErrHandler
--- FAIL: TestAsyncErrHandler (2.54s)
    sub_test.go:197: Failed to call async err handler
=== RUN   TestAsyncSubscriberStarvation
--- PASS: TestAsyncSubscriberStarvation (0.00s)
=== RUN   TestAsyncSubscribersOnClose
--- FAIL: TestAsyncSubscribersOnClose (0.01s)
    sub_test.go:259: Expected only one callback, received 0 callbacks
=== RUN   TestNextMsgCallOnAsyncSub
--- PASS: TestNextMsgCallOnAsyncSub (0.00s)
=== RUN   TestStopServer
--- PASS: TestStopServer (0.00s)
FAIL
exit status 1
FAIL    github.com/nats-io/nats 25.020s
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ 

versions:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ uname -a
Darwin jasonsmac.local 14.1.1 Darwin Kernel Version 14.1.1: Thu Feb 26 22:41:49 PST 2015; root:xnu-2782.15.5~1/RELEASE_X86_64 x86_64
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ go version
go version go1.5.1 darwin/amd64
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ git log|head
commit 5e5b726926cb1640630fb80c3420187be0b45da8
Merge: f815164 5ad2583
Author: Derek Collison <[email protected]>
Date:   Sat Sep 5 10:27:09 2015 -0500

    Merge pull request #65 from wallyqs/delivered-compare-data-race
    ...

show that gnatsd is working:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ which gnatsd
/Users/jason/bin/gnatsd
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ gnatsd
[47224] 2015/09/12 13:00:45.594616 [INF] Starting gnatsd version 0.6.6
[47224] 2015/09/12 13:00:45.594703 [INF] Listening for client connections on 0.0.0.0:4222
[47224] 2015/09/12 13:00:45.594804 [INF] gnatsd is ready [ctrl-c]
  C-c C-c[47224] 2015/09/12 13:00:46.348888 [INF] Server Exiting..

show gnatsd version:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ cd ../gnatsd/
jason@jasonsmac ~/src/github.com/nats-io/gnatsd (master) $ git log
WARNING: terminal is not fully functional
-  (press RETURN)
commit 7620f4acc7eb832eac01fb7f0857a0ebaf0bcafb
Merge: 139ce42 093548c
Author: Derek Collison <[email protected]>
Date:   Thu Sep 10 11:18:46 2015 -0400

    Merge pull request #115 from wallyqs/update-max-payload-test
...

Subscribe("foo", nil) inconsistency between Conn and EncodedConn

EncodedConn.Subscribe("foo", nil) throws a nil pointer exception instead of deleting the subscription as documented in the README.

Conn.Subscribe("foo", nil) returns normally but does not delete the subscription.

Since there is a Subscription.Unsubscribe(), this 'nil' behavior should be avoided, and Subscribe should either return an error in the error field or throw an exception. Looking into the Subscribe code, some of it checks for a nil callback and some of it throws an exception when reflecting.

Can't customize TLS connection configuration

In makeTLSConn() it hard codes the following for the TLS Client configuration:

&tls.Config{InsecureSkipVerify: true}

If there were an instance of tls.Config in the Opts structure, makeTLSConn() could use it (with the default as above). The user could then override the TLS configuration to allow for custom certificates, specific TLS versions, cipher suites, etc.
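
A sketch of that fallback logic, using a hypothetical `options` struct with a `TLSConfig` field; only crypto/tls itself is real here. When the field is nil the hard-coded default from this issue is used, otherwise a copy of the user's config:

```go
package main

import (
	"crypto/tls"
	"fmt"
)

// options is a hypothetical stand-in for the client's option struct.
type options struct {
	TLSConfig *tls.Config // nil means "use the default"
}

// tlsConfigFor returns a copy of the user's config when one is set, and
// otherwise the hard-coded default quoted in the issue.
func tlsConfigFor(o options) *tls.Config {
	if o.TLSConfig != nil {
		return o.TLSConfig.Clone()
	}
	return &tls.Config{InsecureSkipVerify: true}
}

// customMinVersion shows an override: the caller asks for TLS 1.2 or newer.
func customMinVersion() uint16 {
	o := options{TLSConfig: &tls.Config{MinVersion: tls.VersionTLS12}}
	return tlsConfigFor(o).MinVersion
}

func main() {
	fmt.Println(tlsConfigFor(options{}).InsecureSkipVerify) // true (default)
	fmt.Println(customMinVersion() == tls.VersionTLS12)     // true (override)
}
```

Cloning the config keeps later changes by the caller from affecting a connection that is already being set up.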

Reconnect has potential race conditions

When a reconnect occurs, relying on state works for the one goroutine managing the state, but there are other active goroutines that may not see the state changes and will continue to operate under the wrong conditions.

doReconnect() calls createConn(), which establishes a new connection (nc.conn) and buffered writer (nc.bw). readLoop could be at line 868 waiting for the lock and then grab the new connection rather than always using the old one. It would make more sense to move line 873 out of the for loop and never obtain the conn again.
The flusher has a similar issue.

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x663cf]

goroutine 118 [running]:
runtime.panic(0x3adb80, 0x5fb184)
/usr/local/Cellar/go/1.3.3/libexec/src/pkg/runtime/panic.c:279 +0xf5
github.com/apcera/nats.(*Conn).parse(0xc208072400, 0xc2080a4000, 0x6, 0x8000, 0x0, 0x0)
/Users/fraenkel/workspace/diego-release/src/github.com/apcera/nats/parser.go:70 +0x12f
github.com/apcera/nats.(*Conn).readLoop(0xc208072400)
/Users/fraenkel/workspace/diego-release/src/github.com/apcera/nats/nats.go:905 +0x454
created by github.com/apcera/nats.(*Conn).spinUpSocketWatchers
/Users/fraenkel/workspace/diego-release/src/github.com/apcera/nats/nats.go:456 +0x157

TestRecvChanLeakGoRoutines is racy when checking goroutines

The test fails intermittently: one goroutine goes away that the test did not expect, so the leak count comes out as -1.

=== RUN TestRecvChanLeakGoRoutines
--- FAIL: TestRecvChanLeakGoRoutines (0.11 seconds)
    netchan_test.go:226: Leaked Go routine(s) : -1, closing channel should have closed them

AutoUnsubscribe() leaves deliverMsgs() go routine running

Request() uses AutoUnsubscribe(1), and after the reply is received (or the request has timed out) it closes the subscription with s.Unsubscribe(). This closes the channel, and the goroutine exits.

However, if a user calls AutoUnsubscribe() to receive at most, say, 10 messages and does not track how many have arrived, the goroutine stays alive even though it has nothing left to do.

I wonder whether deliverMsgs() should exit once delivered > max.
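
A delivery loop that stops by itself once the limit is reached could look like the sketch below. The deliver function and its parameters are hypothetical, not the library's deliverMsgs(); the sketch also assumes the loop should stop as soon as the count reaches max, and that a max of 0 or less means no limit.

```go
package main

import "fmt"

// deliver is a hypothetical delivery loop. It invokes cb for each message
// and returns once max messages were delivered, without waiting for the
// channel to be closed. A max <= 0 means "no limit" in this sketch.
func deliver(ch <-chan string, max int, cb func(string)) int {
	delivered := 0
	for m := range ch {
		delivered++
		cb(m)
		if max > 0 && delivered >= max {
			break // limit reached: let the goroutine running this loop exit
		}
	}
	return delivered
}

func main() {
	ch := make(chan string, 5)
	for i := 0; i < 5; i++ {
		ch <- fmt.Sprintf("msg-%d", i)
	}

	// The channel is never closed, yet deliver returns after 3 messages.
	n := deliver(ch, 3, func(m string) { fmt.Println("got", m) })
	fmt.Println("delivered:", n, "left in channel:", len(ch))
}
```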

An empty-string subject causes the NATS connection to be closed

Hello,
I found that if I pass an empty string as the subject parameter of the Publish, Request, or Subscribe functions, the function returns no error.
However, such operations cause an ERR in gnatsd, and gnatsd closes the connection. After that, any call on the connection returns the error "nats: Connection Closed", and the client makes no reconnection attempt.
The following test code reproduces this condition.

// testNats project main.go
package main

import (
    "fmt"
    "time"

    "github.com/nats-io/nats"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        panic(err)
    }
    defer nc.Close()
    //subscribe 'foo'
    handle := func(m *nats.Msg) {
        fmt.Println(string(m.Data))
    }
    if _, err := nc.Subscribe("foo", handle); err != nil {
        fmt.Println(err)
    }

    // These operations are invalid: the subject is empty.
    if _, err := nc.Subscribe("", handle); err != nil {
        fmt.Println(err)
    }
    if err := nc.Publish("", nil); err != nil {
        fmt.Println(err)
    }

    // Wait for the client to send the invalid operations to gnatsd.
    time.Sleep(time.Second)

    //publish something
    if err := nc.Publish("foo", []byte("bar")); err != nil {
        fmt.Println(err)
    }
    time.Sleep(time.Second)
}

Result:

C:/myGO/src/testNats/testNats.exe  [C:/myGO/src/testNats]
nats: Connection Closed
Success: process exited with code 0.

I think the private functions "subscribe" and "publish" should return an error when given an empty subject, but I found no check on the subject parameter in these functions.
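
Such a check could be as small as the sketch below. The names errBadSubject and validateSubject are hypothetical and chosen for illustration; the sketch only covers the empty-string case described in this report.

```go
package main

import (
	"errors"
	"fmt"
)

// errBadSubject is a hypothetical error value for this sketch.
var errBadSubject = errors.New("invalid subject: empty string")

// validateSubject rejects the empty subject before anything is sent to
// the server, so the connection is never put at risk.
func validateSubject(subj string) error {
	if subj == "" {
		return errBadSubject
	}
	return nil
}

func main() {
	fmt.Println(validateSubject("foo"))
	fmt.Println(validateSubject(""))
}
```

Calling a check like this at the top of the publish and subscribe paths would turn the server-side disconnect into an ordinary client-side error.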

This dangerous condition can easily arise in the following way:

// Requests
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// Replies
nc.Subscribe("help", func(m *nats.Msg) {
    nc.Publish(m.Reply, []byte("I can help!"))
})

// Invalid request: a plain Publish carries no reply subject.
//nc.Publish("help", []byte("help me"))

If another program uses Publish instead of Request to send something to the subject "help", then m.Reply == "" and the handler publishes its reply to an empty subject.
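
On the application side, a handler can guard against this by replying only when a reply subject is present. A minimal sketch, assuming a simplified message type: msg and replyIfRequested are hypothetical, and the publish call is passed in as a function so the example runs without a server.

```go
package main

import "fmt"

// msg is a hypothetical stand-in carrying only the fields used here.
type msg struct {
	Reply string
	Data  []byte
}

// replyIfRequested publishes a response only when the incoming message
// carries a reply subject. It reports whether a reply was attempted.
func replyIfRequested(m *msg, publish func(subject string, data []byte) error) (bool, error) {
	if m.Reply == "" {
		// Sent with Publish rather than Request: nowhere to reply to.
		return false, nil
	}
	return true, publish(m.Reply, []byte("I can help!"))
}

func main() {
	publish := func(subject string, data []byte) error {
		fmt.Printf("publish to %q: %s\n", subject, data)
		return nil
	}

	sent, err := replyIfRequested(&msg{Reply: "_INBOX.example"}, publish)
	fmt.Println(sent, err)

	sent, err = replyIfRequested(&msg{}, publish)
	fmt.Println(sent, err)
}
```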

Sorry for my poor English. ^_^

Will not reconnect on "connection reset by peer" errors

We got an error message like this:

write tcp x.y.z.w:4222: connection reset by peer
nats: Connection Closed

and reconnectCallBack is never called.

The related traceback is:

github.com/apcera/nats.(*Conn).close(0xc208084000, 0x2, 0xc20802b001)
/gopath/src/github.com/apcera/nats/nats.go:1620 +0x34e
github.com/apcera/nats.(*Conn).Close(0xc208084000)
/gopath/src/github.com/apcera/nats/nats.go:1630 +0x36
github.com/apcera/nats.(*Conn).processErr(0xc208084000, 0xc208087d20, 0x12)
/gopath/src/github.com/apcera/nats/nats.go:1129 +0x1dd
github.com/apcera/nats.(*Conn).parse(0xc208084000, 0xc2080ca000, 0x19, 0x8000, 0x0, 0x0)
/gopath/src/github.com/apcera/nats/parser.go:222 +0xb9a
github.com/apcera/nats.(*Conn).readLoop(0xc208084000)
/gopath/src/github.com/apcera/nats/nats.go:942 +0x371
created by github.com/apcera/nats.(*Conn).spinUpSocketWatchers
/gopath/src/github.com/apcera/nats/nats.go:461 +0x6a

Could processErr() also check for a "connection reset by peer" error, so that the client reconnects instead of closing?
