Using NDB API Events to mask/hide column data when replicating

If you have asynchronous replication where the slave database uses MySQL Cluster, you can use the NDB API events functionality to mask (overwrite) data on the slave. You might do this, for example, if the replica is used for generating reports and some of the data is sensitive and not relevant to those reports. Unlike MySQL triggers, which do not fire on the slave for changes applied through row-based replication, NDB API events are triggered on the slave.

The first step is to set up replication (master->slave rather than multi-master), as described in the post "Setting up MySQL Asynchronous Replication for High Availability".

In this example, the following table definition is used:

mysql> use clusterdb;
mysql> create table ASSETS (CODE int not null primary key, VALUE int) engine=ndb;

The following code should be compiled and then executed on a node within the slave Cluster:

#include <NdbApi.hpp>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <cstdlib>
#include <string.h>

// Report an NDB error, then terminate the process.
// `error` must be an object exposing `code` and `message` members, such as the
// NdbError returned by getNdbError(). The source file and line of the call
// site are printed too. The do { } while (0) wrapper makes the macro a single
// statement, so it is safe in unbraced if/else bodies; the backslash line
// continuations are required for the macro to span several lines at all.
#define APIERROR(error) \
  do { \
    std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
              << (error).code << ", msg: " << (error).message << "." << std::endl; \
    exit(-1); \
  } while (0)

// Create (or drop and re-create) the event `eventName` on table
// `eventTableName`, reporting the `noEventColumnName` columns listed in
// `eventColumnName`. Defined below.
int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnName,
                  const int noEventColumnName);

// Set VALUE to 0 for the ASSETS row with the given CODE. Defined below.
static void do_blank(Ndb*, int);

int main(int argc, char** argv)
{
  if (argc < 1)
 {
    std::cout << "Arguments are <connect_string cluster>.n";
    exit(-1);
  }
  const char *connectstring = argv[1];

  ndb_init();

  Ndb_cluster_connection *cluster_connection=
  new Ndb_cluster_connection(connectstring); // Object representing the cluster

  int r= cluster_connection->connect(5 /* retries               */,
  3 /* delay between retries */,
  1 /* verbose               */);
  if (r > 0)
  {
    std::cout << "Cluster connect failed, possibly resolved with more retries.n";
    exit(-1);
  }
  else if (r < 0)
  {
    std::cout << "Cluster connect failed.n";
    exit(-1);
  }

  if (cluster_connection->wait_until_ready(30,30))
  {
    std::cout << "Cluster was not ready within 30 secs." << std::endl;
    exit(-1);
  }

  Ndb* myNdb= new Ndb(cluster_connection,
                      "clusterdb");  // Object representing the database

  if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());

  const char *eventName= "CHNG_IN_ASSETS";
  const char *eventTableName= "ASSETS";
  const int noEventColumnName= 2;
  const char *eventColumnName[noEventColumnName]=
  {"CODE",
   "VALUE"};

  // Create events
  myCreateEvent(myNdb,
  eventName,
  eventTableName,
  eventColumnName,
  noEventColumnName);

  // Normal values and blobs are unfortunately handled differently..
  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;

  int i;

  // Start "transaction" for handling events
  NdbEventOperation* op;
  printf("create EventOperationn");
  if ((op = myNdb->createEventOperation(eventName)) == NULL)
    APIERROR(myNdb->getNdbError());

  printf("get valuesn");
  RA_BH recAttr[noEventColumnName];
  RA_BH recAttrPre[noEventColumnName];

  for (i = 0; i < noEventColumnName; i++) {
    recAttr[i].ra    = op->getValue(eventColumnName[i]);
    recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
  }

  // set up the callbacks
  // This starts changes to "start flowing"
  if (op->execute())
    APIERROR(op->getNdbError());

  while (true) {
    int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
    if (r > 0) {
      while ((op= myNdb->nextEvent())) {
        NdbRecAttr* ra = recAttr[0].ra;
        if (ra->isNULL() >= 0) { // we have a value
          if (ra->isNULL() == 0) { // we have a non-null value
            printf("CODE: %d ", ra->u_32_value());
            do_blank(myNdb, ra->u_32_value());
          } else 
            printf("%-5s", "NULL");
          } else
            printf("%-5s", "-"); // no value
            ra = recAttr[1].ra;
            printf("n");
          }
        }
      }
    }

/**
 * Create the NDB event `eventName` on table `eventTableName`.
 *
 * The event is subscribed to INSERT operations only and reports the listed
 * columns. If the dictionary reports that an event with this name already
 * exists, it is dropped and created again from this definition.
 *
 * @param myNdb              initialised Ndb object for the target database
 * @param eventName          name of the event to create
 * @param eventTableName     table the event is defined on
 * @param eventColumnNames   column names the event should report
 * @param noEventColumnNames number of entries in eventColumnNames
 * @return 0; any NDB failure terminates the process via APIERROR.
 */
int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnNames,
                  const int noEventColumnNames)
{
  NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
  if (!myDict) APIERROR(myNdb->getNdbError());

  const NdbDictionary::Table *table= myDict->getTable(eventTableName);
  if (!table) APIERROR(myDict->getNdbError());

  NdbDictionary::Event myEvent(eventName, *table);
  // Only INSERT operations are subscribed.
  myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
  myEvent.addEventColumns(noEventColumnNames, eventColumnNames);

  // Add event to database
  if (myDict->createEvent(myEvent) == 0)
  {
    myEvent.print();
  }
  else if (myDict->getNdbError().classification ==
           NdbError::SchemaObjectExists)
  {
    // The event already exists (e.g. from an earlier run): drop and re-create.
    printf("Event creation failed, event exists\n");
    printf("dropping Event...\n");
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
    if (myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
  }
  else
  {
    APIERROR(myDict->getNdbError());
  }

  return 0;
}

/**
 * Overwrite VALUE with 0 for the ASSETS row whose primary key CODE equals
 * `code`, committing the update in its own transaction.
 *
 * @param myNdb initialised Ndb object for the database holding ASSETS
 * @param code  CODE (primary key) of the row to mask
 *
 * Any NDB failure terminates the process via APIERROR.
 */
static void do_blank(Ndb* myNdb, int code)
{
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("ASSETS");
  if (myTable == NULL)
    APIERROR(myDict->getNdbError());

  NdbTransaction *myTransaction= myNdb->startTransaction();
  if (myTransaction == NULL) APIERROR(myNdb->getNdbError());

  printf("Replacing VALUE with 0 for CODE: %d ", code);

  NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
  if (myOperation == NULL) APIERROR(myTransaction->getNdbError());

  // Each operation-definition call returns -1 on failure; previously these
  // results were ignored. Report the operation's own error if any fails.
  if (myOperation->updateTuple() == -1 ||
      myOperation->equal("CODE", code) == -1 ||
      myOperation->setValue("VALUE", 0) == -1)
    APIERROR(myOperation->getNdbError());

  if (myTransaction->execute( NdbTransaction::Commit ) == -1)
    APIERROR(myTransaction->getNdbError());

  myNdb->closeTransaction(myTransaction);
}

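Build the program as slave_filter and start it, passing the connect string of the slave Cluster's management node as the only argument:

shell> slave_filter 127.0.0.1:1186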

From the master Cluster, insert some values (note that the example can easily be extended to cover updates too):

mysql> insert into ASSETS values (101, 50),(102, 40), (103, 99);

and then check that on the slave the value has been set to 0 for each of the entries:

mysql> select * from ASSETS;
+------+-------+
| CODE | VALUE |
+------+-------+
|  100 |     0 |
|  103 |     0 |
|  101 |     0 |
|  102 |     0 |
+------+-------+

How this works: the table data is replicated as normal, and the real values are stored in the slave. The “slave_filter” process has registered for insert operations on this table, and when it’s triggered it sets the VALUE field to 0. The event is processed asynchronously from the replication, so there will be a very narrow window during which the true values are stored in the slave.





Leave a Reply