Thursday, November 07, 2013

Understanding Vmstat Output

Below is a Vmstat output:
procs    -----------memory-------------- ---swap--  -----io----  --system--   -----cpu--------
r  b          swpd    free    buff   cache     si     so    bi     bo       in      cs  us  sy id wa st
2 0    2573144 12404  1140  47128  185  263  185  299  3173  3705  92  8  0    0 0
3 0    2574708 12304  1188  47436  192  187  192  234  3079  3468  92  8  0    0 0
Under Procs we have
       r: The number of runnable processes, i.e. those waiting for run time (in the run queue) or already executing (running)
       b: The number of processes in uninterruptible sleep. (b=blocked queue, waiting for resource (e.g. filesystem I/O blocked, inode lock))

If the number of runnable threads (r) divided by the number of CPUs is greater than one -> possible CPU bottleneck

(The (r) column should be compared with the number of logical CPUs, much like the load average from uptime, to see whether we have enough CPUs for the runnable threads.)

High numbers in the blocked-processes column (b) usually indicate slow disks (processes stuck waiting on I/O).

(r) should normally be higher than (b); if (b) is consistently the larger of the two, processes are spending more time blocked on resources than running, which usually points to an I/O bottleneck rather than a CPU one.

Note: “cat /proc/cpuinfo” displays the CPU info on the machine; counting its processor lines gives the number of logical CPUs:
>cat /proc/cpuinfo|grep processor|wc -l
output: 16

Remember that we need to know the number of CPUs on our server, because the vmstat r value should not consistently exceed the number of CPUs. An r value of 13 is perfectly acceptable for a 16-CPU server, while a value of 16 would be a serious problem for a 12-CPU server.
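
As a quick check, the r column of a fresh vmstat sample can be compared with the CPU count. Below is a minimal Python 3 sketch of that check (my own helper, assuming a Linux host with vmstat installed and the standard column layout shown above):

import os
import subprocess

# Take two 1-second samples; the first row of values is an average since boot,
# so use the last row for the current run-queue length.
lines = subprocess.check_output(["vmstat", "1", "2"], universal_newlines=True).strip().splitlines()
fields = lines[-1].split()   # r b swpd free buff cache si so bi bo in cs us sy id wa st
runnable = int(fields[0])    # the r column
cpus = os.cpu_count()        # logical CPUs, the same count as /proc/cpuinfo

print("r=%d, CPUs=%d" % (runnable, cpus))
if runnable > cpus:
    print("Run queue exceeds the CPU count -> tasks are waiting for a CPU")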

Whenever the value of the r column exceeds the number of CPUs on the server, tasks are forced to wait for execution. There are several ways to manage CPU overload:
1.      Add more processors (CPUs) to the server.
2.      Load balance the system tasks by rescheduling large batch tasks to execute during off-peak hours.

Under Memory we have:

swpd: the amount of virtual memory used, i.e. how much has been swapped out to disk (paged).
            Note: you can see the swap areas configured on the server using "cat /proc/swaps"
>cat /proc/meminfo
>cat /proc/swaps
Filename                        Type            Size    Used    Priority
/dev/dm-7                       partition       16777208        21688   -1

free: The amount of Idle Memory
buff: Memory used as buffers, like before/after I/O operations
cache: Memory used as cache by the Operating System

Under Swap we have:
si: Amount of memory swapped in from disk per second. This shows page-ins.
so: Amount of memory swapped out to disk per second. This shows page-outs; if the so column stays at zero, no page-outs are happening.

Ideally, si and so should be at 0 most of the time, and we definitely don't like to see more than 10 blocks per second.

Under IO we have:
bi: Blocks received from block device - Read (like a hard disk)(blocks/s)
bo: Blocks sent to a block device – Write(blocks/s)

Under System we have:
in: The number of interrupts per second, including the clock.
cs: The number of context switches per second.

(A context switch occurs when the scheduler puts a different thread on the CPU, so the previously running thread is taken off of it.)

It is not uncommon to see the context switch rate be approximately the same as device interrupt rate (in column)

If cs is high, it may indicate that too much process switching is occurring, wasting CPU time that could go to real work.
If cs is higher than sy, the system is doing more context switching than actual work.

High r with high cs -> possible lock contention
Lock contention occurs whenever one process or thread attempts to acquire a lock held by another process or thread. The more granular the available locks, the less likely one process/thread will request a lock held by the other. (For example, locking a row rather than the entire table, or locking a cell rather than the entire row.)
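
As a toy illustration of lock granularity (a sketch of my own, not tied to vmstat), the Python snippet below contrasts one coarse lock for a whole "table" with one lock per row; with per-row locks, writers that touch different rows no longer contend:

import threading

table = {}                     # pretend this dict is a database table
table_lock = threading.Lock()  # coarse-grained: one lock for the whole table
row_locks = {row_id: threading.Lock() for row_id in range(4)}  # fine-grained: one lock per row

def update_row_coarse(row_id, value):
    with table_lock:           # every update serializes here, even for different rows
        table[row_id] = value

def update_row_fine(row_id, value):
    with row_locks[row_id]:    # only updates to the same row serialize
        table[row_id] = value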

When you are seeing blocked processes or high values on waiting on I/O (wa), it usually signifies either real I/O issues where you are waiting for file accesses or an I/O condition associated with paging due to a lack of memory on your system.

Note: the memory, swap, and I/O statistics are in blocks, not in bytes. In Linux, blocks are usually 1,024 bytes (1 KB).

Under CPU we have:
These are percentages of total CPU time.
       us: % of CPU time spent in user mode (running non-kernel code, with no access to kernel resources). This is user time, including nice time.
       sy: % of CPU time spent running kernel code (system time).
       id: % of CPU idle time.
       wa: % of CPU time spent waiting for IO.

To measure true idle time, look at id and wa together:
- if id=0%, it does not mean all CPU is consumed by real work, because wa can be close to 100% while the CPU is only waiting for I/O to complete
- if wa=0%, it does not mean there are no I/O waits, because as long as some threads keep the CPU busy, additional threads can still be waiting for I/O; their wait is masked by the running threads

If process A is running and process B is waiting on I/O, the wa column will still show 0.
A 0 value doesn't mean I/O is not occurring; it means that the system is not sitting idle waiting on I/O.
If process A and process B are both waiting on I/O, and there is nothing else that can use the CPU, then you would see that column increase.

- if wa is high, it does not necessarily mean there is an I/O performance problem; it can simply indicate that some I/O is being done while the CPU is otherwise not kept busy
- if id is high, then most likely there is neither a CPU nor an I/O problem

To measure CPU utilization, look at us+sy together (and compare it to physc on platforms such as AIX that report it); a small classification sketch follows this list:
- if us+sy is consistently greater than 80%, the CPU is approaching its limits
- if us+sy = 100% -> possible CPU bottleneck
- if sy is high, your application is issuing many system calls and asking the kernel to do work; it measures how heavily the application uses kernel services
- if sy is higher than us, the system is spending less time on real work (not good)
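
To make these rules of thumb concrete, here is a small Python helper (my own sketch, not part of vmstat) that classifies a single CPU sample using the cutoffs listed above:

def classify_cpu(us, sy, id_, wa):
    """Rough interpretation of the us/sy/id/wa percentages from one vmstat line."""
    notes = []
    if us + sy >= 100:
        notes.append("possible CPU bottleneck (us+sy = 100%)")
    elif us + sy > 80:
        notes.append("CPU approaching its limits (us+sy > 80%)")
    if sy > us:
        notes.append("more time in kernel code than in user code (less real work)")
    if id_ == 0 and wa > 0:
        notes.append("no idle time, and part of the CPU is spent waiting on I/O")
    return notes or ["no obvious CPU problem"]

# First sample row from the output at the top of this post: us=92, sy=8, id=0, wa=0
print(classify_cpu(92, 8, 0, 0))   # -> ['possible CPU bottleneck (us+sy = 100%)']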

Monitor the system with vmstat:
>nohup vmstat -n 10 60479 > myvmstatfile.dat &
One week of virtual memory stats at ten-second intervals: 604,800 seconds / 10 = 60,480 samples, less the last one is 60,479 ten-second intervals.

> nohup vmstat -n 3 5|awk '{now=strftime("%Y-%m-%d %T "); print now $0}' > vmstat.data &
Append a timestamp to each line of the vmstat output.

Friday, July 12, 2013

Dive into Spring test framework - Part2

Part1 - Dive into Spring test framework(junit3.8)

Now let's take a closer look at the Spring TestContext framework introduced with Spring 3.x.

The general idea of the Spring TestContext framework

Spring 3.x has deprecated the JUnit 3.8 class hierarchy, so let's have a look at the Spring TestContext framework. Below is a test class written with TestContext.
  1 package com.mpos.lottery.te.draw.dao;
  2 
  3 import javax.persistence.EntityManager;
  4 import javax.persistence.PersistenceContext;
  5 
  6 import org.apache.commons.logging.Log;
  7 import org.apache.commons.logging.LogFactory;
  8 import org.junit.After;
  9 import org.junit.Before;
 10 import org.junit.Test;
 11 import org.springframework.test.annotation.Rollback;
 12 import org.springframework.test.context.ContextConfiguration;
 13 import org.springframework.test.context.junit4.AbstractTransactionalJUnit4SpringContextTests;
 14 import org.springframework.test.context.transaction.AfterTransaction;
 15 import org.springframework.test.context.transaction.BeforeTransaction;
 16 import org.springframework.test.context.transaction.TransactionConfiguration;
 17 
 18 import com.mpos.lottery.te.common.dao.ShardKeyContextHolder;
 19 
 20 /**
 21  * Spring TestContext Framework. If extending from
 22  * <code>AbstractTransactionalJUnit4SpringContextTests</code>, you don't need to
 23  * declare <code>@RunWith</code>,
 24  * <code>TestExecutionListeners(3 default listeners)</code> and
 25  * <code>@Transactional</code>. Refer to
 26  * {@link AbstractTransactionalJUnit4SpringContextTests} for more information.
 27  * <p>
 28  * Legacy JUnit 3.8 class hierarchy is deprecated.
 29  *
 30  * @author Ramon Li
 31  */
 32 //@RunWith(SpringJUnit4ClassRunner.class)
 33 @ContextConfiguration(locations = { "/spring-service.xml", "/spring-dao.xml",
 34         "/spring-shard-datasource.xml" })
 35 @TransactionConfiguration(transactionManager = "transactionManager", defaultRollback = false)
 36 //@TestExecutionListeners(listeners = { TransactionalTestExecutionListener.class,
 37 //        ShardAwareTestExecutionListener.class })
 38 //@Transactional
 39 public class GameDaoTest extends AbstractTransactionalJUnit4SpringContextTests {
 40         private Log logger = LogFactory.getLog(GameDaoTest.class);
 41         // Must declare @Autowire(by type) or @Resource(JSR-250)(by name)
 42         // explicitly, otherwise spring won't inject the dependency.
 43         private GameDao gameDao;
 44         @PersistenceContext(unitName = "lottery_te")
 45         private EntityManager entityManager;
 46 
 47         public GameDaoTest() {
 48                 logger.debug("GameDaoTest()");
 49                 // As spring test framework will create a auto-rollbacked transaction
 50                 // before setup a test case(even @BeforeTransaction, the data source has
 51                 // been determined), we must set the shard key before creating
 52                 // transaction, otherwise the default data source of
 53                 // <code>ShardKeyRoutingDataSource</code> will be returned if it has
 54                 // been set.
 55                 ShardKeyContextHolder.setShardKey(new Integer("2"));
 56         }
 57 
 58         @BeforeTransaction
 59         public void verifyInitialDatabaseState() {
 60                 // logic to verify the initial state before a transaction is started
 61                 logger.debug("@BeforeTransaction:verifyInitialDatabaseState()");
 62 
 63                 logger.debug("EntityManager:" + this.entityManager);
 64                 logger.debug("gameDao:" + this.gameDao);
 65         }
 66 
 67         @Before
 68         public void setUpTestDataWithinTransaction() {
 69                 // set up test data within the transaction
 70                 logger.debug("@Before:setUpTestDataWithinTransaction()");
 71         }
 72 
 73         @Test
 74         // overrides the class-level defaultRollback setting
 75         @Rollback(true)
 76         public void test_2() {
 77                 // logic which uses the test data and modifies database state
 78                 logger.debug("test_2()");
 79 
 80         }
 81 
 82         @Test
 83         public void test_1() {
 84                 logger.debug("test_1()");
 85                 // logger.debug("**** Start to query oracle data source.");
 86                 String sql = "select TYPE_NAME from GAME_TYPE where GAME_TYPE_ID=9";
 87                 // setSharkKey() won't affect here
 88                 ShardKeyContextHolder.setShardKey(new Integer("1"));
 89                 // Map<String, Object> result1 =
 90                 // this.getJdbcTemplate().queryForMap(sql);
 91 
 92                 // logger.debug("**** Start to query mysql data source.");
 93                 // setSharkKey() won't affect here
 94                 // ShardKeyContextHolder.setShardKey(new Integer("2"));
 95                 // Map<String, Object> result2 =
 96                 // this.getJdbcTemplate().queryForMap(sql);
 97 
 98                 // Avoid false positives when testing ORM code[Spring manual document]
 99                 this.entityManager.flush();
100         }
101 
102         @After
103         public void tearDownWithinTransaction() {
104                 // execute "tear down" logic within the transaction.
105                 logger.debug("@After:tearDownWithinTransaction()");
106         }
107 
108         @AfterTransaction
109         public void verifyFinalDatabaseState() {
110                 // logic to verify the final state after transaction has rolled back
111                 logger.debug("@AfterTransaction:verifyFinalDatabaseState()");
112 
113         }
114 
115 }

To be honest, Spring's automatic transaction rollback is good, provided you know it well and maintain your test code with care. The downside is that it enlarges the transaction boundary: in general the boundary of a transaction is the invocation of a service method, while the Spring test framework stretches it to cover the whole test method.
This incurs two issues:

  • Hibernate flush. If there is no select on a given entity, Hibernate won't flush that entity's DML to the underlying database until the transaction commits or you flush explicitly.
  • Hibernate lazy loading. If you have ever tried to serialize an entity outside of the transaction, you will know what I mean.
Below is my base test class which all transactional integration tests should inherit from.
package com.mpos.lottery.te.test.integration;

import static org.junit.Assert.assertEquals;

import java.util.Calendar;
import java.util.Date;
import java.util.UUID;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractTransactionalJUnit4SpringContextTests;
import org.springframework.test.context.transaction.AfterTransaction;
import org.springframework.test.context.transaction.BeforeTransaction;
import org.springframework.test.context.transaction.TransactionConfiguration;
import org.springframework.test.context.transaction.TransactionalTestExecutionListener;

import com.mpos.lottery.te.config.MLotteryContext;
import com.mpos.lottery.te.gamespec.prize.Payout;
import com.mpos.lottery.te.gamespec.sale.BaseTicket;
import com.mpos.lottery.te.hasplicense.domain.License;
import com.mpos.lottery.te.trans.domain.Transaction;

/**
 * This test will be ran against <code>DispatchServlet</code> directly, that
 * says we must support lookup <code>ApplicationContext</code> from
 * <code>ServletContext</code>, refer to
 * {@link org.springframework.web.context.support.WebApplicationContextUtils}
 * <p>
 * Spring TestContext Framework. If extending from
 * <code>AbstractTransactionalJUnit4SpringContextTests</code>, you don't need to
 * declare <code>@RunWith</code>,
 * <code>TestExecutionListeners(3 default listeners)</code> and
 * <code>@Transactional</code>. Refer to
 * {@link AbstractTransactionalJUnit4SpringContextTests} for more information.
 * <p>
 * Legacy JUnit 3.8 class hierarchy is deprecated. Under new sprint test context
 * framework, a field of property must be annotated with <code>@Autowired</code>
 * or <code>@Resource</code>(<code>@Autowired</code> in conjunction with
 * <code>@Qualifier</code>) explicitly to let spring inject dependency
 * automatically.
 * <p>
 * Reference:
 * <ul>
 * <li>https://jira.springsource.org/browse/SPR-5243</li>
 * <li>
 * http://forum.springsource.org/showthread.php?86124-How -to-register-
 * BeanPostProcessor-programaticaly</li>
 * </ul>
 * 
 * @author Ramon Li
 */

// @RunWith(SpringJUnit4ClassRunner.class)

// Refer to the doc of WebContextLoader.
@ContextConfiguration(loader = WebApplicationContextLoader.class, locations = { "spring/spring-core.xml",
        "spring/spring-core-dao.xml", "spring/game/spring-raffle.xml", "spring/game/spring-ig.xml",
        "spring/game/spring-extraball.xml", "spring/game/spring-lotto.xml", "spring/game/spring-toto.xml",
        "spring/game/spring-lfn.xml", "spring/spring-3rdparty.xml", "spring/game/spring-magic100.xml",
        "spring/game/spring-digital.xml" })
// this annotation defines the transaction manager for each test case.
@TransactionConfiguration(transactionManager = "transactionManager", defaultRollback = true)
// As our TEST extending from AbstractTransactionalJUnit4SpringContextTests,
// below 3 listeners have been registered by default, and it will be inherited
// by subclass.
// @TestExecutionListeners(listeners = {ShardAwareTestExecutionListener.class})
// @Transactional
public class BaseTransactionalIntegrationTest extends AbstractTransactionalJUnit4SpringContextTests {
    private static Log logger = LogFactory.getLog(BaseTransactionalIntegrationTest.class);
    // SPRING DEPENDENCIES
    /**
     * Always auto wire the data source to a javax.sql.DataSource with name
     * 'dataSource' even there are multiple data sources. It means there must be
     * a DataSource bean named 'dataSource' and a
     * <code>PlatformTransactionManager</code> named 'transactionManager'.
     * <p>
     * 
     * @see AbstractTransactionalJUnit4SpringContextTests#setDataSource(javax.sql.DataSource)
     */
    @PersistenceContext(unitName = "lottery_te")
    protected EntityManager entityManager;

    /**
     * do something if want configure test case when initialization.
     */
    public BaseTransactionalIntegrationTest() {
        // initialize MLottery context.
        MLotteryContext.getInstance();
        // enable HASP license
        this.enableLicense();
    }

    // run once for current test suite.
    @BeforeClass
    public static void beforeClass() {
        logger.trace("@BeforeClass:beforeClass()");
    }

    /**
     * logic to verify the initial state before a transaction is started.
     * <p>
     * The @BeforeTransaction methods declared in superclass will be run after
     * those of the current class. Supported by
     * {@link TransactionalTestExecutionListener}
     */
    @BeforeTransaction
    public void verifyInitialDatabaseState() throws Exception {
        logger.trace("@BeforeTransaction:verifyInitialDatabaseState()");
    }

    /**
     * Set up test data within the transaction.
     * <p>
     * The @Before methods of superclass will be run before those of the current
     * class. No other ordering is defined.
     * <p>
     * NOTE: Any before methods (for example, methods annotated with JUnit 4's
     * <code>@Before</code>) and any after methods (such as methods annotated
     * with JUnit 4's <code>@After</code>) are executed within a transaction.
     */
    @Before
    public void setUpTestDataWithinTransaction() {
        logger.trace("@Before:setUpTestDataWithinTransaction()");
        this.initializeMLotteryContext();
    }

    /**
     * execute "tear down" logic within the transaction.
     * <p>
     * The @After methods declared in superclass will be run after those of the
     * current class.
     */
    @After
    public void tearDownWithinTransaction() {
        logger.trace("@After:tearDownWithinTransaction()");
    }

    /**
     * logic to verify the final state after transaction has rolled back.
     * <p>
     * The @AfterTransaction methods declared in superclass will be run after
     * those of the current class.
     */
    @AfterTransaction
    public void verifyFinalDatabaseState() {
        logger.trace("@AfterTransaction:verifyFinalDatabaseState()");
    }

    @AfterClass
    public static void afterClass() {
        logger.trace("@AfterClass:afterClass()");
    }

    // ----------------------------------------------------------------
    // HELPER METHODS
    // ----------------------------------------------------------------

    protected void initializeMLotteryContext() {
        logger.debug("Retrieve a ApplicationContext(" + this.applicationContext + ").");
        MLotteryContext.getInstance().setBeanFactory(this.applicationContext);
    }

    protected void printMethod() {
        StringBuffer lineBuffer = new StringBuffer("+");
        for (int i = 0; i < 120; i++) {
            lineBuffer.append("-");
        }
        lineBuffer.append("+");
        String line = lineBuffer.toString();

        // Get the test method. If index=0, it means get current method.
        StackTraceElement eles[] = new Exception().getStackTrace();
        // StackTraceElement eles[] = new Exception().getStackTrace();
        // for (StackTraceElement ele : eles){
        // System.out.println("class:" + ele.getClassName());
        // System.out.println("method:" + ele.getMethodName());
        // }
        String className = eles[1].getClassName();
        int index = className.lastIndexOf(".");
        className = className.substring((index == -1 ? 0 : (index + 1)));

        String method = className + "." + eles[1].getMethodName();
        StringBuffer padding = new StringBuffer();
        for (int i = 0; i < line.length(); i++) {
            padding.append(" ");
        }
        logger.info(line);
        String methodSig = (method + padding.toString()).substring(0, line.length() - 3);
        logger.info("| " + methodSig + "|");
        logger.info(line);
    }

    protected void enableLicense() {
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.set(Calendar.YEAR, cal.get(Calendar.YEAR) + 1);
        License.getInstance().setExpireDate(cal);
    }

    protected String uuid() {
        UUID uuid = UUID.randomUUID();
        String uuidStr = uuid.toString();
        return uuidStr.replace("-", "");
    }

    // ----------------------------------------------------------------
    // ASSERTION METHODS
    // ----------------------------------------------------------------

    protected void assertTransaction(Transaction expectedTrans, Transaction actualTrans) {
        assertEquals(expectedTrans.getId(), actualTrans.getId());
        assertEquals(expectedTrans.getGameId(), actualTrans.getGameId());
        assertEquals(expectedTrans.getTotalAmount().doubleValue(),
                actualTrans.getTotalAmount().doubleValue(), 0);
        assertEquals(expectedTrans.getTicketSerialNo(), actualTrans.getTicketSerialNo());
        assertEquals(expectedTrans.getDeviceId(), actualTrans.getDeviceId());
        assertEquals(expectedTrans.getMerchantId(), actualTrans.getMerchantId());
        assertEquals(expectedTrans.getType(), actualTrans.getType());
        assertEquals(expectedTrans.getOperatorId(), actualTrans.getOperatorId());
        assertEquals(expectedTrans.getTraceMessageId(), actualTrans.getTraceMessageId());
        assertEquals(expectedTrans.getResponseCode(), actualTrans.getResponseCode());
    }

    protected void assertTicket(BaseTicket expectTicket, BaseTicket actualTicket) {
        assertEquals(expectTicket.getSerialNo(), actualTicket.getSerialNo());
        assertEquals(expectTicket.getStatus(), actualTicket.getStatus());
        assertEquals(expectTicket.getTotalAmount().doubleValue(),
                actualTicket.getTotalAmount().doubleValue(), 0);
        assertEquals(expectTicket.getMultipleDraws(), actualTicket.getMultipleDraws());
        assertEquals(expectTicket.getMobile(), actualTicket.getMobile());
        assertEquals(expectTicket.getCreditCardSN(), actualTicket.getCreditCardSN());
        assertEquals(expectTicket.getDevId(), actualTicket.getDevId());
        assertEquals(expectTicket.getMerchantId(), actualTicket.getMerchantId());
        assertEquals(expectTicket.getOperatorId(), actualTicket.getOperatorId());
        assertEquals(expectTicket.getTicketFrom(), actualTicket.getTicketFrom());
        assertEquals(expectTicket.getTicketType(), actualTicket.getTicketType());
        assertEquals(expectTicket.getTransType(), actualTicket.getTransType());
        assertEquals(expectTicket.isCountInPool(), actualTicket.isCountInPool());
        assertEquals(expectTicket.getGameInstance().getId(), actualTicket.getGameInstance().getId());
        assertEquals(expectTicket.getPIN(), actualTicket.getPIN());
    }

    protected void assertPayout(Payout exp, Payout actual) {
        assertEquals(exp.getTransaction().getId(), actual.getTransaction().getId());
        assertEquals(exp.getGameId(), actual.getGameId());
        assertEquals(exp.getGameInstanceId(), actual.getGameInstanceId());
        assertEquals(exp.getDevId(), actual.getDevId());
        assertEquals(exp.getMerchantId(), actual.getMerchantId());
        assertEquals(exp.getOperatorId(), actual.getOperatorId());
        assertEquals(exp.getTicketSerialNo(), actual.getTicketSerialNo());
        assertEquals(exp.getBeforeTaxObjectAmount().doubleValue(), actual.getBeforeTaxObjectAmount()
                .doubleValue(), 0);
        assertEquals(exp.getBeforeTaxTotalAmount().doubleValue(), actual.getBeforeTaxTotalAmount()
                .doubleValue(), 0);
        assertEquals(exp.getTotalAmount().doubleValue(), actual.getTotalAmount().doubleValue(), 0);
        assertEquals(exp.getNumberOfObject(), actual.getNumberOfObject());
    }

    // ----------------------------------------------------------------
    // SPRINT DEPENDENCIES INJECTION
    // ----------------------------------------------------------------

    public EntityManager getEntityManager() {
        return entityManager;
    }

    public void setEntityManager(EntityManager entityManager) {
        this.entityManager = entityManager;
    }
}

What happens if a single test method makes two separate requests?

In my project, there is a service named 'sell' for the client to make a sale, and a corresponding service named 'enquiry' to query that sale.

Now we plan to test the 'enquiry' service and write a test case named 'testEnquiry'. OK, how do we prepare the test data for the sale that will be queried? There are at least two options.

Prepare test data and import it into the database before running the test

With this approach, there is a chance that your prepared test data doesn't meet the specification of the 'sell' service. That is, your prepared test data may populate a column named 'gameId' even though the 'sell' service never writes that column. In such a case your test will pass, yet in the production environment the 'enquiry' service will fail.

Call the 'sell' service in the 'testEnquiry' method

The pseudocode looks like below:
public void testEnquiry(){
    callSellService();
    callEnquiryService();
    //assert output
}
The callSellService() and callEnquiryService() run in the same single transaction. Here is a real case from my project: callSellService() generates tickets (a List), and callEnquiryService() queries the tickets generated by the sale service and marshals them into XML.
What surprised me is that the ticket entities retrieved by callEnquiryService() are the same as the ticket entities generated by callSellService(). I mean they are the same Java objects, not merely objects with the same fields/properties.
In production, however, many fields of the tickets retrieved by callEnquiryService() are missing, because in the production environment callSellService() and callEnquiryService() are two completely separate transactions.

Which option is better? Or is there a third option?


I prefer the second option: prepare test data through real service calls. Then how do we deal with its problem? After some research, the solution turns out to be simple and effective.
public void testEnquiry(){
    callSellService();
    this.entityManager.flush();
    this.entityManager.clear();
    callEnquiryService();
    //assert output
}
  • this.entityManager.flush() will flush all pending entity state to the underlying database. This must be called, otherwise all entity changes will be lost when the persistence context is cleared.
  • this.entityManager.clear() will clear the persistence context and leave all entities in the detached state, so any subsequent call to the entity manager will load fresh entity instances.

Maybe DBUnit is another choice, but that would mean converting my SQL scripts into XML, and that is a big challenge.

Monday, December 24, 2012

Prepare to learn Groovy

I plan to learn Groovy, as there are two great frameworks written in Groovy: Gradle and Grails. Besides, if Grinder ever supports Groovy, that will be a great plus.

Gradle
Gradle is a next-generation build tool; in my opinion it will replace Maven and Ant.
Below is my build.gradle for one of my projects:

  1 apply plugin: 'war'
  2 // 'war' plugin will apply 'java' plugin automatically
  3 apply plugin: 'java'
  4 apply plugin: 'eclipse'
  5 // run web application
  6 apply plugin: 'jetty'
  7 
  8 /**
  9  * Gradle includes 2 phases: configuration and execution, refer to
 10  * http://stackoverflow.com/questions/11558824/gradle-tar-task-not-executed.
 11  *
 12  * - CONFIGURATION PHASE
 13  * In configuration phase, all codes except doFirst() and doLast() will be executed from top to bottom of the script.
 14  * The 'dependsOn' doesn't make any sense in configration phase, for example, the 'jar' and 'svnrev' tasks, if we put
 15  * 'svnrev' after 'jar', then variable 'svnrev.lastRev' can't be parsed at 'jar' task, as it completely hasn't been
 16  * initilized at all.
 17  *
 18  * - EXECUTION PHASE
 19  * In execution phase, the dependency mechanism of task will work. Be reminded that only doFirst() and doLast() will be
 20  * executed at execution phase, and gradle will finished configuration of whose script first to initialize and determine
 21  * what tasks should be executed and what is the execution order.
 22  */
 23 
 24 logger.quiet(">> Start building of $_name.$version.")
 25 /**
 26  * SHIT, here you can't give a statement like 'compileJava.options.encoding = $_sourcecode_encoding', if do so,
 27  * "Could not find property '$_sourcecode_encoding'" will be thrown out. '$_encoding' can't only be used in String?
 28  */
 29 compileJava.options.encoding = _sourcecode_encoding
 30 compileTestJava.options.encoding = _sourcecode_encoding
 31 // Define a temporary variable.
 32 //_tmp="define a temporary variable"
 33 //logger.quiet(">> Define a temporary variable: _tmp: $_tmp")
 34 
 35 // Properties added by the java plugin
 36 sourceCompatibility="1.6"
 37 targetCompatibility="1.6"
 38 //Properties added by the war plugin
 39 webAppDirName="src/main/WWW"
 40 
 41 configurations {
 42     provided {
 43         description = 'Non-exported comiple-time dependencies.'
 44     }
 45 }
 46 
 47 /**
 48  * In Gradle dependencies are grouped into configurations, and there are 4 pre-defined configuration:
 49  * compile, runtime, testCompile and testRuntime. In general dependencies of the later one will contain previous one.
 50  */
 51 dependencies {
 52     // configurationName dependencyNotation1, dependencyNotation2, ...
 53     // compile group: 'commons-collections', name: 'commons-collections', version: '3.2'
 54 
 55     provided files('lib/DEV/j2ee/servlet-api.jar')
 56     compile fileTree(dir: 'lib', include: '**/*.jar', exclude: 'DEV/**/*.jar')
 57     /**
 58      * Below dependency will result in a exception:
 59      *      Circular dependency between tasks. Cycle includes [task ':compileJava', task ':classes'].
 60      * As sourceSets.main.output is generated by task 'compileJava', however if we declare the dependency here, it means task
 61      * 'compileJava' will depend on this file too, then a circular dependency occurs.
 62      */
 63     //compile sourceSets.main.output
 64     testCompile fileTree(dir:"lib", include:"DEV/**/*.jar")
 65 }
 66 
 67 sourceSets {
 68     /**
 69      * The Java plugin defines two standard source sets, called main and test.
 70      * Changing the project layout, the default project layout is as below:
 71      *  - src/main/java   Production Java source
 72      *  - src/main/resources  Production resources
 73      *  - src/test/java   Test Java source
 74      *  - src/test/resources  Test resources
 75      *  - src/sourceSet/java  Java source for the given source set
 76      *  - src/sourceSet/resources Resources for the given source set
 77      * Refer to http://gradle.org/docs/current/userguide/java_plugin.html and
 78      * http://gradle.org/docs/current/dsl/org.gradle.api.tasks.SourceSet.html for more information.
 79      */
 80     main {
 81         compileClasspath = compileClasspath + configurations.provided
 82         //compileClasspath.collect().each({println it})
 83         resources {
 84             srcDir 'src/main/resource'
 85         }
 86     }
 87     test {
 88         java {
 89             srcDir 'src/test/unittest'
 90             // integration test needs database, and need to import test data first.
 91             srcDir 'src/test/integration'
 92         }
 93         resources {
 94             srcDir 'src/test/resource'
 95         }
 96     }
 97 }
 98 
 99 // Retrieve the last revision of project, refer to http://stackoverflow.com/questions/9044354/checkout-svn-using-gradle.
100 task svnrev {
101     // use ant to retrieve revision.
102     ant.taskdef(resource: 'org/tigris/subversion/svnant/svnantlib.xml') {
103         classpath {
104             fileset(dir: 'lib/DEV/svnant-1.2.1', includes: '*.jar')
105         }
106     }
107     ant.svn(javahl: 'false', svnkit: 'true', username: "${_svn_user}", password: "${_svn_password}", failonerror: 'false') {
108         ant.info(target: "${_svn_source_url}", propPrefix: 'svninfo')
109     }
110     // retrieve property of ant project and assign it to a task's property, refer to:
111     // http://gradle.1045684.n5.nabble.com/can-t-find-or-extract-properties-from-svnant-info-function-in-gradle-td3335388.html
112     ext.lastRev = ant.getProject().properties['svninfo.lastRev']
113     // retrieve property of gradle project
114     //getProject().properties['buildFile']
115 }
116 
117 import org.gradle.api.java.archives.internal.DefaultManifest
118 import org.gradle.api.internal.file.IdentityFileResolver
119 task generateManifest {
120     // define a task's property
121     ext.m = new DefaultManifest(new IdentityFileResolver())
122 
123     // add some attributes
124     m.attributes([
125         'Implementation-Title':"$_name",
126         'Implementation-Version':"${version}_${svnrev.lastRev}",
127         'Implementation-Vendor':"$_company",
128         'Created-By' : _team,
129         'Build-Time' : new Date()
130     ])
131     //manifest.writeTo('build/mymanifest.mf')
132 }
133 war.dependsOn 'generateManifest'
134 
135 war {
136     archiveName = _name + ".war"
137     manifest = generateManifest.m
138 }
139 
140 // Define a global variable
141 def user_tag
142 
143 task svntag <<{
144     def console = System.console()
145     if (console) {
146         user_tag = console.readLine("> Please enter your tag(${version}): ")
147         if (!user_tag) {
148             logger.error "Please give a tag definition."
149             System.exit(0)
150         }
151     } else {
152         logger.error "Cannot get console."
153     }
154     /**
155      * We must define below logic in doFirst/Last(), otherwise it will try to make a tag each time as it is in configuration phase.
156      */
157     ant.svn(javahl: 'false', svnkit: 'true', username: "${_svn_user}", password: "${_svn_password}", failonerror: 'false') {
158         ant.copy(srcurl: "${_svn_source_url}", desturl:"${_svn_tag_url}/${user_tag}", message:"Create tag: ${_svn_tag_url}/${user_tag}")
159     }
160 }
161 
162 task dist(type: Zip) {
163     description = "Build a distribution package containing war and shell script."
164 
165     archiveName = _name + "_v${version}.zip"
166 
167     // if use include, gradle will inform 'Skipping task ':zip' as it has no source files'...why?
168     // include 'config'
169 
170     from('.') {
171         include 'README.txt'
172         include 'CHANGELOG.txt'
173     }
174     from war.destinationDir
175     into('bin') {
176         from('bin'){
177             include 'app_backup.sh'
178             include 'gen-keypair.sh'
179             include 'gen-serialkey.sh'
180         }
181     }
182     from 'etc/manual'
183 
184     doLast {
185         //print 'source of zip:' + project['zip'].source.each({println it})
186     }
187 }
188 
189 

Grails
Grails competes with Play!, but after trying Scala for a while I was dismayed; its learning curve is really steep. At present I haven't even given Groovy a simple try. My only experience is the Gradle build script, and what makes me dizzy is its closures. Maybe after putting more effort into learning Groovy, those closures will become friendlier to me.

Grinder
A great distributed performance testing tool; at present it only supports Jython and Clojure. Python is my second language, and in fact I know it better than Scala or Groovy. I have found that others are also looking for Groovy support in Grinder, and someone has put real effort into implementing it.

Tuesday, April 17, 2012

StatsD and Graphite

I have been looking for a monitoring solution for quite some time. Several solutions exist, including Nagios and Ganglia: Ganglia offers monitoring, while Nagios offers alerting as well. Both Ganglia and Nagios seem a little complicated and need considerable effort to configure. To be honest, I am not familiar with Nagios or Ganglia and have only given both of them a quick look.

After reading the post Measure Anything, Measure Everything from Etsy, I started trying out StatsD and Graphite. StatsD is very simple to set up and easy to integrate with a system, but Graphite gave me a really hard time. At first I tried to set up Graphite on Ubuntu 8.04, which eventually failed because its Python version is not compatible with py2cairo.

After reinstalling Ubuntu 10.04 server and following the instructions on how to install Graphite on Ubuntu, both StatsD and Graphite started up successfully.

The latest version of Graphite is 0.9.9 and its documentation is lacking; I am looking forward to 1.0, which was planned for release on 2012-01-01 and has been hugely delayed from its timeline.

StatsD is very simple: it is a Node.js daemon, and its stats.js contains only about 300 lines of code. By reviewing its code, you can fully understand the concepts introduced in Measure Anything, Measure Everything, especially counters and timers.

Now it is time to run a test.

1. Launch Graphite (by default Graphite will be installed to /opt/graphite).
> sudo /etc/init.d/apache2 restart # start apache (graphite-web)
> cd /opt/graphite/
> sudo ./bin/carbon-cache.py start # start carbon to receive incoming stats

2. Launch Statsd
> cd $STATSD_HOME
> sudo cp exampleConfig.js myconfig.js # change the configuration according to your requirement
> sudo node stats.js myconfig.js

3. Run a StatsD client (a simple Java client is shown below)
> java Main

import java.util.Random;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;

public class Main{
  public static void main(String args[]) throws Exception{
    DatagramSocket socket = new DatagramSocket();
    while(true) {
      // send counter stats
      int c = new Random().nextInt(10);
      String data = "ramoncounter:" + c + "|c";
      byte[] d = data.getBytes();
      socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
      System.out.println("[Send] " + data + "...Sleep 10 seconds!");

      // send timer stats
      data = "";
      data += "ramontimer:" + new Random().nextInt(30) + "|ms";
      d = data.getBytes();
      socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
      System.out.println("[Send] " + data + "...Sleep 10 seconds!");

      Thread.sleep(10 * 1000);
    }
  }
}

Now open a web browser and access http://GRAPHITE_HOST; you will be welcomed by the Graphite web UI.

How does Graphite maintain the incoming data points?

This is a big topic. Graphite includes three components:
  1. carbon - a Twisted daemon that listens for time-series data.
  2. whisper - a simple database library for storing time-series data (similar in design to RRD)
  3. graphite webapp - A Django webapp that renders graphs on-demand using Cairo
Whisper is the data storage engine. A Whisper file contains one or more archives, each with a specific data resolution and retention (defined in number of points or max timestamp age). Archives are ordered from the highest-resolution, shortest-retention archive to the lowest-resolution, longest-retention archive.

As the official documentation is very limited and lacks examples, it is hard to really understand how Whisper works. I modified whisper.py to output more log messages in order to understand its mechanism.

The main functions of whisper.py are listed below (a small usage sketch follows the list):

  • def create(path,archiveList,xFilesFactor=None,aggregationMethod=None)
  • def update(path,value,timestamp=None)
  • def fetch(path,fromTime,untilTime=None)
  • def info(path) - get header information.
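
As a reference, here is a small usage sketch of these functions (my own example; it assumes the whisper.py shown later in this post is importable, and it reuses the archive layout of the ramoncounter.wsp file examined next; fetch() returns a (timeInfo, values) pair):

import time
import whisper

# 10s resolution for 6 hours, 60s for 7 days, 600s for roughly 5 years,
# the same layout as ramoncounter.wsp.
archives = [(10, 2160), (60, 10080), (600, 262974)]
whisper.create('ramoncounter.wsp', archives, xFilesFactor=0.5, aggregationMethod='average')

# Store one value for "now", then read the last hour back.
whisper.update('ramoncounter.wsp', 3.0, int(time.time()))
timeInfo, values = whisper.fetch('ramoncounter.wsp', int(time.time()) - 3600)
(fromInterval, untilInterval, step) = timeInfo

print(whisper.info('ramoncounter.wsp'))   # header information, like the dump below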

By running the test client (Main.class), I got a ramoncounter.wsp file which contains all data points of the given metric. Its header information is as below (a quick sanity check of these numbers follows the dump):

maxRetention: 157784400
xFilesFactor: 0.5
aggregationMethod: average
fileSize: 3302620

Archive 0
retention: 21600
secondsPerPoint: 10
points: 2160
size: 25920
offset: 52

Archive 1
retention: 604800
secondsPerPoint: 60
points: 10080
size: 120960
offset: 25972

Archive 2
retention: 157784400
secondsPerPoint: 600
points: 262974
size: 3155688
offset: 146932
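
These numbers are consistent with the struct formats used in whisper.py below: the metadata block ('!2LfL') is 16 bytes, each ArchiveInfo ('!3L') is 12 bytes, and each point ('!Ld', a 4-byte timestamp plus an 8-byte double) is 12 bytes. A quick sanity check in Python:

metadataSize, archiveInfoSize, pointSize = 16, 12, 12
archives = [(10, 2160), (60, 10080), (600, 262974)]

headerSize = metadataSize + archiveInfoSize * len(archives)   # 16 + 3*12 = 52, the offset of Archive 0
sizes = [points * pointSize for _, points in archives]        # 25920, 120960, 3155688
offsets = [headerSize,
           headerSize + sizes[0],                             # 25972, the offset of Archive 1
           headerSize + sizes[0] + sizes[1]]                  # 146932, the offset of Archive 2
fileSize = headerSize + sum(sizes)                            # 52 + 25920 + 120960 + 3155688 = 3302620
maxRetention = max(sec * pts for sec, pts in archives)        # 600 * 262974 = 157784400

print("offsets=%s fileSize=%d maxRetention=%d" % (offsets, fileSize, maxRetention))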

The amended whisper.py:
  1 #!/usr/bin/env python
  2 # Copyright 2008 Orbitz WorldWide
  3 #
  4 # Licensed under the Apache License, Version 2.0 (the "License");
  5 # you may not use this file except in compliance with the License.
  6 # You may obtain a copy of the License at
  7 #
  8 #   http://www.apache.org/licenses/LICENSE-2.0
  9 #
 10 # Unless required by applicable law or agreed to in writing, software
 11 # distributed under the License is distributed on an "AS IS" BASIS,
 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13 # See the License for the specific language governing permissions and
 14 # limitations under the License.
 15 #
 16 #
 17 # This module is an implementation of the Whisper database API
 18 # Here is the basic layout of a whisper data file
 19 #
 20 # File = Header,Data
 21 #       Header = Metadata,ArchiveInfo+
 22 #               Metadata = aggregationType,maxRetention,xFilesFactor,archiveCount
 23 #               ArchiveInfo = Offset,SecondsPerPoint,Points
 24 #       Data = Archive+
 25 #               Archive = Point+
 26 #                       Point = timestamp,value
 27 
 28 import os, struct, time, logging
 29 
 30 logging.basicConfig(filename="whisper.log",level=logging.DEBUG)
 31 
 32 try:
 33   import fcntl
 34   CAN_LOCK = True
 35 except ImportError:
 36   CAN_LOCK = False
 37 
 38 LOCK = False
 39 CACHE_HEADERS = False
 40 AUTOFLUSH = False
 41 __headerCache = {}
 42 
 43 longFormat = "!L"
 44 longSize = struct.calcsize(longFormat)
 45 floatFormat = "!f"
 46 floatSize = struct.calcsize(floatFormat)
 47 valueFormat = "!d"
 48 valueSize = struct.calcsize(valueFormat)
 49 pointFormat = "!Ld"
 50 pointSize = struct.calcsize(pointFormat)
 51 metadataFormat = "!2LfL"
 52 metadataSize = struct.calcsize(metadataFormat)
 53 archiveInfoFormat = "!3L"
 54 archiveInfoSize = struct.calcsize(archiveInfoFormat)
 55 
 56 aggregationTypeToMethod = dict({
 57   1: 'average',
 58   2: 'sum',
 59   3: 'last',
 60   4: 'max',
 61   5: 'min'
 62 })
 63 aggregationMethodToType = dict([[v,k] for k,v in aggregationTypeToMethod.items()])
 64 aggregationMethods = aggregationTypeToMethod.values()
 65 
 66 debug = startBlock = endBlock = lambda *a,**k: None
 67 
 68 UnitMultipliers = {
 69   's' : 1,
 70   'm' : 60,
 71   'h' : 60 * 60,
 72   'd' : 60 * 60 * 24,
 73   'y' : 60 * 60 * 24 * 365,
 74 }
 75 
 76 
 77 def parseRetentionDef(retentionDef):
 78   (precision, points) = retentionDef.strip().split(':')
 79 
 80   if precision.isdigit():
 81     precisionUnit = 's'
 82     precision = int(precision)
 83   else:
 84     precisionUnit = precision[-1]
 85     precision = int( precision[:-1] )
 86 
 87   if points.isdigit():
 88     pointsUnit = None
 89     points = int(points)
 90   else:
 91     pointsUnit = points[-1]
 92     points = int( points[:-1] )
 93 
 94   if precisionUnit not in UnitMultipliers:
 95     raise ValueError("Invalid unit: '%s'" % precisionUnit)
 96 
 97   if pointsUnit not in UnitMultipliers and pointsUnit is not None:
 98     raise ValueError("Invalid unit: '%s'" % pointsUnit)
 99 
100   precision = precision * UnitMultipliers[precisionUnit]
101 
102   if pointsUnit:
103     points = points * UnitMultipliers[pointsUnit] / precision
104 
105   return (precision, points)
106 
107 class WhisperException(Exception):
108     """Base class for whisper exceptions."""
109 
110 
111 class InvalidConfiguration(WhisperException):
112     """Invalid configuration."""
113 
114 
115 class InvalidAggregationMethod(WhisperException):
116     """Invalid aggregation method."""
117 
118 
119 class InvalidTimeInterval(WhisperException):
120     """Invalid time interval."""
121 
122 
123 class TimestampNotCovered(WhisperException):
124     """Timestamp not covered by any archives in this database."""
125 
126 class CorruptWhisperFile(WhisperException):
127   def __init__(self, error, path):
128     Exception.__init__(self, error)
129     self.error = error
130     self.path = path
131 
132   def __repr__(self):
133     return "<CorruptWhisperFile[%s] %s>" % (self.path, self.error)
134 
135   def __str__(self):
136     return "%s (%s)" % (self.error, self.path)
137 
138 def enableDebug():
139   global open, debug, startBlock, endBlock
140   class open(file):
141     def __init__(self,*args,**kwargs):
142       file.__init__(self,*args,**kwargs)
143       self.writeCount = 0
144       self.readCount = 0
145 
146     def write(self,data):
147       self.writeCount += 1
148       debug('WRITE %d bytes #%d' % (len(data),self.writeCount))
149       return file.write(self,data)
150 
151     def read(self,bytes):
152       self.readCount += 1
153       debug('READ %d bytes #%d' % (bytes,self.readCount))
154       return file.read(self,bytes)
155 
156   def debug(message):
157     print 'DEBUG :: %s' % message
158 
159   __timingBlocks = {}
160 
161   def startBlock(name):
162     __timingBlocks[name] = time.time()
163 
164   def endBlock(name):
165     debug("%s took %.5f seconds" % (name,time.time() - __timingBlocks.pop(name)))
166 
167 
168 def __readHeader(fh):
169   info = __headerCache.get(fh.name)
170   if info:
171     return info
172 
173   originalOffset = fh.tell()
174   fh.seek(0)
175   packedMetadata = fh.read(metadataSize)
176 
177   try:
178     (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
179   except:
180     raise CorruptWhisperFile("Unable to read header", fh.name)
181 
182   archives = []
183 
184   for i in xrange(archiveCount):
185     packedArchiveInfo = fh.read(archiveInfoSize)
186     try:
187       (offset,secondsPerPoint,points) = struct.unpack(archiveInfoFormat,packedArchiveInfo)
188     except:
189       raise CorruptWhisperFile("Unable to read archive %d metadata" % i, fh.name)
190 
191     archiveInfo = {
192       'offset' : offset,
193       'secondsPerPoint' : secondsPerPoint,
194       'points' : points,
195       'retention' : secondsPerPoint * points,
196       'size' : points * pointSize,
197     }
198     archives.append(archiveInfo)
199 
200   fh.seek(originalOffset)
201   info = {
202     'aggregationMethod' : aggregationTypeToMethod.get(aggregationType, 'average'),
203     'maxRetention' : maxRetention,
204     'xFilesFactor' : xff,
205     'archives' : archives,
206   }
207   if CACHE_HEADERS:
208     __headerCache[fh.name] = info
209 
210   return info
211 
212 
213 def setAggregationMethod(path, aggregationMethod):
214   """setAggregationMethod(path,aggregationMethod)
215 
216 path is a string
217 aggregationMethod specifies the method to use when propogating data (see ``whisper.aggregationMethods``)
218 """
219   fh = open(path,'r+b')
220   if LOCK:
221     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
222 
223   packedMetadata = fh.read(metadataSize)
224 
225   try:
226     (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
227   except:
228     raise CorruptWhisperFile("Unable to read header", fh.name)
229 
230   try:
231     newAggregationType = struct.pack( longFormat, aggregationMethodToType[aggregationMethod] )
232   except KeyError:
233     raise InvalidAggregationMethod("Unrecognized aggregation method: %s" %
234           aggregationMethod)
235 
236   fh.seek(0)
237   fh.write(newAggregationType)
238 
239   if AUTOFLUSH:
240     fh.flush()
241     os.fsync(fh.fileno())
242 
243   if CACHE_HEADERS and fh.name in __headerCache:
244     del __headerCache[fh.name]
245 
246   fh.close()
247 
248   return aggregationTypeToMethod.get(aggregationType, 'average')
249 
250 
251 def validateArchiveList(archiveList):
252   """ Validates an archiveList. archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
253   An ArchiveList must:
254   1. Have at least one archive config. Example: (60, 86400) 
255   2. No archive may be a duplicate of another. 
256   3. Higher precision archives' precision must evenly divide all lower precision archives' precision.
257   4. Lower precision archives must cover larger time intervals than higher precision archives.
258 
259   Returns True or False
260   """
261 
262   try:
263     if not archiveList:
264       raise InvalidConfiguration("You must specify at least one archive configuration!")
265 
266     archiveList.sort(key=lambda a: a[0]) #sort by precision (secondsPerPoint)
267 
268     for i,archive in enumerate(archiveList):
269       if i == len(archiveList) - 1:
270         break
271 
272       next = archiveList[i+1]
273       if not (archive[0] < next[0]):
274         raise InvalidConfiguration("You cannot configure two archives "
275           "with the same precision %s,%s" % (archive,next))
276 
277       if (next[0] % archive[0]) != 0:
278         raise InvalidConfiguration("Higher precision archives' precision "
279           "must evenly divide all lower precision archives' precision %s,%s" \
280           % (archive[0],next[0]))
281 
282       retention = archive[0] * archive[1]
283       nextRetention = next[0] * next[1]
284 
285       if not (nextRetention > retention):
286         raise InvalidConfiguration("Lower precision archives must cover "
287           "larger time intervals than higher precision archives %s,%s" \
288           % (archive,next))
289 
290   except:
291         # RAMON: no exceptions will be thrown out???
292     return False
293   return True
294 
295 def create(path,archiveList,xFilesFactor=None,aggregationMethod=None):
296   """create(path,archiveList,xFilesFactor=0.5,aggregationMethod='average')
297 archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
298 path is a string
299 archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
300 xFilesFactor specifies the fraction of data points in a propagation interval that must have known values for a propagation to occur
301 aggregationMethod specifies the function to use when propogating data (see ``whisper.aggregationMethods``)
302 """
303   # Set default params
304   if xFilesFactor is None:
305     xFilesFactor = 0.5
306   if aggregationMethod is None:
307     aggregationMethod = 'average'
308 
309   #Validate archive configurations...
310   validArchive = validateArchiveList(archiveList)
311   if not validArchive:
312     raise InvalidConfiguration("There was a problem creating %s due to an invalid schema config." % path)
313 
314   #Looks good, now we create the file and write the header
315   if os.path.exists(path):
316     raise InvalidConfiguration("File %s already exists!" % path)
317 
318   fh = open(path,'wb')
319   if LOCK:
320     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
321 
322   aggregationType = struct.pack( longFormat, aggregationMethodToType.get(aggregationMethod, 1) )
323   # for example a archieve list, [[10,2160],[60,10080],[600,262974]]
324   oldest = sorted([secondsPerPoint * points for secondsPerPoint,points in archiveList])[-1]
325   # the retention of [600,262974] is 600*262974
326   maxRetention = struct.pack( longFormat, oldest )
327   # xFilesFactor = 0.5
328   xFilesFactor = struct.pack( floatFormat, float(xFilesFactor) )
329   # archiveCount = 3
330   archiveCount = struct.pack(longFormat, len(archiveList))
331   packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
332   fh.write(packedMetadata)
333   # !2LfL means L+L+f+L
334   # Metadata    !2LfL   aggregationType,maxRetention,xFilesFactor,archiveCount
335   #     ArchiveInfo     !3L             Offset,SecondsPerPoint,Points
336   headerSize = metadataSize + (archiveInfoSize * len(archiveList))
337   archiveOffsetPointer = headerSize
338 
339   for secondsPerPoint,points in archiveList:
340         # record the start point(offset) of each archieve.
341     archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
342     fh.write(archiveInfo)
343     archiveOffsetPointer += (points * pointSize)
344 
345   # perserver the disk space for all archives.
346   zeroes = '\x00' * (archiveOffsetPointer - headerSize)
347   fh.write(zeroes)
348 
349   if AUTOFLUSH:
350     fh.flush()
351     os.fsync(fh.fileno())
352 
353   fh.close()
354 
355 def __aggregate(aggregationMethod, knownValues):
356   if aggregationMethod == 'average':
357     return float(sum(knownValues)) / float(len(knownValues))
358   elif aggregationMethod == 'sum':
359     return float(sum(knownValues))
360   elif aggregationMethod == 'last':
361     return knownValues[len(knownValues)-1]
362   elif aggregationMethod == 'max':
363     return max(knownValues)
364   elif aggregationMethod == 'min':
365     return min(knownValues)
366   else:
367     raise InvalidAggregationMethod("Unrecognized aggregation method %s" %
368             aggregationMethod)
369 
370 def __propagate(fh,header,timestamp,higher,lower):
371   aggregationMethod = header['aggregationMethod']
372   xff = header['xFilesFactor']
373 
374   # guarantee the timestamp can be evenly divided by secondsPerPoint...but 
375   # it will lose data precision, right??
376   lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
377   logging.info("timestamp is %d" % timestamp)
378   logging.info("lowerIntervalStart is %d" % lowerIntervalStart)
379   lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']
380   logging.info("lowerIntervalEnd is %d" % lowerIntervalEnd)
381 
382   fh.seek(higher['offset'])
383   packedPoint = fh.read(pointSize)
384   (higherBaseInterval,higherBaseValue) = struct.unpack(pointFormat,packedPoint)
385   logging.info("higherBaseInterval,higherBaseValue is %d,%d" % (higherBaseInterval,higherBaseValue))
386 
387   if higherBaseInterval == 0:
388     higherFirstOffset = higher['offset']
389   else:
390     timeDistance = lowerIntervalStart - higherBaseInterval
391     pointDistance = timeDistance / higher['secondsPerPoint']
392     logging.info("higher['secondsPerPoint'] is %d" % higher['secondsPerPoint'])
393     byteDistance = pointDistance * pointSize
394         # higher[:higherFirstOffset] the first data point till the data point which timestamp matches the given timestamp
395     higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])
396     logging.info("higherFirstOffset is %d" % higherFirstOffset)
397 
398   higherPoints = lower['secondsPerPoint'] / higher['secondsPerPoint']
399   higherSize = higherPoints * pointSize
400   relativeFirstOffset = higherFirstOffset - higher['offset']
401   logging.info("relativeFirstOffset is %d" % relativeFirstOffset)
402   relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
403   logging.info("relativeLastOffset is %d" % relativeLastOffset)
404   higherLastOffset = relativeLastOffset + higher['offset']
405   logging.info("higherLastOffset is %d" % relativeLastOffset)
406   fh.seek(higherFirstOffset)
407 
408   if higherFirstOffset < higherLastOffset: #we don't wrap the archive
409     seriesString = fh.read(higherLastOffset - higherFirstOffset)
410   else: #We do wrap the archive..round robin
411     higherEnd = higher['offset'] + higher['size']
412     seriesString = fh.read(higherEnd - higherFirstOffset)
413     fh.seek(higher['offset'])
414     seriesString += fh.read(higherLastOffset - higher['offset'])
415 
416   #Now we unpack the series data we just read
417   byteOrder,pointTypes = pointFormat[0],pointFormat[1:]
418   points = len(seriesString) / pointSize
419   logging.info("points is %d" % points)
420   seriesFormat = byteOrder + (pointTypes * points)
421   unpackedSeries = struct.unpack(seriesFormat, seriesString)
422   logging.info("unpackedSeries is %s" % str(unpackedSeries))
423 
424   #And finally we construct a list of values
425   neighborValues = [None] * points
426   currentInterval = lowerIntervalStart
427   logging.info("currentInterval is %d" % currentInterval)
428   step = higher['secondsPerPoint']
429   logging.info("step is %d" % step)
430 
431   # unpackedSeries is a flat tuple of alternating (timestamp, value) pairs, e.g.:
432   # (1334806980, 0.29999999999999999, 1334806990, 0.20000000000000001, 1334828600, 12.0, 1334807010, 0.29999999999999999, 1334807020, 0.5)
433   # xrange(0,len(unpackedSeries),2) yields the even indices [0,2,4,...], i.e. the timestamp positions
434   for i in xrange(0,len(unpackedSeries),2):
435     pointTime = unpackedSeries[i]
436     if pointTime == currentInterval:
437       # timestamps sit at even indices of unpackedSeries and their values at the next odd index, so pair i/2 fills slot i/2 of neighborValues
438       neighborValues[i/2] = unpackedSeries[i+1]
439     currentInterval += step
440   logging.info("neighborValues is %s" % str(neighborValues))
441 
442   #Aggregate a value from neighborValues and propagate it, provided enough points are known
443   knownValues = [v for v in neighborValues if v is not None]
444   if not knownValues:
445     return False
446   logging.info("knownValues is %s" % str(knownValues))
447 
448   knownPercent = float(len(knownValues)) / float(len(neighborValues))
449   logging.info("knownPercent is %f" % knownPercent)
450   logging.info("xff is %f" % xff)
451   if knownPercent >= xff: #we have enough data to propagate a value!
452     logging.info("aggregationMethod is %s" % str(aggregationMethod))
453     aggregateValue = __aggregate(aggregationMethod, knownValues)
454     logging.info("aggregateValue is %f" % aggregateValue)
455     myPackedPoint = struct.pack(pointFormat,lowerIntervalStart,aggregateValue)
456     fh.seek(lower['offset'])
457     packedPoint = fh.read(pointSize)
458     (lowerBaseInterval,lowerBaseValue) = struct.unpack(pointFormat,packedPoint)
459 
460         # create or update
461     if lowerBaseInterval == 0: #First propagated update to this lower archive
462       fh.seek(lower['offset'])
463       fh.write(myPackedPoint)
464     else: #Not our first propagated update to this lower archive
465       timeDistance = lowerIntervalStart - lowerBaseInterval
466       pointDistance = timeDistance / lower['secondsPerPoint']
467       byteDistance = pointDistance * pointSize
468       lowerOffset = lower['offset'] + (byteDistance % lower['size'])
469       fh.seek(lowerOffset)
470       fh.write(myPackedPoint)
471 
472     return True
473 
474   else:
475     return False
476 
477 
478 def update(path,value,timestamp=None):
479   """update(path,value,timestamp=None)
480 
481 path is a string
482 value is a float
483 timestamp is either an int or float
484 """
485   value = float(value)
486   fh = open(path,'r+b')
487   return file_update(fh, value, timestamp)
488 
489 
490 def file_update(fh, value, timestamp):
491   if LOCK:
492     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
493 
494   header = __readHeader(fh)
495   now = int( time.time() )
496   if timestamp is None:
497     timestamp = now
498 
499   timestamp = int(timestamp)
500   diff = now - timestamp
501   logging.info("diff(now - timestamp) is %d" % diff)
502   if not ((diff < header['maxRetention']) and diff >= 0):
503     raise TimestampNotCovered("Timestamp not covered by any archives in "
504       "this database.")
505 
506   # for [[10,2160],[60,10080],[600,262974]], [10,2160] is the highest-precision archive, while 
507   # [600,262974] is the lowest-precision archive...the archive list is sorted from highest-precision
508   # to lowest-precision.
509   for i,archive in enumerate(header['archives']): #Find the highest-precision archive that covers timestamp
510     if archive['retention'] < diff: continue
511     lowerArchives = header['archives'][i+1:] #We'll pass on the update to these lower precision archives later
512     break
513 
514   # In Python the loop variable 'archive' stays bound after the for loop ends, so it still refers to the selected archive here.
515   # First we update the highest-precision archive
516   myInterval = timestamp - (timestamp % archive['secondsPerPoint'])
517   myPackedPoint = struct.pack(pointFormat,myInterval,value)
518   fh.seek(archive['offset'])
519   packedPoint = fh.read(pointSize)
520   (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint)
521 
522   if baseInterval == 0: #This file's first update
523     # seek(offset) will reach the absolute position specified by offset.
524     fh.seek(archive['offset'])
525     fh.write(myPackedPoint)
526     baseInterval,baseValue = myInterval,value
527   else: #Not our first update
528     timeDistance = myInterval - baseInterval
529     pointDistance = timeDistance / archive['secondsPerPoint']
530     byteDistance = pointDistance * pointSize
531     # byteDistance % archive['size'] wraps the offset around the archive (round-robin)
532     myOffset = archive['offset'] + (byteDistance % archive['size'])
533     fh.seek(myOffset)
534     fh.write(myPackedPoint)
535 
536   #Now we propagate the update to lower-precision archives
537   higher = archive
538   logging.info("higher archive:" + str(higher))
539   for lower in lowerArchives:
540     if not __propagate(fh, header, myInterval, higher, lower):
541       break
542     higher = lower
543 
544   if AUTOFLUSH:
545     fh.flush()
546     os.fsync(fh.fileno())
547 
548   fh.close()
549 
550 
551 def update_many(path,points):
552   """update_many(path,points)
553 
554 path is a string
555 points is a list of (timestamp,value) points
556 """
557   if not points: return
558   points = [ (int(t),float(v)) for (t,v) in points]
559   points.sort(key=lambda p: p[0],reverse=True) #order points by timestamp, newest first
560   fh = open(path,'r+b')
561   return file_update_many(fh, points)
562 
563 
564 def file_update_many(fh, points):
565   if LOCK:
566     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
567 
568   header = __readHeader(fh)
569   now = int( time.time() )
570   archives = iter( header['archives'] )
571   currentArchive = archives.next()
572   currentPoints = []
573 
574   for point in points:
575     age = now - point[0]
576 
577     while currentArchive['retention'] < age: #we can't fit any more points in this archive
578       if currentPoints: #commit all the points we've found that it can fit
579         currentPoints.reverse() #put points in chronological order
580         __archive_update_many(fh,header,currentArchive,currentPoints)
581         currentPoints = []
582       try:
583         currentArchive = archives.next()
584       except StopIteration:
585         currentArchive = None
586         break
587 
588     if not currentArchive:
589       break #drop remaining points that don't fit in the database
590 
591     currentPoints.append(point)
592 
593   if currentArchive and currentPoints: #don't forget to commit after we've checked all the archives
594     currentPoints.reverse()
595     __archive_update_many(fh,header,currentArchive,currentPoints)
596 
597   if AUTOFLUSH:
598     fh.flush()
599     os.fsync(fh.fileno())
600 
601   fh.close()
602 
603 
604 def __archive_update_many(fh,header,archive,points):
605   step = archive['secondsPerPoint']
606   alignedPoints = [ (timestamp - (timestamp % step), value)
607                     for (timestamp,value) in points ]
608   #Create a packed string for each contiguous sequence of points
609   packedStrings = []
610   previousInterval = None
611   currentString = ""
612   for (interval,value) in alignedPoints:
613     if (not previousInterval) or (interval == previousInterval + step):
614       currentString += struct.pack(pointFormat,interval,value)
615       previousInterval = interval
616     else:
617       numberOfPoints = len(currentString) / pointSize
618       startInterval = previousInterval - (step * (numberOfPoints-1))
619       packedStrings.append( (startInterval,currentString) )
620       currentString = struct.pack(pointFormat,interval,value)
621       previousInterval = interval
622   if currentString:
623     numberOfPoints = len(currentString) / pointSize
624     startInterval = previousInterval - (step * (numberOfPoints-1))
625     packedStrings.append( (startInterval,currentString) )
626 
627   #Read base point and determine where our writes will start
628   fh.seek(archive['offset'])
629   packedBasePoint = fh.read(pointSize)
630   (baseInterval,baseValue) = struct.unpack(pointFormat,packedBasePoint)
631   if baseInterval == 0: #This file's first update
632     baseInterval = packedStrings[0][0] #use our first string as the base, so we start at the start
633 
634   #Write all of our packed strings in locations determined by the baseInterval
635   for (interval,packedString) in packedStrings:
636     timeDistance = interval - baseInterval
637     pointDistance = timeDistance / step
638     byteDistance = pointDistance * pointSize
639     myOffset = archive['offset'] + (byteDistance % archive['size'])
640     fh.seek(myOffset)
641     archiveEnd = archive['offset'] + archive['size']
642     bytesBeyond = (myOffset + len(packedString)) - archiveEnd
643 
644     if bytesBeyond > 0:
645       fh.write( packedString[:-bytesBeyond] )
646       assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString))
647       fh.seek( archive['offset'] )
648       fh.write( packedString[-bytesBeyond:] ) #safe because it can't exceed the archive (retention checking logic above)
649     else:
650       fh.write(packedString)
651 
652   #Now we propagate the updates to lower-precision archives
653   higher = archive
654   lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]
655 
656   for lower in lowerArchives:
657     fit = lambda i: i - (i % lower['secondsPerPoint'])
658     lowerIntervals = [fit(p[0]) for p in alignedPoints]
659     uniqueLowerIntervals = set(lowerIntervals)
660     propagateFurther = False
661     for interval in uniqueLowerIntervals:
662       if __propagate(fh, header, interval, higher, lower):
663         propagateFurther = True
664 
665     if not propagateFurther:
666       break
667     higher = lower
668 
669 
670 def info(path):
671   """info(path)
672 
673 path is a string
674 """
675   fh = open(path,'rb')
676   info = __readHeader(fh)
677   fh.close()
678   return info
679 
680 
681 def fetch(path,fromTime,untilTime=None):
682   """fetch(path,fromTime,untilTime=None)
683 
684 path is a string
685 fromTime is an epoch time
686 untilTime is also an epoch time, but defaults to now
687 """
688   fh = open(path,'rb')
689   return file_fetch(fh, fromTime, untilTime)
690 
691 
692 def file_fetch(fh, fromTime, untilTime):
693   header = __readHeader(fh)
694   now = int( time.time() )
695   if untilTime is None:
696     untilTime = now
697   fromTime = int(fromTime)
698   untilTime = int(untilTime)
699 
700   oldestTime = now - header['maxRetention']
701   if fromTime < oldestTime:
702     fromTime = oldestTime
703 
704   if not (fromTime < untilTime):
705     raise InvalidTimeInterval("Invalid time interval")
706   if untilTime > now:
707     untilTime = now
708   if untilTime < fromTime:
709     untilTime = now
710 
711   diff = now - fromTime
712   for archive in header['archives']:
713     if archive['retention'] >= diff:
714       break
715 
716   fromInterval = int( fromTime - (fromTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint']
717   untilInterval = int( untilTime - (untilTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint']
718   fh.seek(archive['offset'])
719   packedPoint = fh.read(pointSize)
720   (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint)
721 
722   if baseInterval == 0:
723     step = archive['secondsPerPoint']
724     points = (untilInterval - fromInterval) / step
725     timeInfo = (fromInterval,untilInterval,step)
726     valueList = [None] * points
727     return (timeInfo,valueList)
728 
729   #Determine fromOffset
730   timeDistance = fromInterval - baseInterval
731   pointDistance = timeDistance / archive['secondsPerPoint']
732   byteDistance = pointDistance * pointSize
733   fromOffset = archive['offset'] + (byteDistance % archive['size'])
734 
735   #Determine untilOffset
736   timeDistance = untilInterval - baseInterval
737   pointDistance = timeDistance / archive['secondsPerPoint']
738   byteDistance = pointDistance * pointSize
739   untilOffset = archive['offset'] + (byteDistance % archive['size'])
740 
741   #Read all the points in the interval
742   fh.seek(fromOffset)
743   if fromOffset < untilOffset: #If we don't wrap around the archive
744     seriesString = fh.read(untilOffset - fromOffset)
745   else: #We do wrap around the archive, so we need two reads
746     archiveEnd = archive['offset'] + archive['size']
747     seriesString = fh.read(archiveEnd - fromOffset)
748     fh.seek(archive['offset'])
749     seriesString += fh.read(untilOffset - archive['offset'])
750 
751   #Now we unpack the series data we just read (anything faster than unpack?)
752   byteOrder,pointTypes = pointFormat[0],pointFormat[1:]
753   points = len(seriesString) / pointSize
754   seriesFormat = byteOrder + (pointTypes * points)
755   unpackedSeries = struct.unpack(seriesFormat, seriesString)
756 
757   #And finally we construct a list of values (optimize this!)
758   valueList = [None] * points #pre-allocate entire list for speed
759   currentInterval = fromInterval
760   step = archive['secondsPerPoint']
761 
762   for i in xrange(0,len(unpackedSeries),2):
763     pointTime = unpackedSeries[i]
764     if pointTime == currentInterval:
765       pointValue = unpackedSeries[i+1]
766       valueList[i/2] = pointValue #in-place reassignment is faster than append()
767     currentInterval += step
768 
769   fh.close()
770   timeInfo = (fromInterval,untilInterval,step)
771   return (timeInfo,valueList)
772 
773 now = int( time.time() - 24*24*60 )
774 print 'update'
775 update("e:/tmp/ramoncounter-2.wsp", 12, now)
776 
777 
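
The listing ends with a call to update(); for completeness, here is a minimal sketch (my own, not part of whisper.py) of reading the data back with fetch() over the preceding 24 hours. The path matches the test file updated above; the 24-hour window is my own choice for illustration.

# Hypothetical read-back of the test file updated above (Python 2, like the listing).
import time

path = "e:/tmp/ramoncounter-2.wsp"
(timeInfo, values) = fetch(path, int(time.time()) - 24*60*60)   # last 24 hours
(fromInterval, untilInterval, step) = timeInfo
print "from=%d until=%d step=%d" % (fromInterval, untilInterval, step)
print "non-empty points: %d" % len([v for v in values if v is not None])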

Note that ramoncounter.wsp was created at about 4PM on 2012/04/18 and kept receiving data points until 10AM on 2012/04/19, and I ran 'python whisper.py' at about 5PM on 2012/04/19. The last 3 lines of whisper.py call the update() method, which is the most complicated one. Notice that the timestamp argument is 'int( time.time() - 24*24*60 )'; the offset is necessary because all existing data points lie between 4PM 2012/04/18 and 10AM 2012/04/19, and update() only accepts timestamps covered by the archives.
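
As a reminder, file_update() rejects timestamps that are in the future or older than the header's maxRetention. Below is a rough sketch of that check; the maxRetention figure is my own derivation from the [[10,2160],[60,10080],[600,262974]] schema mentioned in the code comments (600 * 262974 seconds, the lowest-precision archive).

# Sketch of the timestamp check performed in file_update() above.
import time

maxRetention = 600 * 262974                  # assumed: retention of the lowest-precision archive
now = int(time.time())
timestamp = int(time.time() - 24*24*60)      # 34,560 seconds (about 9.6 hours) ago

diff = now - timestamp
print (diff >= 0) and (diff < maxRetention)  # True, so the update is accepted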

Below is the whisper.log:

INFO:root:diff(now - timestamp) is 34560
INFO:root:higher archive:{'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}
INFO:root:timestamp is 1334795580
INFO:root:lowerIntervalStart is 1334795400
INFO:root:lowerIntervalEnd is 1334796000
INFO:root:higherBaseInterval,higherBaseValue is 1334745540,0
INFO:root:higher['secondsPerPoint'] is 60
INFO:root:higherFirstOffset is 35944
INFO:root:relativeFirstOffset is 9972
INFO:root:relativeLastOffset is 10092
INFO:root:higherLastOffset is 10092
INFO:root:points is 10
INFO:root:unpackedSeries is (1334795400, 0.51666666666666672, 1334795460, 0.25, 1334795520, 0.40000000000000008, 1334795580, 12.0, 1334795640, 0.46666666666666673, 1334795700, 0.5, 1334795760, 0.31666666666666665, 1334795820, 0.71666666666666667, 1334795880, 0.56666666666666665, 1334795940, 0.5)
INFO:root:currentInterval is 1334795400
INFO:root:step is 60
INFO:root:neighborValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownPercent is 1.000000
INFO:root:xff is 0.500000
INFO:root:aggregationMethod is average
INFO:root:aggregateValue is 1.623333
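
As a quick sanity check on the last few log lines, the aggregateValue is simply the average of the ten neighborValues (knownPercent is 1.0, which is >= the xFilesFactor of 0.5, so propagation goes ahead):

# Recomputing aggregateValue from the neighborValues printed in the log above.
neighborValues = [0.51666666666666672, 0.25, 0.40000000000000008, 12.0,
                  0.46666666666666673, 0.5, 0.31666666666666665,
                  0.71666666666666667, 0.56666666666666665, 0.5]
print sum(neighborValues) / len(neighborValues)   # 1.62333..., matching the log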

It is important to understand how whisper updates an archive and propagates that update to the lower-precision archives.
From the log you can see that the first archive selected is {'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}, not the highest-precision one {'retention': 21600, 'secondsPerPoint': 10, 'points': 2160, 'size': 25920, 'offset': 52}. Why?

We ran 'python whisper.py' at about 5PM on 2012/04/19, and the timestamp of the incoming data point is 'int( time.time() - 24*24*60 )', i.e. 34,560 seconds (roughly 9.6 hours) in the past, which matches the 'diff(now - timestamp) is 34560' line in the log. That diff is greater than the retention of the highest-precision archive (secondsPerPoint: 10, retention: 21600 seconds), so that archive cannot cover the timestamp and the next archive (secondsPerPoint: 60) is selected instead. Check the code above for the details.
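
A rough sketch of that selection, using the retentions from the [[10,2160],[60,10080],[600,262974]] schema mentioned in the code comments (retention = secondsPerPoint * points):

# Walking the archives from highest to lowest precision until one covers diff.
diff = 34560                                          # now - timestamp, from the log
retentions = [10 * 2160, 60 * 10080, 600 * 262974]    # 21600, 604800, 157784400

for i, retention in enumerate(retentions):
    if retention < diff:                              # this archive is too short to cover the point
        continue
    print "archive %d selected (retention %d seconds)" % (i, retention)
    break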

How does propagation occur?
It is easiest to explain with an example (the official Graphite documentation is really lacking here).
If the input data point is "1334795700, 0.5", whisper first updates archive(secondsPerPoint:10) and then propagates the update to the other 2 lower-precision archives. Let's focus on how it updates archive(secondsPerPoint:60); a small sketch of these steps follows the list.
1) Align the timestamp to the lower archive's interval to find the data point that will be written:

lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])

2) Find the position of that timestamp (lowerIntervalStart) in archive(secondsPerPoint:10).
3) Read the next 6 data points from archive(secondsPerPoint:10). Why 6 points? secondsPerPoint_of_lower_archive / secondsPerPoint_of_higher_archive = 60/10 = 6.
4) Apply the aggregation method (average by default) to those 6 data points.
5) Write the pair (lowerIntervalStart, aggregateValue) into archive(secondsPerPoint:60).
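
Here is that sketch; the six 10-second values are hypothetical, everything else follows the steps above (this is an illustration, not the real whisper code):

# A minimal, self-contained illustration of steps 1-5.
timestamp = 1334795700
higher_spp, lower_spp = 10, 60                             # secondsPerPoint of the two archives

lowerIntervalStart = timestamp - (timestamp % lower_spp)   # step 1: 1334795700

higherPoints = lower_spp / higher_spp                      # steps 2-3: 60/10 = 6 points to read
samples = [0.4, 0.6, 0.5, 0.5, 0.4, 0.6]                   # hypothetical 10-second values
assert len(samples) == higherPoints

aggregateValue = sum(samples) / len(samples)               # step 4: average -> 0.5
print "write (%d, %s) into archive(secondsPerPoint:60)" % (lowerIntervalStart, aggregateValue)  # step 5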

---------------------------------------------------
Some terminology of Graphite's Whisper (a round-robin database):
- data point: A float value paired with a timestamp in seconds since the UNIX Epoch (1970-01-01).
- resolution: The number of seconds per data point. For example, in our test statsd flushes the counter ('ramoncounter') every 10 seconds, so from Whisper's perspective it receives 1 data point every 10 seconds and the resolution is 10 seconds. A 10-second resolution is higher (finer) than a 60-second resolution.