import configparser
import os
import sys
from datetime import datetime
from pprint import pprint

# Template written to disk when the user asks to restore the settings file.
# NOTE: backslashes are doubled so the file shows the two characters ``\t``
# rather than an invisible TAB character between the quotes.
DEFAULT_INI = """\
[server]
; base URL of the server
URL = http://dados.cta.if.ufrgs.br/emm
; board identification number (auto generated when a new board is created)
BOARD_ID =
; authentication token for the board's user
USER_HASH =

[reading]
; CSV list of sensor names/nicknames (order reflect columns on datalog files)
SENSORS = DHT22_TEMP, DHT22_AH, BMP085_PRESSURE, LDR
; time between logger cycles -- format: hours:minutes:seconds
INTERVAL = 0:5:0
; true for read board clock; if fail, or false, system's time will be used
RTC_DS1307 = true

[datalog]
; CSV delimiter for local log files (sugestions: ',' or ';' or '\\t')
CSV_SEP = '\\t'
; format of the datetime column (see Python docs on datetime module)
DATETIME_FORMAT = %Y-%m-%d-%H-%M-%S

[arduino]
; CSV list of ports to attempt or blank for automatic search on
; /dev/ttyACM* and /dev/ttyUSB* (where * varies from 0 to 4)
SERIAL_PORT =
"""


def make_path_here(filename):
    '''
    Append filename to the directory that contains this module.

    Returns the joined path; it is not normalised, so ``..`` components in
    `filename` are kept as given.
    '''
    return os.path.join(os.path.abspath(os.path.dirname(__file__)), filename)


class RTCDateTime:
    '''
    Timestamp reported by the board's real time clock.

    Parses a string in `READ_TIMESTAMP` format and renders it back as a
    compact ``YYYYmmddHHMMSS`` string through ``str()``.
    '''
    SENSOR_NAME = 'RTC_DS1307'
    READ_TIMESTAMP = '%Y-%m-%d %H:%M:%S'
    __qualname__ = "RTCDateTime fmt='{}'".format(READ_TIMESTAMP)

    def __init__(self, s):
        '''
        Parse `s` using `READ_TIMESTAMP`.

        Raises ValueError (from ``datetime.strptime``) when `s` does not
        match the expected format.
        '''
        self.dt = datetime.strptime(s, self.READ_TIMESTAMP)

    def __str__(self):
        return self.dt.strftime('%Y%m%d%H%M%S')


