#-------------------------------------------------------------------------------
# Author: Nelso G. Jost (nelsojost@gmail.com)
# License: GPL
# Purpose: Get data from the board via serial and send it to the server.
#-------------------------------------------------------------------------------
from __future__ import print_function, division

from datetime import datetime
from pprint import pprint
import logging
import os
import requests
import subprocess
import serial
import sys
import time
import json

from pykwalify.core import Core


def make_current_file_path(filename):
    ''' Append filename to the current __file__ path. '''
    return os.path.join(os.path.abspath(os.path.dirname(__file__)), filename)


class RTCDateTime:
    '''
    Thin wrapper around a datetime parsed from a string in RTC_DT_FMT.

    str() on an instance yields the compact form 'YYYYmmddHHMMSS'.
    '''
    RTC_DT_FMT = '%Y-%m-%d %H:%M:%S'

    # Overrides the class' qualified name so that it embeds the expected
    # format (presumably to make messages that show the type more helpful).
    __qualname__ = "RTCDateTime fmt='{}'".format(RTC_DT_FMT)

    def __init__(self, s):
        ''' Parse the string s; raises ValueError if it is not RTC_DT_FMT. '''
        self.dt = datetime.strptime(s, self.RTC_DT_FMT)

    def __str__(self):
        return self.dt.strftime('%Y%m%d%H%M%S')


