avtomonitor/main.py
2023-04-05 22:19:53 +02:00

94 lines
3.0 KiB
Python

import json
from datetime import datetime, timedelta
from collections import OrderedDict
from signals import *
import os
"""
Example transmission
--------------------
{
'id': 1,
'begin_frequency': 438092000,
'end_frequency': 438108000,
'begin_date': '2023-03-25 16:44:28.798000',
'end_date': '2023-03-25 16:44:31.870000',
'sample_size': 2048,
'data_file': 'device_2/transmission/2023-03-25/16_44_28_438100000_uint8.bin',
'device_id': 2,
'data_type': 'uint8',
'modulation_id': 1
}
Notes
-----
Only relevant information is the difference between begin and end date as well as the begin date itself.
"""
SAMPLE_RATE = 16000
# Get mean time span between the different times
def mean_time_span(timestamps):
time_diffs = []
for i in range(len(timestamps)-1):
time_diff = datetime.fromisoformat(timestamps[i+1]) - datetime.fromisoformat(timestamps[i])
time_diffs.append(time_diff.total_seconds())
mean_time_span = sum(time_diffs) / len(time_diffs)
return timedelta(seconds=mean_time_span)
# Calculate seconds of transmission
def seconds_between(t1, t2):
t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S.%f")
t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S.%f")
return abs((t2 - t1).seconds)
# Main function
def main():
with open("data/db.json", "r") as f:
data = json.loads(f.read())
# Remove the identification transmission
times = [(x["begin_date"], seconds_between(x["begin_date"], x["end_date"]), x["data_file"].replace("device_2/", "")) for x in data if seconds_between(x["begin_date"], x["end_date"]) > 3]
# Sort by length
signals = dict()
for i in times:
try:
signals[i[1]]["signals"].append(i[0])
except KeyError:
signals[i[1]] = {"file": i[2], "signals": [i[0]]}
# Turn one of the transmissions of a specific length into an mp3, in order to audibly test if it's the malicious ones
data = np.memmap(i[2], dtype=np.uint8, mode="r")
decode(data[: 2048 * (os.path.getsize(i[2]) // 1024)].reshape(-1, 2048), "FM", i[2].replace("_uint8.bin",".mp3"), SAMPLE_RATE)
# Order dict
signals = dict(OrderedDict(sorted(signals.items(), reverse=True)))
# We know the length of the malicious transmissions from observing the mp3s generated by the previous loop
# so we could filter pretty accurately.
transmission_lengths = {"avtomontior": 22, "vinovnici":24, "psuvni":21}
new_signals = dict()
total_sum = 0
# Filter the signals to leave only the ones of interest
for i in transmission_lengths:
if transmission_lengths[i] in signals:
signal_times = signals[transmission_lengths[i]]["signals"]
mean_time = mean_time_span(signal_times).total_seconds() / 60
# Create our entry in the dictionary
new_signals[i] = {"average_gap": mean_time,"signals":signal_times}
# Update the total sum
total_sum += mean_time
# Add the total average to the dict
new_signals["total_average"] = total_sum/3
# Write dictionary to file
with open('data/out.json', 'w', encoding='utf-8') as f:
json.dump(new_signals, f, ensure_ascii=False, indent=4)
if __name__ == "__main__":
main()