The idea is as old as Replication is:

How do you know which is the most current slave.

You can use the Master_Binlog_Pos to guess what is most up to date, but which transaction does this binlog position match ?

With the proxy you can add a global transaction ID to your setup, if you let the inject some information into the stream.

The idea is simple and is documented in various places.

Create a MEMORY table which is replicated with a single UNSIGNED BIGINT and increment it at the end of each transaction.

CREATE TABLE trx (
  trx_id BIGINT UNSIGNED NOT NULL
) ENGINE=memory;

INSERT INTO trx VALUES ( 0 );

When ever you commit a transaction UPDATE the trx\_id field:

UPDATE trx SET trx_id = trx_id + 1

Usecases:

  • identify which slave is most current and switch to it in case of master failure
  • identify if slaves are current enough to SELECT from them
  • identify most current slave to send the load to them in R/W splitting

The wonders of Query Injection

As it might not be simple to find all the places in your app where you close a transaction you can use the proxy to analyze the traffic and inject a UPDATE at the right places.

The MySQL protocol is nice to us and announces if we are in a transaction or not by setting a bit in the result-set of each query. As soon as the flag is 0 we know that a transaction is finished.

As configuration we have to know the name of the database, the table and the field to increment:

 trx_db        = "repl"
 trx_table     = "trx"
 trx_table_fld = "trx_id"

Next we have to send all queries through the lua-script to tracks its result-set:

 function read_query(packet)
   -- send all queries to read_query_result()
   proxy.queries:append(1, packet)

   return proxy.PROXY_SEND_QUERY
 end

And as last step we have to check the result-set of the queries if a transaction got closed:

 function read_query_result(inj)
   if inj.id ~= 1 then
     -- ignore the result of all injected queries
     return proxy.PROXY_IGNORE_RESULT
   end

   local res      = assert(inj.resultset)
   local flags    = res.flags

   -- let's hope the query had a resultset
   --
   if res.query_status == proxy.MYSQLD_PACKET_ERR then
     return
   end

   if flags.in_trans == 0 then
     -- we left the transaction, increment the trx-id
     if trx_db ~= proxy.connection.default_db then
       -- switch to the transaction db
       proxy.queries:append(2,
         string.char(proxy.COM_INIT_DB) .. trx_db)
     end

     proxy.queries:append(3, string.char(proxy.COM_QUERY) ..
       "UPDATE " .. trx_table ..
       " SET ".. trx_table_fld .. " = ".. trx_table_fld .." + 1" )

     if proxy.connection.default_db and trx_db ~= proxy.connection.default_db then
       -- and back to the DB of the user
       proxy.queries:append(4,
         string.char(proxy.COM_INIT_DB) .. proxy.connection.default_db)
     end
   end
 end

The function does a bit more and switches between the user-db and the db of the trx-id table in case they are different.

root@127.0.0.1:4040> select * from repl.trx;
| trx_id |
+--------+
|     32 |

root@127.0.0.1:4040> select * from repl.trx;
| trx_id |
+--------+
|     33 |

We are in auto-commit mode and the trx table is not transactional. Each SELECT is a transaction and increments trx-counter.

As soon as we open a transaction explicitly the trx-counter isn't incremented.

root@127.0.0.1:4040> begin;

root@127.0.0.1:4040> select * from repl.trx;
| trx_id |
+--------+
|     34 |

root@127.0.0.1:4040> select * from repl.trx;
| trx_id |
+--------+
|     34 |

root@127.0.0.1:4040> commit;

TRX ID as part of the transaction

The above implementation shows the basic idea and injects the TRX_ID after the end of the transaction. You can also invert the idea and make the TRX_ID part of the COMMIT by intercepting the ID before you send the COMMIT.

In read_query() you check for COMMIT and prepend it with the UPDATE. I leave the implementation to the imagination of the reader.

Misuse of the AFFECTED_ROWS()

I only follow the idea to point out a small trick: We can tweak the protocol a bit to transfer the TRX_ID back without really sending a SELECT to read it from the table.

At the end of each non-SELECT query we have a OK packet which is made up of the following bytes:

Bytes                       Name
-----                       ----
1   (Length Coded Binary)   field_count, always = 0
1-9 (Length Coded Binary)   affected_rows
1-9 (Length Coded Binary)   insert_id
2                           server_status
2                           warning_count
n   (until end of packet)   message

For COMMIT this looks like

| result.packet = 00 00 00 02 00 00 00

We can overwrite the response of the COMMIT packet and use affect_rows or insert_id for our needs and let the client access it.

Some configuration again, same as above:

trx_db        = "repl"
trx_table     = "trx"
trx_table_fld = "trx_id"

affected_rows = 0

The injection is a bit more interesting this time as we have to SELECT the transaction id now. For performance reasons we use the session variable @trx_id.

function read_query(packet)
  if packet:byte() == proxy.COM_QUERY and packet:sub(2):lower() == "commit" then
    -- we left the transaction, increment the trx-id
    if trx_db ~= proxy.connection.default_db then
      -- switch to the transaction db
      proxy.queries:append(2,
        string.char(proxy.COM_INIT_DB) .. trx_db)
    end

    proxy.queries:append(3, string.char(proxy.COM_QUERY) ..
      "UPDATE " .. trx_table ..
      " SET ".. trx_table_fld .. " = @trx_id := ( ".. trx_table_fld .." + 1 )" )

    proxy.queries:append(4, string.char(proxy.COM_QUERY) ..
      "SELECT @trx_id" )

    if proxy.connection.default_db ~= "" and trx_db ~= proxy.connection.default_db then
      -- and back to the DB of the user
      proxy.queries:append(5,
        string.char(proxy.COM_INIT_DB) .. proxy.connection.default_db)
    end
  end

  proxy.queries:append(1, packet)

  return proxy.PROXY_SEND_QUERY
end

Handling the result-set only has to extract the trx-id and store it internally for a while. In case we sent a COMMIT we patch the response a bit and replace the affected rows:

function read_query_result(inj)
  if inj.id == 1 and inj.query:sub(2):lower() == "commit" then
    proxy.response = {
      type = proxy.MYSQLD_PACKET_RAW,
      packets = {
        "\000" .. -- fields
        string.char(affected_rows) ..
        "\000" .. -- insert_id
        inj.resultset.raw:sub(4)
      }
    }
    return proxy.PROXY_SEND_RESULT
  elseif inj.id == 4 then
    -- this should be the SELECT

    for row in inj.resultset.rows do
      affected_rows = row[1] + 0
    end

    return proxy.PROXY_IGNORE_RESULT
  elseif inj.id ~= 1 then
    return proxy.PROXY_IGNORE_RESULT
  end
end

Just to show that it works:

root@127.0.0.1:4040 [(none)]> commit;
Query OK, 40 rows affected (0.01 sec)

In your APIs you can use mysql_affected_rows() or similar calls to get access to this value.


Comments

Enable javascript to load comments.