Change DB counting mechanism

The mysql performance_schema method for counting per-database queries
is very heavyweight in that it requires full logging (in a table) of
every query. We do hundreds of thousands in the course of a tempest
run, which ends up creating its own performance problem.

This changes the approach we take, which is to bundle a very tiny
sqlalchemy plugin module which counts just what we care about in
a special database.

It is more complex than just enabling the features in mysql, but it
is a massively smaller runtime overhead. It also provides us the
opportunity to easily zero the counters just before a tempest run.

Change-Id: I361bc30bb970cdaf18b966951f217862d302f0b9
diff --git a/lib/databases/mysql b/lib/databases/mysql
index 6b3ea02..b292da2 100644
--- a/lib/databases/mysql
+++ b/lib/databases/mysql
@@ -151,12 +151,16 @@
     fi
 
     if [[ "$MYSQL_GATHER_PERFORMANCE" == "True" ]]; then
-        echo "enabling MySQL performance_schema items"
-        # Enable long query history
-        iniset -sudo $my_conf mysqld \
-               performance-schema-consumer-events-statements-history-long TRUE
-        iniset -sudo $my_conf mysqld \
-               performance_schema_events_stages_history_long_size 1000000
+        echo "enabling MySQL performance counting"
+
+        # Install our sqlalchemy plugin
+        pip_install ${TOP_DIR}/tools/dbcounter
+
+        # Create our stats database for accounting
+        recreate_database stats
+        mysql -u $DATABASE_USER -p$DATABASE_PASSWORD -h $MYSQL_HOST -e \
+              "CREATE TABLE queries (db VARCHAR(32), op VARCHAR(32),
+                count INT, PRIMARY KEY (db, op)) ENGINE MEMORY" stats
     fi
 
     restart_service $MYSQL_SERVICE_NAME
@@ -218,7 +222,17 @@
 
 function database_connection_url_mysql {
     local db=$1
-    echo "$BASE_SQL_CONN/$db?charset=utf8"
+    local plugin
+
+    # NOTE(danms): We don't enable perf on subnodes yet because the
+    # plugin is not installed there
+    if [[ "$MYSQL_GATHER_PERFORMANCE" == "True" ]]; then
+        if is_service_enabled mysql; then
+            plugin="&plugin=dbcounter"
+        fi
+    fi
+
+    echo "$BASE_SQL_CONN/$db?charset=utf8$plugin"
 }
 
 
diff --git a/stack.sh b/stack.sh
index 6e9ced9..16dce81 100755
--- a/stack.sh
+++ b/stack.sh
@@ -1512,6 +1512,19 @@
 time_totals
 async_print_timing
 
+if is_service_enabled mysql; then
+    if [[ "$MYSQL_GATHER_PERFORMANCE" == "True" && "$MYSQL_HOST" ]]; then
+        echo ""
+        echo ""
+        echo "Post-stack database query stats:"
+        mysql -u $DATABASE_USER -p$DATABASE_PASSWORD -h $MYSQL_HOST stats -e \
+              'SELECT * FROM queries' -t 2>/dev/null
+        mysql -u $DATABASE_USER -p$DATABASE_PASSWORD -h $MYSQL_HOST stats -e \
+              'DELETE FROM queries' 2>/dev/null
+    fi
+fi
+
+
 # Using the cloud
 # ===============
 
