Tag Archive for NDB API

FOSDEM 2015 – SQL & NoSQL Presentation

Last weekend I got to present to the MySQL Developers Room at FOSDEM in Brussels.
FOSDEM-2015
The subject of my presentation was “NoSQL and SQL: the best of both worlds”.

There’s a lot of excitement around NoSQL Data Stores with the promise of simple access patterns, flexible schemas, scalability and High Availability. The downside comes in the form of losing ACID transactions, consistency, flexible queries and data integrity checks. What if you could have the best of both worlds? This session shows how MySQL Cluster provides simultaneous SQL and native NoSQL access to your data – whether a simple key-value API (Memcached), REST, JavaScript, Java or C++. You will hear how the MySQL Cluster architecture delivers in-memory real-time performance, 99.999% availability, on-line maintenance and linear, horizontal scalability through transparent auto-sharding.





Scalable, persistent, HA NoSQL Memcache storage using MySQL Cluster

Memcached API with Cluster Data Nodes

The native Memcached API for MySQL Cluster is now GA as part of MySQL Cluster 7.2

This post was first published in April 2011 when the first trial version of the Memcached API for MySQL Cluster was released; it was then up-versioned for the second MySQL Cluster 7.2 Development Milestone Release in October 2011. I’ve now refreshed the post based on the GA of MySQL Cluster 7.2 which includes the completed Memcache API.

There are a number of attributes of MySQL Cluster that make it ideal for lots of applications that are considering NoSQL data stores. Scaling out capacity and performance on commodity hardware, in-memory real-time performance (especially for simple access patterns), flexible schemas… sound familiar? In addition, MySQL Cluster adds transactional consistency and durability. In case that’s not enough, you can also simultaneously combine various NoSQL APIs with full-featured SQL – all working on the same data set. This post focuses on a new Memcached API that is now available to download, try out and deploy. This post steps through setting up Cluster with the Memcached API and then demonstrates how to read and write the same data through both Memcached and SQL (including for existing MySQL Cluster tables).

Download the community version from mysql.com or the commercial version from Oracle’s Software Delivery Cloud (note that there is not currently a Windows version).

Traditional use of Memcached

First of all, a bit of background about Memcached. It has typically been used as a cache when the performance of the database of record (the persistent database) cannot keep up with application demand. When changing data, the application pushes the change to the database; when reading data, it first checks the Memcached cache and, if the data is not there, reads it from the database and copies it into Memcached. If a Memcached instance fails or is restarted for maintenance reasons, the contents are lost and the application will need to start reading from the database again. Of course the database layer probably needs to be scaled as well, so you send writes to the master and reads to the replication slaves.

This has become a classic architecture for web and other applications and the simple Memcached attribute-value API has become extremely popular amongst developers.
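
The pattern is simple enough to sketch in code. In the toy example below, Cache and Database are stand-ins built on std::map (a real deployment would use a Memcached client library and a SQL connector), but the read and write paths follow the classic cache-aside flow described above:

#include <map>
#include <optional>
#include <string>

// Toy stand-ins for a Memcached instance and the database of record.
struct Cache    { std::map<std::string, std::string> kv; };
struct Database { std::map<std::string, std::string> rows; };

// Read path: try the cache first; on a miss, read the database of record
// and populate the cache so that the next read is served from memory.
std::optional<std::string> read(Cache& cache, Database& db, const std::string& key)
{
  auto hit = cache.kv.find(key);
  if (hit != cache.kv.end()) return hit->second;   // cache hit

  auto row = db.rows.find(key);
  if (row == db.rows.end()) return std::nullopt;   // not in the database either

  cache.kv[key] = row->second;                     // copy into the cache
  return row->second;
}

// Write path: push the change to the database and refresh the cached copy
// (some applications invalidate the cache entry instead of refreshing it).
void write(Cache& cache, Database& db, const std::string& key, const std::string& value)
{
  db.rows[key] = value;
  cache.kv[key] = value;
}

If the Memcached instance is restarted, the Cache map starts empty but every read simply falls back to the Database map, which is exactly the cold-cache behaviour described above.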

As an illustration of the simplicity of this API, the following example stores and then retrieves the string “Maidenhead” against the key “Test”:

telnet localhost 11211
set Test 0 0 10
Maidenhead
STORED
get Test
VALUE Test 0 10
Maidenhead
END

Note that if we kill and restart the memcached server, the data is lost (as it was only held in RAM):

get Test
END

New options for using Memcached API with MySQL Cluster

Architecture for Memcached NDB API

What we’re doing with MySQL Cluster is to offer a bunch of new ways of using this API but with the benefits of MySQL Cluster. The solution has been designed to be very flexible, allowing the application architect to find a configuration that best fits their needs.

A quick diversion on how this is implemented. The application sends reads and writes to the memcached process (using the standard Memcached API). This in turn invokes the Memcached Driver for NDB (which is part of the same process) which in turn calls the NDB API for very quick access to the data held in MySQL Cluster’s data nodes (it’s the fastest way of accessing MySQL Cluster).
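
To give a feel for what the driver is doing under the covers, here is a minimal sketch of a single primary-key read performed directly through the NDB API; error handling is trimmed and the table is a hypothetical codes (INT primary key code, INT column value) in the clusterdb database. The fuller pattern, with error handling, appears in the NDB API posts later in this archive.

#include <NdbApi.hpp>
#include <cstdio>
#include <cstdlib>

int main(int argc, char** argv)
{
  if (argc < 2) { printf("usage: %s <connectstring>\n", argv[0]); exit(1); }

  ndb_init();
  Ndb_cluster_connection conn(argv[1]);
  if (conn.connect(5, 3, 1) != 0 || conn.wait_until_ready(30, 30) != 0) exit(1);

  Ndb ndb(&conn, "clusterdb");                 // database to use
  if (ndb.init() == -1) exit(1);

  const NdbDictionary::Table *tab = ndb.getDictionary()->getTable("codes");
  if (tab == NULL) exit(1);

  // A single primary-key read: no SQL parsing or optimisation in the path
  NdbTransaction *trans = ndb.startTransaction();
  NdbOperation *op = trans->getNdbOperation(tab);
  op->readTuple(NdbOperation::LM_Read);
  op->equal("code", 101);                      // primary-key lookup
  NdbRecAttr *val = op->getValue("value", NULL);

  if (trans->execute(NdbTransaction::Commit) == 0)
    printf("value = %d\n", val->int32_value());

  ndb.closeTransaction(trans);
  ndb_end(0);
  return 0;
}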

Because the data is now stored in MySQL Cluster, it is persistent and you can transparently scale out by adding more data nodes (this is an on-line operation).

Another important point is that the NDB API is already a commonly used, fully functional access method that the Memcached API can exploit. For example, if you make a change to a piece of data then the change will automatically be written to any MySQL Server that has its binary logging enabled which in turn means that the change can be replicated to a second site.

Memcached API with Cluster Data Nodes

So the first (and probably simplest) architecture is to co-locate the Memcached API with the data nodes.

The applications can connect to any of the memcached API nodes – if one should fail just switch to another as it can access the exact same data instantly. As you add more data nodes you also add more memcached servers and so the data access/storage layer can scale out (until you hit the 48 data node limit).

Memcached server with the Application

Another simple option is to co-locate the Memcached API with the application. In this way, as you add more application nodes you also get more Memcached throughput. If you need more data storage capacity you can independently scale MySQL Cluster by adding more data nodes. One nice feature of this approach is that failures are handled very simply – if one App/Memcached machine should fail, all of the other applications just continue accessing their local Memcached API.

Separate Memcached layer

For maximum flexibility, you can have a separate Memcached layer so that the application, the Memcached API & MySQL Cluster can all be scaled independently.

In all of the examples so far, there has been a single source for the data (it’s all held in MySQL Cluster).






Local Cache in Memcached

If you choose, you can still have all or some of the data cached within the memcached server (and specify whether that data should also be persisted in MySQL Cluster) – you choose how to treat different pieces of your data. For example, if some data is written to and read from frequently, store it just in MySQL Cluster; if some data is written rarely but read very often, you might choose to cache it in Memcached as well; and if some data has a short lifetime and wouldn’t benefit from being stored in MySQL Cluster, hold it only in Memcached. The beauty is that you get to configure this on a per-key-prefix basis (through tables in MySQL Cluster) and that the application doesn’t have to care – it just uses the Memcached API and relies on the software to store data in the right place(s) and to keep everything in sync.

Of course if you want to access the same data through SQL then you’d make sure that it was configured to be stored in MySQL Cluster.

Enough of the theory, how to try it out…

Installing & configuring the software

As this post is focused on API access to the data rather than on testing High Availability, performance or scalability, the Cluster can be kept extremely simple, with all of the processes (nodes) running on a single server. The only thing to be careful of when you create your Cluster is to make sure that you define at least 5 API sections (e.g. [mysqld]) in your configuration file so that you can connect through SQL and 2 Memcached servers (each of which uses 2 connections) at the same time.

For further information on how to set up a single-host Cluster, refer to this post or just follow the next few steps.

Create a config.ini file for the Cluster configuration:

[ndb_mgmd]
hostname=localhost
datadir=/home/billy/my_cluster/ndb_data
NodeId=1

[ndbd default]
noofreplicas=2
datadir=/home/billy/my_cluster/ndb_data

[ndbd]
hostname=localhost
NodeId=3

[ndbd]
hostname=localhost
NodeId=4

[mysqld]
NodeId=50

[mysqld]
NodeId=51

[mysqld]
NodeId=52

[mysqld]
NodeId=53

[mysqld]
NodeId=54

and a my.cnf file for the MySQL server:

[mysqld]
ndbcluster
datadir=/home/billy/my_cluster/mysqld_data

Before starting the Cluster, install the standard databases for the MySQL Server (from wherever you have MySQL Cluster installed – typically /usr/local/mysql):

[billy@ws2 mysql]$ ./scripts/mysql_install_db \
  --basedir=/usr/local/mysql \
  --datadir=/home/billy/my_cluster/mysqld_data \
  --user=billy

Start up the system

We are now ready to start up the Cluster processes:

[billy@ws2 my_cluster]$ ndb_mgmd -f conf/config.ini \
  --initial --configdir=/home/billy/my_cluster/conf/
[billy@ws2 my_cluster]$ ndbd
[billy@ws2 my_cluster]$ ndbd
[billy@ws2 my_cluster]$ ndb_mgm -e show # Wait for data nodes to start
[billy@ws2 my_cluster]$ mysqld --defaults-file=conf/my.cnf &

If your version doesn’t already have the ndbmemcache database installed then that should be your next step:

[billy@ws2 ~]$ mysql -h 127.0.0.1 -P3306 -u root < /usr/local/mysql/share/memcache-api/ndb_memcache_metadata.sql

After that, start the Memcached server (with the NDB driver activated):

[billy@ws2 ~]$  memcached -E /usr/local/mysql/lib/ndb_engine.so -e "connectstring=localhost:1186;role=db-only" -vv -c 20

Notice the “connectstring” – this allows the primary Cluster to be on a different machine to the Memcached API. Note that you can actually use the same Memcached server to access multiple Clusters – you configure this within the ndbmemcache database in the primary Cluster. In a production system you may want to include reconf=false amongst the -e parameters in order to stop configuration changes being applied to running Memcached servers (you’d need to restart those servers instead).

Try it out!

Next the fun bit – we can start testing it out:

[billy@ws2 ~]$ telnet localhost 11211

set maidenhead 0 0 3 
SL6 
STORED 
get maidenhead 
VALUE maidenhead 0 3 
SL6 
END

We can now check that the data really is stored in the database:

mysql> SELECT * FROM ndbmemcache.demo_table;
   +------------------+------------+-----------------+--------------+
   | mkey             | math_value | cas_value       | string_value |
   +------------------+------------+-----------------+--------------+
   | maidenhead       |       NULL | 263827761397761 | SL6          |
   +------------------+------------+-----------------+--------------+

Of course, you can also modify this data through SQL and immediately see the change through the Memcached API:

mysql> UPDATE ndbmemcache.demo_table SET string_value='SL6 4' WHERE mkey='maidenhead';

[billy@ws2 ~]$ telnet localhost 11211

get maidenhead
VALUE maidenhead 0 5 
SL6 4 
END

By default, the normal limit of 14K per row still applies when using the Memcached API; however, the standard configuration treats any key-value pair with a key-prefix of “b:” differently and will allow the value to be up to 3 MB (note that the default limit imposed by the Memcached server is 1 MB and so you’d also need to raise that). Internally the contents of this value will be split between 1 row in ndbmemcache.demo_table_large and one or more rows in ndbmemcache.external_values.

Note that this is completely schema-less: the application can keep on adding new key/value pairs and they will all get added to the default table. This may well be fine for prototyping or modest-sized databases. As you can see, this data can be accessed through SQL, but there’s a good chance that you’ll want a richer schema on the SQL side or you’ll need to have the data in multiple tables for other reasons (for example, you want to replicate just some of the data to a second Cluster for geographic redundancy or to InnoDB for report generation).

The next step is to create your own databases and tables (assuming that you don’t already have them) and then create the definitions for how the app can get at the data through the Memcached API. First let’s create a table that has a couple of columns that we’ll also want to make accessible through the Memcached API:

mysql> CREATE DATABASE clusterdb; USE clusterdb;
mysql> CREATE TABLE towns_tab (town VARCHAR(30) NOT NULL PRIMARY KEY,
  zip VARCHAR(10), population INT, county VARCHAR(10)) ENGINE=NDB;
mysql> REPLACE INTO towns_tab VALUES ('Marlow', 'SL7', 14004, 'Berkshire');

Next we need to tell the NDB driver how to access this data through the Memcached API. Two ‘containers’ are created that identify the columns within our new table that will be exposed. We then define the key-prefixes that users of the Memcached API will use to indicate which piece of data (i.e. database/table/column) they are accessing:

mysql> USE ndbmemcache;
mysql> REPLACE INTO containers VALUES ('towns_cnt', 'clusterdb',
'towns_tab', 'town', 'zip', 0, NULL, NULL, NULL, NULL);
mysql> REPLACE INTO containers VALUES ('pop_cnt', 'clusterdb',
  'towns_tab', 'town', 'population', 0, NULL, NULL, NULL, NULL);
mysql> SELECT * FROM containers;
   +------------+-------------+------------------+-------------+----------------+-------+------------------+------------+--------------------+-----------------------------+
   | name       | db_schema   | db_table         | key_columns | value_columns  | flags | increment_column | cas_column | expire_time_column | large_values_table          |
   +------------+-------------+------------------+-------------+----------------+-------+------------------+------------+--------------------+-----------------------------+
   | demo_ext   | ndbmemcache | demo_table_large | mkey        | string_value   | 0     | NULL             | cas_value  | NULL               | ndbmemcache.external_values |
   | towns_cnt  | clusterdb   | towns_tab        | town        | zip            | 0     | NULL             | NULL       | NULL               | NULL                        |
   | demo_table | ndbmemcache | demo_table       | mkey        | string_value   | 0     | math_value       | cas_value  | NULL               | NULL                        |
   | pop_cnt    | clusterdb   | towns_tab        | town        | population     | 0     | NULL             | NULL       | NULL               | NULL                        |
   | demo_tabs  | ndbmemcache | demo_table_tabs  | mkey        | val1,val2,val3 | flags | NULL             | NULL       | expire_time        | NULL                        |
   +------------+-------------+------------------+-------------+----------------+-------+------------------+------------+--------------------+-----------------------------+
mysql> REPLACE INTO key_prefixes VALUES (1, 'twn_pr:', 0,
  'ndb-only', 'towns_cnt');
mysql> REPLACE INTO key_prefixes VALUES (1, 'pop_pr:', 0,
  'ndb-only', 'pop_cnt');
mysql> SELECT * FROM key_prefixes;
   +----------------+------------+------------+---------------+------------+
   | server_role_id | key_prefix | cluster_id | policy        | container  |
   +----------------+------------+------------+---------------+------------+
   |              1 | pop_pr:    |          0 | ndb-only      | pop_cnt    |
   |              0 | t:         |          0 | ndb-test      | demo_tabs  |
   |              3 |            |          0 | caching       | demo_table |
   |              0 |            |          0 | ndb-test      | demo_table |
   |              0 | mc:        |          0 | memcache-only | NULL       |
   |              1 | b:         |          0 | ndb-only      | demo_ext   |
   |              2 |            |          0 | memcache-only | NULL       |
   |              1 |            |          0 | ndb-only      | demo_table |
   |              0 | b:         |          0 | ndb-test      | demo_ext   |
   |              3 | t:         |          0 | caching       | demo_tabs  |
   |              1 | t:         |          0 | ndb-only      | demo_tabs  |
   |              4 |            |          0 | ndb-test      | demo_ext   |
   |              1 | twn_pr:    |          0 | ndb-only      | towns_cnt  |
   |              3 | b:         |          0 | caching       | demo_ext   |
   +----------------+------------+------------+---------------+------------+

At present it is necessary to restart the Memcached server in order to pick up the new key_prefix (and so you’d want to run multiple instances in order to maintain service):


[billy@ws2:~]$ memcached -E /usr/local/mysql/lib/ndb_engine.so -e "connectstring=localhost:1186;role=db-only" -vv -c 20
   07-Feb-2012 11:22:29 GMT NDB Memcache 5.5.19-ndb-7.2.4 started [NDB 7.2.4; MySQL 5.5.19]
   Contacting primary management server (localhost:1186) ...
   Connected to "localhost:1186" as node id 51.
   Retrieved 5 key prefixes for server role "db-only".
   The default behavior is that:
       GET uses NDB only
       SET uses NDB only
       DELETE uses NDB only.
   The 4 explicitly defined key prefixes are "b:" (demo_table_large), "pop_pr:" (towns_tab), 
      "t:" (demo_table_tabs) and "twn_pr:" (towns_tab)

Now these columns (and the data already added through SQL) are accessible through the Memcached API:

[billy@ws2 ~]$ telnet localhost 11211

get twn_pr:Marlow
VALUE twn_pr:Marlow 0 3 
SL7 
END 
set twn_pr:Maidenhead 0 0 3 
SL6 
STORED 
set pop_pr:Maidenhead 0 0 5 
42827 
STORED

and then we can check these changes through SQL:

mysql> SELECT * FROM clusterdb.towns_tab;
   +------------+------+------------+-----------+
   | town       | zip  | population | county    |
   +------------+------+------------+-----------+
   | Maidenhead | SL6  |      42827 | NULL      |
   | Marlow     | SL7  |      14004 | Berkshire |
   +------------+------+------------+-----------+

One final test is to start a second memcached server that will access the same data. As everything is running on the same host, we need to have the second server listen on a different port:

[billy@ws2 ~]$ memcached -E /usr/local/mysql/lib/ndb_engine.so \
   -e "connectstring=localhost:1186;role=db-only" -vv -c 20 \
   -p 11212 -U 11212
[billy@ws2 ~]$ telnet localhost 11212

get twn_pr:Marlow 
VALUE twn_pr:Marlow 0 3
SL7 
END

Memcached alongside NoSQL & SQL APIs

As mentioned before, there’s a wide range of ways of accessing the data in MySQL Cluster – both SQL and NoSQL. You’re free to mix and match these technologies – for example, a mission-critical business application using SQL, a high-traffic web app using the Memcached API and a real-time application using the NDB API. And the best part is that they can all share the exact same data and they all benefit from the same HA infrastructure (for example, synchronous replication and automatic failover within the Cluster, and geographic replication to other clusters).

Finally, a reminder – please try this out and let us know what you think (or if you don’t have time to try it then let us know what you think anyway) by adding a comment to this post.





Using ClusterJ (part of MySQL Cluster Connector for Java) – a tutorial

Fig. 1 Java access to MySQL Cluster

ClusterJ is part of the MySQL Cluster Connector for Java which is currently in beta as part of MySQL Cluster 7.1. It is designed to provide a high performance method for Java applications to store and access data in a MySQL Cluster database. It is also designed to be easy for Java developers to use and is “in the style of” Hibernate/Java Data Objects (JDO) and JPA. It uses the Domain Object Model DataMapper pattern:

  • Data is represented as domain objects
  • Domain objects are separate from business logic
  • Domain objects are mapped to database tables

The purpose of ClusterJ is to provide a mapping from the table-oriented view of the data stored in MySQL Cluster to the Java objects used by the application. This is achieved by annotating interfaces representing the Java objects; where each persistent interface is mapped to a table and each property in that interface to a column. By default, the table name will match the interface name and the column names match the property names but this can be overridden using annotations.

Fig. 2 ClusterJ Interface Annotations

If the table does not already exist (for example, this is a brand new application with new data) then the table must be created manually – unlike OpenJPA, ClusterJ will not create the table automatically.

Figure 2 shows an example of an interface that has been created in order to represent the data held in the ‘employee’ table.

ClusterJ uses the following concepts:

Fig. 3 ClusterJ Terminology

  • SessionFactory: There is one instance per MySQL Cluster instance for each Java Virtual Machine (JVM). The SessionFactory object is used by the application to get hold of sessions. The configuration details for the ClusterJ instance are defined in the configuration properties, an artifact associated with the SessionFactory.
  • Session: There is one instance per user (per Cluster, per JVM) and it represents a Cluster connection.
  • Domain Object: Objects representing the data from a table. The domain objects (and their relationships to the Cluster tables) are defined by annotated interfaces (as shown in the right-hand side of Figure 2).
  • Transaction: There is one transaction per session at any point in time. By default, each operation (query, insert, update, or delete) is run under a new transaction. The Transaction interface allows developers to aggregate multiple operations into a single, atomic unit of work.

ClusterJ will be suitable for many Java developers but it has some restrictions which may make OpenJPA with the ClusterJPA plug-in more appropriate. These ClusterJ restrictions are:

  • Persistent Interfaces rather than persistent classes. The developer provides the signatures for the getter/setter methods rather than the properties and no extra methods can be added.
  • No Relationships between properties or between objects can be defined in the domain objects. Properties are primitive types.
  • No Multi-table inheritance; there is a single table per persistent interface
  • No joins in queries (all data being queried must be in the same table/interface)
  • No Table creation – user needs to create tables and indexes
  • No Lazy Loading – the entire record is loaded at one time, including large objects (LOBs).

Tutorial

This tutorial uses MySQL Cluster 7.1.2a on Fedora 12. If using earlier or more recent versions of MySQL Cluster then you may need to change the class-paths as explained in http://dev.mysql.com/doc/ndbapi/en/mccj-using-clusterj.html

It is necessary to have MySQL Cluster up and running. For simplicity all of the nodes (processes) making up the Cluster will be run on the same physical host, along with the application.

These are the MySQL Cluster configuration files being used:

config.ini:

[ndbd default]
noofreplicas=2
datadir=/home/billy/mysql/my_cluster/data

[ndbd]
hostname=localhost
id=3

[ndbd]
hostname=localhost
id=4

[ndb_mgmd]
id = 1
hostname=localhost
datadir=/home/billy/mysql/my_cluster/data

[mysqld]
hostname=localhost
id=101

[api]
hostname=localhost

my.cnf:

[mysqld]
ndbcluster
datadir=/home/billy/mysql/my_cluster/data
basedir=/usr/local/mysql

This tutorial focuses on ClusterJ rather than on running MySQL Cluster; if you are new to MySQL Cluster then refer to running a simple Cluster before trying this tutorial.

ClusterJ needs to be told how to connect to our MySQL Cluster database, including the connect string (the address/port for the management node), the database to use, the user to log in as and attributes for the connection such as the timeout values. If these parameters aren’t defined then ClusterJ will fail with run-time exceptions. This information represents the “configuration properties” shown in Figure 3. These parameters can be hard-coded in the application code but it is more maintainable to create a clusterj.properties file that will be imported by the application. This file should be stored in the same directory as your application source code.

clusterj.properties:

com.mysql.clusterj.connectstring=localhost:1186
com.mysql.clusterj.database=clusterdb
com.mysql.clusterj.connect.retries=4
com.mysql.clusterj.connect.delay=5
com.mysql.clusterj.connect.verbose=1
com.mysql.clusterj.connect.timeout.before=30
com.mysql.clusterj.connect.timeout.after=20
com.mysql.clusterj.max.transactions=1024

As ClusterJ will not create tables automatically, the next step is to create the ‘clusterdb’ database (referred to in clusterj.properties) and the ‘employee’ table:

[billy@ws1 ~]$ mysql -u root -h 127.0.0.1 -P 3306
mysql> CREATE DATABASE clusterdb; USE clusterdb;
mysql> CREATE TABLE employee (
    ->   id INT NOT NULL PRIMARY KEY,
    ->   first VARCHAR(64) DEFAULT NULL,
    ->   last VARCHAR(64) DEFAULT NULL,
    ->   municipality VARCHAR(64) DEFAULT NULL,
    ->   started VARCHAR(64) DEFAULT NULL,
    ->   ended VARCHAR(64) DEFAULT NULL,
    ->   department INT NOT NULL DEFAULT 1,
    ->   UNIQUE KEY idx_u_hash (first,last) USING HASH,
    ->   KEY idx_municipality (municipality)
    -> ) ENGINE=NDBCLUSTER;

The next step is to create the annotated interface:

Employee.java:

import com.mysql.clusterj.annotation.Column;
import com.mysql.clusterj.annotation.Index;
import com.mysql.clusterj.annotation.PersistenceCapable;
import com.mysql.clusterj.annotation.PrimaryKey;

@PersistenceCapable(table="employee")
@Index(name="idx_u_hash")
public interface Employee {

    @PrimaryKey
    int getId();
    void setId(int id);

    String getFirst();
    void setFirst(String first);

    String getLast();
    void setLast(String last);

    @Column(name="municipality")
    @Index(name="idx_municipality")
    String getCity();
    void setCity(String city);

    String getStarted();
    void setStarted(String date);

    String getEnded();
    void setEnded(String date);

    Integer getDepartment();
    void setDepartment(Integer department);
}

The name of the table is specified in the annotation @PersistenceCapable(table=”employee”) and then each column from the employee table has an associated getter and setter method defined in the interface. By default, the property name in the interface is the same as the column name in the table – the column name has been overridden for the City property by explicitly including the @Column(name=”municipality”) annotation just before the associated getter method. The @PrimaryKey annotation is used to identify the property whose associated column is the Primary Key in the table. ClusterJ is made aware of the existence of indexes in the database using the @Index annotation.

The next step is to write the application code which we step through here block by block; the first of which simply contains the import statements and then loads the contents of the clusterj.properties defined above:

Main.java (part 1):

import com.mysql.clusterj.ClusterJHelper;
import com.mysql.clusterj.SessionFactory;
import com.mysql.clusterj.Session;
import com.mysql.clusterj.Query;
import com.mysql.clusterj.query.QueryBuilder;
import com.mysql.clusterj.query.QueryDomainType;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.*;
import java.util.Properties;
import java.util.List;
public class Main {
public static void main (String[] args) throws java.io.FileNotFoundException,java.io.IOException {
// Load the properties from the clusterj.properties file
File propsFile = new File("clusterj.properties");
InputStream inStream = new FileInputStream(propsFile);
Properties props = new Properties();
props.load(inStream);
// Used later to get user input
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

The next step is to get a handle for a SessionFactory from the ClusterJHelper class and then use that factory to create a session (based on the properties imported from the clusterj.properties file).

Main.java (part 2):

// Create a session (connection to the database)
SessionFactory factory = ClusterJHelper.getSessionFactory(props);
Session session = factory.getSession();

Now that we have a session, it is possible to instantiate new Employee objects and then persist them to the database. Where there are no transaction begin() or commit() statements, each operation involving the database is treated as a separate transaction.

Main.java (part 3):

// Create and initialise an Employee
Employee newEmployee = session.newInstance(Employee.class);
newEmployee.setId(988);
newEmployee.setFirst("John");
newEmployee.setLast("Jones");
newEmployee.setStarted("1 February 2009");
newEmployee.setDepartment(666);
// Write the Employee to the database
session.persist(newEmployee);

At this point, a row will have been added to the ‘employee’ table. To verify this, a new Employee object is created and used to read the data back from the ‘employee’ table using the primary key (Id) value of 988:

Main.java (part 4):

// Fetch the Employee from the database
Employee theEmployee = session.find(Employee.class, 988);

if (theEmployee == null) {
    System.out.println("Could not find employee");
} else {
    System.out.println("ID: " + theEmployee.getId() + "; Name: " +
        theEmployee.getFirst() + " " + theEmployee.getLast());
    System.out.println("Location: " + theEmployee.getCity());
    System.out.println("Department: " + theEmployee.getDepartment());
    System.out.println("Started: " + theEmployee.getStarted());
    System.out.println("Left: " + theEmployee.getEnded());
}

This is the output seen at this point:

ID: 988; Name: John Jones
Location: null
Department: 666
Started: 1 February 2009
Left: null
Check the database before I change the Employee - hit return when you are done

The next step is to modify this data but it does not write it back to the database yet:

Main.java (part 5):

// Make some changes to the Employee & write back to the database
theEmployee.setDepartment(777);
theEmployee.setCity("London");
System.out.println("Check the database before I change the Employee - hit return when you are done");
String ignore = br.readLine();

The application will pause at this point and give you chance to check the database to confirm that the original data has been added as a new row but the changes have not been written back yet:

mysql> select * from clusterdb.employee;
+-----+-------+-------+--------------+-----------------+-------+------------+
| id  | first | last  | municipality | started         | ended | department |
+-----+-------+-------+--------------+-----------------+-------+------------+
| 988 | John  | Jones | NULL         | 1 February 2009 | NULL  |        666 |
+-----+-------+-------+--------------+-----------------+-------+------------+

After hitting return, the application will continue and write the changes to the table, using an automatic transaction to perform the update.

Main.java (part 6):

session.updatePersistent(theEmployee);
System.out.println("Check the change in the table before I bulk add Employees - hit return when you are done");
ignore = br.readLine();

The application will again pause so that we can now check that the change has been written back (persisted) to the database:

mysql> select * from clusterdb.employee;
+-----+-------+-------+--------------+-----------------+-------+------------+
| id  | first | last  | municipality | started         | ended | department |
+-----+-------+-------+--------------+-----------------+-------+------------+
| 988 | John  | Jones | London       | 1 February 2009 | NULL  |        777 |
+-----+-------+-------+--------------+-----------------+-------+------------+

The application then goes on to create and persist 100 new employees. To improve performance, a single transaction is used so that all of the changes can be written to the database at once when the commit() statement is run:

Main.java (part 7):

// Add 100 new Employees - all as part of a single transaction
newEmployee.setFirst("Billy");
newEmployee.setStarted("28 February 2009");

session.currentTransaction().begin();

for (int i = 700; i < 800; i++) {
    newEmployee.setLast("No-Mates" + i);
    newEmployee.setId(i + 1000);
    newEmployee.setDepartment(i);
    session.persist(newEmployee);
}

session.currentTransaction().commit();

The 100 new employees will now have been persisted to the database. The next step is to create and execute a query that will search the database for all employees in department 777, by using a QueryBuilder to build a QueryDomainType that compares the ‘department’ column with a parameter. After creating the query, the department parameter is set to 777 (the query could subsequently be reused with different department numbers). The application then runs the query and iterates through and displays each of the employees in the result set:

Main.java (part 8):

// Retrieve the set of all Employees in department 777
QueryBuilder builder = session.getQueryBuilder();
QueryDomainType<Employee> domain = builder.createQueryDefinition(Employee.class);
domain.where(domain.get("department").equal(domain.param("department")));

Query<Employee> query = session.createQuery(domain);
query.setParameter("department", 777);
List<Employee> results = query.getResultList();

for (Employee deptEmployee : results) {
    System.out.println("ID: " + deptEmployee.getId() + "; Name: " +
        deptEmployee.getFirst() + " " + deptEmployee.getLast());
    System.out.println("Location: " + deptEmployee.getCity());
    System.out.println("Department: " + deptEmployee.getDepartment());
    System.out.println("Started: " + deptEmployee.getStarted());
    System.out.println("Left: " + deptEmployee.getEnded());
}

System.out.println("Last chance to check database before emptying table - hit return when you are done");
ignore = br.readLine();

At this point, the application will display the following and prompt the user to allow it to continue:

ID: 988; Name: John Jones
Location: London
Department: 777
Started: 1 February 2009
Left: null
ID: 1777; Name: Billy No-Mates777
Location: null
Department: 777
Started: 28 February 2009
Left: null

We can compare that output with an SQL query performed on the database:

mysql> select * from employee where department=777;
 +------+-------+-------------+--------------+------------------+-------+------------+
 | id   | first | last        | municipality | started          | ended | department |
 +------+-------+-------------+--------------+------------------+-------+------------+
 |  988 | John  | Jones       | London       | 1 February 2009  | NULL  |        777 |
 | 1777 | Billy | No-Mates777 | NULL         | 28 February 2009 | NULL  |        777 |
 +------+-------+-------------+--------------+------------------+-------+------------+

Finally, after pressing return again, the application will remove all employees:

Main.java (part 9):

session.deletePersistentAll(Employee.class);
 }
}

As a final check, an SQL query confirms that all of the rows have been deleted from the ‘employee’ table.

mysql> select * from employee;
Empty set (0.00 sec)

Compiling and running the ClusterJ tutorial code

javac -classpath /usr/local/mysql/share/mysql/java/clusterj-api.jar:. Main.java Employee.java
java -classpath /usr/local/mysql/share/mysql/java/clusterj.jar:. -Djava.library.path=/usr/local/mysql/lib Main
 

Download the source code for this tutorial from here (together with the code for the up-coming ClusterJPA tutorial).





MySQL Cluster Connector for Java – replay available for part 1 of the webinar

The replay of the first of the two webinars can now be accessed from mysql.com.

Remember that the second part of the webinar will be on March 3rd (details below).

ClusterJ Architecture

MySQL have been working on a new way of accessing MySQL Cluster using Java. Designed for Java developers, the MySQL Cluster Connector for Java implements an easy-to-use and high performance native Java interface and OpenJPA plug-in that maps Java classes to tables stored in the high availability, real-time MySQL Cluster database.

There is a series of 2 webinars coming up; as always these are free to attend – you just need to register in advance:

Part 1: Tuesday, February 16, 2010: 10:00 Pacific time

  • an overview of the MySQL Cluster Connector for Java
  • what these technologies bring to Java developers
  • implementation details of the MySQL Cluster Java API and Plug-In for OpenJPA
  • configuring the connection to MySQL Cluster
  • creating the Java Domain Object Model for your tables
  • managing insert, update, and delete operations
  • querying the database
  • how to get started developing new Java applications using these interfaces

Access the replay from mysql.com.


Part 2: Wednesday, March 03, 2010: 10:00 Pacific time

  • how MySQL Cluster Connector for Java coexists with existing OpenJPA / TopLink / JDBC-based apps
  • how to evaluate the MySQL Cluster Connector for Java alternatives
  • performance comparisons with both existing Java access and with native NDB API access to MySQL Cluster
  • what the future holds for this technology

Wed, Mar 03: 08:00 Hawaii time
Wed, Mar 03: 11:00 Mountain time (America)
Wed, Mar 03: 12:00 Central time (America)
Wed, Mar 03: 13:00 Eastern time (America)
Wed, Mar 03: 18:00 UTC
Wed, Mar 03: 18:00 Western European time
Wed, Mar 03: 19:00 Central European time
Wed, Mar 03: 20:00 Eastern European time

Register for Part 2 here.

This functionality isn’t GA but it is available for you to try and we’d love to get feedback (which you can provide through the MySQL Cluster forum or by emailing cluster@lists.mysql.com).

If you want to see for yourself then take a look at the Blog entry from Bernhard Ocklin – the engineering manager responsible for this work.





New white paper: Guide to Optimizing Performance of the MySQL Cluster Database

MySQL Cluster Connection Pooling

This guide explores how to tune and optimize the MySQL Cluster database to handle diverse workload requirements. It discusses data access patterns and how to build distribution awareness into applications, before exploring schema and query optimization, tuning of parameters and how to get the best out of the latest innovations in hardware design.

The Guide concludes with recent performance benchmarks conducted with the MySQL Cluster database and an overview of how MySQL Cluster can be integrated with other MySQL storage engines, before summarizing additional resources that will enable you to optimize MySQL Cluster performance with your applications.

Download the white paper (as always, for free) from: http://www.mysql.com/why-mysql/white-papers/mysql_wp_cluster_perfomance.php





Using NDB API Events to mask/hide column data when replicating

If you have asynchronous replication where the slave database is using MySQL Cluster then you can use the NDB API events functionality to mask/overwrite data. You might do this, for example, if the replica is to be used for generating reports where some of the data is sensitive and not relevant to those reports. Unlike stored procedures, NDB API events will be triggered on the slave.

The first step is to set up replication (master->slave rather than multi-master) as described in Setting up MySQL Asynchronous Replication for High Availability.

In this example, the following table definition is used:

mysql> use clusterdb;
mysql> create table ASSETS (CODE int not null primary key, VALUE int) engine=ndb;

The following code should be compiled and then executed on a node within the slave Cluster:

#include <NdbApi.hpp>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <cstdlib>
#include <string.h>

#define APIERROR(error) \
  { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
  << error.code << ", msg: " << error.message << "." << std::endl; \
  exit(-1); }

int myCreateEvent(Ndb* myNdb,
const char *eventName,
const char *eventTableName,
const char **eventColumnName,
const int noEventColumnName);

static void do_blank(Ndb*, int);

int main(int argc, char** argv)
{
  if (argc < 2)
  {
    std::cout << "Arguments are <connect_string cluster>.\n";
    exit(-1);
  }
  const char *connectstring = argv[1];

  ndb_init();

  Ndb_cluster_connection *cluster_connection=
  new Ndb_cluster_connection(connectstring); // Object representing the cluster

  int r= cluster_connection->connect(5 /* retries               */,
  3 /* delay between retries */,
  1 /* verbose               */);
  if (r > 0)
  {
    std::cout << "Cluster connect failed, possibly resolved with more retries.n";
    exit(-1);
  }
  else if (r < 0)
  {
    std::cout << "Cluster connect failed.n";
    exit(-1);
  }

  if (cluster_connection->wait_until_ready(30,30))
  {
    std::cout << "Cluster was not ready within 30 secs." << std::endl;
    exit(-1);
  }

  Ndb* myNdb= new Ndb(cluster_connection,
                      "clusterdb");  // Object representing the database

  if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());

  const char *eventName= "CHNG_IN_ASSETS";
  const char *eventTableName= "ASSETS";
  const int noEventColumnName= 2;
  const char *eventColumnName[noEventColumnName]=
  {"CODE",
   "VALUE"};

  // Create events
  myCreateEvent(myNdb,
  eventName,
  eventTableName,
  eventColumnName,
  noEventColumnName);

  // Normal values and blobs are unfortunately handled differently..
  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;

  int i;

  // Start "transaction" for handling events
  NdbEventOperation* op;
  printf("create EventOperationn");
  if ((op = myNdb->createEventOperation(eventName)) == NULL)
    APIERROR(myNdb->getNdbError());

  printf("get valuesn");
  RA_BH recAttr[noEventColumnName];
  RA_BH recAttrPre[noEventColumnName];

  for (i = 0; i < noEventColumnName; i++) {
    recAttr[i].ra    = op->getValue(eventColumnName[i]);
    recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
  }

  // set up the callbacks
  // This starts changes to "start flowing"
  if (op->execute())
    APIERROR(op->getNdbError());

  while (true) {
    int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
    if (r > 0) {
      while ((op= myNdb->nextEvent())) {
        NdbRecAttr* ra = recAttr[0].ra;
        if (ra->isNULL() >= 0) { // we have a value
          if (ra->isNULL() == 0) { // we have a non-null value
            printf("CODE: %d ", ra->u_32_value());
            do_blank(myNdb, ra->u_32_value());
          } else {
            printf("%-5s", "NULL");
          }
        } else {
          printf("%-5s", "-"); // no value
        }
        ra = recAttr[1].ra;
        printf("\n");
      }
    }
  }
}

int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnNames,
                  const int noEventColumnNames)
{
  NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
  if (!myDict) APIERROR(myNdb->getNdbError());

  const NdbDictionary::Table *table= myDict->getTable(eventTableName);
  if (!table) APIERROR(myDict->getNdbError());

  NdbDictionary::Event myEvent(eventName, *table);
  myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);

  myEvent.addEventColumns(noEventColumnNames, eventColumnNames);

  // Add event to database
  if (myDict->createEvent(myEvent) == 0)
    myEvent.print();
  else if (myDict->getNdbError().classification ==
            NdbError::SchemaObjectExists) {
    printf("Event creation failed, event existsn");
    printf("dropping Event...n");
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
    // try again
    // Add event to database
    if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
  } else
    APIERROR(myDict->getNdbError());

    return 0;
}

static void do_blank(Ndb* myNdb, int code)
{
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("ASSETS");

  if (myTable == NULL)
  APIERROR(myDict->getNdbError());

  NdbTransaction *myTransaction= myNdb->startTransaction();
  if (myTransaction == NULL) APIERROR(myNdb->getNdbError());

  printf("Replacing VALUE with 0 for CODE: %d ", code);

  NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
  if (myOperation == NULL) APIERROR(myTransaction->getNdbError());

  myOperation->updateTuple();
  myOperation->equal("CODE", code);
  myOperation->setValue("VALUE", 0);

  if (myTransaction->execute( NdbTransaction::Commit ) == -1)
    APIERROR(myTransaction->getNdbError());

  myNdb->closeTransaction(myTransaction);
}

Compile the code (as slave_filter) and then run it, passing the connect string of the slave Cluster’s management node:

shell> slave_filter 127.0.0.1:1186

From the master Cluster, insert some values (note that the example can easily be extended to cover updates too):

mysql> insert into ASSETS values (101, 50),(102, 40), (103, 99);

and then check that on the slave the value has been set to 0 for each of the entries:

mysql> select * from ASSETS;
+------+-------+
| CODE | VALUE |
+------+-------+
|  103 |     0 |
|  101 |     0 |
|  102 |     0 |
+------+-------+

How this works: the table data is replicated as normal and the real values are stored in the slave. The “slave_filter” process has registered against insert operations on this table and, when it’s triggered, it sets the VALUE field to 0. The event is processed asynchronously from the replication and so there will be a very narrow window during which the true values are stored in the slave.





Doxygen output for MySQL Cluster NDB API & MGM API

NDB API Documentation


A new page has been added to this site: NDB API Docs which presents the information from the header files for both the NDB API and the NDB Management API.

The material has been generated using Doxygen and will be refreshed shortly after any new major, minor or maintenance release is made generally available (starting from MySQL Cluster 7.0.6).





Intelligent user-controlled partitioning and writing distribution-aware NDB API Applications

Default partitioning

By default, Cluster will partition based on primary key

When adding rows to a table that’s using MySQL Cluster as the storage engine, each row is assigned to a partition, and that partition is mastered by a particular data node in the Cluster. The best performance comes when all of the data required to satisfy a transaction is held within a single partition, so that the transaction can be satisfied within a single data node rather than being bounced back and forth between multiple nodes, which introduces extra latency.

By default, Cluster partitions the data by hashing the primary key. This is not always optimal.

For example, if we have 2 tables, the first using a single-column primary key (sub_id) and the second using a composite key (sub_id, service_name)…

mysql> describe names;
+--------+-------------+------+-----+---------+-------+
| Field  | Type        | Null | Key | Default | Extra |
+--------+-------------+------+-----+---------+-------+
| sub_id | int(11)     | NO   | PRI | NULL    |       |
| name   | varchar(30) | YES  |     | NULL    |       |
+--------+-------------+------+-----+---------+-------+

mysql> describe services;
+--------------+-------------+------+-----+---------+-------+
| Field        | Type        | Null | Key | Default | Extra |
+--------------+-------------+------+-----+---------+-------+
| sub_id       | int(11)     | NO   | PRI | 0       |       |
| service_name | varchar(30) | NO   | PRI |         |       |
| service_parm | int(11)     | YES  |     | NULL    |       |
+--------------+-------------+------+-----+---------+-------+

If we then add data to these (initially empty) tables, we can use the ‘explain’ command to see which partitions (and hence physical hosts) are used to store the data for this single subscriber…

mysql> insert into names values (1,'Billy');

mysql> insert into services values (1,'VoIP',20),(1,'Video',654),(1,'IM',878),(1,'ssh',666);

mysql> explain partitions select * from names where sub_id=1;
+----+-------------+-------+------------+-------+---------------+---------+---------+-------+------+-------+
| id | select_type | table | partitions | type  | possible_keys | key     | key_len | ref   | rows | Extra |
+----+-------------+-------+------------+-------+---------------+---------+---------+-------+------+-------+
|  1 | SIMPLE      | names | p3         | const | PRIMARY       | PRIMARY | 4       | const |    1 |       |
+----+-------------+-------+------------+-------+---------------+---------+---------+-------+------+-------+

mysql> explain partitions select * from services where sub_id=1;
+----+-------------+----------+-------------+------+---------------+---------+---------+-------+------+-------+
| id | select_type | table    | partitions  | type | possible_keys | key     | key_len | ref   | rows | Extra |
+----+-------------+----------+-------------+------+---------------+---------+---------+-------+------+-------+
|  1 | SIMPLE      | services | p0,p1,p2,p3 | ref  | PRIMARY       | PRIMARY | 4       | const |   10 |       |
+----+-------------+----------+-------------+------+---------------+---------+---------+-------+------+-------+

The service records for the same subscriber (sub_id = 1) are split across 4 different partitions (p0, p1, p2 & p3). This means that the query results in messages being passed backwards and forwards between the 4 different data nodes, which consumes extra CPU time and incurs extra latency.

User-defined partitioning to the rescue

We can override the default behaviour by telling Cluster which fields should be fed into the hash algorithm. For our example, it’s reasonable to expect a transaction to access multiple records for the same subscriber (identified by their sub_id) and so the application will perform best if all of the rows for that sub_id are held in the same partition…

mysql> drop table services;

mysql> create table services (sub_id int, service_name varchar (30), service_parm int, primary key (sub_id, service_name)) engine = ndb
-> partition by key (sub_id);

mysql> insert into services values (1,'VoIP',20),(1,'Video',654),(1,'IM',878),(1,'ssh',666);

mysql> explain partitions select * from services where sub_id=1;
+----+-------------+----------+------------+------+---------------+---------+---------+-------+------+-------+
| id | select_type | table    | partitions | type | possible_keys | key     | key_len | ref   | rows | Extra |
+----+-------------+----------+------------+------+---------------+---------+---------+-------+------+-------+
|  1 | SIMPLE      | services | p3         | ref  | PRIMARY       | PRIMARY | 4       | const |   10 |       |
+----+-------------+----------+------------+------+---------------+---------+---------+-------+------+-------+

All of the rows for sub_id=1 from the services table are now held within a single partition (p3), which is the same one that holds the row for the same sub_id in the names table. Note that it wasn’t necessary to drop, recreate and re-provision the services table; the following command would have had the same effect:

mysql> alter table services partition by key (sub_id);

Writing a distribution-aware application using the NDB API

Distribution unaware NDB API application

In our example, the data is nicely partitioned for optimum performance when accessing all of the subscriber’s data – a single data node holds all of their data. However, there is another step to take to get the best out of your NDB-API based application. By default, the NDB API will use the Transaction Coordinator (TC) on a ‘random’ data node to handle the transaction – we could get lucky and the guess could be correct, but it’s more likely that the transaction will be sent to the wrong data node, which will then have to proxy it to the correct data node. The probability of getting it right first time reduces as the number of node groups increases and so this can prevent linear scaling.

It’s very simple to modify this behaviour so that the best data node/TC is hit first time, every time. When creating the transaction, the application can include parameters telling the NDB API one of the tables to be accessed and for what key(s). The NDB API will then use that information to identify the best TC to use…

const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
const NdbDictionary::Table *namesTable= myDict->getTable("names");
const NdbDictionary::Table *servicesTable= myDict->getTable("services");

NdbRecAttr *myRecAttr;

Ndb::Key_part_ptr dist_key[2];
dist_key[0].ptr = (const void*) &sub_id;
dist_key[0].len = sizeof(sub_id);
dist_key[1].ptr = NULL;
dist_key[1].len = 0;

if (namesTable == NULL)
APIERROR(myDict->getNdbError());

if (servicesTable == NULL)
APIERROR(myDict->getNdbError());

NdbTransaction *myTransaction= myNdb.startTransaction(namesTable, dist_key);
if (myTransaction == NULL) APIERROR(myNdb.getNdbError());

NdbOperation *myOperation= myTransaction->getNdbOperation(namesTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());

myOperation->readTuple(NdbOperation::LM_Read);
myOperation->equal("sub_id",sub_id);

myRecAttr= myOperation->getValue("name", NULL);
if (myRecAttr == NULL) APIERROR(myTransaction->getNdbError());

// Perform operations on "services" table as well as part of another operation
// if required; the subscriber's data will be in the same data node

if (myTransaction->execute( NdbTransaction::Commit ) == -1)
APIERROR(myTransaction->getNdbError());

printf(" %2d    %sn",
sub_id, myRecAttr->aRef());

myNdb.closeTransaction(myTransaction);

Note that as the services table has been configured to use the same field (sub_id) for partitioning as the names table, the startTransaction method only needs to know about the namesTable as the TC that the NDB API selects will serve just as well for this subscriber’s data from the services table. The rest of the code can be found in distaware.





Batching – improving MySQL Cluster performance when using the NDB API

As many people are aware, the best performance can be achieved from MySQL Cluster by using the native (C++) NDB API (rather than using SQL via a MySQL Server). What’s less well known is that you can improve the performance of your NDB-API enabled application even further by ‘batching’. This article attempts to explain why batching helps and how to do it.

What is batching and why does it help?

NDB API accessing data from the Cluster without batching

Batching involves sending multiple operations from the application to the Cluster in one group rather than individually; the Cluster then processes these operations and sends back the results. Without batching, each of these operations incurs the latency of crossing the network as well as consuming CPU time on both the application and data node hosts.

By batching together multiple operations, all of the requests can be sent in one message and all of the replies received in another – thus reducing the number of messages and hence the latency and CPU time consumed.

How to use batching with the MySQL Cluster NDB API

Batched NDB API Operations

The principle is that you batch together as many operations as you can, execute them together and then interpret the results. After interpreting the results, the application may then decide to send in another batch of operations.

An NDB API transaction consists of one or more operations where each operation (currently) acts on a single table and could be a simple primary key read or write or a complex table scan.

The operation is not sent to the Cluster at the point that it’s defined. Instead, the application must explicitly request that all operations defined within the transaction up to that point be executed – at which point, the NDB API can send the batch of operations to the data nodes to be processed. The application may request that the transaction be committed at that point or it may ask for the transaction to be held open so that it can analyse the results from the first set of operations and then use that information within a subsequent series of operations and then commit the transaction after executing that second batch of operations.

The following code sample shows how this can be implemented in practice (note that the application logic and all error handling has been omitted).

const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();

const NdbDictionary::Table *myTable1= myDict->getTable("tbl1");
const NdbDictionary::Table *myTable2= myDict->getTable("tbl2");

NdbTransaction *myTransaction= myNdb.startTransaction();

// Read all of the required data as part of a single batch

NdbOperation *myOperation= myTransaction->getNdbOperation(myTable1);
myOperation->readTuple(NdbOperation::LM_Read);
myOperation->equal("ref", asset_num);
myRecAttr= myOperation->getValue("cost", NULL);

NdbOperation *myOperation2= myTransaction->getNdbOperation(myTable2);
myOperation2->readTuple(NdbOperation::LM_Read);
myOperation2->equal("ref", asset_num);
myRecAttr2= myOperation2->getValue("volume", NULL);

myTransaction->execute(NdbTransaction::NoCommit);

// NOT SHOWN: Application logic interprets results from first set of operations

// Based on the data read during the initial batch, make the necessary changes

NdbOperation *myOperation3= myTransaction->getNdbOperation(myTable1);
myOperation3->updateTuple();
myOperation3->equal("ref", asset_num);
myOperation3->setValue("cost", new_cost);

NdbOperation *myOperation4= myTransaction->getNdbOperation(myTable2);
myOperation4->updateTuple();
myOperation4->equal("ref", asset_num);
myOperation4->setValue("volume", new_volume);

myTransaction->execute( NdbTransaction::Commit);
myNdb.closeTransaction(myTransaction);
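The operations in a batch do not all have to be different – a common pattern is to batch a large number of similar lookups. As a minimal sketch (reusing the myNdb and myTable1 names assumed in the example above), a whole series of primary-key reads can be defined against one transaction and then sent to the data nodes with a single execute() call:

// Minimal sketch: batch 100 primary-key reads into one round trip.
// Assumes a table "tbl1" with integer primary key "ref" and column "cost".
const int BATCH_SIZE= 100;
NdbRecAttr *costs[BATCH_SIZE];

NdbTransaction *myTransaction= myNdb.startTransaction();

// Define all of the operations; nothing is sent to the data nodes yet
for (int i = 0; i < BATCH_SIZE; i++) {
  NdbOperation *op= myTransaction->getNdbOperation(myTable1);
  op->readTuple(NdbOperation::LM_Read);
  op->equal("ref", i);
  costs[i]= op->getValue("cost", NULL);
}

// A single execute() sends the whole batch and gathers all of the results
if (myTransaction->execute(NdbTransaction::Commit) == -1)
  APIERROR(myTransaction->getNdbError());

for (int i = 0; i < BATCH_SIZE; i++)
  printf("ref=%d cost=%u\n", i, costs[i]->u_32_value());

myNdb.closeTransaction(myTransaction);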




Are Stored Procedures available with MySQL Cluster?

The answer is yes – kind of.

Stored procedures are implemented in a MySQL Server and can be used regardless of the storage engine being used for a specific table. One inference from this is that they won’t work when accessing the Cluster database directly through the NDB API.

This leads to the question of whether or not that limitation actually restricts what you can achieve. This article gives a brief introduction to stored procedures and looks at how the same results can be achieved using the NDB API.

Stored procedures provide a rudimentary way of implementing functionality within the database (rather than in the application code). They are implemented by the database designer and have the ability to perform computations as well as make changes to the data in the database. A typical use of stored procedures would be to control all access to the data by a user or application – for example, to impose extra checks on the data or make sure that all linked data is updated rather than leaving it to the user or application designer to always remember to do it. To impose this, the DBA could grant permission to users to call the stored procedures but not write to the tables directly.

This functionality can be very useful when the data is being accessed through the SQL interface. If using the NDB API then you have the full power of the C++ language at your disposal and so a designer can code whatever checks and side effects are needed within a wrapper method and then have applications use those methods rather than accessing the raw NDB API directly for those changes.
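As a sketch of what such a wrapper might look like – update_asset is not part of the NDB API, it is simply a hypothetical application-level method, shown here using the ASSETS and AUDIT_LOG tables from the example below – the change and its side effect are defined on one transaction so that callers cannot forget the audit record:

// Hypothetical wrapper method: all application inserts into ASSETS go through
// here, so the audit side effect is enforced. Both operations are defined on
// the same transaction and committed atomically by a single execute() call.
void update_asset(Ndb &myNdb, const char *name /* length-prefixed */, int value)
{
  const NdbDictionary::Dictionary *myDict= myNdb.getDictionary();
  const NdbDictionary::Table *assetsTable= myDict->getTable("ASSETS");
  const NdbDictionary::Table *auditTable= myDict->getTable("AUDIT_LOG");

  NdbTransaction *trans= myNdb.startTransaction();

  // The change itself...
  NdbOperation *write= trans->getNdbOperation(assetsTable);
  write->insertTuple();
  write->setValue("NAME", name);
  write->setValue("VALUE", value);

  // ...and the enforced side effect, in the same transaction
  NdbOperation *log= trans->getNdbOperation(auditTable);
  log->insertTuple();
  log->setValue("NOTE", name);

  if (trans->execute(NdbTransaction::Commit) == -1)
    APIERROR(trans->getNdbError());
  myNdb.closeTransaction(trans);
}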

There is one piece of functionality available using stored procedures which could be very helpful to applications using the NDB API – triggers. The rest of this article explains what triggers are, how they’re used and how the same results can be achieved using the NDB API.

Triggers

Triggers allow stored code to be invoked as a side effect of SQL commands being executed on the database through a MySQL Server. The database designer can implement a stored procedure and then register it to be invoked when specific actions (INSERT, DELETE etc.) are performed on a table.

The following example shows how a simple stored procedure can be implemented and then registered against a table.

mysql> USE test;
Database changed
mysql> create table ASSETS (NAME varchar(30) not null primary key,VALUE int) engine=ndb;
Query OK, 0 rows affected (0.67 sec)
mysql> create table AUDIT_LOG (NOTE varchar(30) not NULL primary key) engine=ndb;
Query OK, 0 rows affected (0.56 sec)
mysql> delimiter //
mysql> create procedure log_it (log_string varchar(30))
    -> begin
    -> insert into AUDIT_LOG values(log_string);
    -> end
    -> //
Query OK, 0 rows affected (0.00 sec)
mysql> delimiter ;
mysql> create trigger ins_asset before insert on ASSETS
    -> for each row call log_it(new.name);
Query OK, 0 rows affected (0.00 sec)

The stored procedure in this example is triggered whenever a new tuple is inserted into the ASSETS table. The procedure then inserts the asset’s name into the AUDIT_LOG table. If the tuple is deleted from the ASSETS table then the entry in the AUDIT_LOG table remains intact.

The following screen capture shows the results when adding a tuple to the table that contains the trigger.

mysql> insert into ASSETS values ('Computer',350);
Query OK, 1 row affected (0.01 sec)
mysql> select * from AUDIT_LOG;
+----------+
| NOTE     |
+----------+
| Computer |
+----------+
1 row in set (0.00 sec)

Note that as the trigger and stored procedure are implemented in the MySQL Server, they need to be separately defined in all of the MySQL Server instances where they are needed.

The following NDB API code adds a new tuple to the ASSETS table in much the same way as was done through SQL above (Note: my C++ is very rusty and so there will be glitches in this code – especially for string handling).

#include <NdbApi.hpp>
#include <stdio.h>
#include <string.h>
#include <iostream>
#include <cstdlib>

static void run_application(Ndb_cluster_connection &, char*);

#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << code \
            << ", msg: " << msg << "." << std::endl

#define APIERROR(error) { \
  PRINT_ERROR(error.code,error.message); \
  exit(-1); }

int main(int argc, char** argv)
{
  if (argc != 3)
  {
    std::cout << "Arguments are <connect_string cluster><asset_name>.n";
    exit(-1);
  }
  ndb_init();

  // connect to cluster and run application
  {
    const char *connectstring = argv[1];
    char *asset_name = argv[2];
    // Object representing the cluster
    Ndb_cluster_connection cluster_connection(connectstring);

    // Connect to cluster management server (ndb_mgmd)
    if (cluster_connection.connect(4 /* retries               */,
                                   5 /* delay between retries */,
                                   1 /* verbose               */))
    {
      std::cout << "Cluster management server was not ready within 30 secs.n";
      exit(-1);
    }

    // Connect and wait for the storage nodes (ndbd's)
    if (cluster_connection.wait_until_ready(30,0) < 0)
    {
      std::cout << "Cluster was not ready within 30 secs.n";
      exit(-1);
    }
    // run the application code
    run_application(cluster_connection, asset_name);
  }
  ndb_end(0);
  return 0;
}

static void do_insert(Ndb &, char*);

static void run_application(Ndb_cluster_connection &cluster_connection,
                            char *asset_name)
{
  /********************************************
  * Connect to database via NdbApi           *
  ********************************************/
  // Object representing the database
  Ndb myNdb( &cluster_connection, "test" );
  if (myNdb.init()) APIERROR(myNdb.getNdbError());
  do_insert(myNdb, asset_name);
}

static void do_insert(Ndb &myNdb, char *asset_name)
{
  const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("ASSETS");
  // MySQL Cluster VARCHAR values are length-prefixed: the first byte holds
  // the string length, followed by the characters themselves
  char str[32];
  str[0] = strlen(asset_name);
  strcpy(str + 1, asset_name);

  if (myTable == NULL) APIERROR(myDict->getNdbError());
  NdbTransaction *myTransaction= myNdb.startTransaction();
  if (myTransaction == NULL) APIERROR(myNdb.getNdbError());
  NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
  if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
  myOperation->insertTuple();
  myOperation->setValue("NAME", str);
  myOperation->setValue("VALUE", 555);
  if (myTransaction->execute( NdbTransaction::Commit ) == -1)
    APIERROR(myTransaction->getNdbError());
  myNdb.closeTransaction(myTransaction);
}

This code can then be executed and the effects verified using SQL commands through the MySQL Server – note that the stored procedure has not been triggered and so the name has not been copied into the AUDIT_LOG table.

[billy@ws1 stored]$ ./test_stored_procedures localhost:1186 Monitor

mysql> select * from ASSETS;
+----------+-------+
| NAME     | VALUE |
+----------+-------+
| Monitor  |   555 |
| Computer |   350 |
+----------+-------+
2 rows in set (0.01 sec)

mysql> select * from AUDIT_LOG;
+----------+
| NOTE     |
+----------+
| Computer |
+----------+
1 row in set (0.00 sec)

It could easily be argued that triggers are not required when using the NDB API – simply code a wrapper method that also applies the required side effects. However, it is possible to come up with scenarios where triggers would be much more convenient – for example if the application is already littered with accesses to the data and you want to retrofit the side effect.

Fortunately, the NDB API includes the ability to register triggers against operations for a specific table. The code that follows implements a process that waits for an INSERT to be performed on the ASSETS table and then creates an entry in the AUDIT_LOG table just as the earlier stored procedure did.

#include <NdbApi.hpp>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <cstdlib>
#include <string.h>

#define APIERROR(error) \
{ std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
            << error.code << ", msg: " << error.message << "." << std::endl; \
  exit(-1); }

int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnName,
                  const int noEventColumnName);

static void do_insert(Ndb*, char*);

int main(int argc, char** argv)
{
  if (argc != 3)
  {
    std::cout << "Arguments are <connect_string cluster> <timeout>.\n";
    exit(-1);
  }
  const char *connectstring = argv[1];
  int timeout = atoi(argv[2]);
  ndb_init();
  Ndb_cluster_connection *cluster_connection=
    new Ndb_cluster_connection(connectstring);

  int r= cluster_connection->connect(5 /* retries               */,
                                     3 /* delay between retries */,
                                     1 /* verbose               */);
  if (r > 0)
  {
    std::cout
       << "Cluster connect failed, possibly resolved with more retries.n";
    exit(-1);
  }
  else if (r < 0)
  {
    std::cout
       << "Cluster connect failed.n";
    exit(-1);
  }
  if (cluster_connection->wait_until_ready(30,30))
  {
    std::cout << "Cluster was not ready within 30 secs." << std::endl;
    exit(-1);
  }
  Ndb* myNdb= new Ndb(cluster_connection,
                      "test");  // Object representing the database
  if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
  const char *eventName= "CHNG_IN_ASSETS";
  const char *eventTableName= "ASSETS";
  const int noEventColumnName= 2;
  const char *eventColumnName[noEventColumnName]=
    {"NAME",
     "VALUE"};

  // Create events
  myCreateEvent(myNdb,
                eventName,
                eventTableName,
                eventColumnName,
                noEventColumnName);

  // Normal values and blobs are unfortunately handled differently..
  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;

  int i, j;
  j = 0;
  while (j < timeout) {
    // Start "transaction" for handling events
    NdbEventOperation* op;
    if ((op = myNdb->createEventOperation(eventName)) == NULL)
      APIERROR(myNdb->getNdbError());
    RA_BH recAttr[noEventColumnName];
    RA_BH recAttrPre[noEventColumnName];
    for (i = 0; i < noEventColumnName; i++) {
      recAttr[i].ra    = op->getValue(eventColumnName[i]);
      recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
    }
    if (op->execute())
      APIERROR(op->getNdbError());
    NdbEventOperation* the_op = op;
    i= 0;
    while (i < timeout) {
      int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
      if (r > 0) {
        while ((op= myNdb->nextEvent())) {
          i++;
          NdbRecAttr* ra = recAttr[0].ra;
          if (ra->isNULL() >= 0) { // we have a value
            if (ra->isNULL() == 0) { // we have a non-null value
              printf("NAME: %s ", ra->aRef());
              do_insert(myNdb, ra->aRef());
            } else
              printf("%-5s", "NULL");
          } else
            printf("%-5s", "-"); // no value
          ra = recAttr[1].ra;
          printf("\n");
        }
      }
    }
    if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
    the_op = 0;
    j++;
  }
  {
    NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
    if (!myDict) APIERROR(myNdb->getNdbError());
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
  }
  delete myNdb;
  delete cluster_connection;
  ndb_end(0);
  return 0;
}

int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnNames,
                  const int noEventColumnNames)
{
  NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
  if (!myDict) APIERROR(myNdb->getNdbError());
  const NdbDictionary::Table *table= myDict->getTable(eventTableName);
  if (!table) APIERROR(myDict->getNdbError());
  NdbDictionary::Event myEvent(eventName, *table);
  myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
  myEvent.addEventColumns(noEventColumnNames, eventColumnNames);

  // Add event to database
  if (myDict->createEvent(myEvent) == 0)
    myEvent.print();
  else if (myDict->getNdbError().classification ==
           NdbError::SchemaObjectExists) {
    printf("Event creation failed, event exists\n");
    printf("dropping Event...\n");
    if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
    // try again
    // Add event to database
    if (myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
  } else
    APIERROR(myDict->getNdbError());

  return 0;
}
static void do_insert(Ndb* myNdb, char *asset_name)
{
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("AUDIT_LOG");

  // Convert to the length-prefixed format expected for VARCHAR columns
  char str[32];
  str[0] = strlen(asset_name);
  strcpy(str + 1, asset_name);
  printf("Storing %i characters: %s\n", (int) strlen(asset_name), asset_name);

  if (myTable == NULL) APIERROR(myDict->getNdbError());
  NdbTransaction *myTransaction= myNdb->startTransaction();
  if (myTransaction == NULL) APIERROR(myNdb->getNdbError());
  NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
  if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
  myOperation->insertTuple();
  myOperation->setValue("NOTE", str);
  if (myTransaction->execute( NdbTransaction::Commit ) == -1)
    APIERROR(myTransaction->getNdbError());
  myNdb->closeTransaction(myTransaction);
}

We can then use this code to make the addition through the NDB API, using one terminal to run the listener and another to add the tuple.

[billy@ws1 stored]$ ./trigger_listener localhost:1186 100
[billy@ws1 stored]$ ./test_stored_procedures localhost:1186 Keyboard

mysql> select * from ASSETS;
+----------+-------+
| NAME     | VALUE |
+----------+-------+
| Keyboard |   555 |
| Computer |   350 |
| Monitor  |   555 |
+----------+-------+
3 rows in set (0.00 sec)

mysql> select * from AUDIT_LOG;
+-----------+
| NOTE      |
+-----------+
| Computer  |
| Keyboard  |
+-----------+
2 rows in set (0.00 sec)

A major advantage of this approach is that the trigger is implemented within the Cluster database and so is invoked regardless of where the INSERT is requested – whether it be through the NDB API or through any of the MySQL Servers. This is shown in the results that follow.

mysql> drop trigger ins_asset;
Query OK, 0 rows affected (0.00 sec)

mysql> drop procedure log_it;
Query OK, 0 rows affected (0.00 sec)
mysql> insert into ASSETS values("Printers", 200);
Query OK, 1 row affected (0.00 sec)

mysql> select * from ASSETS;
+----------+-------+
| NAME     | VALUE |
+----------+-------+
| Keyboard |   555 |
| Computer |   350 |
| Monitor  |   555 |
| Printers |   200 |
+----------+-------+
4 rows in set (0.00 sec)

mysql> select * from AUDIT_LOG;
+-----------+
| NOTE      |
+-----------+
| Printers  |
| Keyboard  |
| Computer  |
+-----------+
3 rows in set (0.00 sec)

Note that I first removed the original trigger and stored procedure that were defined in the MySQL Server.

There is another key difference between MySQL triggers and NDB events – triggers are executed as part of the MySQL transaction that makes the main database change, whereas NDB events are delivered asynchronously. The effects of this are:

  • The original transaction will commit successfully before the side effects have been processed
  • If the process waiting for the event disappears then the side effect will not be processed – for this reason, you may want to consider audit/clean-up scripts to cover these cases (see the sketch below)
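As an illustration of what such a clean-up could look like, here is a minimal sketch – reconcile is a hypothetical helper, not part of the NDB API – that scans the ASSETS table and probes AUDIT_LOG by primary key, reporting any asset whose audit event was missed:

// Hypothetical clean-up sketch: find ASSETS rows with no AUDIT_LOG entry.
// Assumes the length-prefixed VARCHAR encoding used in the examples above.
static void reconcile(Ndb *myNdb)
{
  const NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
  const NdbDictionary::Table *assetsTable= myDict->getTable("ASSETS");
  const NdbDictionary::Table *auditTable= myDict->getTable("AUDIT_LOG");

  // Scan every row in ASSETS
  NdbTransaction *scanTrans= myNdb->startTransaction();
  NdbScanOperation *scan= scanTrans->getNdbScanOperation(assetsTable);
  scan->readTuples(NdbOperation::LM_CommittedRead);
  NdbRecAttr *name= scan->getValue("NAME", NULL);
  scanTrans->execute(NdbTransaction::NoCommit);

  while (scan->nextResult(true) == 0) {
    // Probe AUDIT_LOG by primary key; NDB error 626 means "tuple did not exist"
    NdbTransaction *probe= myNdb->startTransaction();
    NdbOperation *op= probe->getNdbOperation(auditTable);
    op->readTuple(NdbOperation::LM_CommittedRead);
    op->equal("NOTE", name->aRef());
    op->getValue("NOTE", NULL);
    if (probe->execute(NdbTransaction::Commit) == -1 &&
        probe->getNdbError().code == 626)
      printf("Missing audit entry for asset %.*s\n",
             name->aRef()[0], name->aRef() + 1);
    myNdb->closeTransaction(probe);
  }
  myNdb->closeTransaction(scanTrans);
}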

Conclusion

Stored procedures are fully supported for users or applications which access a Cluster database through a MySQL Server (whether directly using SQL or through any of the numerous connectors that are available). Applications which access the database through the NDB API have the full flexibility of C++ to implement functionality that can achieve the same results. Triggers are available whichever method is used to access the database – albeit with different implementations and slightly different functionality.