Saturday, April 19, 2008

Database checkpoint SPI

Recently I noticed that I have some visitors who don't speak Russian and I decided to post both English and Russian versions if I could. Obviously not all russian software developers know english good enough to read all those articles but I think mine ones are very simple to understand and in most cases they don't need anything but the code (as it is self descriptive).

Well, let's start. Today I would like to give you another example that seems to me very useful. This is a database checkpoint SPI implementation. Let me say some words about checkpoints. GridGain uses checkpoints to store interim task or job results somewhere (by default it's a shared file system but it has Oracle Coherence and GigaSpaces implementations as well). But the most useful case as I see it is a database. Jobs that takes long time definitely need a storage where they will keep calculation results and all people I know think that database is a right place.

So let's code a bit (I said a bit because I personally spent 30 mins to code this example and another 20 mins to test it on Oracle database). The example does not have Oracle specific code except those check for the ORA-00955 when I create new table (I just did not find any simple solution in my mind). Anyway code might be different because not all database drivers implement Java SQL specification properly thus take care and test this example if you are going to use it.

Simple configuration of this checkpoint SPI. One should add it to the grid configuration file:

<property name="checkpointSpi">
<bean class="org.gridgain.examples.checkpoint.db.GridDbCheckpointSpi">
<property name="connectionStr" value="test/test@box:1521/orcl"/>
</bean>
</property>


Here is a code. Notice that I did not implement timespan (expiration time) support - just because it will take me another 10 minutes :) and it's up to you to change it if you need.

GridDbCheckpointSpi.java

package org.gridgain.examples.checkpoint.db;

import java.sql.*;
import org.gridgain.grid.logger.*;
import org.gridgain.grid.resources.*;
import org.gridgain.grid.spi.*;
import org.gridgain.grid.spi.checkpoint.*;

public class GridDbCheckpointSpi extends GridSpiAdapter
implements GridCheckpointSpi{
/** */
private static final String CREATE_TABLE_SQL =
"create table grid_checkpoints(cpKey varchar(256), " +
"cpValue long raw)";

/** */
private static final String CHECK_CP_SQL =
"select 0 from grid_checkpoints where cpKey = ?";

/** */
private static final String UPDATE_CP_SQL =
"update grid_checkpoints set cpValue = ? where cpKey = ?";

/** */
private static final String INSERT_CP_SQL =
"insert into grid_checkpoints(cpKey, cpValue) values(?, ?)";

/** */
private static final String DELETE_CP_SQL =
"delete from grid_checkpoints where cpKey = ?";

/** */
private static final String SELECT_CP_SQL =
"select cpValue from grid_checkpoints where cpKey = ?";

/** */
private String connStr = null;

/** */
private Connection conn = null;

/** */
@GridLoggerResource
private GridLogger log = null;

/**
* @return the connStr
*/
public String getConnectionString() {
return connStr;
}

/**
* @param connStr the connStr to set
*/
public void setConnectionStr(String connStr) {
this.connStr = connStr;
}

/**
* {@inheritDoc}
*/
public byte[] loadCheckpoint(String key)
throws GridSpiException {
try {
CallableStatement cs = conn.prepareCall(SELECT_CP_SQL);

cs.setString(1, key);

ResultSet rs = cs.executeQuery();

if (rs.next() == false) {
// Nothing found.
return null;
}

byte[] res = rs.getBytes(1);

assert rs.next() == false : "Failed to load " +
"checkpoint (duplicate key found) [cpKey="
+ key + ']';

return res;
}
catch (SQLException e) {
throw new GridSpiException("Failed to remove " +
"checkpoint [cpKey=" + key + ']', e);
}
}

/**
* {@inheritDoc}
*/
public boolean removeCheckpoint(String key) {
try {
CallableStatement cs = conn.prepareCall(DELETE_CP_SQL);

cs.setString(1, key);

return cs.execute();
}
catch (SQLException e) {
log.warning("Failed to remove checkpoint [cpKey="
+ key + ']', e);

return false;
}
}

/**
* {@inheritDoc}
*/
public void saveCheckpoint(String key, byte[] state,
long timeout)throws GridSpiException {
try {
CallableStatement cs = conn.prepareCall(CHECK_CP_SQL);

cs.setString(1, key);

ResultSet rs = cs.executeQuery();

if (rs.next() == false) {
// Nothing found.
CallableStatement in = conn.prepareCall(
INSERT_CP_SQL);

in.setString(1, key);
in.setBytes(2, state);

if (in.execute() == false) {
throw new GridSpiException("Failed to create " +
"checkpoint [cpKey=" + key + ']');
}
}
else {
// Saved checkpoint found.
CallableStatement up = conn.prepareCall(
UPDATE_CP_SQL);

up.setBytes(1, state);
up.setString(2, key);

if (up.execute() == false) {
throw new GridSpiException("Failed to update " +
"checkpoint [cpKey=" + key + ']');
}
}

}
catch (SQLException e) {
throw new GridSpiException("Failed to remove " +
"checkpoint [cpKey=" + key + ']', e);
}
}

/**
*{@inheritDoc}
*/
public void spiStart(String gridName) throws GridSpiException {
// Start SPI start stopwatch.
startStopwatch();

assertParameter(connStr != null, "connStr != null");

// Obtain the connection.
conn = obtain(connStr);

try {
// Check database table.
CallableStatement cs = conn.prepareCall(
CREATE_TABLE_SQL);

if (cs.execute() == true) {
// Table has been created.
if (log.isInfoEnabled() == true) {
log.info("Successfully created checkpoint table");
}
} else {
// Unexpected behavior.
assert false: "This should never happen.";
}
}
catch (SQLException e) {
// If exception string has 'ORA-00955'
// then this table exists.
if (e.getMessage().contains("ORA-00955") == true) {
if (log.isDebugEnabled() == true) {
log.debug("Checkpoint table already exist.");
}
}
else {
// Re-throw.
throw new GridSpiException("Failed to create " +
"checkpoint table.", e);
}
}

// Ack ok start.
if (log.isInfoEnabled() == true) {
log.info(startInfo());
}
}

/**
* {@inheritDoc}
*/
public void spiStop() throws GridSpiException {
try {
conn.close();
}
catch(SQLException e) {
throw new GridSpiException("Failed to close database " +
"connection [conn=" + conn + ']', e);
}

// Ack ok stop.
if (log.isInfoEnabled() == true) {
log.info(stopInfo());
}
}

/**
* @param connStr
* @return FIXDOC.
* @throws GridSpiException
*/
private Connection obtain(String connStr)
throws GridSpiException {
try {
return DriverManager.getConnection(connStr);
}
catch (SQLException e) {
throw new GridSpiException("Failed to obtain database " +
"connection [connStr=" + connStr + ']', e);
}
}
}

0 comments: