python dbus pulseaudio

python - ¿Obteniendo señales trabajando en la interfaz DBus de PulseAudio?



(2)

Estoy intentando que se llame a un controlador de señal D-Bus cuando el estado de un receptor cambia en PulseAudio (por ejemplo, se desactiva). Desafortunadamente, no se está llamando y, francamente, no estoy seguro de por qué.

import dbus import dbus.mainloop.glib from gi.repository import GObject dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SessionBus() def signal_handler(*args, **kwargs): print(''sig: '', args, kwargs) def connect(): import os if ''PULSE_DBUS_SERVER'' in os.environ: address = os.environ[''PULSE_DBUS_SERVER''] else: bus = dbus.SessionBus() server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1") address = server_lookup.Get("org.PulseAudio.ServerLookup1", "Address", dbus_interface="org.freedesktop.DBus.Properties") return dbus.connection.Connection(address) conn = connect() core = conn.get_object(object_path=''/org/pulseaudio/core1'') core.connect_to_signal(''StateUpdated'', signal_handler) core.ListenForSignal(''org.PulseAudio.Core1.Device.StateUpdated'', dbus.Array(signature=''o''), dbus_interface=''org.PulseAudio.Core1'') loop = GObject.MainLoop() loop.run()


En mi entorno, tengo dbus y pulseaudio ejecución, sin embargo, la dirección descubierta no existe:

>>> import dbus >>> import dbus.mainloop.glib >>> from gi.repository import GObject >>> dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) <dbus.mainloop.NativeMainLoop object at 0x7f3c98ffd4e0> >>> bus = dbus.SessionBus() >>> server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1") >>> address = server_lookup.Get("org.PulseAudio.ServerLookup1", "Address", dbus_interface="org.freedesktop.DBus.Properties") >>> address dbus.String(''unix:path=/run/user/1000/pulse/dbus-socket'', variant_level=1) $ dbus-monitor --address ''unix:path=/run/user/1000/pulse/dbus-socket'' Failed to open connection to unix:path=/run/user/1000/pulse/dbus-socket: Failed to connect to socket /run/user/1000/pulse/dbus-socket: No such file or directory $ ls /run/user/1000/pulse/ cli native pid

No sé si mi configuración es predeterminada, pero parece que la integración de dbus simplemente no existe.


Intenta esto, trabaja para mi.

import dbus import os from dbus.mainloop.glib import DBusGMainLoop import gobject def pulse_bus_address(): if ''PULSE_DBUS_SERVER'' in os.environ: address = os.environ[''PULSE_DBUS_SERVER''] else: bus = dbus.SessionBus() server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1") address = server_lookup.Get("org.PulseAudio.ServerLookup1", "Address", dbus_interface="org.freedesktop.DBus.Properties") print(address) return address def sig_handler(state): print("State changed to %s" % state) if state == 0: print("Pulseaudio running.") elif state == 1: print("Pulseaudio idle.") elif state == 2: print("Pulseaudio suspended") # setup the glib mainloop DBusGMainLoop(set_as_default=True) loop = gobject.MainLoop() pulse_bus = dbus.connection.Connection(pulse_bus_address()) pulse_core = pulse_bus.get_object(object_path=''/org/pulseaudio/core1'') pulse_core.ListenForSignal(''org.PulseAudio.Core1.Device.StateUpdated'', dbus.Array(signature=''o''), dbus_interface=''org.PulseAudio.Core1'') pulse_bus.add_signal_receiver(sig_handler, ''StateUpdated'') loop.run()

Requiere el archivo default.pa de pulseaudio para tener lo siguiente:

.ifexists module-dbus-protocol.so load-module module-dbus-protocol .endif

Edición: para aquellos que se preguntan sobre la pregunta de @ conf-f-use sobre el nombre de la aplicación. Resulta que ellos mismos respondieron este problema y publicaron la respuesta aquí: https://askubuntu.com/questions/906160/is-there-a-way-to-detect-whether-a-skype-call-is-in-progress-dbus-pulseaudio
Robando una sección del código de @con-f-use y aplicando mi código anterior, obtenemos un monitor que rastrea el estado y puede decirle el nombre de la aplicación, el artista, el título y el nombre de lo que se está reproduciendo.
Saludos @ con-f-use :)

import dbus import os from dbus.mainloop.glib import DBusGMainLoop import gobject def pulse_bus_address(): if ''PULSE_DBUS_SERVER'' in os.environ: address = os.environ[''PULSE_DBUS_SERVER''] else: bus = dbus.SessionBus() server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1") address = server_lookup.Get("org.PulseAudio.ServerLookup1", "Address", dbus_interface="org.freedesktop.DBus.Properties") print(address) return address # convert byte array to string def dbus2str(db): if type(db)==dbus.Struct: return str(tuple(dbus2str(i) for i in db)) if type(db)==dbus.Array: return "".join([dbus2str(i) for i in db]) if type(db)==dbus.Dictionary: return dict((dbus2str(k), dbus2str(v)) for k, v in db.items()) if type(db)==dbus.String: return db+'''' if type(db)==dbus.UInt32: return str(db+0) if type(db)==dbus.Byte: return chr(db) if type(db)==dbus.Boolean: return db==True if type(db)==dict: return dict((dbus2str(k), dbus2str(v)) for k, v in db.items()) return "(%s:%s)" % (type(db), db) def sig_handler(state): print("State changed to %s" % state) if state == 0: print("Pulseaudio running.") elif state == 1: print("Pulseaudio idle.") elif state == 2: print("Pulseaudio suspended") dbus_pstreams = ( dbus.Interface( pulse_bus.get_object(object_path=path), dbus_interface=''org.freedesktop.DBus.Properties'' ) for path in pulse_core.Get( ''org.PulseAudio.Core1'', ''PlaybackStreams'', dbus_interface=''org.freedesktop.DBus.Properties'' ) ) pstreams = {} for pstream in dbus_pstreams: try: pstreams[pstream.Get(''org.PulseAudio.Core1.Stream'', ''Index'')] = pstream except dbus.exceptions.DBusException: pass if pstreams: for stream in pstreams.keys(): plist = pstreams[stream].Get(''org.PulseAudio.Core1.Stream'', ''PropertyList'') appname = dbus2str(plist.get(''application.name'', None)) artist = dbus2str(plist.get(''media.artist'', None)) title = dbus2str(plist.get(''media.title'', None)) name = dbus2str(plist.get(''media.name'', None)) print appname,artist,title,name # setup the glib mainloop DBusGMainLoop(set_as_default=True) loop = gobject.MainLoop() pulse_bus = dbus.connection.Connection(pulse_bus_address()) pulse_core = pulse_bus.get_object(object_path=''/org/pulseaudio/core1'') #pulse_clients = pulse_bus.get_object(object_path=''/org/pulseaudio/core1/Clients'') #print dir(pulse_clients) pulse_core.ListenForSignal(''org.PulseAudio.Core1.Device.StateUpdated'', dbus.Array(signature=''o''), dbus_interface=''org.PulseAudio.Core1'') pulse_bus.add_signal_receiver(sig_handler, ''StateUpdated'') loop.run()