1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
| import base64 import datetime from itertools import groupby
import pytz import requests from onvif import ONVIFCamera from requests.auth import HTTPDigestAuth from wsdiscovery.discovery import ThreadedWSDiscovery as WSDiscovery from zeep.helpers import serialize_object
def checkPwdAndGetCam(ip, port, usr, pwd):
    """Connect to an ONVIF camera and fetch its media profiles.

    Any failure while connecting, creating the media service or listing the
    profiles is translated into a user-facing ``Exception`` whose message is
    chosen from the text of the underlying error. The original error is kept
    as ``__cause__`` so the real traceback is not lost.

    :param ip: camera address
    :param port: ONVIF service port
    :param usr: user name
    :param pwd: password
    :return: dict with the keys ``cam`` (the ``ONVIFCamera``), ``media`` (its
        media service) and ``profiles`` (result of ``GetProfiles``)
    :raises Exception: with a message about timeout, connection failure or
        (for everything else) wrong credentials
    """
    try:
        cam = ONVIFCamera(ip, port, usr, pwd)
        media = cam.create_media_service()
        profiles = media.GetProfiles()
    except Exception as e:
        detail = str(e)
        # The failure kind is only recognisable from the error text here.
        if 'timed out' in detail:
            raise Exception("连接超时,请检查地址以及端口是否正确") from e
        if 'HTTPConnectionPool' in detail:
            raise Exception(
                "连接失败,请检查地址以及端口是否正确。"
                "<br/><br/><front style='color: #aaa;'>异常信息:%s</front>" % detail) from e
        # Anything else is reported as a credential problem.
        raise Exception(
            "请检查账号密码是否正确。"
            "<br/><br/><front style='color: #aaa;'>异常信息:%s</front>" % detail) from e
    return {
        'cam': cam,
        'media': media,
        'profiles': profiles,
    }
