Giter Site home page Giter Site logo

kimsehwan96 / car-iot-platform-from-kpu Goto Github PK

View Code? Open in Web Editor NEW
17.0 2.0 0.0 37.48 MB

한국산업기술대학교 졸업작품 팀 레포지토리 Korea Polytechnic University Graduation Project Repo (산기대)

Python 81.62% Shell 7.35% HTML 4.60% Jupyter Notebook 6.43%
serverless aws greengrass react obd can obd2 awsiot lambda aws-greengrass-provisioner car iot python

car-iot-platform-from-kpu's Introduction

산기대 졸업작품 팀 레포지토리

Cloud Realtime Dasboard

링크 : https://dashboard.driving-mate.com

3D Realtime Dasboard

링크 : https://3d.driving-mate.com/

프로모션(랜딩)페이지

링크 : https://www.driving-mate.com

실시간 차량 주행 정보 모니터링 (진행중)

demoapp

전체적인 아키텍쳐

라즈베리 파이 내부 코드 구성

자동차 데이터 수집

자동차에 있는 OBD 단자를 통해 CAN 신호를 전달받는다. 이 때 CAN 신호를 pican 이라는 보드를 통하여 라즈베리파이가 읽을 수 있는 신호로 변환, python-can라이브러리를 통하여 자동차와 OBD(can)통신을 연결함.

자가 진단 점검 단자의 3, 4, 11 핀을 활용하여 연결

OBD-2 케이블의 반대편을 잘라서, 필요한 pin(3, 4, 11)에 대한 선만 pican 보드에 연결

데이터 수집을 위한 CAN 통신(OBD) 코드 작성

OBD 프로토콜에 맞춰, 차량의 실시간 데이터를 요청하는(쿼리) 코드를 작성해야함.

쿼리

응답

실제 구현한 코드

따라서 사용 가능한 OBD PID를 분리하고, 이 데이터를 수집하기 위한 파이썬 코드를 작성하였다. (/greengrass/canPlugin/can_plugin.py 및 /greengrass/canPlugin/canutil.py)

  • canutil.py
from enum import Enum
import can

DATA_TYPE_INDEX = 2


class NotSupportedDataTypeException(Exception):
    def __init__(self):
        super().__init__("지원하지 않는 데이터 타입입니다.")


class CanDataType(Enum):
    """
    OBD2 PIDS : https://en.wikipedia.org/wiki/OBD-II_PIDs

    차량 제조사별 확장 PID 및 다른 PID가 존재할 수 있다. 확인 필요
    """
    ENGINE_LOAD = 0x04
    SHORT_FUEL_TRIM_BANK = 0x06
    LONG_FUEL_TRIM_BANK = 0x07
    INTAKE_MANIFOLD_ABSOLUTE_PRESSURE = 0x0b
    ENGINE_RPM = 0x0C
    VEHICLE_SPEED = 0x0d
    INTAKE_AIR_TEMPERATURE = 0x0F
    MAF_SENSOR = 0x10
    THROTTLE = 0x11
    ENGINE_RUNTIME = 0x1F
    TRAVELED_DISTANCE = 0x31  # since Error code cleared !
    FUEL_TANK_LEVEL = 0x2F
    OXYGEN_SENSOR = 0x34
    AMBIENT_AIR_TEMPERATURE = 0x46
    SHORT_TERM_FUEL_EFFICIENCY = 0x00  # this is dummy
    AVERAGE_FUEL_EFFICIENCY = 0x00  # this is dummy

    # Request & Response
    PID_REQUEST = 0x7DF
    PID_REPLY = 0x7E8

    # PID MODES
    SHOW_CURRENT_DATA = 0x01
    SHOW_FREEZE_FRAME_DATA = 0x02
    SHOW_TROUBLE_CODES = 0x03


class CanRequestMessage:
    # TODO: change this class as singleton
    def __str__(self) -> str:
        return "{}".format(self.message)

    def __init__(self, data_type) -> None:
        self.data_type = data_type  # ENUM
        self.message = [0x02,
                        CanDataType.SHOW_CURRENT_DATA.value,
                        self.data_type.value,
                        0x00,
                        0x00,
                        0x00,
                        0x00,
                        0x00]

    def get_type(self):
        return self.data_type.name


class CanDataConvert:
    def __init__(self):
        pass

    @staticmethod
    def convert(recv_msg) -> int:
        data_type = recv_msg.data[DATA_TYPE_INDEX]
        try:
            handler = getattr(CalculateData, CanDataType(data_type).name.lower())
            return handler(recv_msg.data)
        except AttributeError:
            raise NotSupportedDataTypeException
        except Exception as e:
            print('error occur : ', e)


