Wednesday, April 30, 2008

Give priorities to your tasks

Making lists is a great way to schedule the time and organize your life. A lot of people and even me make the tasks list every day and I'm sure that almost all of them poll some tasks to the top of this list giving them higher priority.

GridGain does it out-of-the-box and schedule your tasks automatically. This feature is very useful on distributed environment where multiple tasks and jobs are being executed simultaneously. Definitely one need to say what jobs should be run first and which ones could wait. Another words - prioritize them.

Here is a simple example (as you probably noticed we do our best to make it simple for developers and users) that shows how all those tasks and jobs could be prioritized.

Add following to the node configuration file:

config.xml

<property name="collisionSpi">
<bean class=
"org.gridgain.grid.spi.collision.priorityqueue.GridPriorityQueueCollisionSpi">
<property name="priorityAttributeKey" value="GRID_TASK_PRIORITY"/>
</bean>
</property>

This will set up appropriate Collision SPI and says it to look for the given priority key.

Now let's get back to tasks and jobs and give them priority.

MyGridUsualTask.java

public class MyGridUsualTask extends GridTaskSplitAdapter<Object, Object> {
public static final int SPLIT_COUNT = 20;

@GridTaskSessionResource
private GridTaskSession taskSes = null;

@Override
protected Collection<? extends GridJob> split(int gridSize, Object arg)
throws GridException {
...
// Set low task priority (note that attribute name is used by the SPI
// and should not be changed).
taskSes.setAttribute("GRID_TASK_PRIORITY", 5);

Collection<GridJob> jobs = new ArrayList<GridJob>(SPLIT_COUNT);

for (int i = 1; i <= SPLIT_COUNT; i++) {
jobs.add(new GridJobAdapter<Integer>(i) {
...
});
}
...
}
}

and

MyGridUrgentTask.java

public class MyGridUrgentTask extends GridTaskSplitAdapter<Object, Object> {
public static final int SPLIT_COUNT = 5;

@GridTaskSessionResource
private GridTaskSession taskSes = null;

@Override
protected Collection<? extends GridJob> split(int gridSize, Object arg)
throws GridException {
...
// Set high task priority (note that attribute name is used by the SPI
// and should not be changed).
taskSes.setAttribute("GRID_TASK_PRIORITY", 10);

Collection<GridJob> jobs = new ArrayList<GridJob>(SPLIT_COUNT);

for (int i = 1; i <= SPLIT_COUNT; i++) {
jobs.add(new GridJobAdapter<Integer>(i) {
...
});
}
...
}
}

Java NIO - No Input no Output :)

It's rather an offtop but I should say some words about NIO on Linux.

NIO or new input/output provides better performance and less resources consumption than simple sockets and IMHO pretty good thing to use wherever it's possible.

Starting from 1.5 GridGain uses Java NIO to give users maximum flexibility, compatibility and throughput. All messages that nodes exchange each over and even jobs and tasks exchange uses new input/output provide highest speed and consume less resources then ordinary sockets.

But recently digging into some Java 6 features on Linux we ran into the issue with NIO implementation.

Linux has so called epoll (I/O event notification facility that was implemented in 2.6 kernel). Java 6 reimplemented NIO to use Linux epoll and thus this new version won't work on old Linux installations. So to use latest Java VM you have to keep you OS up-to-date or get back to Java 5.
Some tips to check if your Linux is compatible with Java 6 NIO:
  1. Execute "nm -D /lib/libc.so.6 | grep epoll". You should get all 4 functions like below
    00000000000d8040 T epoll_create
    00000000000d8070 T epoll_ctl
    00000000000d7d10 T epoll_pwait
    00000000000d80a0 T epoll_wait
  2. Check you kernel " sudo cat /boot/System.map-* |grep epoll" (note that you need super user privileges). Your active kernel should have epoll functions implemented.
    ffffffff802dd050 T sys_epoll_wait
    ffffffff802dd530 T sys_epoll_ctl
    ffffffff802dda50 T sys_epoll_create
    ffffffff802ddb30 T sys_epoll_pwait
    ffffffff802e10b0 T compat_sys_epoll_pwait
So if you got message like below just check things above.
Caused By:
----------
>>> Type: java.io.IOException
>>> Message: Function not implemented
>>> Stack trace:
>>> at sun.nio.ch.EPollArrayWrapper.epollCreate(EPollArrayWrapper.java:1)
...

