"""Group recorded transmissions by duration and report the mean gap between them.

Example transmission
--------------------
{
    'id': 1,
    'begin_frequency': 438092000,
    'end_frequency': 438108000,
    'begin_date': '2023-03-25 16:44:28.798000',
    'end_date': '2023-03-25 16:44:31.870000',
    'sample_size': 2048,
    'data_file': 'device_2/transmission/2023-03-25/16_44_28_438100000_uint8.bin',
    'device_id': 2,
    'data_type': 'uint8',
    'modulation_id': 1
}

Notes
-----
The only relevant information is the difference between the begin and end
dates, as well as the begin date itself.
"""

import json
from collections import OrderedDict
from datetime import datetime, timedelta

# NOTE(review): `decode` is not defined in this file, so it presumably comes
# from this wildcard import. The explicit imports below are placed after it so
# that `os` and `np` always refer to the real modules.
from signals import *  # noqa: F401,F403

import os

import numpy as np

SAMPLE_RATE = 16000

# Layout of the "begin_date" / "end_date" strings (see the example above).
_DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

# Row width used when reshaping the raw capture bytes before decoding.
_ROW_SIZE = 2048


def mean_time_span(timestamps):
    """Return the mean gap between consecutive timestamps.

    Args:
        timestamps: Sequence of ISO-formatted date strings, in the order in
            which the gaps should be measured.

    Returns:
        A ``timedelta`` holding the arithmetic mean of the signed differences
        between each timestamp and the one before it. With fewer than two
        timestamps there is no gap to measure and ``timedelta(0)`` is
        returned instead of raising ``ZeroDivisionError``.

    Raises:
        ValueError: If a timestamp cannot be parsed by
            ``datetime.fromisoformat``.
    """
    moments = [datetime.fromisoformat(stamp) for stamp in timestamps]
    gaps = [
        (later - earlier).total_seconds()
        for earlier, later in zip(moments, moments[1:])
    ]
    if not gaps:
        return timedelta(0)
    return timedelta(seconds=sum(gaps) / len(gaps))


def seconds_between(t1, t2):
    """Return the whole number of seconds separating two date strings.

    Args:
        t1: First date, formatted as ``YYYY-MM-DD HH:MM:SS.ffffff``.
        t2: Second date, same format.

    Returns:
        The absolute difference truncated to whole seconds, as an ``int``.
        The order of the arguments does not matter.

    Raises:
        ValueError: If either string does not match the expected format.
    """
    start = datetime.strptime(t1, _DATE_FORMAT)
    end = datetime.strptime(t2, _DATE_FORMAT)
    # `timedelta.seconds` is only the seconds *component*: it drops whole days
    # and is ~86400 for small negative differences. Take the absolute delta
    # first and floor-divide so the full span is counted exactly.
    return abs(end - start) // timedelta(seconds=1)


def _render_sample(path):
    """Decode the capture at *path* into an mp3 written next to it."""
    raw = np.memmap(path, dtype=np.uint8, mode="r")
    # Keep only complete rows so the reshape below cannot fail on a capture
    # whose size is not a multiple of the row width (one uint8 per byte).
    whole_rows = os.path.getsize(path) // _ROW_SIZE
    decode(
        raw[: _ROW_SIZE * whole_rows].reshape(-1, _ROW_SIZE),
        "FM",
        path.replace("_uint8.bin", ".mp3"),
        SAMPLE_RATE,
    )


def main():
    """Read ``data/db.json``, group transmissions by length, write ``data/out.json``.

    For every distinct transmission length, one capture is decoded to mp3 so
    it can be checked by ear. The lengths listed in ``transmission_lengths``
    are then summarised: their begin dates, the mean gap between them in
    minutes, and the average of those mean gaps.
    """
    with open("data/db.json", "r", encoding="utf-8") as f:
        transmissions = json.load(f)

    # Drop transmissions of 3 seconds or less (the identification
    # transmissions, per the original note) and keep
    # (begin date, duration, capture path) for the rest.
    times = []
    for entry in transmissions:
        duration = seconds_between(entry["begin_date"], entry["end_date"])
        if duration > 3:
            times.append(
                (
                    entry["begin_date"],
                    duration,
                    entry["data_file"].replace("device_2/", ""),
                )
            )

    # Group begin dates by transmission length.
    signals = {}
    for begin_date, duration, path in times:
        if duration in signals:
            signals[duration]["signals"].append(begin_date)
            continue
        signals[duration] = {"file": path, "signals": [begin_date]}
        # Turn one transmission of each length into an mp3, in order to
        # audibly test whether it is one of the malicious ones.
        _render_sample(path)

    # Order the groups from longest to shortest transmission.
    signals = {length: signals[length] for length in sorted(signals, reverse=True)}

    # We know the length of the malicious transmissions from observing the
    # mp3s generated by the previous loop, so we can filter pretty accurately.
    transmission_lengths = {"avtomontior": 22, "vinovnici": 24, "psuvni": 21}
    new_signals = {}
    total_sum = 0

    # Keep only the signals of interest.
    for name, length in transmission_lengths.items():
        if length not in signals:
            continue
        signal_times = signals[length]["signals"]
        mean_time = mean_time_span(signal_times).total_seconds() / 60
        new_signals[name] = {"average_gap": mean_time, "signals": signal_times}
        total_sum += mean_time

    # Average over the groups that were actually found, not a fixed count, so
    # a missing transmission length does not drag the average down.
    matched = len(new_signals)
    new_signals["total_average"] = total_sum / matched if matched else 0.0

    with open("data/out.json", "w", encoding="utf-8") as f:
        json.dump(new_signals, f, ensure_ascii=False, indent=4)


if __name__ == "__main__":
    main()