Are Stored Procedures available with MySQL Cluster?

The answer is yes – kind of.

Stored procedures are implemented in a MySQL Server and can be used regardless of the storage engine being used for a specific table. One inference from this is that they won’t work when accessing the Cluster database directly through the NDB API.

This leads to the question of whether or not that limitation actually restricts what you can achieve. This article gives a brief introduction to stored procedures and looks at how the same results can be achieved using the NDB API.

Stored procedures provide a rudimentary way of implementing functionality within the database (rather than in the application code). They are implemented by the database designer and have the ability to perform computations as well as make changes to the data in the database. A typical use of stored procedures would be to control all access to the data by a user or application – for example, to impose extra checks on the data or make sure that all linked data is updated rather than leaving it to the user or application designer to always remember to do it. To impose this, the DBA could grant permission to users to call the stored procedures but not write to the tables directly.

This functionality can be very useful when the data is being accessed through the SQL interface. If using the NDB API then you have the full power of the C++ language at your disposal and so a designer can code whatever checks and side effects are needed within a wrapper method and then have applications use those methods rather than accessing the raw NDB API directly for those changes.

There is one piece of functionality available using stored procedures which could be very helpful to applications using the NDB API – triggers. The rest of this article explains what triggers are; how they’re used and how that same results can be achieved using the NDB API.

Triggers

Triggers allow stored code to be invoked as a side effect of SQL commands being executed on the database through a MySQL Server. The database designer can implement a stored procedure and then register it to be invoked when specific actions (INSERT, DELETE etc.) are performed on a table.

The following example shows how a simple stored procedure can be implemented and then registered against a table.

mysql> USE test;
Database changed
mysql> create table ASSETS (NAME varchar(30) not null primary key,VALUE int) engine=ndb;
Query OK, 0 rows affected (0.67 sec)
mysql> create table AUDIT_LOG (NOTE varchar(30) not NULL primary key) engine=ndb;
Query OK, 0 rows affected (0.56 sec)
mysql> delimiter //
mysql> create procedure log_it (log_string varchar(30))
    -> begin
    -> insert into AUDIT_LOG values(log_string);
    -> end
    -> //
Query OK, 0 rows affected (0.00 sec)
mysql> delimiter ;
mysql> create trigger ins_asset before insert on ASSETS
    -> for each row call log_it(new.name);
Query OK, 0 rows affected (0.00 sec

The stored procedure in this example is triggered whenever a new tuple is inserted into the ASSETS table. The procedure then inserts the asset’s name into the AUDIT_LOG table. If the tuple is deleted from the ASSETS table then the entry in the AUDIT_LOG table remains intact.

The following screen capture shows the results when adding a tuple to the table that contains the trigger.

mysql> insert into ASSETS values ('Computer',350);
Query OK, 1 row affected (0.01 sec)
mysql> select * from AUDIT_LOG;
+----------+
| NOTE     |
+----------+
| Computer |
+----------+
1 row in set (0.00 sec)

Note that as the trigger and stored procedure are implemented in the MySQL Server, they need to be separately defined in all of the MySQL Server instances where they are needed.

The following NDB API code adds a new tuple to the ASSETS table in much the same way as was done through SQL above (Note: my C++ is very rusty and so there will be glitches in this code – especially for string handling).

#include <NdbApi.hpp>
#include <stdio.h>
#include <string.h>
#include <iostream>
#include <cstdlib>

static void run_application(Ndb_cluster_connection &, char*);

#define PRINT_ERROR(code,msg) 
std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ 
          << ", code: " << code 
          << ", msg: " << msg << "." << std::endl

#define APIERROR(error) { 
  PRINT_ERROR(error.code,error.message); 
  exit(-1); }

int main(int argc, char** argv)
{
  if (argc != 3)
  {
    std::cout << "Arguments are <connect_string cluster><asset_name>.n";
    exit(-1);
  }
  ndb_init();

  // connect to cluster and run application
  {
    const char *connectstring = argv[1];
    char *asset_name = argv[2];
    // Object representing the cluster
    Ndb_cluster_connection cluster_connection(connectstring);

    // Connect to cluster management server (ndb_mgmd)
    if (cluster_connection.connect(4 /* retries               */,
                                   5 /* delay between retries */,
                                   1 /* verbose               */))
    {
      std::cout << "Cluster management server was not ready within 30 secs.n";
      exit(-1);
    }

    // Connect and wait for the storage nodes (ndbd's)
    if (cluster_connection.wait_until_ready(30,0) < 0)
    {
      std::cout << "Cluster was not ready within 30 secs.n";
      exit(-1);
    }
    // run the application code
    run_application(cluster_connection, asset_name);
  }
  ndb_end(0);
  return 0;
}

static void do_insert(Ndb &, char*);

