]> www.infradead.org Git - users/dwmw2/linux.git/commitdiff
tools/net/ynl: add async notification handling
authorDonald Hunter <donald.hunter@gmail.com>
Wed, 13 Nov 2024 09:08:43 +0000 (09:08 +0000)
committerJakub Kicinski <kuba@kernel.org>
Fri, 15 Nov 2024 02:09:06 +0000 (18:09 -0800)
The notification handling in ynl is currently very simple, using sleep()
to wait a period of time and then handling all the buffered messages in
a single batch.

This patch adds async notification handling so that messages can be
processed as they are received. This makes it possible to use ynl as a
library that supplies notifications in a timely manner.

- Add poll_ntf() to be a generator that yields 1 notification at a
  time and blocks until a notification is available.
- Add a --duration parameter to the CLI, with --sleep as an alias.

./tools/net/ynl/cli.py \
    --spec <SPEC> --subscribe <TOPIC> [ --duration <SECS> ]

The cli will report any notifications for duration seconds and then
exit. If duration is not specified, then it will poll forever, until
interrupted.

Here is an example python snippet that shows how to use ynl as a library
for receiving notifications:

    ynl = YnlFamily(f"{dir}/rt_route.yaml")
    ynl.ntf_subscribe('rtnlgrp-ipv4-route')

    for event in ynl.poll_ntf():
        handle(event)

Signed-off-by: Donald Hunter <donald.hunter@gmail.com>
Link: https://patch.msgid.link/20241113090843.72917-3-donald.hunter@gmail.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
tools/net/ynl/cli.py
tools/net/ynl/lib/ynl.py

index 873463dbdcc01d7d944df52f4c72785c7e25c75b..41d9fa5c818d13209410d454216822c8cabaa236 100755 (executable)
@@ -6,7 +6,6 @@ import json
 import pathlib
 import pprint
 import sys
-import time
 
 sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
 from lib import YnlFamily, Netlink, NlError
@@ -46,7 +45,10 @@ def main():
     group.add_argument('--list-ops', action='store_true')
     group.add_argument('--list-msgs', action='store_true')
 
-    parser.add_argument('--sleep', dest='sleep', type=int)
+    parser.add_argument('--duration', dest='duration', type=int,
+                        help='when subscribed, watch for DURATION seconds')
+    parser.add_argument('--sleep', dest='duration', type=int,
+                        help='alias for duration')
     parser.add_argument('--subscribe', dest='ntf', type=str)
     parser.add_argument('--replace', dest='flags', action='append_const',
                         const=Netlink.NLM_F_REPLACE)
@@ -83,9 +85,6 @@ def main():
     if args.ntf:
         ynl.ntf_subscribe(args.ntf)
 
-    if args.sleep:
-        time.sleep(args.sleep)
-
     if args.list_ops:
         for op_name, op in ynl.ops.items():
             print(op_name, " [", ", ".join(op.modes), "]")
@@ -109,8 +108,11 @@ def main():
         exit(1)
 
     if args.ntf:
-        ynl.check_ntf()
-        output(ynl.async_msg_queue)
+        try:
+            for msg in ynl.poll_ntf(duration=args.duration):
+                output(msg)
+        except KeyboardInterrupt:
+            pass
 
 
 if __name__ == "__main__":
index c22c22bf2cb7d19c737b4f4bfb422755c846893b..01ec01a90e763ce62645eb91d7ee6e53de283d94 100644 (file)
@@ -12,6 +12,9 @@ import sys
 import yaml
 import ipaddress
 import uuid
+import queue
+import selectors
+import time
 
 from .nlspec import SpecFamily
 
@@ -489,7 +492,7 @@ class YnlFamily(SpecFamily):
         self.sock.setsockopt(Netlink.SOL_NETLINK, Netlink.NETLINK_GET_STRICT_CHK, 1)
 
         self.async_msg_ids = set()
-        self.async_msg_queue = []
+        self.async_msg_queue = queue.Queue()
 
         for msg in self.msgs.values():
             if msg.is_async:
@@ -903,7 +906,7 @@ class YnlFamily(SpecFamily):
 
         msg['name'] = op['name']
         msg['msg'] = attrs
-        self.async_msg_queue.append(msg)
+        self.async_msg_queue.put(msg)
 
     def check_ntf(self):
         while True:
@@ -925,11 +928,30 @@ class YnlFamily(SpecFamily):
 
                 decoded = self.nlproto.decode(self, nl_msg, None)
                 if decoded.cmd() not in self.async_msg_ids:
-                    print("Unexpected msg id done while checking for ntf", decoded)
+                    print("Unexpected msg id while checking for ntf", decoded)
                     continue
 
                 self.handle_ntf(decoded)
 
+    def poll_ntf(self, duration=None):
+        start_time = time.time()
+        selector = selectors.DefaultSelector()
+        selector.register(self.sock, selectors.EVENT_READ)
+
+        while True:
+            try:
+                yield self.async_msg_queue.get_nowait()
+            except queue.Empty:
+                if duration is not None:
+                    timeout = start_time + duration - time.time()
+                    if timeout <= 0:
+                        return
+                else:
+                    timeout = None
+                events = selector.select(timeout)
+                if events:
+                    self.check_ntf()
+
     def operation_do_attributes(self, name):
       """
       For a given operation name, find and return a supported