Private GIT

Skip to content
Snippets Groups Projects
Select Git revision
  • e06f9faf95c24bcb852ef9bc35dc17f7d124dd6d
  • master default protected
  • fix_nzb_cat
  • develop
  • guessit2-minimal
  • ssl_warning
  • UHD-qualities
  • fix_providers8
  • !
  • tvvault
  • provider_alpharatio
  • v5.1.1
  • v5.1
  • v5.0.3
  • v5.0.2
  • v5.0.1
  • v5.0
  • v4.2.1.07
  • v4.2.1.06
  • v4.2.1.05
  • v4.2.1.04
  • v4.2.1.03
  • v4.2.1.02
  • v4.2.1.01
  • v4.2.1.0
  • v4.2.0.6
  • v4.2.0.5
  • v4.2.0.4
  • v4.2.0.3
  • v4.2.0.2
  • v4.2.0.1
31 results

properFinder.py

Blame
  • main.py 4.47 KiB
    import time
    import json
    import subprocess
    import os
    from influxdb import InfluxDBClient
    
    # InfluxDB Settings
    # Every setting is read from the process environment at import time.
    # os.environ.get() returns None for an unset variable, so the plain string
    # settings below are None when missing, while the int(...) conversions
    # raise TypeError at import if their variable is unset.
    DB_ADDRESS = os.environ.get('INFLUX_DB_ADDRESS')
    DB_PORT = int(os.environ.get('INFLUX_DB_PORT'))
    DB_USER = os.environ.get('INFLUX_DB_USER')
    DB_PASSWORD = os.environ.get('INFLUX_DB_PASSWORD')
    DB_DATABASE = os.environ.get('INFLUX_DB_DATABASE')
    # Comma-separated list of tag names; consumed by tag_selection().
    DB_TAGS = os.environ.get('INFLUX_DB_TAGS')
    
    # Speedtest Settings
    # Time between tests (in seconds).
    TEST_INTERVAL = int(os.environ.get('SPEEDTEST_INTERVAL'))
    # Time before retrying a failed Speedtest (in seconds).
    TEST_FAIL_INTERVAL = int(os.environ.get('SPEEDTEST_FAIL_INTERVAL'))
    
    # Shared client used by init_db() and main(). The last positional argument
    # is None -- presumably the database name (confirm against the
    # InfluxDBClient signature); init_db() deals with the database afterwards.
    influxdb_client = InfluxDBClient(
        DB_ADDRESS, DB_PORT, DB_USER, DB_PASSWORD, None)
    
    
    def init_db():
        """Ensure the configured database exists and select it on the client.

        Looks up DB_DATABASE in the server's database list, creates it when it
        is missing, and then makes it the active database of the shared
        ``influxdb_client`` so later ``write_points`` calls have a target.
        """
        existing_names = {db['name'] for db in influxdb_client.get_list_database()}

        if DB_DATABASE not in existing_names:
            influxdb_client.create_database(DB_DATABASE)

        # Select the database in both cases. The client is constructed without
        # a database, so a freshly created one must be selected as well --
        # otherwise the very first run writes without any database chosen.
        influxdb_client.switch_database(DB_DATABASE)
    
    
    def pkt_loss(data):
        """Return the packet-loss value of a parsed Speedtest result as a float.

        Args:
            data: Mapping decoded from the Speedtest CLI JSON output.

        Returns:
            float: The 'packetLoss' value, or 0.0 when the key is absent or
            its value is null.

        NOTE: the value is always a float. JSON decodes ``0`` to int and
        ``1.5`` to float, and the InfluxDB client encodes those as different
        field types, so passing the raw value through lets the field type flip
        between runs and makes the server reject later writes. A database that
        already stores this field as an integer may need migrating.
        """
        loss = data.get('packetLoss')
        if loss is None:
            return 0.0
        return float(loss)
    
    
    def tag_selection(data, tags=None):
        """Build the InfluxDB tag dictionary requested by the tag configuration.

        Args:
            data: Mapping decoded from the Speedtest CLI JSON output.
            tags: Optional comma-separated tag names. When omitted (None) the
                module-level DB_TAGS setting is used.

        Returns:
            dict mapping each requested tag name to its value taken from
            ``data``, or None when no tags are configured (unset or empty).

        Raises:
            KeyError: If a requested tag name is unknown, or the field it
                refers to is missing from ``data``.
        """
        if tags is None:
            tags = DB_TAGS
        # Covers both an unset environment variable (None) and an empty string.
        if not tags:
            return None

        # Tag name -> key path inside the decoded result. Paths are resolved
        # only for the tags actually requested, so a field that is missing
        # from the result cannot break a tag nobody asked for.
        tag_paths = {
            'isp': ('isp',),
            'interface': ('interface', 'name'),
            'internal_ip': ('interface', 'internalIp'),
            'interface_mac': ('interface', 'macAddr'),
            'vpn_enabled': ('interface', 'isVpn'),
            'external_ip': ('interface', 'externalIp'),
            'server_id': ('server', 'id'),
            'server_name': ('server', 'name'),
            'server_location': ('server', 'location'),
            'server_country': ('server', 'country'),
            'server_host': ('server', 'host'),
            'server_port': ('server', 'port'),
            'server_ip': ('server', 'ip'),
            'speedtest_id': ('result', 'id'),
            'speedtest_url': ('result', 'url'),
        }

        options = {}
        for tag in tags.split(','):
            tag = tag.strip()
            if not tag:
                # Tolerate stray or trailing commas such as "isp,".
                continue

            value = data
            for key in tag_paths[tag]:
                value = value[key]

            if tag == 'vpn_enabled':
                # json.loads turns a JSON true/false into a real bool, which
                # never equals the string 'false'; comparing against that
                # string reported every connection as a VPN. The textual form
                # is still accepted in case the value arrives as a string.
                if isinstance(value, str):
                    value = value.strip().lower() != 'false'
                else:
                    value = bool(value)

            options[tag] = value
        return options
    
    
    def format_for_influx(cliout):
        """Convert Speedtest CLI JSON output into a list of InfluxDB points.

        Args:
            cliout: JSON document produced by ``speedtest -f json``
                (str or bytes, anything ``json.loads`` accepts).

        Returns:
            list of four point dicts -- 'ping', 'download', 'upload' and
            'packetLoss' -- each carrying the result's timestamp and a
            'fields' mapping. The CLI output contains more data than this;
            only these values are stored.

        Raises:
            ValueError: If ``cliout`` is not valid JSON.
            KeyError: If an expected section or value is missing.
        """
        data = json.loads(cliout)
        timestamp = data['timestamp']

        def transfer_point(direction):
            """Build the point for one transfer direction ('download'/'upload')."""
            stats = data[direction]
            return {
                'measurement': direction,
                'time': timestamp,
                'fields': {
                    # Byte to Megabit: 125000 bytes make one megabit.
                    'bandwidth': stats['bandwidth'] / 125000,
                    'bytes': stats['bytes'],
                    'elapsed': stats['elapsed'],
                },
            }

        ping = data['ping']
        return [
            {
                'measurement': 'ping',
                'time': timestamp,
                'fields': {
                    # Coerce to float: a whole-number reading decodes from JSON
                    # as int, which the InfluxDB client would encode as a
                    # different field type than the usual fractional readings.
                    'jitter': float(ping['jitter']),
                    'latency': float(ping['latency']),
                },
            },
            transfer_point('download'),
            transfer_point('upload'),
            {
                'measurement': 'packetLoss',
                'time': timestamp,
                'fields': {
                    'packetLoss': pkt_loss(data),
                },
            },
        ]
    
    
    def main():
        """Run Speedtests forever and store each successful result in InfluxDB.

        After preparing the database, the loop repeatedly invokes the
        ``speedtest`` CLI. A successful run is converted with
        ``format_for_influx`` and written to InfluxDB, followed by a pause of
        TEST_INTERVAL seconds. A failed run -- or a result that could not be
        parsed or stored -- is reported and retried after TEST_FAIL_INTERVAL
        seconds. The function never returns.
        """
        init_db()  # Setup the database if it does not already exist.

        while True:  # Run a Speedtest and send the results to influxDB indefinitely.
            speedtest = subprocess.run(
                ["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"],
                capture_output=True)

            if speedtest.returncode != 0:  # Speedtest failed.
                print("Speedtest Failed:")
                print(speedtest.stderr)
                print(speedtest.stdout)
                time.sleep(TEST_FAIL_INTERVAL)
                continue

            print("Speedtest Successful:")
            try:
                written = influxdb_client.write_points(
                    format_for_influx(speedtest.stdout))
            except Exception as err:
                # Top-level boundary of a long-running logger: one malformed
                # result or a temporary database outage must not end the loop.
                print("Failed to store Speedtest result:")
                print(err)
                written = False

            if written:
                print("Data written to DB successfully")
                time.sleep(TEST_INTERVAL)
            else:
                # Back off before retrying. Without this pause a failed write
                # would start the next Speedtest immediately, in a tight loop.
                time.sleep(TEST_FAIL_INTERVAL)
    
    
    # Script entry point: print a banner, then start the endless measurement
    # loop. Nothing below runs when the module is merely imported.
    if __name__ == '__main__':
        print('Speedtest CLI Data Logger to InfluxDB')
        main()