#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
from pathlib import Path

# The search path must be extended BEFORE the project-local imports below
# (common.*, pyds, utils.*), so this set-up cannot be moved after them.
SCRIPT_DIR = Path(__file__).resolve().parent  # directory holding this script
DEEPSTREAM_PY_DIR = SCRIPT_DIR.parent  # its parent directory
sys.path.insert(0, str(DEEPSTREAM_PY_DIR))  # makes the common/ package importable
sys.path.insert(0, str(DEEPSTREAM_PY_DIR / "pyds"))  # makes the pyds module importable

from os import environ
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
import time
import math
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA
import pyds

# Provides (at least) get_type_name_by_id, getVideoResolution and
# save_sca_results, which are used below.
from utils.my_utils import *

display_type = 0
silent = False
file_loop = False
perf_data = None
measure_latency = False

MAX_DISPLAY_LEN = 64
PGIE_CLASS_ID_NORMAL = 0
PGIE_CLASS_ID_DOUBLE_PUPA = 1
PGIE_CLASS_ID_SPOT = 2
PGIE_CLASS_ID_HAIRY = 3
PGIE_CLASS_ID_MAGGOT_SHELL = 4
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 33000
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
YOLO_SIZE = 768
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 1


def _new_obj_counter():
    """Return a fresh per-class counter with every known class id set to 0."""
    return {
        PGIE_CLASS_ID_NORMAL: 0,
        PGIE_CLASS_ID_SPOT: 0,
        PGIE_CLASS_ID_DOUBLE_PUPA: 0,
        PGIE_CLASS_ID_HAIRY: 0,
        PGIE_CLASS_ID_MAGGOT_SHELL: 0,
    }


# Module-level state shared between the pad probe, the stats timer and sca().
# NOTE(review): none of this is reset when sca() starts, so calling sca()
# twice in one process accumulates results across runs -- confirm intent.
tracker_dict = {}  # tracker unique id -> class id, filled from past-frame meta
obj_counter = _new_obj_counter()  # per-class counts of the last processed frame
statics = {}  # stats tick number -> {class name: count}
timestamp_in_video = 0  # number of stats ticks seen so far

# The path is resolved against the current working directory.
with open("../models/labels.txt", "r") as f:
    od_labels = [line.strip() for line in f]


def stats_callback():
    """Record the current per-class counts under the next tick number.

    Registered as a GLib timeout callback by sca().

    NOTE(review): the original comment assumed one call per second, but sca()
    registers this callback with a 500 ms period -- confirm which is intended.

    Returns:
        True, so that GLib keeps invoking the callback.
    """
    global timestamp_in_video
    timestamp_in_video += 1
    statics[timestamp_in_video] = {
        get_type_name_by_id(class_id): count
        for class_id, count in obj_counter.items()
    }
    return True


def add_fps_display_meta(lineCount, batch_meta, frame_meta, display_text):
    """Attach one on-screen text label to ``frame_meta``.

    Args:
        lineCount: Row index of the label; each row is offset by 24 px
            below a 12 px top margin.
        batch_meta: Batch meta whose pool the display meta is acquired from.
        frame_meta: Frame the label is attached to.
        display_text: Text to draw.
    """
    display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
    display_meta.num_labels = 1

    text_params = display_meta.text_params[0]
    text_params.display_text = display_text
    text_params.x_offset = 12
    text_params.y_offset = 12 + lineCount * 24
    text_params.font_params.font_name = "Serif"
    text_params.font_params.font_size = 10
    # White text on an opaque black background.
    text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
    text_params.set_bg_clr = 1
    text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)

    pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)