# cal fef https://www.sciencedirect.com/science/article/pii/S2352484719308649
# https://stackoverflow.com/questions/44794181/fuel-consumption-and-mileage-from-obd2-port-parameters
class CalculateData:
    maf = 0
    speed = 0
    present_fuel_efficiency = 0
    average_fuel_efficiency = 0
    count = 0
    avg_buf = 0

    def __init__(self):
        pass

    @classmethod
    def average_fuel_efficiency(cls, recv_msg) -> float:
        cls.avg_buf += cls.present_fuel_efficiency
        cls.count += 1
        cls.average_fuel_efficiency = round(avg_buf / count, 2)
        return cls.average_fuel_efficiency

    @classmethod
    def short_term_fuel_efficiency(cls, recv_msg) -> float:
        cls.present_fuel_efficiency = round(cls.speed * (1 / 3600) * (1 / cls.maf) * 14.7 * 710, 2)
        return cls.present_fuel_efficiency

    @staticmethod
    def oxygen_sensor(recv_msg) -> float:
        # we can make two data. first one is ratio, the other is mA
        return round((256 * recv_msg[3] + recv_msg[4]) * (2 / 65536), 2)

    @staticmethod
    def intake_manifold_absolute_pressure(recv_msg) -> int:
        return recv_msg[3]

    @classmethod
    def maf_sensor(cls, recv_msg) -> float:
        cal = round((recv_msg[3] * 256 + recv_msg[4]) / 100, 2)
        cls.maf = cal
        return cal

    @staticmethod
    def engine_load(recv_msg) -> float:
        return round((100 / 255) * recv_msg[3], 2)

    @staticmethod
    def engine_rpm(recv_msg) -> float:
        return round(((recv_msg[3] * 256) + recv_msg[4]) / 4, 2)

    @classmethod
    def vehicle_speed(cls, recv_msg) -> int:
        cls.speed = recv_msg[3]
        return recv_msg[3]

    @staticmethod
    def throttle(recv_msg) -> float:
        return round((recv_msg[3] * 100) / 255, 2)

    @staticmethod
    def short_fuel_trim_bank(recv_msg) -> float:
        return round(((100 / 128) * recv_msg[3]) - 100, 2)

    @staticmethod
    def long_fuel_trim_bank(recv_msg) -> float:
        return round(((100 / 128) * recv_msg[3]) - 100, 2)

    @staticmethod
    def short_fuel_trim_bank(recv_msg) -> float:
        return round(((100 / 128) * recv_msg[3]) - 100, 2)

    @staticmethod
    def intake_air_temperature(recv_msg) -> int:
        return recv_msg[3] - 40

    @staticmethod
    def throttle_position(recv_msg) -> float:
        return round((100 / 256) * recv_msg[3], 2)

    @staticmethod
    def engine_runtime(recv_msg) -> int:
        return 256 * recv_msg[3] + recv_msg[4]

    @staticmethod
    def traveled_distance(recv_msg) -> int:
        return 256 * recv_msg[3] + recv_msg[4]

    @staticmethod
    def fuel_tank_level(recv_msg) -> float:
        return round((100 / 255) * recv_msg[3], 2)

    @staticmethod
    def ambient_air_temperature(recv_msg) -> int:
        return recv_msg[3] - 40

CanDataType 이라는 Enum 클래스를 생성하여 각 PID를 관리하였고, 각 데이터 별로 응답에 대한 변환식이 다르기 때문에 변환을 담당할 클래스또한 작성하였다.

CanDataConvert 클래스는 각 PID에 맞게 적절한 변환식을 호출하기 위해서 응답에 맞는 handler를 생성하고 호출하는 형태로 구현하였다.

  • can_plugin.py
import can
import os
import subprocess
from libs import util
from libs.plugin import run_plugin_thread
from collections import deque
from time import sleep
from canutil import CanDataType, CanRequestMessage, CanDataConvert
from libs.base_plugin import BasePlugin
from typing import List

TOPIC = util.get_ipc_topic()

TEST_FIELDS = [
    'engine_load',
    'engine_rpm',
    'intake_manifold_absolute_pressure',
    'vehicle_speed',
    'throttle',
    'short_fuel_trim_bank',
    'engine_runtime',
    'traveled_distance',
    'fuel_tank_level',
    'ambient_air_temperature',
    'maf_sensor',
    'oxygen_sensor',
    'short_term_fuel_efficiency',
    'average_fuel_efficiency'
]

OPTION = {
    'channel': 'can0',
    'busType': 'socketcan_native'
}

INIT_COMMAND = '/sbin/ip link set can0 up type can bitrate 500000'


class SocketCanInitFailedException(Exception):
    def __init__(self):
        super().__init__('소켓 캔 초기화 패')


class CanPlugin(BasePlugin):
    # TODO: inherite base class and refactor below methods !
    def __init__(self, fields: List[str], option=None) -> None:
        super().__init__(fields, option=option)
        self.data_list = [
            x.upper() for x in fields
        ]
        self.enum_list = [
            getattr(CanDataType, x) for x in self.data_list
        ]
        self.req_messages_for_data = [
            getattr(CanRequestMessage(x), 'message') for x in self.enum_list
        ]
        self.data_len = len(self.data_list)
        self.recv_buffer = deque()
        self.return_buffer = deque()
        self._channel = option.get('channel', 'can0')
        self._bus_type = option.get('busType', 'socketcan_native')
        self._init_can()
        self.bus = can.interface.Bus(channel=self._channel, bustype=self._bus_type)

    def _init_can(self) -> None:
        os.system(INIT_COMMAND)
        print('socket can init complete')

    # TODO:
    # 1. set return_buffer & recv_buffer with property

    def _send_request(self):
        self.return_buffer.clear()
        print("버퍼 초기화")

        def is_valid_reply(message) -> bool:
            if message.arbitration_id != CanDataType.PID_REPLY.value:
                return False
            else:
                return True

        for message in self.req_messages_for_data:
            msg = can.Message(arbitration_id=CanDataType.PID_REQUEST.value, data=message, extended_id=False)
            #           print("this is will send can msg " , msg)
            self.bus.send(msg)
            sleep(0.01)
            while True:
                recv_data = self.bus.recv()
                if is_valid_reply(recv_data):
                    self.recv_buffer.append(recv_data)
                    break
                sleep(0.01)
                continue
        while len(self.recv_buffer) < self.data_len:
            recv_data = self.bus.recv()
            if is_valid_reply(recv_data):
                self.recv_buffer.append(recv_data)
        while self.recv_buffer:
            self.return_buffer.append(
                CanDataConvert.convert(
                    self.recv_buffer.popleft()
                )
            )

        return self.return_buffer

    def collect_data(self) -> None:
        try:
            self.data = list(self._send_request())
        except Exception as e:
            print('error occured when collect data ', e)

    # TODO : Calculate Fuel Efficiency before relay.
    def cal_fuel_efficiency(self):
        """
        https://stackoverflow.com/questions/44794181/fuel-consumption-and-mileage-from-obd2-port-parameters
        """
        maf = self.data[self.data_list.index('MAF_SENSOR')]
        speed = self.data[self.data_list.index('VEHICLE_SPEED')]

        fuel_efficiency = speed * (1/3600) * (1/maf) * 14.7 * 710

        return fuel_efficiency


