蚕茧视频识别AI程序关键代码(不包含资源、模型、转换库)

This commit is contained in:
BBIT-Kai
2025-11-18 16:36:05 +08:00
parent 7a5e29be1c
commit 4fa0c7d1eb
23 changed files with 2269 additions and 0 deletions
+67
View File
@@ -0,0 +1,67 @@
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import time
from threading import Lock
start_time=time.time()
fps_mutex = Lock()
class GETFPS:
def __init__(self,stream_id):
global start_time
self.start_time=start_time
self.is_first=True
self.frame_count=0
self.stream_id=stream_id
def update_fps(self):
end_time = time.time()
if self.is_first:
self.start_time = end_time
self.is_first = False
else:
global fps_mutex
with fps_mutex:
self.frame_count = self.frame_count + 1
def get_fps(self):
end_time = time.time()
with fps_mutex:
stream_fps = float(self.frame_count/(end_time - self.start_time))
self.frame_count = 0
self.start_time = end_time
return round(stream_fps, 2)
def print_data(self):
print('frame_count=',self.frame_count)
print('start_time=',self.start_time)
class PERF_DATA:
def __init__(self, num_streams=1):
self.perf_dict = {}
self.all_stream_fps = {}
for i in range(num_streams):
self.all_stream_fps["stream{0}".format(i)]=GETFPS(i)
def perf_print_callback(self):
self.perf_dict = {stream_index:stream.get_fps() for (stream_index, stream) in self.all_stream_fps.items()}
print ("\n**PERF: ", self.perf_dict, "\n")
return True
def update_fps(self, stream_index):
self.all_stream_fps[stream_index].update_fps()
View File
+34
View File
@@ -0,0 +1,34 @@
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import gi
import sys
gi.require_version('Gst', '1.0')
from gi.repository import Gst
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
sys.stdout.write("End-of-stream\n")
loop.quit()
elif t==Gst.MessageType.WARNING:
err, debug = message.parse_warning()
sys.stderr.write("Warning: %s: %s\n" % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
sys.stderr.write("Error: %s: %s\n" % (err, debug))
loop.quit()
return True
+94
View File
@@ -0,0 +1,94 @@
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import platform
from threading import Lock
from cuda.bindings import runtime
from cuda.bindings import driver
guard_platform_info = Lock()
class PlatformInfo:
def __init__(self):
self.is_wsl_system = False
self.wsl_verified = False
self.is_integrated_gpu_system = False
self.is_integrated_gpu_verified = False
self.is_aarch64_platform = False
self.is_aarch64_verified = False
def is_wsl(self):
with guard_platform_info:
# Check if its already verified as WSL system or not.
if not self.wsl_verified:
try:
# Open /proc/version file
with open("/proc/version", "r") as version_file:
# Read the content
version_info = version_file.readline()
version_info = version_info.lower()
self.wsl_verified = True
# Check if "microsoft" is present in the version information
if "microsoft" in version_info:
self.is_wsl_system = True
except Exception as e:
print(f"ERROR: Opening /proc/version failed: {e}")
return self.is_wsl_system
def is_integrated_gpu(self):
#Using cuda apis to identify whether integrated/discreet
#This is required to distinguish Tegra and ARM_SBSA devices
with guard_platform_info:
#Cuda initialize
if not self.is_integrated_gpu_verified:
cuda_init_result, = driver.cuInit(0)
if cuda_init_result == driver.CUresult.CUDA_SUCCESS:
#Get cuda devices count
device_count_result, num_devices = driver.cuDeviceGetCount()
if device_count_result == driver.CUresult.CUDA_SUCCESS:
#If atleast one device is found, we can use the property from
#the first device
if num_devices >= 1:
#Get properties from first device
property_result, properties = runtime.cudaGetDeviceProperties(0)
if property_result == runtime.cudaError_t.cudaSuccess:
print("Is it Integrated GPU? :", properties.integrated)
self.is_integrated_gpu_system = properties.integrated
self.is_integrated_gpu_verified = True
else:
print("ERROR: Getting cuda device property failed: {}".format(property_result))
else:
print("ERROR: No cuda devices found to check whether iGPU/dGPU")
else:
print("ERROR: Getting cuda device count failed: {}".format(device_count_result))
else:
print("ERROR: Cuda init failed: {}".format(cuda_init_result))
return self.is_integrated_gpu_system
def is_platform_aarch64(self):
#Check if platform is aarch64 using uname
if not self.is_aarch64_verified:
if platform.uname()[4] == 'aarch64':
self.is_aarch64_platform = True
self.is_aarch64_verified = True
return self.is_aarch64_platform
sys.path.append('/opt/nvidia/deepstream/deepstream/lib')
+24
View File
@@ -0,0 +1,24 @@
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import ctypes
import sys
sys.path.append('/opt/nvidia/deepstream/deepstream/lib')
def long_to_uint64(l):
value = ctypes.c_uint64(l & 0xffffffffffffffff).value
return value