The Python listening script is now in production. With a few luser-type problems along the way. Specifically: I have discovered, quite recently, that having the same name for both the NOTIFY command and the LISTEN command is kind of a good strategy when you actually want your listener to be notified. The problem? The code was listening for notify_port_updated. The code was generating a port_updated. Remember how I was talking about Data driven listening? Well, perhaps I should have spent more time reading the data instead of getting the code to be driven by it.
That said, the solution is in and working. The final code is here:
#!/usr/bin/env python # # $Id:,v 1.2 2006/09/11 01:49:16 dan Exp $ # # This program listens for events on the database and processes them # # # configuration items # DSN = ' user=dan' CACHEPATH = '/usr/websites/' import sys, psycopg, select import os # for deleting cache files import syslog # for logging def RemoveCacheEntry(): syslog.syslog(syslog.LOG_NOTICE, 'checking for cache entries to remove...') dbh = psycopg.connect(DSN) dbh.autocommit(0) curs = dbh.cursor() curs.execute("SELECT id, port_id, category, port FROM cache_clearing_ports ORDER BY id") NumRows = curs.rowcount dbh.commit(); if (NumRows > 0): syslog.syslog(syslog.LOG_NOTICE, 'COUNT: %d entries to process' % (NumRows)) for row in curs.dictfetchall(): filename = CACHEPATH % (row['category'], row['port']) syslog.syslog(syslog.LOG_NOTICE, 'removing %s' % (filename)) try: os.remove(filename) except OSError, err: if err[0] == 2: pass # no file to delete, so no worries else: syslog.syslog(syslog.LOG_CRIT, 'ERROR: error deleting cache entry. Error message is %s' % (err)) continue # end if syslog.syslog(syslog.LOG_NOTICE, "DELETE FROM cache_clearing_ports WHERE id = %d" % (row['id'])) curs.execute("DELETE FROM cache_clearing_ports WHERE id = %d" % (row['id'])) dbh.commit() # end for else: syslog.syslog(syslog.LOG_ERR, 'ERROR: No cached entries found for removal') # end if syslog.syslog(syslog.LOG_NOTICE, 'finished') return NumRows def ProcessPortsMoved(): syslog.syslog(syslog.LOG_NOTICE, 'processing ports/MOVED') def ProcessPortsUpdating(): syslog.syslog(syslog.LOG_NOTICE, 'processing ports/UPDATING') def ProcessVUXML(): syslog.syslog(syslog.LOG_NOTICE, 'processing ports/security/portaudit/vuln.xml') syslog.openlog('fp-listen') syslog.syslog(syslog.LOG_NOTICE, 'Starting up') print "Opening connection using dns:", DSN conn = psycopg.connect(DSN) conn.autocommit(1) curs = conn.cursor() curs.execute("SELECT name, script_name FROM listen_for ORDER BY id") listens_for = curs.fetchall() listens = dict() for listen in listens_for: curs.execute("LISTEN %s" % listen[0]) listens[listen[0]] = listen[1] print listen print "Now listening..." while 1:[curs],[],[])==([],[],[]) curs.execute("SELECT 1") syslog.syslog(syslog.LOG_NOTICE, 'Just woke up! *************') notifies = curs.notifies() for n in notifies: print "got %s" % n[0] print "this is index %s" % listens[n[0]] # in real life, do something with each... print "got %s and I need to call %s" % (n[0], listens[n[0]]) syslog.syslog(syslog.LOG_NOTICE, "got %s and I need to call %s" % (n[0], listens[n[0]])) if listens.has_key(n[0]): if listens[n[0]] == 'listen_port': RemoveCacheEntry() elif listens[n[0]] == 'listen_ports_moved': ProcessPortsMoved() elif listens[n[0]] == 'listen_ports_updating': ProcessPortsUpdating() elif listens[n[0]] == 'listen_vuxml': ProcessVUXML() else: syslog.syslog(syslog.LOG_ERR, "Code does not know what to do when '%s' is found." % n[0]) else: syslog.syslog(syslog.LOG_NOTICE, 'no such key!') logging.error('terminating')
All that fluff about ports/MOVED, ports/UPDATING, etc… it will eventually be implemented. I’m planning on changing strategy though. At the moment, such files are processed in between commits; the FP daemon stops processing, and runs through the ports/MOVED file before proceeding. That will change.
And the trigger is:
CREATE OR REPLACE FUNCTION ports_clear_cache() RETURNS TRIGGER AS $$ DECLARE l_cache_clearing_ports_id int8; l_port text; l_category text; BEGIN IF TG_OP = 'UPDATE' THEN SELECT id INTO l_cache_clearing_ports_id FROM cache_clearing_ports WHERE port_id =; IF NOT FOUND THEN SELECT category, name INTO l_category, l_port FROM ports_all WHERE id =; INSERT INTO cache_clearing_ports (port_id, category, port) VALUES (, l_category, l_port); END IF; NOTIFY "port_updated"; END IF; -- when a port changes, add an entry to the cache clearing table RETURN NEW; END $$ LANGUAGE 'plpgsql'; DROP TRIGGER ports_clear_cache ON ports; CREATE TRIGGER ports_clear_cache AFTER UPDATE on ports FOR EACH ROW EXECUTE PROCEDURE ports_clear_cache();