当前位置:网站首页>IOTOS物联中台对接海康安防平台(iSecure Center)门禁系统
IOTOS物联中台对接海康安防平台(iSecure Center)门禁系统
2022-04-23 03:11:00 【爱投斯】
前言
IOTOS对接海康门禁机,该接口支持门常开、门常闭、门开和门闭四种操作引起的门状态获取。门常开操作,门会一直处于开状态,不会自动关闭,执行门闭操作,门才会关上;门常闭操作,门会一直处于关毕状态,普通卡刷卡门不会被打开,执行门开操作,门会打开;门开操作,执行门打开动作,超过门打开时间,门会自动关上;门闭操作,执行关门动作,会立即把门关上。
调用该接口,首先要通过获取门禁点资源列表的接口,获取到门禁点唯一编号,然后根据门禁点唯一编号进行门禁点状态状态查询。需要注意的是门通道必须接上门磁才能正常发送门状态变化通知,如果未接门磁,平台无法通过门状态变更通知来更新门状态。
支持设备
所有可以上到海康综合安防管理平台(iSecure Center)平台门禁机
SDK文档
目的
获取门禁机的开关状态
适用范围
所有海康门禁机的硬件设备、IOTOS采集程序
使用示例
代码示例
#!coding:utf8
import json
import sys
import threading
sys.path.append("..")
from driver import *
import logging
import urllib3
import hmac # hex-based message authentication code 哈希消息认证码
import hashlib # 提供了很多加密的算法
import base64
import certifi
class URL_list():
def __init__(self):
# 获取token,过期时长12小时 √
self.token = "/artemis/api/v1/oauth/token"
# 查询门禁点状态
self.door_stat = "/artemis/api/acs/v1/door/states"
# 查询门禁点列表(门禁点)
self.door_search = "/artemis/api/resource/v2/door/search"
class info_post():
def __init__(self, _driver_instance):
self.driver_instance = _driver_instance
# 综平地址
self.base_url = "https://143.134.201.253:443"
# 综平key
self.appKey = "29740612"
# 综评serect
self.appSecret = "di1lnAhWkkRUuSzGQBQN"
# 请求方法
self.http_method = "POST"
self.encoding = "utf-8"
# 初始化请求地址
self.req_uri = URL_list()
# https初始化
urllib3.disable_warnings()
self.https = urllib3.PoolManager(cert_reqs='CERT_NONE', ca_certs=certifi.where())
self.token_header_key = "X-Subject-Token"
def sign(self, key, value):
temp = hmac.new(key.encode(), value.encode(), digestmod=hashlib.sha256)
return base64.b64encode(temp.digest()).decode()
def request(self, url, headers=None, data=None, method="POST"):
try:
if data:
if isinstance(data, dict):
data = json.dumps(data)
response = self.https.urlopen(method, url, body=data, headers=headers)
response_body = response.data.decode(self.encoding)
try:
jsonObj = json.loads(response_body)
return response.status, jsonObj
except json.JSONDecodeError:
return response.status + 100, response_body
except urllib3.exceptions.HTTPError as e:
return 500, None
except Exception as e:
return 500, None
def access_token(self):
# sign_str 的拼接很关键,不然得不到正确的签名
sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + self.appKey + "\n" + \
self.req_uri.token
signature = self.sign(self.appSecret, sign_str)
headers = {
'Accept': '*/*',
'Content-Type': 'application/json',
'x-ca-key': self.appKey, # appKey,即 AK
'x-ca-signature-headers': 'x-ca-key',
'x-ca-signature': signature, # 需要计算得到的签名,此处通过后台得到
}
url = self.base_url + self.req_uri.token
stauts,results = self.request(method="POST", url=url, headers=headers)
if stauts == 200:
self.acc_token = results.get(u"data", {
}).get(u"access_token", None)
threading.Timer(10, self.access_token).start()
def door_state(self):
url = self.base_url + self.req_uri.door_stat
headers = {
'access_token': self.acc_token,
'Content-Type': 'application/json',
}
body = {
"doorIndexCodes": []
}
door_sta = self.request(url, headers=headers, data=body)
return door_sta
def door_search(self):
value_type = {
0:"初始状态",1:"开门状态",2:"关门状态",3:"离线状态"}
code,door_status = self.door_state()
url = self.base_url + self.req_uri.door_search
headers = {
'access_token': self.acc_token,
'Content-Type': 'application/json',
}
body = {
"pageNo": 1,
"pageSize": 200
}
status,door_search_info = self.request(url, headers=headers, data=body)
door = door_search_info.get("data", {
}).get("list", [])
door_statu = door_status.get("data", {
}).get("authDoorList", [])
if status == 200:
for i in door:
for j in door_statu:
door_in = json.dumps(i)
door = json.loads(door_in)
status_in = json.dumps(j)
status = json.loads(status_in)
indexcode = door.get(u"indexCode", "")
name = door.get(u"name", "")
st = status.get(u"doorState", "")
door_index_code = status.get(u"doorIndexCode", "")
if door_index_code == indexcode:
self.driver_instance.setValue(name,value_type[st])
threading.Timer(1, self.door_search).start()
class door(IOTOSDriverI):
#1、通信初始化
def InitComm(self,attrs):
self.online(True)
self.collectingOneCircle = False # 让下面采集Collecting只执行一个循环,遍历一次点表!
self.pauseCollect = False
self.apartment = info_post(self)
threading.Timer(1, self.apartment.access_token).start()
threading.Timer(2, self.apartment.door_search).start()
代码说明
初始化采集
内置函数类Init初始化,使得循环采集可以开始。
def InitComm(self, attrs):
self.online(True) #设备上线
self.setCollectingOneCircle(False)#一次循环采集false
self.setPauseCollect(False)#暂停采集false
循环进程请求门禁数据点
进程请求门禁数据点
代码(设备)
def door_search(self):
value_type = {
0:"初始状态",1:"开门状态",2:"关门状态",3:"离线状态"}
code,door_status = self.door_state()
url = self.base_url + self.req_uri.door_search
headers = {
'access_token': self.acc_token,
'Content-Type': 'application/json',
}
body = {
"pageNo": 1,
"pageSize": 200
}
status,door_search_info = self.request(url, headers=headers, data=body)
door = door_search_info.get("data", {
}).get("list", [])
door_statu = door_status.get("data", {
}).get("authDoorList", [])
if status == 200:
for i in door:
for j in door_statu:
door_in = json.dumps(i)
door = json.loads(door_in)
status_in = json.dumps(j)
status = json.loads(status_in)
indexcode = door.get(u"indexCode", "")
name = door.get(u"name", "")
st = status.get(u"doorState", "")
door_index_code = status.get(u"doorIndexCode", "")
if door_index_code == indexcode:
self.driver_instance.setValue(name,value_type[st])
threading.Timer(1, self.door_search).start()
数据点