P.S. The reason was old "glibc" library. So if you got this exception check the kernel and then update "glibc" to the latest.

Saturday, April 19, 2008

Database checkpoint SPI

Recently I noticed that I have some visitors who don't speak Russian and I decided to post both English and Russian versions if I could. Obviously not all russian software developers know english good enough to read all those articles but I think mine ones are very simple to understand and in most cases they don't need anything but the code (as it is self descriptive).

Well, let's start. Today I would like to give you another example that seems to me very useful. This is a database checkpoint SPI implementation. Let me say some words about checkpoints. GridGain uses checkpoints to store interim task or job results somewhere (by default it's a shared file system but it has Oracle Coherence and GigaSpaces implementations as well). But the most useful case as I see it is a database. Jobs that takes long time definitely need a storage where they will keep calculation results and all people I know think that database is a right place.

So let's code a bit (I said a bit because I personally spent 30 mins to code this example and another 20 mins to test it on Oracle database). The example does not have Oracle specific code except those check for the ORA-00955 when I create new table (I just did not find any simple solution in my mind). Anyway code might be different because not all database drivers implement Java SQL specification properly thus take care and test this example if you are going to use it.

Simple configuration of this checkpoint SPI. One should add it to the grid configuration file:

<property name="checkpointSpi">
<bean class="org.gridgain.examples.checkpoint.db.GridDbCheckpointSpi">
<property name="connectionStr" value="test/test@box:1521/orcl"/>
</bean>
</property>


Here is a code. Notice that I did not implement timespan (expiration time) support - just because it will take me another 10 minutes :) and it's up to you to change it if you need.

GridDbCheckpointSpi.java

package org.gridgain.examples.checkpoint.db;

import java.sql.*;
import org.gridgain.grid.logger.*;
import org.gridgain.grid.resources.*;
import org.gridgain.grid.spi.*;
import org.gridgain.grid.spi.checkpoint.*;