static void run_application(Ndb_cluster_connection &cluster_connection,
char *asset_name)
{
  /********************************************
  * Connect to database via NdbApi           *
  ********************************************/
  // Object representing the database
  Ndb myNdb( &cluster_connection, "test" );
  if (myNdb.init()) APIERROR(myNdb.getNdbError());
  do_insert(myNdb, asset_name);
}

static void do_insert(Ndb &myNdb, char *asset_name)
{
  const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("ASSETS");
  char str[20];
  str[0] = strlen(asset_name);
  strcpy(str +1, asset_name);

  if (myTable == NULL) APIERROR(myDict->getNdbError());
  NdbTransaction *myTransaction= myNdb.startTransaction();
  if (myTransaction == NULL) APIERROR(myNdb.getNdbError());
  NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
  if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
  myOperation->insertTuple();
  myOperation->setValue("NAME", str);
  myOperation->setValue("VALUE", 555);
  if (myTransaction->execute( NdbTransaction::Commit ) == -1)
    APIERROR(myTransaction->getNdbError());
  myNdb.closeTransaction(myTransaction);
}

This code can then be executed and then the effects verified using SQL commands through the MySQL Server – note that the stored procedure has not been triggered and so the name has not been copied into the AUDIT_LOG table.

[billy@ws1 stored]$ ./test_stored_procedures localhost:1186 Monitor

mysql> select * from ASSETS;
+----------+-------+
| NAME     | VALUE |
+----------+-------+
| Monitor  |   555 |
| Computer |   350 |
+----------+-------+
2 rows in set (0.01 sec)

mysql> select * from AUDIT_LOG;
+----------+
| NOTE     |
+----------+
| Computer |
+----------+
1 row in set (0.00 sec)

It could easily be argued that triggers are not required when using the NDB API – simply code a wrapper method that also applies the required side effects. However, it is possible to come up with scenarios where triggers would be much more convenient – for example if the application is already littered with accesses to the data and you want to retrofit the side effect.

Fortunately, the NDB API includes the ability to register triggers against operations for a specific table. The code that follows implements a process that waits for an INSERT to be performed on the ASSETS table and then creates an entry in the AUDIT_LOG table just as the earlier stored procedure did.

#include <NdbApi.hpp>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <cstdlib>
#include <string.h>

#define APIERROR(error) 
{ std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" 
            << error.code << ", msg: " << error.message << "." << std::endl; 
  exit(-1); }

int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnName,
                  const int noEventColumnName);

static void do_insert(Ndb*, char*);

int main(int argc, char** argv)
{
  if (argc < 2)
  {
    std::cout << "Arguments are <connect_string cluster> <timeout>].n";
    exit(-1);
  }
  const char *connectstring = argv[1];
  int timeout = atoi(argv[2]);
  ndb_init();
  Ndb_cluster_connection *cluster_connection=
  new Ndb_cluster_connection(connectstring);

  int r= cluster_connection->connect(5 /* retries               */,
                                     3 /* delay between retries */,
                                     1 /* verbose               */);
  if (r > 0)
  {
    std::cout
       << "Cluster connect failed, possibly resolved with more retries.n";
    exit(-1);
  }
  else if (r < 0)
  {
    std::cout
       << "Cluster connect failed.n";
    exit(-1);
  }
  if (cluster_connection->wait_until_ready(30,30))
  {
    std::cout << "Cluster was not ready within 30 secs." << std::endl;
    exit(-1);
  }
  Ndb* myNdb= new Ndb(cluster_connection,
                      "test");  // Object representing the database
  if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
  const char *eventName= "CHNG_IN_ASSETS";
  const char *eventTableName= "ASSETS";
  const int noEventColumnName= 2;
  const char *eventColumnName[noEventColumnName]=
    {"NAME",
     "VALUE"};

  // Create events
  myCreateEvent(myNdb,
  eventName,
  eventTableName,
  eventColumnName,
  noEventColumnName);

  // Normal values and blobs are unfortunately handled differently..
  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;

  int i, j;
  j = 0;
  while (j < timeout) {
    // Start "transaction" for handling events
    NdbEventOperation* op;
  if ((op = myNdb->createEventOperation(eventName)) == NULL)
    APIERROR(myNdb->getNdbError());
  RA_BH recAttr[noEventColumnName];
  RA_BH recAttrPre[noEventColumnName];
  for (i = 0; i < noEventColumnName; i++) {
    recAttr[i].ra    = op->getValue(eventColumnName[i]);
    recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
  }
  if (op->execute())
    APIERROR(op->getNdbError());
  NdbEventOperation* the_op = op;
  i= 0;
  while (i < timeout) {
    int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
    if (r > 0) {
    while ((op= myNdb->nextEvent())) {
      i++;
      NdbRecAttr* ra = recAttr[0].ra;
      if (ra->isNULL() >= 0) { // we have a value
        if (ra->isNULL() == 0) { // we have a non-null value
          printf("NAME: %s ", ra->aRef());
          do_insert(myNdb, ra->aRef());
        } else
          printf("%-5s", "NULL");
        } else
        printf("%-5s", "-"); // no value
        ra = recAttr[1].ra;
        printf("n");
      }
    }
  }
  if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
  the_op = 0;
  j++;
  }
  {
    NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
    if (!myDict) APIERROR(myNdb->getNdbError());
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
  }
  delete myNdb;
  delete cluster_connection;
  ndb_end(0);
  return 0;
}