def handler(event, context) -> None:
    pass


try:
    cp = CanPlugin(TEST_FIELDS, OPTION)
except Exception as e:
    print('failed to make can plugin :', e)


def run():
    run_plugin_thread(cp.entry)


if __name__ == '__main__':
    pass
else:
    run()

필요한 데이터를 호출하고, 응답하는 과정에서 우리가 원하는 데이터가 아닌경우 버리는 로직을 구성하였다.

데이터를 호출하는 코드를 한번 수행하고, 원하는 응답이 올 때 까지 기다리는 로직이 존재하며, 이 데이터들을 deque로 관리하여 데이터를 수집/저장 하는 과정에서의 불필요한 시간복잡도를 줄이고자 노력했다. (큰 의미는 없는 것 같다.)

수집한 데이터 송신 부분(실시간 앱 / mqtt / S3 등)

이렇게 수집된 데이터들은 모두 binder라고 불리는 프로세스로 넘겨지게 되며, binder 의경우 미리 설정되어있는 dispatcher들로 데이터를 전달한다.

즉 각 데이터들은 dispatch 되어서 각자의 용도에 맞는 형태로 변형되고 저장되고 송신된다.

storage dispatcher 의 경우. 라즈베리파이 로컬에 csv파일을 저장할 뿐 아니라, S3에도 csv파일을 저장하는 역할을 한다.

https://github.com/kimsehwan96/car-iot-platform-from-kpu/blob/master/greengrass/binder/dispatcher/storage_dispatcher.py.

websocket dispatcher의 경우 실시간 리액트 앱에 socketio를 통해 데이터를 전달하는 역할을 한다.

https://github.com/kimsehwan96/car-iot-platform-from-kpu/blob/master/greengrass/binder/dispatcher/websocket_dispatcher.py

실시간 리액트 앱 구성

실시간 리액트앱은 https://github.com/kwhong95/kpu_sp_rt_dashboard 에서 구현중이다.

데이터 수집 / 가공 프론트 코드

import { createContext, useState, useEffect, useContext } from "react";
import title from '../libs/dataTitle.json';
import unit from '../libs/dataUnit.json';
import io from 'socket.io-client';

const URL = 'http://localhost:5000/binder'
const socket = io(URL)

const RealtimeDataContext = createContext([]);

const RealtimeDataProvider = ({ children }) => {
    const [ payloads, setPayloads ] = useState([]);

    return (
        <RealtimeDataContext.Provider value={[ payloads, setPayloads ]}>
            {children}
        </RealtimeDataContext.Provider>
    );
};

function useRealtimeData () {
    const context  = useContext(RealtimeDataContext);
    const [payloads, setPayloads] = context;
    const [temp, setTemp] = useState({});
    const [drivingData, setDrivingData] = useState([]);
    const [fuel, setFuel] = useState([]);
    const [realtimeFuelEfficiency, setRealtimeFuelEfficiency] = useState({});
    const [speed, setSpeed] = useState({});
    const [RPM, setRPM] = useState({});
    const [ResidualFuel, setResidualFuel] = useState({});

    useEffect(() => {
        socket.on('rtdata', data => {
            const jsonData = JSON.parse(data);
            const result = jsonData.fields.map(
                (field, idx) => Object({
                    id: field,
                    title: title[field],
                    value: jsonData.values[idx],
                    unit: unit[field],
                }))


            setPayloads(result);

            function findPayload(title) {
                for(let i = 0; i<result.length; ++i) {
                    if(result[i].title === title) {
                        return result[i];
                    }
                }
            }

            setTemp(findPayload('외부 공기 온도'));
            setDrivingData([findPayload('주행 거리'), findPayload('운행 시간')]);
            setFuel([findPayload('평균 연비'), findPayload('잔여 연료량')]);
            setRealtimeFuelEfficiency(findPayload('순간 연비'));
            setSpeed(findPayload('속도'));
            setRPM(findPayload('엔진 rpm'));
            setResidualFuel(findPayload('잔여 연료량'));

        })


    }, [ setPayloads ]);

    return {
        payloads,
        temp,
        drivingData,
        fuel,
        realtimeFuelEfficiency,
        speed,
        RPM,
        ResidualFuel,
    };
}

export { RealtimeDataProvider, useRealtimeData }

현재 websocket dispatcherlocalhost:5000/binder 라는 url을 통해 데이터를 보내주고 있다.

따라서 프론트 코드 또한 해당 url로부터 데이터를 전달 받을 수 있다.

위 데이터를 context로 관리하여 여러 컴포넌트에서 직접 context에 저장된 real time data를 손쉽게 접근 / 시각화 가능하다.

import React from "react";
import styled from 'styled-components';
import {commonCard, commonHeader, commonWrapper} from '../../theme/commonStyles';
import {useRealtimeData} from "../../context/RealtimeDataProvider";
import Gauge from "../../assets/Gauge";

const FuelEfficiencyBox = () => {
    const { realtimeFuelEfficiency } = useRealtimeData();
    return (
        <Wrapper>
            <Card>
                <Header>
                    {realtimeFuelEfficiency.title}
                </Header>
                <Content>
                    <Gauge
                        value={realtimeFuelEfficiency.value}
                        units={realtimeFuelEfficiency.unit}
                        min={0}
                        max={100}
                    />
                </Content>
            </Card>
            <Card>
                <Header>
                    유류비
                </Header>
                <Content>
                    계산로직필요!
                </Content>
            </Card>
        </Wrapper>
    );
}

