-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfleet_simulator_west.py
208 lines (172 loc) · 7.55 KB
/
fleet_simulator_west.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import psycopg2
import random
import math
import time
from datetime import datetime, timedelta
from faker import Faker
fake = Faker()
# Function to establish database connection
def connect_to_database():
    """Open a psycopg2 connection using the settings hard-coded below.

    Returns:
        The open connection on success, or ``None`` when connecting raised
        an exception (the error is printed instead of being propagated).
    """
    settings = {
        "database": "",
        "user": "",
        "password": "",
        "host": "",
        "port": "26257",
    }
    try:
        return psycopg2.connect(**settings)
    except Exception as exc:
        # Best effort: report the failure and let the caller deal with None.
        print("Error:", exc)
    return None
# Function to close database connection
def close_connection(connection):
    """Close ``connection`` when it is truthy; otherwise do nothing.

    Args:
        connection: An object exposing ``close()``, or a falsy value such as
            ``None`` when no connection was ever established.
    """
    if not connection:
        return
    connection.close()
# Function to generate a random latitude and longitude inside the simulation bounding box (latitude 24.396308 to 49.384358, longitude -125.0 to -110.0)
def generate_latitude_longitude():
    """Draw a uniformly random (latitude, longitude) pair inside a fixed box.

    The box spans latitudes 24.396308 to 49.384358 and longitudes -125.0 to
    -110.0. Latitude is drawn first, then longitude.

    Returns:
        A ``(latitude, longitude)`` tuple of floats.
    """
    lat_bounds = (24.396308, 49.384358)
    lon_bounds = (-125.0, -110.0)
    lat = random.uniform(*lat_bounds)
    lon = random.uniform(*lon_bounds)
    return lat, lon
