import configparser import sys import os from pprint import pprint DEFAULT_INI=\ """\ [server] ; base URL of the server URL = http://dados.cta.if.ufrgs.br/emm ; user board authentication token (auto generated when a new board is created) BOARD_HASH = [reading] ; CSV list of sensor names/nicknames (order reflect columns on datalog files) SENSORS = DHT22_TEMP, DHT22_AH, BMP085_PRESSURE, LDR ; time between logger cycles, in minutes SLEEP_MINUTES = 5 ; true for read board clock; if fail, or false, system's time will be used RTC_DS1307 = true [datalog] ; CSV delimiter for local log files (valid options: , or ; or \t ) CSV_SEP = \t ; format of the datetime column ; %Y : years | %m : months | %d : days | %H : hours | %M : mins | %S : secs DATETIME_FORMAT = %Y-%m-%d-%H-%M-%S [arduino] ; CSV list of ports to attempt connection or blank for automatic search on ; /dev/ttyACM* and /dev/ttyUSB* (where * varies from 0 to 4) SERIAL_PORT = """ DEFAULT_INI_LINES = DEFAULT_INI.splitlines() def make_path_here(filename): ''' Append filename to the current __file__ path. ''' return os.path.realpath(os.path.join( os.path.abspath(os.path.dirname(__file__)), filename)) class Config: SETTINGS_FILENAME = make_path_here('../../settings.ini') DATALOG_PATH = make_path_here('../../data/') EXECUTION_LOG_PATH = make_path_here('../logs/') OUTGOING_FILENAME = make_path_here('../../data/outgoing.json') URL_API_POST_RAWSENSORDATA = '{base}api/post/rawsensordata' SERIAL_CSV_SEP = ',' RTC_NAME = 'RTC_DS1307' RTC_FORMAT = '%Y-%m-%d %H:%M:%S' class ConfigMissingSectionError(Exception): def __init__(self, section, parent): self.message = "\nConfigurationError: At file\n {filename}\n\n"\ "[{section}]\n ^\n"\ "Missing section!".format( filename=parent.SETTINGS_FILENAME, section=section) class ConfigMissingKeyError(Exception): def __init__(self, section, key, parent): self.message = "\nConfigurationError: At file\n {filename}\n\n"\ "[{section}]\n{comment}\n{key} = \n{indicator}\n"\ "Missing key!".format( filename=parent.SETTINGS_FILENAME, comment=parent.extract_comment(key), section=section, key=key.upper(), indicator=' '*len(key + ' = ') + '^') class ConfigValueError(Exception): def __init__(self, section, key, message, parent): self.message = "\nConfigurationError: At file\n {filename}\n\n"\ "[{section}]\n{comment}\n{key} = {value}\n"\ "{indicator}\n{msg}".format( filename=parent.SETTINGS_FILENAME, section=section, key=key, msg=message, comment=parent.extract_comment(key), value=parent[section][key.lower()], indicator=' '*len(key + ' = ') + '^') def __init__(self): self.default = configparser.ConfigParser() self.default.read_string(DEFAULT_INI) self.load_settings() def __getitem__(self, key): return self._parser._sections[key] def __contains__(self, key): return key in self._parser._sections def extract_comment(self, key): key, comment_lines = key.upper() + ' =', [] for i, line in enumerate(DEFAULT_INI_LINES): if key in line: j = i - 1 while j >= 0: if not DEFAULT_INI_LINES[j].startswith(';'): return '\n'.join(DEFAULT_INI_LINES[k] for k in range(j+1, i)) j -= 1 def assert_config_keys(self): for section in self.default: if section == 'DEFAULT': continue if section in self: for key in self.default[section]: if not key in self[section]: raise self.ConfigMissingKeyError(section, key, self) else: raise self.ConfigMissingSectionError(section, self) def ask_restore_settings_file(self): a = input("\nRestore default file now and overwrite all" "current values? [y/N] ") if 'y' in a.lowercase(): try: with open(self.SETTINGS_FILENAME, 'w') as f: f.write(DEFAULT_SETTINGS) print("Done! Restart the application!") sys.exit(0) except: print("Unable to write on file\n {}\n\nTry to run this " "program from another location!".format( self.SETTINGS_FILENAME)) sys.exit(1) def load_settings(self): ''' Load the INI configuration file onto the self._parser attribute. ''' self._parser = configparser.ConfigParser() try: self._parser.read(self.SETTINGS_FILENAME) except: print("Unable to open configuration file at\n {}".format( self.SETTINGS_FILENAME)) return self.assert_config_keys() self.validate_server() self.validate_reading() self.validate_datalog() self.validate_arduino() def validate_server(self): self['server']['api_post_url'] = self.URL_API_POST_RAWSENSORDATA\ .format(base=self['server']['url'] + ('' if self['server']['url'].endswith('/') else '/')) def validate_reading(self): self['reading']['sensors'] = [x.strip() for x in self['reading']['sensors'].split(',') if x.strip() != ''] if len(self['reading']['sensors']) == 0: raise self.ConfigValueError('reading', 'SENSORS', "EmptyListError: At least one sensor must be present!", self) if 'true' in self['reading']['rtc_ds1307'].lower(): self['reading']['sensors'].append(self.RTC_NAME) self['reading']['command'] = 'read,' + ','.join( self['reading']['sensors']) try: self['reading']['sleep_minutes'] =\ float(self['reading']['sleep_minutes'].replace(',', '.')) except: raise self.ConfigValueError('reading', 'SLEEP_MINUTES', 'TypeError: Number expected!', self) def validate_datalog(self): value = self['datalog']['csv_sep'] self['datalog']['csv_sep'] = bytes(value, 'utf8').decode( 'unicode_escape') if not value in (',', ';', r'\t'): raise self.ConfigValueError('datalog', 'CSV_SEP', "InvalidCharacter: Supported values are\n" "',' ';' '\\t'\n", self) def validate_arduino(self): self['arduino']['serial_port'] = [x.strip() for x in self['arduino']['serial_port'].split(',') if x.strip() != ''] if len(self['arduino']['serial_port']) == 0: self['arduino']['serial_port'] = [] for i in range(5): self['arduino']['serial_port'].append('/dev/ttyACM{}'.format(i)) self['arduino']['serial_port'].append('/dev/ttyUSB{}'.format(i))