export default FuelEfficiencyBox;

const Wrapper = styled.div`
  ${commonWrapper}
`;

const Card = styled.div`
  ${commonCard}
`;

const Header = styled.div`
  ${commonHeader}
`;

const Content = styled.div`
  margin-top: 20px;
  margin-right: 10px;
  display: flex;
  float: right;
`;

사용 기술 스택

  1. AWS

    • Lambda
    • GreenGrasss
    • S3
    • CloudWatch
  2. Front

    • React
    • Ant Design
    • Socket.io
    • webpack
    • yarn
  3. DevOps

    • Jenkins
    • Serverless(with AWS)
    • Docker(Greengrass Deploy)
  4. etc.

    • CAN Protocol
    • OBD2
    • Raspberry Pi
    • Linux (Debian, Raspbian)

car-iot-platform-from-kpu's People

Contributors

jungyeonpark avatar kimsehwan96 avatar kwhong95 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar

car-iot-platform-from-kpu's Issues

1분 단위로 csv파일을 만들어서 S3에 저장하는 코드

greengrass 실행 환경에서(즉 추후에는 실제 차량에 장착될 하드웨어에 들어가는 소프트웨어 상태) 1분마다 수집한 실시간 데이터를 특정한 날짜 포멧 ex : kimsehwan-2020-11-01-19-01.csv와 같이

{id}-{year}-{month}-{day}-{hour}-{miniute}.csv 규격의 파일을 특정 S3 디렉터리 및 하드웨어 로컬에 저장하는 코드가 필요합니다.
예시 :

import os
import boto3

s3 = boto3.resource('s3')

class TestClass:

    def save_to_s3(self, fileName):
        self.check_s3_status()
        s3.meta.client.upload_file(
            os.getcwd() + '/' + fileName,
            S3_SAVE_BUCKET,
            DEVICE_ID + '/' + fileName
        )
        print("save local csv file into S3 !! {}".format(fileName))

이런 boto3의 s3 api를 사용해서 구현 해야 합니다.

또한 이런 코드들은 하드웨어에서 데이터 저장에 관련된 행동들을 수행하므로 하나의 Class로 구현해야 겠습니다.

class StorageManager:
    def __init__(self):
        pass

    def save_local(self, csv_data, file_name, path):
        pass

    def save_s3(self, csv_data, file_name, path):
        pass

    def save_csv(self, data):
        ``` logics
        ```
        return stored_csv

또한 이런 저장에 관련된 코드들은 메인 코드에서 해당 클래스의 instance를 호출하여 사용하고, 1초마다 실행되는 thread로 생성하여 , data를 전달 받은 것을 1초마다 처리, 60초마다 csv저장 및 local & s3 저장 하는 방식으로 구현하면 좋겠습니다.

데이터를 수집하는 플러그인 코드 (및 테스트용 코드)

실제 차량 <-> 하드웨어(플러그인 코드) <-> 바인더 <-> cloud 형태의 구조로 데이터를 전달 할 예정입니다.

플러그인 코드는 하드웨어로부터 rawdata를 수집하고, rawdata를 우리가 사용하기 편하게 변환하여 바인더로 보내는 역할을 할 예정입니다.

실제 환경 & 테스트 환경 용도로 두 셋을 만들어야 하며,

플러그인 코드는 교체 가능하도록 코드를 구현해야 합니다.

1시간, 1일, 1주일, 1달, 1년 통계 API 구성하기.

실시간 데이터는 S3에 각 디바이스 id별로 저장되고 있음.

S3로부터 1시간단위, 1일, 1주일 단위의 CSV 파일을 일괄 다운로드후 (Lambda 를 이용해야 할 듯..)
CSV파일을 이용하여 평균 수치 통계를 내고, DynamoDB 혹은 RDB(MySQL 등등)에 저장하기.

-> 고민 해야 할 조건

DB의 고유 키값으로 무엇을 정의할지 생각해보기

-> 유저 id 혹은 디바이스 명을 키로 하기에는, 한 유저 혹은 한 디바이스 내에 통계건이 여러개 존재 할 수 있음.
-> 시간을 키로 하기에도 부담스러움

-> 유저 id + 시간 과 같은 임의의 우리가 예측 가능한 규칙을 가지는 키를 구성하고,
각 필드별 평균값을 DynamoDB에 저장하는 구조가 제일 유리해 보이긴 하는데...

@jungyeonPark

Can. 플러그인 로직 변경

Can은 req & res 구조입니다,

req를 20개정도 하고, 이후에 res를 받으려고하면 이미 req 보낸 즉시 응답한 데이터들이 날아가게됩니다.

현재 제 코드는 req를 20개 한 이후 바로 res를 20개정도 기다리는데, res가 20개가 채워지지 않습니다. (이미 req하는 그 당시에 res를 보내고있었기 때문에). 즉 타이밍 이슈가 발생합니다.

해결하기 위해서는 스레드를 2개로 분리, tx thread와 rx thread를 사용하고, 스레드간 queue를 이용하여 req에 대한 응답을 그 즉시 rx 스레드 큐에 넣어주어야 합니다ㅣ..

프론트에서 받을 json 양식 정의

highchar나 D3같은 시각화 라이브러리에서 필요로 하는 json 양식들이 있을 겁니다. 이를 공유해주면 백엔드에서 올바른 형식으로 제공해주는 작업을 해야 합니다.

웹 UI & UX 명세화

  1. 하림이와 같이 UI UX구조를 구성해보기
    (선행 조건 -> @kimsehwan96 CAN 통신 후 받을 수 있는 데이터 모두 나열하고,
    사용자가 어떤 데이터를 원할지 고민 후 구성하기)

