#------------------------------------------------------------------------------- # Author: Nelso G. Jost (nelsojost@gmail.com) # License: GPL # Purpose: Get data from the board via serial and send it to the server. #------------------------------------------------------------------------------- from __future__ import print_function, division from datetime import datetime from pprint import pprint import logging import os import requests import subprocess import serial import sys import time import json import yaml import jinja2 SETTINGS_YAML_SCHEMA =\ """ SERVER: API_POST_URL: str LOGGER: READING_INTERVAL: {seconds: int, minutes: int, hours: int, days: int} DATETIME_FORMAT: str SENSORS: - {nickname: str, data_format: str} ARDUINO: SERIAL_PORT: str BAUD_RATE: int FILES: DATALOG_CSV: str DATALOG_CSV_SEP: str SERVER_OUTGOING_JSON: str """ def make_current_file_path(filename): ''' Append filename to the current __file__ path. ''' return os.path.join(os.path.abspath(os.path.dirname(__file__)), filename) class Meteorologger: ''' Provides a series of mechanisms to collect sensor data from the board via serial port, save locally on the machine and upload to the server. The functionality of this class relies heavily on the config file given by the SETTINGS_FILENAME attribute, which uses YAML syntax. Call the run() method to start the logging process. ''' EXECUTION_LOG_FILENAME = 'logs/execution.log' SETTINGS_FILENAME = 'settings.yaml' CSV_SEP = ',' DATA_FORMATS = {'int': int, 'float': float, 'str': str} SERIAL_READ_TIMEOUT = 1.5 # seconds FIND_PORT_TIMEOUT = 10 # seconds BOARD_RESET_TIMEOUT = 2 # seconds BOARD_RESPONSE_DELAY = 0 # seconds verbose = True def __init__(self, verbose=False): self.verbose = verbose self.load_settings() def _decode_bytes(self, raw_bytes, encoding='ascii'): result = None try: result = raw_bytes.decode(encoding).strip() except: if self.verbose: print("Invalid bytes!") return result def _getCFG(self, *args, expected_type=None): r = self.CFG for key in args: try: r = r[key] except: raise Exception("Configuration file '{}' is missing the " "required key\n {}" .format(self.SETTINGS_FILENAME, ': '.join(args))) if expected_type is not None: try: if expected_type is int and isinstance(r, float): raise TypeError elif expected_type is str and not isinstance(r, str): raise TypeError else: return expected_type(r) except: raise TypeError("Expected {} on key {} but got\n {}" .format(expected_type, '/'.join(args), r)) return r def load_settings(self): ''' Load the configuration file onto the self.CFG attribute. Some keys will be tested and filenames will be normalized. ''' with open(self.SETTINGS_FILENAME) as f: self.CFG = yaml.safe_load(f) self.API_POST_URL = self._getCFG('SERVER', 'API_POST_URL', expected_type=str) self.SENSORS_CSV_LINE = 'read' + self.CSV_SEP + self.CSV_SEP.join( [d['nickname'] for d in self._getCFG('SENSORS', expected_type=list)]) self.DATALOG_CSV_FILENAME = self._getCFG( 'FILES', 'DATALOG_CSV', expected_type=str) self.SERVER_OUTGOING_FILENAME = self._getCFG( 'FILES', 'SERVER_OUTGOING_JSON', expected_type=str) # convert raw str into normal escaped str (e.g., r'\\t' --> '\t') self.DATALOG_CSV_SEP = bytes( self._getCFG('FILES', 'DATALOG_CSV_SEP', expected_type=str), 'utf8').decode('unicode_escape') self.SERIAL_PORTS = [p.strip() for p in self._getCFG('ARDUINO', 'SERIAL_PORT', expected_type=str).split(',')] self.BOARD_RESPONSE_DELAY = self._getCFG('ARDUINO', 'RESPONSE_DELAY', expected_type=int) self.READING_INTERVAL_SECONDS =\ self._getCFG('LOGGER', 'READING_INTERVAL', 'seconds', expected_type=int) + \ 60 * self._getCFG('LOGGER', 'READING_INTERVAL', 'minutes', expected_type=int) + \ 3600 * self._getCFG('LOGGER', 'READING_INTERVAL', 'hours', expected_type=int) + \ 86400 * self._getCFG('LOGGER', 'READING_INTERVAL', 'days', expected_type=int) self.DATETIME_FORMAT = self._getCFG('LOGGER', 'DATETIME_FORMAT', expected_type=str) def create_json_data(self, raw_line): ''' Given the raw serial line response (expected to be a CSV line), returns a JSON dict with sensor data including the datetime field. ''' raw_sensor_data = { 'datetime': datetime.now().strftime(self.DATETIME_FORMAT), 'sensors': {}} for i, v in enumerate(raw_line.split(self.CSV_SEP)): v = v.strip() type_ = self.DATA_FORMATS[self.CFG['SENSORS'][i]['data_format']] nickname = self.CFG['SENSORS'][i]['nickname'] try: v = type_(v) except: logging.error("Cannot convert value '{}' read from {} to {}" .format(v, nickname, type_)) continue raw_sensor_data['sensors'][nickname] = v logging.info("Resulting JSON: {}".format(raw_sensor_data)) return raw_sensor_data def write_data_log(self, json_data): ''' For backup purposes, write the given JSON data onto the file DATALOG_CSV as specficied on self.SETTINGS_FILENAME. ''' csv_line = json_data['datetime'] + self.DATALOG_CSV_SEP for sensor in self.CFG['SENSORS']: if sensor['nickname'] in json_data['sensors']: csv_line += str(json_data['sensors'] [sensor['nickname']]) csv_line += self.DATALOG_CSV_SEP csv_line = csv_line[:-1] try: with open(self.CFG['FILES']['DATALOG_CSV'], 'a') as f: f.write(csv_line + '\n') logging.info("Done! CSV line '{}' was appended to the file '{}'" .format(csv_line, self.CFG['FILES']['DATALOG_CSV'])) except: logging.error("Unable to write data log at '{}'" .format(self.CFG['FILES']['DATALOG_CSV'])) def send_data_to_server(self, json_data): logging.info("URL: {}".format(self.API_POST_URL)) r = None try: if os.path.exists(self.CFG['FILES']['SERVER_OUTGOING_JSON']): logging.info("Outgoing data exists! Trying to send it first..") with open(self.CFG['FILES']['SERVER_OUTGOING_JSON']) as f: for i, line in enumerate(f): r = requests.post(self.API_POST_URL, json=json.loads(line, encoding='utf-8')) if r.status_code != 200: raise Exception logging.info('Line {}: {}'.format(i, r)) os.remove(self.CFG['FILES']['SERVER_OUTGOING_JSON']) logging.info("Done! Server data should be up to date.") r = requests.post(self.API_POST_URL, json=json_data) if r.status_code != 200: raise Exception logging.info("Done! Request: {}".format(r)) except: logging.error("Unable to reach the server.") logging.info("Response request: {}".format(r)) logging.info("Attempting to write current data on local file '{}'" .format(self.CFG['FILES']['SERVER_OUTGOING_JSON'])) with open(self.CFG['FILES']['SERVER_OUTGOING_JSON'], 'a') as f: f.write(json.dumps(json_data) + '\n') logging.info("Done! Data is saved an will be shipped as soon as " "the server is back on.") return def serial_read_sensors(self, port_index=None): ''' Sends the 'csv_nickname_list' string to the serial port of index 'port_index' (for self.SERIAL_PORTS) and returns the response line. csv_nickname_list example: str Example: 'DHT22_TEMP,DHT22_AH,BMP085_PRESSURE,LDR' ''' result_line, ser = None, None try: if isinstance(port_index, int): serial_port = self.SERIAL_PORTS[port_index] else: serial_port = self.SERIAL_PORTS[0] # if present, the board will be reseted ser = serial.Serial(serial_port, self._getCFG('ARDUINO', 'BAUD_RATE', expected_type=int), timeout = self.SERIAL_READ_TIMEOUT, xonxoff=True) logging.info(str(ser)) time.sleep(self.BOARD_RESET_TIMEOUT) while bool(result_line) is False: result = ser.write(bytes(self.SENSORS_CSV_LINE, 'utf8')) logging.info("sent: '{}' ({} bytes)".format( self.SENSORS_CSV_LINE, result)) time.sleep(self.BOARD_RESPONSE_DELAY) result_line = ser.readline() logging.info("read: {} ({} bytes)".format(result_line, len(result_line))) result_line = self._decode_bytes(result_line) if result_line is None: logging.error("Unable to decode line as ASCII.") logging.info("Trying a new reading..") continue ser.close() return result_line except KeyboardInterrupt: raise KeyboardInterrupt except: logging.error("Unable to open serial port '{}'" .format(serial_port)) finally: if ser: ser.close() logging.info("serial closed") return None def setup_logging(self): if self.verbose: # logging.basicConfig( # level=logging.DEBUG) root = logging.getLogger('') root.setLevel(logging.DEBUG) console = logging.StreamHandler() console.setFormatter(logging.Formatter( fmt='%(asctime)s : %(levelname)s : %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) root.addHandler(console) else: logging.basicConfig( level=logging.DEBUG, filename=self.EXECUTION_LOG_FILENAME, format='%(asctime)s : %(levelname)s : %(message)s', datefmt='%Y-%m-%d %H:%M:%S') def run(self): ''' Starts the logger main loop, which iterate over the procedures: 1. Read sensor data via serial port; 2. If successful, save data on ''' self.setup_logging() logging.info('='*40) logging.info('EXECUTION START') port_index = 0 try: while True: logging.info('='*40) logging.info('Attempting to read from serial') csv_result = self.serial_read_sensors(port_index) if csv_result is not None: logging.info("csv_result: '{}'".format(csv_result)) logging.info('-'*40) logging.info('Attempting create valid JSON data') json_data = self.create_json_data(csv_result) logging.info('-'*40) logging.info('Attempting to write local data log') self.write_data_log(json_data) logging.info('-'*40) logging.info('Attempting to send data to the server') self.send_data_to_server(json_data) else: if port_index < len(self.SERIAL_PORTS) - 1: port_index += 1 else: port_index = 0 logging.info("Trying another port in about {} seconds.." .format(self.FIND_PORT_TIMEOUT)) time.sleep(self.FIND_PORT_TIMEOUT) continue logging.info('-'*40) logging.info("Going to sleep now for {days} days, {hours} " "hours, {minutes} minutes and {seconds} seconds.." .format(**self.CFG['LOGGER']['READING_INTERVAL'])) time.sleep(self.READING_INTERVAL_SECONDS) except KeyboardInterrupt: logging.info('KeyboardInterrupt: EXECUTION FINISHED') pass if __name__ == '__main__': Meteorologger(verbose = False if '-s' in sys.argv else True).run()