def pgie_src_pad_buffer_probe(pad, info, u_data):
    """Buffer probe: count objects per class, draw a summary, collect tracks.

    Despite its name, sca() installs this probe on the sink pad of the
    nvdsosd element. For every frame in the batch it counts the detected
    objects per known class, publishes the counts in the module-level
    ``obj_counter``, updates the FPS statistics and attaches a text summary
    to the frame. It then walks the batch-level user meta and stores the
    class id of every object found in tracker past-frame data in
    ``tracker_dict``.

    Args:
        pad: Pad the probe is attached to (unused).
        info: Probe info carrying the buffer.
        u_data: User data given at registration (unused).

    Returns:
        Gst.PadProbeReturn.OK in every case, so the buffer is passed on.
    """
    global obj_counter

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        # Return an explicit probe result instead of None.
        return Gst.PadProbeReturn.OK

    # Enable latency measurement via probe if environment variable
    # NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
    # To enable component level latency measurement, please set environment
    # variable NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition.
    if measure_latency:
        num_sources_in_batch = pyds.nvds_measure_buffer_latency(hash(gst_buffer))
        if num_sources_in_batch == 0:
            print("Unable to get number of sources in GstBuffer for latency measurement")

    # Retrieve batch metadata from the gst_buffer.
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the C address
    # of gst_buffer as input, which is obtained with hash(gst_buffer).
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if not batch_meta:
        return Gst.PadProbeReturn.OK

    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # l_frame.data needs a cast to pyds.NvDsFrameMeta. The cast keeps
            # ownership of the underlying memory in the C code, so the Python
            # garbage collector will leave it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        num_rects = frame_meta.num_obj_meta

        # Count into a local dict first; the finished dict is published with
        # a single assignment so stats_callback never reads a half-filled one.
        frame_counts = _new_obj_counter()
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            # Class ids outside the known set are not counted per class
            # (they used to raise KeyError here).
            if obj_meta.class_id in frame_counts:
                frame_counts[obj_meta.class_id] += 1
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        obj_counter = frame_counts

        # Update frame rate through this probe.
        stream_index = "stream{0}".format(frame_meta.pad_index)
        perf_data.update_fps(stream_index)

        # One summary line plus one line per class that was seen. .get()
        # tolerates a labels file with more entries than known class ids.
        parts = [f"Object Count: {num_rects}\n"]
        parts.extend(
            f"\t{label}: {frame_counts[class_id]}\n"
            for class_id, label in enumerate(od_labels)
            if frame_counts.get(class_id, 0) > 0
        )
        add_fps_display_meta(0, batch_meta, frame_meta, "".join(parts))

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    # Multi-object tracker: harvest unique ids from past-frame user meta.
    l_user = batch_meta.batch_user_meta_list
    while l_user is not None:
        try:
            user_meta = pyds.NvDsUserMeta.cast(l_user.data)
        except StopIteration:
            break

        if (user_meta
                and user_meta.base_meta.meta_type
                == pyds.NvDsMetaType.NVDS_TRACKER_PAST_FRAME_META):
            try:
                past_data_batch = pyds.NvDsTargetMiscDataBatch.cast(
                    user_meta.user_meta_data)
            except StopIteration:
                break
            for misc_stream in pyds.NvDsTargetMiscDataBatch.list(past_data_batch):
                for misc_obj in pyds.NvDsTargetMiscDataStream.list(misc_stream):
                    tracker_dict[misc_obj.uniqueId] = misc_obj.classId

        try:
            l_user = l_user.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK


def cb_newpad(decodebin, decoder_src_pad, data):
    """"pad-added" handler: target the source bin's ghost pad at a video pad.

    Args:
        decodebin: Element that emitted the signal (unused).
        decoder_src_pad: The newly created pad.
        data: The source bin whose "src" ghost pad is to be targeted.
    """
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gstname = caps.get_structure(0).get_name()
    source_bin = data
    features = caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=", gstname)
    if "video" not in gstname:
        return

    # Link the decodebin pad only if decodebin has picked nvidia decoder
    # plugin nvdec_*. We do this by checking if the pad caps contain NVMM
    # memory features.
    print("features=", features)
    if features.contains("memory:NVMM"):
        # Get the source bin ghost pad.
        bin_ghost_pad = source_bin.get_static_pad("src")
        if not bin_ghost_pad.set_target(decoder_src_pad):
            sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
    else:
        sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")