public class GridDbCheckpointSpi extends GridSpiAdapter
implements GridCheckpointSpi{
/** */
private static final String CREATE_TABLE_SQL =
"create table grid_checkpoints(cpKey varchar(256), " +
"cpValue long raw)";

/** */
private static final String CHECK_CP_SQL =
"select 0 from grid_checkpoints where cpKey = ?";

/** */
private static final String UPDATE_CP_SQL =
"update grid_checkpoints set cpValue = ? where cpKey = ?";

/** */
private static final String INSERT_CP_SQL =
"insert into grid_checkpoints(cpKey, cpValue) values(?, ?)";

/** */
private static final String DELETE_CP_SQL =
"delete from grid_checkpoints where cpKey = ?";

/** */
private static final String SELECT_CP_SQL =
"select cpValue from grid_checkpoints where cpKey = ?";

/** */
private String connStr = null;

/** */
private Connection conn = null;

/** */
@GridLoggerResource
private GridLogger log = null;

/**
* @return the connStr
*/
public String getConnectionString() {
return connStr;
}

/**
* @param connStr the connStr to set
*/
public void setConnectionStr(String connStr) {
this.connStr = connStr;
}

/**
* {@inheritDoc}
*/
public byte[] loadCheckpoint(String key)
throws GridSpiException {
try {
CallableStatement cs = conn.prepareCall(SELECT_CP_SQL);

cs.setString(1, key);

ResultSet rs = cs.executeQuery();

if (rs.next() == false) {
// Nothing found.
return null;
}

byte[] res = rs.getBytes(1);

assert rs.next() == false : "Failed to load " +
"checkpoint (duplicate key found) [cpKey="
+ key + ']';

return res;
}
catch (SQLException e) {
throw new GridSpiException("Failed to remove " +
"checkpoint [cpKey=" + key + ']', e);
}
}

/**
* {@inheritDoc}
*/
public boolean removeCheckpoint(String key) {
try {
CallableStatement cs = conn.prepareCall(DELETE_CP_SQL);

cs.setString(1, key);

return cs.execute();
}
catch (SQLException e) {
log.warning("Failed to remove checkpoint [cpKey="
+ key + ']', e);

return false;
}
}

/**
* {@inheritDoc}
*/
public void saveCheckpoint(String key, byte[] state,
long timeout)throws GridSpiException {
try {
CallableStatement cs = conn.prepareCall(CHECK_CP_SQL);

cs.setString(1, key);

ResultSet rs = cs.executeQuery();

if (rs.next() == false) {
// Nothing found.
CallableStatement in = conn.prepareCall(
INSERT_CP_SQL);

in.setString(1, key);
in.setBytes(2, state);

if (in.execute() == false) {
throw new GridSpiException("Failed to create " +
"checkpoint [cpKey=" + key + ']');
}
}
else {
// Saved checkpoint found.
CallableStatement up = conn.prepareCall(
UPDATE_CP_SQL);

up.setBytes(1, state);
up.setString(2, key);

if (up.execute() == false) {
throw new GridSpiException("Failed to update " +
"checkpoint [cpKey=" + key + ']');
}
}

}
catch (SQLException e) {
throw new GridSpiException("Failed to remove " +
"checkpoint [cpKey=" + key + ']', e);
}
}

/**
*{@inheritDoc}
*/
public void spiStart(String gridName) throws GridSpiException {
// Start SPI start stopwatch.
startStopwatch();

assertParameter(connStr != null, "connStr != null");

// Obtain the connection.
conn = obtain(connStr);

try {
// Check database table.
CallableStatement cs = conn.prepareCall(
CREATE_TABLE_SQL);

if (cs.execute() == true) {
// Table has been created.
if (log.isInfoEnabled() == true) {
log.info("Successfully created checkpoint table");
}
} else {
// Unexpected behavior.
assert false: "This should never happen.";
}
}
catch (SQLException e) {
// If exception string has 'ORA-00955'
// then this table exists.
if (e.getMessage().contains("ORA-00955") == true) {
if (log.isDebugEnabled() == true) {
log.debug("Checkpoint table already exist.");
}
}
else {
// Re-throw.
throw new GridSpiException("Failed to create " +
"checkpoint table.", e);
}
}

// Ack ok start.
if (log.isInfoEnabled() == true) {
log.info(startInfo());
}
}

/**
* {@inheritDoc}
*/
public void spiStop() throws GridSpiException {
try {
conn.close();
}
catch(SQLException e) {
throw new GridSpiException("Failed to close database " +
"connection [conn=" + conn + ']', e);
}

// Ack ok stop.
if (log.isInfoEnabled() == true) {
log.info(stopInfo());
}
}

/**
* @param connStr
* @return FIXDOC.
* @throws GridSpiException
*/
private Connection obtain(String connStr)
throws GridSpiException {
try {
return DriverManager.getConnection(connStr);
}
catch (SQLException e) {
throw new GridSpiException("Failed to obtain database " +
"connection [connStr=" + connStr + ']', e);
}
}
}

Monday, April 14, 2008

Использование атрибутов нодов для распределения задач

Как известно не все комьютеры одинаковы :) Так же и с нодами входящими в состав Грида. Одним из важных аспектов при выполнении какой то задачи на гриде является правильное распределение под-задач на ноды, где есть все необходимые ресурсы.

Одним из простых и надежных способов является "маркировка" нодов с помощию статических атрибутов. Предположим что какая то нода, скажем с именем "db_node" имеет доступ к базе данных, в то время как другие нет. Наша сегодняшняя задача будет состоять в написании примера, позволяющего при распределении jobs между нодами учитывать тип job и, скажем, если она требует доступа к базе данных направлять ее на выполнение именно туда.

На практике это очень просто:
Шаг первый - стартуем ноду db_node с пользовательской конфигурацией, включающей атрибут "db=true" (ниже идет пример кода из конфигурационного файла)

example-config.xml

<property name="gridName" value="db_node"/>
...
<property name="userAttributes">
<map>
<entry key="db" value="true">
</map>
</property>

Шаг второй это использование атрибута в методе GridTask#map(). Давайте предположим что jobs не однородны для одной и той же task, что вполне естественно - каждая задача состоит обычно из различных по своему алгортму подзадач. Например задача поиска данных может сводится к подзадаче поиска в локальных файлах, поиска в базе данных и в Интернет.

Положим для простоты что в нашем случае мы выполняем поиск в локальных файлах и базе данных.

GridSearchTask.java

