--- a/imms.py Sun Feb 08 17:27:21 2004 -0500
+++ b/imms.py Mon Feb 09 23:29:08 2004 -0500
@@ -1,18 +1,32 @@
-import os
+import os.path
+from sys import stderr
import sqlite
+from utils import sql_quote, unique
+
+_log = stderr
-def quote_sql(str):
- return str.replace("'", "''")
-
+def rating_to_color(rating):
+ i = rating - 75
+ red = green = blue = 0
+ if i <= 25:
+ red = 255
+ green = i * 255 / 25
+ elif i <= 50:
+ red = (50-i) * 255 / 25
+ green = 255
+ else:
+ green = 255
+ blue = (i-50) * 255 / 25
+ return "#%02x%02x%02x" % (red, green, blue)
+
class IMMSDb:
def __init__(self, dbname = None):
if not dbname:
dbname = os.environ['HOME'] + '/.imms/imms.db'
# autocommit = 1 disable autocommit!
- self.cx = sqlite.connect(dbname, autocommit = 1,
- timeout = 2, encoding = ('utf-8', 'replace'))
+ self.cx = sqlite.connect(dbname, autocommit = 1, timeout = 5)
self.cu = self.cx.cursor()
- def get_library_entry(self):
+ def get_library_entry(self, **kw):
qry = "SELECT path, uid, sid FROM Library";
first = 1
for key in kw.keys():
@@ -24,18 +38,19 @@
if key in ['uid', 'sid']:
qry += "%s = %d" % (key, kw[key])
else:
- qry += "%s = '%s'" % (key, quote_sql(kw[key]))
+ qry += "%s = '%s'" % (key, sql_quote(kw[key]))
qry += ";"
self.cu.execute(qry)
return self.cu.fetchall()
+ return res
def update_filename(self, oldname, newname):
self.cu.execute("""UPDATE Library
SET path = '%s'
- WHERE path = '%s';""" % (quote_sql(newname),
- quote_sql(oldname)))
+ WHERE path = '%s';""" % (sql_quote(newname),
+ sql_quote(oldname)))
def erase_filename(self, name):
self.cu.execute("""DELETE FROM Library
- WHERE path = '%s';""" % quote_sql(name))
+ WHERE path = '%s';""" % sql_quote(name))
def erase_uid(self, uid):
self.cu.execute("""BEGIN TRANSACTION;
DELETE FROM Library WHERE uid = %d;
@@ -47,10 +62,10 @@
DELETE FROM Library WHERE sid = %d;
DELETE FROM Info WHERE sid = %d;
DELETE FROM Last WHERE sid = %d;
- COMMIT;""")
+ COMMIT;""" % (sid, sid, sid))
def erase_path(self, path):
self.cu.execute("DELETE FROM Library WHERE path = '%s';" \
- % quote_sql(path))
+ % sql_quote(path))
def get_paths(self, uids = None, sids = None):
qry = "SELECT uid, sid, path FROM Library"
first = 1
@@ -79,48 +94,51 @@
ORDER BY Rating.rating;''' % (min, max))
return self.cu.fetchall()
def get_acoustics(self, uids = None):
- qry = "SELECT uid, bpm. spectrum FROM Acoustic"
+ qry = "SELECT uid, bpm, spectrum FROM Acoustic"
first = 1
- for uid in uids:
- if first:
- qry += ' WHERE'
- first = 0
- else:
- qry += ' OR'
- qry += " uid = %d" % uid
+ if uids:
+ for uid in uids:
+ if first:
+ qry += ' WHERE'
+ first = 0
+ else:
+ qry += ' OR'
+ qry += " uid = %d" % uid
qry += ';'
self.cu.execute(qry)
return self.cu.fetchall()
def get_infos(self, sids = None):
- qry = "SELECT sid, artist, title FROM Infos"
+ qry = "SELECT sid, artist, title FROM Info"
first = 1
- for sid in sids:
- if first:
- qry += ' WHERE'
- first = 0
- else:
- qry += ' OR'
- qry += " sid = %d" % id
+ if sids:
+ for sid in sids:
+ if first:
+ qry += ' WHERE'
+ first = 0
+ else:
+ qry += ' OR'
+ qry += " sid = %d" % id
qry += ';'
self.cu.execute(qry)
return self.cu.fetchall()
def get_last(self, sids = None):
qry = "SELECT sid, last FROM Last"
first = 1
- for sid in sids:
- if first:
- qry += ' WHERE'
- first = 0
- else:
- qry += ' OR'
- qry += " sid = %d" % id
+ if sids:
+ for sid in sids:
+ if first:
+ qry += ' WHERE'
+ first = 0
+ else:
+ qry += ' OR'
+ qry += " sid = %d" % id
qry += ';'
self.cu.execute(qry)
return self.cu.fetchall()
def get_uid_by_path(self, path):
- entries = self.get_library_entries(path = path)
+ entries = self.get_library_entry(path = path)
return map(lambda x: x[1], entries)
- def get_ratings_and_info(self, uids = None):
+ def get_ratings_and_paths(self, uids = None):
qry = '''SELECT l.uid, r.rating, l.path, ls.last
FROM Library l, Rating r, Last ls
WHERE l.uid = r.uid AND l.sid = ls.sid'''
@@ -136,17 +154,99 @@
results = {}
tune = self.cu.fetchone()
while tune:
- try:
- uid = int(tune[0])
- if results.has_key(uid):
- results[uid]['path'].append(
- tune[2].decode('utf-8', 'replace'))
- else:
- results[uid] = {
- 'rating' : int(tune[1]),
- 'path' : [ tune[2].decode('utf-8', 'replace') ],
- 'last' : int(tune[3])}
- except UnicodeDecodeError:
- print tune[2]
- tune = self.cu.fetchone()
+ uid = int(tune[0])
+ if results.has_key(uid):
+ results[uid]['path'].append(tune[2])
+ else:
+ results[uid] = {
+ 'rating' : int(tune[1]),
+ 'path' : [ tune[2] ],
+ 'last' : int(tune[3])}
+ tune = self.cu.fetchone()
return results
+ def get_ratings_and_infos(self):
+ self.cu.execute('''SELECT r.rating, i.artist, i.title
+ FROM Library l, Rating r, Info i
+ WHERE l.uid = r.uid AND l.sid = i.sid;''')
+ return self.cu.fetchall()
+
+class IMMSCleaner:
+ def __init__(self, db):
+ self.db = db
+ def check_uid(self, uid):
+ lib = self.db.get_library_entry(uid = uid)
+ if len(lib) == 0:
+ print >> _log, "Erased uid = ", uid
+ self.db.erase_uid(uid)
+ def check_sid(self, sid):
+ lib = self.db.get_library_entry(sid = sid)
+ if len(lib) == 0:
+ print >> _log, "Erased sid = ", sid
+ self.db.erase_sid(sid)
+ def is_path_in_db(self, path):
+ return len(self.db.get_library_entry(path = path))
+ # Note: I doesn't much how I handle the two following functions...
+ # May be I must just have the second one and handle everything
+ # else in the derived class.
+ def check_and_edit_path(self, path, uid, sid):
+ """Must return the new path, None to remove
+ it. If the new file name is already in the Db,
+ it will be skip. The skip is more efficient if path
+ is return.
+ This is the default handler which always skip the file by
+ returning path directly.
+ """
+ # The right thing (but not safe) would be to erase the
+ # file if it already exist in the db... But I find it
+ # too much unsafe... Erasing a file shouldn't be easy to
+ # do.
+ return path
+ def clean_library(self):
+ lib = self.db.get_library_entry()
+ print >> _log, "Processing %d entries" % len(lib)
+ deleted_uids = []
+ deleted_sids = []
+ for entry in lib:
+ path, uid, sid = entry
+ uid = int(uid)
+ sid = int(sid)
+ newfile = self.check_and_edit_path(path, uid, sid)
+ if not newfile:
+ print >> _log, "Erasing ", path
+ self.db.erase_filename(path)
+ deleted_uids.append(uid)
+ deleted_sids.append(sid)
+ elif (path == newfile):
+ pass
+ elif self.is_path_in_db(newfile):
+ print >> _log, "Skipping ", path
+ pass
+ else:
+ print >> _log, "Renaming ", path, " into ", newfile
+ self.db.update_filename(path, newfile)
+ map(self.check_uid, unique(deleted_uids))
+ map(self.check_sid, unique(deleted_sids))
+ def clean_rating(self):
+ print >> _log, "Clean Rating"
+ rates = self.db.get_ratings()
+ rates = unique(map(lambda x: x[0], rates))
+ map(self.check_uid, rates)
+ def clean_acoustic(self):
+ print >> _log, "Clean Acoustic"
+ uids = self.db.get_acoustics()
+ uids = map(lambda x: x[0], uids )
+ map(self.check_uid, uids)
+ def clean_info(self):
+ print >> _log, "Clean Info"
+ sids = map(lambda x: x[0], self.db.get_infos())
+ map(self.check_sid, sids)
+ def clean_last(self):
+ print >> _log, "Clean Last"
+ sids = map(lambda x: x[0], self.db.get_last())
+ map(self.check_sid, sids)
+ def clean_all(self):
+ self.clean_library()
+ self.clean_rating()
+ self.clean_acoustic()
+ self.clean_info()
+ self.clean_last()