Panini

Panini is a Python microframework based on the nats.py library. Its goal is to offer developers an easy way to create NATS microservices with a lower barrier to entry. It provides a specific template for creating microservices, similar to FastAPI, Aiohttp, or Flask. Like all of these frameworks, Panini has its design limits and edge cases. If you find yourself restricted by Panini's capabilities, we recommend switching to nats.py.

License: Apache 2.0

Panini was inspired by the Faust project.


Documentation

Documentation is here.

How to install

Before getting started make sure you have all the prerequisites installed:

pip install panini

A simple listener app example

A minimal app with one request endpoint and one stream endpoint might look like this:

from panini import app as panini_app

app = panini_app.App(
        service_name='listener_app',
        host='127.0.0.1',
        port=4222,
)

@app.listen("some.subject.for.request")
async def request_listener(msg):
    """ request endpoint """
    print(f"request {msg.data} from {msg.subject} has been processed")
    return {"success": True, "message": "request has been processed"}

@app.listen("some.subject.for.stream")
async def stream_listener(msg):
    """ stream endpoint """
    print(f"event {msg.data} from {msg.subject} has been processed")

if __name__ == "__main__":
    app.start()

What's going on here?

  1. Imported Panini.
  2. Initialized the app: created an instance of class App from the panini module with a microservice name, NATS host, and port.
  3. The first @app.listen registers the subject "some.subject.for.request" with request_listener. Every time this app receives a request addressed to "some.subject.for.request", request_listener is called to process it, and its return value is sent back to the requester as the response.
  4. The second @app.listen registers the subject "some.subject.for.stream" with stream_listener. It works the same way as request_listener but sends no result back.
  5. app.start() runs the app. No code below this call will ever be executed.
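
The registration steps above can be pictured with a toy, in-memory model. This is a minimal sketch, not Panini's implementation, and it does not talk to NATS: `MiniApp`, `listen`, and `dispatch` are hypothetical names used only to show how a decorator can map a subject to a handler and how a handler's return value can become the reply.

```python
import asyncio
from types import SimpleNamespace


class MiniApp:
    """Hypothetical in-memory stand-in for an app object (not Panini's App)."""

    def __init__(self):
        self.handlers = {}  # subject -> coroutine function

    def listen(self, subject):
        # Decorator factory: remember which handler serves which subject.
        def register(func):
            self.handlers[subject] = func
            return func
        return register

    async def dispatch(self, subject, data):
        # Build a message-like object and hand it to the registered handler.
        msg = SimpleNamespace(subject=subject, data=data)
        return await self.handlers[subject](msg)


mini = MiniApp()


@mini.listen("some.subject.for.request")
async def request_listener(msg):
    # The returned dict plays the role of the response.
    return {"success": True, "echo": msg.data}


@mini.listen("some.subject.for.stream")
async def stream_listener(msg):
    # Stream-style handler: processes the event and returns nothing.
    print(f"event {msg.data} from {msg.subject}")


reply = asyncio.run(mini.dispatch("some.subject.for.request", {"n": 1}))
no_reply = asyncio.run(mini.dispatch("some.subject.for.stream", {"n": 2}))
```

In this model the only difference between the two endpoints is whether the handler returns a value, which mirrors the request and stream listeners described above.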

Save the above code to a file named listener_app.py.

💡 The current function expects only JSON-formattable return values, dict or list. However, you can also specify the return type as string or bytes. More details are in the Datatypes section.
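
To check in advance whether a value is JSON formattable, the standard library is enough. The sketch below assumes nothing about Panini itself; `is_json_formattable` is a hypothetical helper introduced only for illustration.

```python
import json


def is_json_formattable(value):
    """Return True if json.dumps can serialize the value."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


dict_ok = is_json_formattable({"success": True, "items": [1, 2, 3]})
list_ok = is_json_formattable(["a", "b"])
bytes_ok = is_json_formattable(b"raw-bytes")    # bytes are not JSON serializable
set_ok = is_json_formattable({"ids": {1, 2}})   # neither are sets
```

Values that fail this check, such as raw bytes, are the ones that would need the string or bytes datatype instead.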

Make sure that you have all prerequisites from Install. Open the terminal to run the app:

> python3 listener_app.py
======================================================================================
Panini service connected to NATS..
id: 3
name: listener_app__non_docker_env_270377__75017

NATS brokers:
*  nats://127.0.0.1:4222
======================================================================================

That's it. Now let's create something that will generate messages.

A simple app example that generates messages

Our goal here is to trigger the endpoints of the listener app above:

  • "some.subject.for.request" - request something and receive a response
  • "some.subject.for.stream" - send an event without waiting for a response

from panini import app as panini_app

app = panini_app.App(
        service_name='sender_app',
        host='127.0.0.1',
        port=4222,
)

@app.task(interval=1)
async def request_periodically():
    message = {"data": "request1234567890"}
    response = await app.request(
        subject="some.subject.for.request",
        message=message,
    )
    print(response)


@app.task(interval=1)
async def publish_periodically():
    message = {"data": "event1234567890"}
    await app.publish(
        subject="some.subject.for.stream",
        message=message,
    )

if __name__ == "__main__":
    app.start()

What's new here:

  1. The first @app.task registers the function request_periodically to be called periodically at the given interval, every 1 second in this example.
  2. The function app.request sends a request and asynchronously waits for the response.
  3. The second @app.task does the same as the first one, but for publishing.
  4. The function app.publish sends a message like a request, but without expecting any response. Fire and forget.
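
The idea of a periodic task can be sketched with plain asyncio. This is an illustration only, since the source does not show how Panini schedules tasks; `run_periodically` and its `times` parameter are hypothetical, and the repetition count exists only so the sketch terminates.

```python
import asyncio


async def run_periodically(func, interval, times):
    """Call coroutine function `func`, then sleep `interval` seconds, `times` times."""
    for _ in range(times):
        await func()
        await asyncio.sleep(interval)


calls = []


async def publish_tick():
    # Stands in for a fire-and-forget publish.
    calls.append("tick")


asyncio.run(run_periodically(publish_tick, interval=0.01, times=3))
```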

Save the code to a new file named sender_app.py.

Make sure that listener_app.py keeps running, then open a new terminal session to run the sender app:

> python3 sender_app.py
======================================================================================
Panini service connected to NATS..
id: 3
name: sender_app__non_docker_env_270377__75017

NATS brokers:
*  nats://127.0.0.1:4222
======================================================================================
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}
{'success': True, 'message': 'request has been processed'}

Note that in the terminal session where you run listener_app.py you should see received requests and events:

event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed
event {'data': 'event1234567890'} from some.subject.for.stream has been processed
request {'data': 'request1234567890'} from some.subject.for.request has been processed

More possibilities

In the first example, we created an application that listens for messages; in the second, an application that sends messages. Panini allows you to freely combine sending and receiving messages in one application.

Let's check out what else you can do with Panini using a minimal interface:

  • One-time tasks on start. Similar to the periodic task above, but without the interval argument
@app.task()
async def publish():
    while True:
        message = get_some_update()
        await app.publish(subject='some.subject', message=message)
  • Synchronous endpoints
@app.task(interval=2)
def your_periodic_task():
    for _ in range(10):
        app.publish_sync(
            subject='some.publish.subject', 
            message={'some':'data'}
        )
  • Accept different datatypes: dict, str, bytes
@app.task(interval=2)
def your_periodic_task():
    for _ in range(10):
        app.publish_sync(
            subject='some.publish.subject', 
            message=b'messageinbytestosend', 
            data_type=bytes
        )
  • Create middlewares for NATS messages
from panini.middleware import Middleware

class MyMiddleware(Middleware):

    async def send_publish(self, subject, message, publish_func, **kwargs):
        print('do something before publish')
        await publish_func(subject, message, **kwargs)
        print('do something after publish')

    async def listen_publish(self, msg, cb):
        print('do something before listen')
        await cb(msg)
        print('do something after listen')

    async def send_request(self, subject, message, request_func, **kwargs):
        print('do something before send request')
        result = await request_func(subject, message, **kwargs)
        print('do something after send request')
        return result

    async def listen_request(self, msg, cb):
        print('do something before listen request')
        result = await cb(msg)
        print('do something after listen request')
        return result
  • Create HTTP endpoints with Aiohttp and NATS endpoints all together in one microservice

    from aiohttp import web
    
    @app.listen('some.publish.subject')
    async def subject_for_requests_listener(msg):
        handle_incoming_message(msg.subject, msg.data)
    
    @app.http.get('/get')
    async def web_endpoint_listener(request):
        """
        Single HTTP endpoint
        """
        return web.Response(text="Hello, world")
    
    @app.http.view('/path/to/rest/endpoints')
    class MyView(web.View):
        """
        HTTP endpoints for REST schema
        """
        async def get(self):
            request = self.request
            return web.Response(text="Hello, REST world")
    
        async def post(self):
            request = self.request
            return web.Response(text="Hello, REST world")
  • Built-in traffic balancing between instances of the microservice if you have high loads

app = panini_app.App(
        service_name='async_publish',
        host='127.0.0.1',
        allocation_queue_group='group24', 
        port=4222,
)

# incoming traffic will be distributed among 
# all microservices that are in the "group24"
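
The effect of a queue group can be simulated without a broker. In NATS, each message sent to a queue group is delivered to only one member of the group, chosen at random. The sketch below models just that rule; `deliver` and the instance names are hypothetical, and nothing here uses Panini or a real NATS connection.

```python
import random
from collections import Counter


def deliver(message, queue_group, rng):
    """Hand the message to exactly one member of the group, picked at random."""
    member = rng.choice(queue_group)
    return member, message


rng = random.Random(0)  # seeded so repeated runs behave the same way
group24 = ["instance-a", "instance-b", "instance-c"]

# Count how many of 300 messages each instance ends up handling.
handled = Counter(deliver(i, group24, rng)[0] for i in range(300))
```

Every message is counted exactly once, so adding instances to the group spreads the same traffic over more workers.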

Need more examples? Check here.

Testing

We use pytest for testing.

To run the tests (note that nats-server must be running on port 4222):

pytest

Contributing

Welcome, contributor! We are looking for developers to make Panini a great project.

Working on your first Pull Request? You can learn how from this free series, How to Contribute to an Open Source Project on GitHub.

Here's how you can help:

  • suggest new updates or report a bug here
  • review a pull request
  • fix an issue
  • write a tutorial
  • always follow this guide for your contributions

At this point, you're ready to make your changes! Feel free to ask for help 😸

panini's People

Contributors

artas728, bugimprover, danylott, dependabot[bot], eugenesokol, i2gor87

panini's Issues

RuntimeWarning when sending panini_event.*.*.started with NATSTimeoutMiddleware

Try to add NATSTimeoutMiddleware and run the app

There is a warning:

panini/panini/app.py:215: RuntimeWarning: coroutine '_MiddlewareManager._wrap_function_by_middleware.<locals>.wrap_function_by_send_middleware.<locals>.aio_next_wrapper' was never awaited
  self.connector.publish_sync(f'panini_events.{self.service_name}.{self.client_id}.started', {})
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
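
A warning of this kind generally appears when synchronous code calls a function that has been wrapped in an async wrapper: the call only creates a coroutine object, and nothing runs until it is awaited. The sketch below reproduces that mechanism in isolation. It is not Panini's code; `wrap_with_async_middleware`, the local `publish_sync`, and the subject name are hypothetical.

```python
import asyncio
import inspect

sent = []


def publish_sync(subject, message):
    # Stand-in for a synchronous publish.
    sent.append((subject, message))


def wrap_with_async_middleware(func):
    # An async wrapper around a sync function: calling it returns a coroutine.
    async def wrapper(subject, message):
        return func(subject, message)
    return wrapper


wrapped = wrap_with_async_middleware(publish_sync)

pending = wrapped("panini_events.demo.started", {})  # nothing is sent yet
was_coroutine = inspect.iscoroutine(pending)
sent_before_await = list(sent)

asyncio.run(pending)  # only running the coroutine performs the publish
```

If the coroutine object were simply dropped instead of being run, Python would emit the "was never awaited" RuntimeWarning and the message would never be published.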

Panini dashboard

What do we need in order to reach good observability by default?
1) Latency
2) Response: success, timeout, another kind of error
3) Response error rate
4) Message size
5) NATS numbers:
- uptime
- pending_bytes
- in_msgs
- out_msgs
- in_bytes
- out_bytes
- subscriptions
- rate_in_msgs_per_min
- rate_out_msgs_per_min
- rate_in_bytes_per_min
- rate_out_bytes_per_min
6) Logs

Any thoughts?

Cannot run app with 'from_another_thread'

The round brackets in the start() method have to be removed:
start_thread(self._start()) -> start_thread(self._start)

After removing them I got this error:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/oleksii/opreturn-dev/panini/panini/app.py", line 226, in _start
    NATSClient.__init__(self, **self.nats_config)
  File "/Users/oleksii/opreturn-dev/panini/panini/nats_client/nats_client.py", line 58, in __init__
    self.connector: NATSClientInterface = _AsyncioNATSClient(
  File "/Users/oleksii/opreturn-dev/panini/panini/nats_client/_asyncio_cli.py", line 83, in __init__
    self.loop = asyncio.get_event_loop()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.
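
The error above is standard asyncio behaviour: a thread other than the main one has no event loop until one is created for it. A common general pattern, shown here as a sketch rather than as a patch to Panini, is to create and set a loop inside the thread. `service_main` and `run_in_thread` are hypothetical names.

```python
import asyncio
import threading

result = {}


async def service_main():
    # Stand-in for the service's async startup.
    await asyncio.sleep(0)
    return "started"


def run_in_thread():
    # A non-main thread has no event loop by default, so create one explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        result["value"] = loop.run_until_complete(service_main())
    finally:
        loop.close()


# Pass the function itself as the target; calling it here would run it
# in the current thread instead of the new one.
worker = threading.Thread(target=run_in_thread)
worker.start()
worker.join()
```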

middleware inheritance

Let's assume that we have a ParentMiddleware that overrides the method listen_any, and a ChildMiddleware that inherits from ParentMiddleware and does not define any of the required methods itself.

In that case, ChildMiddleware cannot be used.
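
One plausible cause, stated here as an assumption because the issue does not show the check, is that the required methods are looked up only in the class's own `__dict__`, which ignores inherited ones. The sketch contrasts that lookup with one that follows inheritance. The `Middleware` base class below is a local stand-in, and `HOOKS`, `hooks_own_dict_only`, and `hooks_with_inheritance` are hypothetical names.

```python
class Middleware:
    """Local stand-in for a middleware base class (not imported from Panini)."""


class ParentMiddleware(Middleware):
    async def listen_any(self, msg, cb):
        return await cb(msg)


class ChildMiddleware(ParentMiddleware):
    pass  # defines nothing itself, inherits listen_any


HOOKS = ("send_publish", "listen_publish", "send_request",
         "listen_request", "send_any", "listen_any")


def hooks_own_dict_only(cls):
    # Looks only at methods written directly in the class body.
    return [name for name in HOOKS if name in cls.__dict__]


def hooks_with_inheritance(cls):
    # getattr follows the method resolution order, so inherited hooks are found.
    return [name for name in HOOKS if callable(getattr(cls, name, None))]


own = hooks_own_dict_only(ChildMiddleware)
inherited = hooks_with_inheritance(ChildMiddleware)
```

With the first lookup the child class appears to have no hooks at all, while the second one finds the inherited listen_any.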

test_client sleep_time - depends on computer power

The problem is that I need to know when Panini has started, so that the tests run only after that has happened.
For now we use time.sleep() for this,
but it is a really bad approach: my laptop is not very powerful, so I have to specify a longer time manually each time.
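
A general alternative to a fixed sleep is an explicit readiness signal. The sketch below uses `threading.Event` from the standard library; `fake_service` is a hypothetical stand-in for the service under test and does not reflect how Panini's test_client works.

```python
import threading
import time

started = threading.Event()


def fake_service():
    time.sleep(0.05)  # stands in for connecting to the broker
    started.set()     # announce readiness


worker = threading.Thread(target=fake_service, daemon=True)
worker.start()

# Block until the service reports ready; the timeout is only an upper bound,
# so a fast machine does not wait longer than necessary.
ready = started.wait(timeout=5)
worker.join()
```

The tests then start as soon as the signal arrives, regardless of how fast the machine is.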

Additional(optional) datatypes for messages: str, bytes

Example of usage:

@app.timer_task(interval=1)
async def publish_periodically():
    for _ in range(10):
        await app.publish(
            subject="some.publish.subject",
            data_type=str,  # the message is then expected to be a string
            message=json.dumps(message),
        )

@app.listen("some.publish.subject", data_type=str)
async def receive_messages(msg):
    log.warning(f"got subject {msg.subject}")
    log.warning(f"got message {msg.data}")  # msg.data is a string


@app.listen("some.publish.subject", data_type=bytes)
async def receive_messages(msg):
    log.warning(f"got subject {msg.subject}")
    log.warning(f"got message {msg.data}")  # msg.data is bytes

Initialize logger only at _start()

When running tests that import the app object, it would be great to be able to change app.logger_files_path,
because otherwise a "Read-only file system" error is raised.
Also add a close() method to test_client that fully shuts down test_client activity.

Used Python Libraries

Does it make sense to add the versions of the Python libraries used by Panini to the documentation?

slow customers

The anthill client gets a 'slow customer' error and disconnects from the server if the internet connection drops for a few seconds.
The application then fails:

Traceback (most recent call last):
  File "/Users/nelson/dev/ms_general2/strategy_liquidity_provider/run_euro.py", line 186, in <module>
    app.start()
  File "/Users/nelson/dev/ms_general2/anthill/app.py", line 181, in start
    self._start()
  File "/Users/nelson/dev/ms_general2/anthill/app.py", line 218, in _start
    self._start_tasks()
  File "/Users/nelson/dev/ms_general2/anthill/app.py", line 236, in _start_tasks
    loop.run_until_complete(asyncio.gather(*tasks))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
concurrent.futures._base.CancelledError

Middleware

Implement the ability to add middleware.
Middleware types we need:
- send_publish_middleware
- receive_publish_middleware
- send_request_middleware
- receive_request_middleware

Each middleware allows processing:
- before the event
- after the event

Usage example:

import os
import asyncio
from anthill import app as ant_app
from anthill.middleware import MiddlewareBase

app = ant_app.App(
    service_name='microservice2',
    host='nats-server' if 'HOSTNAME' in os.environ else '127.0.0.1',
    port=4222,
    app_strategy='asyncio',
)

log = app.logger.log



@app.listen('some.publish.topic')
async def recieve_messages(topic, message, **kwargs):
    return {'success':True}



class MyMiddleware(MiddlewareBase):

    async def send_publish(self, topic, message, publish_func, **kwargs):
        #do something before
        await publish_func(topic, message, **kwargs)
        #do something after

    async def listen_publish(self, msg, cb):
        #do something before
        await cb(msg)
        # do something after

    async def send_request(self, topic, message, request_func, **kwargs):
        # do something before
        result = await request_func(topic, message, **kwargs)
        # do something after
        return result

    async def listen_request(self, msg, cb):
        # do something before
        result = await cb(msg)
        # do something after
        return result

   

if __name__ == "__main__":
    app.add_middleware(MyMiddleware, some_unique_param='some_unique_param', another_unique_param='another_unique_param')
    app.start()

An alternative mechanism is also required for handling any send or listen event, before or after, like this:

class MyMiddleware(MiddlewareBase):
    async def send_any(self, topic, message, publish_func, **kwargs):
        # do something before
        await publish_func(topic, message, **kwargs)
        # do something after
        return topic, message

    async def listen_any(self, msg, cb):
        # do something before
        result = await cb(msg)
        # do something after
        return result

Inspired by https://fastapi.tiangolo.com/advanced/middleware/

Change arguments format in endpoints

Update arguments format in endpoints

from:

@app.listen("some.publish.subject")
async def receive_messages(subject, message):
    print(f"got message {message}")

to

@app.listen("some.publish.subject")
async def receive_messages(msg):
    print(f"got message {msg.parsed_data}")

where msg is an object that keeps many parameters:

msg.subject
msg.data                    #raw data
msg.parsed_data       #handled data
msg.header
msg.any_another_parameter

Silent error in middleware

When I mistakenly call the callback like a plain function instead of awaiting the coroutine, I get a silent error:

@app.listen("some.publish.subject")
async def receive_messages(msg):
    log.warning(f"got subject {msg.subject}")
    log.warning(f"got message {msg.data}")

@app.listen("some.request.subject")
async def receive_messages(msg):
    return {'success': True, "data":"some data you asked for"}

class MyMiddleware(Middleware):

    async def send_publish(self, subject, message, publish_func, **kwargs):
        print('do something before publish')
        await publish_func(subject, message, **kwargs)
        print('do something after publish')

    async def listen_publish(self, msg, cb):
        print('do something before listen')
        await cb(msg)
        print('do something after listen')

    async def send_request(self, subject, message, request_func, **kwargs):
        print('do something before send request')
        result = await request_func(subject, message, **kwargs)
        print('do something after send request')
        return result

    async def listen_request(self, msg, cb):
        print('do something before listen request')
        result = cb(msg)  # error: expected "result = await cb(msg)"
        print('do something after listen request')
        return result
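
One way such a mistake could be made loud is a guard that checks what the middleware hook returned. This is a sketch of the idea only, not how Panini handles it; `call_listen_request`, `good_hook`, and `bad_hook` are hypothetical names.

```python
import asyncio
import inspect


async def call_listen_request(middleware_func, msg, cb):
    """Run a listen_request-style hook and reject an un-awaited coroutine result."""
    result = await middleware_func(msg, cb)
    if inspect.iscoroutine(result):
        result.close()  # dispose of it so no "never awaited" warning is emitted
        raise TypeError(
            "middleware returned a coroutine; did you forget 'await cb(msg)'?"
        )
    return result


async def cb(msg):
    return {"success": True}


async def good_hook(msg, cb):
    return await cb(msg)


async def bad_hook(msg, cb):
    return cb(msg)  # missing await: returns a coroutine object


good = asyncio.run(call_listen_request(good_hook, "m", cb))

try:
    asyncio.run(call_listen_request(bad_hook, "m", cb))
    bad_error = None
except TypeError as exc:
    bad_error = str(exc)
```

With a guard like this, the missing await surfaces as an explicit TypeError at the call site instead of passing silently.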

Illogical subtitle

Is it for switching from asynchronous to synchronous?

  • sync & async
@app.timer_task(interval=2)
def your_periodic_task():
    for _ in range(10):
        app.publish_sync(subject='some.publish.subject', message={'some':'data'})

Use Black8 as default project formatter

It is good practice to use the same code formatter for the whole project.
It would be nice to configure it with a tool that formats the code before each commit,
so that a developer doesn't need to remember the code style: just write as you like, and Black8 will do the rest.

send_any, send_publish, send_request in one middleware

This doesn't work at the moment: the middleware manager just ignores send_any.
Should we allow it, or raise an exception to make clear that it's unsupported?

class MyMiddleware(Middleware):

    async def send_publish(self, subject, message, publish_func, **kwargs):
        print('do something before publish')
        await publish_func(subject, message, **kwargs)
        print('do something after publish')

    async def listen_publish(self, msg, cb):
        print('do something before listen')
        await cb(msg)
        print('do something after listen')

    async def send_request(self, subject, message, request_func, **kwargs):
        print('do something before send request')
        result = await request_func(subject, message, **kwargs)
        print('do something after send request')
        return result

    async def listen_request(self, msg, cb):
        print('do something before listen request')
        result = await cb(msg)
        print('do something after listen request')
        return result

    async def send_any(self, subject, message, publish_func, **kwargs):
        print('do something before send_any')
        await publish_func(subject, message, **kwargs)
        print('do something after send_any')

Create AppFactory for Panini

Make it possible to create App() from an AppFactory class in order to have different ways to create an app (from a config file, JSON, env, etc.).
The creation of middleware could also be added there.
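
As a rough illustration of the proposal, a factory could first assemble the constructor keyword arguments from different sources. The sketch below stops at building that dictionary and does not import Panini; the `AppFactory` methods and the `PANINI_` environment prefix are invented for this example. Only the keys service_name, host, and port come from the App examples in this document.

```python
import json
import os


class AppFactory:
    """Hypothetical factory that only assembles constructor keyword arguments."""

    @staticmethod
    def from_json(text):
        # Parse a JSON object such as the contents of a config file.
        return dict(json.loads(text))

    @staticmethod
    def from_env(environ=None, prefix="PANINI_"):
        # The PANINI_ prefix is an invented convention for this sketch.
        environ = os.environ if environ is None else environ
        config = {
            key[len(prefix):].lower(): value
            for key, value in environ.items()
            if key.startswith(prefix)
        }
        if "port" in config:
            config["port"] = int(config["port"])  # env values are strings
        return config


json_cfg = AppFactory.from_json(
    '{"service_name": "listener_app", "host": "127.0.0.1", "port": 4222}'
)
env_cfg = AppFactory.from_env({
    "PANINI_SERVICE_NAME": "sender_app",
    "PANINI_HOST": "127.0.0.1",
    "PANINI_PORT": "4222",
    "HOME": "/tmp",  # unrelated variables are ignored
})
```

The resulting dictionary could then be unpacked into the constructor, for example `App(**json_cfg)`, assuming the keys match the constructor's parameters.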
