import json
import logging
import os
import threading
import time
import queue

import sqlalchemy
from sqlalchemy.engine import CreateEnginePlugin
from sqlalchemy import event

# https://docs.sqlalchemy.org/en/14/core/connections.html?
# highlight=createengineplugin#sqlalchemy.engine.CreateEnginePlugin

LOG = logging.getLogger(__name__)

# The theory of operation here is that we register this plugin with
# sqlalchemy via an entry_point. It gets loaded by virtue of plugin=
# being in the database connection URL, which gives us an opportunity
# to hook the engines that get created.
#
# We opportunistically spawn a thread, which we feed "hits" to over a
# queue, and which occasionally writes those hits to a special
# database called 'stats'. We access that database with the same user,
# pass, and host as the main connection URL for simplicity.
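#
# For illustration, registration and activation look roughly like the
# following. The plugin name 'dbcounter' is inferred from update_url()
# below; the package path and example URL are illustrative assumptions:
#
#     # setup.cfg of the package shipping this module
#     [entry_points]
#     sqlalchemy.plugins =
#         dbcounter = dbcounter:LogCursorEventsPlugin
#
#     # connection URL that loads the plugin into the engine
#     mysql+pymysql://user:pass@host/mydb?plugin=dbcounter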


class LogCursorEventsPlugin(CreateEnginePlugin):
    def __init__(self, url, kwargs):
        self.db_name = url.database
        LOG.info('Registered counter for database %s' % self.db_name)
        new_url = sqlalchemy.engine.URL.create(url.drivername,
                                               url.username,
                                               url.password,
                                               url.host,
                                               url.port,
                                               'stats')

        self.engine = sqlalchemy.create_engine(new_url)
        self.queue = queue.Queue()
        self.thread = None

    def update_url(self, url):
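        """Strip the plugin-specific 'dbcounter' argument from the URL.

        SQLAlchemy calls this hook so a plugin can remove any query
        arguments it recognizes before the URL is passed on to the
        dialect and DBAPI.
        """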
        return url.difference_update_query(["dbcounter"])

    def engine_created(self, engine):
        """Hook the engine creation process.

        This is the plug point for the sqlalchemy plugin. Using
        plugin=$this in the URL causes this method to be called when
        the engine is created, giving us a chance to hook it below.
        """
        event.listen(engine, "before_cursor_execute", self._log_event)

    def ensure_writer_thread(self):
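        """(Re)spawn the daemon thread that writes stats for this process.

        Called lazily from _log_event() so that a process which forked
        after the engine was created gets its own writer thread.
        """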
        self.thread = threading.Thread(target=self.stat_writer, daemon=True)
        self.thread.start()

    def _log_event(self, conn, cursor, statement, parameters, context,
                   executemany):
        """Queue a "hit" for this operation to be recorded.

        Attempts to determine the operation from the first word of the
        statement, or 'OTHER' if it cannot be determined.
        """

        # Start our thread if not running. If we were forked after the
        # engine was created and this plugin was associated, our
        # writer thread is gone, so respawn.
        if not self.thread or not self.thread.is_alive():
            self.ensure_writer_thread()

        try:
            op = statement.strip().split(' ', 1)[0] or 'OTHER'
        except Exception:
            op = 'OTHER'

        self.queue.put((self.db_name, op))

    def do_incr(self, db, op, count):
        """Increment the counter for (db,op) by count."""

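        # The 'stats' database is assumed (its schema is not defined in
        # this module) to contain a table along the lines of:
        #
        #   CREATE TABLE queries (db VARCHAR(32), op VARCHAR(32),
        #                         count INT,
        #                         PRIMARY KEY (db, op));
        #
        # ON DUPLICATE KEY UPDATE below is MySQL-specific upsert syntax;
        # the unique key on (db, op) folds repeated hits into a single
        # running counter. Column types here are illustrative only.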
        query = sqlalchemy.text('INSERT INTO queries (db, op, count) '
                                ' VALUES (:db, :op, :count) '
                                ' ON DUPLICATE KEY UPDATE count=count+:count')
        try:
            with self.engine.begin() as conn:
                conn.execute(query, {'db': db, 'op': op, 'count': count})
        except Exception as e:
            LOG.error('Failed to account for access to database %r: %s',
                      db, e)

    def stat_writer(self):
        """Consume messages from the queue and write them in batches.

        This reads "hits" from a queue fed by _log_event() and writes
        (db,op)+=count stats to the database after ten seconds of no
        activity, to avoid triggering a write for every SELECT call.
        It writes no less often than every sixty seconds to avoid being
        starved by constant activity.
        """
        LOG.debug('[%i] Writer thread running' % os.getpid())
        while True:
            to_write = {}
            last = time.time()
            while time.time() - last < 60:
                try:
                    item = self.queue.get(timeout=10)
                    to_write.setdefault(item, 0)
                    to_write[item] += 1
                except queue.Empty:
                    break

            if to_write:
                LOG.debug('[%i] Writing DB stats %s' % (
                    os.getpid(),
                    ','.join(['%s:%s=%i' % (db, op, count)
                              for (db, op), count in to_write.items()])))

            for (db, op), count in to_write.items():
                self.do_incr(db, op, count)