public class GridSearchTask extends GridTaskAdapter<String, Object> {
/** Grid logger. */
@GridLoggerResource
private GridLogger log = null;

/** Instance of grid. */
@GridInstanceResource
private Grid grid = null;

/**
* {@inheritDoc}
*/
public Map map(List<GridNode> subgrid, String arg) throws GridException {
Map<GridJob,GridNode> jobs = new HashMap<Gridjob,GridNode>(1);

for (GridNode node: subgrid) {
// Add additional job for DB node.
if ("true".equals(node.getAttribute("db")) == true) {
jobs.put(new GridDbJob(), node);
}

// Look for files on every node.
jobs.put(new GridFilesJob(), node);
}

return jobs;
}

/**
* {@inheritDoc}
*/
public Object reduce(List<GridJobResult> results)
throws GridException {
...
}
}

Я сознательно не публикую код GridFilesJob и GridDbJob поскольку они зависят от конкретной задачи а мы рассматриваем абстрактный пример.

Saturday, April 5, 2008

Пример параллельной обработки файлов с GridGain

Наконец то я воплотил давнюю идею написать универсальную программку которая позволяет распараллеливать обработку файлов на GridGain.
Идея очень простая - универсальная task и job которые выполняют данную команду с некоторыми параметрами на разных машинах и собирает output с этих команд а потом сборка результата с помощью простой команды.

Итак чуть более формально:
Задача: посчитать количество вхождений строки во все файлы в определенном каталоге на сетевом диске (задача распараллеливания становится очень актуальна если скажем у вас 1000 файлов по 1 гигабайту например). (Кстати не меняя Java кода вы можете выполнять любые скрипты: shell, cmd, perl что угодно)

Как это работает:
Вы стартуете универсальный лоадер GridCmdLineLoader как обычный класс с методом main() и передаете ему 3 параметра:
a) Файл конфигурацию грида
б) Команду, которая будет обрабатывать файлы и выводить в консоль результат работы
с) Команду, которая из результатов пункта б) сделает окончательный результат.

Пример запуска:
java GridCmdLineLoader config/default-spring.xml "search.sh /mount/netdrive/data searchString" result.sh

Прошу заметить что это просто пример :) конечно же надо указать все пути правильно, указать classpath Java машине и прочее.

Теперь код:
1) Создадим скрипт (для примера на bash) который умеет искать подстроку в файлах, но немного хитрее - у него будет возможность "распараллеливания". То есть проще говоря он будет обрабатывать не все файлы, а только некоторые на основе 2 переменных окружения: GRID_JOB_NUM и GRID_TOTAL_JOBS. Прошу заметить что скрипт выводит результат с помощью echo. Это и есть результат его работы. Скрипт назовем search.sh и пусть он принимает 2 параметра. 1-й это имя каталога а 2-й это строка для поиска.

search.sh

#!/bin/bash

# First script parameter is search dir
searchDir=$1

# Second is a word.
searchWord=$2

result=0;

# Variables predefined by task
totalJobs=$GRID_TOTAL_JOBS
jobNum=$GRID_JOB_NUM

# Initialize then with default values if script started manually.
if [[ -z $totalJobs ]]; then
totalJobs=1
fi

if [[ -z $jobNum ]]; then
jobNum=0
fi

# Get list of files.
files=`ls $searchDir`

i=0

# One by one count number of given word using grep.
for file in $files; do
# Next file number that should be processed. Note that if
# number of jobs is 2 for example then every job will
# process every second file with $jobNum shift.
nextNum=$((i / totalJobs * totalJobs + jobNum))

if [[ $i = $nextNum ]]; then
cnt=`grep -c $searchWord $searchDir/$file`

result=$((result+cnt))
fi

i=$((i+1))
done

echo -n $result

exit 0;

2) Создаем еще простой скрипт опять-таки на bash. Его задача "собрать" вместе результаты выполнения первого скрипта на разных машинах (просто просуммировать их). Назовем его result.sh и его параметрами будут результаты выполнения search.sh на разных машинах.

result.sh
#!/bin/bash

result=0

until [ -z "$1" ]
do
result=$((result+$1))

shift
done

echo -n $result
3) Теперь самое интересное - Это так называемый loader. Несмотря на то что GridGain поставляется с различными loaders (для WebLogic, GlassFish, WebSphere, Servlet, Command line) нам пригодится более специализированный. А по сути это обычный класс с методом main(), который поднимает грид, выполняет нашу задачу и останавливает грид :)
Ниже идет класс названный GridCmdLineLoader и реализующий эту логику.

GridCmdLineLoader.java
package org.gridgain.examples.cmdline;

import org.gridgain.grid.*;
import org.gridgain.grid.loaders.*;