github action aws 토큰 교체 이슈

현재 기존 token은 deprecated된 토큰이므로, 새로운 토큰을 github secret에 등록해서, 프론트앱 배포 가능하도록 조치하겠습니다.

1시간 단위 통계 작성 후 GraphQL API 작성

1시간 단위 통계 데이터를 DynamoDB에 저장한 이후에,

GraphQL 을 이용해 클라이언트가 소비 할 수 있게 설계 해야 합니다.

현재 구조는

  1. 통계 데이터를 만들 람다에, 주기적으로 호출되는 트리거용 람다(1시간 단위로 이벤트를 발생시키는 람다)를 이용해 통계 데이터 람다를 호출합니다.

  2. 통계데이터를 만들 람다는 1시간마다 이벤트를 받아서, 해당 시간대의 datetime 정보를 확인 한 이후, 1시간 단위의 raw_data를 다운로드 받아서
    pandas를 이용해 평균, 최소, 최댓값을 계산하고, DynamoDB에 직접 입력합니다.

functions:
  lmd_trigger:
    handler: lmd_trigger.handler
    name: ${self:service}-stat-trigger
    events:
      - schedule: cron(0 * * * ? *) # every 60 mins
  lmd_get_trigger:
    handler: lmd_get_trigger.handler
    name: ${self:service}-stat-get-trigger

@jungyeonPark

우리 플랫폼 코드의 Lambda 디렉터리를 보면 serverless.yml이 있습니다.

여기서 예전에 우리가 rest api 만들 때는 event가 http 였는데, 여기는 schedule 이죠?

schedule은 cron 표현식을 이용해 주기적으로 람다에 이벤트를 발생시키는 방법입니다.

schedule: cron(0 * * * ? *) # every 60 mins 여기서 cron(0 * * * ? *) 이 60분마다 동작하겠다는 의미구

cron(0/30 * * * ? *) 이건 30분마다 동작시키겠다는 의미에요. 60분마다라는건 언제 배포하든

12시 00분, 13 시 00분 이렇게 동작합니다! 주기적으로

여기서는 lmd_trigger 라는 람다가 1시간 단위로 동작할거구, 이 1시간마다 동작하는 람다 코드를 설명해줄게요

import boto3
import abc
import os
import datetime
import time
import json

BUCKET_NAME = os.environ.get('RAW_BUCKET', 'if this texts show, Env var wasnt detectd')
TRIGGERD_LAMDA = os.environ.get('TRIGGERD_LAMDA', 'batch-lambda-dev-stat-get-trigger')
os.environ['TZ'] = 'Asia/Seoul'

lmd = boto3.client('lambda')

def timenow_dt_strftime():
    time_now = time.time()
    dt = datetime.datetime.fromtimestamp(time_now)
    return dt.strftime('%Y-%m-%d-%H-%M')
    #invoke 요청 할 당시의 datetime object
    
def handler(event, context):
    print("this lambda has invoked!! (cron test lambda)")
    print("this is BUCKET_NAME {}".format(BUCKET_NAME))

    #dt = timenow_dt()
    #dt = datetime.datetime(2020,9, 29, 22, 20)
    print(timenow_dt_strftime())

    try:
        lmd.invoke(
            FunctionName=TRIGGERD_LAMDA,
            InvocationType='Event', Payload=json.dumps({'dt' : timenow_dt_strftime()}))
    except  Exception as e:
        print("Error occured when triggering another lambda! {}".format(e))

여기서 보면 이 람다가 1시간마다 호출 되었을 때

lmd.invoke <- 이부분ㅇ ㅣ실행되는데, invoke라는건 람다를 호출한다는 의미에요.

그러니까 1시간 마다 다른 람다를 호출하도록 만든거에요.

다른 람다는 1시간마다 호출 받아서 S3의 sehwan-an2-edge-dev-rawdata 라는 S3 버킷에 담겨있는 raw data를 다운로드 받고

pandas 이용해서 통계를 낼거에요 (정연이가 지금 하고있는 작업)

rawdata를 내려받는 코드는 test.py코드에 있는데

if __name__=="__main__":
    S3_KEYS = [
        'test-group_Core/rawdata_2020-09-29-22-20.csv',
        'test-group_Core/rawdata_2020-09-29-22-21.csv',
        'test-group_Core/rawdata_2020-09-29-22-23.csv',
        'test-group_Core/rawdata_2020-09-29-22-24.csv'
    ] # 향후 트리거된 람다로부터 이 키들을 생성 할 예정임.
    raw_ary = []
    for v in S3_KEYS:
        try:
            obj = s3.get_object(
                Bucket = BUCKET_NAME,
                #Key = '{}/rawdata_2020-09-29-22-20.csv'.format(AWS_THINS_NAME)
                Key = v
            )
        except s3.exceptions.NoSuchKey as e:
            print("error occured , No such Key") #key 없을 때 예외 처리 로직.
        try:
            raw_data = obj.get('Body').read()
            raw_ary.append(make_stream_obj_to_ary(raw_data))
        except Exception as e:
            print("error occured {}".format(e))

이 코드에 있죠?

obj = s3.get_object(
    Bucket = 버킷 이름
    Key = 다운로드 받을 파일 이름
)

이런 구조고 응답중에서 Body라는 친구가 실제 바이너리 데이터를 담고 있어서

obj.get('Body').read()

이렇게 접근하면 b ' ~~~~' 이 데이터를 볼 수 있는거에요

그리고 우리 플랫폼 코드 lambda 디렉터리에 있는

lmd_get_trigger.py 가 1시간마다 호출을 '당할' 람다구여. 이 람다에 통계데이터를 만들고, DynamoDB에 저장하는 로직이 들어가야겠죠

