在Python中使用onvif管理摄像头,包括设备发现,获取RTSP地址,获取设备信息,截图,云台控制与缩放,设置时间

目标

使用Python管理支持ONVIF的设备。支持:

  • 设备发现
  • 获取RTSP地址
  • 获取设备信息
  • 截图
  • 云台控制、缩放与聚焦
  • 设置时间

参考文档:wsdl文档Python API 本文严重参考并感谢:《ONVIF with python》

本文在上边的基础上,支持了多画面多码流的摄像头与NVR,并对更多功能做了详细说明,以下均基于Python3.11,其他版本可能略有不同

2024/1/23更新:适配NVR(多摄像头模式,可以独立控制每个摄像头),兼容了更多设备(部分设备的ONVIF有些限制,现在做了兼容)

实现

一、安装包

1
2
3
4
5
pip install onvif-zeep
# WSDiscovery 用于设备发现,不用则不再需要安装
pip install WSDiscovery
# requests 用于截图获取,不用则不需要安装
pip install requests

二、使用

基本类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
import base64
import datetime
from itertools import groupby

import pytz
import requests
from onvif import ONVIFCamera
from requests.auth import HTTPDigestAuth
from wsdiscovery.discovery import ThreadedWSDiscovery as WSDiscovery
from zeep.helpers import serialize_object


def checkPwdAndGetCam(ip, port, usr, pwd):
try:
cam = ONVIFCamera(ip, port, usr, pwd)
media = cam.create_media_service()
profiles = media.GetProfiles()
except Exception as e:
if 'timed out' in str(e):
raise Exception("连接超时,请检查地址以及端口是否正确")
elif 'HTTPConnectionPool' in str(e):
raise Exception(
"连接失败,请检查地址以及端口是否正确。"
"<br/><br/><front style='color: #aaa;'>异常信息:%s</front>" % str(e))
else:
raise Exception(
"请检查账号密码是否正确。"
"<br/><br/><front style='color: #aaa;'>异常信息:%s</front>" % str(e))
return {
'cam': cam,
'media': media,
'profiles': profiles
}


class OnvifClient:
def __init__(self, ip, port: int, usr, pwd, token=None, sourceToken=None, nodeToken=False, needSnapImg=True, needPtz=True):
"""
初始化参数
:param ip:
:param port:
:param usr:
:param pwd:
:param token: 每个码流都会有一个独立的token,通常一个画面会有2个或更多码流,例如主码流&辅码流
:param sourceToken: 每个画面会有一个独立的sourceToken,通常一个摄像头只有一个画面,有些红外、双摄之类的摄像头会有多个画面
:param nodeToken: 用于获取PTZ控制信息的token,因为nodeToken有可能为None,所以用False表示没传
:param needSnapImg: 是否需要截图,选否会加快速度
:param needPtz: 是否需要PTZ控制,选否会加快速度,一般查摄像头列表时可以传False
"""
self.usr = usr
self.pwd = pwd
result = checkPwdAndGetCam(ip, port, usr, pwd)
self.profiles = result['profiles']
self.cam = result['cam']
self.media = result['media']
self.needSnapImg = needSnapImg
# 如果没传token,默认使用第一个token
self.token = token if token is not None else self.profiles[0].token
# 如果没传sourceToken,默认使用第一个sourceToken
self.sourceToken = sourceToken if sourceToken is not None \
else self.profiles[0].VideoSourceConfiguration.SourceToken
# 如果没传nodeToken,默认使用第一个nodeToken
PTZConfiguration = self.profiles[0].PTZConfiguration
self.nodeToken = nodeToken if nodeToken is not False \
else (PTZConfiguration.NodeToken if PTZConfiguration is not None else None)
if needPtz:
self.ptz = self.cam.create_ptz_service() if bool(self.cam.devicemgmt.GetCapabilities().PTZ) else None
self.imaging = self.cam.create_imaging_service()
self.ptzNode = self.ptz.GetNode({
'NodeToken': self.nodeToken
}) if self.ptz is not None and self.nodeToken is not None else None
self.MoveOption = self.imaging.GetMoveOptions({'VideoSourceToken': self.sourceToken})
else:
self.cam = self.ptz = self.imaging = self.ptzNode = self.MoveOption = None
if self.ptzNode is not None:
SupportedPTZSpaces = self.ptzNode.SupportedPTZSpaces
# PTZ云台移动速度峰值
if len(SupportedPTZSpaces.PanTiltSpeedSpace) > 0:
PanTiltSpeedSpace = SupportedPTZSpaces.PanTiltSpeedSpace
elif len(SupportedPTZSpaces.ContinuousPanTiltVelocitySpace) > 0:
PanTiltSpeedSpace = SupportedPTZSpaces.ContinuousPanTiltVelocitySpace
else:
PanTiltSpeedSpace = None
self.PanTiltSpeedMax = PanTiltSpeedSpace[0].XRange.Max if PanTiltSpeedSpace is not None else None
# PTZ缩放速度峰值
if len(SupportedPTZSpaces.ZoomSpeedSpace) > 0:
ZoomSpeedSpace = SupportedPTZSpaces.ZoomSpeedSpace
elif len(SupportedPTZSpaces.ContinuousZoomVelocitySpace) > 0:
ZoomSpeedSpace = SupportedPTZSpaces.ContinuousZoomVelocitySpace
else:
ZoomSpeedSpace = None
self.ZoomSpeedMax = ZoomSpeedSpace[0].XRange.Max if ZoomSpeedSpace is not None else None
else:
self.PanTiltSpeedMax = self.ZoomSpeedMax = None
# 聚焦移动速度峰值
self.MoveSpeedMax = self.MoveOption.Continuous.Speed.Max \
if self.MoveOption is not None else None

