
Async branch (about bac0, CLOSED)

bbartling commented on August 22, 2024
Async branch

from bac0.

Comments (8)

ChristianTremblay commented on August 22, 2024

Be sure to pull; I made some modifications to handle disconnection better.

It gives me this on Windows:

[image: screenshot not preserved]


bbartling commented on August 22, 2024

Hi @ChristianTremblay, how's this looking?

import BAC0
import asyncio
from signal import SIGINT, SIGTERM

def read_sensor_value(address, sensor_object_type, sensor_object_instance):
    bacnet = BAC0.lite()
    
    try:
        # Formulate the read request string
        read_sensor_str = f'{address} {sensor_object_type} {sensor_object_instance} presentValue'
        print("Executing read_sensor_str statement:", read_sensor_str)
        
        # Perform the read operation
        sensor_value = bacnet.read(read_sensor_str)
        print("Sensor Value: ", sensor_value)
        
    finally:
        # Ensure the BACnet connection is properly closed
        bacnet.disconnect()
    
    # Return the sensor value to the calling function
    return sensor_value



if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    for sig in (SIGINT, SIGTERM):
        loop.add_signal_handler(sig, handler)

    loop.create_task(main())
    loop.run_forever()

    tasks = asyncio.all_tasks(loop=loop)
    for t in tasks:
        t.cancel()
    group = asyncio.gather(*tasks, return_exceptions=True)
    loop.run_until_complete(group)
    loop.close()


ChristianTremblay commented on August 22, 2024

Not sure why I would do this, but let's say it is a proof of concept :-)

import BAC0
import asyncio
from signal import SIGINT, SIGTERM, signal
import click

bacnet = None
loop = None

async def get_units(address: str = None, object_type: str = None, object_instance: str = None) -> str:
    units = await bacnet.read(f"{address} {object_type} {object_instance} units")
    return units

@click.command()
@click.option('--address', default="2:5", help='BACnet device address')
@click.option('--object_type', default="analogInput", help='BACnet object type')
@click.option('--object_instance', default="1022", help='BACnet object instance')
def main(address, object_type, object_instance):
    global loop
    loop = asyncio.get_event_loop()
    #for sig in (SIGINT, SIGTERM):
    #    loop.add_signal_handler(sig, handler)
    signal(SIGINT, handler)
    signal(SIGTERM, handler)
    
    loop.create_task(_main(address, object_type, object_instance))
    loop.run_forever()
    tasks = asyncio.all_tasks(loop=loop)
    for t in tasks:
        t.cancel()
    group = asyncio.gather(*tasks, return_exceptions=True)
    loop.run_until_complete(group)
    loop.close()


async def _main(address: str = None, object_type: str = None, object_instance: str = None) -> None:
    global bacnet
    bacnet = BAC0.lite()
    while True:
        val = await bacnet.read(f"{address} {object_type} {object_instance} presentValue")
        units = await get_units(address, object_type, object_instance)
        print(f"Sensor Value: {val} {units}...waiting 5 seconds")
        await asyncio.sleep(5)

def handler(sig, sig2):
    print(f"Got signal: {sig!s}, shutting down.")
    bacnet.disconnect()
    loop.stop()
    #loop.remove_signal_handler(SIGTERM)
    #loop.add_signal_handler(SIGINT, lambda: None)

if __name__ == "__main__":
    main()
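
Editor's sketch: the snippet above swaps `loop.add_signal_handler` for `signal.signal`. A likely reason is that `loop.add_signal_handler` is only available on Unix and raises `NotImplementedError` on Windows event loops. The sketch below shows one way to support both, using an `asyncio.Event` to end the polling loop instead of calling `loop.stop()`. The names `install_stop_handlers`, `poll`, and `main` are hypothetical, and the counter is only a stand-in for the `await bacnet.read(...)` call; none of this is BAC0's own API.

```python
import asyncio
import signal


def install_stop_handlers(loop, stop_event):
    """Ask the polling loop to stop on SIGINT/SIGTERM.

    Returns the name of the mechanism that was installed.
    """

    def request_stop(*_args):
        # Works for both call styles: no arguments (asyncio handler)
        # or (signum, frame) (signal.signal handler).
        loop.call_soon_threadsafe(stop_event.set)

    try:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, request_stop)
        return "loop.add_signal_handler"
    except NotImplementedError:
        # Windows event loops do not implement add_signal_handler.
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, request_stop)
        return "signal.signal"


async def poll(stop_event, interval=5.0):
    """Poll until stop_event is set; returns the number of polls done."""
    count = 0
    while not stop_event.is_set():
        count += 1  # stand-in for `await bacnet.read(...)`
        try:
            # Sleep for `interval`, but wake up early if a stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    return count


async def main(interval=5.0):
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()
    install_stop_handlers(loop, stop_event)
    return await poll(stop_event, interval)


# Usage from a script's main thread:
#     asyncio.run(main())
```

Because the loop ends through the event rather than `loop.stop()`, the coroutine returns normally and any cleanup code after the `while` loop runs without task cancellation.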


bbartling commented on August 22, 2024

Would you ever recommend just using IPython for BAC0? Sort of like this YouTube video series, which is a pretty cool BACnet tutorial BTW; they demoed BAC0: https://youtu.be/HOapQkCxCHk?si=s7Yw7r9czL9ZnDcg


ChristianTremblay commented on August 22, 2024

I wasn't aware of those videos.

I mostly have two use cases:

  • Notebooks for exploratory work, commissioning jobs, balancing, testing sequences, etc.

  • Scripts that run as a service for gateways

The IPython REPL was my favorite because of tab completion and colors, but with bacpypes3 I haven't been able to make it work. My lack of knowledge of the async/await world is pretty annoying...

This is why I now do all the development in a Notebook (inside VSCode). There is an event loop that is already running and it just works.

And I like to be able to test things on the fly.
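
Editor's sketch: the difference between a notebook and a plain script comes down to whether an event loop is already running. In a Jupyter notebook one is, so a cell can `await` a coroutine directly; a script has to start a loop itself, for example with `asyncio.run`. The example below illustrates that with `fake_read`, a hypothetical stand-in for an awaitable read such as `bacnet.read(...)`; the helper names are illustrative only.

```python
import asyncio


async def fake_read(request: str) -> float:
    """Hypothetical stand-in for an awaitable read; returns a fixed value."""
    await asyncio.sleep(0)
    return 21.5


def loop_is_running() -> bool:
    """True when called from code that an event loop is currently running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def read_from_script(request: str) -> float:
    """Run one read from synchronous code, such as a plain script."""
    if loop_is_running():
        # In a notebook cell, write `await fake_read(request)` instead.
        raise RuntimeError("An event loop is already running; use await.")
    return asyncio.run(fake_read(request))
```

In a notebook cell the equivalent would simply be `value = await fake_read("2:5 analogInput 1022 presentValue")`, with no call to `asyncio.run`.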


bbartling commented on August 22, 2024

Cool stuff, thanks Christian.


ChristianTremblay commented on August 22, 2024

@bbartling
I don't know if it is easier... but I kinda like the fact that a context manager will deal with all disconnections...

import BAC0
import asyncio
from signal import SIGINT, SIGTERM, signal
import click

loop = None
fini = False

async def get_units(
    bacnet, address: str = None, object_type: str = None, object_instance: str = None
) -> str:
    units = await bacnet.read(f"{address} {object_type} {object_instance} units")
    return units


@click.command()
@click.option("--ip", default="", help="Interface IP address with subnet mask ")
@click.option("--address", default="2:5", help="BACnet device address")
@click.option("--object_type", default="analogInput", help="BACnet object type")
@click.option("--object_instance", default="1022", help="BACnet object instance")
def main(ip, address, object_type, object_instance):
    global loop
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    signal(SIGINT, handler)
    signal(SIGTERM, handler)

    loop.create_task(_main(ip, address, object_type, object_instance))
    loop.run_forever()
    tasks = asyncio.all_tasks(loop=loop)
    for t in tasks:
        t.cancel()
    group = asyncio.gather(*tasks, return_exceptions=True)
    loop.run_until_complete(group)
    loop.close()


async def _main(
    ip: str, address: str = None, object_type: str = None, object_instance: str = None
) -> None:
    global bacnet
    global fini
    async with BAC0.lite(ip=ip) as bacnet:
        #await bacnet._discover()
        while not fini:
            val = await bacnet.read(
                f"{address} {object_type} {object_instance} presentValue"
            )
            units = await get_units(bacnet, address, object_type, object_instance)
            print(f"Sensor Value: {val} {units}...waiting 5 seconds")
            await asyncio.sleep(5)


def handler(sig, sig2):
    global fini
    fini = True
    print(f"Got signal: {sig!s}, shutting down.")
    loop.stop()


if __name__ == "__main__":
    main()
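
Editor's sketch: the appeal of the context manager is that its exit hook runs even when the polling task is cancelled during shutdown, so no explicit disconnect call is needed in the signal handler. The example below demonstrates this with `FakeBacnet`, a hypothetical stand-in class that only tracks a `connected` flag. It does not reproduce BAC0's actual connection handling.

```python
import asyncio


class FakeBacnet:
    """Hypothetical stand-in for a client used with `async with`."""

    def __init__(self):
        self.connected = False

    async def __aenter__(self):
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit, on errors, and on task cancellation.
        self.connected = False
        return False  # do not swallow the exception

    async def read(self, request: str) -> float:
        await asyncio.sleep(0)
        return 21.5


async def poll_forever(holder: list) -> None:
    async with FakeBacnet() as client:
        holder.append(client)
        while True:
            await client.read("2:5 analogInput 1022 presentValue")
            await asyncio.sleep(0.01)


async def demo() -> bool:
    holder = []
    task = asyncio.create_task(poll_forever(holder))
    await asyncio.sleep(0.05)  # let a few polls happen
    task.cancel()  # same effect as the shutdown code cancelling all tasks
    await asyncio.gather(task, return_exceptions=True)
    return holder[0].connected
```

Calling `asyncio.run(demo())` reports whether the stand-in client is still connected after the task has been cancelled.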


bbartling commented on August 22, 2024

Looks great, thanks.