diff --git a/tools/dbcounter/dbcounter.py b/tools/dbcounter/dbcounter.py
new file mode 100644
index 0000000..5057f0f
--- /dev/null
+++ b/tools/dbcounter/dbcounter.py
@@ -0,0 +1,120 @@
+import json
+import logging
+import os
+import threading
+import time
+import queue
+
+import sqlalchemy
+from sqlalchemy.engine import CreateEnginePlugin
+from sqlalchemy import event
+
+# https://docs.sqlalchemy.org/en/14/core/connections.html?
+# highlight=createengineplugin#sqlalchemy.engine.CreateEnginePlugin
+
+LOG = logging.getLogger(__name__)
+
+# The theory of operation here is that we register this plugin with
+# sqlalchemy via an entry_point. It gets loaded by virtue of plugin=
+# being in the database connection URL, which gives us an opportunity
+# to hook the engines that get created.
+#
+# We opportunistically spawn a thread, which we feed "hits" to over a
+# queue, and which occasionally writes those hits to a special
+# database called 'stats'. We access that database with the same user,
+# pass, and host as the main connection URL for simplicity.
+
+
+class LogCursorEventsPlugin(CreateEnginePlugin):
+    def __init__(self, url, kwargs):
+        self.db_name = url.database
+        LOG.info('Registered counter for database %s' % self.db_name)
+        new_url = sqlalchemy.engine.URL.create(url.drivername,
+                                               url.username,
+                                               url.password,
+                                               url.host,
+                                               url.port,
+                                               'stats')
+
+        self.engine = sqlalchemy.create_engine(new_url)
+        self.queue = queue.Queue()
+        self.thread = None
+
+    def engine_created(self, engine):
+        """Hook the engine creation process.
+
+        This is the plug point for the sqlalchemy plugin. Using
+        plugin=$this in the URL causes this method to be called when
+        the engine is created, giving us a chance to hook it below.
+        """
+        event.listen(engine, "before_cursor_execute", self._log_event)
+
+    def ensure_writer_thread(self):
+        self.thread = threading.Thread(target=self.stat_writer, daemon=True)
+        self.thread.start()
+
+    def _log_event(self, conn, cursor, statement, parameters, context,
+                   executemany):
+        """Queue a "hit" for this operation to be recorded.
+
+        Attepts to determine the operation by the first word of the
+        statement, or 'OTHER' if it cannot be determined.
+        """
+
+        # Start our thread if not running. If we were forked after the
+        # engine was created and this plugin was associated, our
+        # writer thread is gone, so respawn.
+        if not self.thread or not self.thread.is_alive():
+            self.ensure_writer_thread()
+
+        try:
+            op = statement.strip().split(' ', 1)[0] or 'OTHER'
+        except Exception:
+            op = 'OTHER'
+
+        self.queue.put((self.db_name, op))
+
+    def do_incr(self, db, op, count):
+        """Increment the counter for (db,op) by count."""
+
+        query = ('INSERT INTO queries (db, op, count) '
+                 '  VALUES (%s, %s, %s) '
+                 '  ON DUPLICATE KEY UPDATE count=count+%s')
+        try:
+            with self.engine.begin() as conn:
+                r = conn.execute(query, (db, op, count, count))
+        except Exception as e:
+            LOG.error('Failed to account for access to database %r: %s',
+                      db, e)
+
+    def stat_writer(self):
+        """Consume messages from the queue and write them in batches.
+
+        This reads "hists" from from a queue fed by _log_event() and
+        writes (db,op)+=count stats to the database after ten seconds
+        of no activity to avoid triggering a write for every SELECT
+        call. Write no less often than every thirty seconds and/or 100
+        pending hits to avoid being starved by constant activity.
+        """
+        LOG.debug('[%i] Writer thread running' % os.getpid())
+        while True:
+            to_write = {}
+            total = 0
+            last = time.time()
+            while time.time() - last < 30 and total < 100:
+                try:
+                    item = self.queue.get(timeout=10)
+                    to_write.setdefault(item, 0)
+                    to_write[item] += 1
+                    total += 1
+                except queue.Empty:
+                    break
+
+            if to_write:
+                LOG.debug('[%i] Writing DB stats %s' % (
+                    os.getpid(),
+                    ','.join(['%s:%s=%i' % (db, op, count)
+                              for (db, op), count in to_write.items()])))
+
+            for (db, op), count in to_write.items():
+                self.do_incr(db, op, count)
diff --git a/tools/dbcounter/pyproject.toml b/tools/dbcounter/pyproject.toml
new file mode 100644
index 0000000..d74d688
--- /dev/null
+++ b/tools/dbcounter/pyproject.toml
@@ -0,0 +1,3 @@
+[build-system]
+requires = ["sqlalchemy", "setuptools>=42"]
+build-backend = "setuptools.build_meta"
\ No newline at end of file
diff --git a/tools/dbcounter/setup.cfg b/tools/dbcounter/setup.cfg
new file mode 100644
index 0000000..f9f26f2
--- /dev/null
+++ b/tools/dbcounter/setup.cfg
@@ -0,0 +1,14 @@
+[metadata]
+name = dbcounter
+author = Dan Smith
+author_email = dms@danplanet.com
+version = 0.1
+description = A teeny tiny dbcounter plugin for use with devstack
+url = http://github.com/openstack/devstack
+license = Apache
+
+[options]
+modules = dbcounter
+entry_points =
+    [sqlalchemy.plugins]
+    dbcounter = dbcounter:LogCursorEventsPlugin
diff --git a/tools/get-stats.py b/tools/get-stats.py
index 670e723..465afca 100755
--- a/tools/get-stats.py
+++ b/tools/get-stats.py
@@ -83,13 +83,11 @@
 def get_db_stats(host, user, passwd):
     dbs = []
     db = pymysql.connect(host=host, user=user, password=passwd,
-                         database='performance_schema',
+                         database='stats',
                          cursorclass=pymysql.cursors.DictCursor)
     with db:
         with db.cursor() as cur:
-            cur.execute(
-                'SELECT COUNT(*) AS queries,current_schema AS db FROM '
-                'events_statements_history_long GROUP BY current_schema')
+            cur.execute('SELECT db,op,count FROM queries')
             for row in cur:
                 dbs.append({k: tryint(v) for k, v in row.items()})
     return dbs