@SuppressWarnings({"CallToSystemExit"})
@GridLoader(description = "Command line loader")
public final class GridCmdLineLoader {
/**
* Enforces singleton.
*/
private GridCmdLineLoader() {
// No-op.
}

/**
* Main entry point.
*
* @param args Command line arguments.
*/
@SuppressWarnings({"unchecked"})
public static void main(String[] args) {
if (args.length < 3) {
System.out.println("Too few arguments. Expected at least" +
" grid configuration target script and result script.");

System.exit(-1);
}

Grid grid = null;

Exception ex = null;

try {
grid = GridFactory.start(args[0]);

GridCmdLineTask task = new GridCmdLineTask(args[1], args[2]);

String res = grid.execute(task, null).get();

System.out.println(res);
}
catch (GridException e) {
ex = e;
}
finally {
if (grid != null) {
GridFactory.stop(grid.getName(), false);
}
}

if (ex != null) {
System.out.println("Got exception: " + ex);

System.exit(-1);
}
else {
System.exit(0);
}
}
}

4) Ну и последнее - grid task/job и пожалуй самое интересное - прошу заметитть что эта job умеет выполнять произвольный скрипт и все что скрипт выводит на консоль берет как строку и считает результатом работы - так что при желании вы можете не только считать но и возвращать результат поиска и все что хотите :)

GridCmdLineTask.java

package org.gridgain.examples.cmdline;

import java.io.*;
import java.util.*;
import org.gridgain.grid.*;
import org.gridgain.grid.logger.*;
import org.gridgain.grid.resources.*;

public class GridCmdLineTask extends GridTaskSplitAdapter<String, String> {
/** */
public static final String GRID_JOB_NUM = "GRID_JOB_NUM";

/** */
public static final String GRID_TOTAL_JOBS = "GRID_TOTAL_JOBS";

/** */
@GridLoggerResource
private GridLogger log = null;

/** Executable command. */
private final String cmd;

/** Result reducing command. */
private final String resCmd;

/**
* @param cmd Command to execute.
* @param resCmd Command that will reduce results
*/
public GridCmdLineTask(String cmd, String resCmd) {
super();

this.cmd = cmd;
this.resCmd = resCmd;
}

/**
* {@inheritDoc}
*/
@Override
public Collection<? extends GridJob> split(int gridSize, String arg) {
Collection<GridJob> refs = new ArrayList<GridJob>(gridSize);

for (int i = 0; i < job =" new"> results) throws GridException {
String params = "";

for (GridJobResult res : results) {
params = params + res.getData() + ' ';
}

if (log.isDebugEnabled() == true) {
log.debug("Reducing results. Execute command '"
+ resCmd + ' ' + params + '\'');
}

// Make a call of reducing command.
return executeCmd(resCmd + ' ' + params, null, log);
}

/**
*
* @param params command parameters
* @param env command environment.
* @param log Grid logger.
* @return command output.
* @throws GridException If command execution failed.
*/
private static String executeCmd(String params, String[] env, GridLogger log)
throws GridException {
try {
Process proc = Runtime.getRuntime().exec(params, env);

proc.waitFor();

StringBuffer sb = new StringBuffer();

InputStream is = proc.getInputStream();

while (is.available() != 0) {
sb.append((char)is.read());
}

if (log.isDebugEnabled() == true) {
log.debug("Command execution result is " + sb.toString());
}

return sb.toString();
}
catch (IOException e) {
throw new GridException(e);
}
catch (InterruptedException e) {
throw new GridException(e);
}
}

/**
*
*/
public static class GridCmdLineJob extends GridJobAdapter<String> {
/** */
@GridLoggerResource
private GridLogger log = null;

/** Job number. */
private final long jobNum;

/** Number of jobs. */
private final long totalJobs;

/**
* Creates new command line job.
*
* @param jobNum Job number on a list of jobs.
* @param totalJobs Total jobs number.
*/
public GridCmdLineJob(long jobNum, long totalJobs) {
super();

this.jobNum = jobNum;
this.totalJobs = totalJobs;
}

/**
* {@inheritDoc}
*/
public Serializable execute() throws GridException {
String[] env = new String[]{GRID_JOB_NUM + "=" + jobNum,
GRID_TOTAL_JOBS + "=" + totalJobs};

if (log.isDebugEnabled() == true) {
log.debug("Executing given command line '" + getArgument() + '\'');
}

return executeCmd(getArgument(), env, this.log);
}
}
}