지금은 test.py에서 해당 작업을 하고 있는거에요

스크린샷 2020-10-04 오전 1 20 20

여기 보면 dynamodb-resource 로 시작하는 세개의 데이터 베이스가 바로 통계 데이터를 저장할 데이터 베이스입니다.

데이터베이스 구조는

해시키 : deviceid
레인지키 : timestamp

이렇게 구성되어있는데. 둘다 일종의 데이터베이스에서의 '키' 라고 보면 되구.

deviceid는 동일하더라도, 통계가 만들어진 timestamp가 다를 것 이기 때문에 DB에서 구분하는데 문제가 없을거에요.

예 :
deviceid timestamp 평균 최대 최소
test_device 21시 00분 100 200 0
test_device 22시 00분 120 150 20

이런식으로 말이에요 .

그리고 나서 GraphQL 인터페이스를 만들 건데 이건 또 자세히 설명 해줄게요

Graphql은 restapi와 다르게 하나의 엔드포인트에서 모든 작업을 처리하기 때문에

우리가 직접 resolver 라는걸 만들어서, 어떤 쿼리에 어떤 응답을 줄지 만들어 줘야 하고.

이 resolver는 python 코드 혹은 vtl 이라고 부르는 언어로 작성 할 수 있어요.

이거는 한번 찾아보면 좋을듯

1시간 단위로 통계를 내 DB에 저장하는 AWS lambda

1시간 단위로 이미 s3에 저장된 데이터들을 이용하여 통계를 내는 람다를 만들어야 합니다.

첫번째로 1시간마다 invoke 되는 람다가 필요합니다. 이 람다는 serverless 에서 cron 표현식을 통해 1시간마다 실행 될 수 있으며,

이 1시간마다 실행되는 람다는, 실제 통계를 만들어 낼 다른 람다를 invoke(호출)하여, 그 람다는 S3에 저장된 csv데이터를 모두 다운로드 받아, 통계를 내고, 특정 Dynamodb에 id, time, 통계값 들을 저장 하는 코드가 되어야 합니다.

local view 테스트 방법 공유.

localview 에서 소모할 socketio 데이터를 테스트 하는 방법을 공유드립니다.

<!doctype html>
<html>
<body>
<script src="https://cdn.socket.io/socket.io-1.2.0.js"></script>
<script>
    const binder = io("http://3.34.87.77:5000/binder");

        binder.on('rtdata', (data) => {
            console.log('binder buffered: ', data);
        });

</script>
<H1> Hello world</H1>
</body>
</html>

우선 이렇게 간단한 코드로 테스트 완료하였습니다.

제가 개인적으로 사용하는 서버의 ip가 3.34.87.77 입니다.

그리고 socketio를 사용하기 위한 포트는 5000번입니다.

그리고 실시간 데이터가 들어오는 네임스페이스는 /binder 입니다.

추후 실시간 유류비 계산과 같은 데이터는 네임스페이스가 달라지겠지만, 그 외 실시간 데이터는 다 여기서 들어옵니다.

{
  "fields": [
    "engine_load",
    "engine_coolant_temp",
    "engine_rpm",
    "vehicle_speed",
    "maf_sensor",
    "o2_voltage",
    "throttle",
    "short_fuel_trim_bank",
    "long_fuel_trim_bank",
    "intake_air_temperature",
    "engine_runtime",
    "traveled_distance",
    "fuel_tank_level",
    "ambient_air_temperature",
    "engine_oil_temperature",
    "transmission_actual_gear"
  ],
  "values": [71, 86, 70, 99, 43, 49, 40, 71, 2, 39, 98, 61, 5, 60, 90, 70],
  "timestamp": 1618669837.2445838
}

위와 같은 형태로 데이터를 받아볼 수 있습니다. 프론트 코드쪽에서 const binder = io("http://3.34.87.77:5000/binder"); 부분의 url 부분 (3.34.87.77) 부분만 추후 로컬에서 사용할때 localhost로 바꿔주면 동일한 동작을 할 것입니다.

정리내용

테스트를 위한 url은 http://3.34.87.77:5000 (추후 로컬에서 사용 시 http://localhost:5000 으로 변경할 예정)

실시간 데이터의 네임스페이스는 /binder, 이벤트 명은 rtdata

데이터 구조는 위에 작성한 json 데이터와 동일함.

유류비 관련 데이터는 개발 진행중, 개발 완료후 공유 예정

@kwhong95 확인 부탁드립니다.

Influxdb 데이터 구조 설계 및 분석

현재 InfluxDB에 데이터를 다음과 같은 코드가 넣어주고 있습니다.

class InfluxDispatcher(BaseDispatcher):
    def __init__(self):
        super().__init__()
        self.client = InfluxDBClient(url=URL, token=TOKEN)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)

    def make_points(self, data: str):
        dict_data = json.loads(data)
        point_queue = deque([])
        fields = dict_data.get('fields')
        values = dict_data.get('values')
        for i, v in enumerate(fields):
            point_queue.append(
                Point(v)\
                .tag('user', 'kim')\
                .field(v, values[i])\
                .time(datetime.utcnow(), WritePrecision.NS)
            )
        return point_queue

    def relay(self, data: str):
        points = self.make_points(data)
        for point in points:
            self.write_api.write(BUCKET, ORG, point)

각 데이터의 field명은 우리가 수집한 데이터의 각각의 이름이며 (rpm, 속도 등등)
tag에는 user : kim 과 같이 넣어주고 있습니다.

스크린샷 2021-04-13 오전 12 40 12

InfluxDB 공식 문서를 더 참고하여 어떻게 데이터 설계를 하는것이 좋을지 분석한 이후

field / measurement / tag 등을 올바르게 정의 할 필요가 있습니다.

