streamdal / plumber

A swiss army knife CLI tool for interacting with Kafka, RabbitMQ and other messaging systems.

Home Page: https://streamdal.com

License: MIT License

Makefile 0.75% Go 98.92% Shell 0.17% Dockerfile 0.09% Thrift 0.07%
event-driven kafka rabbitmq message-queue message-bus event-bus protobuf golang hacktoberfest

plumber's Introduction

plumber is a CLI devtool for inspecting, piping, messaging and redirecting data in message systems like Kafka, RabbitMQ, GCP PubSub and many more. [1]

The tool enables you to:

  • Safely view the contents of your data streams
  • Write plain or encoded data to any system
  • Route data from one place to another
  • Decode protobuf/avro/thrift/JSON data in real-time
    • Support for both Deep and Shallow protobuf envelope types
    • Support for google.protobuf.Any fields
  • Relay data to the Streamdal platform
  • Ship change data capture events to the Streamdal platform
  • Replay events into a message system on your local network
  • And many other features (for a full list: plumber -h)

[1] It's like curl for messaging systems.

Why do you need it?

Messaging systems are black boxes - gaining visibility into what is passing through them is an involved process that requires you to write brittle consumer code that you will eventually throw away.

plumber enables you to stop wasting time writing throw-away code - use it to look into your queues and data streams, use it to connect disparate systems together or use it for debugging your event driven systems.

Demo

[Brief demo GIF]

Install

Via brew

$ brew tap streamdal/public
$ brew install plumber

Manually

Plumber is a single binary. To install it, download the release, give it executable permissions, and call it from your shell. Here's an example set of commands to do this:

$ curl -L -o plumber https://github.com/streamdal/plumber/releases/latest/download/plumber-darwin
$ chmod +x plumber
$ mv plumber /usr/local/bin/plumber

Usage

Write messages

❯ plumber write kafka --topics test --input foo
INFO[0000] Successfully wrote message to topic 'test'    backend=kafka
INFO[0000] Successfully wrote '1' message(s)             pkg=plumber

Read message(s)

❯ plumber read kafka --topics test
INFO[0000] Initializing (could take a minute or two) ...  backend=kafka

------------- [Count: 1 Received at: 2021-11-30T12:51:32-08:00] -------------------

+----------------------+------------------------------------------+
| Key                  |                                     NONE |
| topic                |                                     test |
| Offset               |                                        8 |
| Partition            |                                        0 |
| Header(s)            |                                     NONE |
+----------------------+------------------------------------------+

foo

NOTE: Add -f to perform a continuous read (like tail -f)

Write messages via pipe

Write multiple messages

NOTE: Multiple messages are separated by a newline.

$ cat mydata.txt
line1
line2
line3

$ cat mydata.txt | plumber write kafka --topics foo

INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go
INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go
INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go

Write each element of a JSON array as a message

$ cat mydata.json
[{"key": "value1"},{"key": "value2"}]

$ cat mydata.json | plumber write kafka --topics foo --input-as-json-array

INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go
INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go
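
Conceptually, the array is split into its elements and each element is written as its own message. A rough Go illustration of that splitting (not plumber's actual code):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	input := []byte(`[{"key": "value1"},{"key": "value2"}]`)

	// Keep each element as raw JSON so it can be written verbatim as one message.
	var elements []json.RawMessage
	if err := json.Unmarshal(input, &elements); err != nil {
		panic(err)
	}

	for i, el := range elements {
		fmt.Printf("message %d: %s\n", i+1, el)
	}
}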

Documentation

Getting Help

A full list of available flags can be displayed by using the --help flag after different parts of the command:

$ plumber --help
$ plumber read --help
$ plumber read kafka --help

Features

  • Encode & decode for multiple formats
    • Protobuf (Deep and Shallow envelope)
    • Avro
    • Thrift
    • Flatbuffer
    • GZip
    • JSON
    • JSONPB (protobuf serialized as JSON)
    • Base64
  • --continuous support (ie. tail -f)
  • Support for most messaging systems
  • Supports writing via string, file or pipe
  • Observe, relay and archive messaging data
  • Single-binary, zero-config, easy-install

Hmm, what is this Streamdal thing?

We are distributed system enthusiasts that started a company called Streamdal.

Our company focuses on solving data stream observability for complex systems and workflows. Our goal is to allow everyone to build asynchronous systems, without the fear of introducing too much complexity.

While working on our company, we built a tool for reading and writing messages from our messaging systems and realized that there is a serious lack of tooling in this space.

We wanted a swiss army knife type of tool for working with messaging systems (we use Kafka and RabbitMQ internally), so we created plumber.

Why the name plumber?

We consider ourselves "internet plumbers" of sorts - so the name seemed to fit :)

Supported Messaging Systems

  • Kafka
  • RabbitMQ
  • RabbitMQ Streams
  • Google Cloud Platform PubSub
  • MQTT
  • Amazon Kinesis Streams
  • Amazon SQS
  • Amazon SNS (Publishing)
  • ActiveMQ (STOMP protocol)
  • Azure Service Bus
  • Azure Event Hub
  • NATS
  • NATS Streaming (Jetstream)
  • Redis-PubSub
  • Redis-Streams
  • Postgres CDC (Change Data Capture)
  • MongoDB CDC (Change Data Capture)
  • Apache Pulsar
  • NSQ
  • KubeMQ
  • Memphis - NEW!

NOTE: If your messaging tech is not supported - submit an issue and we'll do our best to make it happen!

Kafka

You need to ensure that you are using the same consumer group on all plumber instances.

RabbitMQ

Make sure that all instances of plumber are pointed to the same queue.

Note on boolean flags

In order to flip a boolean flag to false, prepend --no to the flag.

For example, --queue-declare is true by default. To make it false, use --no-queue-declare.

Tunnels

plumber can now act as a replay destination (tunnel). Tunnel mode allows you to run an instance of plumber on your local network that will then be available in the Streamdal platform as a replay destination.

This removes the need to make firewall changes in order to replay messages from a Streamdal collection back to your message bus.

See https://docs.streamdal.com/what-are/what-are-destinations/plumber-as-a-destination for full documentation.

High Performance & High Availability

plumber comes with a "server" mode that enables plumber to operate as a highly available cluster.

You can read more about "server mode" here.

Server mode examples can be found in docs/server.md

Acknowledgments

Huge shoutout to jhump for his excellent protoreflect library, without which plumber would not have been anywhere near as easy to implement. Thank you!
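
For context, here is a minimal sketch of the kind of dynamic decoding protoreflect makes possible - parsing .proto files at runtime and decoding arbitrary payloads without any generated code. The file names, import path and message name below are hypothetical placeholders, not plumber's actual implementation:

package main

import (
	"fmt"
	"os"

	"github.com/jhump/protoreflect/desc/protoparse"
	"github.com/jhump/protoreflect/dynamic"
)

func main() {
	// Parse .proto files at runtime -- no generated code required.
	parser := protoparse.Parser{ImportPaths: []string{"./protos"}}
	fds, err := parser.ParseFiles("events/order.proto") // hypothetical file
	if err != nil {
		panic(err)
	}

	// Look up the envelope message by its fully qualified name.
	md := fds[0].FindMessage("events.Order") // hypothetical message
	if md == nil {
		panic("message descriptor not found")
	}

	// Decode raw bytes (e.g. a Kafka record value) into a dynamic message.
	raw, _ := os.ReadFile("payload.bin")
	msg := dynamic.NewMessage(md)
	if err := msg.Unmarshal(raw); err != nil {
		panic(err)
	}

	// Re-emit as JSON for human-friendly display.
	out, _ := msg.MarshalJSON()
	fmt.Println(string(out))
}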

Release

To push a new plumber release:

  1. git tag v0.18.0 master
  2. git push origin v0.18.0
  3. Watch the github action
  4. New release should be automatically created under https://github.com/streamdal/plumber/releases/
  5. Update release to include any relevant info
  6. Update homebrew SHA and version references

Contribute

We love contributions! Prior to sending us a PR, open an issue to discuss what you intend to work on. When you're ready to open a PR, add good tests and let's get this thing merged! For further guidance, check out our contributing guide.


plumber's Issues

Support for Azure EventHub

I believe this should be straightforward as it will be very similar to Service Bus.

I am willing to contribute if this feature is wanted.

Support input from STDIN

It would be cool if I could pipe data into plumber. Something like:

$ cat data.json | plumber write kafka --topic stdinpls
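
Conceptually, this amounts to scanning stdin line by line and publishing each line as one message. A rough Go sketch of that loop (not plumber's implementation; the publish step is stubbed out):

package main

import (
	"bufio"
	"fmt"
	"os"
)

func main() {
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		line := scanner.Text()
		// Each non-empty line becomes one message payload.
		if line == "" {
			continue
		}
		fmt.Printf("would publish: %q\n", line)
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}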

Azure Event Hub - unable to generate connection config

Describe the bug

When trying to connect to the Azure Event hub the following error occurs:

unable to generate connection config: unable to lookup connection info for backendName 'azure-event-hub': Unable to find the key

To Reproduce
Steps to reproduce the behavior:

  1. Run export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..." (with an actual Azure connection string)
  2. plumber read azure-event-hub

Using a direct connection

  1. plumber read azure-event-hub --connection-string="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..." (with an actual Azure connection string)

Expected behavior
To be able to connect to the Azure event hub.


CLI (please complete the following information):

  • OS: Mac OS Big Sur
  • Version 1.2.0

Support Kafka TLS

https://github.com/segmentio/kafka-go/ supports tls.Config (see here and here), where one can pass in a slice of tls.Certificate. I'm not sure how most people would expect to use this in practice, but I see some prior art here, where only one cert is supported. In the case of Benthos, it's done like this: https://www.benthos.dev/docs/components/inputs/kafka#tlsclient_certs via the yaml config, but for a CLI tool it's probably overkill to support more than one cert.

My colleagues said that they used https://www.cloudkarafka.com/docs/kafkacat.html in the past, which also seems to support only one cert.
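
For reference, a minimal sketch of wiring a single client certificate (plus an optional custom CA) into a segmentio/kafka-go reader. Broker address, topic and file paths are placeholders; this is not plumber's implementation:

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Load a single client certificate + key pair (the common case for a CLI).
	cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
	if err != nil {
		panic(err)
	}

	// Optionally trust a custom CA.
	caPEM, _ := os.ReadFile("ca.crt")
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caPEM)

	dialer := &kafka.Dialer{
		TLS: &tls.Config{
			Certificates: []tls.Certificate{cert},
			RootCAs:      pool,
		},
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"broker:9093"},
		Topic:   "test",
		Dialer:  dialer,
	})
	defer r.Close()

	m, err := r.ReadMessage(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Printf("offset %d: %s\n", m.Offset, string(m.Value))
}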

Improve functional test suite

Current functional test suite is very hit or miss - very likely needs to be redone altogether.

Would probably leave kafka for last due to slow initialization times (and thus difficult to quickly iterate).

Also need to figure out how we'll test messaging systems that can't be run locally (such as GCP PubSub, SQS, Azure).

EDIT: Now that we have relay mode and plumber is expected to run without issues long-term - we must have a reliable testing story.

Listen to Kafka cluster messages

Hello, is there any way to listen for messages from one of the instances in a Kafka cluster?
I tried with

./plumber-linux -d read message kafka --topic=my-topic-name --address=one.of.the.nodes:9092 

but I get message:

INFO[0000] Initializing (could take a minute or two) ...  pkg=kafka/read.go

Nothing happens even after 10 minutes

Can we read messages in kafka as a consumer group?

CLI for this

sh-4.2$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka.com --topic loader-db.inventory.customers --group inventory-reload-loader
message1
message2
message

Unable to start plumber

After following the steps to download the binary and make it executable, running plumber -h or plumber -v fails with the error: /usr/local/bin/plumber: 1: Not: not found

ActiveMQ - "Unable to complete command: unable to create client: unable to create activemq client"

I'm running an ActiveMQ 5.16.0 instance, and I can't make plumber connect to it. The queue is protected by a username and a password.

telnet 10.75.93.20 61616
Trying 10.75.93.20...
Connected to 10.75.93.20.
Escape character is '^]'.
(binary ActiveMQ OpenWire handshake output follows, showing ProviderName ActiveMQ, ProviderVersion 5.16.0, TcpNoDelayEnabled, MaxFrameSize, etc.)
./plumber-linux read activemq --address="10.75.93.20:61616" --queue=myqueue --debug
INFO[0000] 
█▀█ █   █ █ █▀▄▀█ █▄▄ █▀▀ █▀█
█▀▀ █▄▄ █▄█ █ ▀ █ █▄█ ██▄ █▀▄ 
FATA[0000] Unable to complete command: unable to create client: unable to create activemq client: EOF

rabbitmq x-dead-letter-exchange

time="2021-12-14T19:44:23+08:00" level=error msg="unable to complete read for backend 'rabbitmq': unable to create new rabbit consumer: unable to initialize rabbitmq consumer: unable to get initial delivery channel: unable to create new server channel: Exception (406) Reason: \"PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'xxxx' in vhost '/': received none but current is the value 'xxx' of type 'longstr'\"" pkg=plumber

[Windows] --with-line-numbers didn't work

kafka read --with-line-numbers didn't work, failing with an error of

time="2020-08-08T11:07:34+07:00" level=fatal msg="Unable to handle CLI input: unable to parse command: unknown long flag '--with-line-numbers'"

command used
plumber read messages kafka --topic foo --with-line-numbers --follow

not buildable from source

Describe the bug

Version v1.0.0 is not buildable from source, because one of its dependencies is not publicly available:

go: github.com/batchcorp/inferschema@v0.0.4: reading github.com/batchcorp/inferschema/go.mod at revision v0.0.4: git ls-remote -q origin in /build/go/pkg/mod/cache/vcs/cee0f32bc125cd938ce5343397a3d327fcb90b9d314ebdf991db6d045ed551af: exit status 128:
        fatal: could not read Username for 'https://github.com': terminal prompts disabled
Confirm the import path was entered correctly.
If this is a private repository, see https://golang.org/doc/faq#git_https for additional information.

To Reproduce
Steps to reproduce the behavior:

  1. Download the software
  2. Run go build -o plumber .

Expected behavior

It should be possible to compile this software from source.

Support to specify multiple directories via `--protobuf-dir`

Hey,

Thanks for this great tool!

I'm trying to read messages in protobuf format from Google Pub/Sub; however, some of my protobuf definitions import extra things like HTTP annotations, which are fetched either from a vendor folder or from the system library.
So I see something like this:

FATA[0000] Unable to complete command: unable to find root message descriptor: unable to read file descriptors: unable to parse files: myservice/service.proto:27:8: google/api/annotations.proto: file does not exist 

Would it be possible to specify these directories separately by just repeating the same protobuf-dir option?
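
For what it's worth, the underlying protoreflect parser already accepts a list of import paths, so repeating --protobuf-dir could plausibly map onto something like the sketch below (directory and file names are illustrative, not plumber's actual code):

package main

import (
	"log"

	"github.com/jhump/protoreflect/desc/protoparse"
)

func main() {
	// Resolve imports such as google/api/annotations.proto by searching
	// several directories in order.
	parser := protoparse.Parser{
		ImportPaths: []string{
			"./protos",           // your own definitions
			"./vendor/protos",    // vendored third-party imports
			"/usr/local/include", // system-wide well-known protos
		},
	}

	fds, err := parser.ParseFiles("myservice/service.proto")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("parsed %d file descriptor(s)", len(fds))
}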

Improve docs, add examples

Would be nice to have a wiki of examples.

When using protobuf - plumber syntax can get pretty unruly. Would be nice to provide some in-depth examples.

Maybe create an EXAMPLES.md or put examples into wiki?

Move away from urfave/cli

Move to kingpin. urfave/cli is difficult to test: creating a mock *cli.Context is not very intuitive, so tests have to go through app.Run with an injected writer to capture output. Annoying.

Support to show kafka consumer group lag

Please add support to show consumer group lag.

Here is the code we use to calculate consumer group lag.

package consumer

import (
	"fmt"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/practo/klog/v2"
)

type KafkaWatcher interface {
	Topics() ([]string, error)
	ConsumerGroupLag(id string, topic string, partition int32) (int64, error)
}

type kafkaWatch struct {
	client               sarama.Client
	cacheValidity        time.Duration
	lastTopicRefreshTime *int64

	// mutex protects the following mutable state
	mutex sync.Mutex

	topics []string
}

func NewKafkaWatcher(brokers []string, version string) (KafkaWatcher, error) {
	v, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		return nil, fmt.Errorf("Error parsing Kafka version: %v\n", err)
	}

	c := sarama.NewConfig()
	c.Version = v

	client, err := sarama.NewClient(brokers, c)
	if err != nil {
		return nil, fmt.Errorf("Error creating client: %v\n", err)
	}

	return &kafkaWatch{
		client:               client,
		cacheValidity:        time.Second * time.Duration(30),
		lastTopicRefreshTime: nil,
	}, nil
}

// Topics get the latest topics after refreshing the client with the latest
// it caches it for t.cacheValidity
func (t *kafkaWatch) Topics() ([]string, error) {
	if cacheValid(t.cacheValidity, t.lastTopicRefreshTime) {
		return t.topics, nil
	}

	klog.V(4).Info("Refreshing kafka topic cache")
	// empty so that it refresh all topics
	emptyTopics := []string{}
	err := t.client.RefreshMetadata(emptyTopics...)
	if err != nil {
		return []string{}, err
	}

	topics, err := t.client.Topics()
	if err != nil {
		return []string{}, err
	}

	t.mutex.Lock()
	defer t.mutex.Unlock()

	t.topics = topics
	now := time.Now().UnixNano()
	t.lastTopicRefreshTime = &now

	return t.topics, nil
}

func (t *kafkaWatch) ConsumerGroupLag(
	id string,
	topic string,
	partition int32,
) (
	int64,
	error,
) {
	defaultLag := int64(-1)

	lastOffset, err := t.client.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return defaultLag, err
	}

	offsetFetchRequest := sarama.OffsetFetchRequest{
		ConsumerGroup: id,
		Version:       1,
	}
	offsetFetchRequest.AddPartition(topic, partition)

	err = t.client.RefreshMetadata(topic)
	if err != nil {
		return defaultLag, err
	}

	broker, err := t.client.Leader(topic, partition)
	if err != nil {
		return defaultLag, fmt.Errorf(
			"Error getting the leader broker, err: %v", err)
	}

	offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
	if err != nil {
		return defaultLag, err
	}
	if offsetFetchResponse == nil {
		return defaultLag, fmt.Errorf(
			"OffsetFetch request got no response for request: %+v",
			offsetFetchRequest)
	}

	for topicInResponse, partitions := range offsetFetchResponse.Blocks {
		if topicInResponse != topic {
			continue
		}

		for partitionInResponse, offsetFetchResponseBlock := range partitions {
			if partition != partitionInResponse {
				continue
			}
			// Kafka will return -1 if there is no offset associated
			// with a topic-partition under that consumer group
			if offsetFetchResponseBlock.Offset == -1 {
				klog.Warningf("%s not consumed by group: %v", topic, id)
				return defaultLag, nil
			}
			if offsetFetchResponseBlock.Err != sarama.ErrNoError {
				// Return the block's error rather than the (nil) outer err.
				return defaultLag, offsetFetchResponseBlock.Err
			}
			return lastOffset - offsetFetchResponseBlock.Offset, nil
		}
	}

	klog.Warningf("%s for group is not active or present in Kafka", topic)
	return defaultLag, nil
}

func cacheValid(validity time.Duration, lastCachedTime *int64) bool {
	if lastCachedTime == nil {
		return false
	}

	if (*lastCachedTime + validity.Nanoseconds()) > time.Now().UnixNano() {
		return true
	}

	return false
}
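
A hypothetical call site for the snippet above (broker address, Kafka version, group, topic and partition are placeholders):

watcher, err := NewKafkaWatcher([]string{"localhost:9092"}, "2.8.0")
if err != nil {
	panic(err)
}

lag, err := watcher.ConsumerGroupLag("inventory-reload-loader", "db.inventory.customers", 0)
if err != nil {
	panic(err)
}
fmt.Printf("partition 0 lag: %d\n", lag)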

Support for Amazon SQS

Let's get this in here, ASAP. Lots of folks use SQS; this should be supported out of the box (especially since GCP PubSub is supported).

Support of KubeMQ Backend

Hi Guys,
Some of our users asked us for integration with your platform.
Thanks for your great architecture; it was very easy, and we have a PR ready to merge.
The KubeMQ backend supports read, write, dynamic and relay. The only missing piece is the protobuf/gRPC support, which needs your attention to add the required schema.
Let me know what you need from us in order to complete the integration.

Lior,
KubeMQ CTO

Read from kafka as consumer group is not working

Works

I am trying to read using https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go and it works.

go run main.go --brokers=$KAFKA --group=inventory-reload-batcher-a1c3de --topics=db.inventory.customers

The sarama consumer here does not take a minute or two; it just starts working.

Does not work

But when I read using plumber it does not work: it hangs while trying to initialize. I saw the code has an infinite loop and does not log any errors, so there is nothing for the user to act on.

plumber read kafka --address $KAFKA --topic db.inventory.customers  --group-id inventory-reload-batcher-a1c3de
INFO[0000] Initializing (could take a minute or two) ...  pkg=kafka/read.go

Please help out here.

Installation on macOS causes Apple to issue a warning

When I followed the installation instructions, even though I set the binary to be executable, Apple threw its "This binary was downloaded from the internet and isn't signed" warning and wouldn't allow me to execute it. I had to navigate to the binary in Finder and then right-click to open it before Apple gave me the option to trust it. Perhaps we need to look into some other signing/packaging solution to make the installation smoother for users?

Support for AWS SQS FIFO Queues

To write to a FIFO queue in AWS SQS, you must supply a MessageGroupId. It looks like Plumber doesn't currently support this (I could be wrong - I'm definitely new to golang).

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html

If you don't supply it, you get rejected with a 400 like this:

FATA[0000] Unable to complete command: unable to complete message send: MissingParameter: The request must contain the parameter MessageGroupId.
status code: 400, request id: c667b6b4-42d9-5bb3-be3e-ee5d2e8888ca 
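
For reference, here is a sketch of what supplying MessageGroupId looks like with aws-sdk-go (queue URL, group ID and deduplication ID are placeholders); plumber would presumably need to expose this as a flag:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
	sess := session.Must(session.NewSession())
	client := sqs.New(sess)

	out, err := client.SendMessage(&sqs.SendMessageInput{
		QueueUrl:               aws.String("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue.fifo"),
		MessageBody:            aws.String(`{"hello":"world"}`),
		MessageGroupId:         aws.String("my-group"),       // required for FIFO queues
		MessageDeduplicationId: aws.String("unique-id-0001"), // required unless content-based dedup is enabled
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(*out.MessageId)
}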

CLI Params required for SQS

Describe the bug
Previously, plumber would just pick up my AWS_PROFILE env var, authenticate, and read from an SQS queue.
Recently I upgraded and it is no longer working.
To Reproduce
Steps to reproduce the behavior:
➜ plumber read aws-sqs --queue-name=<QUEUE_NAME> -f
FATA[0000] Unable to handle CLI input: unable to parse CLI options: missing flags: --aws-access-key-id=STRING, --aws-region=STRING, --aws-secret-access-key=STRING

Expected behavior
Messages should be produced


CLI (please complete the following information):

  • OS: macOS
  • Version: 1.1.1


Ability to emit JSON

When reading from a topic (MQTT in my case), it would be nice to have a mode to emit one JSON line per event rather than the table:

------------- [Count: 29 Received at: 2021-12-10T12:39:21-05:00] -------------------

+----------------------+------------------------------------------+
| ID                   |                                        0 |
| Topic                |             aqualinkd/Solar_Heater/delay |
| QoS                  |                                        0 |
| Retain               |                                    false |
+----------------------+------------------------------------------+
52.00

would become

{
  "count" : 29,
  "received" : "2021-12-10T12:39:21-05:00", 
  "id" :0, 
  "topic" : "aqualinkd/Solar_Heater/delay", 
  "qos" : 0, 
  "retain": false, 
  "payload" : 52.00 
} 
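
A minimal sketch of what emitting that record as one JSON line could look like in Go (field names mirror the table above; this is not an existing plumber option):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ReadRecord mirrors the fields plumber prints in its table output.
type ReadRecord struct {
	Count    int       `json:"count"`
	Received time.Time `json:"received"`
	ID       int       `json:"id"`
	Topic    string    `json:"topic"`
	QoS      int       `json:"qos"`
	Retain   bool      `json:"retain"`
	Payload  string    `json:"payload"` // payload kept as a string since its type varies by backend
}

func main() {
	rec := ReadRecord{
		Count:    29,
		Received: time.Now(),
		ID:       0,
		Topic:    "aqualinkd/Solar_Heater/delay",
		QoS:      0,
		Retain:   false,
		Payload:  "52.00",
	}
	out, _ := json.Marshal(rec) // one JSON object per event, one per line
	fmt.Println(string(out))
}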
