| Dan Smith | fe52d7f | 2022-04-28 12:34:38 -0700 | [diff] [blame] | 1 | import json | 
|  | 2 | import logging | 
|  | 3 | import os | 
|  | 4 | import threading | 
|  | 5 | import time | 
|  | 6 | import queue | 
|  | 7 |  | 
|  | 8 | import sqlalchemy | 
|  | 9 | from sqlalchemy.engine import CreateEnginePlugin | 
|  | 10 | from sqlalchemy import event | 
|  | 11 |  | 
|  | 12 | # https://docs.sqlalchemy.org/en/14/core/connections.html? | 
|  | 13 | # highlight=createengineplugin#sqlalchemy.engine.CreateEnginePlugin | 
|  | 14 |  | 
|  | 15 | LOG = logging.getLogger(__name__) | 
|  | 16 |  | 
|  | 17 | # The theory of operation here is that we register this plugin with | 
|  | 18 | # sqlalchemy via an entry_point. It gets loaded by virtue of plugin= | 
|  | 19 | # being in the database connection URL, which gives us an opportunity | 
|  | 20 | # to hook the engines that get created. | 
|  | 21 | # | 
|  | 22 | # We opportunistically spawn a thread, which we feed "hits" to over a | 
|  | 23 | # queue, and which occasionally writes those hits to a special | 
|  | 24 | # database called 'stats'. We access that database with the same user, | 
|  | 25 | # pass, and host as the main connection URL for simplicity. | 
|  | 26 |  | 
|  | 27 |  | 
class LogCursorEventsPlugin(CreateEnginePlugin):
    """SQLAlchemy engine plugin that counts executed statements.

    Every statement executed through a hooked engine queues a
    ``(db_name, op)`` "hit". A daemon thread drains the queue, batches
    the hits and adds them to the ``queries`` table of a database named
    ``stats``, reached with the same driver, user, password, host and
    port as the hooked connection URL.
    """

    # Flush pending hits after this many seconds without a new hit.
    IDLE_FLUSH_SECONDS = 10
    # Flush pending hits at least this often, even under constant load.
    MAX_BATCH_SECONDS = 60

    def __init__(self, url, kwargs):
        """Set up the stats engine, the hit queue and the thread slot.

        :param url: connection URL of the engine being created; its
                    database name labels every hit, and its other
                    components are reused to reach the 'stats' database.
        :param kwargs: engine keyword arguments (not used here).
        """
        self.db_name = url.database
        LOG.info('Registered counter for database %s', self.db_name)
        new_url = sqlalchemy.engine.URL.create(url.drivername,
                                               url.username,
                                               url.password,
                                               url.host,
                                               url.port,
                                               'stats')

        self.engine = sqlalchemy.create_engine(new_url)
        self.queue = queue.Queue()
        # Created lazily on the first hit; see ensure_writer_thread().
        self.thread = None

    def update_url(self, url):
        """Return ``url`` with the ``dbcounter`` query argument removed."""
        return url.difference_update_query(["dbcounter"])

    def engine_created(self, engine):
        """Hook the engine creation process.

        This is the plug point for the sqlalchemy plugin. Using
        plugin=$this in the URL causes this method to be called when
        the engine is created, giving us a chance to hook it below.
        """
        event.listen(engine, "before_cursor_execute", self._log_event)

    def ensure_writer_thread(self):
        """Start the writer thread unless one is already running."""
        if self.thread is not None and self.thread.is_alive():
            return
        self.thread = threading.Thread(target=self.stat_writer, daemon=True)
        self.thread.start()

    def _log_event(self, conn, cursor, statement, parameters, context,
                   executemany):
        """Queue a "hit" for this operation to be recorded.

        Attempts to determine the operation by the first word of the
        statement, or 'OTHER' if it cannot be determined.
        """

        # Start our thread if not running. If we were forked after the
        # engine was created and this plugin was associated, our
        # writer thread is gone, so respawn.
        self.ensure_writer_thread()

        try:
            # Split on any whitespace so that a keyword followed by a
            # newline or tab is still isolated as the operation.
            words = statement.split(None, 1)
            op = words[0] if words else 'OTHER'
        except Exception:
            # Deliberately broad: accounting must never break the
            # statement that is about to be executed.
            op = 'OTHER'

        self.queue.put((self.db_name, op))

    def do_incr(self, db, op, count):
        """Increment the counter for (db,op) by count.

        Failures are logged and swallowed so that a stats problem does
        not kill the writer thread.
        """

        query = sqlalchemy.text('INSERT INTO queries (db, op, count) '
                                '  VALUES (:db, :op, :count) '
                                '  ON DUPLICATE KEY UPDATE count=count+:count')
        try:
            with self.engine.begin() as conn:
                conn.execute(query, {'db': db, 'op': op, 'count': count})
        except Exception as e:
            LOG.error('Failed to account for access to database %r: %s',
                      db, e)

    def stat_writer(self):
        """Consume messages from the queue and write them in batches.

        This reads "hits" from a queue fed by _log_event() and writes
        (db,op)+=count stats to the database after IDLE_FLUSH_SECONDS
        of no activity to avoid triggering a write for every SELECT
        call. Write no less often than every MAX_BATCH_SECONDS to avoid
        being starved by constant activity.
        """
        LOG.debug('[%i] Writer thread running', os.getpid())
        while True:
            to_write = {}
            # Monotonic clock: the batch window must not be stretched
            # or cut short by wall-clock adjustments.
            started = time.monotonic()
            while time.monotonic() - started < self.MAX_BATCH_SECONDS:
                try:
                    item = self.queue.get(timeout=self.IDLE_FLUSH_SECONDS)
                except queue.Empty:
                    # Idle long enough; flush whatever we have.
                    break
                to_write[item] = to_write.get(item, 0) + 1

            if not to_write:
                continue

            # Only build the summary string when it will be emitted.
            if LOG.isEnabledFor(logging.DEBUG):
                LOG.debug('[%i] Writing DB stats %s',
                          os.getpid(),
                          ','.join(['%s:%s=%i' % (db, op, count)
                                    for (db, op), count in to_write.items()]))

            for (db, op), count in to_write.items():
                self.do_incr(db, op, count)