class Config:
    '''
    Application settings loaded from an INI file and validated in place.

    Values are read through ``config[section][key]``, which exposes the
    parser's raw section dictionaries. The raw dictionaries are used (instead
    of the interpolating ConfigParser API) because the validators store
    non-string values in them and because values such as DATETIME_FORMAT
    contain ``%`` characters.
    '''
    SETTINGS_FILENAME = make_path_here('../../settings.ini')
    DATALOG_PATH = make_path_here('../../data/')
    EXECUTION_LOG_PATH = make_path_here('../logs/')
    OUTGOING_FILENAME = make_path_here('../../data/outgoing.json')
    URL_API_POST_RAWSENSORDATA = '{base}api/post/rawsensordata/{bid}'
    SERIAL_CSV_SEP = ','
    # How many /dev/ttyACM<n> and /dev/ttyUSB<n> indexes are tried when no
    # serial port is configured.
    SERIAL_PORT_SEARCH_COUNT = 5

    def __init__(self):
        '''
        Parse the built-in defaults and try to load the settings file.

        Any error raised while loading is printed, not propagated.
        '''
        super().__init__()
        self.default = configparser.ConfigParser()
        self.default.read_string(DEFAULT_INI)
        try:
            self.load_settings()
        except Exception as e:
            print('\nConfigurationError:', e)

    def __getitem__(self, key):
        '''Return the raw (mutable) dictionary of the section named `key`.'''
        return self._parser._sections[key]

    def ask_restore_settings_file(self):
        '''
        Ask the user whether to overwrite the settings file with defaults.

        On a positive answer the file is rewritten and the process exits:
        status 0 on success, status 1 if the file could not be written.
        Any other answer returns without touching the file.
        '''
        answer = input("\nRestore default file now and overwrite all "
                       "current values? [y/N] ")
        if answer.strip().lower() not in ('y', 'yes'):
            return
        try:
            with open(self.SETTINGS_FILENAME, 'w') as f:
                f.write(DEFAULT_INI)
        except OSError:
            print("Unable to write on file\n {}\n\nTry to run this "
                  "program from another location!".format(
                      self.SETTINGS_FILENAME))
            sys.exit(1)
        # Exit outside the try block so SystemExit is never mistaken for a
        # write failure.
        print("Done! Restart the application!")
        sys.exit(0)

    def load_settings(self):
        '''
        Load the INI configuration file onto the self._parser attribute
        and run every validator on it.

        Raises Exception when the file cannot be opened or parsed, or when
        a validator rejects a value.
        '''
        self._parser = configparser.ConfigParser()
        try:
            # read() silently skips files it cannot open and returns the
            # list of files it actually parsed.
            loaded = self._parser.read(self.SETTINGS_FILENAME)
        except (configparser.Error, UnicodeDecodeError) as e:
            raise Exception(
                "Unable to parse configuration file at\n {}\n{}".format(
                    self.SETTINGS_FILENAME, e)) from e
        if not loaded:
            raise Exception(
                "Unable to open configuration file at\n {}".format(
                    self.SETTINGS_FILENAME))
        pprint(self._parser._sections)
        self.validate_server_url()
        self.validate_reading_sensors()
        self.validate_reading_interval()
        self.validate_datalog_csv_sep()
        self.validate_arduino_serial_port()
        pprint(self._parser._sections)

    def validate_server_url(self):
        '''
        Check server/BOARD_ID is an integer and build server/api_post_url.

        Raises Exception when BOARD_ID is missing or not an integer.
        '''
        server = self['server']
        raw_id = server.get('board_id', '')
        try:
            bid = int(raw_id)
        except (TypeError, ValueError):
            raise Exception(
                "Invalid numeric value for the server/BOARD_ID key.\n"
                "Expected to be of type integer.\n"
                "Was given:\n {}".format(raw_id)) from None
        base = server['url']
        if not base.endswith('/'):
            base += '/'
        server['api_post_url'] = self.URL_API_POST_RAWSENSORDATA.format(
            base=base, bid=bid)

    def validate_reading_sensors(self):
        '''
        Turn reading/SENSORS into a list and build reading/command.

        The RTC sensor name is appended when reading/RTC_DS1307 contains
        "true". Raises Exception when no sensor is listed.
        '''
        reading = self['reading']
        names = [x.strip() for x in reading['sensors'].split(',')]
        reading['sensors'] = [x for x in names if x]
        if not reading['sensors']:
            raise Exception(
                "At least one sensor must be present on the reading/"
                "SENSORS key.")
        if 'true' in reading['rtc_ds1307'].lower():
            reading['sensors'].append(RTCDateTime.SENSOR_NAME)
        reading['command'] = 'readSensors,' + ','.join(reading['sensors'])

    def validate_reading_interval(self):
        '''
        Parse reading/INTERVAL ("hours:minutes:seconds").

        Replaces reading/interval with a dict keyed by 'H', 'M' and 'S' and
        stores the total in reading/interval_seconds. Raises Exception when
        the value is not three non-negative integers.
        '''
        reading = self['reading']
        raw = reading['interval']
        try:
            numbers = [int(v) for v in raw.split(':')]
            if len(numbers) != 3 or min(numbers) < 0:
                raise ValueError(raw)
        except (AttributeError, TypeError, ValueError):
            raise Exception(
                "Invalid time value for the reading/INTERVAL key.\n"
                "Expected format:\n"
                " hours:minutes:seconds (integer numbers)\n"
                "Was given:\n {}".format(raw)) from None
        hours, minutes, seconds = numbers
        reading['interval'] = {'H': hours, 'M': minutes, 'S': seconds}
        reading['interval_seconds'] = 3600 * hours + 60 * minutes + seconds

    def validate_datalog_csv_sep(self):
        '''
        Normalise datalog/CSV_SEP to the actual delimiter character.

        Accepts ',', ';' and a tab (written either as the two characters
        backslash-t or as a real TAB), with or without surrounding quotes.
        Raises Exception for anything else, leaving the stored value
        untouched.
        '''
        value = self['datalog']['csv_sep']
        token = value
        # configparser keeps quotes as part of the value; drop one matching
        # pair so "'\t'" and "\t" are treated the same.
        if len(token) >= 2 and token[0] == token[-1] and token[0] in '\'"':
            token = token[1:-1]
        separators = {',': ',', ';': ';', r'\t': '\t', '\t': '\t'}
        if token not in separators:
            raise Exception(
                "Invalid character value for the datalog/CSV_SEP key.\n"
                "Supported values:\n ',' ';' '\\t'\n"
                "Was given:\n {}".format(value))
        self['datalog']['csv_sep'] = separators[token]

    def validate_arduino_serial_port(self):
        '''
        Turn arduino/SERIAL_PORT into a list of port paths.

        When the key is blank the list is filled with /dev/ttyACM<n> and
        /dev/ttyUSB<n> candidates, interleaved, for n below
        SERIAL_PORT_SEARCH_COUNT.
        '''
        arduino = self['arduino']
        ports = [x.strip() for x in arduino['serial_port'].split(',')]
        ports = [x for x in ports if x]
        if not ports:
            for i in range(self.SERIAL_PORT_SEARCH_COUNT):
                ports.append('/dev/ttyACM{}'.format(i))
                ports.append('/dev/ttyUSB{}'.format(i))
        arduino['serial_port'] = ports