이 부분이 끝난 이후에는 API 호출로 원하는 범위의 데이터를 추출한다던지, 평균, 최소, 최댓값을 받아온다던지, 사전에 정의된 함수를 수행한 결과값을 받는다던지 여러 응용하여 받을 수 있는 기능이 있는지 분석할 예정입니다.

graph ql API 테스트용 코드

통계 데이터를 react client에서 사용하기 위해서는, Dynamodb에 있는 값을 리턴 해 줄 수 있는 graphql 인터페이스가 필요합니다.

Appsync를 이용해 구현 할 예정이며,

구현해야 하는 내용은

graphql 스키마,
graphql 리졸버 입니다.

스키마는 일종에 데이터가 어떤 형식으로 구조가 되어있는지 정의 해주는 부분이고,

리졸버는 실제 쿼리문이 왔을 때 어떻게 처리해야하는지에 대한 내용이 서술된 부분입니다.

schema {
    query: Query
    mutation: Mutation
}

type Query{
    getProfile(userID: String!): Profile
}

type Mutation {
    createProfile(input: ProfileInput): Profile
    deleteProfile(input: deleteProfileInput): Profile
    updateProfile(input: updateProfileInput): Profile
}

type Profile {
    userID: ID!
    fields: [String]
    carType: String
    madeAt: AWSDateTime
    updatedAt: AWSDateTime
}

위와 같은 양식이 graphql 스키마이고, 실제 객체들을 정의하는 부분입니다.

resolver는 쿼리나 뮤테이션 필드(명령)이 들어왔을때 어떤 db에 접근하고, 어떻게 리턴해줄 것인지에 대한 내용들을 정의하는 부분입니다.

{
  "version" : "2017-02-28",
  "operation" : "PutItem",
  "key" : {
    "userId": $util.toJson($ctx.args.input.userID),
    "madeAt" : $util.dynamodb.toDynamoDBJson($util.time.nowISO8601())
  },
  "attributeValues" : $util.dynamodb.toMapValuesJson($ctx.args.input)
}

위 예제는 createProfile 명령이 들어왔을때 서버에서 어떻게 응답할지를 정의 한 내용입니다.

service: "appsync-api-cariot-dev"

provider:
  name: aws
  runtime: python3.7

plugins:
  - serverless-appsync-plugin

custom:
  appSync:
    name: cariot-api-dev
    authenticationType: API_KEY
    mappingTemplatesLocation: mapping-templates
    schema: 
      - schema.gql
    dataSources:
      - type: AMAZON_DYNAMODB
        name: ProfileTable
        config:
          tableName: ProfileTable

      - dataSource: ProfileTable
        type: Mutation 
        field: createProfile
        request: Profile.createProfile.request.vtl
        response: Profile.createProfile.response.vtl

위는 serverless yml입니다. 이렇게 어떤 명령어가 들어올때 어떤 vtl을 사용할 것인지 정의 해줄 수 있습니다.

아키텍쳐 재설계

베이스가 되는 기능부터 구현한 이후, 시간이 될 때 추가 기능을 구현하기로 방향이 결정되었음.

따라서 확장성있으면서, 베이스가 되는 기능은 먼저 구현해야 함

베이스가 되는 기능

  1. Mqtt로 AWS Iot Broker 혹은 개인 구축 Mqtt Broker에 데이터 Publish
  2. 디바이스에 들어가는 코드는 여러 Dispatcher들에게 추가 기능을 위임
  3. Dispatcher에는 S3 CSV업로드, InfluxDB, MySQL등 DB에 데이터 전송, 필요할경우 socketio 및 http 엔드포인트 제공 등을 할 예정

계획

  1. 아키텍쳐 구성도 작성
  2. 기한 설정
  3. 우선 순위 결정 (Project 칸반보드)

H/w -> cloud 간 코드 아키텍쳐 정의 필요

H/W에서 aws로 데이터를 보내는 코드들의 구조를 생각해볼 필요가 있습니다.

dispathcer 개념으로, 1초마다 데이터를 전달 받아 실행하는 클래스 (s3 & local에 저장하는 클래스 / Machine Learning 클래스 등등) 을 구현하고,

main 파이썬 코드에서는 dispatcher를 호출해서 사용합니다. (디스패쳐는 향후 확장간 여러개가 생길 수 있습니다)

그리고 main 코드에서 바인더 코드를 호출해 사용하고, 바인더 코드는 plugin 코드를 호출해 사용하는 방식으로 가는 것이 확장성에 용이 할 것 같습니다.

전체적인 개발 flow는 OOP로 진행 될 예정입니다.

실시간 유류비 계산을 위한 전국 주유소 유가 평균정보 API 도입

스크린샷 2021-04-12 오후 9 28 30

@kwhong95

오피넷이라는 곳에서 유가 정보 관련한 API를 호출 할 수 있습니다.

무료버전 API는 일 1500건 무료 호출이 가능하므로, 기기 최초 가동시 1번 호출하는것을 가정했을 때 졸업작품 내에서는 한도를 초과하지 않겠습니다.

이 API는 greengrass 람다 (라즈베리파이 디바이스 내에서 돌아가는 코드)에서 호출할 예정이며, 프론트에서 소비 가능하게끔 localhost로 호출 가능한 API 엔드포인트를 제공하겠습니다.

스크린샷 2021-04-12 오후 9 30 26

React 내에서 appsync 연동 (amplify 이용하지 않고)

https://github.com/benawad/aws-appsync-example

우선 구조는 실제 모든것이 그려질 index.js 위에 Apollo client를 씌워서 구현하는 듯.

import * as React from "react";
import * as ReactDOM from "react-dom";
import Amplify from "@aws-amplify/core";
import ApolloClient from "apollo-boost";
import { ApolloProvider } from "react-apollo";