class Meteorologger:
    '''
    Provides a series of mechanisms to collect sensor data from the board
    via serial port, save locally on the machine and upload to the server.

    The functionality of this class relies heavily on the config file given
    by the SETTINGS_FILENAME attribute, which uses YAML syntax.

    Call the run() method to start the logging process.
    '''
    SETTINGS_SCHEMA_FILENAME = make_current_file_path('settings_schema.yaml')
    SETTINGS_FILENAME = make_current_file_path('../../settings.yaml')
    DATALOG_DIR = 'data/'
    EXECUTION_LOG_PATH = 'logger/logs/'
    OUTGOING_BASENAME = 'outgoing.json'
    FILE_TIMESTAMP_FORMAT = '%Y-%m-%d-%H-%M-%S'
    SERIAL_CSV_SEP = ','
    DATA_FORMATS = {'int': int, 'float': float, 'str': str,
                    'datetime': RTCDateTime}
    SERIAL_READ_TIMEOUT = 1.5       # seconds
    FIND_PORT_TIMEOUT = 10          # seconds
    BOARD_RESET_TIMEOUT = 2         # seconds
    BOARD_RESPONSE_DELAY = 3        # seconds
    SERVER_REQUEST_TIMEOUT = 30     # seconds (a stalled server must not
                                    # block the logging loop forever)
    verbose = True

    def __init__(self, verbose=False):
        ''' Store the verbosity flag and load/validate the settings file. '''
        self.verbose = verbose
        self.load_settings()

    def _decode_bytes(self, raw_bytes, encoding='ascii'):
        '''
        Decode raw_bytes with the given encoding and strip whitespace.

        Returns the decoded str, or None when decoding is not possible.
        '''
        result = None
        try:
            result = raw_bytes.decode(encoding).strip()
        except Exception:
            # Not a bare "except:", so Ctrl-C is never swallowed here.
            if self.verbose:
                print("Invalid bytes!")
        return result

    def _get_serial_port(self, port_index=None):
        '''
        Return the serial port name configured at position port_index.

        Any non-int port_index (e.g. None) selects the first configured port.
        '''
        ports = self.CFG['ARDUINO']['SERIAL_PORTS']
        return ports[port_index if isinstance(port_index, int) else 0]

    def load_settings(self):
        '''
        Load the configuration file onto the self.CFG attribute.
        Some keys will be tested and filenames will be normalized.

        Exits the process with status 1 if the file fails validation.
        '''
        c = Core(source_file=self.SETTINGS_FILENAME,
                 schema_files=[self.SETTINGS_SCHEMA_FILENAME])
        try:
            self.CFG = c.validate(raise_exception=True)
        except Exception as e:
            print('-'*60)
            print("There is something wrong with the configuration file '{}'"
                  "\nKeep in mind that it uses YAML syntax which require "
                  "proper identation.\n".format(self.SETTINGS_FILENAME))
            print(e)
            print("\nPlease fix it up or regenerate it.")
            sys.exit(1)

        interval = self.CFG['LOGGER']['INTERVAL']
        self.READING_INTERVAL_SECONDS = (interval['seconds']
                                         + 60 * interval['minutes']
                                         + 3600 * interval['hours']
                                         + 86400 * interval['days'])

        # convert raw str into normal escaped str (e.g., r'\\t' --> '\t')
        self.DATALOG_CSV_SEP = bytes(
            self.CFG['DATALOG']['CSV_SEP'], 'utf8').decode('unicode_escape')

    def create_json(self, raw_line):
        '''
        Given the raw serial line response (CSV string), builds and returns
        a JSON dict with validated, server-ready, sensor data.

        The i-th CSV field is matched with the i-th nickname listed on
        CFG['LOGGER']['SENSORS']. Fields that cannot be converted to the
        sensor's configured data format are stored as the string 'NaN'.
        If no usable RTC datetime is found on the line, the current time of
        this machine is used instead (source 'logger').
        '''
        d = {'datetime': {'format': self.CFG['DATALOG']['DATETIME_FORMAT']},
             'sensors': {}}

        sensors = self.CFG['LOGGER']['SENSORS']
        rtc = self.CFG['LOGGER']['USE_RTC_DATETIME']
        using_rtc = rtc and rtc in sensors
        rtc_datetime_fmt = None
        if using_rtc:
            d['datetime']['source'] = rtc
            rtc_datetime_fmt = self.CFG['LOGGER']['RTC_DATETIME_FORMAT']

        values = raw_line.split(self.SERIAL_CSV_SEP)
        if len(values) != len(sensors):
            logging.warning("Expected {} values from the board but got {}: "
                            "'{}'".format(len(sensors), len(values),
                                          raw_line))

        # zip() stops at the shorter sequence, so a line with extra fields
        # can no longer raise IndexError and kill the main loop.
        for nickname, v in zip(sensors, values):
            v = v.strip()
            type_name = self.CFG['SENSORS_AVAILABLE'][nickname]['data_format']

            if type_name == 'datetime':
                if using_rtc and 'not_found' not in v:
                    try:
                        d['datetime']['value'] = datetime.strptime(
                            v, rtc_datetime_fmt).strftime(
                                d['datetime']['format'])
                    except ValueError:
                        # Falls back to the logger's clock further below.
                        logging.warning(
                            "[{}]: '{}' does not match the RTC datetime "
                            "format '{}'".format(nickname, v,
                                                 rtc_datetime_fmt))
                continue

            try:
                d['sensors'][nickname] = self.DATA_FORMATS[type_name](v)
            except Exception:
                logging.warning("[{}]: '{}' is not a valid {}"
                                .format(nickname, v, type_name))
                d['sensors'][nickname] = 'NaN'

        if 'value' not in d['datetime']:
            d['datetime']['source'] = 'logger'
            d['datetime']['value'] = datetime.now().strftime(
                d['datetime']['format'])

        logging.info("Validated JSON: {}".format(d))
        return d

    def write_datalog(self, json_data):
        '''
        For backup purposes, append the given JSON data as one CSV line to
        the session datalog file (self.SESSION_DATALOG_FILENAME).

        The line holds the datetime value followed by the value of every
        configured sensor present on json_data['sensors'], in the order of
        CFG['LOGGER']['SENSORS']. Write failures are logged, not raised.
        '''
        fields = [json_data['datetime']['value']]
        fields.extend(str(json_data['sensors'][nickname])
                      for nickname in self.CFG['LOGGER']['SENSORS']
                      if nickname in json_data['sensors'])

        # join() is correct for separators of any length, unlike chopping
        # one trailing character off the line.
        csv_line = self.DATALOG_CSV_SEP.join(fields)

        datalog_filename = self.SESSION_DATALOG_FILENAME
        try:
            with open(datalog_filename, 'a') as f:
                f.write(csv_line + '\n')
            logging.info("Updated datalog file: '{}'".format(datalog_filename))
        except Exception as e:
            # Best effort on purpose: a failed backup must not stop the run.
            logging.error("Unable to write datalog at '{}'"
                          .format(datalog_filename))
            logging.info("Exception: {}".format(e))

    def send_to_server(self, json_data):
        '''
        POST json_data to CFG['SERVER']['API_POST_URL'].

        If the session outgoing file exists, its lines (data that could not
        be sent before) are sent first and the file is removed afterwards.
        Any response status other than 200 counts as a failure. On failure
        json_data is appended to the outgoing file for a later attempt.
        '''
        r = None
        outgoing_filename = self.SESSION_OUTGOING_FILENAME
        URL = self.CFG['SERVER']['API_POST_URL']
        timeout = self.SERVER_REQUEST_TIMEOUT

        try:
            if os.path.exists(outgoing_filename):
                logging.debug("Outgoing data exists! Will try to send it.")

                # NOTE(review): if the server fails half way through, the
                # lines already sent stay on the file and are sent again on
                # the next attempt.
                with open(outgoing_filename) as f:
                    for i, line in enumerate(f):
                        if not line.strip():
                            continue
                        try:
                            # 'encoding' keyword was removed in Python 3.9
                            payload = json.loads(line)
                        except ValueError:
                            # A corrupt line would otherwise block every
                            # later upload; it cannot be recovered anyway.
                            logging.error("Line {}: invalid JSON, skipped"
                                          .format(i))
                            continue
                        r = requests.post(URL, json=payload, timeout=timeout)
                        if r.status_code != 200:
                            raise RuntimeError(
                                "Unexpected HTTP status {}"
                                .format(r.status_code))
                        logging.info('Line {}: {}'.format(i, r))

                os.remove(outgoing_filename)
                logging.info("Removed file '{}'".format(outgoing_filename))

            r = requests.post(URL, json=json_data, timeout=timeout)
            if r.status_code != 200:
                raise RuntimeError("Unexpected HTTP status {}"
                                   .format(r.status_code))
            logging.info("Request: {}".format(r))

        except Exception as e:
            logging.error("Unable to reach the server at '{}'. Request: {}"
                          .format(URL, r))
            logging.info("Exception: {}".format(e))
            try:
                outgoing_dir = os.path.dirname(outgoing_filename)
                if outgoing_dir:
                    os.makedirs(outgoing_dir, exist_ok=True)
                with open(outgoing_filename, 'a') as f:
                    f.write(json.dumps(json_data) + '\n')
                logging.info("Updated outgoing file '{}'"
                             .format(outgoing_filename))
            except Exception as e:
                logging.error("[DATALOST] Unable to write outgoing file '{}'".
                              format(outgoing_filename))
                logging.info("Exception: {}".format(e))

    def serial_read(self, port_index=None):
        '''
        Sends the read command ('read' followed by the nicknames listed on
        CFG['LOGGER']['SENSORS'], comma separated) to the serial port of
        index 'port_index' (self.CFG['ARDUINO']['SERIAL_PORTS']) and returns
        the response line. Without an int 'port_index' the first configured
        port is used.

        Command example: 'read,DHT22_TEMP,DHT22_AH,BMP085_PRESSURE,LDR'

        Returns the decoded, stripped response line, or None if the port
        could not be used. KeyboardInterrupt is propagated.
        '''
        result_line, ser, serial_port = None, None, None
        read_command = self.SERIAL_CSV_SEP.join(
            ['read'] + self.CFG['LOGGER']['SENSORS'])

        try:
            serial_port = self._get_serial_port(port_index)

            # if present, the board will be reseted
            ser = serial.Serial(serial_port,
                                self.CFG['ARDUINO']['BAUD_RATE'],
                                timeout=self.SERIAL_READ_TIMEOUT,
                                xonxoff=True)
            logging.info("Serial open: port='{}', baudrate={}"
                         .format(ser.port, ser.baudrate))
            time.sleep(self.BOARD_RESET_TIMEOUT)
            ser.flush()

            # NOTE(review): retries without limit while the board answers
            # with an empty or undecodable line.
            while not result_line:
                result = ser.write(bytes(read_command, 'utf8'))
                logging.info("sent: '{}' ({} bytes)".format(
                    read_command, result))

                time.sleep(self.BOARD_RESPONSE_DELAY)

                result_line = ser.readline()
                logging.info("read: {} ({} bytes)".format(result_line,
                                                          len(result_line)))

                result_line = self._decode_bytes(result_line)
                if result_line is None:
                    logging.error("Unable to decode line as ASCII.")
                    logging.debug("Trying a new reading..")

            return result_line

        except KeyboardInterrupt:
            raise       # keep the original traceback
        except Exception as e:
            logging.error("Unable to open serial port '{}'"
                          .format(serial_port))
            logging.error(e)
        finally:
            if ser is not None:
                ser.close()
                logging.debug("serial closed")
        return None

    def sync_rtc(self, port_index=None):
        '''
        Send the 'setrtc' command with this machine's current date and time
        to the board and print its response line.

        'port_index' selects the port as in serial_read(). Serial errors are
        not handled here and propagate to the caller.
        '''
        serial_port = self._get_serial_port(port_index)

        # if present, the board will be reseted
        ser = serial.Serial(serial_port, self.CFG['ARDUINO']['BAUD_RATE'],
                            timeout=self.SERIAL_READ_TIMEOUT,
                            xonxoff=True)
        try:
            print("Serial open: port='{}', baudrate={}"
                  .format(ser.port, ser.baudrate))
            time.sleep(self.BOARD_RESET_TIMEOUT)
            ser.flush()

            # date/time fields are sent separated by SERIAL_CSV_SEP
            command_ser = 'setrtc' + self.SERIAL_CSV_SEP + \
                datetime.now().strftime('%Y-%m-%d-%H-%M-%S'
                                        .replace('-', self.SERIAL_CSV_SEP))

            result = ser.write(bytes(command_ser, 'utf8'))
            print("sent: '{}' ({} bytes)".format(command_ser, result))

            time.sleep(self.BOARD_RESPONSE_DELAY)

            result_line = ser.readline()
            print("read: {} ({} bytes)".format(result_line, len(result_line)))
        finally:
            # release the port even if the exchange fails half way
            ser.close()

    def get_serial(self):
        '''
        Keep trying the configured serial ports, in round-robin order and
        waiting FIND_PORT_TIMEOUT seconds between attempts, until one opens.

        Returns the open serial.Serial object; the caller must close it.
        '''
        ports = self.CFG['ARDUINO']['SERIAL_PORTS']
        port_index = 0
        while True:
            serial_port = ports[port_index]
            try:
                print("\n" + "="*60)
                print("Attempting serial connection at {}".format(serial_port))
                ser = serial.Serial(port=serial_port,
                                    baudrate=self.CFG['ARDUINO']['BAUD_RATE'],
                                    timeout=self.SERIAL_READ_TIMEOUT,
                                    xonxoff=True)
                print("Done! Check object 'ser'")
                return ser
            except Exception:
                # Not a bare "except:", so Ctrl-C can still end this loop.
                print("Unable do establish the connection.")
                if port_index < len(ports) - 1:
                    port_index += 1
                else:
                    port_index = 0
                print("Trying another port in about {} seconds.."
                      .format(self.FIND_PORT_TIMEOUT))
                time.sleep(self.FIND_PORT_TIMEOUT)

    def setup_logging(self):
        '''
        Configure the root logger to write INFO (and above) records to the
        session execution log file and, when verbose, also to the console.
        '''
        log_format = '%(asctime)s : %(levelname)s : %(message)s'
        date_format = '%Y-%m-%d %H:%M:%S'

        logging.basicConfig(
            level=logging.INFO,
            filename=self.SESSION_EXECUTION_LOG_FILENAME,
            format=log_format,
            datefmt=date_format)

        if self.verbose:
            root = logging.getLogger('')
            root.setLevel(logging.INFO)
            console = logging.StreamHandler()
            console.setFormatter(logging.Formatter(
                fmt=log_format, datefmt=date_format))
            root.addHandler(console)

    def setup_files(self):
        '''
        Define the session file names (datalog, outgoing and execution log),
        create their directories and write the datalog CSV header.

        Returns True on success, False if a directory or the datalog file
        could not be created.
        '''
        session_datetime = datetime.now().strftime(self.FILE_TIMESTAMP_FORMAT)

        self.SESSION_DATALOG_FILENAME = self.DATALOG_DIR \
            + 'datalog-' + session_datetime + '.csv'
        self.SESSION_OUTGOING_FILENAME = self.DATALOG_DIR \
            + 'outgoing/outgoing-' + session_datetime + '.json'
        self.SESSION_EXECUTION_LOG_FILENAME = self.EXECUTION_LOG_PATH \
            + 'execution-' + session_datetime + '.log'

        # os.makedirs() reports failures by raising, which os.system() with
        # 'mkdir -p' does not; it also covers the outgoing and log folders.
        for path in (self.SESSION_DATALOG_FILENAME,
                     self.SESSION_OUTGOING_FILENAME,
                     self.SESSION_EXECUTION_LOG_FILENAME):
            directory = os.path.dirname(path)
            if not directory:
                continue
            try:
                os.makedirs(directory, exist_ok=True)
            except OSError as e:
                print("Unable to create directory '{}'".format(directory))
                print(e)
                return False

        try:
            # NOTE(review): only the nickname 'RTC_DS1307' is left out of
            # the header, while write_datalog() leaves out every sensor that
            # is missing from the JSON data -- confirm they always agree.
            with open(self.SESSION_DATALOG_FILENAME, 'w') as f:
                f.write(self.DATALOG_CSV_SEP.join(
                    ['dt' + self.CFG['DATALOG']['DATETIME_FORMAT']] +
                    [x for x in self.CFG['LOGGER']['SENSORS']
                     if x != 'RTC_DS1307']) + '\n')
        except Exception as e:
            print("Unable to write datalog file at data/")
            print(e)
            return False

        return True

    def run(self):
        '''
        Starts the logger main loop, which keeps reading data from the serial
        port and trying to send it to the server.

        Basically, the loop consists of the following steps:

            1. serial_read()     # send a string, recieves a string
            2. create_json()     # validate data and make it server-ready
            3. write_datalog()   # write current data on local file for backup
            4. send_to_server()  # try to send; if fails, save data for later

        When a reading fails, the next configured serial port is tried after
        FIND_PORT_TIMEOUT seconds. The loop ends on KeyboardInterrupt.
        '''
        if not self.setup_files():
            return
        self.setup_logging()
        logging.info('EXECUTION START')

        port_index = 0
        try:
            while True:
                logging.info('='*40)
                logging.debug('Attempting to serial connection')
                csv_result = self.serial_read(port_index)

                if csv_result is None:
                    # rotate through the configured ports
                    if port_index < \
                            len(self.CFG['ARDUINO']['SERIAL_PORTS']) - 1:
                        port_index += 1
                    else:
                        port_index = 0

                    logging.info("Trying another port in about {} seconds.."
                                 .format(self.FIND_PORT_TIMEOUT))
                    time.sleep(self.FIND_PORT_TIMEOUT)
                    continue

                logging.debug('-'*40)
                logging.debug('Attempting create valid JSON data')
                json_data = self.create_json(csv_result)

                logging.debug('-'*40)
                logging.debug('Attempting to write local data log')
                self.write_datalog(json_data)

                logging.debug('-'*40)
                logging.debug('Attempting to send data to the server')
                self.send_to_server(json_data)

                logging.debug('-'*40)
                logging.info("Going to sleep now for {days} days, {hours} "
                             "hours, {minutes} minutes and {seconds} seconds.."
                             .format(**self.CFG['LOGGER']['INTERVAL']))

                time.sleep(self.READING_INTERVAL_SECONDS)

        except KeyboardInterrupt:
            logging.info('KeyboardInterrupt: EXECUTION FINISHED')