def get_rtsp(self):
"""
获取RTSP地址等
参考文档:https://www.onvif.org/onvif/ver10/media/wsdl/media.wsdl#op.GetStreamUri
"""
result = []
StreamSetup = {'Stream': 'RTP-Unicast', 'Transport': {'Protocol': 'RTSP'}}
for profile in self.profiles:
obj = self.media.create_type('GetStreamUri')
obj.StreamSetup = StreamSetup
obj.ProfileToken = profile.token
res_uri = self.media.GetStreamUri(obj)['Uri']
if 'rtsp://' in res_uri and '@' not in res_uri:
res_uri = res_uri.replace('rtsp://', 'rtsp://%s:%s@' % (self.usr, self.pwd))
result.append({
'source': profile.VideoSourceConfiguration.SourceToken,
'node': profile.PTZConfiguration.NodeToken if profile.PTZConfiguration is not None else None,
'uri': res_uri,
'token': profile.token,
'videoEncoding': profile.VideoEncoderConfiguration.Encoding,
'Resolution': serialize_object(profile.VideoEncoderConfiguration.Resolution),
'img': self.snip_image(profile.token) if self.needSnapImg else None
})
sortedResult = sorted(result, key=lambda d: d['source'])
groupData = groupby(sortedResult, key=lambda x: x['source'])
return [{'source': key, 'data': [item for item in group]} for key, group in groupData]

def snip_image(self, token=None):
"""
截图,如果在浏览器上访问,可在img的src填入[data:image/jpeg;base64,%s],%s处填写return值
参考文档:https://www.onvif.org/onvif/ver10/media/wsdl/media.wsdl#op.GetSnapshotUri
:param token:
:return: base64转码之后的图片
"""
token = token if token is not None else self.token
res = self.media.GetSnapshotUri({'ProfileToken': token})
auth = HTTPDigestAuth(self.usr, self.pwd)
rsp = requests.get(res.Uri, auth=auth)
return base64.b64encode(rsp.content).decode('utf-8')

def get_deviceInfo(self):
"""
获取设备信息
参考文档:https://www.onvif.org/onvif/ver10/device/wsdl/devicemgmt.wsdl#op.GetDeviceInformation
:return: 设备信息,包括名称-Model、厂家-Manufacturer、固件版本-FirmwareVersion、序列号-SerialNumber、硬件ID-HardwareId
"""
return serialize_object(self.cam.devicemgmt.GetDeviceInformation())

def ptz_move(self, Velocity=None, token=None):
"""
PTZ控制移动
参考文档:https://www.onvif.org/onvif/ver20/ptz/wsdl/ptz.wsdl#op.ContinuousMove
:param token: 移动设备的token
:param Velocity: 可选参数,不传表示停止移动
"""
token = token if token is not None else self.token
if self.ptz is None:
if Velocity is not None:
# 只在移动时展示不支持,以免频繁打扰
raise Exception("该设备不支持PTZ控制")
else:
if Velocity is None:
self.ptz.Stop({'ProfileToken': token})
else:
request = self.ptz.create_type('ContinuousMove')
request.ProfileToken = token
request.Velocity = Velocity
self.ptz.ContinuousMove(request)

def focus_move(self, speed=None, token=None):
"""
聚焦
参考文档:https://www.onvif.org/onvif/ver20/imaging/wsdl/imaging.wsdl#op.Move
:param token: VideoSourceToken
:param speed: 正数:聚焦+,拉近;负数:聚焦-,拉远;None:停止聚焦
"""
token = token if token is not None else self.sourceToken
if speed is not None:
request = self.imaging.create_type('Move')
request.VideoSourceToken = token
request.Focus = {'Continuous': {'Speed': speed}}
try:
self.imaging.Move(request)
except Exception as e:
raise Exception(
"该设备不支持该功能!"
"<br/><br/><front style='color: #aaa;'>异常信息:%s</front>" % str(e))
else:
self.imaging.Stop({'VideoSourceToken': token})

