69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def request_route(
    url: str,
    start: Location,
    end: Location,
    status_update_delay: int = 30,
    num_requests: int = 10,
    force_recalculation: bool = False,
    mesh_id: int = None,
) -> str:
    """Request a route from polarRouteServer, polling for status until the route is available.

    A POST is sent to ``/api/route``. If the response already contains the
    route it is returned immediately; otherwise ``/api/route/<id>`` is polled
    with GET requests until the calculation reports SUCCESS or FAILURE, or
    until ``num_requests`` status requests have been sent.

    Args:
        url (str): Base URL to send request to.
        start (Location): Start location of route.
        end (Location): End location of route.
        status_update_delay (int, optional): Delay in seconds before each status request. Defaults to 30.
        num_requests (int, optional): Max number of status requests before giving up. Defaults to 10.
        force_recalculation (bool, optional): Force recalculation of an already existing route. Default: False.
        mesh_id (int, optional): Sent as ``mesh_id`` in the request payload. Defaults to None.
            NOTE(review): presumably selects the mesh used by the server - confirm.

    Raises:
        Exception: If no status URL is returned.

    Returns:
        The ``"json"`` entry of the server response (the route), or None if the
        initial request returns a non-2xx status, the calculation reports
        FAILURE, or the maximum number of status requests is reached.
    """
    payload = {
        "start_lat": start.lat,
        "start_lon": start.lon,
        "end_lat": end.lat,
        "end_lon": end.lon,
        "start_name": start.name,
        "end_name": end.name,
        "force_recalculate": force_recalculation,
        "mesh_id": mesh_id,
    }

    # make route request
    response_body, status = make_request(
        "POST",
        url,
        "/api/route",
        {"Content-Type": "application/json"},
        json.dumps(payload).encode("utf-8"),
    )
    # pprint.pprint() writes to stdout itself and returns None, so it must not
    # be wrapped in print() (that would emit a stray "None" line).
    pprint.pprint(response_body)

    if not str(status).startswith("2"):
        return None

    # if route is returned
    if response_body.get("json") is not None:
        return response_body["json"]

    # if no route returned, request status at status-url
    if response_body.get("status-url") is None:
        raise Exception("No status URL returned.")

    route_id = response_body.get("id")

    # Send at most ``num_requests`` status requests (1..num_requests inclusive).
    for status_request_count in range(1, num_requests + 1):
        print(
            f"\nWaiting for {status_update_delay} seconds before sending status request."
        )
        time.sleep(status_update_delay)

        # make status request
        print(f"Status request #{status_request_count} of {num_requests}")
        response_body, status = make_request(
            "GET",
            url,
            f"/api/route/{route_id}",
            headers={"Content-Type": "application/json"},
        )
        calculation_status = response_body.get("status")
        print(f"Route calculation {calculation_status}.")
        pprint.pprint(response_body)

        if calculation_status == "FAILURE":
            return None
        if calculation_status == "SUCCESS":
            return response_body.get("json")
        # "PENDING" (or any other status): keep polling.

    print(
        f'Max number of requests sent. Quitting.\nTo send more status requests, run: "curl {url}/api/route/{route_id}"'
    )
    return None
|