# Function to calculate the distance between two points using Haversine formula
def calculate_distance(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in miles between two points.

    Uses the Haversine formula with an Earth radius of 3958.8 miles.

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lon1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.

    Returns:
        The distance in miles as a float.
    """
    earth_radius_miles = 3958.8
    sin_half_dlat = math.sin(math.radians(lat2 - lat1) / 2)
    sin_half_dlon = math.sin(math.radians(lon2 - lon1) / 2)
    a = sin_half_dlat * sin_half_dlat + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * sin_half_dlon * sin_half_dlon
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make math.sqrt(1 - a) raise
    # ValueError. Clamp so the formula stays defined.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return earth_radius_miles * c
# Function to simulate vehicle movement and update data in the database
def simulate_vehicle_movement(connection, cursor, vehicle, start_time, end_time):
    """Simulate one vehicle moving and reporting diagnostics over a time window.

    Starting at ``start_time`` and continuing while the simulated clock is
    not past ``end_time``, each step:

    * picks a random target position, capped at 25 miles from the previous
      position and clamped to the simulation bounding box,
    * fills ``vehicle`` with the new position, timestamp and random
      diagnostic readings,
    * persists the step via ``update_vehicle_data`` and
      ``update_location_history``,
    * prints the step, sleeps 0.75-2 seconds of real time, and advances the
      simulated clock by a random 5-30 minutes.

    Args:
        connection: Database connection handed to the update helpers.
        cursor: Database cursor handed to the update helpers.
        vehicle: Dict with at least 'VehicleID', 'Latitude' and 'Longitude';
            it is mutated in place on every step.
        start_time: ``datetime`` of the first simulated sample.
        end_time: ``datetime`` after which no more samples are produced.

    Raises:
        TypeError, ValueError: If the vehicle's stored coordinates cannot be
            converted to ``float``.
    """
    # Bounding box that every simulated position is clamped to.
    min_lat, max_lat = 24.396308, 49.384358
    min_long, max_long = -125.0, -110.0
    max_step_miles = 25

    # Convert once up front. The stored coordinates may not be floats (for
    # example decimal.Decimal values, which psycopg2 returns for NUMERIC
    # columns), and mixing those with the float target in the interpolation
    # below would raise TypeError.
    last_lat = float(vehicle['Latitude'])
    last_long = float(vehicle['Longitude'])
    current_time = start_time

    while current_time <= end_time:
        new_lat, new_long = generate_latitude_longitude()
        distance = calculate_distance(last_lat, last_long, new_lat, new_long)

        # Cap a step at 25 miles by moving only part of the way towards the
        # randomly chosen target (linear interpolation in degrees).
        if distance > max_step_miles:
            ratio = max_step_miles / distance
            new_lat = last_lat + (new_lat - last_lat) * ratio
            new_long = last_long + (new_long - last_long) * ratio

        # Keep the position inside the bounding box.
        new_lat = max(min(new_lat, max_lat), min_lat)
        new_long = max(min(new_long, max_long), min_long)
        last_lat, last_long = new_lat, new_long

        # Simulated position and diagnostic readings for this step.
        vehicle['Timestamp'] = current_time
        vehicle['Latitude'] = new_lat
        vehicle['Longitude'] = new_long
        vehicle['EngineTemperature'] = random.uniform(50, 120)
        vehicle['OilPressure'] = random.uniform(10, 80)
        vehicle['FuelLevel'] = random.uniform(0, 100)
        vehicle['TirePressure'] = random.uniform(25, 40)
        vehicle['EngineRPM'] = random.randint(500, 4000)
        vehicle['TransmissionFluidLevel'] = random.uniform(0, 100)
        vehicle['BatteryVoltage'] = random.uniform(10, 16)

        # Persist the step: current position + diagnostics, then history.
        update_vehicle_data(connection, cursor, vehicle)
        update_location_history(connection, cursor, vehicle)

        print("Time:", current_time)
        print("Vehicle ID:", vehicle['VehicleID'])
        print("Location: Lat={}, Long={}".format(vehicle['Latitude'], vehicle['Longitude']))
        print("Diagnostics:", vehicle['EngineTemperature'], vehicle['OilPressure'], vehicle['FuelLevel'], vehicle['TirePressure'],
              vehicle['EngineRPM'], vehicle['TransmissionFluidLevel'], vehicle['BatteryVoltage'])
        print()

        # Pace the writes in real time, then advance the simulated clock.
        time.sleep(random.uniform(0.75, 2))
        current_time += timedelta(minutes=random.randint(5, 30))
# Function to update vehicle data in the database
def update_vehicle_data(connection, cursor, vehicle):
    """Store a vehicle's current position and one diagnostic sample.

    Runs an UPDATE on the Vehicle table and an INSERT into the Diagnostic
    table, then commits. If anything in that sequence raises, the error is
    printed and the transaction is rolled back; nothing is re-raised.

    Args:
        connection: Database connection used for commit / rollback.
        cursor: Cursor used to execute the statements.
        vehicle: Dict holding 'VehicleID', 'Latitude', 'Longitude',
            'Timestamp' and the diagnostic readings listed below.
    """
    position_sql = (
        "\n"
        "UPDATE Vehicle\n"
        "SET Latitude = %s, Longitude = %s\n"
        "WHERE VehicleID = %s\n"
    )
    diagnostic_sql = (
        "\n"
        "INSERT INTO Diagnostic (VehicleID, Timestamp, EngineTemperature, OilPressure, FuelLevel, TirePressure,\n"
        "EngineRPM, TransmissionFluidLevel, BatteryVoltage)\n"
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)\n"
    )
    # Order matches the column list of the INSERT statement above.
    diagnostic_fields = (
        'VehicleID',
        'Timestamp',
        'EngineTemperature',
        'OilPressure',
        'FuelLevel',
        'TirePressure',
        'EngineRPM',
        'TransmissionFluidLevel',
        'BatteryVoltage',
    )
    try:
        position_args = (vehicle['Latitude'], vehicle['Longitude'], vehicle['VehicleID'])
        cursor.execute(position_sql, position_args)

        diagnostic_args = tuple(vehicle[field] for field in diagnostic_fields)
        cursor.execute(diagnostic_sql, diagnostic_args)

        connection.commit()
    except Exception as exc:
        print("Error:", exc)
        connection.rollback()
# Function to update LocationHistory table
def update_location_history(connection, cursor, vehicle):
    """Append the vehicle's current position to the LocationHistory table.

    Executes one INSERT and commits. If either step raises, the error is
    printed and the transaction is rolled back; nothing is re-raised.

    Args:
        connection: Database connection used for commit / rollback.
        cursor: Cursor used to execute the INSERT.
        vehicle: Dict holding 'VehicleID', 'Timestamp', 'Latitude' and
            'Longitude'.
    """
    insert_sql = (
        "\n"
        "INSERT INTO LocationHistory (VehicleID, Timestamp, Latitude, Longitude)\n"
        "VALUES (%s, %s, %s, %s)\n"
    )
    # Order matches the column list of the INSERT statement above.
    history_fields = ('VehicleID', 'Timestamp', 'Latitude', 'Longitude')
    try:
        row = tuple(vehicle[field] for field in history_fields)
        cursor.execute(insert_sql, row)
        connection.commit()
    except Exception as exc:
        print("Error:", exc)
        connection.rollback()
# Function to fetch vehicles from the database
def fetch_vehicles(connection, cursor, fleet_id='9e3f2a0f-e8c6-4f04-acce-718975fce43d'):
    """Fetch the vehicles of one fleet as a list of dictionaries.

    Args:
        connection: Unused; kept so existing callers keep working.
        cursor: Cursor used to run the query.
        fleet_id: FleetID to select. Defaults to the fleet this script was
            written for; it is sent as a bound query parameter rather than
            being embedded in the SQL text.

    Returns:
        One dict per row, keyed by the names in ``columns`` below, which are
        mapped onto the first 13 positional columns of ``SELECT *``. On any
        error the error is printed and an empty list is returned.
    """
    # NOTE(review): this mapping assumes the Vehicle table's column order
    # matches the list below -- confirm against the schema.
    columns = (
        'VehicleID',
        'FleetID',
        'VIN',
        'Make',
        'Model',
        'Year',
        'LastServiceDate',
        'NextServiceDate',
        'LastMaintenanceTask',
        'LastMaintenanceDate',
        'CurrentOdometerReading',
        'Latitude',
        'Longitude',
    )
    try:
        cursor.execute("SELECT * FROM Vehicle where FleetID = %s", (fleet_id,))
        rows = cursor.fetchall()
        # Index explicitly (rather than zip) so a row that is shorter than
        # expected still surfaces as an error instead of a partial dict.
        return [
            {name: row[position] for position, name in enumerate(columns)}
            for row in rows
        ]
    except Exception as e:
        print("Error:", e)
        return []
# Start and end time for simulation
start_time = datetime(2024, 1, 5, 8, 0, 0)
end_time = datetime(2024, 1, 5, 17, 0, 0)

# Connect to the database. connect_to_database() returns None on failure, so
# stop with a clear message instead of failing later on connection.cursor().
connection = connect_to_database()
if connection is None:
    raise SystemExit("Could not connect to the database; aborting simulation.")

try:
    cursor = connection.cursor()
    # Fetch vehicles from the database
    vehicles = fetch_vehicles(connection, cursor)
    # Run simulation for each vehicle, one after another
    for vehicle in vehicles:
        simulate_vehicle_movement(connection, cursor, vehicle, start_time, end_time)
finally:
    # Close the connection even if the simulation raised part-way through.
    close_connection(connection)