int myCreateEvent(Ndb* myNdb,
const char *eventName,
const char *eventTableName,
const char **eventColumnNames,
const int noEventColumnNames)
{
  NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
  if (!myDict) APIERROR(myNdb->getNdbError());
  const NdbDictionary::Table *table= myDict->getTable(eventTableName);
  if (!table) APIERROR(myDict->getNdbError());
  NdbDictionary::Event myEvent(eventName, *table);
  myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
  myEvent.addEventColumns(noEventColumnNames, eventColumnNames);

  // Add event to database
  if (myDict->createEvent(myEvent) == 0)
    myEvent.print();
  else if (myDict->getNdbError().classification ==
    NdbError::SchemaObjectExists) {
    printf("Event creation failed, event existsn");
    printf("dropping Event...n");
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
    // try again
    // Add event to database
    if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
  } else
    APIERROR(myDict->getNdbError());
    return 0;
}
static void do_insert(Ndb* myNdb, char *asset_name)
{
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("AUDIT_LOG");
  char str[30];
  str[0] = strlen(asset_name);
  strcpy(str +1, asset_name);
  printf("Storing %i characters: %sn", strlen(asset_name), asset_name);
  if (myTable == NULL) APIERROR(myDict->getNdbError());
  NdbTransaction *myTransaction= myNdb->startTransaction();
  if (myTransaction == NULL) APIERROR(myNdb->getNdbError());
  myOperation->insertTuple();
  myOperation->setValue("NOTE", str);
  if (myTransaction->execute( NdbTransaction::Commit ) == -1)
    APIERROR(myTransaction->getNdbError());
  myNdb->closeTransaction(myTransaction);
 }

We can then use the code to make the addition through the NDB API. We use one terminal to run the listener and then another to run the code to add the tuple.

[billy@ws1 stored]$ ./trigger_listener localhost:1186 100
[billy@ws1 stored]$ ./test_stored_procedures localhost:1186 Keyboard

mysql> select * from ASSETS;
+----------+-------+
| NAME     | VALUE |
+----------+-------+
| Keyboard |   555 |
| Computer |   350 |
| Monitor  |   555 |
+----------+-------+
3 rows in set (0.00 sec)

mysql> select * from AUDIT_LOG;
+-----------+
| NOTE      |
+-----------+
| Computer  |
| Keyboard  |
+-----------+
2 rows in set (0.00 sec)

A major advantage of this approach is that the trigger is implemented within the Cluster database and so is invoked regardless of where the INSERT is requested – whether it be through the NDB API or through any of the MySQL Servers. This is shown in the results that follow.

mysql> drop trigger ins_asset;
Query OK, 0 rows affected (0.00 sec)

mysql> drop procedure log_it;
Query OK, 0 rows affected (0.00 sec)
mysql> insert into ASSETS values("Printers", 200);
Query OK, 1 row affected (0.00 sec)

mysql> select * from ASSETS;
+----------+-------+
| NAME     | VALUE |
+----------+-------+
| Keyboard |   555 |
| Computer |   350 |
| Monitor  |   555 |
| Printers |   200 |
+----------+-------+
4 rows in set (0.00 sec)

mysql> select * from AUDIT_LOG;
+-----------+
| NOTE      |
+-----------+
| Printers  |
| Keyboard  |
| Computer  |
+-----------+
4 rows in set (0.00 sec)

Note that I first removed the original trigger and stored procedure that were defined in the MySQL Server.

There is another key difference between MySQL triggers and NDB events – triggers are executed as part of the MySQL transaction making the main database change whereas NDB events happen asynchronously. The effect of this is:

  • The original transaction will commit succesfully before the side effects have been processed
  • If the process waiting for the event disappears then the side effect will not be processed – for this reson, you may want to consider an audit/clean-up scripts  to cover these cases.

Conclusion

Stored procedures are fully supported for users or applications which access a Cluster database through a MySQL Server (whether directly using SQL or through any of the numerous connectors that are available). Applications which access the database through the NDB API have the full flexibility of C++ to implement functionality that can achieve the same results. Triggers are available whichever method is used to access the database – albeit with different implementations and slightly different functionality.





Leave a Reply

Your email address will not be published. Required fields are marked *