class OnvifClient:
    """Convenience wrapper around one ONVIF camera connection.

    Holds the camera handle, its media service and profiles and, when
    requested, the PTZ and imaging services together with the maximum
    pan/tilt, zoom and focus speeds read from the device.
    """

    def __init__(self, ip, port: int, usr, pwd, token=None, sourceToken=None, nodeToken=False,
                 needSnapImg=True, needPtz=True):
        """Connect to the camera and cache services, tokens and speed limits.

        :param ip: camera address
        :param port: ONVIF service port
        :param usr: user name
        :param pwd: password
        :param token: profile token. Every stream has its own token and one
            picture usually has two or more streams (e.g. main and sub
            stream). Defaults to the token of the first profile.
        :param sourceToken: video source token. Every picture has its own;
            most cameras have a single picture, infrared or dual-lens cameras
            may have several. Defaults to the source of the first profile.
        :param nodeToken: token used to query PTZ node information. ``None``
            is a legitimate value, so ``False`` means "not supplied"; in that
            case the node of the first profile's PTZ configuration is used.
        :param needSnapImg: whether ``get_rtsp`` takes a snapshot for every
            profile; ``False`` is faster.
        :param needPtz: whether the PTZ and imaging services are set up;
            ``False`` is faster, e.g. when only listing cameras.
        :raises Exception: propagated from ``checkPwdAndGetCam`` when the
            connection cannot be established.
        """
        self.usr = usr
        self.pwd = pwd
        result = checkPwdAndGetCam(ip, port, usr, pwd)
        self.profiles = result['profiles']
        self.cam = result['cam']
        self.media = result['media']
        self.needSnapImg = needSnapImg

        first_profile = self.profiles[0]
        self.token = token if token is not None else first_profile.token
        self.sourceToken = sourceToken if sourceToken is not None \
            else first_profile.VideoSourceConfiguration.SourceToken
        ptz_configuration = first_profile.PTZConfiguration
        if nodeToken is not False:
            self.nodeToken = nodeToken
        else:
            self.nodeToken = ptz_configuration.NodeToken if ptz_configuration is not None else None

        if needPtz:
            has_ptz = bool(self.cam.devicemgmt.GetCapabilities().PTZ)
            self.ptz = self.cam.create_ptz_service() if has_ptz else None
            self.imaging = self.cam.create_imaging_service()
            if self.ptz is not None and self.nodeToken is not None:
                self.ptzNode = self.ptz.GetNode({'NodeToken': self.nodeToken})
            else:
                self.ptzNode = None
            self.MoveOption = self.imaging.GetMoveOptions({'VideoSourceToken': self.sourceToken})
        else:
            # Only the PTZ/imaging related members are disabled. The camera
            # handle must stay usable for get_deviceInfo and set_cam_time.
            self.ptz = self.imaging = self.ptzNode = self.MoveOption = None

        if self.ptzNode is not None:
            spaces = self.ptzNode.SupportedPTZSpaces
            self.PanTiltSpeedMax = self._speed_space_max(
                spaces, 'PanTiltSpeedSpace', 'ContinuousPanTiltVelocitySpace')
            self.ZoomSpeedMax = self._speed_space_max(
                spaces, 'ZoomSpeedSpace', 'ContinuousZoomVelocitySpace')
        else:
            self.PanTiltSpeedMax = self.ZoomSpeedMax = None
        self.MoveSpeedMax = self._continuous_focus_max(self.MoveOption)

    @staticmethod
    def _speed_space_max(spaces, *names):
        """Return ``XRange.Max`` of the first non-empty space in *names*, else ``None``."""
        for name in names:
            space = getattr(spaces, name, None)
            if space:
                return space[0].XRange.Max
        return None

    @staticmethod
    def _continuous_focus_max(move_options):
        """Return ``Continuous.Speed.Max`` of *move_options*, or ``None`` if any level is missing."""
        continuous = getattr(move_options, 'Continuous', None)
        speed = getattr(continuous, 'Speed', None)
        return getattr(speed, 'Max', None)

    def get_rtsp(self):
        """Collect the RTSP address and stream details of every profile.

        Reference: https://www.onvif.org/onvif/ver10/media/wsdl/media.wsdl#op.GetStreamUri

        When the URI returned by the device is an RTSP URI without an ``@``,
        the user name and password are inserted after the scheme. Both are
        percent-encoded so that characters such as ``@``, ``:`` or ``/`` in a
        password cannot break the URI.

        :return: list of ``{'source': <source token>, 'data': [<entry>, ...]}``
            grouped and sorted by video source token. Each entry has the keys
            ``source``, ``node``, ``uri``, ``token``, ``videoEncoding``,
            ``Resolution`` and ``img`` (base64 snapshot, or ``None`` when
            snapshots are disabled).
        """
        # Local import so the block does not depend on a file-level import.
        from urllib.parse import quote

        # Sub-delimiters that RFC 3986 allows unescaped inside userinfo.
        userinfo_safe = "!$&'()*+,;="
        credentials = 'rtsp://%s:%s@' % (
            quote(str(self.usr), safe=userinfo_safe),
            quote(str(self.pwd), safe=userinfo_safe),
        )
        stream_setup = {'Stream': 'RTP-Unicast', 'Transport': {'Protocol': 'RTSP'}}

        entries = []
        for profile in self.profiles:
            request = self.media.create_type('GetStreamUri')
            request.StreamSetup = stream_setup
            request.ProfileToken = profile.token
            uri = self.media.GetStreamUri(request)['Uri']
            if 'rtsp://' in uri and '@' not in uri:
                # Replace only the first occurrence, i.e. the scheme prefix.
                uri = uri.replace('rtsp://', credentials, 1)
            ptz_configuration = profile.PTZConfiguration
            encoder = profile.VideoEncoderConfiguration
            entries.append({
                'source': profile.VideoSourceConfiguration.SourceToken,
                'node': ptz_configuration.NodeToken if ptz_configuration is not None else None,
                'uri': uri,
                'token': profile.token,
                'videoEncoding': encoder.Encoding,
                'Resolution': serialize_object(encoder.Resolution),
                'img': self.snip_image(profile.token) if self.needSnapImg else None,
            })

        # groupby only merges adjacent items, so sort by the same key first.
        entries.sort(key=lambda entry: entry['source'])
        return [
            {'source': source, 'data': list(group)}
            for source, group in groupby(entries, key=lambda entry: entry['source'])
        ]

    def snip_image(self, token=None, timeout=10):
        """Take a snapshot and return it base64 encoded.

        In a browser the result can be shown by using
        ``data:image/jpeg;base64,<return value>`` as the ``src`` of an ``img``.

        Reference: https://www.onvif.org/onvif/ver10/media/wsdl/media.wsdl#op.GetSnapshotUri

        :param token: profile token, defaults to ``self.token``
        :param timeout: seconds to wait for the snapshot HTTP request, so that
            an unresponsive camera cannot block the caller forever
        :return: base64 text of the HTTP response body. The HTTP status is
            not checked; the body is encoded as received.
        """
        token = token if token is not None else self.token
        snapshot = self.media.GetSnapshotUri({'ProfileToken': token})
        response = requests.get(
            snapshot.Uri,
            auth=HTTPDigestAuth(self.usr, self.pwd),
            timeout=timeout,
        )
        return base64.b64encode(response.content).decode('utf-8')

    def get_deviceInfo(self):
        """Return the device information as plain Python data.

        Reference: https://www.onvif.org/onvif/ver10/device/wsdl/devicemgmt.wsdl#op.GetDeviceInformation

        :return: serialized ``GetDeviceInformation`` response (model,
            manufacturer, firmware version, serial number, hardware id)
        """
        return serialize_object(self.cam.devicemgmt.GetDeviceInformation())

    def ptz_move(self, Velocity=None, token=None):
        """Start a continuous PTZ move, or stop the current one.

        Reference: https://www.onvif.org/onvif/ver20/ptz/wsdl/ptz.wsdl#op.ContinuousMove

        :param Velocity: velocity for ``ContinuousMove``; ``None`` stops the
            movement
        :param token: profile token of the device to move, defaults to
            ``self.token``
        :raises Exception: when a move is requested but no PTZ service is
            available. A stop request without PTZ service is a no-op.
        """
        token = token if token is not None else self.token
        if self.ptz is None:
            if Velocity is not None:
                raise Exception("该设备不支持PTZ控制")
            return
        if Velocity is None:
            self.ptz.Stop({'ProfileToken': token})
            return
        request = self.ptz.create_type('ContinuousMove')
        request.ProfileToken = token
        request.Velocity = Velocity
        self.ptz.ContinuousMove(request)

    def focus_move(self, speed=None, token=None):
        """Start a continuous focus move, or stop the current one.

        Reference: https://www.onvif.org/onvif/ver20/imaging/wsdl/imaging.wsdl#op.Move

        :param speed: positive = focus near, negative = focus far,
            ``None`` = stop focusing
        :param token: video source token, defaults to ``self.sourceToken``
        :raises Exception: when a move is requested but the imaging service
            is unavailable or rejects the request. A stop request without
            imaging service is a no-op, mirroring ``ptz_move``.
        """
        token = token if token is not None else self.sourceToken
        if self.imaging is None:
            # Imaging is not set up when the client was built with needPtz=False.
            if speed is not None:
                raise Exception("该设备不支持该功能!")
            return
        if speed is None:
            self.imaging.Stop({'VideoSourceToken': token})
            return
        request = self.imaging.create_type('Move')
        request.VideoSourceToken = token
        request.Focus = {'Continuous': {'Speed': speed}}
        try:
            self.imaging.Move(request)
        except Exception as e:
            raise Exception(
                "该设备不支持该功能!"
                "<br/><br/><front style='color: #aaa;'>异常信息:%s</front>" % str(e)) from e

    def set_cam_time(self, timeStamp=None, timeZone='CST-8'):
        """Set the camera clock manually.

        Reference: https://www.onvif.org/onvif/ver10/device/wsdl/devicemgmt.wsdl#op.SetSystemDateAndTime

        :param timeStamp: Unix timestamp in seconds; the current time is used
            when omitted
        :param timeZone: ``TZ`` string sent to the device, defaults to
            ``'CST-8'``
        """
        utc = datetime.timezone.utc
        if timeStamp is None:
            moment = datetime.datetime.now(tz=utc)
        else:
            moment = datetime.datetime.fromtimestamp(int(timeStamp), tz=utc)
        self.cam.devicemgmt.SetSystemDateAndTime({
            'DateTimeType': 'Manual',
            'DaylightSavings': False,
            'TimeZone': {
                'TZ': timeZone,
            },
            'UTCDateTime': {
                'Time': {
                    'Hour': moment.hour,
                    'Minute': moment.minute,
                    'Second': moment.second,
                },
                'Date': {
                    'Year': moment.year,
                    'Month': moment.month,
                    'Day': moment.day,
                },
            },
        })
