-
Using ClusterJ (part of MySQL Cluster Connector for Java) – a tutorial
Posted on March 30th, 2010 1 commentClusterJ is part of the MySQL Cluster Connector for Java which is currently in beta as part of MySQL Cluster 7.1. It is designed to provide a high performance method for Java applications to store and access data in a MySQL Cluster database. It is also designed to be easy for Java developers to use and is “in the style of” Hibernate/Java Data Objects (JDO) and JPA. It uses the Domain Object Model DataMapper pattern:
- Data is represented as domain objects
- Domain objects are separate from business logic
- Domain objects are mapped to database tables
The purpose of ClusterJ is to provide a mapping from the table-oriented view of the data stored in MySQL Cluster to the Java objects used by the application. This is achieved by annotating interfaces representing the Java objects; where each persistent interface is mapped to a table and each property in that interface to a column. By default, the table name will match the interface name and the column names match the property names but this can be overridden using annotations.
If the table does not already exist (for example, this is a brand new application with new data) then the table must be created manually – unlike OpenJPA, ClusterJ will not create the table automatically.
Figure 2 shows an example of an interface that has been created in order to represent the data held in the ‘employee’ table.
ClusterJ uses the following concepts:
-
SessionFactory: There is one instance per MySQL Cluster instance for each Java Virtual Machine (JVM). The SessionFactory object is used by the application to get hold of sessions. The configuration details for the ClusterJ instance are defined in the Configuration properties which is an artifact associated with the SessionFactory.
- Session: There is one instance per user (per Cluster, per JVM) and represents a Cluster connection
- Domain Object: Objects representing the data from a table. The domain objects (and their relationships to the Cluster tables) are defined by annotated interfaces (as shown in the right-hand side of Figure 2.
- Transaction: There is one transaction per session at any point in time. By default, each operation (query, insert, update, or delete) is run under a new transaction. . The Transaction interface allows developers to aggregate multiple operations into a single, atomic unit of work.
ClusterJ will be suitable for many Java developers but it has some restrictions which may make OpenJPA with the ClusterJPA plug-in more appropriate. These ClusterJ restrictions are:
- Persistent Interfaces rather than persistent classes. The developer provides the signatures for the getter/setter methods rather than the properties and no extra methods can be added.
- No Relationships between properties or between objects can be defined in the domain objects. Properties are primitive types.
- No Multi-table inheritance; there is a single table per persistent interface
- No joins in queries (all data being queried must be in the same table/interface)
- No Table creation – user needs to create tables and indexes
- No Lazy Loading – entire record is loaded at one time, including large object (LOBs).
Tutorial
This tutorial uses MySQL Cluster 7.1.2a on Fedora 12. If using earlier or more recent versions of MySQL Cluster then you may need to change the class-paths as explained in http://dev.mysql.com/doc/ndbapi/en/mccj-using-clusterj.html
It is necessary to have MySQL Cluster up and running. For simplicity all of the nodes (processes) making up the Cluster will be run on the same physical host, along with the application.
These are the MySQL Cluster configuration files being used :
config.ini:
[ndbd default]noofreplicas=2 datadir=/home/billy/mysql/my_cluster/data [ndbd] hostname=localhost id=3 [ndbd] hostname=localhost id=4 [ndb_mgmd] id = 1 hostname=localhost datadir=/home/billy/mysql/my_cluster/data [mysqld] hostname=localhost id=101 [api] hostname=localhostmy.cnf:
[mysqld] ndbcluster datadir=/home/billy/mysql/my_cluster/data basedir=/usr/local/mysqlThis tutorial focuses on ClusterJ rather than on running MySQL Cluster; if you are new to MySQL Cluster then refer to running a simple Cluster before trying this tutorial.
ClusterJ needs to be told how to connect to our MySQL Cluster database; including the connect string (the address/port for the management node), the database to use, the user to login as and attributes for the connection such as the timeout values. If these parameters aren’t defined then ClusterJ will fail with run-time exceptions. This information represents the “configuration properties” shown in Figure 3. These parameters can be hard coded in the application code but it is more maintainable to create a clusterj.properties file that will be imported by the application. This file should be stored in the same directory as your application source code.
clusterj.properties:
com.mysql.clusterj.connectstring=localhost:1186 com.mysql.clusterj.database=clusterdb com.mysql.clusterj.connect.retries=4 com.mysql.clusterj.connect.delay=5 com.mysql.clusterj.connect.verbose=1 com.mysql.clusterj.connect.timeout.before=30 com.mysql.clusterj.connect.timeout.after=20 com.mysql.clusterj.max.transactions=1024As ClusterJ will not create tables automatically, the next step is to create ‘clusterdb’ database (referred to in clusterj.properties) and the ‘employee’ table:
[billy@ws1 ~]$ mysql -u root -h 127.0.0.1 -P 3306 -u root mysql> create database clusterdb;use clusterdb; mysql> CREATE TABLE employee ( -> id INT NOT NULL PRIMARY KEY, -> first VARCHAR(64) DEFAULT NULL, -> last VARCHAR(64) DEFAULT NULL, -> municipality VARCHAR(64) DEFAULT NULL, -> started VARCHAR(64) DEFAULT NULL, -> ended VARCHAR(64) DEFAULT NULL, -> department INT NOT NULL DEFAULT 1, -> UNIQUE KEY idx_u_hash (first,last) USING HASH, -> KEY idx_municipality (municipality) -> ) ENGINE=NDBCLUSTER;The next step is to create the annotated interface:
Employee.java:
import com.mysql.clusterj.annotation.Column; import com.mysql.clusterj.annotation.Index; import com.mysql.clusterj.annotation.PersistenceCapable; import com.mysql.clusterj.annotation.PrimaryKey;@PersistenceCapable(table="employee") @Index(name="idx_uhash") public interface Employee {@PrimaryKey int getId(); void setId(int id);String getFirst(); void setFirst(String first); String getLast(); void setLast(String last);@Column(name="municipality") @Index(name="idx_municipality") String getCity(); void setCity(String city);String getStarted(); void setStarted(String date);String getEnded(); void setEnded(String date);Integer getDepartment(); void setDepartment(Integer department); }The name of the table is specified in the annotation @PersistenceCapable(table=”employee”) and then each column from the employee table has an associated getter and setter method defined in the interface. By default, the property name in the interface is the same as the column name in the table – the column name has been overridden for the City property by explicitly including the @Column(name=”municipality”) annotation just before the associated getter method. The @PrimaryKey annotation is used to identify the property whose associated column is the Primary Key in the table. ClusterJ is made aware of the existence of indexes in the database using the @Index annotation.
The next step is to write the application code which we step through here block by block; the first of which simply contains the import statements and then loads the contents of the clusterj.properties defined above:
Main.java (part 1):
import com.mysql.clusterj.ClusterJHelper; import com.mysql.clusterj.SessionFactory; import com.mysql.clusterj.Session; import com.mysql.clusterj.Query; import com.mysql.clusterj.query.QueryBuilder; import com.mysql.clusterj.query.QueryDomainType;import java.io.File; import java.io.InputStream; import java.io.FileInputStream; import java.io.*;import java.util.Properties; import java.util.List;public class Main {public static void main (String[] args) throws java.io.FileNotFoundException,java.io.IOException {// Load the properties from the clusterj.properties fileFile propsFile = new File("clusterj.properties"); InputStream inStream = new FileInputStream(propsFile); Properties props = new Properties(); props.load(inStream);//Used later to get userinput BufferedReader br = new BufferedReader(new InputStreamReader(System.in));The next step is to get a handle for a SessionFactory from the ClusterJHelper class and then use that factory to create a session (based on the properties imported from clusterj.properties file.
Main.java (part 2):
// Create a session (connection to the database) SessionFactory factory = ClusterJHelper.getSessionFactory(props); Session session = factory.getSession();Now that we have a session, it is possible to instantiate new Employee objects and then persist them to the database. Where there are no transaction begin() or commit() statements, each operation involving the database is treated as a separate transaction.
Main.java (part 3):
// Create and initialise an Employee Employee newEmployee = session.newInstance(Employee.class); newEmployee.setId(988); newEmployee.setFirst("John"); newEmployee.setLast("Jones"); newEmployee.setStarted("1 February 2009"); newEmployee.setDepartment(666);// Write the Employee to the database session.persist(newEmployee);At this point, a row will have been added to the ‘employee’ table. To verify this, a new Employee object is created and used to read the data back from the ‘employee’ table using the primary key (Id) value of 998:
Main.java (part 4):
// Fetch the Employee from the database Employee theEmployee = session.find(Employee.class, 988);if (theEmployee == null) {System.out.println("Could not find employee");} else {System.out.println ("ID: " + theEmployee.getId() + "; Name: " + theEmployee.getFirst() + " " + theEmployee.getLast()); System.out.println ("Location: " + theEmployee.getCity()); System.out.println ("Department: " + theEmployee.getDepartment()); System.out.println ("Started: " + theEmployee.getStarted()); System.out.println ("Left: " + theEmployee.getEnded()); }This is the output seen at this point:
ID: 988; Name: John Jones Location: null Department: 666 Started: 1 February 2009 Left: null Check the database before I change the Employee - hit return when you are doneThe next step is to modify this data but it does not write it back to the database yet:
Main.java (part 5):
// Make some changes to the Employee & write back to the database theEmployee.setDepartment(777); theEmployee.setCity("London");System.out.println("Check the database before I change the Employee - hit return when you are done"); String ignore = br.readLine();The application will pause at this point and give you chance to check the database to confirm that the original data has been added as a new row but the changes have not been written back yet:
mysql> select * from clusterdb.employee; +-----+-------+-------+--------------+-----------------+-------+------------+ | id | first | last | municipality | started | ended | department | +-----+-------+-------+--------------+-----------------+-------+------------+ | 988 | John | Jones | NULL | 1 February 2009 | NULL | 666 | +-----+-------+-------+--------------+-----------------+-------+------------+After hitting return, the application will continue and write the changes to the table, using an automatic transaction to perform the update.
Main.java (part 6):
session.updatePersistent(theEmployee);System.out.println("Check the change in the table before I bulk add Employees - hit return when you are done"); ignore = br.readLine();The application will again pause so that we can now check that the change has been written back (persisted) to the database:
mysql> select * from clusterdb.employee; +-----+-------+-------+--------------+-----------------+-------+------------+ | id | first | last | municipality | started | ended | department | +-----+-------+-------+--------------+-----------------+-------+------------+ | 988 | John | Jones | London | 1 February 2009 | NULL | 777 | +-----+-------+-------+--------------+-----------------+-------+------------+The application then goes onto create and persist 100 new employees. To improve performance, a single transaction is used to that all of the changes can be written to the database at once when the commit() statement is run:
Main.java (part 7):
// Add 100 new Employees - all as part of a single transaction newEmployee.setFirst("Billy"); newEmployee.setStarted("28 February 2009");session.currentTransaction().begin();for (int i=700;i<800;i++) { newEmployee.setLast("No-Mates"+i); newEmployee.setId(i+1000); newEmployee.setDepartment(i); session.persist(newEmployee); }session.currentTransaction().commit();The 100 new employees will now have been persisted to the database. The next step is to create and execute a query that will search the database for all employees in department 777 by using a QueryBuilder and using that to build a QueryDomain that compares the ‘department’ column with a parameter. After creating the, the department parameter is set to 777 (the query could subsequently be reused with different department numbers). The application then runs the query and iterates through and displays each of employees in the result set:
Main.java (part 8):
// Retrieve the set all of Employees in department 777 QueryBuilder builder = session.getQueryBuilder(); QueryDomainType<Employee> domain = builder.createQueryDefinition(Employee.class); domain.where(domain.get("department").equal(domain.param( "department"))); Query<Employee> query = session.createQuery(domain); query.setParameter("department",777);List<Employee> results = query.getResultList(); for (Employee deptEmployee: results) { System.out.println ("ID: " + deptEmployee.getId() + "; Name: " + deptEmployee.getFirst() + " " + deptEmployee.getLast()); System.out.println ("Location: " + deptEmployee.getCity()); System.out.println ("Department: " + deptEmployee.getDepartment()); System.out.println ("Started: " + deptEmployee.getStarted()); System.out.println ("Left: " + deptEmployee.getEnded()); }System.out.println("Last chance to check database before emptying table - hit return when you are done"); ignore = br.readLine();At this point, the application will display the following and prompt the user to allow it to continue:
ID: 988; Name: John Jones Location: London Department: 777 Started: 1 February 2009 Left: null ID: 1777; Name: Billy No-Mates777 Location: null Department: 777 Started: 28 February 2009 Left: nullWe can compare that output with an SQL query performed on the database:
mysql> select * from employee where department=777; +------+-------+-------------+--------------+------------------+-------+------------+ | id | first | last | municipality | started | ended | department | +------+-------+-------------+--------------+------------------+-------+------------+ | 988 | John | Jones | London | 1 February 2009 | NULL | 777 | | 1777 | Billy | No-Mates777 | NULL | 28 February 2009 | NULL | 777 | +------+-------+-------------+--------------+------------------+-------+------------+Finally, after pressing return again, the application will remove all employees:
Main.java (part 9):
session.deletePersistentAll(Employee.class); } }As a final check, an SQL query confirms that all of the rows have been deleted from the ‘employee’ table.
mysql> select * from employee; Empty set (0.00 sec)Compiling and running the ClusterJ tutorial code
javac -classpath /usr/local/mysql/share/mysql/java/clusterj-api.jar:. Main.java Employee.javajava -classpath /usr/local/mysql/share/mysql/java/clusterj.jar:. -Djava.library.path=/usr/local/mysql/lib Main
Download the source code for this tutorial from here (together with the code for the up-coming ClusterJPA tutorial).
-
MySQL Cluster Connector for Java – replay available for part 1 of the webinar
Posted on February 19th, 2010 No commentsThe replay of the two webinars can now be accesed from mysql.com
Remember that the second part of the webinar will be on March 3rd (details below).
MySQL have been working on a new way of accessing MySQL Cluster using Java. Designed for Java developers, the MySQL Cluster Connector for Java implements an easy-to-use and high performance native Java interface and OpenJPA plug-in that maps Java classes to tables stored in the high availability, real-time MySQL Cluster database.
There is a series of 2 webinars coming up, as always these are free to attend – you just need to register in advance:
Part 1: Tuesday, February 16, 2010: 10:00 Pacific time
- an overview of the MySQL Cluster Connector for Java
- what these technologies bring to Java developers
- implementation details of the MySQL Cluster Java API and Plug-In for OpenJPA
- configuring the connection to MySQL Cluster
- creating the Java Domain Object Model for your tables
- managing insert, update, and delete operations
- querying the database
- how to get started developing new Java applications using these interfaces
Accessfrom mysql.com
an overview of the MySQL Cluster Connector for Javawhat these technologies bring to Java developersimplementation details of the MySQL Cluster Java API and Plug-In for OpenJPAconfiguring the connection to MySQL Clustercreating the Java Domain Object Model for your tablesmanaging insert, update, and delete operationsquerying the databasehow to get started developing new Java applications using these interfacesPart 2: Wednesday, March 03, 2010: 10:00 Pacific time
- how MySQL Cluster Connector for Java coexists with existing OpenJPA / TopLink / JDBC-based apps
- how to evaluate the MySQL Cluster Connector for Java alternatives
- performance comparisons with both existing Java access and with native NDB API access to MySQL Cluster
- what the future holds for this technology
Wed, Mar 03: 08:00 Hawaii time
Wed, Mar 03: 11:00 Mountain time (America)
Wed, Mar 03: 12:00 Central time (America)
Wed, Mar 03: 13:00 Eastern time (America)
Wed, Mar 03: 18:00 UTC
Wed, Mar 03: 18:00 Western European time
Wed, Mar 03: 19:00 Central European time
Wed, Mar 03: 20:00 Eastern European timeThis functionality isn’t GA but it is available for you to try and we’d love to get feedback (which you can provide through the MySQL Cluster forum or by emailing cluster@lists.mysql.com
If you want to see for yourself then take a look at the Blog entry from Bernhard Ocklin – the engineering manager responsible for this work.
-
New white paper: Guide to Optimizing Performance of the MySQL Cluster Database
Posted on December 7th, 2009 2 commentsThis guide explores how to tune and optimize the MySQL Cluster database to handle diverse workload requirements. It discusses data access patterns and how to build distribution awareness into applications, before exploring schema and query optimization, tuning of parameters and how to get the best out of the latest innovations in hardware design.
The Guide concludes with recent performance benchmarks conducted with the MySQL Cluster database, an overview of how MySQL Cluster can be integrated with other MySQL storage engines, before summarizing additional resources that will enable you to optimize MySQL Cluster performance with your applications.
Download the white paper (as always, for free) from: http://www.mysql.com/why-mysql/white-papers/mysql_wp_cluster_perfomance.php
-
Using NDB API Events to mask/hide colum data when replicating
Posted on August 13th, 2009 No commentsIf you have asynchronous replication where the slave database is using MySQL Cluster then you can use the NDB API events functionality to mask/overwrite data. You might do this for example if the replica is to be used for generating reports where some of the data is sensitive and not relevant to those reports. Unlike stored procedures, NDB API events will be triggered on the slave.
The first step is to set up replication (master->slave rather than multi-master) as described in Setting up MySQL Asynchronous Replication for High Availability).
In this example, the following table definition is used:
mysql> use clusterdb; mysql> create table ASSETS (CODE int not null primary key, VALUE int) engine=ndb;
The following code should be compiled and then executed on a node within the slave Cluster:
#include <NdbApi.hpp> #include <stdio.h> #include <iostream> #include <unistd.h> #include <cstdlib> #include <string.h> #define APIERROR(error) \ { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \ << error.code << ", msg: " << error.message << "." << std::endl; \ exit(-1); } int myCreateEvent(Ndb* myNdb, const char *eventName, const char *eventTableName, const char **eventColumnName, const int noEventColumnName); static void do_blank(Ndb*, int); int main(int argc, char** argv) { if (argc < 1) { std::cout << "Arguments are <connect_string cluster>.\n"; exit(-1); } const char *connectstring = argv[1]; ndb_init(); Ndb_cluster_connection *cluster_connection= new Ndb_cluster_connection(connectstring); // Object representing the cluster int r= cluster_connection->connect(5 /* retries */, 3 /* delay between retries */, 1 /* verbose */); if (r > 0) { std::cout << "Cluster connect failed, possibly resolved with more retries.\n"; exit(-1); } else if (r < 0) { std::cout << "Cluster connect failed.\n"; exit(-1); } if (cluster_connection->wait_until_ready(30,30)) { std::cout << "Cluster was not ready within 30 secs." << std::endl; exit(-1); } Ndb* myNdb= new Ndb(cluster_connection, "clusterdb"); // Object representing the database if (myNdb->init() == -1) APIERROR(myNdb->getNdbError()); const char *eventName= "CHNG_IN_ASSETS"; const char *eventTableName= "ASSETS"; const int noEventColumnName= 2; const char *eventColumnName[noEventColumnName]= {"CODE", "VALUE"}; // Create events myCreateEvent(myNdb, eventName, eventTableName, eventColumnName, noEventColumnName); // Normal values and blobs are unfortunately handled differently.. typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH; int i; // Start "transaction" for handling events NdbEventOperation* op; printf("create EventOperation\n"); if ((op = myNdb->createEventOperation(eventName)) == NULL) APIERROR(myNdb->getNdbError()); printf("get values\n"); RA_BH recAttr[noEventColumnName]; RA_BH recAttrPre[noEventColumnName]; for (i = 0; i < noEventColumnName; i++) { recAttr[i].ra = op->getValue(eventColumnName[i]); recAttrPre[i].ra = op->getPreValue(eventColumnName[i]); } // set up the callbacks // This starts changes to "start flowing" if (op->execute()) APIERROR(op->getNdbError()); while (true) { int r = myNdb->pollEvents(1000); // wait for event or 1000 ms if (r > 0) { while ((op= myNdb->nextEvent())) { NdbRecAttr* ra = recAttr[0].ra; if (ra->isNULL() >= 0) { // we have a value if (ra->isNULL() == 0) { // we have a non-null value printf("CODE: %d ", ra->u_32_value()); do_blank(myNdb, ra->u_32_value()); } else printf("%-5s", "NULL"); } else printf("%-5s", "-"); // no value ra = recAttr[1].ra; printf("\n"); } } } } int myCreateEvent(Ndb* myNdb, const char *eventName, const char *eventTableName, const char **eventColumnNames, const int noEventColumnNames) { NdbDictionary::Dictionary *myDict= myNdb->getDictionary(); if (!myDict) APIERROR(myNdb->getNdbError()); const NdbDictionary::Table *table= myDict->getTable(eventTableName); if (!table) APIERROR(myDict->getNdbError()); NdbDictionary::Event myEvent(eventName, *table); myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT); myEvent.addEventColumns(noEventColumnNames, eventColumnNames); // Add event to database if (myDict->createEvent(myEvent) == 0) myEvent.print(); else if (myDict->getNdbError().classification == NdbError::SchemaObjectExists) { printf("Event creation failed, event exists\n"); printf("dropping Event...\n"); if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError()); // try again // Add event to database if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError()); } else APIERROR(myDict->getNdbError()); return 0; } static void do_blank(Ndb* myNdb, int code) { const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("ASSETS"); if (myTable == NULL) APIERROR(myDict->getNdbError()); NdbTransaction *myTransaction= myNdb->startTransaction(); if (myTransaction == NULL) APIERROR(myNdb->getNdbError()); printf("Replacing VALUE with 0 for CODE: %d ", code); NdbOperation *myOperation= myTransaction->getNdbOperation(myTable); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); myOperation->updateTuple(); myOperation->equal("CODE", code); myOperation->setValue("VALUE", 0); if (myTransaction->execute( NdbTransaction::Commit ) == -1) APIERROR(myTransaction->getNdbError()); myNdb->closeTransaction(myTransaction); } shell> slave_filter 127.0.0.1:1186From the master Cluster, insert some values (note that the example can easily be extended to cover updates too):
mysql> insert into ASSETS values (101, 50),(102, 40), (103, 99);
and then check that on the slave the value has been set to 0 for each of the entries:
mysql> select * from ASSETS; +------+-------+ | CODE | VALUE | +------+-------+ | 100 | 0 | | 103 | 0 | | 101 | 0 | | 102 | 0 | +------+-------+
How this works…. The table data is replicated as normal and the real values are stored in the slave. The “slave_filter” process has registered against insert operations on this table and when it’s triggered it sets the VALUE field to 0. The event is processes asynchronously from the replication and so there will be some very narrow window during which the true values would be stored in the slave.
-
Doxygen output for MySQL Cluster NDB API & MGM API
Posted on July 20th, 2009 No comments
A new page has been added to this site: NDB API Docs which presents the information from the header files for both the NDB API and the NDB Management API.
The material has been generated using doxygen and will be refreshed shortly after any new major, minor or maintenance release is made generally available (starting from MySQL Cluster 7.0.6). -
Intelligent user-controlled partitioning and writing distribution-aware NDB API Applications
Posted on July 6th, 2009 2 commentsDefault partitioning
When adding rows to a table that’s using MySQL Cluster as the storage engine, each row is assigned to a partition where that partition is mastered by a particular data node in the Cluster. The best performance comes when all of the data required to satisfy a transaction is held within a single partition so that it can be satisfied within a single data node rather than being bounced back and forth between multiple nodes where extra latency will be introduced.
By default, Cluster partions the data by hashing the primary key. This is not always optimal.
For example, if we have 2 tables, the first using a single-column primary key (sub_id) and the second using a composite key (sub_id, service_name)…
mysql> describe names; +--------+-------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +--------+-------------+------+-----+---------+-------+ | sub_id | int(11) | NO | PRI | NULL | | | name | varchar(30) | YES | | NULL | | +--------+-------------+------+-----+---------+-------+ mysql> describe services; +--------------+-------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +--------------+-------------+------+-----+---------+-------+ | sub_id | int(11) | NO | PRI | 0 | | | service_name | varchar(30) | NO | PRI | | | | service_parm | int(11) | YES | | NULL | | +--------------+-------------+------+-----+---------+-------+
If we then add data to these (initially empty) tables, we can then use the ‘explain’ command to see which partitions (and hence phyical hosts) are used to store the data for this single subscriber…
mysql> insert into names values (1,'Billy'); mysql> insert into services values (1,'VoIP',20),(1,'Video',654),(1,'IM',878),(1,'ssh',666); mysql> explain partitions select * from names where sub_id=1; +----+-------------+-------+------------+-------+---------------+---------+---------+-------+------+-------+ | id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | Extra | +----+-------------+-------+------------+-------+---------------+---------+---------+-------+------+-------+ | 1 | SIMPLE | names | p3 | const | PRIMARY | PRIMARY | 4 | const | 1 | | +----+-------------+-------+------------+-------+---------------+---------+---------+-------+------+-------+ mysql> explain partitions select * from services where sub_id=1; +----+-------------+----------+-------------+------+---------------+---------+---------+-------+------+-------+ | id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | Extra | +----+-------------+----------+-------------+------+---------------+---------+---------+-------+------+-------+ | 1 | SIMPLE | services | p0,p1,p2,p3 | ref | PRIMARY | PRIMARY | 4 | const | 10 | | +----+-------------+----------+-------------+------+---------------+---------+---------+-------+------+-------+
The service records for the same subscriber (sub_id = 1) are split accross 4 diffent partitions (p0, p1, p2 & p3). This means that the query results in messages being passed backwards and forwards between the 4 different data nodes which cnsumes extra CPU time and incurs extra latency.
User-defined partitioning to the rescue
We can override the default behaviour by telling Cluster which fields should be fed into the hash algorithm. For our example, it’s reasonable to expect a transaction to access multiple records for the same subscriber (identified by their sub_id) and so the application will perform best if all of the rows for that sub_id are held in the same partition…
mysql> drop table services; mysql> create table services (sub_id int, service_name varchar (30), service_parm int, primary key (sub_id, service_name)) engine = ndb -> partition by key (sub_id); mysql> insert into services values (1,'VoIP',20),(1,'Video',654),(1,'IM',878),(1,'ssh',666); mysql> explain partitions select * from services where sub_id=1; +----+-------------+----------+------------+------+---------------+---------+---------+-------+------+-------+ | id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | Extra | +----+-------------+----------+------------+------+---------------+---------+---------+-------+------+-------+ | 1 | SIMPLE | services | p3 | ref | PRIMARY | PRIMARY | 4 | const | 10 | | +----+-------------+----------+------------+------+---------------+---------+---------+-------+------+-------+
Now all of the rows for sub_id=1 from the services table are now held within a single partition (p3) which is the same as that holding the row for the same sub_id in the names table. Note that it wasn’t necessary to drop, recreate and re-provision the services table, the following command would have had the same effect:
mysql> alter table services partition by key (sub_id);
Writing a distribution-aware application using the NDB API
In our example, the data is nicely partitioned for optimum performance when accessing all of the subscriber’s data – a single data node holding all of their data. However, there is another step to take to get the best out of your NDB-API based application. By default, the NDB API will use the Transaction Coordinator (TC) on a ‘random’ data node to handle the transaction – we could get lucky and the guess is correct but it’s more likely that it will be sent to the wrong data node which with then have to proxy it to the correct data node. The probability of getting it right first time reduces as the number of node groups increases and so can prevent linear scaling.
It’s very simple to modify this behaviour so that the best data node/TC is hit first time, every time. When creating the transaction, the application can include parameters telling the NDB API one of the tables to be accessed and for what key(s). The NDB API will then use that information to identify the best TC to use…
const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *namesTable= myDict->getTable("names"); const NdbDictionary::Table *servicesTable= myDict->getTable("services"); NdbRecAttr *myRecAttr; Ndb::Key_part_ptr dist_key[2]; dist_key[0].ptr = (const void*) &sub_id; dist_key[0].len = sizeof(sub_id); dist_key[1].ptr = NULL; dist_key[1].len = NULL; if (namesTable == NULL) APIERROR(myDict->getNdbError()); if (servicesTable == NULL) APIERROR(myDict->getNdbError()); NdbTransaction *myTransaction= myNdb.startTransaction(namesTable, dist_key); if (myTransaction == NULL) APIERROR(myNdb.getNdbError()); NdbOperation *myOperation= myTransaction->getNdbOperation(namesTable); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); myOperation->readTuple(NdbOperation::LM_Read); myOperation->equal("sub_id",sub_id); myRecAttr= myOperation->getValue("name", NULL); if (myRecAttr == NULL) APIERROR(myTransaction->getNdbError()); // Perform operations on "services" table as well as part of another operation // if required; the subscriber's data will be in the same data node if (myTransaction->execute( NdbTransaction::Commit ) == -1) APIERROR(myTransaction->getNdbError()); printf(" %2d %s\n", sub_id, myRecAttr->aRef()); myNdb.closeTransaction(myTransaction);Note that as the services table has been configured to use the same field (sub_id) for partitioning as the names table, the startTransaction method only needs to know about the namesTable as the TC that the NDB API selects will serve just as well for this subscriber’s data from the services table. The rest of the code can be found in distaware.
-
Batching – improving MySQL Cluster performance when using the NDB API
Posted on June 29th, 2009 1 commentAs many people are aware, the best performance can be achieved from MySQL Cluster by using the native (C++) NDB API (rather than using SQL via a MySQL Server). What’s less well known is that you can improve the performance of your NDB-API enabled application even further by ‘batching’. This article attempts to explain why batching helps and how to do it.
What is batching and why does it help?
Batching involves sending multiple operations from the application to the Cluster in one group rather than individually; the Cluster then processes these operations and sends back the results. Without batching, each of these operations incurs the latency of crossing the network as well as consuming CPU time on both the application and data node hosts.
By batching together multiple operations, all of the requests can be sent in one message and all of the replies received in another – thus reducing the number of messages and hence the latency and CPU time consumed.
How to use batching with the MySQL Cluster NDB API
The principle is that you batch together as many operations as you can, execute them together and then interpret the results. After interpretting the results, the application may then decide to send in another batch of operations.
An NDB API transaction consists of one or more operations where each operation (currently) acts on a single table and could be a simple primary key read or write or a complex table scan.
The operation is not sent to the Cluster at the point that it’s defined. Instead, the application must explicitly request that all operations defined within the transaction up to that point be executed – at which point, the NDB API can send the batch of operations to the data nodes to be processed. The application may request that the transaction be committed at that point or it may ask for the transaction to be held open so that it can analyse the results from the first set of operations and then use that information within a subsequent series of operations and then commit the transaction after executing that second batch of operations.
The following code sample shows how this can be implemented in practice (note that the application logic and all error handling has been ommited).
const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("tbl1"); const NdbDictionary::Table *myTable2= myDict->getTable("tbl2"); NdbTransaction *myTransaction= myNdb.startTransaction(); // Read all of the required data as part of a single batch NdbOperation *myOperation= myTransaction->getNdbOperation(myTable1); myOperation->readTuple(NdbOperation::LM_Read); myOperation->equal("ref", asset_num); myRecAttr= myOperation->getValue("cost", NULL); NdbOperation *myOperation2= myTransaction->getNdbOperation(myTable2); myOperation2->readTuple(NdbOperation::LM_Read); myOperation2->equal("ref", asset_num); myRecAttr= myOperation->getValue("volume", NULL); myTransaction->execute(NdbTransaction::NoCommit); // NOT SHOWN: Application logic interprets results from first set of operations // Based on the data read during the initial batch, make the necessary changes myOperation *myOperation3= myTransaction->getNdbOperation(myTable1); myOperation3->updateTuple(); myOperation3->equal("ref", asset_num); myOperation2->setValue("cost", new_cost); myOperation *myOperation4= myTransaction->getNdbOperation(myTable2); myOperation4->updateTuple(); myOperation4->equal("ref", asset_num); myOperation4->setValue("volume", new_volume); myTransaction->execute( NdbTransaction::Commit); myNdb.closeTransaction(myTransaction); -
Are Stored Procedures available with MySQL Cluster?
Posted on May 1st, 2009 No commentsThe answer is yes – kind of.
Stored procedures are implemented in a MySQL Server and can be used regardless of the storage engine being used for a specific table. One inference from this is that they won’t work when accessing the Cluster database directly through the NDB API.
This leads to the question of whether or not that limitation actually restricts what you can achieve. This article gives a brief introduction to stored procedures and looks at how the same results can be achieved using the NDB API.
Stored procedures provide a rudimentary way of implementing functionality within the database (rather than in the application code). They are implemented by the database designer and have the ability to perform computations as well as make changes to the data in the database. A typical use of stored procedures would be to control all access to the data by a user or application – for example, to impose extra checks on the data or make sure that all linked data is updated rather than leaving it to the user or application designer to always remember to do it. To impose this, the DBA could grant permission to users to call the stored procedures but not write to the tables directly.
This functionality can be very useful when the data is being accessed through the SQL interface. If using the NDB API then you have the full power of the C++ language at your disposal and so a designer can code whatever checks and side effects are needed within a wrapper method and then have applications use those methods rather than accessing the raw NDB API directly for those changes.
There is one piece of functionality available using stored procedures which could be very helpful to applications using the NDB API – triggers. The rest of this article explains what triggers are; how they’re used and how that same results can be achieved using the NDB API.
Triggers
Triggers allow stored code to be invoked as a side effect of SQL commands being executed on the database through a MySQL Server. The database designer can implement a stored procedure and then register it to be invoked when specific actions (INSERT, DELETE etc.) are performed on a table.
The following example shows how a simple stored procedure can be implemented and then registered against a table.
mysql> USE test; Database changed mysql> create table ASSETS (NAME varchar(30) not null primary key,VALUE int) engine=ndb; Query OK, 0 rows affected (0.67 sec) mysql> create table AUDIT_LOG (NOTE varchar(30) not NULL primary key) engine=ndb; Query OK, 0 rows affected (0.56 sec) mysql> delimiter // mysql> create procedure log_it (log_string varchar(30)) -> begin -> insert into AUDIT_LOG values(log_string); -> end -> // Query OK, 0 rows affected (0.00 sec) mysql> delimiter ; mysql> create trigger ins_asset before insert on ASSETS -> for each row call log_it(new.name); Query OK, 0 rows affected (0.00 secThe stored procedure in this example is triggered whenever a new tuple is inserted into the ASSETS table. The procedure then inserts the asset’s name into the AUDIT_LOG table. If the tuple is deleted from the ASSETS table then the entry in the AUDIT_LOG table remains intact.
The following screen capture shows the results when adding a tuple to the table that contains the trigger.
mysql> insert into ASSETS values ('Computer',350); Query OK, 1 row affected (0.01 sec) mysql> select * from AUDIT_LOG; +----------+ | NOTE | +----------+ | Computer | +----------+ 1 row in set (0.00 sec)Note that as the trigger and stored procedure are implemented in the MySQL Server, they need to be separately defined in all of the MySQL Server instances where they are needed.
The following NDB API code adds a new tuple to the ASSETS table in much the same way as was done through SQL above (Note: my C++ is very rusty and so there will be glitches in this code – especially for string handling).
#include <NdbApi.hpp> #include <stdio.h> #include <string.h> #include <iostream> #include <cstdlib> static void run_application(Ndb_cluster_connection &, char*); #define PRINT_ERROR(code,msg) \ std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \ << ", code: " << code \ << ", msg: " << msg << "." << std::endl #define APIERROR(error) { \ PRINT_ERROR(error.code,error.message); \ exit(-1); } int main(int argc, char** argv) { if (argc != 3) { std::cout << "Arguments are <connect_string cluster><asset_name>.\n"; exit(-1); } ndb_init(); // connect to cluster and run application { const char *connectstring = argv[1]; char *asset_name = argv[2]; // Object representing the cluster Ndb_cluster_connection cluster_connection(connectstring); // Connect to cluster management server (ndb_mgmd) if (cluster_connection.connect(4 /* retries */, 5 /* delay between retries */, 1 /* verbose */)) { std::cout << "Cluster management server was not ready within 30 secs.\n"; exit(-1); } // Connect and wait for the storage nodes (ndbd's) if (cluster_connection.wait_until_ready(30,0) < 0) { std::cout << "Cluster was not ready within 30 secs.\n"; exit(-1); } // run the application code run_application(cluster_connection, asset_name); } ndb_end(0); return 0; } static void do_insert(Ndb &, char*); static void run_application(Ndb_cluster_connection &cluster_connection, char *asset_name) { /******************************************** * Connect to database via NdbApi * ********************************************/ // Object representing the database Ndb myNdb( &cluster_connection, "test" ); if (myNdb.init()) APIERROR(myNdb.getNdbError()); do_insert(myNdb, asset_name); } static void do_insert(Ndb &myNdb, char *asset_name) { const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("ASSETS"); char str[20]; str[0] = strlen(asset_name); strcpy(str +1, asset_name); if (myTable == NULL) APIERROR(myDict->getNdbError()); NdbTransaction *myTransaction= myNdb.startTransaction(); if (myTransaction == NULL) APIERROR(myNdb.getNdbError()); NdbOperation *myOperation= myTransaction->getNdbOperation(myTable); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); myOperation->insertTuple(); myOperation->setValue("NAME", str); myOperation->setValue("VALUE", 555); if (myTransaction->execute( NdbTransaction::Commit ) == -1) APIERROR(myTransaction->getNdbError()); myNdb.closeTransaction(myTransaction); }This code can then be executed and then the effects verified using SQL commands through the MySQL Server – note that the stored procedure has not been triggered and so the name has not been copied into the AUDIT_LOG table.
[billy@ws1 stored]$ ./test_stored_procedures localhost:1186 Monitor mysql> select * from ASSETS; +----------+-------+ | NAME | VALUE | +----------+-------+ | Monitor | 555 | | Computer | 350 | +----------+-------+ 2 rows in set (0.01 sec) mysql> select * from AUDIT_LOG; +----------+ | NOTE | +----------+ | Computer | +----------+ 1 row in set (0.00 sec)
It could easily be argued that triggers are not required when using the NDB API – simply code a wrapper method that also applies the required side effects. However, it is possible to come up with scenarios where triggers would be much more convenient – for example if the application is already littered with accesses to the data and you want to retrofit the side effect.
Fortunately, the NDB API includes the ability to register triggers against operations for a specific table. The code that follows implements a process that waits for an INSERT to be performed on the ASSETS table and then creates an entry in the AUDIT_LOG table just as the earlier stored procedure did.
#include <NdbApi.hpp> #include <stdio.h> #include <iostream> #include <unistd.h> #include <cstdlib> #include <string.h> #define APIERROR(error) \ { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \ << error.code << ", msg: " << error.message << "." << std::endl; \ exit(-1); } int myCreateEvent(Ndb* myNdb, const char *eventName, const char *eventTableName, const char **eventColumnName, const int noEventColumnName); static void do_insert(Ndb*, char*); int main(int argc, char** argv) { if (argc < 2) { std::cout << "Arguments are <connect_string cluster> <timeout>].\n"; exit(-1); } const char *connectstring = argv[1]; int timeout = atoi(argv[2]); ndb_init(); Ndb_cluster_connection *cluster_connection= new Ndb_cluster_connection(connectstring); int r= cluster_connection->connect(5 /* retries */, 3 /* delay between retries */, 1 /* verbose */); if (r > 0) { std::cout << "Cluster connect failed, possibly resolved with more retries.\n"; exit(-1); } else if (r < 0) { std::cout << "Cluster connect failed.\n"; exit(-1); } if (cluster_connection->wait_until_ready(30,30)) { std::cout << "Cluster was not ready within 30 secs." << std::endl; exit(-1); } Ndb* myNdb= new Ndb(cluster_connection, "test"); // Object representing the database if (myNdb->init() == -1) APIERROR(myNdb->getNdbError()); const char *eventName= "CHNG_IN_ASSETS"; const char *eventTableName= "ASSETS"; const int noEventColumnName= 2; const char *eventColumnName[noEventColumnName]= {"NAME", "VALUE"}; // Create events myCreateEvent(myNdb, eventName, eventTableName, eventColumnName, noEventColumnName); // Normal values and blobs are unfortunately handled differently.. typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH; int i, j; j = 0; while (j < timeout) { // Start "transaction" for handling events NdbEventOperation* op; if ((op = myNdb->createEventOperation(eventName)) == NULL) APIERROR(myNdb->getNdbError()); RA_BH recAttr[noEventColumnName]; RA_BH recAttrPre[noEventColumnName]; for (i = 0; i < noEventColumnName; i++) { recAttr[i].ra = op->getValue(eventColumnName[i]); recAttrPre[i].ra = op->getPreValue(eventColumnName[i]); } if (op->execute()) APIERROR(op->getNdbError()); NdbEventOperation* the_op = op; i= 0; while (i < timeout) { int r = myNdb->pollEvents(1000); // wait for event or 1000 ms if (r > 0) { while ((op= myNdb->nextEvent())) { i++; NdbRecAttr* ra = recAttr[0].ra; if (ra->isNULL() >= 0) { // we have a value if (ra->isNULL() == 0) { // we have a non-null value printf("NAME: %s ", ra->aRef()); do_insert(myNdb, ra->aRef()); } else printf("%-5s", "NULL"); } else printf("%-5s", "-"); // no value ra = recAttr[1].ra; printf("\n"); } } } if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError()); the_op = 0; j++; } { NdbDictionary::Dictionary *myDict = myNdb->getDictionary(); if (!myDict) APIERROR(myNdb->getNdbError()); if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError()); } delete myNdb; delete cluster_connection; ndb_end(0); return 0; } int myCreateEvent(Ndb* myNdb, const char *eventName, const char *eventTableName, const char **eventColumnNames, const int noEventColumnNames) { NdbDictionary::Dictionary *myDict= myNdb->getDictionary(); if (!myDict) APIERROR(myNdb->getNdbError()); const NdbDictionary::Table *table= myDict->getTable(eventTableName); if (!table) APIERROR(myDict->getNdbError()); NdbDictionary::Event myEvent(eventName, *table); myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT); myEvent.addEventColumns(noEventColumnNames, eventColumnNames); // Add event to database if (myDict->createEvent(myEvent) == 0) myEvent.print(); else if (myDict->getNdbError().classification == NdbError::SchemaObjectExists) { printf("Event creation failed, event exists\n"); printf("dropping Event...\n"); if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError()); // try again // Add event to database if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError()); } else APIERROR(myDict->getNdbError()); return 0; } static void do_insert(Ndb* myNdb, char *asset_name) { const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("AUDIT_LOG"); char str[30]; str[0] = strlen(asset_name); strcpy(str +1, asset_name); printf("Storing %i characters: %s\n", strlen(asset_name), asset_name); if (myTable == NULL) APIERROR(myDict->getNdbError()); NdbTransaction *myTransaction= myNdb->startTransaction(); if (myTransaction == NULL) APIERROR(myNdb->getNdbError()); myOperation->insertTuple(); myOperation->setValue("NOTE", str); if (myTransaction->execute( NdbTransaction::Commit ) == -1) APIERROR(myTransaction->getNdbError()); myNdb->closeTransaction(myTransaction); }
We can then use the code to make the addition through the NDB API. We use one terminal to run the listener and then another to run the code to add the tuple.
[billy@ws1 stored]$ ./trigger_listener localhost:1186 100 [billy@ws1 stored]$ ./test_stored_procedures localhost:1186 Keyboard mysql> select * from ASSETS; +----------+-------+ | NAME | VALUE | +----------+-------+ | Keyboard | 555 | | Computer | 350 | | Monitor | 555 | +----------+-------+ 3 rows in set (0.00 sec) mysql> select * from AUDIT_LOG; +-----------+ | NOTE | +-----------+ | Computer | | Keyboard | +-----------+ 2 rows in set (0.00 sec)
A major advantage of this approach is that the trigger is implemented within the Cluster database and so is invoked regardless of where the INSERT is requested – whether it be through the NDB API or through any of the MySQL Servers. This is shown in the results that follow.
mysql> drop trigger ins_asset; Query OK, 0 rows affected (0.00 sec) mysql> drop procedure log_it; Query OK, 0 rows affected (0.00 sec) mysql> insert into ASSETS values("Printers", 200); Query OK, 1 row affected (0.00 sec) mysql> select * from ASSETS; +----------+-------+ | NAME | VALUE | +----------+-------+ | Keyboard | 555 | | Computer | 350 | | Monitor | 555 | | Printers | 200 | +----------+-------+ 4 rows in set (0.00 sec) mysql> select * from AUDIT_LOG; +-----------+ | NOTE | +-----------+ | Printers | | Keyboard | | Computer | +-----------+ 4 rows in set (0.00 sec)
Note that I first removed the original trigger and stored procedure that were defined in the MySQL Server.
There is another key difference between MySQL triggers and NDB events – triggers are executed as part of the MySQL transaction making the main database change whereas NDB events happen asynchronously. The effect of this is:
- The original transaction will commit succesfully before the side effects have been processed
- If the process waiting for the event disappears then the side effect will not be processed – for this reson, you may want to consider an audit/clean-up scripts to cover these cases.
Conclusion
Stored procedures are fully supported for users or applications which access a Cluster database through a MySQL Server (whether directly using SQL or through any of the numerous connectors that are available). Applications which access the database through the NDB API have the full flexibility of C++ to implement functionality that can achieve the same results. Triggers are available whichever method is used to access the database – albeit with different implementations and slightly different functionality.












