-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathedge_main.py
More file actions
executable file
·106 lines (91 loc) · 4.1 KB
/
edge_main.py
File metadata and controls
executable file
·106 lines (91 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
import sys
import time
import argparse
import queue
import threading
import edge_globals
import numpy as np
from loguru import logger
from local.sys_info import SysInfo
from tools.read_config import read_config
from local.local_store import DataStore
from local.video_reader import VideoReader
from local.decision_engine import DecisionEngine
from model_manager.model_cache import load_models
from config.model_info import edge_object_detection_model
from edge_worker import local_worker, offload_worker, Task, id_gen, ThreadPoolExecutorWithQueueSizeLimit
if __name__ == '__main__':
    # Entry point of the edge client: parse the command line, build the video
    # reader / decision engine / workers, then read frames in a loop and
    # dispatch each one either to the local worker or to the offload pool.

    # ---- command-line arguments ----
    parser = argparse.ArgumentParser()
    # a video file / local camera and an RTSP camera are mutually exclusive
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-f', '--file', help="input video file or local camera")
    group.add_argument('-r', '--rtsp', help="RTSP camera", action='store_true')
    parser.add_argument(
        '-s', '--serv', type=int, default=1,
        help="input service demand, 1 for OBJECT_DETECTION",
    )
    parser.add_argument(
        '-i', '--interval', type=int,
        help="interval between reading two frames in ms", required=True,
    )
    args = parser.parse_args()

    logger.add("log/client_{time}.log", level="INFO")

    # not referenced again in this block; kept as in the original script
    file_type = edge_globals.IMAGE_TYPE
    # NOTE: --serv is parsed but not applied; the service type is pinned to 1
    # (OBJECT_DETECTION according to the --serv help text).
    serv_type = 1  # args.serv
    INTERVAL = args.interval / 1000.0  # convert into seconds

    # ---- validate the input source ----
    input_file = args.file
    if input_file is None:
        if not args.rtsp:
            logger.error("select either video file or RTSP camera")
            # exit with a non-zero status so callers can detect the failure
            sys.exit(1)
    elif input_file.isdecimal():
        # an all-decimal argument is passed on as an integer (local camera
        # per the --file help text); isdecimal() only accepts characters
        # that int() can actually parse
        input_file = int(input_file)
    elif not os.path.isfile(input_file):
        logger.error("input video file or local camera does not exist")
        sys.exit(1)

    # obtain the control policy from the configuration file
    edge_policy = read_config("edge-setting", "control_policy")

    # load the video analytics models into memory; skipped for the
    # "always_cloud_lowest_delay" policy
    if edge_policy != "always_cloud_lowest_delay":
        logger.info("local models are loading...")
        edge_globals.loaded_model = load_models(edge_object_detection_model)
        logger.info("local models have loaded!")

    # create the objects for video reading, decision making, and information management
    reader = VideoReader(input_file, args.rtsp)
    edge_globals.sys_info = SysInfo()
    decision_engine = DecisionEngine(edge_globals.sys_info)
    edge_globals.datastore = DataStore()

    # start the thread pool for processing offloading requests
    WORKER_NUM = int(read_config("edge-setting", "worker_number"))
    executor = ThreadPoolExecutorWithQueueSizeLimit(max_workers=WORKER_NUM)

    # the queue for local processing task passing
    task_queue = queue.Queue(int(read_config("edge-setting", "queue_maxsize")))

    # start the thread for local inference
    local_processor = threading.Thread(target=local_worker, args=(task_queue,))
    local_processor.start()

    # read frames from video file or camera in loop
    while True:
        frame = reader.read_frame()
        if frame is None:
            # no more frames: let submitted offload tasks finish, then wait
            # up to 20 seconds for the local worker thread
            executor.shutdown(wait=True)
            local_processor.join(timeout=20)

            # only average when there are samples; np.average on an empty
            # list emits a RuntimeWarning and yields nan
            offload_delays = [p.value for p in edge_globals.sys_info.offload_delay]
            if offload_delays:
                cloud_average_process_delay = np.average(offload_delays)
                logger.info("average cloud process delay: {}", cloud_average_process_delay)

            logger.info("Service come over!")
            sys.exit()

        # obtain the CPU and memory usage
        edge_globals.sys_info.update_local_utilization()

        # create the inference as a task
        task_id = id_gen()
        t_start = time.time()
        task = Task(task_id, frame, serv_type, t_start)

        # make decision on video frame processing
        task = decision_engine.get_decision(edge_policy, task)

        if task.location == edge_globals.LOCAL:
            # local processing on the edge; put() blocks while the queue is full
            task_queue.put(task, block=True)
            edge_globals.sys_info.local_pending_task += 1
        elif task.location == edge_globals.OFFLOAD:
            # offload to the cloud for processing
            executor.submit(offload_worker, task)

        # pace the loop: sleep for whatever is left of the frame interval
        elapsed = time.time() - t_start
        if elapsed < INTERVAL:
            time.sleep(INTERVAL - elapsed)