| Dan Smith | fe52d7f | 2022-04-28 12:34:38 -0700 | [diff] [blame] | 1 | import json | 
 | 2 | import logging | 
 | 3 | import os | 
 | 4 | import threading | 
 | 5 | import time | 
 | 6 | import queue | 
 | 7 |  | 
 | 8 | import sqlalchemy | 
 | 9 | from sqlalchemy.engine import CreateEnginePlugin | 
 | 10 | from sqlalchemy import event | 
 | 11 |  | 
 | 12 | # https://docs.sqlalchemy.org/en/14/core/connections.html? | 
 | 13 | # highlight=createengineplugin#sqlalchemy.engine.CreateEnginePlugin | 
 | 14 |  | 
 | 15 | LOG = logging.getLogger(__name__) | 
 | 16 |  | 
 | 17 | # The theory of operation here is that we register this plugin with | 
 | 18 | # sqlalchemy via an entry_point. It gets loaded by virtue of plugin= | 
 | 19 | # being in the database connection URL, which gives us an opportunity | 
 | 20 | # to hook the engines that get created. | 
 | 21 | # | 
 | 22 | # We opportunistically spawn a thread, which we feed "hits" to over a | 
 | 23 | # queue, and which occasionally writes those hits to a special | 
 | 24 | # database called 'stats'. We access that database with the same user, | 
 | 25 | # pass, and host as the main connection URL for simplicity. | 
 | 26 |  | 
 | 27 |  | 
 | 28 | class LogCursorEventsPlugin(CreateEnginePlugin): | 
 | 29 |     def __init__(self, url, kwargs): | 
 | 30 |         self.db_name = url.database | 
 | 31 |         LOG.info('Registered counter for database %s' % self.db_name) | 
 | 32 |         new_url = sqlalchemy.engine.URL.create(url.drivername, | 
 | 33 |                                                url.username, | 
 | 34 |                                                url.password, | 
 | 35 |                                                url.host, | 
 | 36 |                                                url.port, | 
 | 37 |                                                'stats') | 
 | 38 |  | 
 | 39 |         self.engine = sqlalchemy.create_engine(new_url) | 
 | 40 |         self.queue = queue.Queue() | 
 | 41 |         self.thread = None | 
 | 42 |  | 
 | 43 |     def engine_created(self, engine): | 
 | 44 |         """Hook the engine creation process. | 
 | 45 |  | 
 | 46 |         This is the plug point for the sqlalchemy plugin. Using | 
 | 47 |         plugin=$this in the URL causes this method to be called when | 
 | 48 |         the engine is created, giving us a chance to hook it below. | 
 | 49 |         """ | 
 | 50 |         event.listen(engine, "before_cursor_execute", self._log_event) | 
 | 51 |  | 
 | 52 |     def ensure_writer_thread(self): | 
 | 53 |         self.thread = threading.Thread(target=self.stat_writer, daemon=True) | 
 | 54 |         self.thread.start() | 
 | 55 |  | 
 | 56 |     def _log_event(self, conn, cursor, statement, parameters, context, | 
 | 57 |                    executemany): | 
 | 58 |         """Queue a "hit" for this operation to be recorded. | 
 | 59 |  | 
 | 60 |         Attepts to determine the operation by the first word of the | 
 | 61 |         statement, or 'OTHER' if it cannot be determined. | 
 | 62 |         """ | 
 | 63 |  | 
 | 64 |         # Start our thread if not running. If we were forked after the | 
 | 65 |         # engine was created and this plugin was associated, our | 
 | 66 |         # writer thread is gone, so respawn. | 
 | 67 |         if not self.thread or not self.thread.is_alive(): | 
 | 68 |             self.ensure_writer_thread() | 
 | 69 |  | 
 | 70 |         try: | 
 | 71 |             op = statement.strip().split(' ', 1)[0] or 'OTHER' | 
 | 72 |         except Exception: | 
 | 73 |             op = 'OTHER' | 
 | 74 |  | 
 | 75 |         self.queue.put((self.db_name, op)) | 
 | 76 |  | 
 | 77 |     def do_incr(self, db, op, count): | 
 | 78 |         """Increment the counter for (db,op) by count.""" | 
 | 79 |  | 
 | 80 |         query = ('INSERT INTO queries (db, op, count) ' | 
 | 81 |                  '  VALUES (%s, %s, %s) ' | 
 | 82 |                  '  ON DUPLICATE KEY UPDATE count=count+%s') | 
 | 83 |         try: | 
 | 84 |             with self.engine.begin() as conn: | 
 | 85 |                 r = conn.execute(query, (db, op, count, count)) | 
 | 86 |         except Exception as e: | 
 | 87 |             LOG.error('Failed to account for access to database %r: %s', | 
 | 88 |                       db, e) | 
 | 89 |  | 
 | 90 |     def stat_writer(self): | 
 | 91 |         """Consume messages from the queue and write them in batches. | 
 | 92 |  | 
 | 93 |         This reads "hists" from from a queue fed by _log_event() and | 
 | 94 |         writes (db,op)+=count stats to the database after ten seconds | 
 | 95 |         of no activity to avoid triggering a write for every SELECT | 
 | 96 |         call. Write no less often than every thirty seconds and/or 100 | 
 | 97 |         pending hits to avoid being starved by constant activity. | 
 | 98 |         """ | 
 | 99 |         LOG.debug('[%i] Writer thread running' % os.getpid()) | 
 | 100 |         while True: | 
 | 101 |             to_write = {} | 
 | 102 |             total = 0 | 
 | 103 |             last = time.time() | 
 | 104 |             while time.time() - last < 30 and total < 100: | 
 | 105 |                 try: | 
 | 106 |                     item = self.queue.get(timeout=10) | 
 | 107 |                     to_write.setdefault(item, 0) | 
 | 108 |                     to_write[item] += 1 | 
 | 109 |                     total += 1 | 
 | 110 |                 except queue.Empty: | 
 | 111 |                     break | 
 | 112 |  | 
 | 113 |             if to_write: | 
 | 114 |                 LOG.debug('[%i] Writing DB stats %s' % ( | 
 | 115 |                     os.getpid(), | 
 | 116 |                     ','.join(['%s:%s=%i' % (db, op, count) | 
 | 117 |                               for (db, op), count in to_write.items()]))) | 
 | 118 |  | 
 | 119 |             for (db, op), count in to_write.items(): | 
 | 120 |                 self.do_incr(db, op, count) |