运行结果

版权声明
本文为[爱投斯]所创,转载请带上原文链接,感谢
https://iotos.blog.csdn.net/article/details/124338920
边栏推荐
猜你喜欢

【新版发布】ComponentOne 新增 .NET 6 和 Blazor 平台控件支持

C read / write binary file

Realize QQ login with PHP

Source Generator实战

Ide-idea-problem

Tips in MATLAB

Passing object type parameters through openfeign

2022年度Top9的任务管理系统

After the mobile phone is connected to the computer, how can QT's QDIR read the mobile phone file path

2022 Shandong Province safety officer C certificate work certificate question bank and online simulation examination
随机推荐
C WPF UI framework mahapps switching theme
MYSQL_ From mastery to abandonment
Xamarin effect Chapter 22 recording effect
ASP. Net 6 middleware series - Custom middleware classes
How does Microsoft solve the problem of multiple PC programs
2022a special equipment related management (elevator) work license question bank and simulation examination
Use of metagroup object tuple in C
为什么BI对企业这么重要?
2022年做跨境电商五大技巧小分享
由于3²+4²=5²,所以称‘3,4,5‘为勾股数,求n(包括n)以内所有勾股数数组。
Array and collection types passed by openfeign parameters
编码电机PID调试(速度环|位置环|跟随)
利用栈的回溯来解决“文件的最长绝对路径”问题
AOT和单文件发布对程序性能的影响
2022年P气瓶充装培训试题及模拟考试
TP5 multi conditional where query (using PHP variables)
Aspnetcore configuration multi environment log4net configuration file
准备一个月去参加ACM,是一种什么体验?
ASP.NET 6 中间件系列 - 执行顺序
Miniapi of. Net7 (special section): NET7 Preview3