import config from "./aws-exports";
import App from "./App";
import registerServiceWorker from "./registerServiceWorker";

Amplify.configure(config);

const client = new ApolloClient({
  uri:
    "https://lpttylfopzdczo5gi7fcwoqs2m.appsync-api.us-east-1.amazonaws.com/graphql",
  headers: {
    "X-Api-Key": "da2-rvyi25lb4natdhd2igwdvtalee"
  }
});

ReactDOM.render(
  <ApolloProvider client={client}>
    <App />
  </ApolloProvider>,
  document.getElementById("root") as HTMLElement
);
registerServiceWorker();

uri는 우리가 배포한 graphql 엔드포인트 주소,
x-api-key는 백엔드 스택 배포할때 나오는 키를 넣어주면 됨
apollo 클라이언트 이용하는 구현 방법임

import * as React from "react";

import { Form } from "./Form";
import { createBlog } from "./graphql/mutations";
import { Blogs } from "./Blogs";
import { Mutation } from "react-apollo";
import gql from "graphql-tag";
import { CreateBlogMutation, CreateBlogMutationVariables } from "./API";
import { listBlogs } from "./graphql/queries";

export default class App extends React.Component {
  public render() {
    return (
      <div style={{ textAlign: "center" }}>
        <Mutation<CreateBlogMutation, CreateBlogMutationVariables>
          mutation={gql(createBlog)}
        >
          {mutate => (
            <Form
              onSubmit={async input => {
                const response = await mutate({
                  variables: { input },
                  refetchQueries: [{ query: gql(listBlogs) }]
                });
                console.log(response);
              }}
            />
          )}
        </Mutation>
        <Blogs />
      </div>
    );
  }
}

위 내용을 살펴보면, import { Mutation } from "react-apollo";를 통해 Mutation 기능을 불러옴

export const createBlog = `mutation CreateBlog($input: CreateBlogInput!) {
  createBlog(input: $input) {
    id
    name
    posts {
      items {
        id
        title
      }
      nextToken
    }
  }
}
`;

위 js파일은 query를 날리기위한 기본적인 쿼리문 작성

-> 좀 더 연구해서 알려드림

로컬 뷰 앱 배포 방법 결정. (테스트 성공)

https://github.com/kimsehwan96/car-iot-platform-from-kpu/tree/master/local_view_test

우선 로컬 뷰에 리액트 앱을 띄우는 테스트는 성공적으로 마무리 되었습니다.

빌드된 리액트 앱이 저희 레포 특정 경로에 들어가서 배포가 되어야 하는데요 (/greengrass/webServerDaemon/app)

스크린샷 2021-04-16 오전 12 31 28

리액트 앱 빌드 시점과, greengrass 람다들 빌드 시점을 결정해야 합니다.

이는 좀 고민해볼게요.. 우리 레포 저 위치에 리액트 앱을 빌드해서 업뎃하느냐.. 아니면 greengrass 람다를 빌드하는 시점에 git clone 해와서 앱을 빌드하느냐 두가지 중 하나일 듯

@kwhong95

스크린샷 2021-04-16 오전 12 29 06

위는 greengrass lambda (리액트 앱 호스팅을 위한 람다인 webServerDaemon 이 리액트 앱을 띄워주는 스샷)

React Build 이슈

스크린샷 2020-11-09 오후 4 43 29

@kwhong95 PR 올려주신 코드 merge하고 자동 배포 과정에서

common/state 가 없다는 빌드 에러가 나오는데 혹시 스샷에 해당하는 파일이 PR에 반영이 안된건가요?

json양식 정의 필요

highchar나 D3같은 시각화 라이브러리에서 필요로 하는 json 양식들이 있을 겁니다. 이를 공유해주면 백엔드에서 올바른 형식으로 제공해주는 작업을 해야 합니다.

유저 로그인 기능

-> 프로토 타입에서는 중요하지 않지만. 테스트 삼아 해보면 좋을 내용

GraphQL 코드 개발

  1. GraphQL 테스트 코드 개발 (간단한 Query, Mutation)

  2. 리졸버 개념 이해하기 (Lambda & vtl)

  3. Schema.gql 구조 회의 후 결정하기

  4. Web 혹은 Device에서 사용할 데이터 포맷 결정하기
    ex) 받을 데이터 목록 & 유저 정보 등등

  5. Dynamo DB와의 연동 -> DynamoDB 리소스 생성

  6. RDS를 써야 하는 부분이 있을지 고려하기

각 dispatcher thread에 lock을 걸어야 하는지 확인.

현재 device로부터 받은 값을 binder라는 프로세스가 mqtt 를 통한 IPC로 값을 전달 받고,

binder라는 프로세스는 각 용도의 dispatcher로 받은 데이터를 전달해줍니다. (relay method)

이 때 각 디스패쳐는 각각 구현된 로직에 따라 데이터를 처리하게 되는데

  1. DB에 값을 저장하는 쿼리 전송(mysql, influxdb 등)
  2. Device내 csv파일 저장 및 주기적으로 S3 업로드
  3. 로컬 websocket 서버에 broadcase (for socketio)

상기 네가지 정도 로직을 수행해야 합니다.

이 때 각 스레드를 생성해서 수행하도록 구현하였는데, 이 때 공유되는 자원이 있는지 파악하고 있다면 lock을 걸어주어야 합니다.

테스트를 위한 greengrass 코드.

can 통신을 하기 전, mqtt를 이용하여 AWS상에 실제 데이터와 유사한 방식으로 데이터를 올려줄 필요가 있습니다.

greengrass s/w는 virtualbox 우분투 가상 머신 위에 설치해서 사용 할 예정입니다.

우선 mqtt 로 publish하는 코드를 만들 필요가 있습니다.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.