def ptzChangeByClient(client, codeStr, status, speed=50.0):
    """Translate a direction/zoom/focus code into a client PTZ or focus call.

    :param client: ONVIF client exposing ``ptz``, ``PanTiltSpeedMax``,
        ``ZoomSpeedMax``, ``MoveSpeedMax``, ``ptz_move`` and ``focus_move``
    :param codeStr: action code, e.g. ``'LeftUp'``, ``'ZoomTele'`` or
        ``'FocusNear'``
    :param status: 1 = start, 0 = stop
    :param speed: relative speed, 1-100, applied as a percentage of the
        client's maximum speed
    :raises Exception: when a start is requested for an action the device or
        this function does not support. Unsupported stop requests are
        silently ignored.
    """
    ptz_codes = ('Up', 'Right', 'Down', 'Left', 'LeftUp', 'RightUp', 'LeftDown', 'RightDown',
                 'ZoomWide', 'ZoomTele')
    focus_codes = ('FocusFar', 'FocusNear')

    if codeStr in ptz_codes:
        if client.ptz is None:
            if status == 1:
                raise Exception("当前设备不支持PTZ控制")
            return

        is_zoom = 'Zoom' in codeStr
        limit = client.ZoomSpeedMax if is_zoom else client.PanTiltSpeedMax

        if status == 0:
            # Only send a stop when the relevant axis exists on the device.
            if limit is not None:
                client.ptz_move()
            return

        if limit is None:
            raise Exception("当前设备不支持缩放控制" if is_zoom else "当前设备不支持云台控制")
        magnitude = limit * float(speed) / 100.0

        if is_zoom:
            velocity = {'Zoom': magnitude * -1.0 if 'Wide' in codeStr else magnitude}
        else:
            if 'Up' in codeStr:
                tilt = magnitude
            elif 'Down' in codeStr:
                tilt = magnitude * -1.0
            else:
                tilt = 0
            if 'Right' in codeStr:
                pan = magnitude
            elif 'Left' in codeStr:
                pan = magnitude * -1.0
            else:
                pan = 0
            velocity = {'PanTilt': {'x': pan, 'y': tilt}}
        client.ptz_move(velocity)
        return

    if codeStr in focus_codes:
        focus_limit = client.MoveSpeedMax
        if focus_limit is None:
            if status == 1:
                raise Exception("当前设备不支持聚焦控制")
            return
        if status == 0:
            client.focus_move()
            return
        focus_speed = focus_limit * float(speed) / 100.0
        client.focus_move(focus_speed if codeStr == 'FocusNear' else focus_speed * -1.0)
        return

    # Unknown code: only a start request is reported as an error.
    if status == 1:
        raise Exception("该方式暂不支持")
|