Python访问OPCUA服务器,订阅一个变量标签方式

2024-03-04 0 645
目录
  • Python访问OPCUA服务器,订阅一个变量标签
  • opcua-python学习踩坑记录
    • OPCUA通信协议介绍
    • OPCUA的两种工作方式
    • OPCUA-python是什么?
    • 客户端常用函数
    • 示例程序:温度实时检测,每隔五秒传到客户端
  • 总结

    Python访问OPCUA服务器,订阅一个变量标签

    from opcua import Client
    from opcua import ua
    import time
    #By Weloveut:Python访问OPCUA服务器,展示get变量的方法和注册逢变触发的回调函数
    #自定义回调函数
    class SubHandler(object):
    def data_change(self, handle, node, val, attr):
    print(\”Python: New data change event\”, handle, node, val, attr)
    #创建UA的客户端,包含IP地址和端口号
    client = Client(\”opc.tcp://172.32.1.82:4840/\”)
    #建立到服务器连接
    client.connect()
    #获取object对象
    objects = client.get_objects_node()
    #获取根对象
    root = client.get_root_node()
    #依据变量地址创建变量对象
    myvar = client.get_node(\’ns=3;s=\”clocl0.5hz\”.\”tags\”[0]\’)
    #获取变量当前值
    valuetmp=myvar.get_value()
    print(valuetmp)
    #注册到服务器的变量订阅,变量逢变触发
    handler = SubHandler()
    sub = client.create_subscription(500, handler)
    sub.subscribe_data_change(myvar)
    time.sleep(100000)
    client.disconnect()

    opcua-python学习踩坑记录

    OPCUA通信协议介绍

    OPC UA(OPC 统一架构)是一种用于工业控制系统的通信协议。它是一种开放的、平台无关的协议,可用于连接各种工业设备和软件应用程序,使它们能够互相通信并共享数据。

    OPC UA 的主要优点在于,它能够提供一种统一的方式来连接不同厂商的设备和应用程序。这意味着,使用 OPC UA 的软件应用程序可以与使用其他协议的设备和应用程序进行通信,而无需考虑底层的通信协议。

    其实这类似于物联网中的mqtt协议,只不过mqtt较为轻量级,opcua则是用于工业相关的控制系统

    OPCUA的两种工作方式

    • 客户端/服务器模式:在这种模式下,OPC UA 客户端向 OPC UA 服务器请求数据,服务器接收请求并向客户端发送数据。
    • 发布/订阅模式:在这种模式下,OPC UA 服务器将数据“发布”到网络上,而客户端可以“订阅”特定的数据。当服务器更新数据时,客户端会收到更新的数据。

    这两种模式可以根据应用场景的需要进行选择。例如,客户端/服务器模式适用于客户端需要向服务器请求单个数据项的情况,而发布/订阅模式则适用于客户端需要定期接收大量数据的情况。

    个人觉得客户端/服务器模式类似于TCP通讯协议,是点对点的,对实时性要求比较高,而发布订阅模式类似于UDP通讯协议,对实时性要求并没有那么高,但接收数据较多

    其中发布订阅模式对小的嵌入式设备(stm32系列)比较友好,可以融合物联网的mqtt协议,真正将opcua纳入网络,而不只是自己的工业网络

    OPCUA-python是什么?

    顾名思义,那就是利用python实现这种通信协议的库呗,这个库目前已经停止更新,开发者们开发了一个新的库,名为opcua-asyncio,可通过如下链接访问:

    https://github.com/FreeOpcUa/opcua-asyncio

    不过做一些比较基本的功能,只用OPCUA-python这个库还是够用的

    服务器常用函数

    endpoint = \”opc.tcp://{}:{}\”.format(url, port)
    myserver.set_endpoint(endpoint)

    设置endpoint(其实endpoint就像一个服务器链接)

    myserver = opcua.Server()

    开启opcua服务器

    objects = myserver.get_objects_node()

    设置服务器节点(使客户端得以访问)

    addspace = myserver.register_namespace(name)

    设置服务器命名空间

    root = myserver.get_root_node()

    获取服务器根节点

    objects.add_method(1, \”Conveyor\”, Start_Conveyor_prog)

    为节点(objects) 添加方法Start_Conveyor_prog

    客户端常用函数

    myclient = Client(endpoint)

    在客户端引用endpoint

    myclient.get_node(\”ns=2;i=3\”)

    在客户端里使用ns=2;i=3这个节点

    Start_Lathe_Prog1 = method[4]

    调用服务器里设置的方法

    Workpiece = objects_node.call_method(Start_Conveyor_prog,[WorkpieceID.get_value()])

    call_method函数,第一个参数是节点名称,第二个参数是值,是这个节点里的值,属于数组,或者传递字典也是可以的

    return_value_kuka_prog1 = objects_node.call_method(Start_Kuka_Prog1)

    返回某个方法

    示例程序:温度实时检测,每隔五秒传到客户端

    服务器程序:

    import opcua
    from time import sleep
    from random import randint
    from datetime import datetime

    # Assign endpoint URL
    url = \”localhost\”
    port = 5001
    endpoint = \”opc.tcp://{}:{}\”.format(url, port)

    # create instance/object of type opcua.Server
    myserver = opcua.Server()

    # Assign endpoint url on the OPC UA server address space
    myserver.set_endpoint(endpoint)

    # Create a name for OPC UA namespace
    name = \”Temperature Sensor\”

    # Register the name on OPC UA namespace
    addspace = myserver.register_namespace(name)

    # Get reference to the Objects node of the OPC UA server
    objects = myserver.get_objects_node()

    # Create objects on the object node
    param = objects.add_object(addspace, \”parameters\”)

    # Create variables
    Sensor_name = param.add_variable(addspace, \”Sensor Name\”, \”Temperature_Sensor_SF12\”)
    Temperature = param.add_variable(addspace, \”Temperature Value\”, \’NA\’)

    # Set variable as writable
    Sensor_name.set_writable()
    Temperature.set_writable()

    #########################################################################################
    # Present the data structure of the OPC UA server
    root = myserver.get_root_node()
    print(\”Root Node Id: %s\” % root)
    print(\”Children of root are: %s\” % root.get_children())
    print()

    my_objects = myserver.get_objects_node()
    print(\”Defined object Node Id: %s\” % my_objects)
    print(\”Children of the defined object are: %s\” % my_objects.get_children())
    print()

    sensor_name_node = my_objects.get_children()[1].get_children()[0]
    print(\”Sensor name node Id: %s\” % sensor_name_node)
    print(\”Sensor name node browse name: %s\” % sensor_name_node.get_browse_name())
    print(\”Sensor name default value: %s\” % sensor_name_node.get_value())
    print()

    temperature_node = my_objects.get_children()[1].get_children()[1]
    print(\”Temperature node Id: %s\” % temperature_node)
    print(\”Temperature node browse name: %s\” % temperature_node.get_browse_name())
    print(\”Temperature default value: %s\” % temperature_node.get_value())
    print()
    ########################################################################################

    # starting! The server will continue running
    myserver.start()
    current_time = str(datetime.now().time())[:-7]
    print(\”{} – Server is initialising…\”.format(current_time))

    while True:
    sleep(5)
    current_time = str(datetime.now().time())[:-7]
    current_temp = randint(10, 25)

    Temperature.set_value(current_temp) # publish temperature value
    print(\”{} – Current temperature: {} Celsius\”.format(current_time, current_temp))

    客户端程序:

    from opcua import Client
    from datetime import datetime

    # Assign endpoint URL
    url = \”localhost\”
    port = 5001
    endpoint = \”opc.tcp://{}:{}\”.format(url, port)

    # Assign endpoint url on the OPC UA client address space
    myclient = Client(endpoint)

    # Connect to server
    myclient.connect()

    # Assign nodes
    Temperature_node = myclient.get_node(\”ns=2;i=3\”)

    # Subhandler Class from OPC UA
    class SubHandler(object):
    \”\”\”
    Subscription Handler. To receive events from server for a subscription
    \”\”\”

    def datachange_notification(self, node, val, data):
    \”\”\”
    called for every datachange notification from server
    \”\”\”
    global DataChange # Global variable
    current_time = str(datetime.now().time())[:-7]
    DataChange = val # Assigning value to global variable
    print(\”{} – Received temperature: {} Celsius\”.format(current_time, val))

    # Initailise variable

    nodes = myclient.get_objects_node()
    DataChange = \”null\”

    # Create Sub handler
    handler = SubHandler()

    # create subscription by passing period in milliseconds and handler
    subscribe = myclient.create_subscription(0, handler)

    # pass variable node to subscribe data change method
    handler = subscribe.subscribe_data_change(Temperature_node)

    程序运行效果:服务器

    Python访问OPCUA服务器,订阅一个变量标签方式

    程序运行效果:客户端

    Python访问OPCUA服务器,订阅一个变量标签方式

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持悠久资源网。

    您可能感兴趣的文章:

    • OPCUA-Python实例
    • 关于python与opcuaExpertendpoint连接的问题
    • 使用OpCode绕过Python沙箱的方法详解

    收藏 (0) 打赏

    感谢您的支持,我会继续努力的!

    打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
    点赞 (0)

    悠久资源 Python Python访问OPCUA服务器,订阅一个变量标签方式 https://www.u-9.cn/jiaoben/python/183196.html

    常见问题

    相关文章

    发表评论
    暂无评论
    官方客服团队

    为您解决烦忧 - 24小时在线 专业服务