Commit 8b4cdf43 authored by Nelso Jost's avatar Nelso Jost
Browse files

NEW: supervisor daemon deployment; better logging system

parent 7f14e322
.venv/ .venv/
*.pyc *.pyc
*.swp *.swp
datalog.csv *.csv
outgoing.json *.json
*.log
__pycache__ __pycache__
#-------------------------------------------------------------------------------
# Author: Nelso G. Jost (nelsojost@gmail.com)
# License: GPL
# Purpose: Get data from the board via serial and send it to the server.
#-------------------------------------------------------------------------------
from __future__ import print_function, division
from datetime import datetime
from pprint import pprint
import os
import requests
import serial
import time
import json
import yaml
def make_current_file_path(filename):
''' Append filename to the current __file__ path. '''
return os.path.join(os.path.abspath(os.path.dirname(__file__)), filename)
class Meteorologger:
'''
Provides a series of mechanisms to collect sensor data from the board
via serial port, save locally on the machine and upload to the server.
The functionality of this class relies heavily on the config file given
by the SETTINGS_FILENAME attribute, which uses YAML syntax.
Call the run() method to start the logging process.
'''
SETTINGS_FILENAME = make_current_file_path('settings.yaml')
CSV_SEP = ','
DATA_FORMATS = {'INTEGER': int, 'FLOAT': float, 'STRING': str}
SERIAL_READ_TIMEOUT = 1.5 # seconds
FIND_PORT_TIMEOUT = 2 # seconds
BOARD_RESET_TIMEOUT = 2 # seconds
def __init__(self):
self.loadSettings()
def _normalizePathFilename(self, key):
''' Appends __file__ basedir to config keys that don't have a basedir.
'''
if not os.path.dirname(self.CFG[key]):
self.CFG[key] = make_current_file_path(self.CFG[key])
def loadSettings(self):
'''
Load the configuration file onto the self.CFG attribute.
Some keys will be tested and filenames will be normalized.
'''
with open(self.SETTINGS_FILENAME) as f:
self.CFG = yaml.safe_load(f)
if not self.CFG['BASE_URL'].endswith('/'):
self.CFG['BASE_URL'] += '/'
self.URL = self.CFG['BASE_URL'] + 'api/post/rawsensordata/{}'\
.format(self.CFG['BOARD_ID'])
self.SENSORS_CSV_LINE = self.CSV_SEP.join([d['nickname'] for d in
self.CFG['SENSORS']])
# convert raw str into normal escaped str (e.g., r'\\t' --> '\t')
self.CFG['EXPORT_CSV_SEP'] = bytes(self.CFG['EXPORT_CSV_SEP'], 'utf8')\
.decode('unicode_escape')
self.SERIAL_PORTS = [p.strip() for p in self.CFG['SERIAL_PORT']
.split(',')]
self._normalizePathFilename('DATA_LOG_FILENAME')
self._normalizePathFilename('SERVER_OUTGOING_DATA_LOG_FILENAME')
def create_json_raw_sensor_data(self, raw_line):
'''
Given the raw serial line response (expected to be a CSV line), returns
a JSON dict with sensor data including the datetime field.
'''
raw_sensor_data = {'datetime': datetime.now().strftime("%Y%m%d%H%M%S"),
'sensors': {}}
for i, v in enumerate(raw_line.split(self.CSV_SEP)):
v = v.strip()
try:
v = self.DATA_FORMATS[self.CFG['SENSORS'][i]['data_format']](v)
except:
v = 'NaN'
raw_sensor_data['sensors'][self.CFG['SENSORS'][i]['nickname']] = v
print("\nJSON raw sensor data:")
pprint(raw_sensor_data)
return raw_sensor_data
def add_data_log(self, json_data):
'''
'''
csv_line = json_data['datetime'] + self.CFG['EXPORT_CSV_SEP']
for sensor in self.CFG['SENSORS']:
if sensor['nickname'] in json_data['sensors']:
csv_line += str(json_data['sensors']
[sensor['nickname']])
csv_line += self.CFG['EXPORT_CSV_SEP']
with open(self.CFG['DATA_LOG_FILENAME'], "a") as f:
f.write(csv_line[:-1] + '\n')
print("\nWrited data log onto\n {}".format(
self.CFG['DATA_LOG_FILENAME']))
def send_data_to_server(self, json_data):
print("\nSending data to the server now..", end='')
r = None
try:
if os.path.exists(self.CFG['SERVER_OUTGOING_DATA_LOG_FILENAME']):
print("\nTrying to send outgoing data first...")
with open(self.CFG['SERVER_OUTGOING_DATA_LOG_FILENAME']) as f:
for i, line in enumerate(f):
r = requests.post(self.URL, json=json.loads(line))
if r.status_code != 200:
raise Exception
print('Line {} :'.format(i), r)
os.remove(self.CFG['SERVER_OUTGOING_DATA_LOG_FILENAME'])
print("\ndone! Server data is up to date!")
r = requests.post(self.URL, json=json_data)
if r.status_code != 200:
raise Exception
except:
print("\nUnable to reach the server at\n {}".format(self.URL))
print("Request:", r)
with open(self.CFG['SERVER_OUTGOING_DATA_LOG_FILENAME'], 'a') as f:
f.write(json.dumps(json_data))
f.write('\n')
print("\nAdded to\n {}".format(
self.CFG['SERVER_OUTGOING_DATA_LOG_FILENAME']))
return
print(" done.\n ", r)
def _decode_bytes(self, raw_bytes):
result = None
try:
result = raw_bytes.decode('ascii').strip()
except:
print("Invalid bytes!")
return result
def serial_read_sensors(self, csv_nickname_list, serial_port=None):
result_line, ser = None, None
try:
if serial_port is None:
serial_port = self.SERIAL_PORTS[0]
elif isinstance(serial_port, int):
serial_port = self.SERIAL_PORTS[serial_port]
# if present, the board will be reseted
ser = serial.Serial(serial_port,
self.CFG['BAUD_RATE'],
timeout = self.SERIAL_READ_TIMEOUT,
xonxoff=True)
print(ser)
time.sleep(self.BOARD_RESET_TIMEOUT)
while bool(result_line) is False:
result = ser.write(bytes(csv_nickname_list, 'utf8'))
print("sent '{}':".format(csv_nickname_list), result)
result_line = ser.readline()
print("read: ", result_line)
result_line = self._decode_bytes(result_line)
if result_line is None:
continue
ser.close()
return result_line
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
print("Unable to open serial port '{}'".format(serial_port))
return None
finally:
if ser:
ser.close()
def run(self):
'''
Starts the logger main loop, which iterate over the procedures:
1. Read sensor data via serial port;
2. If successful, save data on
'''
serial_port = 0
try:
while True:
print('\n-----------------------------------------------------')
csv_result = self.serial_read_sensors(self.SENSORS_CSV_LINE,
serial_port=serial_port)
print("csv_result: '{}'".format(csv_result))
if csv_result is not None:
json_raw_data = self.create_json_raw_sensor_data(csv_result)
self.add_data_log(json_raw_data)
self.send_data_to_server(json_raw_data)
else:
if serial_port < len(self.SERIAL_PORTS) - 1:
serial_port += 1
else:
serial_port = 0
print('Trying another port..')
time.sleep(self.FIND_PORT_TIMEOUT)
continue
print('\nGoing to sleep now for {} seconds..'.format(
self.CFG['READING_INTERVAL_SECONDS']))
time.sleep(self.CFG['READING_INTERVAL_SECONDS'])
except KeyboardInterrupt:
pass
if __name__ == '__main__':
Meteorologger().run()
...@@ -3,14 +3,18 @@ VENV := .venv ...@@ -3,14 +3,18 @@ VENV := .venv
all: all:
@ echo "USAGE:" @ echo "USAGE:"
@ echo " make setup -- Create a Python 3 virtual environment (only need once)" @ echo " make deb-install -- Attempt to install required system wide"
@ echo " make log -- Launch the logger (using Python 3 from make setup)" @ echo " Debian packages via apt"
@ echo " make clean -- remove all the generated files" @ echo ""
@ echo " make setup -- Create a local Python virtual environment"
@ echo " that will hold requirements.pip modules"
@ echo ""
@ echo " make run -- Executes the logger "
setup: install-deb venv setup: install-deb venv
install-deb: deb-install:
sudo apt-get install python3 python3-pip sudo apt-get install python3 python3-pip supervisor
sudo pip3 install virtualenv sudo pip3 install virtualenv
venv: clean-venv venv: clean-venv
...@@ -30,12 +34,8 @@ venv: clean-venv ...@@ -30,12 +34,8 @@ venv: clean-venv
clean-venv: clean-venv:
rm -rf ${VENV} rm -rf ${VENV}
log: run:
@ ${VENV}/bin/python logger.py ${VENV}/bin/python3 logger.py -v
testserial: deploy:
@ ${VENV}/bin/ipython test_serial.py ${VENV}/bin/python3 deploy.py
clean-log:
rm -rf datalog.csv
rm -rf outgoing.json
import jinja2
import os
import subprocess
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
SUPERVISOR_CONFIG_FILENAME = '/etc/supervisor/conf.d/meteorologger.conf'
def deploy_supervisor():
with open('supervisor.conf') as f_temp:
template = jinja2.Template(f_temp.read())
config_file_str = template.render(base_dir=BASE_DIR)
print('\nRegistering supervisor config at \n {}'
.format(SUPERVISOR_CONFIG_FILENAME))
print('='*60)
print(config_file_str)
print('='*60)
with open(SUPERVISOR_CONFIG_FILENAME, 'w') as f:
f.write(config_file_str + '\n')
print('\nRestarting supervisor..')
proc = subprocess.Popen('supervisorctl update', shell=True)
print("PID: ", end='')
proc = subprocess.Popen('supervisorctl pid meteorologger', shell=True,
stdout=subprocess.PIPE)
proc.wait()
print(proc.stdout.read().decode('ascii').strip(), end='')
print(' [meteorologger is running]')
print('\nYou can manage it with supervisorctl tool.')
if __name__ == '__main__':
deploy_supervisor()
#-------------------------------------------------------------------------------
# Author: Nelso G. Jost (nelsojost@gmail.com)
# License: GPL
# Purpose: Get data from the board via serial and send it to the server.
#-------------------------------------------------------------------------------
from __future__ import print_function, division
from datetime import datetime
from pprint import pprint
import logging
import os
import requests
import subprocess
import serial
import sys
import time
import json
import yaml
import jinja2
SETTINGS_YAML_SCHEMA =\
"""
SERVER:
API_POST_URL: str
LOGGER:
READING_INTERVAL: {seconds: int, minutes: int, hours: int, days: int}
DATETIME_FORMAT: str
SENSORS:
- {nickname: str, data_format: str}
ARDUINO:
SERIAL_PORT: str
BAUD_RATE: int
FILES:
DATALOG_CSV: str
DATALOG_CSV_SEP: str
SERVER_OUTGOING_JSON: str
"""
def make_current_file_path(filename):
''' Append filename to the current __file__ path. '''
return os.path.join(os.path.abspath(os.path.dirname(__file__)), filename)
class Meteorologger:
'''
Provides a series of mechanisms to collect sensor data from the board
via serial port, save locally on the machine and upload to the server.
The functionality of this class relies heavily on the config file given
by the SETTINGS_FILENAME attribute, which uses YAML syntax.
Call the run() method to start the logging process.
'''
EXECUTION_LOG_FILENAME = 'logs/execution.log'
SETTINGS_FILENAME = 'settings.yaml'
CSV_SEP = ','
DATA_FORMATS = {'int': int, 'float': float, 'str': str}
SERIAL_READ_TIMEOUT = 1.5 # seconds
FIND_PORT_TIMEOUT = 10 # seconds
BOARD_RESET_TIMEOUT = 2 # seconds
verbose = True
def __init__(self, verbose=False):
self.verbose = verbose
self.load_settings()
def _decode_bytes(self, raw_bytes, encoding='ascii'):
result = None
try:
result = raw_bytes.decode(encoding).strip()
except:
if self.verbose:
print("Invalid bytes!")
return result
def _getCFG(self, *args, expected_type=None):
r = self.CFG
for key in args:
try:
r = r[key]
except:
raise Exception("Configuration file '{}' is missing the "
"required key\n {}"
.format(self.SETTINGS_FILENAME,
': '.join(args)))
if expected_type is not None:
try:
return expected_type(r)
except:
raise TypeError("Expected type {} on key {}".format(
expected_type, ':'.join(args)))
return r
def load_settings(self):
'''
Load the configuration file onto the self.CFG attribute.
Some keys will be tested and filenames will be normalized.
'''
with open(self.SETTINGS_FILENAME) as f:
self.CFG = yaml.safe_load(f)
self.API_POST_URL = self._getCFG('SERVER', 'API_POST_URL',
expected_type=str)
self.SENSORS_CSV_LINE = self.CSV_SEP.join(
[d['nickname'] for d in self._getCFG('SENSORS',
expected_type=list)])
self.DATALOG_CSV_FILENAME = self._getCFG(
'FILES', 'DATALOG_CSV', expected_type=str)
self.SERVER_OUTGOING_FILENAME = self._getCFG(
'FILES', 'SERVER_OUTGOING_JSON', expected_type=str)
# convert raw str into normal escaped str (e.g., r'\\t' --> '\t')
self.DATALOG_CSV_SEP = bytes(
self._getCFG('FILES', 'DATALOG_CSV_SEP', expected_type=str),
'utf8').decode('unicode_escape')
self.SERIAL_PORTS = [p.strip() for p in
self._getCFG('ARDUINO', 'SERIAL_PORT',
expected_type=str).split(',')]
self.READING_INTERVAL_SECONDS =\
self._getCFG('LOGGER', 'READING_INTERVAL', 'seconds',
expected_type=int) + \
60 * self._getCFG('LOGGER', 'READING_INTERVAL', 'minutes',
expected_type=int) + \
3600 * self._getCFG('LOGGER', 'READING_INTERVAL', 'hours',
expected_type=int) + \
86400 * self._getCFG('LOGGER', 'READING_INTERVAL', 'days',
expected_type=int)
self.DATETIME_FORMAT = self._getCFG('LOGGER', 'DATETIME_FORMAT',
expected_type=str)
def create_json_data(self, raw_line):
'''
Given the raw serial line response (expected to be a CSV line), returns
a JSON dict with sensor data including the datetime field.
'''
raw_sensor_data = {
'datetime': datetime.now().strftime(self.DATETIME_FORMAT),
'sensors': {}}
for i, v in enumerate(raw_line.split(self.CSV_SEP)):
v = v.strip()
type_ = self.DATA_FORMATS[self.CFG['SENSORS'][i]['data_format']]
nickname = self.CFG['SENSORS'][i]['nickname']
try:
v = type_(v)
except:
logging.error("Cannot convert value '{}' read from {} to {}"
.format(v, nickname, type_))
continue
raw_sensor_data['sensors'][nickname] = v
logging.info("Resulting JSON: {}".format(raw_sensor_data))
return raw_sensor_data
def write_data_log(self, json_data):
'''
For backup purposes, write the given JSON data onto the file
DATALOG_CSV as specficied on self.SETTINGS_FILENAME.
'''
csv_line = json_data['datetime'] + self.DATALOG_CSV_SEP
for sensor in self.CFG['SENSORS']:
if sensor['nickname'] in json_data['sensors']:
csv_line += str(json_data['sensors']
[sensor['nickname']])
csv_line += self.DATALOG_CSV_SEP
csv_line = csv_line[:-1]
try:
with open(self.CFG['FILES']['DATALOG_CSV'], 'a') as f:
f.write(csv_line + '\n')
logging.info("Done! CSV line '{}' was appended to the file '{}'"
.format(csv_line, self.CFG['FILES']['DATALOG_CSV']))
except:
logging.error("Unable to write data log at '{}'"
.format(self.CFG['FILES']['DATALOG_CSV']))
def send_data_to_server(self, json_data):
logging.info("URL: {}".format(self.API_POST_URL))
r = None
try:
if os.path.exists(self.CFG['FILES']['SERVER_OUTGOING_JSON']):
logging.info("Outgoing data exists! Trying to send it first..")
with open(self.CFG['FILES']['SERVER_OUTGOING_JSON']) as f:
for i, line in enumerate(f):
r = requests.post(self.API_POST_URL,
json=json.loads(line,
encoding='utf-8'))
if r.status_code != 200:
raise Exception
logging.info('Line {}: {}'.format(i, r))
os.remove(self.CFG['FILES']['SERVER_OUTGOING_JSON'])
logging.info("Done! Server data should be up to date.")
r = requests.post(self.API_POST_URL, json=json_data)
if r.status_code != 200:
raise Exception
logging.info("Done! Request: {}".format(r))
except:
logging.error("Unable to reach the server.")
logging.info("Response request: {}".format(r))
logging.info("Attempting to write current data on local file '{}'"
.format(self.CFG['FILES']['SERVER_OUTGOING_JSON']))
with open(self.CFG['FILES']['SERVER_OUTGOING_JSON'], 'a') as f:
f.write(json.dumps(json_data) + '\n')
logging.info("Done! Data is saved an will be shipped as soon as "
"the server is back on.")
return
def serial_read_sensors(self, port_index=None):
'''
Sends the 'csv_nickname_list' string to the serial port of index
'port_index' (for self.SERIAL_PORTS) and returns the response line.
csv_nickname_list example: str
Example: 'DHT22_TEMP,DHT22_AH,BMP085_PRESSURE,LDR'
'''
result_line, ser = None, None
try:
if isinstance(port_index, int):
serial_port = self.SERIAL_PORTS[port_index]
else:
serial_port = self.SERIAL_PORTS[0]
# if present, the board will be reseted
ser = serial.Serial(serial_port,
self._getCFG('ARDUINO', 'BAUD_RATE',
expected_type=int),
timeout = self.SERIAL_READ_TIMEOUT,
xonxoff=True)
logging.info(str(ser))
time.sleep(self.BOARD_RESET_TIMEOUT)
while bool(result_line) is False:
result = ser.write(bytes(self.SENSORS_CSV_LINE, 'utf8'))
logging.info("sent: '{}' ({} bytes)".format(
self.SENSORS_CSV_LINE, result))
result_line = ser.readline()
logging.info("read: {} ({} bytes)".format(result_line,
len(result_line)))
result_line = self._decode_bytes(result_line)
if result_line is None:
logging.error("Unable to decode line as ASCII.")
logging.info("Trying a new reading..")
continue
ser.close()
return result_line
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logging.error("Unable to open serial port '{}'"
.format(serial_port))
finally:
if ser:
ser.close()
logging.info("serial closed")
return None
def setup_logging(self):
logging.basicConfig(
level=logging.DEBUG,
filename=self.EXECUTION_LOG_FILENAME,
format='%(asctime)s : %(levelname)s : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
if self.verbose:
root = logging.getLogger('')
console = logging.StreamHandler()
# console.setFormatter(logging.Formatter(
# fmt='%(asctime)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.addHandler(console)
def run(self):
'''
Starts the logger main loop, which iterate over the procedures:
1. Read sensor data via serial port;
2. If successful, save data on
<