def set_cam_time(self, timeStamp=None):
"""
设置时间
参考文档:https://www.onvif.org/onvif/ver10/device/wsdl/devicemgmt.wsdl#op.SetSystemDateAndTime
:param timeStamp: 秒级时间戳,不传则使用当前时间
"""
if timeStamp is None:
timeNow = datetime.datetime.now()
else:
timeNow = datetime.datetime.fromtimestamp(int(timeStamp))
utc_now = timeNow.astimezone(pytz.utc)
self.cam.devicemgmt.SetSystemDateAndTime({
'DateTimeType': 'Manual',
'DaylightSavings': False,
'TimeZone': {
'TZ': 'CST-8'
},
'UTCDateTime': {
'Time': {
'Hour': utc_now.hour,
'Minute': utc_now.minute,
'Second': utc_now.second
},
'Date': {
'Year': utc_now.year,
'Month': utc_now.month,
'Day': utc_now.day
}
}
})


def ptzChangeByClient(client, codeStr, status, speed=50.0):
"""
PTZ控制
:param client: onvif客户端
:param speed: 相对速度,1-100
:param status: 状态,1-开始,0-停止
:param codeStr: 标志字符串
"""
ptzList = ['Up', 'Right', 'Down', 'Left', 'LeftUp', 'RightUp', 'LeftDown', 'RightDown', 'ZoomWide', 'ZoomTele']
focusList = ['FocusFar', 'FocusNear']
if codeStr in ptzList:
if client.ptz is None:
if status == 1:
raise Exception("当前设备不支持PTZ控制")
else:
return
if status == 0:
if 'Zoom' not in codeStr:
if client.PanTiltSpeedMax is None:
return
else:
if client.ZoomSpeedMax is None:
return
client.ptz_move()
else:
PanTiltSpeed = 0
ZoomSpeed = 0
if 'Zoom' not in codeStr:
if client.PanTiltSpeedMax is None:
raise Exception("当前设备不支持云台控制")
PanTiltSpeed = client.PanTiltSpeedMax * float(speed) / 100.0
speedTilt = PanTiltSpeed if 'Up' in codeStr else (
PanTiltSpeed * -1.0 if 'Down' in codeStr else 0)
speedPan = PanTiltSpeed if 'Right' in codeStr else (
PanTiltSpeed * -1.0 if 'Left' in codeStr else 0)
params = {
'PanTilt': {
'x': speedPan,
'y': speedTilt
}
}
else:
if client.ZoomSpeedMax is None:
raise Exception("当前设备不支持缩放控制")
ZoomSpeed = client.ZoomSpeedMax * float(speed) / 100.0
speedZoom = 0 if 'Zoom' not in codeStr else (
ZoomSpeed * -1.0 if 'Wide' in codeStr else ZoomSpeed)
params = {
'Zoom': speedZoom
}
client.ptz_move(params)
elif codeStr in focusList:
if client.MoveSpeedMax is None:
if status == 1:
raise Exception("当前设备不支持聚焦控制")
else:
return
if status == 0:
client.focus_move()
else:
MoveSpeed = client.MoveSpeedMax * float(speed) / 100.0
client.focus_move(MoveSpeed if 'FocusNear' == codeStr else MoveSpeed * -1.0)
else:
if status == 1:
raise Exception("该方式暂不支持")

调用方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time
import json


# 设备发现
print(ws_discovery())

client = OnvifClient('192.168.1.10', 80, 'admin', '123456', needSnapImg=False)
# 如果要控制特定摄像头,可以下边这样写
# client = OnvifClient('192.168.1.10', 80, 'admin', '123456', token="ProfileToken002", sourceToken= "VideoSourceToken002", nodeToken="NodeToken002", needSnapImg=False)

# 获取所有画面所有码流的RTSP地址、token(即ProfileToken)、sourceToken、nodeToken等信息
print(json.dumps(client.get_rtsp()))

# 获取设备信息
print(json.dumps(client.get_deviceInfo()))

# 设置时间
client.set_cam_time()

# 云台与聚焦控制
# 云台上移
ptzChangeByClient(client, 'Up', 1)
# 移动一秒
time.sleep(1)
# 然后停止
ptzChangeByClient(client, 'Up', 0)

在Python中使用onvif管理摄像头,包括设备发现,获取RTSP地址,获取设备信息,截图,云台控制与缩放,设置时间
https://blog.ctftools.com/2023/12/newpost-54/
作者
Dr3@m
发布于
2023年12月14日
许可协议