def decodebin_child_added(child_proxy, Object, name, user_data):
    """"child-added" handler: follow nested decodebins, tune the source.

    Args:
        child_proxy: The bin that gained a child.
        Object: The child element that was added.
        name: Name of the child element.
        user_data: Passed through to nested registrations.
    """
    print("Decodebin child added:", name, "\n")
    if "decodebin" in name:
        # Recurse so children of nested decodebins are seen as well.
        Object.connect("child-added", decodebin_child_added, user_data)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        # get_by_name() may yield None; guard before querying the property.
        if (source_element is not None
                and source_element.find_property('drop-on-latency') is not None):
            Object.set_property("drop-on-latency", True)


def create_source_bin(index, uri):
    """Build a GstBin that decodes ``uri`` and exposes a "src" ghost pad.

    Args:
        index: Source number, used in the bin name ("source-bin-NN").
        uri: URI handed to the decode element.

    Returns:
        The new bin, or None if any part of it could not be created (an
        error message is written to stderr in that case).
    """
    print("Creating source bin")

    # Create a source GstBin to abstract this bin's content from the rest of
    # the pipeline.
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")
        return None

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    # nvurisrcbin is used instead when file looping is requested.
    factory_name = "nvurisrcbin" if file_loop else "uridecodebin"
    uri_decode_bin = Gst.ElementFactory.make(factory_name, "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
        return None
    if file_loop:
        # Properties are set only after the element is known to exist.
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)

    # We set the input uri to the source element.
    uri_decode_bin.set_property("uri", uri)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin.
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    # We need to create a ghost pad for the source bin which will act as a
    # proxy for the video decoder src pad. The ghost pad will not have a
    # target right now. Once the decode bin creates the video decoder and
    # generates the cb_newpad callback, we will set the ghost pad target to
    # the video decoder src pad.
    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(
        Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin


def _fail(message):
    """Write ``message`` to stderr and raise RuntimeError with the same text."""
    sys.stderr.write(message)
    raise RuntimeError(message.strip())


def _make_element(factory_name, element_name, error_message):
    """Create a GStreamer element, raising RuntimeError if that fails."""
    element = Gst.ElementFactory.make(factory_name, element_name)
    if not element:
        _fail(error_message)
    return element


def _link_chain(*elements):
    """Link ``elements`` pairwise in order, reporting failed links on stderr."""
    for upstream, downstream in zip(elements, elements[1:]):
        if not upstream.link(downstream):
            sys.stderr.write(
                " Unable to link %s to %s \n"
                % (upstream.get_name(), downstream.get_name()))


def sca(stream_paths, out_put_file_name: str):
    """Run detection and tracking over the given streams and save the results.

    Builds the pipeline
    sources -> nvstreammux -> nvinfer -> nvtracker -> tiler -> nvvideoconvert
    -> nvdsosd -> (H.264 encode + qtmux when ``display_type == 0``) -> filesink,
    runs it until the main loop exits, then hands the collected tracker and
    statistics data to ``save_sca_results``.

    Args:
        stream_paths: Non-empty sequence of input URIs, one per source.
        out_put_file_name: Location passed to the file sink.

    Raises:
        ValueError: If ``stream_paths`` is empty.
        RuntimeError: If a pipeline element cannot be created.
    """
    global perf_data, measure_latency, TILED_OUTPUT_WIDTH, TILED_OUTPUT_HEIGHT

    if not stream_paths:
        raise ValueError("sca() requires at least one input stream")

    disable_probe = False
    is_live = False
    number_sources = len(stream_paths)
    perf_data = PERF_DATA(number_sources)

    if number_sources == 1:
        # With a single source the output uses the input's own resolution.
        # The message is printed after the update so it shows the new values.
        TILED_OUTPUT_WIDTH, TILED_OUTPUT_HEIGHT = getVideoResolution(stream_paths[0])
        print("重置输出分辨率至:{} x {}".format(TILED_OUTPUT_WIDTH, TILED_OUTPUT_HEIGHT))

    platform_info = PlatformInfo()
    Gst.init(None)

    pipeline = Gst.Pipeline()
    if not pipeline:
        _fail(" Unable to create Pipeline \n")

    # ---- sources and stream muxer -------------------------------------------
    nvstreammux = _make_element(
        "nvstreammux", "Stream-muxer", " Unable to create NvStreamMux \n")
    pipeline.add(nvstreammux)

    for i, uri_name in enumerate(stream_paths):
        source_bin = create_source_bin(i, uri_name)
        if not source_bin:
            _fail("Unable to create source bin \n")
        pipeline.add(source_bin)

        sinkpad = nvstreammux.request_pad_simple("sink_%u" % i)
        if not sinkpad:
            _fail("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            _fail("Unable to create src pad bin \n")
        srcpad.link(sinkpad)

    if file_loop:
        if platform_info.is_integrated_gpu():
            # Set nvbuf-memory-type=4 for integrated gpu for file-loop
            # (nvurisrcbin case).
            nvstreammux.set_property('nvbuf-memory-type', 4)
        else:
            # Set nvbuf-memory-type=2 for x86 for file-loop
            # (nvurisrcbin case).
            nvstreammux.set_property('nvbuf-memory-type', 2)
    if is_live:
        nvstreammux.set_property('live-source', 1)
    nvstreammux.set_property('width', YOLO_SIZE)
    nvstreammux.set_property('height', YOLO_SIZE)
    nvstreammux.set_property('batch-size', number_sources)
    nvstreammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)

    # ---- queues -------------------------------------------------------------
    queue1, queue2, queue3, queue4, queue5 = (
        _make_element("queue", "queue%d" % n, " Unable to create queue%d \n" % n)
        for n in range(1, 6)
    )
    for queue in (queue1, queue2, queue3, queue4, queue5):
        pipeline.add(queue)

    # ---- primary inference --------------------------------------------------
    pgie = _make_element("nvinfer", "primary-inference", " Unable to create pgie")
    pgie.set_property('config-file-path', "config/pgie_config.txt")
    if pgie.get_property("batch-size") != number_sources:
        pgie.set_property("batch-size", number_sources)

    nvdslogger = None
    if disable_probe:
        # Use nvdslogger for perf measurement instead of probe function.
        nvdslogger = _make_element(
            "nvdslogger", "nvdslogger", " Unable to create nvdslogger \n")

    # ---- tracker ------------------------------------------------------------
    tracker = _make_element("nvtracker", "tracker", " Unable to create tracker \n")
    tracker.set_property('tracker-width', YOLO_SIZE)
    tracker.set_property('tracker-height', YOLO_SIZE)
    # To fix 'gstnvtracker: Unable to acquire a user meta buffer. Try
    # increasing user-meta-pool-size'.
    tracker.set_property('user-meta-pool-size', 128)
    tracker.set_property('gpu_id', 0)
    tracker.set_property('ll-lib-file', 'lib/libnvds_nvmultiobjecttracker.so')
    tracker.set_property('ll-config-file', 'config/config_tracker_NvDCF_perf.yml')

    # ---- tiler, converter, on-screen display --------------------------------
    nvmultistreamtiler = _make_element(
        "nvmultistreamtiler", "nvtiler", " Unable to create tiler \n")
    tiler_rows = int(math.sqrt(number_sources))
    tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
    nvmultistreamtiler.set_property("rows", tiler_rows)
    nvmultistreamtiler.set_property("columns", tiler_columns)
    nvmultistreamtiler.set_property("width", TILED_OUTPUT_WIDTH)
    nvmultistreamtiler.set_property("height", TILED_OUTPUT_HEIGHT)
    if platform_info.is_integrated_gpu():
        nvmultistreamtiler.set_property("compute-hw", 2)
    else:
        nvmultistreamtiler.set_property("compute-hw", 1)

    nvvidconv = _make_element(
        "nvvideoconvert", "convertor", " Unable to create nvvidconv \n")

    nvosd = _make_element(
        "nvdsosd", "onscreendisplay", " Unable to create nvosd \n")
    nvosd.set_property('process-mode', OSD_PROCESS_MODE)
    nvosd.set_property('display-text', OSD_DISPLAY_TEXT)

    # ---- sink ---------------------------------------------------------------
    sink = _make_element(
        "filesink", "file-sink", " Unable to create sink element \n")
    sink.set_property("location", out_put_file_name)
    sink.set_property("sync", 0)
    sink.set_property("qos", 0)

    # ---- assemble -----------------------------------------------------------
    pipeline.add(pgie)
    if nvdslogger:
        pipeline.add(nvdslogger)
    pipeline.add(nvmultistreamtiler)
    pipeline.add(tracker)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)

    _link_chain(nvstreammux, queue1, pgie, tracker, queue2)
    if nvdslogger:
        _link_chain(queue2, nvdslogger, nvmultistreamtiler)
    else:
        _link_chain(queue2, nvmultistreamtiler)
    _link_chain(nvmultistreamtiler, queue3, nvvidconv, queue4, nvosd, queue5)

    if display_type == 0:
        # Encode to H.264 and wrap the stream in a QuickTime/MP4 container.
        nvvidconv2 = _make_element(
            "nvvideoconvert", "convertor2", " Unable to create nvvidconv2 \n")
        encoder = _make_element(
            "nvv4l2h264enc", "encoder", " Unable to create encoder \n")
        encoder.set_property("bitrate", 2000000)
        h264parse = _make_element(
            "h264parse", "parser", " Unable to create code parser \n")
        qtmux = _make_element("qtmux", "qtmux", " Unable to create qtmux \n")

        for element in (nvvidconv2, encoder, h264parse, qtmux):
            pipeline.add(element)
        _link_chain(queue5, nvvidconv2, encoder, h264parse, qtmux, sink)
    else:
        _link_chain(queue5, sink)

    # Create an event loop and feed gstreamer bus messages to it.
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    # The probe is installed on the sink pad of the on-screen display.
    osd_sink_pad = nvosd.get_static_pad("sink")
    if not osd_sink_pad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")
    elif not disable_probe:
        osd_sink_pad.add_probe(
            Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)
        # perf callback function to print fps every 5 sec
        GLib.timeout_add(5000, perf_data.perf_print_callback)
        GLib.timeout_add(500, stats_callback)

    # Enable latency measurement via probe if environment variable
    # NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
    # To enable component level latency measurement, please set environment
    # variable NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition.
    if environ.get('NVDS_ENABLE_LATENCY_MEASUREMENT') == '1':
        print ("Pipeline Latency Measurement enabled!\nPlease set env var NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 for Component Latency Measurement")
        measure_latency = True

    # List the sources.
    for i, source in enumerate(stream_paths):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # Start playback and listen to events.
    pipeline.set_state(Gst.State.PLAYING)
    start_time = time.time()
    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl-C ends the run; fall through to the normal shutdown below.
        pass
    except Exception as exc:
        # Keep the best-effort shutdown, but do not hide what went wrong.
        sys.stderr.write("Main loop aborted: %r\n" % (exc,))

    pipeline.set_state(Gst.State.NULL)
    print("总蚕茧数:", len(tracker_dict))
    print("Exiting app\n")

    end_time = time.time()
    analysis_time = end_time - start_time
    print(f"处理时间: {analysis_time} 秒")
    save_sca_results(
        stream_paths[0], out_put_file_name, tracker_dict, analysis_time, statics)