Okay I got it. It's still rough around the edges and only tested on Ubuntu 16.04 with Python 2.7 but whatever:
#!/usr/bin/python
# This monitors pulseaudio streams via a dbus event-listener.
#
# On Ubuntu to enable pulseaudio dbus support:
#
# echo -e '.ifexists module-dbus-protocol.so\nload-module module-dbus-protocol\n.endif' >> ~/.pulse/pulse/default.pa
# # globally: /etc/pulse/default.pa
#
# And restart pulseaudio:
#
# pkill pulseaudio; pulseaudio
#
from __future__ import division, print_function, unicode_literals
import dbus, os, gobject, logging
from dbus.mainloop.glib import DBusGMainLoop
from logging import info, debug, error, warning as warn
#logging.getLogger().setLevel(logging.DEBUG)
skypestream = []
cstr = 'org.PulseAudio.Core1'
# convert byte array to string
def dbus2str(db):
if type(db)==dbus.Struct:
return str(tuple(dbus2str(i) for i in db))
if type(db)==dbus.Array:
return "".join([dbus2str(i) for i in db])
if type(db)==dbus.Dictionary:
return dict((dbus2str(k), dbus2str(v)) for k, v in db.items())
if type(db)==dbus.String:
return db+''
if type(db)==dbus.UInt32:
return str(db+0)
if type(db)==dbus.Byte:
return chr(db)
if type(db)==dbus.Boolean:
return db==True
if type(db)==dict:
return dict((dbus2str(k), dbus2str(v)) for k, v in db.items())
return "(%s:%s)" % (type(db), db)
def sig_handler(path=None, sender=None, msg=None):
debug( '\n\npath: %s\n%s\n\nsender: %s\n%s\n\nmsg: %s\n%s\n\n',
path, dir(path), sender, dir(sender), msg, dir(msg) )
mem = msg.get_member()
dbus_pstreams = (
dbus.Interface(
pulse_bus.get_object(object_path=path),
dbus_interface='org.freedesktop.DBus.Properties'
) for path in core1.Get(
cstr,
'PlaybackStreams',
dbus_interface='org.freedesktop.DBus.Properties' )
)
pstreams = {}
for pstream in dbus_pstreams:
try:
pstreams[pstream.Get(cstr+'.Stream', 'Index')] = pstream
except dbus.exceptions.DBusException:
pass
if pstreams:
for stream in pstreams.keys():
plist = pstreams[stream].Get(cstr+'.Stream', 'PropertyList')
appname = dbus2str(plist.get('application.name', None))
if mem == 'PlaybackStreamRemoved' and path in skypestream:
skypestream.remove(path)
print('no-skype')
if appname.find('Skype') > -1:
if mem == 'NewPlaybackStream':
skypestream.append(path)
print('busy', appname)
def pulse_bus_address():
address = None
if 'PULSE_DBUS_SERVER' in os.environ:
address = os.environ['PULSE_DBUS_SERVER']
else:
bus = dbus.SessionBus()
server_lookup = bus.get_object("org.PulseAudio1",
"/org/pulseaudio/server_lookup1")
address = server_lookup.Get("org.PulseAudio.ServerLookup1",
"Address", dbus_interface="org.freedesktop.DBus.Properties")
debug(address)
if not address: raise RuntimeError('No pulseaudio dbus address found!')
return address
if __name__ == "__main__":
DBusGMainLoop(set_as_default=True)
loop = gobject.MainLoop()
pulse_bus = dbus.connection.Connection(pulse_bus_address())
core1 = pulse_bus.get_object(object_path='/org/pulseaudio/core1')
core1.ListenForSignal(cstr+'.NewPlaybackStream', dbus.Array(signature="o"))
core1.ListenForSignal(cstr+'.PlaybackStreamRemoved', dbus.Array(signature="o"))
pulse_bus.add_signal_receiver(sig_handler, message_keyword='msg')
loop.run()